diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index ff8e0655d03be..ea013d2da7dd7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -690,7 +690,7 @@ private void recoveredCursor(PositionImpl position, Map properties position = ledger.getLastPosition(); } log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position); - this.cursorProperties = cursorProperties; + this.cursorProperties = cursorProperties == null ? Collections.emptyMap() : cursorProperties; messagesConsumedCounter = -getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition())); markDeletePosition = position; persistentMarkDeletePosition = position; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java index 6a00bfd199584..33076fd51a8e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.commons.collections4.MapUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage; @@ -85,6 +86,9 @@ public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers d */ public CompletableFuture cleanResidualSnapshots(ManagedCursor cursor) { Map cursorProperties = cursor.getCursorProperties(); + if (MapUtils.isEmpty(cursorProperties)) { + return CompletableFuture.completedFuture(null); + } List> futures = new ArrayList<>(); FutureUtil.Sequencer sequencer = FutureUtil.Sequencer.create(); cursorProperties.forEach((k, v) -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 67a7de1f01339..d7a3e80f086d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -50,6 +50,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker; @@ -137,9 +138,13 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat private synchronized long recoverBucketSnapshot() throws RuntimeException { ManagedCursor cursor = this.lastMutableBucket.getCursor(); + Map cursorProperties = cursor.getCursorProperties(); + if (MapUtils.isEmpty(cursorProperties)) { + return 0; + } FutureUtil.Sequencer sequencer = this.lastMutableBucket.getSequencer(); Map, ImmutableBucket> toBeDeletedBucketMap = new HashMap<>(); - cursor.getCursorProperties().keySet().forEach(key -> { + cursorProperties.keySet().forEach(key -> { if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) { String[] keys = key.split(DELIMITER); checkArgument(keys.length == 3); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java index 0a82b2b4c3cb0..54fec3934ddbc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java @@ -20,10 +20,13 @@ import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import com.google.common.collect.Multimap; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -32,6 +35,7 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.broker.BrokerTestUtil; @@ -40,6 +44,7 @@ import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; @@ -47,6 +52,7 @@ import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") @@ -353,4 +359,121 @@ public void testDelete() throws Exception { } } } + + @DataProvider(name = "subscriptionTypes") + public Object[][] subscriptionTypes() { + return new Object[][]{ + {SubscriptionType.Shared}, + {SubscriptionType.Key_Shared}, + {SubscriptionType.Failover}, + {SubscriptionType.Exclusive}, + }; + } + + /** + * see: https://github.com/apache/pulsar/pull/21595. + */ + @Test(dataProvider = "subscriptionTypes") + public void testDeleteTopicIfCursorPropsEmpty(SubscriptionType subscriptionType) throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_"); + final String subscriptionName = "s1"; + // create a topic. + admin.topics().createNonPartitionedTopic(topic); + // create a subscription without props. + admin.topics().createSubscription(topic, subscriptionName, MessageId.earliest); + pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName) + .subscriptionType(subscriptionType).subscribe().close(); + ManagedCursorImpl cursor = findCursor(topic, subscriptionName); + assertNotNull(cursor); + assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty()); + // Test topic deletion is successful. + admin.topics().delete(topic); + } + + /** + * see: https://github.com/apache/pulsar/pull/21595. + */ + @Test(dataProvider = "subscriptionTypes") + public void testDeletePartitionedTopicIfCursorPropsEmpty(SubscriptionType subscriptionType) throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_"); + final String subscriptionName = "s1"; + // create a topic. + admin.topics().createPartitionedTopic(topic, 2); + // create a subscription without props. + admin.topics().createSubscription(topic, subscriptionName, MessageId.earliest); + pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName) + .subscriptionType(subscriptionType).subscribe().close(); + ManagedCursorImpl cursor = findCursor(topic + "-partition-0", subscriptionName); + assertNotNull(cursor); + assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty()); + // Test topic deletion is successful. + admin.topics().deletePartitionedTopic(topic); + } + + /** + * see: https://github.com/apache/pulsar/pull/21595. + */ + @Test(dataProvider = "subscriptionTypes") + public void testDeleteTopicIfCursorPropsNotEmpty(SubscriptionType subscriptionType) throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_"); + final String subscriptionName = "s1"; + // create a topic. + admin.topics().createNonPartitionedTopic(topic); + // create a subscription without props. + admin.topics().createSubscription(topic, subscriptionName, MessageId.earliest); + pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName) + .subscriptionType(subscriptionType).subscribe().close(); + ManagedCursorImpl cursor = findCursor(topic, subscriptionName); + assertNotNull(cursor); + assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty()); + // Put a subscription prop. + Map properties = new HashMap<>(); + properties.put("ignore", "ignore"); + admin.topics().updateSubscriptionProperties(topic, subscriptionName, properties); + assertTrue(cursor.getCursorProperties() != null && !cursor.getCursorProperties().isEmpty()); + // Test topic deletion is successful. + admin.topics().delete(topic); + } + + /** + * see: https://github.com/apache/pulsar/pull/21595. + */ + @Test(dataProvider = "subscriptionTypes") + public void testDeletePartitionedTopicIfCursorPropsNotEmpty(SubscriptionType subscriptionType) throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_"); + final String subscriptionName = "s1"; + // create a topic. + admin.topics().createPartitionedTopic(topic, 2); + pulsarClient.newProducer().topic(topic).create().close(); + // create a subscription without props. + admin.topics().createSubscription(topic, subscriptionName, MessageId.earliest); + pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName) + .subscriptionType(subscriptionType).subscribe().close(); + + ManagedCursorImpl cursor = findCursor(topic + "-partition-0", subscriptionName); + assertNotNull(cursor); + assertTrue(cursor.getCursorProperties() == null || cursor.getCursorProperties().isEmpty()); + // Put a subscription prop. + Map properties = new HashMap<>(); + properties.put("ignore", "ignore"); + admin.topics().updateSubscriptionProperties(topic, subscriptionName, properties); + assertTrue(cursor.getCursorProperties() != null && !cursor.getCursorProperties().isEmpty()); + // Test topic deletion is successful. + admin.topics().deletePartitionedTopic(topic); + } + + + private ManagedCursorImpl findCursor(String topic, String subscriptionName) { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + Iterator cursorIterator = persistentTopic.getManagedLedger().getCursors().iterator(); + while (cursorIterator.hasNext()) { + ManagedCursor managedCursor = cursorIterator.next(); + if (managedCursor == null || !managedCursor.getName().equals(subscriptionName)) { + continue; + } + return (ManagedCursorImpl) managedCursor; + } + return null; + } }