Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[fix][broker] Create new ledger after the current ledger is closed #22034

Merged
merged 12 commits into from
Mar 22, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -2667,8 +2667,10 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
ledgers.headMap(slowestReaderLedgerId, false).values().iterator();
while (ledgerInfoIterator.hasNext()){
LedgerInfo ls = ledgerInfoIterator.next();
// currentLedger can not be deleted
if (ls.getLedgerId() == currentLedger.getId()) {
// Current ledger can not be deleted when it is currently being written to.
// However, When the manager ledger state is ClosedLedger, the current ledger was closed
// and there are no pending operations. So it can be deleted.
if (ls.getLedgerId() == currentLedger.getId() && currentState != State.ClosedLedger) {
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} skipped for deletion as it is currently being written to", name,
ls.getLedgerId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand All @@ -47,6 +48,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.util.CallbackMutex;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
Expand Down Expand Up @@ -444,10 +446,14 @@ public void badVersionErrorDuringTruncateLedger() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger_trim",
new ManagedLedgerConfig()
.setMaxEntriesPerLedger(2));
Field trimmerMutexField = ManagedLedgerImpl.class.getDeclaredField("trimmerMutex");
trimmerMutexField.setAccessible(true);
CallbackMutex trimmerMutex = ((CallbackMutex) trimmerMutexField.get(ledger));
trimmerMutex.tryLock();
ledger.addEntry("test".getBytes());
ledger.addEntry("test".getBytes());
ledger.addEntry("test".getBytes());

trimmerMutex.unlock();
metadataStore.failConditional(new MetadataStoreException.BadVersionException("err"), (op, path) ->
path.equals("/managed-ledgers/my_test_ledger_trim")
&& op == FaultInjectionMetadataStore.OperationType.PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4232,4 +4232,24 @@ public void testNoCleanupOffloadLedgerWhenMetadataExceptionHappens() throws Exce
verify(ledgerOffloader, times(0))
.deleteOffloaded(eq(ledgerInfo.getLedgerId()), any(), anyMap());
}

@Test
public void testDeleteCurrentLedgerWhenItIsClosed() throws Exception {
ManagedLedgerConfig config = spy(new ManagedLedgerConfig());
ManagedLedgerImpl ml = spy((ManagedLedgerImpl) factory.open("testDeleteCurrentLedgerWhenItIsClosed", config));
assertEquals(ml.ledgers.size(), 1);
ml.config.setMaximumRolloverTime(10, TimeUnit.MILLISECONDS);
Thread.sleep(10);
ml.addEntry(new byte[4]);
ml.internalTrimLedgers(false, Futures.NULL_PROMISE);
Awaitility.await().untilAsserted(() -> {
assertEquals(ml.state, ManagedLedgerImpl.State.ClosedLedger);
assertEquals(ml.ledgers.size(), 0);
});
ml.addEntry(new byte[4]);
Awaitility.await().untilAsserted(() -> {
assertEquals(ml.state, ManagedLedgerImpl.State.LedgerOpened);
assertEquals(ml.ledgers.size(), 1);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@
// non-durable mes should still
assertEquals(stats.getSubscriptions().size(), 1);
long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
assertEquals(nonDurableSubscriptionBacklog, 0,
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]");

MessageIdImpl msgId = null;
Expand All @@ -268,10 +268,7 @@
admin.topics().getInternalStats(topic1, false);

// check there is only one ledger left
assertEquals(internalStats.ledgers.size(), 1);

// check if its the expected ledger id given MAX_ENTRIES_PER_LEDGER
assertEquals(internalStats.ledgers.get(0).ledgerId, finalMsgId.getLedgerId());
assertEquals(internalStats.ledgers.size(), 0);
});

// check reader can still read with out error
Expand Down Expand Up @@ -322,7 +319,7 @@
// non-durable mes should still
assertEquals(stats.getSubscriptions().size(), 1);
long nonDurableSubscriptionBacklog = stats.getSubscriptions().values().iterator().next().getMsgBacklog();
assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,

Check failure on line 322 in pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java

View workflow job for this annotation

GitHub Actions / CI - Unit - Brokers - Broker Group 1

BacklogQuotaManagerTest.testTriggerBacklogQuotaSizeWithReader

non-durable subscription backlog is [0] expected [5] but found [0]
"non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]");
MessageIdImpl messageId = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,7 @@ public void testEncryptionRequired() throws Exception {
.value(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8))
.send();
txn.commit();
admin.namespaces().deleteNamespace(namespace, true);
}

@Test
Expand Down Expand Up @@ -1945,5 +1946,6 @@ public void testDelayedDeliveryExceedsMaxDelay() throws Exception {
assertEquals(ex.getMessage(), "Exceeds max allowed delivery delay of "
+ maxDeliveryDelayInMillis + " milliseconds");
}
admin.namespaces().deleteNamespace(namespace, true);
}
}
Loading