Skip to content

[hotfix] Supports specifying routing fields #121

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: v3.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ public Optional<String> getPathPrefix() {
return config.getOptional(ElasticsearchConnectorOptions.CONNECTION_PATH_PREFIX);
}

/**
 * Returns the value of the {@code sink.partition-routing.fields} option: a comma-separated
 * list of field names used to derive the Elasticsearch routing value. May be {@code null}
 * when the option is not set (it has no default value).
 */
public String getPartitionRoutingFields() {
    return config.get(ElasticsearchConnectorOptions.PARTITION_ROUTING_FIELDS);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@ public class ElasticsearchConnectorOptions {
"The format must produce a valid JSON document. "
+ "Please refer to the documentation on formats for more details.");

/**
 * Comma-separated list of column names whose row values are concatenated (with the configured
 * key delimiter) and set as the routing value on index/update/delete requests. Optional; when
 * absent no routing is applied.
 */
public static final ConfigOption<String> PARTITION_ROUTING_FIELDS =
        ConfigOptions.key("sink.partition-routing.fields")
                .stringType()
                .noDefaultValue()
                .withDescription("Route field names list, multiple separated by commas.");

// --------------------------------------------------------------------------------------------
// Enums
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DistinctType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.StringUtils;

import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.Period;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** An extractor for a Elasticsearch key from a {@link RowData}. */
Expand Down Expand Up @@ -109,6 +113,39 @@ public static Function<RowData, String> createKeyExtractor(
.orElseGet(() -> (Function<RowData, String> & Serializable) (row) -> null);
}

/**
 * Creates an extractor over a comma-separated list of column names.
 *
 * <p>Entries are trimmed and empty entries are dropped, so inputs such as {@code "a, b"} or
 * {@code "a,,b"} resolve to columns {@code a} and {@code b} instead of producing names that
 * miss the schema lookup and fail with a NullPointerException downstream.
 *
 * @param schema table schema used to resolve column names and types
 * @param keyDelimiter delimiter inserted between the formatted field values
 * @param columns comma-separated column names; may be null or blank for a no-op extractor
 * @return a serializable function mapping a row to the concatenated field values
 */
public static Function<RowData, String> createColumnExtractor(
        TableSchema schema, String keyDelimiter, String columns) {
    final List<String> cols = new ArrayList<>();
    if (!StringUtils.isNullOrWhitespaceOnly(columns)) {
        for (String col : columns.split(",")) {
            // Trim and skip empties so padded or stray commas do not break column resolution.
            String trimmed = col.trim();
            if (!trimmed.isEmpty()) {
                cols.add(trimmed);
            }
        }
    }
    return createColumnExtractor(schema, keyDelimiter, cols);
}

/**
 * Creates an extractor that formats the given columns of a row and joins them with
 * {@code keyDelimiter}. With a null or empty column list the extractor yields an empty string
 * for every row (see the empty/blank extractor tests).
 *
 * @param schema table schema used to resolve column names and types
 * @param keyDelimiter delimiter inserted between the formatted field values
 * @param columns column names to extract; may be null or empty
 * @return a serializable function mapping a row to the concatenated field values
 * @throws IllegalArgumentException if a requested column does not exist in the schema
 */
public static Function<RowData, String> createColumnExtractor(
        TableSchema schema, String keyDelimiter, List<String> columns) {
    Map<String, ColumnWithIndex> namesToColumns = new HashMap<>();
    List<TableColumn> tableColumns = schema.getTableColumns();
    for (int i = 0; i < schema.getFieldCount(); i++) {
        TableColumn column = tableColumns.get(i);
        namesToColumns.put(column.getName(), new ColumnWithIndex(column, i));
    }

    final FieldFormatter[] fieldFormatters;
    if (columns == null || columns.isEmpty()) {
        fieldFormatters = new FieldFormatter[0];
    } else {
        fieldFormatters = new FieldFormatter[columns.size()];
        for (int i = 0; i < columns.size(); i++) {
            ColumnWithIndex column = namesToColumns.get(columns.get(i));
            if (column == null) {
                // Fail with a descriptive message instead of a bare NPE on the lookup result.
                throw new IllegalArgumentException(
                        "Column '" + columns.get(i) + "' does not exist in the table schema.");
            }
            fieldFormatters[i] = toFormatter(column.index, column.getType());
        }
    }

    // Note: the previous Optional.of(extractor).orElseGet(...) wrapper was dead code —
    // Optional.of never produces an empty Optional, so the fallback could never run.
    return new KeyExtractor(fieldFormatters, keyDelimiter);
}

private static FieldFormatter toFormatter(int index, LogicalType type) {
switch (type.getTypeRoot()) {
case DATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,17 @@ interface RequestFactory extends Serializable {
* Creates an update request to be added to a {@link RequestIndexer}. Note: the type field has
* been deprecated since Elasticsearch 7.x and it would not take any effort.
*/
UpdateRequest createUpdateRequest(
String index, String docType, String key, XContentType contentType, byte[] document);
UpdateRequest createUpdateRequest(String index, String docType, String key, String routing, XContentType contentType, byte[] document);

/**
* Creates an index request to be added to a {@link RequestIndexer}. Note: the type field has
* been deprecated since Elasticsearch 7.x and it would not take any effort.
*/
IndexRequest createIndexRequest(
String index, String docType, String key, XContentType contentType, byte[] document);
IndexRequest createIndexRequest(String index, String docType, String key, String routing, XContentType contentType, byte[] document);

/**
* Creates a delete request to be added to a {@link RequestIndexer}. Note: the type field has
* been deprecated since Elasticsearch 7.x and it would not take any effort.
*/
DeleteRequest createDeleteRequest(String index, String docType, String key);
DeleteRequest createDeleteRequest(String index, String docType, String key, String routing);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,23 @@ class RowElasticsearchSinkFunction implements ElasticsearchSinkFunction<RowData>
private final XContentType contentType;
private final RequestFactory requestFactory;
private final Function<RowData, String> createKey;
private final Function<RowData, String> routingKey;

/**
 * Creates the sink function.
 *
 * @param indexGenerator generates the target index per row, not null
 * @param docType document type; deprecated in Elasticsearch 7+, may be null
 * @param serializationSchema serializes rows into documents, not null
 * @param contentType content type of the serialized documents, not null
 * @param requestFactory creates the concrete index/update/delete requests, not null
 * @param createKey derives the document id from a row, not null
 * @param routingKey derives the routing value from a row, not null
 */
public RowElasticsearchSinkFunction(
        IndexGenerator indexGenerator,
        @Nullable String docType, // this is deprecated in es 7+
        SerializationSchema<RowData> serializationSchema,
        XContentType contentType,
        RequestFactory requestFactory,
        Function<RowData, String> createKey,
        Function<RowData, String> routingKey) {
    this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
    this.docType = docType;
    this.serializationSchema = Preconditions.checkNotNull(serializationSchema);
    this.contentType = Preconditions.checkNotNull(contentType);
    this.requestFactory = Preconditions.checkNotNull(requestFactory);
    this.createKey = Preconditions.checkNotNull(createKey);
    // Fail fast: routingKey is dereferenced for every record (routingKey.apply(row) when
    // building each request), so a null would otherwise only surface as an NPE at write time.
    // This also matches the checkNotNull treatment of the sibling fields above.
    this.routingKey = Preconditions.checkNotNull(routingKey);
}

@Override
Expand Down Expand Up @@ -96,20 +99,20 @@ private void processUpsert(RowData row, RequestIndexer indexer) {
if (key != null) {
final UpdateRequest updateRequest =
requestFactory.createUpdateRequest(
indexGenerator.generate(row), docType, key, contentType, document);
indexGenerator.generate(row), docType, key, routingKey.apply(row), contentType, document);
indexer.add(updateRequest);
} else {
final IndexRequest indexRequest =
requestFactory.createIndexRequest(
indexGenerator.generate(row), docType, key, contentType, document);
indexGenerator.generate(row), docType, key, routingKey.apply(row), contentType, document);
indexer.add(indexRequest);
}
}

/** Builds a delete request for the given row and hands it to the indexer. */
private void processDelete(RowData row, RequestIndexer indexer) {
    final String documentId = createKey.apply(row);
    final String targetIndex = indexGenerator.generate(row);
    indexer.add(
            requestFactory.createDeleteRequest(
                    targetIndex, docType, documentId, routingKey.apply(row)));
}

Expand All @@ -127,7 +130,8 @@ public boolean equals(Object o) {
&& Objects.equals(serializationSchema, that.serializationSchema)
&& contentType == that.contentType
&& Objects.equals(requestFactory, that.requestFactory)
&& Objects.equals(createKey, that.createKey);
&& Objects.equals(createKey, that.createKey)
&& Objects.equals(routingKey, that.routingKey);
}

@Override
Expand All @@ -138,6 +142,7 @@ public int hashCode() {
serializationSchema,
contentType,
requestFactory,
createKey);
createKey,
routingKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -129,4 +131,82 @@ public void testAllTypesKey() {
.isEqualTo(
"1_2_3_4_true_1.0_2.0_ABCD_2012-12-12T12:12:12_2013-01-13T13:13:13_14:14:14_2015-05-15");
}

@Test
public void testStringColumnsExtractor() {
    // Comma-separated column names must resolve against the schema in listed order.
    final TableSchema schema =
            TableSchema.builder()
                    .field("a", DataTypes.BIGINT().notNull())
                    .field("b", DataTypes.STRING())
                    .primaryKey("a")
                    .build();

    final Function<RowData, String> extractor =
            KeyExtractor.createColumnExtractor(schema, "_", "a,b");
    final RowData row = GenericRowData.of(12L, StringData.fromString("ABCD"));

    assertThat(extractor.apply(row)).isEqualTo("12_ABCD");
}

@Test
public void testListColumnsExtractor() {
    // The List overload must behave like the comma-separated String overload.
    final TableSchema schema =
            TableSchema.builder()
                    .field("a", DataTypes.BIGINT().notNull())
                    .field("b", DataTypes.STRING())
                    .primaryKey("a")
                    .build();

    final Function<RowData, String> extractor =
            KeyExtractor.createColumnExtractor(schema, "_", Arrays.asList("a", "b"));
    final RowData row = GenericRowData.of(12L, StringData.fromString("ABCD"));

    assertThat(extractor.apply(row)).isEqualTo("12_ABCD");
}

@Test
public void testEmptyColumnsExtractor() {
    // A null String column spec must yield an extractor that returns the empty string.
    final TableSchema schema =
            TableSchema.builder()
                    .field("a", DataTypes.BIGINT().notNull())
                    .field("b", DataTypes.STRING())
                    .primaryKey("a")
                    .build();

    // Cast disambiguates the String overload from the List overload.
    final Function<RowData, String> extractor =
            KeyExtractor.createColumnExtractor(schema, "_", (String) null);
    final RowData row = GenericRowData.of(12L, StringData.fromString("ABCD"));

    assertThat(extractor.apply(row)).isEqualTo("");
}

@Test
public void testBlankColumnsExtractor() {
    // A blank column spec must yield an extractor that returns the empty string.
    final TableSchema schema =
            TableSchema.builder()
                    .field("a", DataTypes.BIGINT().notNull())
                    .field("b", DataTypes.STRING())
                    .primaryKey("a")
                    .build();

    final Function<RowData, String> extractor =
            KeyExtractor.createColumnExtractor(schema, "_", "");
    final RowData row = GenericRowData.of(12L, StringData.fromString("ABCD"));

    assertThat(extractor.apply(row)).isEqualTo("");
}

@Test
public void testNullColumnsExtractor() {
    // A null List column spec must yield an extractor that returns the empty string.
    final TableSchema schema =
            TableSchema.builder()
                    .field("a", DataTypes.BIGINT().notNull())
                    .field("b", DataTypes.STRING())
                    .primaryKey("a")
                    .build();

    // Cast disambiguates the List overload from the String overload.
    final Function<RowData, String> extractor =
            KeyExtractor.createColumnExtractor(schema, "_", (List<String>) null);
    final RowData row = GenericRowData.of(12L, StringData.fromString("ABCD"));

    assertThat(extractor.apply(row)).isEqualTo("");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
format,
XContentType.JSON,
REQUEST_FACTORY,
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
KeyExtractor.createColumnExtractor(schema, config.getKeyDelimiter(), config.getPartitionRoutingFields()));

final ElasticsearchSink.Builder<RowData> builder =
builderProvider.createBuilder(config.getHosts(), upsertFunction);
Expand Down Expand Up @@ -295,26 +296,39 @@ public UpdateRequest createUpdateRequest(
String index,
String docType,
String key,
String routing,
XContentType contentType,
byte[] document) {
return new UpdateRequest(index, docType, key)
.doc(document, contentType)
.upsert(document, contentType);
UpdateRequest req = new UpdateRequest(index, docType, key)
.doc(document, contentType);
if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
req = req.routing(routing);
}
return req.upsert(document, contentType);
}

@Override
public IndexRequest createIndexRequest(
        String index,
        String docType,
        String key,
        String routing,
        XContentType contentType,
        byte[] document) {
    // NOTE(review): the diff residue left the removed one-line return interleaved here;
    // this is the clean added implementation.
    IndexRequest req = new IndexRequest(index, docType, key);
    // Apply routing only when a non-blank value was derived; otherwise keep default routing.
    if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
        req = req.routing(routing);
    }
    return req.source(document, contentType);
}

