From c5db5830482c96ddcb4a5375cd91d16d272ef414 Mon Sep 17 00:00:00 2001 From: Onizuka Date: Thu, 27 Jan 2022 03:54:00 +0800 Subject: [PATCH] Implement support for reindex API. Original Pull Request #2070 Closes #1529 --- .../DefaultReactiveElasticsearchClient.java | 16 + .../reactive/ReactiveElasticsearchClient.java | 71 +++ .../client/reactive/RequestCreator.java | 11 + .../client/util/RequestConverters.java | 11 +- .../core/DocumentOperations.java | 25 ++ .../core/ElasticsearchRestTemplate.java | 23 + .../core/ReactiveDocumentOperations.java | 25 ++ .../core/ReactiveElasticsearchTemplate.java | 25 ++ .../elasticsearch/core/RequestFactory.java | 135 +++++- .../elasticsearch/core/ResponseConverter.java | 50 +++ .../core/reindex/ReindexRequest.java | 386 +++++++++++++++++ .../core/reindex/ReindexResponse.java | 403 ++++++++++++++++++ .../elasticsearch/core/reindex/Remote.java | 142 ++++++ .../core/reindex/package-info.java | 3 + .../core/ElasticsearchTemplateTests.java | 38 ++ ...ElasticsearchTemplateIntegrationTests.java | 40 ++ .../core/RequestFactoryTests.java | 95 ++++- 17 files changed, 1479 insertions(+), 20 deletions(-) create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/reindex/package-info.java diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 001e44b1c..cd5d2beb3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -83,11 +83,13 @@ import org.elasticsearch.client.GetAliasesResponse; import org.elasticsearch.client.Request; import org.elasticsearch.client.indices.*; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.mustache.SearchTemplateRequest; @@ -143,6 +145,7 @@ * @author Brian Clozel * @author Farid Faoudi * @author George Popides + * @author Sijia Liu * @since 3.2 * @see ClientConfiguration * @see ReactiveRestClients @@ -509,6 +512,19 @@ public Mono bulk(HttpHeaders headers, BulkRequest bulkRequest) { .next(); } + @Override + public Mono reindex(HttpHeaders headers, ReindexRequest reindexRequest) { + return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers) + .next(); + } + + @Override + public Mono submitReindex(HttpHeaders headers, ReindexRequest reindexRequest) { + return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers) + .next() + .map(TaskSubmissionResponse::getTask); + } + @Override public Mono execute(ReactiveElasticsearchClientCallback callback) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 030243851..b5edb1904 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; @@ -76,6 +77,7 @@ * @author Henrique Amaral * @author Thomas Geese * @author Farid Faoudi + * @author Sijia Liu * @since 3.2 * @see ClientConfiguration * @see ReactiveRestClients @@ -713,6 +715,75 @@ default Mono bulk(BulkRequest bulkRequest) { */ Mono bulk(HttpHeaders headers, BulkRequest bulkRequest); + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param consumer must not be {@literal null} + * @return the {@link Mono} emitting the response + * @since 4.4 + */ + default Mono reindex(Consumer consumer){ + + ReindexRequest reindexRequest = new ReindexRequest(); + consumer.accept(reindexRequest); + return reindex(reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the response + * @since 4.4 + */ + default Mono reindex(ReindexRequest reindexRequest){ + return reindex(HttpHeaders.EMPTY, reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the response + * @since 4.4 + */ + Mono reindex(HttpHeaders headers, ReindexRequest reindexRequest); + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param consumer must not be {@literal null} + * @return the {@link Mono} emitting the task id + * @since 4.4 + */ + default Mono submitReindex(Consumer consumer){ + + ReindexRequest reindexRequest = new ReindexRequest(); + consumer.accept(reindexRequest); + return submitReindex(reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the task id + * @since 4.4 + */ + default Mono submitReindex(ReindexRequest reindexRequest){ + return submitReindex(HttpHeaders.EMPTY, reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the task id + * @since 4.4 + */ + Mono submitReindex(HttpHeaders headers, ReindexRequest reindexRequest); /** * Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}. * {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java index 34fb05016..0d1b5ecec 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java @@ -49,6 +49,7 @@ import org.elasticsearch.client.indices.PutIndexTemplateRequest; import org.elasticsearch.client.indices.PutMappingRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest; import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; @@ -289,4 +290,14 @@ default Function getIndex() { default Function clusterHealth() { return RequestConverters::clusterHealth; } + + /** + * @since 4.4 + */ + default Function reindex() { return RequestConverters::reindex; } + + /** + * @since 4.4 + */ + default Function submitReindex() { return RequestConverters::submitReindex; } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index 6d5ad016d..82e9df984 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -532,11 +532,11 @@ public static Request rankEval(RankEvalRequest rankEvalRequest) { return request; } - public static Request reindex(ReindexRequest reindexRequest) throws IOException { + public static Request reindex(ReindexRequest reindexRequest) { return prepareReindexRequest(reindexRequest, true); } - static Request submitReindex(ReindexRequest reindexRequest) throws IOException { + public static Request submitReindex(ReindexRequest reindexRequest) { return prepareReindexRequest(reindexRequest, false); } @@ -547,9 +547,16 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool .withTimeout(reindexRequest.getTimeout()).withWaitForActiveShards(reindexRequest.getWaitForActiveShards()) .withRequestsPerSecond(reindexRequest.getRequestsPerSecond()); + if(reindexRequest.getDestination().isRequireAlias()){ + params.putParam("require_alias", Boolean.TRUE.toString()); + } if (reindexRequest.getScrollTime() != null) { params.putParam("scroll", reindexRequest.getScrollTime()); } + params.putParam("slices", Integer.toString(reindexRequest.getSlices())); + if(reindexRequest.getMaxDocs() > -1){ + params.putParam("max_docs", Integer.toString(reindexRequest.getMaxDocs())); + } request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE)); return request; } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java index d795462c1..fb66a4b99 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java @@ -18,6 +18,8 @@ import java.util.Collection; import java.util.List; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; @@ -34,6 +36,7 @@ * * @author Peter-Josef Meisch * @author Farid Faoudi + * @author Sijia Liu * @since 4.0 */ public interface DocumentOperations { @@ -322,4 +325,26 @@ default void bulkUpdate(List queries, IndexCoordinates index) { * @since 4.2 */ ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); + + /** + * Copies documents from a source to a destination. + * The source can be any existing index, alias, or data stream. The destination must differ from the source. + * For example, you cannot reindex a data stream into itself. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param reindexRequest reindex request parameters + * @return the reindex response + * @since 4.4 + */ + ReindexResponse reindex(ReindexRequest reindexRequest); + + /** + * Submits a reindex task. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param reindexRequest reindex request parameters + * @return the task id + * @since 4.4 + */ + String submitReindex(ReindexRequest reindexRequest); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 84277ce34..b355e25bd 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -61,7 +61,9 @@ import org.springframework.data.elasticsearch.core.cluster.ElasticsearchClusterOperations; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.document.DocumentAdapters; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; @@ -106,6 +108,7 @@ * @author Gyula Attila Csorogi * @author Massimiliano Poggi * @author Farid Faoudi + * @author Sijia Liu * @since 4.4 */ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { @@ -277,6 +280,26 @@ public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) return ResponseConverter.byQueryResponseOf(bulkByScrollResponse); } + @Override + public ReindexResponse reindex(ReindexRequest postReindexRequest) { + + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + final BulkByScrollResponse bulkByScrollResponse = execute( + client -> client.reindex(reindexRequest, RequestOptions.DEFAULT)); + return ResponseConverter.reindexResponseOf(bulkByScrollResponse); + } + + @Override + public String submitReindex(ReindexRequest postReindexRequest) { + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return execute( + client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask()); + } + public List doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index)); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index 995a82897..59a2e5675 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -15,6 +15,8 @@ */ package org.springframework.data.elasticsearch.core; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -38,6 +40,7 @@ * @author Aleksei Arsenev * @author Roman Puchkovskiy * @author Farid Faoudi + * @author Sijia Liu * @since 4.0 */ public interface ReactiveDocumentOperations { @@ -302,4 +305,26 @@ default Mono bulkUpdate(List queries, IndexCoordinates index) * @since 4.2 */ Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); + + /** + * Copies documents from a source to a destination. + * The source can be any existing index, alias, or data stream. The destination must differ from the source. + * For example, you cannot reindex a data stream into itself. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param reindexRequest reindex request parameters + * @return a {@link Mono} emitting the reindex response + * @since 4.4 + */ + Mono reindex(ReindexRequest reindexRequest); + + /** + * Submits a reindex task. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param reindexRequest reindex request parameters + * @return a {@link Mono} emitting the {@literal task} id. + * @since 4.4 + */ + Mono submitReindex(ReindexRequest reindexRequest); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 387f6d3fa..d3bf9ad60 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -70,6 +70,8 @@ import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback; import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback; import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -105,6 +107,7 @@ * @author Russell Parry * @author Thomas Geese * @author Farid Faoudi + * @author Sijia Liu * @since 3.2 */ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware { @@ -609,6 +612,28 @@ public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordin }); } + @Override + public Mono reindex(ReindexRequest postReindexRequest) { + + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + return Mono.defer(() -> { + final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::reindexResponseOf); + }); + } + + @Override + public Mono submitReindex(ReindexRequest postReindexRequest) { + + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + return Mono.defer(() -> { + final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return Mono.from(execute(client -> client.submitReindex(reindexRequest))); + }); + } + @Override public Mono delete(Query query, Class entityType) { return delete(query, entityType, getIndexCoordinatesFor(entityType)); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java index 0251bfeab..bf67ace9c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java @@ -15,19 +15,15 @@ */ package org.springframework.data.elasticsearch.core; +import static org.elasticsearch.core.TimeValue.*; import static org.elasticsearch.index.query.QueryBuilders.*; +import static org.elasticsearch.index.reindex.RemoteInfo.*; +import static org.elasticsearch.script.Script.*; import static org.springframework.util.CollectionUtils.*; +import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.elasticsearch.action.DocWriteRequest; @@ -58,6 +54,7 @@ import org.elasticsearch.client.indices.IndexTemplatesExistRequest; import org.elasticsearch.client.indices.PutIndexTemplateRequest; import org.elasticsearch.client.indices.PutMappingRequest; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.geo.GeoDistance; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.core.TimeValue; @@ -66,6 +63,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.Script; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -73,6 +71,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.rescore.QueryRescoreMode; import org.elasticsearch.search.rescore.QueryRescorerBuilder; +import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.GeoDistanceSortBuilder; import org.elasticsearch.search.sort.ScoreSortBuilder; @@ -81,6 +80,8 @@ import org.elasticsearch.search.sort.SortMode; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.SuggestBuilder; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.domain.Sort; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; @@ -91,7 +92,12 @@ import org.springframework.data.elasticsearch.core.index.DeleteTemplateRequest; import org.springframework.data.elasticsearch.core.index.ExistsTemplateRequest; import org.springframework.data.elasticsearch.core.index.GetTemplateRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import org.springframework.data.elasticsearch.core.index.PutTemplateRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Source; +import org.springframework.data.elasticsearch.core.reindex.Remote; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Dest; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Slice; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -387,6 +393,117 @@ public DeleteIndexTemplateRequest deleteIndexTemplateRequest(DeleteTemplateReque return new DeleteIndexTemplateRequest(deleteTemplateRequest.getTemplateName()); } + /** + * @since 4.4 + */ + public org.elasticsearch.index.reindex.ReindexRequest reindexRequest(ReindexRequest reindexRequest){ + final org.elasticsearch.index.reindex.ReindexRequest request = new org.elasticsearch.index.reindex.ReindexRequest(); + if(reindexRequest.getConflicts() != null){ + request.setConflicts(reindexRequest.getConflicts().name().toLowerCase(Locale.ROOT)); + } + if(reindexRequest.getMaxDocs() != null){ + request.setMaxDocs(reindexRequest.getMaxDocs()); + } + // region source build + final Source source = reindexRequest.getSource(); + request.setSourceIndices(source.getIndexes().getIndexNames()); + // source query will build from RemoteInfo if remote exist + if(source.getQuery() != null && source.getRemote() == null){ + request.setSourceQuery(getQuery(source.getQuery())); + } + if(source.getSize() != null){ + request.setSourceBatchSize(source.getSize()); + } + + if(source.getRemote() != null){ + Remote remote = source.getRemote(); + QueryBuilder queryBuilder = source.getQuery() == null ? QueryBuilders.matchAllQuery() : getQuery(source.getQuery()); + BytesReference query; + try { + XContentBuilder builder = XContentBuilder.builder(QUERY_CONTENT_TYPE).prettyPrint(); + query = BytesReference.bytes(queryBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } catch (IOException e) { + throw new IllegalArgumentException("an IOException occurs while building the source query content",e); + } + request.setRemoteInfo(new RemoteInfo( + remote.getScheme(), + remote.getHost(), + remote.getPort(), + remote.getPathPrefix(), + query, + remote.getUsername(), + remote.getPassword(), + Collections.emptyMap(), + remote.getSocketTimeout() == null ? DEFAULT_SOCKET_TIMEOUT : timeValueSeconds(remote.getSocketTimeout().getSeconds()), + remote.getConnectTimeout() == null ? DEFAULT_CONNECT_TIMEOUT : timeValueSeconds(remote.getConnectTimeout().getSeconds()) + )); + } + + final Slice slice = source.getSlice(); + if(slice != null){ + request.getSearchRequest().source().slice(new SliceBuilder(slice.getId(), slice.getMax())); + } + final SourceFilter sourceFilter = source.getSourceFilter(); + if(sourceFilter != null){ + request.getSearchRequest().source().fetchSource(sourceFilter.getIncludes(), sourceFilter.getExcludes()); + } + // endregion + + // region dest build + final Dest dest = reindexRequest.getDest(); + request.setDestIndex(dest.getIndex().getIndexName()) + .setDestRouting(dest.getRouting()) + .setDestPipeline(dest.getPipeline()); + + final org.springframework.data.elasticsearch.annotations.Document.VersionType versionType = dest.getVersionType(); + if(versionType != null){ + request.setDestVersionType(VersionType.fromString(versionType.name().toLowerCase(Locale.ROOT))); + } + final IndexQuery.OpType opType = dest.getOpType(); + if(opType != null){ + request.setDestOpType(opType.name().toLowerCase(Locale.ROOT)); + } + // endregion + + // region script build + final ReindexRequest.Script script = reindexRequest.getScript(); + if(script != null){ + request.setScript(new Script(DEFAULT_SCRIPT_TYPE, + script.getLang(), + script.getSource(), + Collections.emptyMap() + )); + } + // endregion + + // region query parameters build + final Duration timeout = reindexRequest.getTimeout(); + if(timeout != null){ + request.setTimeout(timeValueSeconds(timeout.getSeconds())); + } + if(reindexRequest.getRefresh() != null){ + request.setRefresh(reindexRequest.getRefresh()); + } + if(reindexRequest.getRequireAlias() != null){ + request.setRequireAlias(reindexRequest.getRequireAlias()); + } + if(reindexRequest.getRequestsPerSecond() != null){ + request.setRequestsPerSecond(reindexRequest.getRequestsPerSecond()); + } + final Duration scroll = reindexRequest.getScroll(); + if(scroll != null){ + request.setScroll(timeValueSeconds(scroll.getSeconds())); + } + if(reindexRequest.getWaitForActiveShards() != null){ + request.setWaitForActiveShards(ActiveShardCount.parseString(reindexRequest.getWaitForActiveShards())); + } + if(reindexRequest.getSlices() != null){ + request.setSlices(reindexRequest.getSlices()); + } + // endregion + return request; + } + // endregion // region delete diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java index 8f9d7928e..8efe62224 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java @@ -45,6 +45,7 @@ import org.springframework.data.elasticsearch.core.index.AliasData; import org.springframework.data.elasticsearch.core.index.Settings; import org.springframework.data.elasticsearch.core.index.TemplateData; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -54,6 +55,7 @@ * * @author George Popides * @author Peter-Josef Meisch + * @author Sijia Liu * @since 4.2 */ public class ResponseConverter { @@ -384,4 +386,52 @@ public static ByQueryResponse.SearchFailure byQueryResponseSearchFailureOf( } // endregion + + // region postReindexResponse + + /** + * @since 4.4 + */ + public static ReindexResponse reindexResponseOf(BulkByScrollResponse bulkByScrollResponse){ + final List failures = bulkByScrollResponse.getBulkFailures() // + .stream() // + .map(ResponseConverter::reindexResponseFailureOf) // + .collect(Collectors.toList()); // + + return ReindexResponse.builder() // + .withTook(bulkByScrollResponse.getTook().getMillis()) // + .withTimedOut(bulkByScrollResponse.isTimedOut()) // + .withTotal(bulkByScrollResponse.getTotal()) // + .withUpdated(bulkByScrollResponse.getUpdated()) // + .withDeleted(bulkByScrollResponse.getDeleted()) // + .withBatches(bulkByScrollResponse.getBatches()) // + .withVersionConflicts(bulkByScrollResponse.getVersionConflicts()) // + .withNoops(bulkByScrollResponse.getNoops()) // + .withBulkRetries(bulkByScrollResponse.getBulkRetries()) // + .withSearchRetries(bulkByScrollResponse.getSearchRetries()) // + .withThrottledMillis(bulkByScrollResponse.getStatus().getThrottled().getMillis()) // + .withRequestsPerSecond(bulkByScrollResponse.getStatus().getRequestsPerSecond()) // + .withThrottledUntilMillis(bulkByScrollResponse.getStatus().getThrottledUntil().getMillis()) // + .withFailures(failures) // + .build(); // + + } + + /** + * @since 4.4 + */ + public static ReindexResponse.Failure reindexResponseFailureOf(BulkItemResponse.Failure failure) { + return ReindexResponse.Failure.builder() // + .withIndex(failure.getIndex()) // + .withType(failure.getType()) // + .withId(failure.getId()) // + .withStatus(failure.getStatus().getStatus()) // + .withAborted(failure.isAborted()) // + .withCause(failure.getCause()) // + .withSeqNo(failure.getSeqNo()) // + .withTerm(failure.getTerm()) // + .build(); // + } + + // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java new file mode 100644 index 000000000..a9e95de15 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java @@ -0,0 +1,386 @@ +/* + * Copyright 2019-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.reindex; + +import org.springframework.data.elasticsearch.annotations.Document; +import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.IndexQuery; +import org.springframework.data.elasticsearch.core.query.Query; +import org.springframework.data.elasticsearch.core.query.SourceFilter; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import java.time.Duration; + +/** + * Request to reindex some documents from one index to another. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @author Sijia Liu + * @since 4.4 + */ +public class ReindexRequest { + + // Request body + private final Source source; + private final Dest dest; + @Nullable private final Integer maxDocs; + @Nullable private final Conflicts conflicts; + @Nullable private final Script script; + + // Query parameters + @Nullable private final Duration timeout; + @Nullable private final Boolean requireAlias; + @Nullable private final Boolean refresh; + @Nullable private final String waitForActiveShards; + @Nullable private final Integer requestsPerSecond; + @Nullable private final Duration scroll; + @Nullable private final Integer slices; + + private ReindexRequest(Source source, Dest dest, @Nullable Integer maxDocs, @Nullable Conflicts conflicts, @Nullable Script script, @Nullable Duration timeout, @Nullable Boolean requireAlias, @Nullable Boolean refresh, @Nullable String waitForActiveShards, @Nullable Integer requestsPerSecond, @Nullable Duration scroll, @Nullable Integer slices) { + + Assert.notNull(source, "source must not be null"); + Assert.notNull(dest, "dest must not be null"); + + this.source = source; + this.dest = dest; + this.maxDocs = maxDocs; + this.conflicts = conflicts; + this.script = script; + this.timeout = timeout; + this.requireAlias = requireAlias; + this.refresh = refresh; + this.waitForActiveShards = waitForActiveShards; + this.requestsPerSecond = requestsPerSecond; + this.scroll = scroll; + this.slices = slices; + } + + @Nullable + public Integer getMaxDocs() { + return maxDocs; + } + + public Source getSource() { + return source; + } + + public Dest getDest() { + return dest; + } + + @Nullable + public Script getScript() { + return script; + } + + @Nullable + public Conflicts getConflicts() { + return conflicts; + } + + @Nullable + public Boolean getRequireAlias() { + return requireAlias; + } + + @Nullable + public Duration getTimeout() { + return timeout; + } + + @Nullable + public Boolean getRefresh() { + return refresh; + } + + @Nullable + public String getWaitForActiveShards() { + return waitForActiveShards; + } + + @Nullable + public Integer getRequestsPerSecond() { + return requestsPerSecond; + } + + @Nullable + public Duration getScroll() { + return scroll; + } + + @Nullable + public Integer getSlices() { + return slices; + } + + public static ReindexRequestBuilder builder(IndexCoordinates sourceIndex, IndexCoordinates destIndex) { + return new ReindexRequestBuilder(sourceIndex, destIndex); + } + + public enum Conflicts { + PROCEED, ABORT + } + + public static class Source { + private final IndexCoordinates indexes; + @Nullable private Query query; + @Nullable private Remote remote; + @Nullable private Slice slice; + @Nullable private Integer size; + @Nullable private SourceFilter sourceFilter; + + private Source(IndexCoordinates indexes){ + Assert.notNull(indexes, "indexes must not be null"); + + this.indexes = indexes; + } + + public IndexCoordinates getIndexes() { + return indexes; + } + + @Nullable + public Remote getRemote() { + return remote; + } + + @Nullable + public Query getQuery() { + return query; + } + + @Nullable + public Integer getSize() { + return size; + } + + @Nullable + public Slice getSlice() { + return slice; + } + + @Nullable + public SourceFilter getSourceFilter() { + return sourceFilter; + } + } + + public static class Slice { + private final int id; + private final int max; + + private Slice(int id, int max) { + this.id = id; + this.max = max; + } + + public int getId() { + return id; + } + + public int getMax() { + return max; + } + } + + public static class Dest { + + private final IndexCoordinates index; + @Nullable private String pipeline; + @Nullable private String routing; + @Nullable private Document.VersionType versionType; + @Nullable private IndexQuery.OpType opType; + + private Dest(IndexCoordinates index) { + Assert.notNull(index, "dest index must not be null"); + + this.index = index; + } + + public IndexCoordinates getIndex() { + return index; + } + + @Nullable + public Document.VersionType getVersionType() { + return versionType; + } + + @Nullable + public IndexQuery.OpType getOpType() { + return opType; + } + + @Nullable + public String getPipeline() { + return pipeline; + } + + @Nullable + public String getRouting() { + return routing; + } + } + + public static class Script { + private final String source; + @Nullable private final String lang; + + private Script(String source, @Nullable String lang) { + Assert.notNull(source, "source must not be null"); + + this.source = source; + this.lang = lang; + } + + public String getSource() { + return source; + } + + @Nullable + public String getLang() { + return lang; + } + } + + public static final class ReindexRequestBuilder { + + private final Source source; + private final Dest dest; + @Nullable private Integer maxDocs; + @Nullable private Conflicts conflicts; + @Nullable private Script script; + @Nullable private Duration timeout; + @Nullable private Boolean requireAlias; + @Nullable private Boolean refresh; + @Nullable private String waitForActiveShards; + @Nullable private Integer requestsPerSecond; + @Nullable private Duration scroll; + @Nullable private Integer slices; + + public ReindexRequestBuilder(IndexCoordinates sourceIndex, IndexCoordinates destIndex) { + + Assert.notNull(sourceIndex, "sourceIndex must not be null"); + Assert.notNull(destIndex, "destIndex must not be null"); + + this.source = new Source(sourceIndex); + this.dest = new Dest(destIndex); + } + + // region setter + + public ReindexRequestBuilder withMaxDocs(@Nullable Integer maxDocs) { + this.maxDocs = maxDocs; + return this; + } + + public ReindexRequestBuilder withConflicts(Conflicts conflicts) { + this.conflicts = conflicts; + return this; + } + + public ReindexRequestBuilder withSourceQuery(Query query) { + this.source.query = query; + return this; + } + + public ReindexRequestBuilder withSourceSlice(int id, int max){ + this.source.slice = new Slice(id, max); + return this; + } + + public ReindexRequestBuilder withSourceRemote(Remote remote) { + this.source.remote = remote; + return this; + } + + public ReindexRequestBuilder withSourceSize(int size) { + this.source.size = size; + return this; + } + + public ReindexRequestBuilder withSourceSourceFilter(SourceFilter sourceFilter){ + this.source.sourceFilter = sourceFilter; + return this; + } + + public ReindexRequestBuilder withDestPipeline(String pipelineName){ + this.dest.pipeline = pipelineName; + return this; + } + + public ReindexRequestBuilder withDestRouting(String routing){ + this.dest.routing = routing; + return this; + } + + public ReindexRequestBuilder withDestVersionType(Document.VersionType versionType) { + this.dest.versionType = versionType; + return this; + } + + public ReindexRequestBuilder withDestOpType(IndexQuery.OpType opType) { + this.dest.opType = opType; + return this; + } + + public ReindexRequestBuilder withScript(String source, @Nullable String lang) { + this.script = new Script(source, lang); + return this; + } + + public ReindexRequestBuilder withTimeout(Duration timeout){ + this.timeout = timeout; + return this; + } + + public ReindexRequestBuilder withRequireAlias(boolean requireAlias){ + this.requireAlias = requireAlias; + return this; + } + + public ReindexRequestBuilder withRefresh(boolean refresh){ + this.refresh = refresh; + return this; + } + + public ReindexRequestBuilder withWaitForActiveShards(String waitForActiveShards){ + this.waitForActiveShards = waitForActiveShards; + return this; + } + + public ReindexRequestBuilder withRequestsPerSecond(int requestsPerSecond){ + this.requestsPerSecond = requestsPerSecond; + return this; + } + + public ReindexRequestBuilder withScroll(Duration scroll){ + this.scroll = scroll; + return this; + } + + public ReindexRequestBuilder withSlices(int slices){ + this.slices = slices; + return this; + } + // endregion + + public ReindexRequest build() { + return new ReindexRequest(source, dest, maxDocs, conflicts, script, timeout, requireAlias, refresh, waitForActiveShards, requestsPerSecond, scroll, slices); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java new file mode 100644 index 000000000..05be38c65 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java @@ -0,0 +1,403 @@ +/* + * Copyright 2019-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.reindex; + +import org.springframework.lang.Nullable; + +import java.util.Collections; +import java.util.List; + +/** + * Response of reindex request. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-api-response-body) + * + * @author Sijia Liu + * @since 4.4 + */ +public class ReindexResponse { + + private final long took; + private final boolean timedOut; + private final long total; + private final long updated; + private final long deleted; + private final int batches; + private final long versionConflicts; + private final long noops; + private final long bulkRetries; + private final long searchRetries; + private final long throttledMillis; + private final double requestsPerSecond; + private final long throttledUntilMillis; + private final List failures; + + private ReindexResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches, + long versionConflicts, long noops, long bulkRetries, long searchRetries, + long throttledMillis, double requestsPerSecond, long throttledUntilMillis, List failures) { + this.took = took; + this.timedOut = timedOut; + this.total = total; + this.updated = updated; + this.deleted = deleted; + this.batches = batches; + this.versionConflicts = versionConflicts; + this.noops = noops; + this.bulkRetries = bulkRetries; + this.searchRetries = searchRetries; + this.throttledMillis = throttledMillis; + this.requestsPerSecond = requestsPerSecond; + this.throttledUntilMillis = throttledUntilMillis; + this.failures = failures; + } + + /** + * The number of milliseconds from start to end of the whole operation. + */ + public long getTook() { + return took; + } + + /** + * Did any of the sub-requests that were part of this request timeout? + */ + public boolean isTimedOut() { + return timedOut; + } + + /** + * The number of documents that were successfully processed. + */ + public long getTotal() { + return total; + } + + /** + * The number of documents that were successfully updated. + */ + public long getUpdated() { + return updated; + } + + /** + * The number of documents that were successfully deleted. + */ + public long getDeleted() { + return deleted; + } + + /** + * The number of scroll responses pulled back by the update by query. + */ + public int getBatches() { + return batches; + } + + /** + * The number of version conflicts that the update by query hit. + */ + public long getVersionConflicts() { + return versionConflicts; + } + + /** + * The number of documents that were ignored because the script used for the update by query returned a noop value for + * ctx.op. + */ + public long getNoops() { + return noops; + } + + /** + * The number of times that the request had retry bulk actions. + */ + public long getBulkRetries() { + return bulkRetries; + } + + /** + * The number of times that the request had retry search actions. + */ + public long getSearchRetries() { + return searchRetries; + } + + /** + * Number of milliseconds the request slept to conform to requests_per_second. + */ + public long getThrottledMillis() { + return throttledMillis; + } + + /** + * The number of requests per second effectively executed during the reindex. + */ + public double getRequestsPerSecond() { + return requestsPerSecond; + } + + /** + * This field should always be equal to zero in a _reindex response. + * It only has meaning when using the Task API, where it indicates the next time (in milliseconds since epoch) + * a throttled request will be executed again in order to conform to requests_per_second. + */ + public long getThrottledUntilMillis() { + return throttledUntilMillis; + } + + /** + * All of the bulk failures. Version conflicts are only included if the request sets abortOnVersionConflict to true + * (the default). + */ + public List getFailures() { + return failures; + } + + /** + * Create a new {@link ReindexResponseBuilder} to build {@link ReindexResponse} + * + * @return a new {@link ReindexResponseBuilder} to build {@link ReindexResponse} + */ + public static ReindexResponseBuilder builder() { + return new ReindexResponseBuilder(); + } + + public static class Failure { + + @Nullable private final String index; + @Nullable private final String type; + @Nullable private final String id; + @Nullable private final Exception cause; + @Nullable private final Integer status; + @Nullable private final Long seqNo; + @Nullable private final Long term; + @Nullable private final Boolean aborted; + + private Failure(@Nullable String index, @Nullable String type, @Nullable String id, @Nullable Exception cause, + @Nullable Integer status, @Nullable Long seqNo, @Nullable Long term, @Nullable Boolean aborted) { + this.index = index; + this.type = type; + this.id = id; + this.cause = cause; + this.status = status; + this.seqNo = seqNo; + this.term = term; + this.aborted = aborted; + } + + @Nullable + public String getIndex() { + return index; + } + + @Nullable + public String getType() { + return type; + } + + @Nullable + public String getId() { + return id; + } + + @Nullable + public Exception getCause() { + return cause; + } + + @Nullable + public Integer getStatus() { + return status; + } + + @Nullable + public Long getSeqNo() { + return seqNo; + } + + @Nullable + public Long getTerm() { + return term; + } + + @Nullable + public Boolean getAborted() { + return aborted; + } + + /** + * Create a new {@link Failure.FailureBuilder} to build {@link Failure} + * + * @return a new {@link Failure.FailureBuilder} to build {@link Failure} + */ + public static Failure.FailureBuilder builder() { + return new Failure.FailureBuilder(); + } + + /** + * Builder for {@link Failure} + */ + public static final class FailureBuilder { + @Nullable private String index; + @Nullable private String type; + @Nullable private String id; + @Nullable private Exception cause; + @Nullable private Integer status; + @Nullable private Long seqNo; + @Nullable private Long term; + @Nullable private Boolean aborted; + + private FailureBuilder() {} + + public Failure.FailureBuilder withIndex(String index) { + this.index = index; + return this; + } + + public Failure.FailureBuilder withType(String type) { + this.type = type; + return this; + } + + public Failure.FailureBuilder withId(String id) { + this.id = id; + return this; + } + + public Failure.FailureBuilder withCause(Exception cause) { + this.cause = cause; + return this; + } + + public Failure.FailureBuilder withStatus(Integer status) { + this.status = status; + return this; + } + + public Failure.FailureBuilder withSeqNo(Long seqNo) { + this.seqNo = seqNo; + return this; + } + + public Failure.FailureBuilder withTerm(Long term) { + this.term = term; + return this; + } + + public Failure.FailureBuilder withAborted(Boolean aborted) { + this.aborted = aborted; + return this; + } + + public Failure build() { + return new Failure(index, type, id, cause, status, seqNo, term, aborted); + } + } + } + + public static final class ReindexResponseBuilder { + private long took; + private boolean timedOut; + private long total; + private long updated; + private long deleted; + private int batches; + private long versionConflicts; + private long noops; + private long bulkRetries; + private long searchRetries; + private long throttledMillis; + private double requestsPerSecond; + private long throttledUntilMillis; + private List failures = Collections.emptyList(); + + private ReindexResponseBuilder() {} + + public ReindexResponseBuilder withTook(long took) { + this.took = took; + return this; + } + + public ReindexResponseBuilder withTimedOut(boolean timedOut) { + this.timedOut = timedOut; + return this; + } + + public ReindexResponseBuilder withTotal(long total) { + this.total = total; + return this; + } + + public ReindexResponseBuilder withUpdated(long updated) { + this.updated = updated; + return this; + } + + public ReindexResponseBuilder withDeleted(long deleted) { + this.deleted = deleted; + return this; + } + + public ReindexResponseBuilder withBatches(int batches) { + this.batches = batches; + return this; + } + + public ReindexResponseBuilder withVersionConflicts(long versionConflicts) { + this.versionConflicts = versionConflicts; + return this; + } + + public ReindexResponseBuilder withNoops(long noops) { + this.noops = noops; + return this; + } + + public ReindexResponseBuilder withBulkRetries(long bulkRetries) { + this.bulkRetries = bulkRetries; + return this; + } + + public ReindexResponseBuilder withSearchRetries(long searchRetries) { + this.searchRetries = searchRetries; + return this; + } + + public ReindexResponseBuilder withThrottledMillis(long throttledMillis){ + this.throttledMillis = throttledMillis; + return this; + } + + public ReindexResponseBuilder withRequestsPerSecond(double requestsPerSecond){ + this.requestsPerSecond = requestsPerSecond; + return this; + } + + public ReindexResponseBuilder withThrottledUntilMillis(long throttledUntilMillis){ + this.throttledUntilMillis = throttledUntilMillis; + return this; + } + + public ReindexResponseBuilder withFailures(List failures) { + this.failures = failures; + return this; + } + + public ReindexResponse build() { + return new ReindexResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops, bulkRetries, + searchRetries, throttledMillis, requestsPerSecond, throttledUntilMillis, failures); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java new file mode 100644 index 000000000..1b11037c2 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java @@ -0,0 +1,142 @@ +/* + * Copyright 2019-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.reindex; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import java.time.Duration; + +/** + * Remote info + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#source) + * + * @author Sijia Liu + * @since 4.4 + */ +public class Remote { + private final String scheme; + private final String host; + private final int port; + + @Nullable private final String pathPrefix; + @Nullable private final String username; + @Nullable private final String password; + @Nullable private final Duration socketTimeout; + @Nullable private final Duration connectTimeout; + + private Remote(String scheme, String host, int port, @Nullable String pathPrefix, @Nullable String username, @Nullable String password, @Nullable Duration socketTimeout, @Nullable Duration connectTimeout) { + + Assert.notNull(scheme, "scheme must not be null"); + Assert.notNull(host, "host must not be null"); + + this.scheme = scheme; + this.host = host; + this.port = port; + this.pathPrefix = pathPrefix; + this.username = username; + this.password = password; + this.socketTimeout = socketTimeout; + this.connectTimeout = connectTimeout; + } + + public String getHost() { + return host; + } + + @Nullable + public String getUsername() { + return username; + } + + @Nullable + public String getPassword() { + return password; + } + + @Nullable + public Duration getSocketTimeout() { + return socketTimeout; + } + + @Nullable + public Duration getConnectTimeout() { + return connectTimeout; + } + + public String getScheme() { + return scheme; + } + + public int getPort() { + return port; + } + + @Nullable + public String getPathPrefix() { + return pathPrefix; + } + + public static RemoteBuilder builder(String scheme, String host, int port){ + return new RemoteBuilder(scheme, host, port); + } + + public static class RemoteBuilder{ + private final String scheme; + private final String host; + private final int port; + @Nullable private String pathPrefix; + @Nullable private String username; + @Nullable private String password; + @Nullable private Duration socketTimeout; + @Nullable private Duration connectTimeout; + + public RemoteBuilder(String scheme, String host, int port) { + this.scheme = scheme; + this.host = host; + this.port = port; + } + + public RemoteBuilder withPathPrefix(String pathPrefix){ + this.pathPrefix = pathPrefix; + return this; + } + + public RemoteBuilder withUsername(String username){ + this.username = username; + return this; + } + + public RemoteBuilder withPassword(String password){ + this.password = password; + return this; + } + + public RemoteBuilder withSocketTimeout(Duration socketTimeout){ + this.socketTimeout = socketTimeout; + return this; + } + + public RemoteBuilder withConnectTimeout(Duration connectTimeout){ + this.connectTimeout = connectTimeout; + return this; + } + + public Remote build(){ + return new Remote(scheme, host, port , pathPrefix, username, password, socketTimeout, connectTimeout); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/package-info.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/package-info.java new file mode 100644 index 000000000..2dfc0a174 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/package-info.java @@ -0,0 +1,3 @@ +@org.springframework.lang.NonNullApi +@org.springframework.lang.NonNullFields +package org.springframework.data.elasticsearch.core.reindex; diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index 981f9e7d8..e56a15b9b 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -83,6 +83,11 @@ import org.springframework.data.elasticsearch.annotations.MultiField; import org.springframework.data.elasticsearch.annotations.ScriptedField; import org.springframework.data.elasticsearch.annotations.Setting; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; +import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; +import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; +import org.springframework.data.elasticsearch.core.query.ScriptField; import org.springframework.data.elasticsearch.core.document.Explanation; import org.springframework.data.elasticsearch.core.geo.GeoPoint; import org.springframework.data.elasticsearch.core.index.AliasAction; @@ -123,6 +128,7 @@ * @author Subhobrata Dey * @author Farid Faoudi * @author Peer Mueller + * @author Sijia Liu */ @SpringIntegrationTest public abstract class ElasticsearchTemplateTests { @@ -3643,6 +3649,38 @@ void shouldNotErrorOnSortWithUnmappedFieldAndUnmappedTypeSettings() { operations.search(query, SampleEntity.class); } + @Test // #1529 + void shouldWorkReindexForExistingIndex() { + String sourceIndexName = indexNameProvider.indexName(); + String documentId = nextIdAsString(); + SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("abc").build(); + operations.save(sampleEntity); + + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + + final ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) + .withRefresh(true).build(); + final ReindexResponse reindex = operations.reindex(reindexRequest); + NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); + assertThat(reindex.getTotal()).isEqualTo(1); + assertThat(operations.count(searchQuery, IndexCoordinates.of(destIndexName))).isEqualTo(1); + } + + @Test // #1529 + void shouldWorkSubmitReindexTask(){ + String sourceIndexName = indexNameProvider.indexName(); + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + final ReindexRequest reindexRequest = ReindexRequest + .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).build(); + String task = operations.submitReindex(reindexRequest); + // Maybe there should be a task api to detect whether the task exists + assertThat(task).isNotBlank(); + } + // region entities @Document(indexName = "#{@indexNameProvider.indexName()}") @Setting(shards = 1, replicas = 0, refreshInterval = "-1") diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java index 7d449b585..121bc9cde 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java @@ -19,7 +19,9 @@ import static org.assertj.core.api.Assertions.*; import static org.elasticsearch.index.query.QueryBuilders.*; import static org.springframework.data.elasticsearch.annotations.FieldType.*; +import static org.springframework.data.elasticsearch.utils.IdGenerator.*; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -96,6 +98,7 @@ * @author Russell Parry * @author Roman Puchkovskiy * @author George Popides + * @author Sijia Liu */ @SuppressWarnings("SpringJavaAutowiredMembersInspection") @SpringIntegrationTest @@ -1188,6 +1191,43 @@ void shouldReturnMonoOfReactiveSearchHits() { .verifyComplete(); } + @Test // #1529 + void shouldWorkReindexForExistingIndex() { + String sourceIndexName = indexNameProvider.indexName(); + SampleEntity sampleEntity = randomEntity("abc"); + operations.save(sampleEntity).block(); + + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + final ReindexRequest reindexRequest = ReindexRequest + .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) + .withRefresh(true) + .build(); + operations.reindex(reindexRequest) + .as(StepVerifier::create) + .consumeNextWith(postReindexResponse -> assertThat(postReindexResponse.getTotal()).isEqualTo(1L)) + .verifyComplete(); + operations.count(operations.matchAllQuery(), SampleEntity.class, IndexCoordinates.of(destIndexName)) + .as(StepVerifier::create) + .expectNext(1L) + .verifyComplete(); + } + + @Test // #1529 + void shouldWorkSubmitReindexTask(){ + String sourceIndexName = indexNameProvider.indexName(); + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + final ReindexRequest reindexRequest = ReindexRequest + .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) + .build(); + operations.submitReindex(reindexRequest) + .as(StepVerifier::create) + .consumeNextWith(task -> assertThat(task).isNotBlank()) + .verifyComplete(); + } // endregion // region Helper functions diff --git a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java index f513abc97..e327d1c35 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java @@ -48,6 +48,7 @@ import org.springframework.data.annotation.Id; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; +import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.geo.GeoPoint; @@ -55,18 +56,12 @@ import org.springframework.data.elasticsearch.core.index.AliasActionParameters; import org.springframework.data.elasticsearch.core.index.AliasActions; import org.springframework.data.elasticsearch.core.index.PutTemplateRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.Remote; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; -import org.springframework.data.elasticsearch.core.query.Criteria; -import org.springframework.data.elasticsearch.core.query.CriteriaQuery; -import org.springframework.data.elasticsearch.core.query.GeoDistanceOrder; -import org.springframework.data.elasticsearch.core.query.IndexQuery; -import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; -import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; -import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.RescorerQuery; +import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.core.query.RescorerQuery.ScoreMode; -import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm; import org.springframework.lang.Nullable; /** @@ -74,6 +69,7 @@ * @author Roman Puchkovskiy * @author Peer Mueller * @author vdisk + * @author Sijia Liu */ @SuppressWarnings("ConstantConditions") @ExtendWith(MockitoExtension.class) @@ -562,6 +558,87 @@ void shouldSetStoredFieldsOnSearchRequest() { .isEqualTo(Arrays.asList("last-name", "current-location")); } + @Test // #1529 + void shouldCreateReindexRequest() throws IOException, JSONException { + final String expected = "{\n" + + " \"source\":{\n" + + " \"remote\":{\n" + + " \"username\":\"admin\",\n" + + " \"password\":\"password\",\n" + + " \"host\":\"http://localhost:9200/elasticsearch\",\n" + + " \"socket_timeout\":\"30s\",\n" + + " \"connect_timeout\":\"30s\"\n" + + " },\n" + + " \"index\":[\"source_1\",\"source_2\"],\n" + + " \"size\":5,\n" + + " \"query\":{\"match_all\":{}},\n" + + " \"_source\":{\"includes\":[\"name\"],\"excludes\":[]},\n" + + " \"slice\":{\"id\":1,\"max\":20}\n" + + " },\n" + + " \"dest\":{\n" + + " \"index\":\"destination\",\n" + + " \"routing\":\"routing\",\n" + + " \"op_type\":\"create\",\n" + + " \"pipeline\":\"pipeline\",\n" + + " \"version_type\":\"external\"\n" + + " },\n" + + " \"max_docs\":10,\n" + + " \"script\":{\"source\":\"Math.max(1,2)\",\"lang\":\"java\"},\n" + + " \"conflicts\":\"proceed\"\n" + + "}"; + + Remote remote = Remote.builder("http", "localhost",9200) + .withPathPrefix("elasticsearch") + .withUsername("admin") + .withPassword("password") + .withConnectTimeout(Duration.ofSeconds(30)) + .withSocketTimeout(Duration.ofSeconds(30)).build(); + + ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of("source_1", "source_2"), + IndexCoordinates.of("destination")) + .withConflicts(ReindexRequest.Conflicts.PROCEED) + .withMaxDocs(10) + .withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build()) + .withSourceSize(5) + .withSourceSourceFilter(new FetchSourceFilterBuilder().withIncludes("name").build()) + .withSourceRemote(remote) + .withSourceSlice(1,20) + .withDestOpType(IndexQuery.OpType.CREATE) + .withDestVersionType(Document.VersionType.EXTERNAL) + .withDestPipeline("pipeline") + .withDestRouting("routing") + .withScript("Math.max(1,2)", "java") + .build(); + + final String json = requestToString(requestFactory.reindexRequest(reindexRequest)); + + assertEquals(expected, json, false); + } + + @Test + void shouldAllowSourceQueryForReindexWithoutRemote() throws IOException, JSONException { + final String expected = "{\n" + + " \"source\":{\n" + + " \"index\":[\"source\"],\n" + + " \"query\":{\"match_all\":{}}\n" + + " },\n" + + " \"dest\":{\n" + + " \"index\":\"destination\",\n" + + " \"op_type\":\"index\",\n" + + " \"version_type\":\"internal\"\n" + + " }\n" + + "}"; + + ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of("source"), + IndexCoordinates.of("destination")) + .withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build()) + .build(); + + final String json = requestToString(requestFactory.reindexRequest(reindexRequest)); + + assertEquals(expected, json, false); + } + // region entities static class Person { @Nullable