Skip to content

Commit

Permalink
General cleanup around the json handling. Move to using a single Obje…
Browse files Browse the repository at this point in the history
…ctMapper across the whole application. Added serializers for Struct and SourceRecord.
  • Loading branch information
jcustenborder committed Jan 20, 2017
1 parent 0618a26 commit 5089a11
Show file tree
Hide file tree
Showing 14 changed files with 364 additions and 445 deletions.

This file was deleted.

This file was deleted.

31 changes: 14 additions & 17 deletions src/main/java/io/confluent/kafka/connect/splunk/EventConverter.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com)
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,7 +17,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
Expand Down Expand Up @@ -58,7 +57,6 @@ class EventConverter {

private static final Map<String, ?> EMPTY_MAP = new HashMap<>();

final ObjectMapper mapper;
final SplunkHttpSourceConnectorConfig config;
final String topicPrefix;
final boolean topicPerIndex;
Expand All @@ -68,8 +66,7 @@ class EventConverter {

Time time = new SystemTime();

EventConverter(ObjectMapper mapper, SplunkHttpSourceConnectorConfig config) {
this.mapper = mapper;
EventConverter(SplunkHttpSourceConnectorConfig config) {
this.config = config;
this.topicPerIndex = this.config.topicPerIndex();
this.topicPrefix = this.config.topicPrefix();
Expand All @@ -78,20 +75,20 @@ class EventConverter {
this.defaultIndex = this.config.defaultIndex();
}

static <T> void setFieldValue(ObjectMapper mapper, JsonNode messageNode, Struct struct, String fieldName, Class<T> cls) {
static <T> void setFieldValue(JsonNode messageNode, Struct struct, String fieldName, Class<T> cls) {
T structValue = null;

if (messageNode.has(fieldName)) {
JsonNode valueNode = messageNode.get(fieldName);

if (String.class.equals(cls) && valueNode.isObject()) {
try {
structValue = (T) mapper.writeValueAsString(valueNode);
structValue = (T) ObjectMapperFactory.INSTANCE.writeValueAsString(valueNode);
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
} else if (!valueNode.isNull()) {
structValue = mapper.convertValue(valueNode, cls);
structValue = ObjectMapperFactory.INSTANCE.convertValue(valueNode, cls);
}
}

Expand All @@ -105,12 +102,12 @@ public SourceRecord convert(JsonNode messageNode, String remoteHost) {
Struct keyStruct = new Struct(KEY_SCHEMA);
Struct valueStruct = new Struct(VALUE_SCHEMA);

setFieldValue(this.mapper, messageNode, valueStruct, "time", Date.class);
setFieldValue(this.mapper, messageNode, valueStruct, "host", String.class);
setFieldValue(this.mapper, messageNode, valueStruct, "source", String.class);
setFieldValue(this.mapper, messageNode, valueStruct, "sourcetype", String.class);
setFieldValue(this.mapper, messageNode, valueStruct, "index", String.class);
setFieldValue(this.mapper, messageNode, valueStruct, "event", String.class);
setFieldValue(messageNode, valueStruct, "time", Date.class);
setFieldValue(messageNode, valueStruct, "host", String.class);
setFieldValue(messageNode, valueStruct, "source", String.class);
setFieldValue(messageNode, valueStruct, "sourcetype", String.class);
setFieldValue(messageNode, valueStruct, "index", String.class);
setFieldValue(messageNode, valueStruct, "event", String.class);

if (null == valueStruct.get("time")) {
valueStruct.put("time", new Date(this.time.milliseconds()));
Expand Down
27 changes: 12 additions & 15 deletions src/main/java/io/confluent/kafka/connect/splunk/EventIterator.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com)
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -18,7 +18,6 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.BufferedReader;
import java.io.IOException;
Expand All @@ -27,30 +26,28 @@

class EventIterator implements Iterator<JsonNode>, AutoCloseable {
final JsonFactory jsonFactory;
final ObjectMapper objectMapper;
final JsonParser jsonParser;
final Iterator<JsonNode> iterator;

EventIterator(JsonFactory jsonFactory, ObjectMapper objectMapper, JsonParser jsonParser, Iterator<JsonNode> iterator) {
EventIterator(JsonFactory jsonFactory, JsonParser jsonParser, Iterator<JsonNode> iterator) {
this.jsonFactory = jsonFactory;
this.objectMapper = objectMapper;
this.jsonParser = jsonParser;
this.iterator = iterator;
}

public static EventIterator create(ObjectMapper objectMapper, JsonFactory jsonFactory, BufferedReader bufferedReader) throws IOException {
public static EventIterator create(JsonFactory jsonFactory, BufferedReader bufferedReader) throws IOException {
JsonParser jsonParser = jsonFactory.createParser(bufferedReader);
return create(objectMapper, jsonFactory, jsonParser);
return create(jsonFactory, jsonParser);
}

public static EventIterator create(ObjectMapper objectMapper, JsonFactory jsonFactory, InputStream inputStream) throws IOException {
public static EventIterator create(JsonFactory jsonFactory, InputStream inputStream) throws IOException {
JsonParser jsonParser = jsonFactory.createParser(inputStream);
return create(objectMapper, jsonFactory, jsonParser);
return create(jsonFactory, jsonParser);
}

public static EventIterator create(ObjectMapper objectMapper, JsonFactory jsonFactory, JsonParser jsonParser) throws IOException {
Iterator<JsonNode> iterator = objectMapper.readValues(jsonParser, JsonNode.class);
return new EventIterator(jsonFactory, objectMapper, jsonParser, iterator);
public static EventIterator create(JsonFactory jsonFactory, JsonParser jsonParser) throws IOException {
Iterator<JsonNode> iterator = ObjectMapperFactory.INSTANCE.readValues(jsonParser, JsonNode.class);
return new EventIterator(jsonFactory, jsonParser, iterator);
}

@Override
Expand Down
17 changes: 7 additions & 10 deletions src/main/java/io/confluent/kafka/connect/splunk/EventServlet.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com)
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -17,7 +17,6 @@

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.json.Json;
import io.confluent.kafka.connect.utils.data.SourceRecordConcurrentLinkedDeque;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -37,19 +36,17 @@ public class EventServlet extends HttpServlet {
private static final Logger log = LoggerFactory.getLogger(EventServlet.class);
ServletConfig servletConfig;
JsonFactory jsonFactory;
ObjectMapper objectMapper;
EventConverter converter;
SourceRecordConcurrentLinkedDeque recordQueue;

SplunkHttpSourceConnectorConfig config;
Set<String> allowedIndexes;


public void configure(SplunkHttpSourceConnectorConfig config, JsonFactory jsonFactory, ObjectMapper objectMapper, SourceRecordConcurrentLinkedDeque recordQueue) {
public void configure(SplunkHttpSourceConnectorConfig config, JsonFactory jsonFactory, SourceRecordConcurrentLinkedDeque recordQueue) {
this.config = config;
this.jsonFactory = jsonFactory;
this.objectMapper = objectMapper;
this.converter = new EventConverter(this.objectMapper, this.config);
this.converter = new EventConverter(this.config);
this.recordQueue = recordQueue;
this.allowedIndexes = this.config.allowedIndexes();
}
Expand Down Expand Up @@ -93,7 +90,7 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
try {

try (BufferedReader bodyReader = request.getReader()) {
try (EventIterator iterator = EventIterator.create(this.objectMapper, this.jsonFactory, bodyReader)) {
try (EventIterator iterator = EventIterator.create(this.jsonFactory, bodyReader)) {

while (iterator.hasNext()) {
JsonNode jsonNode = iterator.next();
Expand Down
Loading

0 comments on commit 5089a11

Please # to comment.