@Override
public DeleteRequest createDeleteRequest(String index, String docType, String key) {
return new DeleteRequest(index, docType, key);
public DeleteRequest createDeleteRequest(String index, String docType, String key, String routing) {
DeleteRequest req = new DeleteRequest(index, docType, key);
if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
req = req.routing(routing);
}
return req;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
format,
XContentType.JSON,
REQUEST_FACTORY,
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()),
KeyExtractor.createColumnExtractor(schema, config.getKeyDelimiter(), config.getPartitionRoutingFields()));

final ElasticsearchSink.Builder<RowData> builder =
builderProvider.createBuilder(config.getHosts(), upsertFunction);
Expand Down Expand Up @@ -290,26 +291,39 @@ public UpdateRequest createUpdateRequest(
String index,
String docType,
String key,
String routing,
XContentType contentType,
byte[] document) {
return new UpdateRequest(index, key)
.doc(document, contentType)
.upsert(document, contentType);
UpdateRequest req = new UpdateRequest(index, key)
.doc(document, contentType);
if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
req = req.routing(routing);
}
return req.upsert(document, contentType);
}

@Override
public IndexRequest createIndexRequest(
        String index,
        String docType,
        String key,
        String routing,
        XContentType contentType,
        byte[] document) {
    // NOTE(review): the diff residue left the removed one-line return interleaved here;
    // this is the clean added implementation (ES7 client: no docType, id set explicitly).
    IndexRequest req = new IndexRequest(index).id(key);
    // Apply routing only when a non-blank value was derived; otherwise keep default routing.
    if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
        req = req.routing(routing);
    }
    return req.source(document, contentType);
}

@Override
public DeleteRequest createDeleteRequest(String index, String docType, String key) {
return new DeleteRequest(index, key);
public DeleteRequest createDeleteRequest(String index, String docType, String key, String routing) {
DeleteRequest req = new DeleteRequest(index, key);
if (!StringUtils.isNullOrWhitespaceOnly(routing)) {
req = req.routing(routing);
}
return req;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.HOSTS_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.INDEX_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.KEY_DELIMITER_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PARTITION_ROUTING_FIELDS;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.PASSWORD_OPTION;
import static org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions.USERNAME_OPTION;
import static org.apache.flink.table.connector.source.lookup.LookupOptions.CACHE_TYPE;
Expand Down Expand Up @@ -104,7 +105,8 @@ public class Elasticsearch7DynamicTableFactory
PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
PARTIAL_CACHE_MAX_ROWS,
PARTIAL_CACHE_CACHE_MISSING_KEY,
MAX_RETRIES)
MAX_RETRIES,
PARTITION_ROUTING_FIELDS)
.collect(Collectors.toSet());

@Override
Expand Down