Skip to content

Commit

Permalink
Implement support for reindex API [DATAES-955] spring-projects#1529
Browse files Browse the repository at this point in the history
  • Loading branch information
oni7uka committed Jan 21, 2022
1 parent dc6d7a0 commit 84eac20
Show file tree
Hide file tree
Showing 16 changed files with 1,373 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
Expand Down Expand Up @@ -143,6 +145,7 @@
* @author Brian Clozel
* @author Farid Faoudi
* @author George Popides
* @author Sijia Liu
* @since 3.2
* @see ClientConfiguration
* @see ReactiveRestClients
Expand Down Expand Up @@ -509,6 +512,19 @@ public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
.next();
}

@Override
public Mono<BulkByScrollResponse> reindex(HttpHeaders headers, ReindexRequest reindexRequest) {
return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers)
.next();
}

@Override
public Mono<String> submitReindexTask(HttpHeaders headers, ReindexRequest reindexRequest) {
return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers)
.next()
.map(TaskSubmissionResponse::getTask);
}

@Override
public <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
Expand All @@ -76,6 +77,7 @@
* @author Henrique Amaral
* @author Thomas Geese
* @author Farid Faoudi
* @author Sijia Liu
* @since 3.2
* @see ClientConfiguration
* @see ReactiveRestClients
Expand Down Expand Up @@ -713,6 +715,69 @@ default Mono<BulkResponse> bulk(BulkRequest bulkRequest) {
*/
Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest);

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param consumer must not be {@literal null}
* @return the {@link Mono} emitting the response
*/
default Mono<BulkByScrollResponse> reindex(Consumer<ReindexRequest> consumer){

ReindexRequest reindexRequest = new ReindexRequest();
consumer.accept(reindexRequest);
return reindex(reindexRequest);
}

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param reindexRequest must not be {@literal null}
* @return the {@link Mono} emitting the response
*/
default Mono<BulkByScrollResponse> reindex(ReindexRequest reindexRequest){
return reindex(HttpHeaders.EMPTY, reindexRequest);
}

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param reindexRequest must not be {@literal null}
* @return the {@link Mono} emitting the response
*/
Mono<BulkByScrollResponse> reindex(HttpHeaders headers, ReindexRequest reindexRequest);

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param consumer must not be {@literal null}
* @return the {@link Mono} emitting the task
*/
default Mono<String> submitReindexTask(Consumer<ReindexRequest> consumer){

ReindexRequest reindexRequest = new ReindexRequest();
consumer.accept(reindexRequest);
return submitReindexTask(reindexRequest);
}

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param reindexRequest must not be {@literal null}
* @return the {@link Mono} emitting the task
*/
default Mono<String> submitReindexTask(ReindexRequest reindexRequest){
return submitReindexTask(HttpHeaders.EMPTY, reindexRequest);
}

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param reindexRequest must not be {@literal null}
* @return the {@link Mono} emitting the task
*/
Mono<String> submitReindexTask(HttpHeaders headers, ReindexRequest reindexRequest);
/**
* Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}.
* {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
Expand Down Expand Up @@ -289,4 +290,8 @@ default Function<GetIndexRequest, Request> getIndex() {
default Function<ClusterHealthRequest, Request> clusterHealth() {
return RequestConverters::clusterHealth;
}

default Function<ReindexRequest, Request> reindex() { return RequestConverters::reindex; }

default Function<ReindexRequest, Request> submitReindex() { return RequestConverters::submitReindex; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,11 @@ public static Request rankEval(RankEvalRequest rankEvalRequest) {
return request;
}

public static Request reindex(ReindexRequest reindexRequest) throws IOException {
public static Request reindex(ReindexRequest reindexRequest) {
return prepareReindexRequest(reindexRequest, true);
}

static Request submitReindex(ReindexRequest reindexRequest) throws IOException {
public static Request submitReindex(ReindexRequest reindexRequest) {
return prepareReindexRequest(reindexRequest, false);
}

Expand All @@ -547,9 +547,16 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
.withTimeout(reindexRequest.getTimeout()).withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
.withRequestsPerSecond(reindexRequest.getRequestsPerSecond());

if(reindexRequest.getDestination().isRequireAlias()){
params.putParam("require_alias", Boolean.TRUE.toString());
}
if (reindexRequest.getScrollTime() != null) {
params.putParam("scroll", reindexRequest.getScrollTime());
}
params.putParam("slices", Integer.toString(reindexRequest.getSlices()));
if(reindexRequest.getMaxDocs() > -1){
params.putParam("max_docs", Integer.toString(reindexRequest.getMaxDocs()));
}
request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.Collection;
import java.util.List;

import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest;
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
Expand All @@ -34,6 +36,7 @@
*
* @author Peter-Josef Meisch
* @author Farid Faoudi
* @author Sijia Liu
* @since 4.0
*/
public interface DocumentOperations {
Expand Down Expand Up @@ -322,4 +325,24 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
* @since 4.2
*/
ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);

