From 5089a113c9b037be263ed106c02bb97399712437 Mon Sep 17 00:00:00 2001 From: Jeremy Custenborder Date: Thu, 19 Jan 2017 23:40:11 -0600 Subject: [PATCH] General cleanup around the JSON handling. Move to using a single ObjectMapper across the whole application. Added serializers for Struct and SinkRecord. --- .../connect/splunk/DateDeserializer.java | 51 ----- .../kafka/connect/splunk/DateSerializer.java | 44 ----- .../kafka/connect/splunk/EventConverter.java | 31 ++- .../kafka/connect/splunk/EventIterator.java | 27 ++- .../kafka/connect/splunk/EventServlet.java | 17 +- .../connect/splunk/ObjectMapperFactory.java | 186 +++++++++++++++++- .../connect/splunk/SinkRecordContent.java | 185 ++--------------- .../connect/splunk/SplunkHttpSinkTask.java | 14 +- .../connect/splunk/SplunkHttpSourceTask.java | 12 +- .../connect/splunk/DateSerializationTest.java | 34 +--- .../connect/splunk/EventConverterTest.java | 24 ++- .../connect/splunk/EventIteratorTest.java | 12 +- .../connect/splunk/SinkRecordContentTest.java | 159 +++++++++------ .../splunk/SplunkHttpSourceTaskTest.java | 13 +- 14 files changed, 364 insertions(+), 445 deletions(-) delete mode 100644 src/main/java/io/confluent/kafka/connect/splunk/DateDeserializer.java delete mode 100644 src/main/java/io/confluent/kafka/connect/splunk/DateSerializer.java diff --git a/src/main/java/io/confluent/kafka/connect/splunk/DateDeserializer.java b/src/main/java/io/confluent/kafka/connect/splunk/DateDeserializer.java deleted file mode 100644 index 338a130..0000000 --- a/src/main/java/io/confluent/kafka/connect/splunk/DateDeserializer.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package io.confluent.kafka.connect.splunk; - -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.deser.std.StdDeserializer; - -import java.io.IOException; -import java.math.BigDecimal; -import java.util.Date; - -public class DateDeserializer extends StdDeserializer { - - public DateDeserializer() { - this(null); - } - - public DateDeserializer(Class t) { - super(t); - } - - @Override - public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException { - JsonNode node = jsonParser.getCodec().readTree(jsonParser); - - if (node.isNull()) { - return null; - } else if (node.isBigDecimal()) { - BigDecimal decimal = node.decimalValue().setScale(3); - return new Date(decimal.unscaledValue().longValue()); - } - - return null; - } -} \ No newline at end of file diff --git a/src/main/java/io/confluent/kafka/connect/splunk/DateSerializer.java b/src/main/java/io/confluent/kafka/connect/splunk/DateSerializer.java deleted file mode 100644 index 119a7d4..0000000 --- a/src/main/java/io/confluent/kafka/connect/splunk/DateSerializer.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.kafka.connect.splunk; - -import com.fasterxml.jackson.core.JsonGenerationException; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.ser.std.StdSerializer; - -import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.Date; - -class DateSerializer extends StdSerializer { - - public DateSerializer() { - this(null); - } - - public DateSerializer(Class t) { - super(t); - } - - @Override - public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonGenerationException { - long time = date.getTime(); - BigDecimal value = new BigDecimal(BigInteger.valueOf(time), 3); - jsonGenerator.writeNumber(value); - } -} diff --git a/src/main/java/io/confluent/kafka/connect/splunk/EventConverter.java b/src/main/java/io/confluent/kafka/connect/splunk/EventConverter.java index bf1b1fc..10ac8e2 100644 --- a/src/main/java/io/confluent/kafka/connect/splunk/EventConverter.java +++ b/src/main/java/io/confluent/kafka/connect/splunk/EventConverter.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,7 +17,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; @@ -58,7 +57,6 @@ class EventConverter { private static final Map EMPTY_MAP = new HashMap<>(); - final ObjectMapper mapper; final SplunkHttpSourceConnectorConfig config; final String topicPrefix; final boolean topicPerIndex; @@ -68,8 +66,7 @@ class EventConverter { Time time = new SystemTime(); - EventConverter(ObjectMapper mapper, SplunkHttpSourceConnectorConfig config) { - this.mapper = mapper; + EventConverter(SplunkHttpSourceConnectorConfig config) { this.config = config; this.topicPerIndex = this.config.topicPerIndex(); this.topicPrefix = this.config.topicPrefix(); @@ -78,7 +75,7 @@ class EventConverter { this.defaultIndex = this.config.defaultIndex(); } - static void setFieldValue(ObjectMapper mapper, JsonNode messageNode, Struct struct, String fieldName, Class cls) { + static void setFieldValue(JsonNode messageNode, Struct struct, String fieldName, Class cls) { T structValue = null; if (messageNode.has(fieldName)) { @@ -86,12 +83,12 @@ static void setFieldValue(ObjectMapper mapper, JsonNode messageNode, Struct if (String.class.equals(cls) && valueNode.isObject()) { try { - structValue = (T) mapper.writeValueAsString(valueNode); + structValue = (T) ObjectMapperFactory.INSTANCE.writeValueAsString(valueNode); } catch (JsonProcessingException e) { throw new IllegalStateException(e); } } else if (!valueNode.isNull()) { - structValue = mapper.convertValue(valueNode, cls); + structValue = ObjectMapperFactory.INSTANCE.convertValue(valueNode, cls); } } @@ -105,12 +102,12 @@ public SourceRecord convert(JsonNode messageNode, String remoteHost) { Struct keyStruct = new Struct(KEY_SCHEMA); Struct valueStruct = new Struct(VALUE_SCHEMA); - setFieldValue(this.mapper, messageNode, valueStruct, "time", Date.class); - setFieldValue(this.mapper, messageNode, valueStruct, "host", String.class); - setFieldValue(this.mapper, messageNode, valueStruct, "source", String.class); - setFieldValue(this.mapper, messageNode, valueStruct, "sourcetype", String.class); - setFieldValue(this.mapper, messageNode, valueStruct, "index", String.class); - setFieldValue(this.mapper, messageNode, valueStruct, "event", String.class); + setFieldValue(messageNode, valueStruct, "time", Date.class); + setFieldValue(messageNode, valueStruct, "host", String.class); + setFieldValue(messageNode, valueStruct, "source", String.class); + setFieldValue(messageNode, valueStruct, "sourcetype", String.class); + setFieldValue(messageNode, valueStruct, "index", String.class); + setFieldValue(messageNode, valueStruct, "event", String.class); if (null == valueStruct.get("time")) { valueStruct.put("time", new Date(this.time.milliseconds())); diff --git a/src/main/java/io/confluent/kafka/connect/splunk/EventIterator.java b/src/main/java/io/confluent/kafka/connect/splunk/EventIterator.java index 9425216..2a67232 100644 --- a/src/main/java/io/confluent/kafka/connect/splunk/EventIterator.java +++ b/src/main/java/io/confluent/kafka/connect/splunk/EventIterator.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder 
(jcustenborder@gmail.com) - * + *
* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,7 +18,6 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.BufferedReader; import java.io.IOException; @@ -27,30 +26,28 @@ class EventIterator implements Iterator, AutoCloseable { final JsonFactory jsonFactory; - final ObjectMapper objectMapper; final JsonParser jsonParser; final Iterator iterator; - EventIterator(JsonFactory jsonFactory, ObjectMapper objectMapper, JsonParser jsonParser, Iterator iterator) { + EventIterator(JsonFactory jsonFactory, JsonParser jsonParser, Iterator iterator) { this.jsonFactory = jsonFactory; - this.objectMapper = objectMapper; this.jsonParser = jsonParser; this.iterator = iterator; } - public static EventIterator create(ObjectMapper objectMapper, JsonFactory jsonFactory, BufferedReader bufferedReader) throws IOException { + public static EventIterator create(JsonFactory jsonFactory, BufferedReader bufferedReader) throws IOException { JsonParser jsonParser = jsonFactory.createParser(bufferedReader); - return create(objectMapper, jsonFactory, jsonParser); + return create(jsonFactory, jsonParser); } - public static EventIterator create(ObjectMapper objectMapper, JsonFactory jsonFactory, InputStream inputStream) throws IOException { + public static EventIterator create(JsonFactory jsonFactory, InputStream inputStream) throws IOException { JsonParser jsonParser = jsonFactory.createParser(inputStream); - return create(objectMapper, jsonFactory, jsonParser); + return create(jsonFactory, jsonParser); } - public static EventIterator create(ObjectMapper objectMapper, JsonFactory jsonFactory, JsonParser jsonParser) throws IOException { - Iterator iterator = objectMapper.readValues(jsonParser, JsonNode.class); - return new EventIterator(jsonFactory, objectMapper, jsonParser, iterator); + public static EventIterator create(JsonFactory jsonFactory, JsonParser jsonParser) throws IOException { + Iterator iterator = ObjectMapperFactory.INSTANCE.readValues(jsonParser, JsonNode.class); + return new EventIterator(jsonFactory, jsonParser, iterator); } @Override diff --git a/src/main/java/io/confluent/kafka/connect/splunk/EventServlet.java b/src/main/java/io/confluent/kafka/connect/splunk/EventServlet.java index 52b6fcb..cb95da8 100644 --- a/src/main/java/io/confluent/kafka/connect/splunk/EventServlet.java +++ b/src/main/java/io/confluent/kafka/connect/splunk/EventServlet.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,7 +17,6 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.json.Json; import io.confluent.kafka.connect.utils.data.SourceRecordConcurrentLinkedDeque; import org.apache.kafka.connect.source.SourceRecord; @@ -37,7 +36,6 @@ public class EventServlet extends HttpServlet { private static final Logger log = LoggerFactory.getLogger(EventServlet.class); ServletConfig servletConfig; JsonFactory jsonFactory; - ObjectMapper objectMapper; EventConverter converter; SourceRecordConcurrentLinkedDeque recordQueue; @@ -45,11 +43,10 @@ public class EventServlet extends HttpServlet { Set allowedIndexes; - public void configure(SplunkHttpSourceConnectorConfig config, JsonFactory jsonFactory, ObjectMapper objectMapper, SourceRecordConcurrentLinkedDeque recordQueue) { + public void configure(SplunkHttpSourceConnectorConfig config, JsonFactory jsonFactory, SourceRecordConcurrentLinkedDeque recordQueue) { this.config = config; this.jsonFactory = jsonFactory; - this.objectMapper = objectMapper; - this.converter = new EventConverter(this.objectMapper, this.config); + this.converter = new EventConverter(this.config); this.recordQueue = recordQueue; this.allowedIndexes = this.config.allowedIndexes(); } @@ -93,7 +90,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) try { try (BufferedReader bodyReader = request.getReader()) { - try (EventIterator iterator = EventIterator.create(this.objectMapper, this.jsonFactory, bodyReader)) { + try (EventIterator iterator = EventIterator.create(this.jsonFactory, bodyReader)) { while (iterator.hasNext()) { JsonNode jsonNode = iterator.next(); diff --git a/src/main/java/io/confluent/kafka/connect/splunk/ObjectMapperFactory.java b/src/main/java/io/confluent/kafka/connect/splunk/ObjectMapperFactory.java index 5a861d6..5cc7d68 100644 --- a/src/main/java/io/confluent/kafka/connect/splunk/ObjectMapperFactory.java +++ b/src/main/java/io/confluent/kafka/connect/splunk/ObjectMapperFactory.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,21 +15,195 @@ */ package io.confluent.kafka.connect.splunk; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableSet; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import java.io.IOException; +import java.math.BigDecimal; import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; class ObjectMapperFactory { - public static ObjectMapper create() { + + public static final ObjectMapper INSTANCE; + + static { ObjectMapper mapper = new ObjectMapper(); SimpleModule module = new SimpleModule(); module.addSerializer(Date.class, new DateSerializer()); module.addDeserializer(Date.class, new DateDeserializer()); + module.addSerializer(Struct.class, new StructSerializer()); + module.addSerializer(SinkRecord.class, new SinkRecordSerializer()); mapper.registerModule(module); mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true); - return mapper; + mapper.configure(SerializationFeature.WRITE_NULL_MAP_VALUES, false); + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + + INSTANCE = mapper; + } + + static class DateDeserializer extends JsonDeserializer { + @Override + public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { + JsonNode node = jsonParser.getCodec().readTree(jsonParser); + + if (node.isNull()) { + return null; + } else if (node.isNumber()) { + BigDecimal decimal = node.decimalValue().setScale(3); + return new Date(decimal.unscaledValue().longValue()); + } + + return null; + } + } + + static class DateSerializer extends JsonSerializer { + @Override + public void serialize(Date date, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { + long time = date.getTime(); + BigDecimal value = BigDecimal.valueOf(time, 3); + jsonGenerator.writeNumber(value); + } + } + + static final Set RESERVED_METADATA = ImmutableSet.of( + "time", + "host", + "source", + "sourcetype", + "index" + ); + + static class StructSerializer extends JsonSerializer { + + @Override + public void serialize(Struct struct, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { + List fields = struct.schema().fields(); + Map values = new LinkedHashMap<>(fields.size()); + + for (Field field : fields) { + if (!RESERVED_METADATA.contains(field.name())) + values.put(field.name(), struct.get(field)); + } + + jsonGenerator.writeObject(values); + } + } + + public static class Event { + public String 
host; + public Date time; + public String sourcetype; + public String index; + public String source; + public Object event; + + boolean setValue(Object key, Object value) { + if (null == value) { + return false; + } + + if ("time".equals(key)) { + if (value instanceof BigDecimal) { + BigDecimal decimal = (BigDecimal) value; + this.time = new Date(decimal.unscaledValue().longValue()); + } else if (value instanceof Date) { + this.time = (Date) value; + } + } + + if ("host".equals(key)) { + this.host = value.toString(); + } + if ("source".equals(key)) { + this.source = value.toString(); + } + if ("sourcetype".equals(key)) { + this.sourcetype = value.toString(); + } + if ("index".equals(key)) { + this.index = value.toString(); + } + + return RESERVED_METADATA.contains(key); + } + + } + + static class SinkRecordSerializer extends JsonSerializer { + + void handleMap(Event event) { + final Map input = (Map) event.event; + final Map result = new LinkedHashMap(input.size()); + + for (Object key : input.keySet()) { + Object value = input.get(key); + + if (!event.setValue(key, value)) { + result.put(key, value); + } + } + + event.event = result.isEmpty() ? null : result; + } + + void handleStruct(Event event) { + final Struct input = (Struct) event.event; + List fields = input.schema().fields(); + final Map result = new LinkedHashMap(fields.size()); + + for (Field field : fields) { + Object key = field.name(); + Object value = input.get(field); + + if (null == value) { + continue; + } + + if (!event.setValue(key, value)) { + result.put(key, value); + } + } + + event.event = result.isEmpty() ? null : result; + } + + + @Override + public void serialize(SinkRecord sinkRecord, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException, JsonProcessingException { + Event event = new Event(); + event.event = sinkRecord.value(); + + if (event.event instanceof Map) { + handleMap(event); + } else if (event.event instanceof Struct) { + handleStruct(event); + } + + //TODO: When we go to the next Kafka version. Check for null date and use the timestamp of the SinkRecord. + + jsonGenerator.writeObject(event); + } } + + } diff --git a/src/main/java/io/confluent/kafka/connect/splunk/SinkRecordContent.java b/src/main/java/io/confluent/kafka/connect/splunk/SinkRecordContent.java index aa91f4d..51263ba 100644 --- a/src/main/java/io/confluent/kafka/connect/splunk/SinkRecordContent.java +++ b/src/main/java/io/confluent/kafka/connect/splunk/SinkRecordContent.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,40 +15,22 @@ */ package io.confluent.kafka.connect.splunk; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.api.client.http.HttpContent; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; import java.io.IOException; import java.io.OutputStream; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; class SinkRecordContent implements HttpContent { - final ObjectMapper mapper; final Collection sinkRecords; - final Cache> schemaCache = CacheBuilder.newBuilder() - .maximumSize(1000) - .build(); - SinkRecordContent(ObjectMapper mapper, Collection sinkRecords) { - this.mapper = mapper; + SinkRecordContent(SinkRecord... records) { + this(Arrays.asList(records)); + } + + SinkRecordContent(Collection sinkRecords) { this.sinkRecords = sinkRecords; } @@ -67,124 +49,6 @@ public boolean retrySupported() { return false; } - List getWriters(Schema schema) { - List writers = new ArrayList<>(); - - Map fieldLookup = new HashMap<>(); - for (Field field : schema.fields()) { - fieldLookup.put(field.name(), field); - } - Set fields = new HashSet<>(fieldLookup.keySet()); - - if (fieldLookup.containsKey("host")) { - writers.add(new ValueWriter() { - @Override - public void write(ObjectNode objectNode, ObjectNode eventNode, Struct struct) { - String host = struct.getString("host"); - if (null != host) { - objectNode.put("host", host); - } - } - }); - fields.remove("host"); - } - - if (fieldLookup.containsKey("hostname")) { - writers.add(new ValueWriter() { - @Override - public void write(ObjectNode objectNode, ObjectNode eventNode, Struct struct) { - String host = struct.getString("hostname"); - if (null != host) { - objectNode.put("host", host); - } - } - }); - fields.remove("hostname"); - } - - if (fieldLookup.containsKey("time")) { - writers.add(new ValueWriter() { - @Override - public void write(ObjectNode objectNode, ObjectNode eventNode, Struct struct) { - Date dateTime = (Date) struct.get("time"); - if (null != dateTime) { - BigDecimal time = new BigDecimal(BigInteger.valueOf(dateTime.getTime()), 3); - objectNode.put("time", time); - } - } - }); - fields.remove("time"); - } - - if (fieldLookup.containsKey("date")) { - writers.add(new ValueWriter() { - @Override - public void write(ObjectNode objectNode, ObjectNode eventNode, Struct struct) { - Date dateTime = (Date) struct.get("date"); - if (null != dateTime) { - BigDecimal time = new BigDecimal(BigInteger.valueOf(dateTime.getTime()), 3); - objectNode.put("time", time); - } - } - }); - fields.remove("date"); - } - - if (fieldLookup.containsKey("sourcetype")) { - writers.add(new ValueWriter() { - @Override - public void write(ObjectNode objectNode, ObjectNode eventNode, Struct struct) { - String host = 
struct.getString("sourcetype"); - if (null != host) { - objectNode.put("sourcetype", host); - } - } - }); - fields.remove("sourcetype"); - } - - if (fieldLookup.containsKey("index")) { - writers.add(new ValueWriter() { - @Override - public void write(ObjectNode objectNode, ObjectNode eventNode, Struct struct) { - String host = struct.getString("index"); - if (null != host) { - objectNode.put("index", host); - } - } - }); - fields.remove("index"); - } - - if (fieldLookup.containsKey("source")) { - writers.add(new ValueWriter() { - @Override - public void write(ObjectNode objectNode, ObjectNode eventNode, Struct struct) { - String host = struct.getString("source"); - if (null != host) { - objectNode.put("source", host); - } - } - }); - fields.remove("source"); - } - - // Process the left over fields and put them in the event. - for (final String field : fields) { - writers.add(new ValueWriter() { - @Override - public void write(ObjectNode objectNode, ObjectNode eventNode, Struct struct) { - Object value = struct.get(field); - - if (null != value) { - eventNode.put(field, value.toString()); - } - } - }); - } - - return writers; - } @Override public void writeTo(OutputStream outputStream) throws IOException { @@ -195,38 +59,11 @@ public void writeTo(OutputStream outputStream) throws IOException { outputStream.write((int) '\n'); } - final Struct valueStruct = (Struct) sinkRecord.value(); - List writers; - try { - writers = this.schemaCache.get(valueStruct.schema(), new Callable>() { - @Override - public List call() throws Exception { - return getWriters(valueStruct.schema()); - } - }); - } catch (ExecutionException e) { - throw new IOException(e); - } - - ObjectNode objectNode = mapper.createObjectNode(); - ObjectNode eventNode = mapper.createObjectNode(); - - for (ValueWriter writer : writers) { - writer.write(objectNode, eventNode, valueStruct); - } - - if (eventNode.size() > 0) { - objectNode.put("event", eventNode); - } - - mapper.writeValue(outputStream, objectNode); + ObjectMapperFactory.INSTANCE.writeValue(outputStream, sinkRecord); index++; } outputStream.flush(); } - interface ValueWriter { - void write(ObjectNode objectNode, ObjectNode eventNode, Struct struct); - } } diff --git a/src/main/java/io/confluent/kafka/connect/splunk/SplunkHttpSinkTask.java b/src/main/java/io/confluent/kafka/connect/splunk/SplunkHttpSinkTask.java index 4d14639..29425ba 100644 --- a/src/main/java/io/confluent/kafka/connect/splunk/SplunkHttpSinkTask.java +++ b/src/main/java/io/confluent/kafka/connect/splunk/SplunkHttpSinkTask.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,7 +15,6 @@ */ package io.confluent.kafka.connect.splunk; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.http.GZipEncoding; import com.google.api.client.http.GenericUrl; import com.google.api.client.http.HttpMediaType; @@ -55,7 +54,6 @@ public class SplunkHttpSinkTask extends SinkTask { HttpRequestFactory httpRequestFactory; HttpRequestInitializer httpRequestInitializer; GenericUrl eventCollectorUrl; - ObjectMapper mapper; @Override public String version() { @@ -140,8 +138,6 @@ public void initialize(HttpRequest httpRequest) throws IOException { if (log.isInfoEnabled()) { log.info("Setting Splunk Http Event Collector Url to {}", this.eventCollectorUrl); } - - this.mapper = ObjectMapperFactory.create(); } @@ -159,7 +155,7 @@ public void put(Collection collection) { log.debug("Posting {} message(s) to {}", collection.size(), this.eventCollectorUrl); } - SinkRecordContent sinkRecordContent = new SinkRecordContent(this.mapper, collection); + SinkRecordContent sinkRecordContent = new SinkRecordContent(collection); if (log.isDebugEnabled()) { try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { diff --git a/src/main/java/io/confluent/kafka/connect/splunk/SplunkHttpSourceTask.java b/src/main/java/io/confluent/kafka/connect/splunk/SplunkHttpSourceTask.java index 078e129..0cf19bc 100644 --- a/src/main/java/io/confluent/kafka/connect/splunk/SplunkHttpSourceTask.java +++ b/src/main/java/io/confluent/kafka/connect/splunk/SplunkHttpSourceTask.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,7 +16,6 @@ package io.confluent.kafka.connect.splunk; import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.kafka.connect.utils.data.SourceRecordConcurrentLinkedDeque; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; @@ -95,9 +94,8 @@ public void start(Map map) { } JsonFactory jsonFactory = new JsonFactory(); - ObjectMapper objectMapper = ObjectMapperFactory.create(); - this.eventServlet.configure(this.config, jsonFactory, objectMapper, this.sourceRecordConcurrentLinkedDeque); + this.eventServlet.configure(this.config, jsonFactory, this.sourceRecordConcurrentLinkedDeque); } @Override diff --git a/src/test/java/io/confluent/kafka/connect/splunk/DateSerializationTest.java b/src/test/java/io/confluent/kafka/connect/splunk/DateSerializationTest.java index ad2f551..8215389 100644 --- a/src/test/java/io/confluent/kafka/connect/splunk/DateSerializationTest.java +++ b/src/test/java/io/confluent/kafka/connect/splunk/DateSerializationTest.java @@ -15,45 +15,29 @@ */ package io.confluent.kafka.connect.splunk; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Before; import org.junit.Test; +import java.io.IOException; import java.math.BigDecimal; import java.util.Date; import static org.junit.Assert.*; public class DateSerializationTest { - ObjectMapper mapper; - - @Before - public void setup() { - this.mapper = ObjectMapperFactory.create(); + void roundtrip(final Date expected) throws IOException { + String input = ObjectMapperFactory.INSTANCE.writeValueAsString(expected); + Date actual = ObjectMapperFactory.INSTANCE.readValue(input, Date.class); + assertEquals("actual does not match.", expected, actual); } @Test - public void roundTrip() { + public void roundtrip() throws IOException { final Date expected = new Date(1472507332123L); - final BigDecimal expectedIntermediateValue = BigDecimal.valueOf(expected.getTime(), 3); - JsonNode intermediateNode = this.mapper.convertValue(expected, JsonNode.class); - assertNotNull(intermediateNode); - assertTrue("intermediateNode should be a number.", intermediateNode.isNumber()); - BigDecimal intermediateValue = intermediateNode.decimalValue(); - intermediateValue.setScale(3); - assertEquals(expectedIntermediateValue, intermediateValue); - final Date actual = this.mapper.convertValue(intermediateNode, Date.class); - assertEquals(expected, actual); + roundtrip(expected); + roundtrip(null); } - - @Test - public void nulls() { - Date date = this.mapper.convertValue(null, Date.class); - assertNull(date); - JsonNode inputNode = this.mapper.convertValue(null, JsonNode.class); - date = this.mapper.convertValue(inputNode, Date.class); - assertNull(date); - } - } diff --git a/src/test/java/io/confluent/kafka/connect/splunk/EventConverterTest.java b/src/test/java/io/confluent/kafka/connect/splunk/EventConverterTest.java index f8f1cba..b6b4b2f 100644 --- a/src/test/java/io/confluent/kafka/connect/splunk/EventConverterTest.java +++ b/src/test/java/io/confluent/kafka/connect/splunk/EventConverterTest.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -41,7 +41,6 @@ public class EventConverterTest { final String TOPIC_PREFIX_CONF = "topic"; - ObjectMapper mapper; SplunkHttpSourceConnectorConfig config; Map settings; @@ -53,7 +52,6 @@ static JsonNode readNode(String fileName) throws IOException { @Before public void setup() { - this.mapper = ObjectMapperFactory.create(); this.settings = new LinkedHashMap<>(); this.settings.put(SplunkHttpSourceConnectorConfig.TOPIC_PREFIX_CONF, TOPIC_PREFIX_CONF); this.settings.put(SplunkHttpSourceConnectorConfig.TOPIC_PER_INDEX_CONF, Boolean.FALSE.toString()); @@ -80,7 +78,7 @@ void assertSourceRecord(final Map expected, final ConnectRecord recor Object structValue = valueStruct.get(entry.getKey()); if (entry.getValue() instanceof Map) { - String text = this.mapper.writeValueAsString(entry.getValue()); + String text = ObjectMapperFactory.INSTANCE.writeValueAsString(entry.getValue()); String structText = (String) structValue; assertThat(entry.getKey() + " should match.", structText, IsEqual.equalTo(text)); } else { @@ -100,11 +98,11 @@ public void StringBody() throws JsonProcessingException { "event", "Hello world!" ); - JsonNode jsonNode = this.mapper.convertValue(expected, JsonNode.class); + JsonNode jsonNode = ObjectMapperFactory.INSTANCE.convertValue(expected, JsonNode.class); assertNotNull("jsonNode should not be null.", jsonNode); assertTrue("jsonNode should be an object.", jsonNode.isObject()); - EventConverter eventConverter = new EventConverter(mapper, config); + EventConverter eventConverter = new EventConverter(config); eventConverter.time = mock(Time.class); when(eventConverter.time.milliseconds()).thenReturn(TIME); SourceRecord sourceRecord = eventConverter.convert(jsonNode, "192.168.1.10"); @@ -124,11 +122,11 @@ public void complexBody() throws JsonProcessingException { ) ); - JsonNode jsonNode = this.mapper.convertValue(expected, JsonNode.class); + JsonNode jsonNode = ObjectMapperFactory.INSTANCE.convertValue(expected, JsonNode.class); assertNotNull("jsonNode should not be null.", jsonNode); assertTrue("jsonNode should be an object.", jsonNode.isObject()); - EventConverter eventConverter = new EventConverter(mapper, config); + EventConverter eventConverter = new EventConverter(config); eventConverter.time = mock(Time.class); when(eventConverter.time.milliseconds()).thenReturn(TIME); SourceRecord sourceRecord = eventConverter.convert(jsonNode, "192.168.1.10"); @@ -143,11 +141,11 @@ public void minimal() throws JsonProcessingException { "event", "Hello world!" 
); - JsonNode jsonNode = this.mapper.convertValue(expected, JsonNode.class); + JsonNode jsonNode = ObjectMapperFactory.INSTANCE.convertValue(expected, JsonNode.class); assertNotNull("jsonNode should not be null.", jsonNode); assertTrue("jsonNode should be an object.", jsonNode.isObject()); - EventConverter eventConverter = new EventConverter(mapper, config); + EventConverter eventConverter = new EventConverter(config); eventConverter.time = mock(Time.class); when(eventConverter.time.milliseconds()).thenReturn(TIME); SourceRecord sourceRecord = eventConverter.convert(jsonNode, "192.168.1.10"); diff --git a/src/test/java/io/confluent/kafka/connect/splunk/EventIteratorTest.java b/src/test/java/io/confluent/kafka/connect/splunk/EventIteratorTest.java index 9e7ee35..82cb351 100644 --- a/src/test/java/io/confluent/kafka/connect/splunk/EventIteratorTest.java +++ b/src/test/java/io/confluent/kafka/connect/splunk/EventIteratorTest.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,7 +17,6 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.Test; import java.io.IOException; @@ -30,10 +29,9 @@ public static InputStream events() { @Test public void iterate() throws IOException { - ObjectMapper mapper = new ObjectMapper(); JsonFactory factory = new JsonFactory(); InputStream inputStream = events(); - EventIterator iterator = EventIterator.create(mapper, factory, inputStream); + EventIterator iterator = EventIterator.create(factory, inputStream); while (iterator.hasNext()) { JsonNode jsonNode = iterator.next(); diff --git a/src/test/java/io/confluent/kafka/connect/splunk/SinkRecordContentTest.java b/src/test/java/io/confluent/kafka/connect/splunk/SinkRecordContentTest.java index 33146bb..2792b7d 100644 --- a/src/test/java/io/confluent/kafka/connect/splunk/SinkRecordContentTest.java +++ b/src/test/java/io/confluent/kafka/connect/splunk/SinkRecordContentTest.java @@ -15,96 +15,137 @@ */ package io.confluent.kafka.connect.splunk; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; -import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.Map; -public class SinkRecordContentTest { - +import static org.junit.Assert.assertEquals; - public static void addRecord(Collection records, Map values) { - Struct valueStruct = new Struct(EventConverter.VALUE_SCHEMA); - for (Map.Entry entry : values.entrySet()) { - valueStruct.put(entry.getKey(), entry.getValue()); - } +public class SinkRecordContentTest { + private static final Logger log = LoggerFactory.getLogger(SinkRecordContentTest.class); - records.add( - new SinkRecord( - "topic", - 1, - EventConverter.KEY_SCHEMA, - null, - EventConverter.VALUE_SCHEMA, - valueStruct, - 1L - ) - ); + SinkRecord record(Object value) { + return record(null, value); } - @Test - public void issue3() throws IOException { - - Map input = ImmutableMap.of("host", "hostname.example.com", "time", new Date(1472256858924L), "source", "testapp"); - - Collection sinkRecords = Arrays.asList( - new SinkRecord( - "topic", - 1, - null, - null, - null, - input, - 1L - ) + SinkRecord record(Schema schema, Object value) { + SinkRecord record = new SinkRecord( + "topic", + 1, + null, + null, + schema, + value, + 1L ); - ObjectMapper mapper = ObjectMapperFactory.create(); + return record; + } - SinkRecordContent content = new SinkRecordContent(mapper, sinkRecords); - String actual; + void test(final SinkRecord input, final String expected) throws IOException { + SinkRecordContent content = new SinkRecordContent(input); + final String actual; try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { content.writeTo(outputStream); actual = new String(outputStream.toByteArray(), "UTF-8"); } - System.out.println(actual); -// Assert.assertEquals(expected, actual); + log.trace("actual = {}", actual); + 
assertEquals(expected, actual); } + @Test + public void map() throws IOException { + final Map value = ImmutableMap.of("host", "hostname.example.com", "time", new Date(1472256858924L), "source", "testapp", "sourcetype", "txt", "index", "main"); + final SinkRecord record = record(value); + final String expected = "{\"host\":\"hostname.example.com\",\"time\":1472256858.924,\"sourcetype\":\"txt\",\"index\":\"main\",\"source\":\"testapp\"}"; + test(record, expected); + } @Test - public void writeTo() throws IOException { - Collection sinkRecords = new ArrayList<>(); - addRecord(sinkRecords, ImmutableMap.of("host", "hostname.example.com")); - addRecord(sinkRecords, ImmutableMap.of("host", "hostname.example.com", "time", new Date(1472256858924L), "source", "testapp")); - addRecord(sinkRecords, ImmutableMap.of("host", "hostname.example.com", "time", new Date(1472256858924L), "source", "testapp", "sourcetype", "txt", "index", "main")); + public void struct000() throws IOException { + final Struct value = new Struct(EventConverter.VALUE_SCHEMA).put("host", "hostname.example.com"); + final SinkRecord record = record(value); - ObjectMapper mapper = ObjectMapperFactory.create(); + final String expected = "{\"host\":\"hostname.example.com\"}"; + test(record, expected); + } - SinkRecordContent content = new SinkRecordContent(mapper, sinkRecords); + @Test + public void struct001() throws IOException { + final Struct value = new Struct(EventConverter.VALUE_SCHEMA) + .put("host", "hostname.example.com") + .put("time", new Date(1472256858924L)) + .put("source", "testapp"); + final SinkRecord record = record(value); + + final String expected = "{\"host\":\"hostname.example.com\",\"time\":1472256858.924,\"source\":\"testapp\"}"; + test(record, expected); + } - String actual; - try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - content.writeTo(outputStream); - actual = new String(outputStream.toByteArray(), "UTF-8"); - } + @Test + public void struct002() throws IOException { + final Struct value = new Struct(EventConverter.VALUE_SCHEMA) + .put("host", "hostname.example.com") + .put("time", new Date(1472256858924L)) + .put("source", "testapp") + .put("sourcetype", "txt") + .put("index", "main"); + final SinkRecord record = record(value); + + final String expected = "{\"host\":\"hostname.example.com\",\"time\":1472256858.924,\"sourcetype\":\"txt\",\"index\":\"main\",\"source\":\"testapp\"}"; + test(record, expected); + } - final String expected = "{\"host\":\"hostname.example.com\"}\n" + - "{\"host\":\"hostname.example.com\",\"time\":1472256858.924,\"source\":\"testapp\"}\n" + - "{\"host\":\"hostname.example.com\",\"time\":1472256858.924,\"sourcetype\":\"txt\",\"index\":\"main\",\"source\":\"testapp\"}"; + @Test + public void string001() throws IOException { + final SinkRecord record = record("This is a random value"); - System.out.println(actual); - Assert.assertEquals(expected, actual); + final String expected = "{\"event\":\"This is a random value\"}"; + test(record, expected); } + @Test + public void boolean001() throws IOException { + final SinkRecord record = record(true); + + final String expected = "{\"event\":true}"; + test(record, expected); + } + + @Test + public void number001() throws IOException { + final SinkRecord record = record(12341233); + + final String expected = "{\"event\":12341233}"; + test(record, expected); + } + + public static void addRecord(Collection records, Map values) { + Struct valueStruct = new Struct(EventConverter.VALUE_SCHEMA); + for (Map.Entry entry 
: values.entrySet()) { + valueStruct.put(entry.getKey(), entry.getValue()); + } + + records.add( + new SinkRecord( + "topic", + 1, + EventConverter.KEY_SCHEMA, + null, + EventConverter.VALUE_SCHEMA, + valueStruct, + 1L + ) + ); + } } diff --git a/src/test/java/io/confluent/kafka/connect/splunk/SplunkHttpSourceTaskTest.java b/src/test/java/io/confluent/kafka/connect/splunk/SplunkHttpSourceTaskTest.java index 0c327d4..f0a9936 100644 --- a/src/test/java/io/confluent/kafka/connect/splunk/SplunkHttpSourceTaskTest.java +++ b/src/test/java/io/confluent/kafka/connect/splunk/SplunkHttpSourceTaskTest.java @@ -1,12 +1,12 @@ /** * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -54,9 +54,7 @@ static BufferedReader records(Map... records) throws IOException { byte[] buffer; try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) { - SinkRecordContent content = new SinkRecordContent( - ObjectMapperFactory.create(), - sinkRecords); + SinkRecordContent content = new SinkRecordContent(sinkRecords); content.writeTo(outputStream); buffer = outputStream.toByteArray(); } @@ -76,7 +74,6 @@ public void setup() { settings.put(SplunkHttpSourceConnectorConfig.EVENT_COLLECTOR_INDEX_DEFAULT_CONF, "default"); this.task.start(settings); assertNotNull(this.task.eventServlet); - assertNotNull(this.task.eventServlet.objectMapper); assertNotNull(this.task.eventServlet.config); assertNotNull(this.task.eventServlet.converter); }
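
For reference, a minimal end-to-end sketch of what the consolidated mapper produces after this patch. The HecEventDemo class and the "message" key are illustrative only and not part of the patch; the expected output mirrors the assertions in SinkRecordContentTest.

package io.confluent.kafka.connect.splunk;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Date;
import java.util.Map;

// Illustrative demo, not part of the patch. It lives in the connector package
// so it can reach the package-private ObjectMapperFactory.
public class HecEventDemo {
  public static void main(String[] args) throws Exception {
    Map<String, Object> value = ImmutableMap.<String, Object>of(
        "host", "hostname.example.com",
        "time", new Date(1472256858924L), // epoch milliseconds
        "source", "testapp",
        "message", "Hello world!"         // not a reserved metadata key
    );
    SinkRecord record = new SinkRecord("topic", 1, null, null, null, value, 1L);

    // SinkRecordSerializer lifts the reserved keys (time, host, source,
    // sourcetype, index) to the top level and nests everything else under
    // "event"; DateSerializer renders the time as epoch seconds with
    // millisecond precision, written as a plain decimal.
    String json = ObjectMapperFactory.INSTANCE.writeValueAsString(record);
    System.out.println(json);
    // {"host":"hostname.example.com","time":1472256858.924,"source":"testapp","event":{"message":"Hello world!"}}
  }
}

A single shared mapper works here because Jackson's ObjectMapper is thread-safe once fully configured, so every task and servlet can serialize through ObjectMapperFactory.INSTANCE without per-call setup.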