Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[improve][broker] Fix ServiceUnitStateCompactionStrategy to cover fas… #46

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,19 @@ static void createNamespaceIfAbsent(PulsarResources resources, NamespaceName nam
namespaceResources.createPolicies(namespaceName, policies);
} else {
log.info("Namespace {} already exists.", namespaceName);
namespaceResources.setPolicies(namespaceName, policies -> {
policies.replication_clusters.add(cluster);
return policies;
});
var replicaClusterFound = false;
var policiesOptional = namespaceResources.getPolicies(namespaceName);
if (policiesOptional.isPresent() && policiesOptional.get().replication_clusters.contains(cluster)) {
replicaClusterFound = true;
}
if (!replicaClusterFound) {
namespaceResources.setPolicies(namespaceName, policies -> {
policies.replication_clusters.add(cluster);
return policies;
});
log.info("Updated namespace:{} policies. Added the replication cluster:{}",
namespaceName, cluster);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
NamespaceBundle bundle = getNamespaceBundle(serviceUnit);
return pulsar.getBrokerService().unloadServiceUnit(
bundle,
false,
true,
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(),
TimeUnit.MILLISECONDS)
.thenApply(numUnloadedTopics -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,13 @@ public boolean shouldKeepLeft(ServiceUnitStateData from, ServiceUnitStateData to
return false;
}

// Skip the compaction case where from = null and to.versionId > 1
if (from != null && from.versionId() + 1 != to.versionId()) {
return true;
if (from != null) {
if (from.versionId() == Long.MAX_VALUE && to.versionId() == Long.MIN_VALUE) { // overflow
} else if (from.versionId() >= to.versionId()) {
return true;
} else if (from.versionId() < to.versionId() - 1) { // Compacted
return false;
} // else from.versionId() == to.versionId() - 1 // continue to check further
}

if (to.force()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ private <T> CompletableFuture<Long> runPhaseTwo(
});
})
.thenCompose(v -> {
log.info("Acking ledger id {}", phaseOneResult.firstId);
log.info("Acking ledger id {}", phaseOneResult.lastId);
return ((CompactionReaderImpl<T>) reader)
.acknowledgeCumulativeAsync(
phaseOneResult.lastId, Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws Except
restartBroker();
pulsar1 = pulsar;
setPrimaryLoadManager();
admin.namespaces().setNamespaceReplicationClusters("public/default",
Sets.newHashSet(this.conf.getClusterName()));

var serviceUnitStateChannelPrimaryNew =
(ServiceUnitStateChannelImpl) FieldUtils.readDeclaredField(primaryLoadManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public void testVersionId(){
new ServiceUnitStateData(Owned, dst, src, 10),
new ServiceUnitStateData(Releasing, "broker2", dst, 5)));

assertFalse(strategy.shouldKeepLeft(
new ServiceUnitStateData(Owned, dst, src, 10),
new ServiceUnitStateData(Owned, "broker2", dst, 12)));

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState.isValidTransition;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.MSG_COMPRESSION_TYPE;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData.state;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
Expand All @@ -49,6 +52,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.commons.lang.reflect.FieldUtils;
Expand All @@ -69,6 +73,7 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
Expand Down Expand Up @@ -628,6 +633,80 @@ public void testSlowTableviewAfterCompaction() throws Exception {

}

@Test
public void testSlowReceiveTableviewAfterCompaction() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
String strategyClassName = "topicCompactionStrategyClassName";

pulsarClient.newConsumer(schema)
.topic(topic)
.subscriptionName("sub1")
.readCompacted(true)
.subscribe().close();

var tv = pulsar.getClient().newTableViewBuilder(schema)
.topic(topic)
.subscriptionName("slowTV")
.loadConf(Map.of(
strategyClassName,
ServiceUnitStateCompactionStrategy.class.getName()))
.create();

// Configure retention to ensue data is retained for reader
admin.namespaces().setRetention("my-property/use/my-ns",
new RetentionPolicies(-1, -1));

Producer<ServiceUnitStateData> producer = pulsarClient.newProducer(schema)
.topic(topic)
.compressionType(MSG_COMPRESSION_TYPE)
.enableBatching(true)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

StrategicTwoPhaseCompactor compactor
= new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);

var reader = ((CompletableFuture<ReaderImpl<ServiceUnitStateData>>) FieldUtils
.readDeclaredField(tv, "reader", true)).get();
var consumer = spy(reader.getConsumer());
FieldUtils.writeDeclaredField(reader, "consumer", consumer, true);
String bundle = "bundle1";
final AtomicInteger versionId = new AtomicInteger(0);
final AtomicInteger cnt = new AtomicInteger(1);
int msgAddCount = 1000; // has to be big enough to cover compacted cursor fast-forward.
doAnswer(invocationOnMock -> {
if (cnt.decrementAndGet() == 0) {
var msg = consumer.receiveAsync();
for (int i = 0; i < msgAddCount; i++) {
producer.newMessage().key(bundle).value(
new ServiceUnitStateData(Owned, "broker" + versionId.incrementAndGet(), true,
versionId.get())).send();
}
compactor.compact(topic, strategy).join();
return msg;
}
// Call the real method
reset(consumer);
return consumer.receiveAsync();
}).when(consumer).receiveAsync();
producer.newMessage().key(bundle).value(
new ServiceUnitStateData(Owned, "broker", true,
versionId.incrementAndGet())).send();
producer.newMessage().key(bundle).value(
new ServiceUnitStateData(Owned, "broker" + versionId.incrementAndGet(), true,
versionId.get())).send();
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
() -> {
var val = tv.get(bundle);
assertNotNull(val);
assertEquals(val.dstBroker(), "broker" + versionId.get());
}
);

producer.close();
tv.close();
}

@Test
public void testBrokerRestartAfterCompaction() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ private void handleMessage(Message<T> msg) {
if (compactionStrategy != null) {
T prev = data.get(key);
update = !compactionStrategy.shouldKeepLeft(prev, cur);
if (!update) {
log.info("Skipped the message from topic {}. key={} value={} prev={}",
conf.getTopicName(),
key,
cur,
prev);
}
}

if (update) {
Expand Down