Skip to content

Commit

Permalink
Implement support for reindex API.
Browse files Browse the repository at this point in the history
Original Pull Request #2070 
Closes #1529
  • Loading branch information
oni7uka authored Jan 26, 2022
1 parent cf3e46b commit c5db583
Show file tree
Hide file tree
Showing 17 changed files with 1,479 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
Expand Down Expand Up @@ -143,6 +145,7 @@
* @author Brian Clozel
* @author Farid Faoudi
* @author George Popides
* @author Sijia Liu
* @since 3.2
* @see ClientConfiguration
* @see ReactiveRestClients
Expand Down Expand Up @@ -509,6 +512,19 @@ public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
.next();
}

@Override
public Mono<BulkByScrollResponse> reindex(HttpHeaders headers, ReindexRequest reindexRequest) {
return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers)
.next();
}

@Override
public Mono<String> submitReindex(HttpHeaders headers, ReindexRequest reindexRequest) {
return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers)
.next()
.map(TaskSubmissionResponse::getTask);
}

@Override
public <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
Expand All @@ -76,6 +77,7 @@
* @author Henrique Amaral
* @author Thomas Geese
* @author Farid Faoudi
* @author Sijia Liu
* @since 3.2
* @see ClientConfiguration
* @see ReactiveRestClients
Expand Down Expand Up @@ -713,6 +715,75 @@ default Mono<BulkResponse> bulk(BulkRequest bulkRequest) {
*/
Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest);

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param consumer must not be {@literal null}
* @return the {@link Mono} emitting the response
* @since 4.4
*/
default Mono<BulkByScrollResponse> reindex(Consumer<ReindexRequest> consumer){

ReindexRequest reindexRequest = new ReindexRequest();
consumer.accept(reindexRequest);
return reindex(reindexRequest);
}

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param reindexRequest must not be {@literal null}
* @return the {@link Mono} emitting the response
* @since 4.4
*/
default Mono<BulkByScrollResponse> reindex(ReindexRequest reindexRequest){
return reindex(HttpHeaders.EMPTY, reindexRequest);
}

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param reindexRequest must not be {@literal null}
* @return the {@link Mono} emitting the response
* @since 4.4
*/
Mono<BulkByScrollResponse> reindex(HttpHeaders headers, ReindexRequest reindexRequest);

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param consumer must not be {@literal null}
* @return the {@link Mono} emitting the task id
* @since 4.4
*/
default Mono<String> submitReindex(Consumer<ReindexRequest> consumer){

ReindexRequest reindexRequest = new ReindexRequest();
consumer.accept(reindexRequest);
return submitReindex(reindexRequest);
}

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param reindexRequest must not be {@literal null}
* @return the {@link Mono} emitting the task id
* @since 4.4
*/
default Mono<String> submitReindex(ReindexRequest reindexRequest){
return submitReindex(HttpHeaders.EMPTY, reindexRequest);
}

/**
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param reindexRequest must not be {@literal null}
* @return the {@link Mono} emitting the task id
* @since 4.4
*/
Mono<String> submitReindex(HttpHeaders headers, ReindexRequest reindexRequest);
/**
* Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}.
* {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
Expand Down Expand Up @@ -289,4 +290,14 @@ default Function<GetIndexRequest, Request> getIndex() {
default Function<ClusterHealthRequest, Request> clusterHealth() {
return RequestConverters::clusterHealth;
}

/**
* @since 4.4
*/
default Function<ReindexRequest, Request> reindex() { return RequestConverters::reindex; }

