From 26a807169cbfe5de44a6dd0ab04ed6aad96e1af5 Mon Sep 17 00:00:00 2001 From: atomchchen Date: Fri, 26 Jul 2024 01:11:54 +0800 Subject: [PATCH 1/3] TransactionCoordinatorClient support retry --- .../TransactionCoordinatorClientTest.java | 26 ++++++++++ .../impl/TransactionMetaStoreHandler.java | 28 +++++++++-- .../TransactionCoordinatorClientImpl.java | 50 +++++++++---------- 3 files changed, 75 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java index c442c3a901464..8c4913ca2e291 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java @@ -24,14 +24,18 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import lombok.Cleanup; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.TransactionMetadataStoreService; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionBufferClient; +import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient.State; import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -107,4 +111,26 @@ public void testTransactionCoordinatorExceptionUnwrap() { instanceof TransactionCoordinatorClientException.InvalidTxnStatusException); } } + + @Test + public void testClientStartWithRetry() throws Exception{ + + String validBrokerServiceUrl = pulsarServices[0].getBrokerServiceUrl(); + String invalidBrokerServiceUrl = "localhost:0"; + String brokerServiceUrl = validBrokerServiceUrl + "," + invalidBrokerServiceUrl; + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl).build(); + + @Cleanup + TransactionCoordinatorClient transactionCoordinatorClient = new TransactionCoordinatorClientImpl(pulsarClient); + + try { + transactionCoordinatorClient.start(); + }catch (TransactionCoordinatorClientException e) { + Assert.fail("Shouldn't have exception at here", e); + } + + Assert.assertEquals(transactionCoordinatorClient.getState(), State.READY); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index 2a43ca20beb38..e73783c50f140 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -88,6 +89,10 @@ public RequestTime(long creationTime, long requestId) { private Timeout requestTimeout; private final CompletableFuture connectFuture; + private final long lookupDeadline; + private final List previousExceptions = new CopyOnWriteArrayList<>(); + + public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientImpl pulsarClient, String topic, CompletableFuture connectFuture) { @@ -109,6 +114,7 @@ public TransactionMetaStoreHandler(long transactionCoordinatorId, PulsarClientIm this.connectFuture = connectFuture; this.internalPinnedExecutor = pulsarClient.getInternalExecutorService(); this.timer = pulsarClient.timer(); + this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs(); } public void start() { @@ -117,10 +123,24 @@ public void start() { @Override public void connectionFailed(PulsarClientException exception) { - LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.", - transactionCoordinatorId, exception); - if (!this.connectFuture.isDone()) { - this.connectFuture.completeExceptionally(exception); + boolean nonRetriableError = !PulsarClientException.isRetriableError(exception); + boolean timeout = System.currentTimeMillis() > lookupDeadline; + if (nonRetriableError || timeout) { + exception.setPreviousExceptions(previousExceptions); + if (connectFuture.completeExceptionally(exception)) { + if (nonRetriableError) { + LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.", + transactionCoordinatorId, exception); + } else { + LOG.error( + "Transaction meta handler with transaction coordinator id {} connection failed after " + + "timeout", + transactionCoordinatorId, exception); + } + setState(State.Failed); + } + } else { + previousExceptions.add(exception); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 499627f9c73f2..9b74903d18e69 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -79,34 +79,34 @@ public void start() throws TransactionCoordinatorClientException { @Override public CompletableFuture startAsync() { if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) { - return pulsarClient.getLookup() - .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, true) - .thenCompose(partitionMeta -> { - List> connectFutureList = new ArrayList<>(); - if (LOG.isDebugEnabled()) { - LOG.debug("Transaction meta store assign partition is {}.", partitionMeta.partitions); - } - if (partitionMeta.partitions > 0) { - handlers = new TransactionMetaStoreHandler[partitionMeta.partitions]; - for (int i = 0; i < partitionMeta.partitions; i++) { - CompletableFuture connectFuture = new CompletableFuture<>(); - connectFutureList.add(connectFuture); - TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler( - i, pulsarClient, getTCAssignTopicName(i), connectFuture); - handlers[i] = handler; - handlerMap.put(i, handler); - handler.start(); + return pulsarClient.getPartitionedTopicMetadata( + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true) + .thenCompose(partitionMeta -> { + List> connectFutureList = new ArrayList<>(); + if (LOG.isDebugEnabled()) { + LOG.debug("Transaction meta store assign partition is {}.", partitionMeta.partitions); + } + if (partitionMeta.partitions > 0) { + handlers = new TransactionMetaStoreHandler[partitionMeta.partitions]; + for (int i = 0; i < partitionMeta.partitions; i++) { + CompletableFuture connectFuture = new CompletableFuture<>(); + connectFutureList.add(connectFuture); + TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler( + i, pulsarClient, getTCAssignTopicName(i), connectFuture); + handlers[i] = handler; + handlerMap.put(i, handler); + handler.start(); + } + } else { + return FutureUtil.failedFuture(new TransactionCoordinatorClientException( + "The broker doesn't enable the transaction coordinator, " + + "or the transaction coordinator has not initialized")); } - } else { - return FutureUtil.failedFuture(new TransactionCoordinatorClientException( - "The broker doesn't enable the transaction coordinator, " - + "or the transaction coordinator has not initialized")); - } - STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, State.READY); + STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, State.READY); - return FutureUtil.waitForAll(connectFutureList); - }); + return FutureUtil.waitForAll(connectFutureList); + }); } else { return FutureUtil.failedFuture( new CoordinatorClientStateException("Can not start while current state is " + state)); From c4b6c1b6e153132d4ff0be81e76e22d80574e886 Mon Sep 17 00:00:00 2001 From: atomchchen Date: Fri, 26 Jul 2024 01:19:38 +0800 Subject: [PATCH 2/3] reformat code --- .../TransactionCoordinatorClientImpl.java | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 9b74903d18e69..45a3ad4f978b1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -81,32 +81,32 @@ public CompletableFuture startAsync() { if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) { return pulsarClient.getPartitionedTopicMetadata( SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true) - .thenCompose(partitionMeta -> { - List> connectFutureList = new ArrayList<>(); - if (LOG.isDebugEnabled()) { - LOG.debug("Transaction meta store assign partition is {}.", partitionMeta.partitions); - } - if (partitionMeta.partitions > 0) { - handlers = new TransactionMetaStoreHandler[partitionMeta.partitions]; - for (int i = 0; i < partitionMeta.partitions; i++) { - CompletableFuture connectFuture = new CompletableFuture<>(); - connectFutureList.add(connectFuture); - TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler( - i, pulsarClient, getTCAssignTopicName(i), connectFuture); - handlers[i] = handler; - handlerMap.put(i, handler); - handler.start(); - } - } else { - return FutureUtil.failedFuture(new TransactionCoordinatorClientException( - "The broker doesn't enable the transaction coordinator, " - + "or the transaction coordinator has not initialized")); + .thenCompose(partitionMeta -> { + List> connectFutureList = new ArrayList<>(); + if (LOG.isDebugEnabled()) { + LOG.debug("Transaction meta store assign partition is {}.", partitionMeta.partitions); + } + if (partitionMeta.partitions > 0) { + handlers = new TransactionMetaStoreHandler[partitionMeta.partitions]; + for (int i = 0; i < partitionMeta.partitions; i++) { + CompletableFuture connectFuture = new CompletableFuture<>(); + connectFutureList.add(connectFuture); + TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler( + i, pulsarClient, getTCAssignTopicName(i), connectFuture); + handlers[i] = handler; + handlerMap.put(i, handler); + handler.start(); } + } else { + return FutureUtil.failedFuture(new TransactionCoordinatorClientException( + "The broker doesn't enable the transaction coordinator, " + + "or the transaction coordinator has not initialized")); + } - STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, State.READY); + STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, State.READY); - return FutureUtil.waitForAll(connectFutureList); - }); + return FutureUtil.waitForAll(connectFutureList); + }); } else { return FutureUtil.failedFuture( new CoordinatorClientStateException("Can not start while current state is " + state)); From c6b3c4740f64e009b2f90a7e0d5e246375c625a3 Mon Sep 17 00:00:00 2001 From: atomchchen Date: Sat, 27 Jul 2024 00:34:56 +0800 Subject: [PATCH 3/3] code format --- .../coordinator/TransactionCoordinatorClientTest.java | 2 -- .../pulsar/client/impl/TransactionMetaStoreHandler.java | 6 ++---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java index 8c4913ca2e291..36bc0e522c210 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java @@ -114,14 +114,12 @@ public void testTransactionCoordinatorExceptionUnwrap() { @Test public void testClientStartWithRetry() throws Exception{ - String validBrokerServiceUrl = pulsarServices[0].getBrokerServiceUrl(); String invalidBrokerServiceUrl = "localhost:0"; String brokerServiceUrl = validBrokerServiceUrl + "," + invalidBrokerServiceUrl; @Cleanup PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(brokerServiceUrl).build(); - @Cleanup TransactionCoordinatorClient transactionCoordinatorClient = new TransactionCoordinatorClientImpl(pulsarClient); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index e73783c50f140..e45d53971159e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -132,10 +132,8 @@ public void connectionFailed(PulsarClientException exception) { LOG.error("Transaction meta handler with transaction coordinator id {} connection failed.", transactionCoordinatorId, exception); } else { - LOG.error( - "Transaction meta handler with transaction coordinator id {} connection failed after " - + "timeout", - transactionCoordinatorId, exception); + LOG.error("Transaction meta handler with transaction coordinator id {} connection failed after " + + "timeout", transactionCoordinatorId, exception); } setState(State.Failed); }