Skip to content

Commit

Permalink
[improve][txn] Support ack message list for transaction (#15729)
Browse files Browse the repository at this point in the history
* [improve][txn] Support ack message list for transaction
### Motivation
Now, there is
```java
doAcknowledgeWithTxn(List<MessageId> messageIdList, AckType ackType,
                                                           Map<String, Long> properties,
                                                           TransactionImpl txn)
```
But not interface
```java
acknowledgeAsync(List<MessageId> messageIdList, Transaction txn)
```
  • Loading branch information
liangyepianzhou authored Jul 5, 2022
1 parent aabd5d0 commit 926834e
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.netty.util.Timeout;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -106,7 +107,9 @@
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessagesImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
Expand Down Expand Up @@ -1020,6 +1023,147 @@ public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
transaction.commit().get();
}

@Test(timeOut = 30000)
public void testTransactionAckMessageList() throws Exception {
String topic = "persistent://" + NAMESPACE1 +"/test";
String subName = "testSub";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(5, TimeUnit.SECONDS)
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscribe();

for (int i = 0; i < 5; i++) {
producer.newMessage().send();
}
//verify using aborted transaction to ack message list
List<MessageId> messages = new ArrayList<>();
for (int i = 0; i < 4; i++) {
Message<byte[]> message = consumer.receive();
messages.add(message.getMessageId());
}
Transaction transaction = pulsarClient
.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build()
.get();

consumer.acknowledgeAsync(messages, transaction);
transaction.abort().get();
consumer.close();
consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscribe();
for (int i = 0; i < 4; i++) {
Message<byte[]> message = consumer.receive();
assertTrue(messages.contains(message.getMessageId()));
}

//verify using committed transaction to ack message list
transaction = pulsarClient
.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build()
.get();
consumer.acknowledgeAsync(messages, transaction);
transaction.commit().get();

consumer.close();
consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscribe();
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
Assert.assertFalse(messages.contains(message.getMessageId()));
message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);
consumer.close();
}


@Test(timeOut = 30000)
public void testTransactionAckMessages() throws Exception {
String topic = "persistent://" + NAMESPACE1 +"/testTransactionAckMessages";
String subName = "testSub";
admin.topics().createPartitionedTopic(topic, 2);

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(5, TimeUnit.SECONDS)
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscribe();

for (int i = 0; i < 4; i++) {
producer.newMessage().send();
}
Method method = ConsumerBase.class.getDeclaredMethod("getNewMessagesImpl");
method.setAccessible(true);

Field field = MessagesImpl.class.getDeclaredField("messageList");
field.setAccessible(true);

MessagesImpl<byte[]> messages = (MessagesImpl<byte[]>) method.invoke(consumer);

List<Message<byte[]>> messageList = new ArrayList<>();
for (int i = 0; i < 4; i++) {
Message<byte[]> message = consumer.receive();
messageList.add(message);
}
field.set(messages, messageList);
Transaction transaction = pulsarClient
.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build()
.get();

consumer.acknowledgeAsync(messages, transaction);
transaction.abort().get();
consumer.close();
consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscribe();
List<MessageId> messageIds = new ArrayList<>();
for (Message message : messageList) {
messageIds.add(message.getMessageId());
}
for (int i = 0; i < 4; i++) {
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
assertTrue(messageIds.contains(message.getMessageId()));
}

//verify using committed transaction to ack message list
transaction = pulsarClient
.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build()
.get();
consumer.acknowledgeAsync(messages, transaction);
transaction.commit().get();

consumer.close();
consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscribe();
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);
consumer.close();
}

@Test
public void testGetConnectExceptionForAckMsgWhenCnxIsNull() throws Exception {
String topic = NAMESPACE1 + "/testGetConnectExceptionForAckMsgWhenCnxIsNull";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,50 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
*/
CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);


/**
* Asynchronously acknowledge the consumption of {@link Messages}, it will store in pending ack.
* After the transaction commit, the message will actually ack.
* After the transaction abort, the message will be redelivered.
* @param messages
* The {@link Messages} to be acknowledged
* @param txn {@link Transaction} The transaction to ack messages.
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
* if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
* different from the transaction in pending ack.
* @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
* broker don't support transaction
* @return {@link CompletableFuture} the future of the ack result
* */
CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction txn);


/**
* Asynchronously acknowledge the consumption of a list of message.
* @param messageIdList
* @return
*/
CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);


/**
* Acknowledge the consumption of a list of message, it will store in pending ack.
* After the transaction commit, the message will actually ack.
* After the transaction abort, the message will be redelivered.
* @param messageIdList A list of message Id.
* @param txn {@link Transaction} The transaction to ack messages.
* @throws PulsarClientException.AlreadyClosedException
* if the consumer was already closed
* @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
* if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
* different from the transaction in pending ack.
* @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
* broker don't support transaction
* @return {@link CompletableFuture} the future of the ack result */
CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, Transaction txn);

/**
* Asynchronously reconsumeLater the consumption of a single message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {

@Override
public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
return acknowledgeAsync(messages, null);
}

@Override
public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction txn) {
List<MessageId> messageIds = new ArrayList<>(messages.size());
for (Message<?> message: messages) {
try {
Expand All @@ -494,14 +499,23 @@ public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
}
messageIds.add(message.getMessageId());
}
return acknowledgeAsync(messageIds);
if (txn != null) {
return acknowledgeAsync(messageIds, txn);
} else {
return acknowledgeAsync(messageIds);
}
}

@Override
public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList) {
return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), null);
}

@Override
public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, Transaction txn) {
return doAcknowledgeWithTxn(messageIdList, AckType.Individual, Collections.emptyMap(), (TransactionImpl) txn);
}

@Override
public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) {
return reconsumeLaterAsync(message, null, delayTime, unit);
Expand Down Expand Up @@ -611,12 +625,21 @@ protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageId
Map<String, Long> properties,
TransactionImpl txn) {
CompletableFuture<Void> ackFuture;
if (txn != null) {
if (txn != null && this instanceof ConsumerImpl) {

// it is okay that we register acked topic after sending the acknowledgements. because
// the transactional ack will not be visiable for consumers until the transaction is
// committed
if (ackType == AckType.Cumulative) {
txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
}

ackFuture = txn.registerAckedTopic(getTopic(), subscription)
.thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));
// register the ackFuture as part of the transaction
txn.registerAckOp(ackFuture);
} else {
ackFuture = doAcknowledge(messageIdList, ackType, properties, null);
ackFuture = doAcknowledge(messageIdList, ackType, properties, txn);
}
return ackFuture;
}
Expand Down Expand Up @@ -787,12 +810,25 @@ protected void onAcknowledge(MessageId messageId, Throwable exception) {
}
}

protected void onAcknowledge(List<MessageId> messageIds, Throwable exception) {
if (interceptors != null) {
messageIds.forEach(messageId -> interceptors.onAcknowledge(this, messageId, exception));
}
}

protected void onAcknowledgeCumulative(MessageId messageId, Throwable exception) {
if (interceptors != null) {
interceptors.onAcknowledgeCumulative(this, messageId, exception);
}
}

protected void onAcknowledgeCumulative(List<MessageId> messageIds, Throwable exception) {
if (interceptors != null) {
messageIds.forEach(messageId -> interceptors.onAcknowledgeCumulative(this, messageId, exception));
}
}


protected void onNegativeAcksSend(Set<MessageId> messageIds) {
if (interceptors != null) {
interceptors.onNegativeAcksSend(this, messageIds);
Expand Down
Loading

0 comments on commit 926834e

Please # to comment.