Skip to content

Commit 782ff71

Browse files
grzegorz8, davidradl, Grzegorz Kołakowski
authored and committed
HTTP-63 Add caching (getindata#109)
Co-authored-by: David Radley <david_radley@uk.ibm.com>
Co-authored-by: Grzegorz Kołakowski <grzegorz.kolakowski@getindata.com>
1 parent e77ff67 commit 782ff71

11 files changed

+359
-106
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## [Unreleased]
44

5+
### Added
6+
7+
- Added support for caching of lookup joins.
8+
59
### Fixed
610

711
- Fixed an issue in the logging code of the `JavaNetHttpPollingClient` that prevented the status code and response body from being shown when the log level is configured at DEBUG (or lower).

README.md

+43-19
Large diffs are not rendered by default.

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ under the License.
6868

6969
<!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
7070
section, omitting the patch part (so for 1.15.0 use 1.15). -->
71+
7172
<flink.version>1.16.3</flink.version>
7273

7374
<target.java.version>11</target.java.version>
@@ -296,7 +297,6 @@ under the License.
296297
<artifactId>maven-surefire-plugin</artifactId>
297298
<version>3.0.0-M5</version>
298299
<configuration>
299-
300300
</configuration>
301301
</plugin>
302302

src/main/java/com/getindata/connectors/http/internal/table/lookup/AsyncHttpTableLookupFunction.java

+10-14
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

33
import java.util.Collection;
4-
import java.util.Collections;
5-
import java.util.Optional;
64
import java.util.concurrent.CompletableFuture;
75
import java.util.concurrent.ExecutorService;
86
import java.util.concurrent.Executors;
97

108
import lombok.RequiredArgsConstructor;
119
import lombok.extern.slf4j.Slf4j;
1210
import org.apache.flink.table.data.RowData;
13-
import org.apache.flink.table.functions.AsyncTableFunction;
11+
import org.apache.flink.table.functions.AsyncLookupFunction;
1412
import org.apache.flink.table.functions.FunctionContext;
1513
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
1614

@@ -19,7 +17,7 @@
1917

2018
@Slf4j
2119
@RequiredArgsConstructor
22-
public class AsyncHttpTableLookupFunction extends AsyncTableFunction<RowData> {
20+
public class AsyncHttpTableLookupFunction extends AsyncLookupFunction {
2321

2422
private static final String PULLING_THREAD_POOL_SIZE = "8";
2523

@@ -73,29 +71,27 @@ public void open(FunctionContext context) throws Exception {
7371
);
7472
}
7573

76-
public void eval(CompletableFuture<Collection<RowData>> resultFuture, Object... keys) {
77-
78-
CompletableFuture<Optional<RowData>> future = new CompletableFuture<>();
79-
future.completeAsync(() -> decorate.lookupByKeys(keys), pullingThreadPool);
74+
@Override
75+
public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
76+
CompletableFuture<Collection<RowData>> future = new CompletableFuture<>();
77+
future.completeAsync(() -> decorate.lookup(keyRow), pullingThreadPool);
8078

8179
// We don't want to use ForkJoinPool at all. We are using a different thread pool
8280
// for publishing here intentionally to avoid thread starvation.
81+
CompletableFuture<Collection<RowData>> resultFuture = new CompletableFuture<>();
8382
future.whenCompleteAsync(
84-
(optionalResult, throwable) -> {
83+
(result, throwable) -> {
8584
if (throwable != null) {
8685
log.error("Exception while processing Http Async request", throwable);
8786
resultFuture.completeExceptionally(
8887
new RuntimeException("Exception while processing Http Async request",
8988
throwable));
9089
} else {
91-
if (optionalResult.isPresent()) {
92-
resultFuture.complete(Collections.singleton(optionalResult.get()));
93-
} else {
94-
resultFuture.complete(Collections.emptyList());
95-
}
90+
resultFuture.complete(result);
9691
}
9792
},
9893
publishingThreadPool);
94+
return resultFuture;
9995
}
10096

10197
public LookupRow getLookupRow() {

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java

+44-17
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.ArrayList;
44
import java.util.List;
5+
import javax.annotation.Nullable;
56

67
import lombok.extern.slf4j.Slf4j;
78
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -10,15 +11,19 @@
1011
import org.apache.flink.table.api.DataTypes.Field;
1112
import org.apache.flink.table.connector.Projection;
1213
import org.apache.flink.table.connector.format.DecodingFormat;
13-
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
1414
import org.apache.flink.table.connector.source.DynamicTableSource;
1515
import org.apache.flink.table.connector.source.LookupTableSource;
16-
import org.apache.flink.table.connector.source.TableFunctionProvider;
1716
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
1817
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
18+
import org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider ;
19+
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
20+
import org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
21+
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
22+
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
1923
import org.apache.flink.table.data.RowData;
2024
import org.apache.flink.table.factories.DynamicTableFactory;
2125
import org.apache.flink.table.factories.FactoryUtil;
26+
import org.apache.flink.table.functions.AsyncLookupFunction;
2227
import org.apache.flink.table.types.DataType;
2328
import org.apache.flink.table.types.logical.LogicalType;
2429
import org.apache.flink.table.types.logical.RowType;
@@ -46,17 +51,20 @@ public class HttpLookupTableSource
4651
private final DynamicTableFactory.Context dynamicTableFactoryContext;
4752

4853
private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
54+
@Nullable
55+
private final LookupCache cache;
4956

5057
public HttpLookupTableSource(
5158
DataType physicalRowDataType,
5259
HttpLookupConfig lookupConfig,
5360
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
54-
DynamicTableFactory.Context dynamicTablecontext) {
55-
61+
DynamicTableFactory.Context dynamicTablecontext,
62+
@Nullable LookupCache cache) {
5663
this.physicalRowDataType = physicalRowDataType;
5764
this.lookupConfig = lookupConfig;
5865
this.decodingFormat = decodingFormat;
5966
this.dynamicTableFactoryContext = dynamicTablecontext;
67+
this.cache = cache;
6068
}
6169

6270
@Override
@@ -66,6 +74,7 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)
6674

6775
@Override
6876
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
77+
log.debug("getLookupRuntimeProvider Entry");
6978

7079
LookupRow lookupRow = extractLookupRow(lookupContext.getKeys());
7180

@@ -94,21 +103,38 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
94103
PollingClientFactory<RowData> pollingClientFactory =
95104
createPollingClientFactory(lookupQueryCreator, lookupConfig);
96105

97-
HttpTableLookupFunction dataLookupFunction =
98-
new HttpTableLookupFunction(
99-
pollingClientFactory,
100-
responseSchemaDecoder,
101-
lookupRow,
102-
lookupConfig
103-
);
106+
return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory);
107+
}
108+
109+
protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
110+
DeserializationSchema<RowData> responseSchemaDecoder,
111+
PollingClientFactory<RowData> pollingClientFactory) {
104112

113+
HttpTableLookupFunction dataLookupFunction =
114+
new HttpTableLookupFunction(
115+
pollingClientFactory,
116+
responseSchemaDecoder,
117+
lookupRow,
118+
lookupConfig
119+
);
105120
if (lookupConfig.isUseAsync()) {
106-
log.info("Using Async version of HttpLookupTable.");
107-
return AsyncTableFunctionProvider.of(
108-
new AsyncHttpTableLookupFunction(dataLookupFunction));
121+
AsyncLookupFunction asyncLookupFunction =
122+
new AsyncHttpTableLookupFunction(dataLookupFunction);
123+
if (cache != null) {
124+
log.info("Using async version of HttpLookupTable with cache.");
125+
return PartialCachingAsyncLookupProvider.of(asyncLookupFunction, cache);
126+
} else {
127+
log.info("Using async version of HttpLookupTable without cache.");
128+
return AsyncLookupFunctionProvider.of(asyncLookupFunction);
129+
}
109130
} else {
110-
log.info("Using blocking version of HttpLookupTable.");
111-
return TableFunctionProvider.of(dataLookupFunction);
131+
if (cache != null) {
132+
log.info("Using blocking version of HttpLookupTable with cache.");
133+
return PartialCachingLookupProvider.of(dataLookupFunction, cache);
134+
} else {
135+
log.info("Using blocking version of HttpLookupTable without cache.");
136+
return LookupFunctionProvider.of(dataLookupFunction);
137+
}
112138
}
113139
}
114140

