Skip to content

Commit

Permalink
[improve][broker] Do not retain the data in the system topic (#22022)
Browse files Browse the repository at this point in the history
### Motivation

For some use case, the users need to store all the messages even though these message are acked by all subscription.
So they set the retention policy of the namespace to infinite retention (setting both time and size limits to `-1`).  But the data in the system topic does not need for infinite retention. 

### Modifications

For system topics, do not retain messages that have already been acknowledged.
  • Loading branch information
liangyepianzhou authored and Technoboy- committed Feb 27, 2024
1 parent 2a6b644 commit 4e97827
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1839,10 +1839,17 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
}

if (retentionPolicies == null) {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
if (SystemTopicNames.isSystemTopic(topicName)) {
if (log.isDebugEnabled()) {
log.debug("{} Disable data retention policy for system topic.", topicName);
}
retentionPolicies = new RetentionPolicies(0, 0);
} else {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
}
}

ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,21 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -212,6 +217,49 @@ public void testCompactionRetentionOnTopicCreationWithTopicPolicies() throws Exc
);
}

@Test
public void testRetentionPolicesForSystemTopic() throws Exception {
String namespace = "my-tenant/my-ns";
String topicPrefix = "persistent://" + namespace + "/";
admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, -1));
// Check event topics and transaction internal topics.
for (String eventTopic : SystemTopicNames.EVENTS_TOPIC_NAMES) {
checkSystemTopicRetentionPolicy(topicPrefix + eventTopic);
}
checkSystemTopicRetentionPolicy(topicPrefix + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
checkSystemTopicRetentionPolicy(topicPrefix + SystemTopicNames.TRANSACTION_COORDINATOR_LOG);
checkSystemTopicRetentionPolicy(topicPrefix + SystemTopicNames.PENDING_ACK_STORE_SUFFIX);

// Check common topics.
checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" + System.nanoTime());
// Specify retention policies for system topic.
pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true);
pulsar.getConfiguration().setSystemTopicEnabled(true);
admin.topics().createNonPartitionedTopic(topicPrefix + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
admin.topicPolicies().setRetention(topicPrefix + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
new RetentionPolicies(10, 10));
Awaitility.await().untilAsserted(() -> {
checkTopicRetentionPolicy(topicPrefix + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
new RetentionPolicies(10, 10));
});
}

private void checkSystemTopicRetentionPolicy(String topicName) throws Exception {
checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0));

}

private void checkCommonTopicRetentionPolicy(String topicName) throws Exception {
checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1));
}

private void checkTopicRetentionPolicy(String topicName, RetentionPolicies retentionPolicies) throws Exception {
ManagedLedgerConfig config = pulsar.getBrokerService()
.getManagedLedgerConfig(TopicName.get(topicName)).get();
Assert.assertEquals(config.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB());
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes() * 60000L);
}

private void testCompactionCursorRetention(String topic) throws Exception {
Set<String> keys = Sets.newHashSet("a", "b", "c");
Set<String> keysToExpire = Sets.newHashSet("x1", "x2");
Expand Down

0 comments on commit 4e97827

Please # to comment.