Skip to content

Commit

Permalink
improve: return topic.getLastPosition() when call `getMaxReadPositi…
Browse files Browse the repository at this point in the history
…on` to TransactionBufferDisable
  • Loading branch information
liangyepianzhou committed Dec 12, 2023
1 parent 501feb5 commit f229294
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
this.transactionBuffer = new TransactionBufferDisable();
this.transactionBuffer = new TransactionBufferDisable(this);
}
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
if (ledger instanceof ShadowManagedLedgerImpl) {
Expand Down Expand Up @@ -417,7 +417,7 @@ public CompletableFuture<Void> initialize() {
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
this.transactionBuffer = new TransactionBufferDisable();
this.transactionBuffer = new TransactionBufferDisable(this);
}
shadowSourceTopic = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,12 @@ public TransactionBufferReader newReader(long sequenceId) throws

final ConcurrentMap<TxnID, TxnBuffer> buffers;
final Map<Long, Set<TxnID>> txnIndex;
private final Topic topic;

public InMemTransactionBuffer(Topic topic) {
this.buffers = new ConcurrentHashMap<>();
this.txnIndex = new HashMap<>();
this.topic = topic;
}

@Override
Expand Down Expand Up @@ -372,7 +375,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {

@Override
public PositionImpl getMaxReadPosition() {
return PositionImpl.LATEST;
return (PositionImpl) topic.getLastPosition();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
Expand All @@ -40,6 +41,11 @@
@Slf4j
public class TransactionBufferDisable implements TransactionBuffer {

private final Topic topic;
public TransactionBufferDisable(Topic topic) {
this.topic = topic;
}

@Override
public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -91,7 +97,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {

@Override
public PositionImpl getMaxReadPosition() {
return PositionImpl.LATEST;
return (PositionImpl) topic.getLastPosition();
}

@Override
Expand Down

0 comments on commit f229294

Please # to comment.