@@ -118,7 +144,8 @@ public DynamicTableSource copy() {
118144
physicalRowDataType,
119145
lookupConfig,
120146
decodingFormat,
121-
dynamicTableFactoryContext
147+
dynamicTableFactoryContext,
148+
cache
122149
);
123150
}
124151

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java

+32-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.Set;
66
import java.util.function.Predicate;
77
import java.util.stream.Collectors;
8+
import javax.annotation.Nullable;
89

910
import org.apache.flink.api.common.serialization.DeserializationSchema;
1011
import org.apache.flink.configuration.ConfigOption;
@@ -15,6 +16,9 @@
1516
import org.apache.flink.table.catalog.ResolvedSchema;
1617
import org.apache.flink.table.connector.format.DecodingFormat;
1718
import org.apache.flink.table.connector.source.DynamicTableSource;
19+
import org.apache.flink.table.connector.source.lookup.LookupOptions;
20+
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
21+
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
1822
import org.apache.flink.table.data.RowData;
1923
import org.apache.flink.table.factories.DeserializationFormatFactory;
2024
import org.apache.flink.table.factories.DynamicTableSourceFactory;
@@ -48,7 +52,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
4852
FactoryUtil.TableFactoryHelper helper =
4953
FactoryUtil.createTableFactoryHelper(this, dynamicTableContext);
5054

51-
ReadableConfig readableConfig = helper.getOptions();
55+
ReadableConfig readable = helper.getOptions();
5256
helper.validateExcept(
5357
// properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
5458
"table.",
@@ -62,7 +66,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
6266
FactoryUtil.FORMAT
6367
);
6468

