Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add part of the index lifecycle management apis #3208

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.sksamuel.elastic4s

import com.sksamuel.elastic4s.api.{AggregationApi, AliasesApi, AnalyzeApi, AnalyzerApi, BulkApi, CatsApi, ClearRolesCacheApi, ClusterApi, CollapseApi, CountApi, CreateIndexApi, CreateRoleApi, CreateUserApi, DeleteApi, DeleteIndexApi, DeleteRoleApi, DeleteUserApi, ExistsApi, ExplainApi, ForceMergeApi, GetApi, HighlightApi, IndexAdminApi, IndexApi, IndexRecoveryApi, IndexTemplateApi, IngestApi, KnnApi, LocksApi, MappingApi, NodesApi, NormalizerApi, PipelineAggregationApi, PitApi, QueryApi, ReindexApi, ReloadSearchAnalyzersApi, RoleApi, ScoreApi, ScriptApi, ScrollApi, SearchApi, SearchTemplateApi, SettingsApi, SnapshotApi, SortApi, StoredScriptApi, SuggestionApi, SynonymsApi, TaskApi, TermVectorApi, TermsEnumApi, TokenFilterApi, TokenizerApi, TypesApi, UpdateApi, UserAdminApi, UserApi, ValidateApi}

import com.sksamuel.elastic4s.api.{AggregationApi, AliasesApi, AnalyzeApi, AnalyzerApi, BulkApi, CatsApi, ClearRolesCacheApi, ClusterApi, CollapseApi, CountApi, CreateIndexApi, CreateRoleApi, CreateUserApi, DeleteApi, DeleteIndexApi, DeleteRoleApi, DeleteUserApi, ExistsApi, ExplainApi, ForceMergeApi, GetApi, HighlightApi, IndexAdminApi, IndexApi, IndexLifecycleManagementApi, IndexRecoveryApi, IndexTemplateApi, IngestApi, KnnApi, LocksApi, MappingApi, NodesApi, NormalizerApi, PipelineAggregationApi, PitApi, QueryApi, ReindexApi, ReloadSearchAnalyzersApi, RoleApi, ScoreApi, ScriptApi, ScrollApi, SearchApi, SearchTemplateApi, SettingsApi, SnapshotApi, SortApi, StoredScriptApi, SuggestionApi, SynonymsApi, TaskApi, TermVectorApi, TermsEnumApi, TokenFilterApi, TokenizerApi, TypesApi, UpdateApi, UserAdminApi, UserApi, ValidateApi}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

Expand Down Expand Up @@ -30,6 +29,7 @@ trait ElasticApi
with HighlightApi
with IndexApi
with IndexAdminApi
with IndexLifecycleManagementApi
with AnalyzeApi
with IndexRecoveryApi
with IndexTemplateApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.sksamuel.elastic4s.handlers.explain.ExplainHandlers
import com.sksamuel.elastic4s.handlers.get.GetHandlers
import com.sksamuel.elastic4s.handlers.index.mapping.MappingHandlers
import com.sksamuel.elastic4s.handlers.index.{IndexAdminHandlers, IndexHandlers, IndexStatsHandlers, IndexTemplateHandlers, RolloverHandlers}
import com.sksamuel.elastic4s.handlers.indexlifecyclemanagement.IndexLifecycleManagementHandlers
import com.sksamuel.elastic4s.handlers.locks.LocksHandlers
import com.sksamuel.elastic4s.handlers.nodes.NodesHandlers
import com.sksamuel.elastic4s.handlers.pit.PitHandlers
Expand Down Expand Up @@ -47,6 +48,7 @@ with GetHandlers
with IndexHandlers
with IndexAdminHandlers
with IndexAliasHandlers
with IndexLifecycleManagementHandlers
with IndexStatsHandlers
with IndexTemplateHandlers
with IngestHandlers
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.sksamuel.elastic4s.api

import com.sksamuel.elastic4s.requests.indexlifecyclemanagement.{StartIlmRequest, GetIlmStatusRequest, StopIlmRequest}

trait IndexLifecycleManagementApi {
def getIlmStatus: GetIlmStatusRequest = GetIlmStatusRequest()

def startIlm(): StartIlmRequest = StartIlmRequest()

def stopIlm(): StopIlmRequest = StopIlmRequest()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.sksamuel.elastic4s.requests.indexlifecyclemanagement

import scala.concurrent.duration.Duration

import com.sksamuel.elastic4s.ext.OptionImplicits.RichOptionImplicits

case class GetIlmStatusRequest(masterTimeout: Option[String] = None, timeout: Option[String] = None) {
def masterTimeout(timeout: Duration): GetIlmStatusRequest = copy(masterTimeout = s"${timeout.toNanos}nanos".some)
def masterTimeout(timeout: String): GetIlmStatusRequest = copy(masterTimeout = timeout.some)

def timeout(timeout: Duration): GetIlmStatusRequest = copy(timeout = s"${timeout.toNanos}nanos".some)
def timeout(timeout: String): GetIlmStatusRequest = copy(timeout = timeout.some)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.sksamuel.elastic4s.requests.indexlifecyclemanagement

import com.fasterxml.jackson.annotation.JsonProperty

case class GetIlmStatusResponse(@JsonProperty("operation_mode") operationMode: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.sksamuel.elastic4s.requests.indexlifecyclemanagement

import scala.concurrent.duration.Duration

import com.sksamuel.elastic4s.ext.OptionImplicits.RichOptionImplicits

case class StartIlmRequest(masterTimeout: Option[String] = None, timeout: Option[String] = None) {
def masterTimeout(timeout: Duration): StartIlmRequest = copy(masterTimeout = s"${timeout.toNanos}nanos".some)
def masterTimeout(timeout: String): StartIlmRequest = copy(masterTimeout = timeout.some)

def timeout(timeout: Duration): StartIlmRequest = copy(timeout = s"${timeout.toNanos}nanos".some)
def timeout(timeout: String): StartIlmRequest = copy(timeout = timeout.some)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.sksamuel.elastic4s.requests.indexlifecyclemanagement

case class StartIlmResponse(acknowledged: Boolean)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.sksamuel.elastic4s.requests.indexlifecyclemanagement

import scala.concurrent.duration.Duration

import com.sksamuel.elastic4s.ext.OptionImplicits.RichOptionImplicits

case class StopIlmRequest(masterTimeout: Option[String] = None, timeout: Option[String] = None) {
def masterTimeout(timeout: Duration): StopIlmRequest = copy(masterTimeout = s"${timeout.toNanos}nanos".some)
def masterTimeout(timeout: String): StopIlmRequest = copy(masterTimeout = timeout.some)

def timeout(timeout: Duration): StopIlmRequest = copy(timeout = s"${timeout.toNanos}nanos".some)
def timeout(timeout: String): StopIlmRequest = copy(timeout = timeout.some)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.sksamuel.elastic4s.requests.indexlifecyclemanagement

case class StopIlmResponse(acknowledged: Boolean)
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.sksamuel.elastic4s.handlers.indexlifecyclemanagement

import com.sksamuel.elastic4s.handlers.ElasticErrorParser
import com.sksamuel.elastic4s._
import com.sksamuel.elastic4s.requests.indexlifecyclemanagement._

trait IndexLifecycleManagementHandlers {
implicit object IndexLifecycleStatusHandler extends Handler[GetIlmStatusRequest, GetIlmStatusResponse] {
override def responseHandler: ResponseHandler[GetIlmStatusResponse] = new ResponseHandler[GetIlmStatusResponse] {
override def handle(response: HttpResponse): Either[ElasticError, GetIlmStatusResponse] = {
response.statusCode match {
case 200 | 201 => Right(ResponseHandler.fromResponse[GetIlmStatusResponse](response))
case 400 => Left(ElasticErrorParser.parse(response))
case _ => sys.error("Invalid response")
}
}
}

override def build(request: GetIlmStatusRequest): ElasticRequest = {
val endpoint = "/_ilm/status"

ElasticRequest("GET", endpoint)
}
}

implicit object IndexLifecycleStartHandler extends Handler[StartIlmRequest, StartIlmResponse] {
override def responseHandler: ResponseHandler[StartIlmResponse] = new ResponseHandler[StartIlmResponse] {
override def handle(response: HttpResponse): Either[ElasticError, StartIlmResponse] = {
response.statusCode match {
case 200 | 201 => Right(ResponseHandler.fromResponse[StartIlmResponse](response))
case 400 => Left(ElasticErrorParser.parse(response))
case _ => sys.error("Invalid response")
}
}
}

override def build(request: StartIlmRequest): ElasticRequest = {
val endpoint = "/_ilm/start"

val params = scala.collection.mutable.Map.empty[String, String]
request.masterTimeout.foreach(params.put("master_timeout", _))
request.timeout.foreach(params.put("timeout", _))

ElasticRequest("POST", endpoint, params.toMap)
}
}

implicit object IndexLifecycleStopHandler extends Handler[StopIlmRequest, StopIlmResponse] {
override def responseHandler: ResponseHandler[StopIlmResponse] = new ResponseHandler[StopIlmResponse] {
override def handle(response: HttpResponse): Either[ElasticError, StopIlmResponse] = {
response.statusCode match {
case 200 | 201 => Right(ResponseHandler.fromResponse[StopIlmResponse](response))
case 400 => Left(ElasticErrorParser.parse(response))
case _ => sys.error("Invalid response")
}
}
}

override def build(request: StopIlmRequest): ElasticRequest = {
val endpoint = "/_ilm/stop"

val params = scala.collection.mutable.Map.empty[String, String]
request.masterTimeout.foreach(params.put("master_timeout", _))
request.timeout.foreach(params.put("timeout", _))

ElasticRequest("POST", endpoint, params.toMap)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.sksamuel.elastic4s.requests.indexlifecyclemanagement

import scala.concurrent.duration.DurationInt

import com.sksamuel.elastic4s.ElasticDsl
import com.sksamuel.elastic4s.testkit.DockerTests
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class IlmTest extends AnyFlatSpec with Matchers with ElasticDsl with DockerTests with Eventually {
"ilm" should "return status" in {
client.execute {
startIlm()
}.await.result

eventually(timeout(5.seconds)) {
val resp = client.execute {
getIlmStatus
}.await.result
resp.operationMode shouldBe "RUNNING"
}
}

"ilm" should "stop" in {
val resp = client.execute {
stopIlm()
}.await.result
resp.acknowledged shouldBe true

eventually(timeout(5.seconds)) {
val resp = client.execute {
getIlmStatus
}.await.result
resp.operationMode shouldBe "STOPPED"
}
}

"ilm" should "start" in {
client.execute {
stopIlm()
}.await.result

eventually(timeout(5.seconds)) {
val resp = client.execute {
getIlmStatus
}.await.result
resp.operationMode shouldBe "STOPPED"
}

val resp = client.execute {
startIlm()
}.await.result
resp.acknowledged shouldBe true

eventually(timeout(5.seconds)) {
val resp = client.execute {
getIlmStatus
}.await.result
resp.operationMode shouldBe "RUNNING"
}
}
}