/**
* Copies documents from a source to a destination.
* The source can be any existing index, alias, or data stream. The destination must differ from the source.
* For example, you cannot reindex a data stream into itself.
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
*
* @param postReindexRequest reindex request parameters
* @return the reindex response
*/
PostReindexResponse reindex(PostReindexRequest postReindexRequest);

/**
* Submits a reindex task.
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
*
* @param postReindexRequest reindex request parameters
* @return the task
*/
String submitReindexTask(PostReindexRequest postReindexRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.suggest.SuggestBuilder;
Expand All @@ -61,7 +62,9 @@
import org.springframework.data.elasticsearch.core.cluster.ElasticsearchClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
Expand Down Expand Up @@ -106,6 +109,7 @@
* @author Gyula Attila Csorogi
* @author Massimiliano Poggi
* @author Farid Faoudi
* @author Sijia Liu
* @since 4.4
*/
public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
Expand Down Expand Up @@ -277,6 +281,26 @@ public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index)
return ResponseConverter.byQueryResponseOf(bulkByScrollResponse);
}

@Override
public PostReindexResponse reindex(PostReindexRequest postReindexRequest) {

Assert.notNull(postReindexRequest, "postReindexRequest must not be null");

final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
final BulkByScrollResponse bulkByScrollResponse = execute(
client -> client.reindex(reindexRequest, RequestOptions.DEFAULT));
return ResponseConverter.postReindexResponseOf(bulkByScrollResponse);
}

@Override
public String submitReindexTask(PostReindexRequest postReindexRequest) {
Assert.notNull(postReindexRequest, "postReindexRequest must not be null");

final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
return execute(
client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask());
}

public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
IndexCoordinates index) {
BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.springframework.data.elasticsearch.core;

import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest;
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -38,6 +40,7 @@
* @author Aleksei Arsenev
* @author Roman Puchkovskiy
* @author Farid Faoudi
* @author Sijia Liu
* @since 4.0
*/
public interface ReactiveDocumentOperations {
Expand Down Expand Up @@ -302,4 +305,24 @@ default Mono<Void> bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index)
* @since 4.2
*/
Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);

/**
* Copies documents from a source to a destination.
* The source can be any existing index, alias, or data stream. The destination must differ from the source.
* For example, you cannot reindex a data stream into itself.
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
*
* @param postReindexRequest reindex request parameters
* @return a {@link Mono} emitting the reindex response
*/
Mono<PostReindexResponse> reindex(PostReindexRequest postReindexRequest);

/**
* Submits a reindex task.
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
*
* @param postReindexRequest reindex request parameters
* @return a {@link Mono} emitting the {@literal task}.
*/
Mono<String> submitReindexTask(PostReindexRequest postReindexRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.reactivestreams.Publisher;
Expand All @@ -70,6 +71,8 @@
import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest;
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
Expand Down Expand Up @@ -105,6 +108,7 @@
* @author Russell Parry
* @author Thomas Geese
* @author Farid Faoudi
* @author Sijia Liu
* @since 3.2
*/
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware {
Expand Down Expand Up @@ -609,6 +613,28 @@ public Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordin
});
}

@Override
public Mono<PostReindexResponse> reindex(PostReindexRequest postReindexRequest) {

Assert.notNull(postReindexRequest, "postReindexRequest must not be null");

return Mono.defer(() -> {
final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::postReindexResponseOf);
});
}

@Override
public Mono<String> submitReindexTask(PostReindexRequest postReindexRequest) {

Assert.notNull(postReindexRequest, "postReindexRequest must not be null");

return Mono.defer(() -> {
final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
return Mono.from(execute(client -> client.submitReindexTask(reindexRequest)));
});
}

@Override
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType) {
return delete(query, entityType, getIndexCoordinatesFor(entityType));
Expand Down
Loading

0 comments on commit 84eac20

Please # to comment.