65-
HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readableConfig);
69+
HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable);
6670

6771
ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema();
6872

@@ -73,7 +77,8 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
7377
physicalRowDataType,
7478
lookupConfig,
7579
decodingFormat,
76-
dynamicTableContext
80+
dynamicTableContext,
81+
getLookupCache(readable)
7782
);
7883
}
7984

@@ -89,7 +94,18 @@ public Set<ConfigOption<?>> requiredOptions() {
8994

9095
@Override
9196
public Set<ConfigOption<?>> optionalOptions() {
92-
return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD, REQUEST_CALLBACK_IDENTIFIER);
97+
98+
return Set.of(
99+
URL_ARGS,
100+
ASYNC_POLLING,
101+
LOOKUP_METHOD,
102+
REQUEST_CALLBACK_IDENTIFIER,
103+
LookupOptions.CACHE_TYPE,
104+
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
105+
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
106+
LookupOptions.PARTIAL_CACHE_MAX_ROWS,
107+
LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
108+
LookupOptions.MAX_RETRIES);
93109
}
94110

95111
private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) {
@@ -115,6 +131,18 @@ private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig re
115131
.build();
116132
}
117133

134+
@Nullable
135+
private LookupCache getLookupCache(ReadableConfig tableOptions) {
136+
LookupCache cache = null;
137+
// Do not support legacy cache options
138+
if (tableOptions
139+
.get(LookupOptions.CACHE_TYPE)
140+
.equals(LookupOptions.LookupCacheType.PARTIAL)) {
141+
cache = DefaultLookupCache.fromConfig(tableOptions);
142+
}
143+
return cache;
144+
}
145+
118146
// TODO verify this since we are on Flink 1.16 now.
119147
// Backport from Flink 1.15-Master
120148
private DataType toRowDataType(List<Column> columns, Predicate<Column> columnPredicate) {

src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java

+11-20
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.getindata.connectors.http.internal.table.lookup;
22

3+
import java.util.Collection;
4+
import java.util.Collections;
35
import java.util.Optional;
46
import java.util.concurrent.atomic.AtomicInteger;
57

@@ -8,17 +10,16 @@
810
import lombok.extern.slf4j.Slf4j;
911
import org.apache.flink.annotation.VisibleForTesting;
1012
import org.apache.flink.api.common.serialization.DeserializationSchema;
11-
import org.apache.flink.table.data.GenericRowData;
1213
import org.apache.flink.table.data.RowData;
1314
import org.apache.flink.table.functions.FunctionContext;
14-
import org.apache.flink.table.functions.TableFunction;
15+
import org.apache.flink.table.functions.LookupFunction;
1516

1617
import com.getindata.connectors.http.internal.PollingClient;
1718
import com.getindata.connectors.http.internal.PollingClientFactory;
1819
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;
1920

2021
@Slf4j
21-
public class HttpTableLookupFunction extends TableFunction<RowData> {
22+
public class HttpTableLookupFunction extends LookupFunction {
2223

2324
private final PollingClientFactory<RowData> pollingClientFactory;
2425

@@ -50,32 +51,22 @@ public HttpTableLookupFunction(
5051

5152
@Override
5253
public void open(FunctionContext context) throws Exception {
53-
super.open(context);
54-
5554
this.responseSchemaDecoder.open(
5655
SerializationSchemaUtils
5756
.createDeserializationInitContext(HttpTableLookupFunction.class));
5857

5958
this.localHttpCallCounter = new AtomicInteger(0);
6059
this.client = pollingClientFactory
61-
.createPollClient(options, responseSchemaDecoder);
62-
63-
context
64-
.getMetricGroup()
65-
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
66-
}
60+
.createPollClient(options, responseSchemaDecoder);
6761

68-
/**
69-
* This is a lookup method which is called by Flink framework in a runtime.
70-
*/
71-
public void eval(Object... keys) {
72-
lookupByKeys(keys)
73-
.ifPresent(this::collect);
62+
context.getMetricGroup()
63+
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
7464
}
7565

76-
public Optional<RowData> lookupByKeys(Object[] keys) {
77-
RowData rowData = GenericRowData.of(keys);
66+
@Override
67+
public Collection<RowData> lookup(RowData keyRow) {
7868
localHttpCallCounter.incrementAndGet();
79-
return client.pull(rowData);
69+
Optional<RowData> result = client.pull(keyRow);
70+
return result.map(Collections::singletonList).orElse(Collections.emptyList());
8071
}
8172
}

src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java

+1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public JavaNetHttpPollingClient(
6666
@Override
6767
public Optional<RowData> pull(RowData lookupRow) {
6868
try {
69+
log.debug("Optional<RowData> pull with Rowdata={}.", lookupRow);
6970
return queryAndProcess(lookupRow);
7071
} catch (Exception e) {
7172
log.error("Exception during HTTP request.", e);

0 commit comments

Comments
 (0)