Skip to content

Commit 28e173f

Browse files
committed
#88 | Initial idea to support pagination
1 parent 5d02dd4 commit 28e173f

14 files changed

+142
-11
lines changed

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfig.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@
4242
import static com.github.castorm.kafka.connect.common.ConfigUtils.breakDownMap;
4343
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
4444
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
45-
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
4645
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
46+
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
47+
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
4748

4849
@Getter
4950
class HttpSourceConnectorConfig extends AbstractConfig {
@@ -55,6 +56,11 @@ class HttpSourceConnectorConfig extends AbstractConfig {
5556
private static final String RECORD_SORTER = "http.record.sorter";
5657
private static final String RECORD_FILTER_FACTORY = "http.record.filter.factory";
5758
private static final String OFFSET_INITIAL = "http.offset.initial";
59+
private static final String HANDLE_PAGINATION = "http.request.pagination.handle";
60+
private static final String NEXT_PAGE_URL_MODE = "http.request.pagination.mode";
61+
private static final String NEXT_PAGE_BASE_URL = "http.request.pagination.baseurl";
62+
private static final String OVERWRITE = "overwrite";
63+
private static final String APPEND = "append";
5864

5965
private final TimerThrottler throttler;
6066
private final HttpRequestFactory requestFactory;
@@ -63,6 +69,9 @@ class HttpSourceConnectorConfig extends AbstractConfig {
6369
private final SourceRecordFilterFactory recordFilterFactory;
6470
private final SourceRecordSorter recordSorter;
6571
private final Map<String, String> initialOffset;
72+
private final Boolean handlePagination;
73+
private final Boolean appendNextPageUrl;
74+
private final String baseUrl;
6675

6776
HttpSourceConnectorConfig(Map<String, ?> originals) {
6877
super(config(), originals);
@@ -74,6 +83,9 @@ class HttpSourceConnectorConfig extends AbstractConfig {
7483
recordSorter = getConfiguredInstance(RECORD_SORTER, SourceRecordSorter.class);
7584
recordFilterFactory = getConfiguredInstance(RECORD_FILTER_FACTORY, SourceRecordFilterFactory.class);
7685
initialOffset = breakDownMap(getString(OFFSET_INITIAL));
86+
handlePagination = getBoolean(HANDLE_PAGINATION);
87+
appendNextPageUrl = getString(NEXT_PAGE_URL_MODE).equalsIgnoreCase(APPEND);
88+
baseUrl = getString(NEXT_PAGE_BASE_URL);
7789
}
7890

7991
public static ConfigDef config() {
@@ -84,6 +96,9 @@ public static ConfigDef config() {
8496
.define(RESPONSE_PARSER, CLASS, PolicyHttpResponseParser.class, HIGH, "Response Parser Class")
8597
.define(RECORD_SORTER, CLASS, OrderDirectionSourceRecordSorter.class, LOW, "Record Sorter Class")
8698
.define(RECORD_FILTER_FACTORY, CLASS, OffsetRecordFilterFactory.class, LOW, "Record Filter Factory Class")
87-
.define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset");
99+
.define(OFFSET_INITIAL, STRING, "", HIGH, "Starting offset")
100+
.define(HANDLE_PAGINATION, BOOLEAN, false, LOW, "Handle Pagination")
101+
.define(NEXT_PAGE_URL_MODE, STRING, OVERWRITE, ConfigDef.ValidString.in(APPEND, OVERWRITE), LOW, "Append or overwrite the next page URL")
102+
.define(NEXT_PAGE_BASE_URL, STRING, "", LOW, "Base URL in case of append mode");
88103
}
89104
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/HttpSourceTask.java

+50-4
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@
4040

4141
import java.io.IOException;
4242
import java.time.Instant;
43-
import java.util.List;
44-
import java.util.Map;
43+
import java.util.*;
4544
import java.util.function.Function;
4645

4746
import static com.github.castorm.kafka.connect.common.VersionUtils.getVersion;
@@ -67,6 +66,16 @@ public class HttpSourceTask extends SourceTask {
6766

6867
private SourceRecordFilterFactory recordFilterFactory;
6968

69+
private Boolean handlePagination;
70+
71+
private Boolean appendNextPageUrl;
72+
73+
private String baseUrl;
74+
75+
private String modifiedUrl;
76+
77+
private HttpRequest request = null;
78+
7079
private ConfirmationWindow<Map<String, ?>> confirmationWindow = new ConfirmationWindow<>(emptyList());
7180

7281
@Getter
@@ -92,6 +101,10 @@ public void start(Map<String, String> settings) {
92101
recordSorter = config.getRecordSorter();
93102
recordFilterFactory = config.getRecordFilterFactory();
94103
offset = loadOffset(config.getInitialOffset());
104+
handlePagination = !Objects.isNull(config.getHandlePagination()) && config.getHandlePagination();
105+
appendNextPageUrl = !Objects.isNull(config.getAppendNextPageUrl()) && config.getAppendNextPageUrl();
106+
baseUrl = config.getBaseUrl();
107+
modifiedUrl = null;
95108
}
96109

97110
private Offset loadOffset(Map<String, String> initialOffset) {
@@ -104,11 +117,37 @@ public List<SourceRecord> poll() throws InterruptedException {
104117

105118
throttler.throttle(offset.getTimestamp().orElseGet(Instant::now));
106119

107-
HttpRequest request = requestFactory.createRequest(offset);
120+
// HttpRequest request = requestFactory.createRequest(offset);
121+
122+
List<SourceRecord> records = new ArrayList<>();
123+
124+
if(handlePagination && !Objects.isNull(modifiedUrl)) {
125+
request = HttpRequest.builder()
126+
.method(request.getMethod())
127+
.url(modifiedUrl)
128+
.headers(request.getHeaders())
129+
.body(request.getBody())
130+
.build();
131+
} else {
132+
request = requestFactory.createRequest(offset);
133+
}
108134

109135
HttpResponse response = execute(request);
110136

111-
List<SourceRecord> records = responseParser.parse(response);
137+
records.addAll(responseParser.parse(response));
138+
139+
if(handlePagination) {
140+
Optional<String> nextPageUrl = responseParser.getNextPageUrl(response);
141+
log.info("Next page URL: {}", nextPageUrl.orElse("no value"));
142+
if( isNextPageUrlPresent(nextPageUrl) ) {
143+
modifiedUrl = appendNextPageUrl
144+
? baseUrl + nextPageUrl.orElse("")
145+
: nextPageUrl.orElse(null);
146+
} else {
147+
modifiedUrl = null;
148+
}
149+
}
150+
112151

113152
List<SourceRecord> unseenRecords = recordSorter.sort(records).stream()
114153
.filter(recordFilterFactory.create(offset))
@@ -129,6 +168,13 @@ private HttpResponse execute(HttpRequest request) {
129168
}
130169
}
131170

171+
private Boolean isNextPageUrlPresent(Optional<String> nextPageUrl) {
172+
return nextPageUrl.isPresent() &&
173+
!Objects.isNull(nextPageUrl.orElse(null)) &&
174+
!nextPageUrl.orElse(null).equalsIgnoreCase("null");
175+
}
176+
177+
132178
private static List<Map<String, ?>> extractOffsets(List<SourceRecord> recordsToSend) {
133179
return recordsToSend.stream()
134180
.map(SourceRecord::sourceOffset)

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParser.java

+5
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import java.util.List;
3131
import java.util.Map;
32+
import java.util.Optional;
3233
import java.util.function.Function;
3334

3435
import static java.util.stream.Collectors.toList;
@@ -59,4 +60,8 @@ public List<SourceRecord> parse(HttpResponse response) {
5960
.map(recordMapper::map)
6061
.collect(toList());
6162
}
63+
64+
public Optional<String> getNextPageUrl(HttpResponse response) {
65+
return recordParser.getNextPageUrl(response);
66+
}
6267
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParser.java

+14
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.Optional;
3132
import java.util.function.Function;
3233

3334
import static java.util.Collections.emptyList;
@@ -65,4 +66,17 @@ public List<SourceRecord> parse(HttpResponse response) {
6566
throw new IllegalStateException(String.format("Policy failed for response code: %s, body: %s", response.getCode(), ofNullable(response.getBody()).map(String::new).orElse("")));
6667
}
6768
}
69+
70+
@Override
71+
public Optional<String> getNextPageUrl(HttpResponse response) {
72+
switch (policy.resolve(response)) {
73+
case PROCESS:
74+
return delegate.getNextPageUrl(response);
75+
case SKIP:
76+
return Optional.empty();
77+
case FAIL:
78+
default:
79+
throw new IllegalStateException(String.format("Policy failed for response code: %s, body: %s", response.getCode(), ofNullable(response.getBody()).map(String::new).orElse("")));
80+
}
81+
}
6882
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonKvRecordHttpResponseParser.java

+5
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ public List<KvRecord> parse(HttpResponse response) {
6565
.collect(toList());
6666
}
6767

68+
@Override
69+
public Optional<String> getNextPageUrl(HttpResponse response) {
70+
return responseParser.getNextPageUrl(response.getBody());
71+
}
72+
6873
private KvRecord map(JacksonRecord record) {
6974

7075
Map<String, Object> offsets = record.getOffset();

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParser.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void configure(Map<String, ?> settings) {
6161
keyPointer = config.getKeyPointer();
6262
valuePointer = config.getValuePointer();
6363
offsetPointers = config.getOffsetPointers();
64-
timestampPointer = config.getTimestampPointer();
64+
6565
}
6666

6767
/**

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonRecordParserConfig.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import static java.util.stream.Collectors.toMap;
4040
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
4141
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
42+
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
4243
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
4344

4445
@Getter
@@ -49,12 +50,14 @@ public class JacksonRecordParserConfig extends AbstractConfig {
4950
private static final String ITEM_KEY_POINTER = "http.response.record.key.pointer";
5051
private static final String ITEM_TIMESTAMP_POINTER = "http.response.record.timestamp.pointer";
5152
private static final String ITEM_OFFSET_VALUE_POINTER = "http.response.record.offset.pointer";
53+
private static final String NEXT_PAGE_POINTER = "http.response.next.page.pointer";
5254

5355
private final JsonPointer recordsPointer;
5456
private final List<JsonPointer> keyPointer;
5557
private final JsonPointer valuePointer;
5658
private final Optional<JsonPointer> timestampPointer;
5759
private final Map<String, JsonPointer> offsetPointers;
60+
private final Optional<JsonPointer> nextPagePointer;
5861

5962
JacksonRecordParserConfig(Map<String, ?> originals) {
6063
super(config(), originals);
@@ -65,6 +68,7 @@ public class JacksonRecordParserConfig extends AbstractConfig {
6568
offsetPointers = breakDownMap(getString(ITEM_OFFSET_VALUE_POINTER)).entrySet().stream()
6669
.map(entry -> new SimpleEntry<>(entry.getKey(), compile(entry.getValue())))
6770
.collect(toMap(Entry::getKey, Entry::getValue));
71+
nextPagePointer = ofNullable(getString(NEXT_PAGE_POINTER)).map(JsonPointer::compile);
6872
}
6973

7074
public static ConfigDef config() {
@@ -73,6 +77,7 @@ public static ConfigDef config() {
7377
.define(ITEM_POINTER, STRING, "/", HIGH, "Item JsonPointer")
7478
.define(ITEM_KEY_POINTER, STRING, null, HIGH, "Item Key JsonPointers")
7579
.define(ITEM_TIMESTAMP_POINTER, STRING, null, MEDIUM, "Item Timestamp JsonPointer")
76-
.define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers");
80+
.define(ITEM_OFFSET_VALUE_POINTER, STRING, "", MEDIUM, "Item Offset JsonPointers")
81+
.define(NEXT_PAGE_POINTER, STRING, "/next", LOW, "Pointer for next page");
7782
}
7883
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonResponseRecordParser.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.kafka.common.Configurable;
2929

3030
import java.util.Map;
31+
import java.util.Optional;
3132
import java.util.function.Function;
3233
import java.util.stream.Stream;
3334

@@ -45,6 +46,10 @@ public class JacksonResponseRecordParser implements Configurable {
4546

4647
private JsonPointer recordsPointer;
4748

49+
private Optional<JsonPointer> nextPagePointer;
50+
51+
private JsonNode jsonBody;
52+
4853
public JacksonResponseRecordParser() {
4954
this(new JacksonRecordParser(), new JacksonSerializer(new ObjectMapper()));
5055
}
@@ -61,14 +66,21 @@ public void configure(Map<String, ?> settings) {
6166

6267
Stream<JacksonRecord> getRecords(byte[] body) {
6368

64-
JsonNode jsonBody = serializer.deserialize(body);
69+
this.jsonBody = serializer.deserialize(body);
6570

6671
Map<String, Object> responseOffset = getResponseOffset(jsonBody);
6772

6873
return serializer.getArrayAt(jsonBody, recordsPointer)
6974
.map(jsonRecord -> toJacksonRecord(jsonRecord, responseOffset));
7075
}
7176

77+
Optional<String> getNextPageUrl(byte[] body) {
78+
return nextPagePointer.map(pointer ->
79+
serializer.checkIfNonNull(this.jsonBody, pointer)
80+
? serializer.getObjectAt(this.jsonBody, pointer).asText()
81+
: null);
82+
}
83+
7284
private Map<String, Object> getResponseOffset(JsonNode node) {
7385
return emptyMap();
7486
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/jackson/JacksonSerializer.java

+5
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ Stream<JsonNode> getArrayAt(JsonNode node, JsonPointer pointer) {
6262
return array.isArray() ? stream(array.spliterator(), false) : Stream.of(array);
6363
}
6464

65+
boolean checkIfNonNull(JsonNode node, JsonPointer pointer) {
66+
return !node.at(pointer).isMissingNode();
67+
}
68+
69+
6570
private static JsonNode getRequiredAt(JsonNode body, JsonPointer recordsPointer) {
6671
return JSON_ROOT.equals(recordsPointer) ? body : body.requiredAt(recordsPointer);
6772
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/HttpResponseParser.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,15 @@
2626

2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.Optional;
2930

30-
@FunctionalInterface
3131
public interface HttpResponseParser extends Configurable {
3232

3333
List<SourceRecord> parse(HttpResponse response);
3434

3535
default void configure(Map<String, ?> map) {
3636
// Do nothing
3737
}
38+
39+
Optional<String> getNextPageUrl(HttpResponse response);
3840
}

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/response/spi/KvRecordHttpResponseParser.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,17 @@
2626

2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.Optional;
2930

30-
@FunctionalInterface
3131
public interface KvRecordHttpResponseParser extends Configurable {
3232

3333
List<KvRecord> parse(HttpResponse response);
3434

3535
default void configure(Map<String, ?> map) {
3636
// Do nothing
3737
}
38+
39+
Optional<String> getNextPageUrl(HttpResponse response);
40+
41+
3842
}

kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/HttpSourceConnectorConfigTest.java

+6
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.HashMap;
4444
import java.util.List;
4545
import java.util.Map;
46+
import java.util.Optional;
4647

4748
import static com.github.castorm.kafka.connect.http.HttpSourceConnectorConfigTest.Fixture.config;
4849
import static com.github.castorm.kafka.connect.http.HttpSourceConnectorConfigTest.Fixture.configWithout;
@@ -136,6 +137,11 @@ public static class TestResponseParser implements HttpResponseParser {
136137
public List<SourceRecord> parse(HttpResponse response) {
137138
return null;
138139
}
140+
141+
@Override
142+
public Optional<String> getNextPageUrl(HttpResponse response) {
143+
return Optional.empty();
144+
}
139145
}
140146

141147
public static class TestRecordSorter implements SourceRecordSorter {

kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/KvHttpResponseParserConfigTest.java

+6
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.HashMap;
3434
import java.util.List;
3535
import java.util.Map;
36+
import java.util.Optional;
3637

3738
import static java.util.Collections.emptyMap;
3839
import static org.assertj.core.api.Assertions.assertThat;
@@ -65,6 +66,11 @@ public static class TestResponseParser implements KvRecordHttpResponseParser {
6566
public List<KvRecord> parse(HttpResponse response) {
6667
return null;
6768
}
69+
70+
@Override
71+
public Optional<String> getNextPageUrl(HttpResponse response) {
72+
return Optional.empty();
73+
}
6874
}
6975

7076
public static class TestRecordMapper implements KvSourceRecordMapper {

kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/response/PolicyHttpResponseParserConfigTest.java

+6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.HashMap;
3131
import java.util.List;
3232
import java.util.Map;
33+
import java.util.Optional;
3334

3435
import static java.util.Collections.emptyMap;
3536
import static org.assertj.core.api.Assertions.assertThat;
@@ -62,6 +63,11 @@ public static class TestResponseParser implements HttpResponseParser {
6263
public List<SourceRecord> parse(HttpResponse response) {
6364
return null;
6465
}
66+
67+
@Override
68+
public Optional<String> getNextPageUrl(HttpResponse response) {
69+
return Optional.empty();
70+
}
6571
}
6672

6773
public static class TestPolicy implements HttpResponsePolicy {

0 commit comments

Comments
 (0)