diff --git a/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java b/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java index c503dc7e9..3466a4bcc 100644 --- a/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java +++ b/elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java @@ -45,6 +45,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * table output elastic5plugin @@ -172,24 +173,17 @@ public void setBulkFlushMaxActions(int bulkFlushMaxActions) { @Override public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo) { - ElasticsearchTableInfo elasticsearchTableInfo = (ElasticsearchTableInfo) targetTableInfo; - esTableInfo = elasticsearchTableInfo; - clusterName = elasticsearchTableInfo.getClusterName(); - String address = elasticsearchTableInfo.getAddress(); - String[] addr = StringUtils.split(address, ","); - esAddressList = Arrays.asList(addr); - index = elasticsearchTableInfo.getIndex(); - type = elasticsearchTableInfo.getEsType(); - String id = elasticsearchTableInfo.getId(); - String[] idField = StringUtils.split(id, ","); - idIndexList = new ArrayList<>(); - - for(int i = 0; i < idField.length; ++i) { - idIndexList.add(Integer.valueOf(idField[i])); + esTableInfo = (ElasticsearchTableInfo) targetTableInfo; + clusterName = esTableInfo.getClusterName(); + index = esTableInfo.getIndex(); + type = esTableInfo.getEsType(); + columnTypes = esTableInfo.getFieldTypes(); + esAddressList = Arrays.asList(esTableInfo.getAddress().split(",")); + String id = esTableInfo.getId(); + + if (!org.apache.commons.lang.StringUtils.isEmpty(id)) { + idIndexList = Arrays.stream(org.apache.commons.lang.StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList()); } - - columnTypes = elasticsearchTableInfo.getFieldTypes(); - return this; } }