diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java index 1141af88e72b0..623d8e7505e0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import java.time.Duration; import lombok.Cleanup; import org.apache.commons.lang3.RandomStringUtils; import org.apache.pulsar.broker.service.Topic; @@ -71,6 +72,46 @@ public Object[][] produceConf() { }; } + /** + * Param1: whether the connection is broken (channel closed) after the producer is closed; + * when false, reading from the channel is only resumed. + */ + @DataProvider(name = "brokenPipeline") + public Object[][] brokenPipeline() { + return new Object[][]{ + {true}, + {false} + }; + } + + @Test(dataProvider = "brokenPipeline") + public void testProducerCloseCallback2(boolean brokenPipeline) throws Exception { + initClient(); + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic("testProducerClose") + .sendTimeout(5, TimeUnit.SECONDS) + .maxPendingMessages(0) + .enableBatching(false) + .create(); + final TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage(); + final TypedMessageBuilder<byte[]> value = messageBuilder.value("test-msg".getBytes(StandardCharsets.UTF_8)); + // Pause reads on the channel so that broker responses are not processed for now. + producer.getClientCnx().channel().config().setAutoRead(false); + final CompletableFuture<MessageId> completableFuture = value.sendAsync(); + producer.closeAsync(); + Thread.sleep(3000); + if (brokenPipeline) { + producer.getClientCnx().channel().close(); + } else { + producer.getClientCnx().channel().config().setAutoRead(true); + } + // In both cases the future of the message sent before the close must complete. + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + Assert.assertTrue(completableFuture.isDone()); + }); + } + @Test(timeOut = 10_000) public void 
testProducerCloseCallback() throws Exception { initClient(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerReconnectionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerReconnectionTest.java new file mode 100644 index 0000000000000..e05e666d62efd --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerReconnectionTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerReconnectionTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testConcurrencyReconnectAndClose() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + PulsarClientImpl client = (PulsarClientImpl) pulsarClient; + + // Create a producer whose connection handler is a spy that can pause the reconnection. 
+ ProducerBuilderImpl<byte[]> producerBuilder = (ProducerBuilderImpl<byte[]>) client.newProducer() + .blockIfQueueFull(false).maxPendingMessages(1).producerName("p1") + .enableBatching(true).topic(topicName); + CompletableFuture<Producer<byte[]>> producerFuture = new CompletableFuture<>(); + AtomicBoolean reconnectionStartTrigger = new AtomicBoolean(); + CountDownLatch reconnectingSignal = new CountDownLatch(1); + CountDownLatch closedSignal = new CountDownLatch(1); + ProducerImpl<byte[]> producer = new ProducerImpl<>(client, topicName, producerBuilder.getConf(), producerFuture, + -1, Schema.BYTES, null, Optional.empty()) { + @Override + ConnectionHandler initConnectionHandler() { + ConnectionHandler connectionHandler = super.initConnectionHandler(); + ConnectionHandler spyConnectionHandler = spy(connectionHandler); + doAnswer(invocation -> { + boolean result = (boolean) invocation.callRealMethod(); + if (reconnectionStartTrigger.get()) { + log.info("[testConcurrencyReconnectAndClose] verified state for reconnection"); + reconnectingSignal.countDown(); + closedSignal.await(); + log.info("[testConcurrencyReconnectAndClose] reconnected"); + } + return result; + }).when(spyConnectionHandler).isValidStateForReconnection(); + return spyConnectionHandler; + } + }; + log.info("[testConcurrencyReconnectAndClose] producer created"); + producerFuture.get(5, TimeUnit.SECONDS); + + // Close the broker-side connection to force the client to reconnect. + log.info("[testConcurrencyReconnectAndClose] trigger a reconnection"); + ServerCnx serverCnx = (ServerCnx) pulsar.getBrokerService().getTopic(topicName, false).join() + .get().getProducers().values().iterator().next().getCnx(); + reconnectionStartTrigger.set(true); + serverCnx.ctx().close(); + producer.sendAsync("1".getBytes(StandardCharsets.UTF_8)); + Awaitility.await().untilAsserted(() -> { + assertNotEquals(producer.getPendingQueueSize(), 0); + }); + + // Close producer when reconnecting. 
+ reconnectingSignal.await(); + log.info("[testConcurrencyReconnectAndClose] producer close"); + producer.closeAsync(); + Awaitility.await().untilAsserted(() -> { + HandlerState.State state1 = producer.getState(); + assertTrue(state1 == HandlerState.State.Closed || state1 == HandlerState.State.Closing); + }); + // Release the thread that is blocked on "closedSignal.await()" in the spied connection handler. + closedSignal.countDown(); + + // Give the released reconnection attempt time to run before checking the final state. + Thread.sleep(3000); + + HandlerState.State state2 = producer.getState(); + log.info("producer state: {}", state2); + assertTrue(state2 == HandlerState.State.Closed || state2 == HandlerState.State.Closing); + assertEquals(producer.getPendingQueueSize(), 0); + + // Cleanup. + producer.close(); + admin.topics().delete(topicName); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index 2b7fb90b14a47..8ec8a47dfbabd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import com.google.common.annotations.VisibleForTesting; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -168,13 +169,12 @@ public void connectionClosed(ClientCnx cnx) { duringConnect.set(false); state.client.getCnxPool().releaseConnection(cnx); if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) { - if (!isValidStateForReconnection()) { + if (!state.changeToConnecting()) { log.info("[{}] [{}] Ignoring reconnection request (state: {})", state.topic, state.getHandlerName(), state.getState()); return; } long delayMs = backoff.next(); - state.setState(State.Connecting); log.info("[{}] [{}] Closed connection {} -- Will try again in {} s", state.topic, 
state.getHandlerName(), cnx.channel(), delayMs / 1000.0); @@ -208,7 +208,8 @@ protected long switchClientCnx(ClientCnx clientCnx) { return EPOCH_UPDATER.incrementAndGet(this); } - private boolean isValidStateForReconnection() { + @VisibleForTesting + public boolean isValidStateForReconnection() { State state = this.state.getState(); switch (state) { case Uninitialized: diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 92f8332dfb2c1..50209c009c75d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -268,15 +268,19 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties())); } - this.connectionHandler = new ConnectionHandler(this, + this.connectionHandler = initConnectionHandler(); + setChunkMaxMessageSize(); + grabCnx(); + } + + ConnectionHandler initConnectionHandler() { /* invoked by the constructor; overridable to substitute the handler */ + return new ConnectionHandler(this, new BackoffBuilder() .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS) .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) .setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS) .create(), - this); - setChunkMaxMessageSize(); - grabCnx(); + this); } private void setChunkMaxMessageSize() { @@ -1097,7 +1101,7 @@ public CompletableFuture handleOnce() { @Override - public CompletableFuture closeAsync() { + public synchronized CompletableFuture closeAsync() { final State currentState = getAndUpdateState(state -> { if (state == State.Closed) { return state; @@ -1124,11 +1128,11 @@ public CompletableFuture closeAsync() { CompletableFuture closeFuture = new CompletableFuture<>(); cnx.sendRequestWithId(cmd, 
requestId).handle((v, exception) -> { cnx.removeProducer(producerId); - closeAndClearPendingMessages(); if (exception == null || !cnx.ctx().channel().isActive()) { // Either we've received the success response for the close producer command from the broker, or the // connection did break in the meantime. In any case, the producer is gone. log.info("[{}] [{}] Closed Producer", topic, producerName); + closeAndClearPendingMessages(); closeFuture.complete(null); } else { closeFuture.completeExceptionally(exception); @@ -1714,6 +1718,12 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { // Because the state could have been updated while retrieving the connection, we set it back to connecting, // as long as the change from current state to connecting is a valid state change. if (!changeToConnecting()) { + if (getState() == State.Closing || getState() == State.Closed) { + // Producer was closed while reconnecting, fail and clear the messages that are still + // pending (the connection itself is left untouched in this branch) + failPendingMessages(cnx, + new PulsarClientException.ProducerFencedException("producer has been closed")); + } return CompletableFuture.completedFuture(null); } // We set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating @@ -1774,6 +1784,8 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { // Producer was closed while reconnecting, close the connection to make sure the broker // drops the producer on its side cnx.removeProducer(producerId); + failPendingMessages(cnx, + new PulsarClientException.ProducerFencedException("producer has been closed")); cnx.channel().close(); future.complete(null); return; @@ -1942,7 +1954,7 @@ private void closeProducerTasks() { private void resendMessages(ClientCnx cnx, long expectedEpoch) { cnx.ctx().channel().eventLoop().execute(() -> { - synchronized (this) { + synchronized (ProducerImpl.this) { if (getState() == State.Closing || getState() == State.Closed) { // 
Producer was closed while reconnecting, close the connection to make sure the broker // drops the producer on its side @@ -2098,7 +2110,7 @@ public void run(Timeout timeout) throws Exception { * This fails and clears the pending messages with the given exception. This method should be called from within the * ProducerImpl object mutex. */ - private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) { + private synchronized void failPendingMessages(ClientCnx cnx, PulsarClientException ex) { if (cnx == null) { final AtomicInteger releaseCount = new AtomicInteger(); final boolean batchMessagingEnabled = isBatchMessagingEnabled(); @@ -2250,7 +2262,7 @@ private void batchMessageAndSend(boolean shouldScheduleNextBatchFlush) { } } - protected void processOpSendMsg(OpSendMsg op) { + protected synchronized void processOpSendMsg(OpSendMsg op) { if (op == null) { return; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java index 4ead2839c51b9..7c6bcf2cf06be 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java @@ -25,14 +25,8 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; -import io.netty.util.HashedWheelTimer; import java.nio.ByteBuffer; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -71,34 +65,4 @@ public void testPopulateMessageSchema() 
{ assertTrue(producer.populateMessageSchema(msg, null)); verify(msg).setSchemaState(MessageImpl.SchemaState.Ready); } - - @Test - public void testClearPendingMessageWhenCloseAsync() { - PulsarClientImpl client = mock(PulsarClientImpl.class); - Mockito.doReturn(1L).when(client).newProducerId(); - ClientConfigurationData clientConf = new ClientConfigurationData(); - clientConf.setStatsIntervalSeconds(-1); - Mockito.doReturn(clientConf).when(client).getConfiguration(); - ConnectionPool connectionPool = mock(ConnectionPool.class); - Mockito.doReturn(1).when(connectionPool).genRandomKeyToSelectCon(); - Mockito.doReturn(connectionPool).when(client).getCnxPool(); - HashedWheelTimer timer = mock(HashedWheelTimer.class); - Mockito.doReturn(null).when(timer).newTimeout(Mockito.any(), Mockito.anyLong(), Mockito.any()); - Mockito.doReturn(timer).when(client).timer(); - ProducerConfigurationData producerConf = new ProducerConfigurationData(); - producerConf.setSendTimeoutMs(-1); - ProducerImpl producer = Mockito.spy(new ProducerImpl<>(client, "topicName", producerConf, null, 0, null, null, Optional.empty())); - - // make sure throw exception when send request to broker - ClientCnx clientCnx = mock(ClientCnx.class); - CompletableFuture tCompletableFuture = new CompletableFuture<>(); - tCompletableFuture.completeExceptionally(new PulsarClientException("error")); - when(clientCnx.sendRequestWithId(Mockito.any(), Mockito.anyLong())).thenReturn(tCompletableFuture); - Mockito.doReturn(clientCnx).when(producer).cnx(); - - // run closeAsync and verify - CompletableFuture voidCompletableFuture = producer.closeAsync(); - verify(producer).closeAndClearPendingMessages(); - } - }