Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Jan 28, 2024
1 parent e38c9b8 commit 3eb5393
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public synchronized void startProducer() {
replicatorId, waitTimeMs / 1000.0);
}
// BackOff before retrying
scheduledCheckTopicActiveAndRetryStartProducer(waitTimeMs);
scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
return;
}
State state = STATE_UPDATER.get(this);
Expand All @@ -159,7 +159,7 @@ public synchronized void startProducer() {
log.warn("[{}] Failed to create remote producer ({}), retrying in {} s",
replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
// BackOff before retrying
scheduledCheckTopicActiveAndRetryStartProducer(waitTimeMs);
scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
} else {
log.warn("[{}] Failed to create remote producer. Replicator state: {}", replicatorId,
STATE_UPDATER.get(this), ex);
Expand All @@ -169,7 +169,7 @@ public synchronized void startProducer() {

}

protected void scheduledCheckTopicActiveAndRetryStartProducer(final long waitTimeMs) {
protected void scheduleCheckTopicActiveAndStartProducer(final long waitTimeMs) {
brokerService.executor().schedule(() -> {
if (state == State.Closed) {
return;
Expand Down Expand Up @@ -218,12 +218,12 @@ protected CompletableFuture<Boolean> isLocalTopicActive() {

protected synchronized CompletableFuture<Void> closeAsync(boolean onlyCloseProducer) {
if (producer == null) {
updateStatusAfterCloseProducer(onlyCloseProducer);
updateStatus(onlyCloseProducer);
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> future = producer.closeAsync();
future.thenRun(() -> {
updateStatusAfterCloseProducer(onlyCloseProducer);
updateStatus(onlyCloseProducer);
this.producer = null;
// deactivate further read
disableReplicatorRead();
Expand All @@ -241,7 +241,7 @@ protected synchronized CompletableFuture<Void> closeAsync(boolean onlyCloseProdu
return future;
}

protected void updateStatusAfterCloseProducer(boolean onlyCloseProducer) {
protected void updateStatus(boolean onlyCloseProducer) {
if (onlyCloseProducer) {
// Only close producer.
STATE_UPDATER.set(this, State.Stopped);
Expand Down

0 comments on commit 3eb5393

Please # to comment.