diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java index cbf8a9f0def..8bb75826b97 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaBrokerEventListener.java @@ -127,6 +127,7 @@ public void onMessage(Message jmsMessage) { long nextWarn = start; try { while (!memoryUsage.waitForSpace(1000, 90)) { + replicaStatistics.setReplicaReplicationFlowControl(true); long now = System.currentTimeMillis(); if (now >= nextWarn) { logger.warn("High memory usage. Pausing replication (paused for: {}s)", (now - start) / 1000); @@ -137,6 +138,7 @@ public void onMessage(Message jmsMessage) { throw new RuntimeException(e); } } + replicaStatistics.setReplicaReplicationFlowControl(false); try { processMessageWithRetries(message, null, null); diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequencer.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequencer.java index 1fb75915f48..565846099ad 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequencer.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaSequencer.java @@ -479,6 +479,7 @@ boolean iterateSend() { long nextWarn = start; try { while (!memoryUsage.waitForSpace(1000, 95)) { + replicaStatistics.setSourceReplicationFlowControl(true); long now = System.currentTimeMillis(); if (now >= nextWarn) { logger.warn("High memory usage. Pausing replication (paused for: {}s)", (now - start) / 1000); @@ -489,6 +490,7 @@ boolean iterateSend() { throw new RuntimeException(e); } } + replicaStatistics.setSourceReplicationFlowControl(false); iterateSend0(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaStatistics.java index 4e1885d9674..8fc932c2151 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/ReplicaStatistics.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class ReplicaStatistics { @@ -33,6 +34,9 @@ public class ReplicaStatistics { private AtomicLong replicationLag; private AtomicLong replicaLastProcessedTime; + private AtomicBoolean sourceReplicationFlowControl; + private AtomicBoolean replicaReplicationFlowControl; + public ReplicaStatistics() { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { if (tpsCounter == null) { @@ -60,6 +64,9 @@ public void reset() { sourceLastProcessedTime = null; replicationLag = null; replicaLastProcessedTime = null; + + sourceReplicationFlowControl = null; + replicaReplicationFlowControl = null; } public void increaseTpsCounter(long size) { @@ -116,4 +123,26 @@ public void setReplicaLastProcessedTime(long replicaLastProcessedTime) { } this.replicaLastProcessedTime.set(replicaLastProcessedTime); } + + public AtomicBoolean getSourceReplicationFlowControl() { + return sourceReplicationFlowControl; + } + + public void setSourceReplicationFlowControl(boolean sourceReplicationFlowControl) { + if (this.sourceReplicationFlowControl == null) { + this.sourceReplicationFlowControl = new AtomicBoolean(); + } + this.sourceReplicationFlowControl.set(sourceReplicationFlowControl); + } + + public AtomicBoolean getReplicaReplicationFlowControl() { + return replicaReplicationFlowControl; + } + + public void setReplicaReplicationFlowControl(boolean replicaReplicationFlowControl) { + if (this.replicaReplicationFlowControl == null) { + this.replicaReplicationFlowControl = new AtomicBoolean(); + } + this.replicaReplicationFlowControl.set(replicaReplicationFlowControl); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationView.java b/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationView.java index 737706e2511..9f6d1ecdf8c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationView.java @@ -21,6 +21,7 @@ import org.apache.activemq.replica.ReplicaStatistics; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class ReplicationView implements ReplicationViewMBean { @@ -69,4 +70,14 @@ public Long getReplicaWaitTime() { return Optional.ofNullable(replicaStatistics.getReplicaLastProcessedTime()).map(AtomicLong::get) .map(v -> System.currentTimeMillis() - v).orElse(null); } + + @Override + public Boolean getSourceReplicationFlowControl() { + return Optional.ofNullable(replicaStatistics.getSourceReplicationFlowControl()).map(AtomicBoolean::get).orElse(null); + } + + @Override + public Boolean getReplicaReplicationFlowControl() { + return Optional.ofNullable(replicaStatistics.getReplicaReplicationFlowControl()).map(AtomicBoolean::get).orElse(null); + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationViewMBean.java index 7edd5f848d9..95d6b5141f0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/replica/jmx/ReplicationViewMBean.java @@ -40,4 +40,10 @@ public interface ReplicationViewMBean { @MBeanInfo("Get wait time(if the broker's role is replica)") Long getReplicaWaitTime(); + + @MBeanInfo("Flow control is enabled for replication on the source side") + Boolean getSourceReplicationFlowControl(); + + @MBeanInfo("Flow control is enabled for replication on the replica side") + Boolean getReplicaReplicationFlowControl(); }