Skip to content

Commit

Permalink
[FLINK-33191][Connector/Kafka] Remove dependency on Flink Shaded
Browse files Browse the repository at this point in the history
  • Loading branch information
MartijnVisser authored and tzulitai committed Oct 11, 2023
1 parent 26ab532 commit e9d3089
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.kafka.util;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

/** Factory for Jackson mappers. */
public final class JacksonMapperFactory {

public static ObjectMapper createObjectMapper() {
final ObjectMapper objectMapper = new ObjectMapper();
registerModules(objectMapper);
return objectMapper;
}

public static ObjectMapper createObjectMapper(JsonFactory jsonFactory) {
final ObjectMapper objectMapper = new ObjectMapper(jsonFactory);
registerModules(objectMapper);
return objectMapper;
}

private static void registerModules(ObjectMapper mapper) {
mapper.registerModule(new JavaTimeModule())
.registerModule(new Jdk8Module().configureAbsentsAsNulls(true))
.disable(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS)
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}

private JacksonMapperFactory() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.util.JacksonMapperFactory;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.util.jackson.JacksonMapperFactory;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@

package org.apache.flink.connector.kafka.source.reader.deserializer;

import org.apache.flink.connector.kafka.util.JacksonMapperFactory;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.flink.util.jackson.JacksonMapperFactory;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.connector.kafka.util.JacksonMapperFactory;
import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.util.jackson.JacksonMapperFactory;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;

Expand Down
24 changes: 18 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ under the License.
<zookeeper.version>3.5.9</zookeeper.version>
<confluent.version>7.2.2</confluent.version>

<jackson-bom.version>2.13.4.20221013</jackson-bom.version>
<jackson-bom.version>2.15.2</jackson-bom.version>
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.9.1</junit5.version>
<assertj.version>3.23.1</assertj.version>
Expand All @@ -80,13 +80,25 @@ under the License.
</properties>

<dependencies>
<!-- Root dependencies for all projects -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
<version>2.13.4-16.1</version>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<!-- Java 8 Date/time -->
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<dependency>
<!-- Java 8 Datatypes -->
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
</dependency>

<!-- Root dependencies for all projects -->

<!-- Logging API -->
<dependency>
Expand Down

0 comments on commit e9d3089

Please # to comment.