/**
* @since 4.4
*/
default Function<ReindexRequest, Request> submitReindex() { return RequestConverters::submitReindex; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -532,11 +532,11 @@ public static Request rankEval(RankEvalRequest rankEvalRequest) {
return request;
}

public static Request reindex(ReindexRequest reindexRequest) throws IOException {
public static Request reindex(ReindexRequest reindexRequest) {
return prepareReindexRequest(reindexRequest, true);
}

static Request submitReindex(ReindexRequest reindexRequest) throws IOException {
public static Request submitReindex(ReindexRequest reindexRequest) {
return prepareReindexRequest(reindexRequest, false);
}

Expand All @@ -547,9 +547,16 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
.withTimeout(reindexRequest.getTimeout()).withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
.withRequestsPerSecond(reindexRequest.getRequestsPerSecond());

if(reindexRequest.getDestination().isRequireAlias()){
params.putParam("require_alias", Boolean.TRUE.toString());
}
if (reindexRequest.getScrollTime() != null) {
params.putParam("scroll", reindexRequest.getScrollTime());
}
params.putParam("slices", Integer.toString(reindexRequest.getSlices()));
if(reindexRequest.getMaxDocs() > -1){
params.putParam("max_docs", Integer.toString(reindexRequest.getMaxDocs()));
}
request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.Collection;
import java.util.List;

import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
Expand All @@ -34,6 +36,7 @@
*
* @author Peter-Josef Meisch
* @author Farid Faoudi
* @author Sijia Liu
* @since 4.0
*/
public interface DocumentOperations {
Expand Down Expand Up @@ -322,4 +325,26 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
* @since 4.2
*/
ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);

/**
* Copies documents from a source to a destination.
* The source can be any existing index, alias, or data stream. The destination must differ from the source.
* For example, you cannot reindex a data stream into itself.
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
*
* @param reindexRequest reindex request parameters
* @return the reindex response
* @since 4.4
*/
ReindexResponse reindex(ReindexRequest reindexRequest);

/**
* Submits a reindex task.
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
*
* @param reindexRequest reindex request parameters
* @return the task id
* @since 4.4
*/
String submitReindex(ReindexRequest reindexRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@
import org.springframework.data.elasticsearch.core.cluster.ElasticsearchClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
Expand Down Expand Up @@ -106,6 +108,7 @@
* @author Gyula Attila Csorogi
* @author Massimiliano Poggi
* @author Farid Faoudi
* @author Sijia Liu
* @since 4.4
*/
public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
Expand Down Expand Up @@ -277,6 +280,26 @@ public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index)
return ResponseConverter.byQueryResponseOf(bulkByScrollResponse);
}

@Override
public ReindexResponse reindex(ReindexRequest postReindexRequest) {

Assert.notNull(postReindexRequest, "postReindexRequest must not be null");

final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
final BulkByScrollResponse bulkByScrollResponse = execute(
client -> client.reindex(reindexRequest, RequestOptions.DEFAULT));
return ResponseConverter.reindexResponseOf(bulkByScrollResponse);
}

@Override
public String submitReindex(ReindexRequest postReindexRequest) {
Assert.notNull(postReindexRequest, "postReindexRequest must not be null");

final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
return execute(
client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask());
}

public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
IndexCoordinates index) {
BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.springframework.data.elasticsearch.core;

import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -38,6 +40,7 @@
* @author Aleksei Arsenev
* @author Roman Puchkovskiy
* @author Farid Faoudi
* @author Sijia Liu
* @since 4.0
*/
public interface ReactiveDocumentOperations {
Expand Down Expand Up @@ -302,4 +305,26 @@ default Mono<Void> bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index)
* @since 4.2
*/
Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);

/**
* Copies documents from a source to a destination.
* The source can be any existing index, alias, or data stream. The destination must differ from the source.
* For example, you cannot reindex a data stream into itself.
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
*
* @param reindexRequest reindex request parameters
* @return a {@link Mono} emitting the reindex response
* @since 4.4
*/
Mono<ReindexResponse> reindex(ReindexRequest reindexRequest);

/**
* Submits a reindex task.
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
*
* @param reindexRequest reindex request parameters
* @return a {@link Mono} emitting the {@literal task} id.
* @since 4.4
*/
Mono<String> submitReindex(ReindexRequest reindexRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
Expand Down Expand Up @@ -105,6 +107,7 @@
* @author Russell Parry
* @author Thomas Geese
* @author Farid Faoudi
* @author Sijia Liu
* @since 3.2
*/
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware {
Expand Down Expand Up @@ -609,6 +612,28 @@ public Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordin
});
}

@Override
public Mono<ReindexResponse> reindex(ReindexRequest postReindexRequest) {

Assert.notNull(postReindexRequest, "postReindexRequest must not be null");

return Mono.defer(() -> {
final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::reindexResponseOf);
});
}

@Override
public Mono<String> submitReindex(ReindexRequest postReindexRequest) {

Assert.notNull(postReindexRequest, "postReindexRequest must not be null");

return Mono.defer(() -> {
final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
return Mono.from(execute(client -> client.submitReindex(reindexRequest)));
});
}

@Override
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType) {
return delete(query, entityType, getIndexCoordinatesFor(entityType));
Expand Down
Loading

0 comments on commit c5db583

Please # to comment.