Skip to content

Commit

Permalink
[fix][io] CompressionEnabled didn't work on elasticsearch sink (apach…
Browse files Browse the repository at this point in the history
…e#22565)

(cherry picked from commit a3cd1f8)
(cherry picked from commit e3bd58c)
  • Loading branch information
shibd authored and nikhil-ctds committed May 13, 2024
1 parent 0df1b33 commit e9a0c2a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public ElasticSearchJavaRestClient(ElasticSearchConfig elasticSearchConfig,
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
.setConnectTimeout(config.getConnectTimeoutInMs())
.setSocketTimeout(config.getSocketTimeoutInMs()))
.setCompressionEnabled(config.isCompressionEnabled())
.setHttpClientConfigCallback(this.configCallback)
.setFailureListener(new org.elasticsearch.client.RestClient.FailureListener() {
public void onFailure(Node node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public OpenSearchHighLevelRestClient(ElasticSearchConfig elasticSearchConfig,
.setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
.setConnectTimeout(config.getConnectTimeoutInMs())
.setSocketTimeout(config.getSocketTimeoutInMs()))
.setCompressionEnabled(config.isCompressionEnabled())
.setHttpClientConfigCallback(this.configCallback)
.setFailureListener(new org.opensearch.client.RestClient.FailureListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand All @@ -43,6 +46,8 @@
import org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;
import org.testcontainers.containers.Network;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -112,19 +117,49 @@ public void testClientInstance() throws Exception {
ElasticSearchMetrics metrics = new ElasticSearchMetrics(mockContext);
try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig()
.setElasticSearchUrl("http://" + container.getHttpHostAddress())
.setCompressionEnabled(true)
.setIndexName(INDEX), metrics)) {
if (elasticImageName.equals(OPENSEARCH) || elasticImageName.equals(ELASTICSEARCH_7)) {
assertTrue(client.getRestClient() instanceof OpenSearchHighLevelRestClient);
OpenSearchHighLevelRestClient osRestHighLevelClient = (OpenSearchHighLevelRestClient) client.getRestClient();
RestHighLevelClient restHighLevelClient = osRestHighLevelClient.getClient();
assertNotNull(restHighLevelClient);

Field field = RestHighLevelClient.class.getDeclaredField("client");
field.setAccessible(true);
RestClient restClient = (RestClient) field.get(restHighLevelClient);
assertNotNull(restClient);

Field compressionEnabledFiled = RestClient.class.getDeclaredField("compressionEnabled");
compressionEnabledFiled.setAccessible(true);
boolean compressionEnabled = (boolean) compressionEnabledFiled.get(restClient);
assertTrue(compressionEnabled);
} else {
assertTrue(client.getRestClient() instanceof ElasticSearchJavaRestClient);
ElasticSearchJavaRestClient javaRestClient = (ElasticSearchJavaRestClient) client.getRestClient();

Field field = ElasticSearchJavaRestClient.class.getDeclaredField("transport");
field.setAccessible(true);
RestClientTransport transport = (RestClientTransport) field.get(javaRestClient);
assertNotNull(transport);

Field restClientFiled = RestClientTransport.class.getDeclaredField("restClient");
restClientFiled.setAccessible(true);
org.elasticsearch.client.RestClient restClient = (org.elasticsearch.client.RestClient) restClientFiled.get(transport);
assertNotNull(restClient);

Field compressionEnabledFiled = org.elasticsearch.client.RestClient.class.getDeclaredField("compressionEnabled");
compressionEnabledFiled.setAccessible(true);
boolean compressionEnabled = (boolean) compressionEnabledFiled.get(restClient);
assertTrue(compressionEnabled);
}
}
}

@Test
public void testIndexName() throws Exception {
String index = "myindex-" + UUID.randomUUID();
Record<GenericObject> record = Mockito.mock(Record.class);
Record<GenericObject> record = mock(Record.class);
String topicName = "topic-" + UUID.randomUUID();
SinkContext mockContext = Mockito.mock(SinkContext.class);
ElasticSearchMetrics metrics = new ElasticSearchMetrics(mockContext);
Expand Down

0 comments on commit e9a0c2a

Please # to comment.