diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index 4749ea2e2d383..afda5ba0e7449 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -84,6 +84,7 @@ public ElasticSearchJavaRestClient(ElasticSearchConfig elasticSearchConfig, .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs()) .setConnectTimeout(config.getConnectTimeoutInMs()) .setSocketTimeout(config.getSocketTimeoutInMs())) + .setCompressionEnabled(config.isCompressionEnabled()) .setHttpClientConfigCallback(this.configCallback) .setFailureListener(new org.elasticsearch.client.RestClient.FailureListener() { public void onFailure(Node node) { diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java index 7b7041967026e..bb92047f17a31 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java @@ -112,6 +112,7 @@ public OpenSearchHighLevelRestClient(ElasticSearchConfig elasticSearchConfig, .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs()) .setConnectTimeout(config.getConnectTimeoutInMs()) .setSocketTimeout(config.getSocketTimeoutInMs())) + .setCompressionEnabled(config.isCompressionEnabled()) .setHttpClientConfigCallback(this.configCallback) .setFailureListener(new org.opensearch.client.RestClient.FailureListener() { @Override diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java index 6d9928c042697..5e8347b708df4 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java @@ -20,6 +20,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -27,8 +28,10 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +import co.elastic.clients.transport.rest_client.RestClientTransport; import eu.rekawek.toxiproxy.model.ToxicDirection; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Map; import java.util.Optional; import java.util.UUID; @@ -41,7 +44,8 @@ import org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient; import org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer; import org.awaitility.Awaitility; -import org.mockito.Mockito; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestHighLevelClient; import org.testcontainers.containers.Network; import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testng.annotations.AfterClass; @@ -109,11 +113,41 @@ public void fail() { public void testClientInstance() throws Exception { try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig() .setElasticSearchUrl("http://" + container.getHttpHostAddress()) + .setCompressionEnabled(true) .setIndexName(INDEX))) { if (elasticImageName.equals(OPENSEARCH) || elasticImageName.equals(ELASTICSEARCH_7)) { assertTrue(client.getRestClient() instanceof OpenSearchHighLevelRestClient); + OpenSearchHighLevelRestClient osRestHighLevelClient = (OpenSearchHighLevelRestClient) client.getRestClient(); + RestHighLevelClient restHighLevelClient = osRestHighLevelClient.getClient(); + assertNotNull(restHighLevelClient); + + Field field = RestHighLevelClient.class.getDeclaredField("client"); + field.setAccessible(true); + RestClient restClient = (RestClient) field.get(restHighLevelClient); + assertNotNull(restClient); + + Field compressionEnabledFiled = RestClient.class.getDeclaredField("compressionEnabled"); + compressionEnabledFiled.setAccessible(true); + boolean compressionEnabled = (boolean) compressionEnabledFiled.get(restClient); + assertTrue(compressionEnabled); } else { assertTrue(client.getRestClient() instanceof ElasticSearchJavaRestClient); + ElasticSearchJavaRestClient javaRestClient = (ElasticSearchJavaRestClient) client.getRestClient(); + + Field field = ElasticSearchJavaRestClient.class.getDeclaredField("transport"); + field.setAccessible(true); + RestClientTransport transport = (RestClientTransport) field.get(javaRestClient); + assertNotNull(transport); + + Field restClientFiled = RestClientTransport.class.getDeclaredField("restClient"); + restClientFiled.setAccessible(true); + org.elasticsearch.client.RestClient restClient = (org.elasticsearch.client.RestClient) restClientFiled.get(transport); + assertNotNull(restClient); + + Field compressionEnabledFiled = org.elasticsearch.client.RestClient.class.getDeclaredField("compressionEnabled"); + compressionEnabledFiled.setAccessible(true); + boolean compressionEnabled = (boolean) compressionEnabledFiled.get(restClient); + assertTrue(compressionEnabled); } } } @@ -121,7 +155,7 @@ public void testClientInstance() throws Exception { @Test public void testIndexName() throws Exception { String index = "myindex-" + UUID.randomUUID(); - Record record = Mockito.mock(Record.class); + Record record = mock(Record.class); String topicName = "topic-" + UUID.randomUUID(); when(record.getTopicName()).thenReturn(Optional.of(topicName)); try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig()