diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java index 51c9025ed0541..c3714c06b968f 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultBacklogDebugger.java @@ -16,6 +16,8 @@ */ package org.apache.camel.impl.debugger; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; @@ -75,7 +77,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back private final ConcurrentMap suspendedBreakpointMessages = new ConcurrentHashMap<>(); private final AtomicReference suspend = new AtomicReference<>(); - private volatile String singleStepExchangeId; + private final Deque singleStepExchangeId = new ArrayDeque<>(); private boolean suspendMode; private String initialBreakpoints; @@ -232,7 +234,11 @@ public boolean isSuspendMode() { @Override public boolean isSingleStepMode() { - return singleStepExchangeId != null; + return !singleStepExchangeId.isEmpty(); + } + + private boolean isSingleStepMode(String exchangeId) { + return singleStepExchangeId.contains(exchangeId); } @Override @@ -350,7 +356,7 @@ public void removeBreakpoint(String nodeId) { @Override public void removeAllBreakpoints() { // stop single stepping - singleStepExchangeId = null; + singleStepExchangeId.clear(); for (String nodeId : getSuspendedBreakpointNodeIds()) { removeBreakpoint(nodeId); @@ -371,9 +377,9 @@ public void resumeBreakpoint(String nodeId) { public void resumeBreakpoint(String nodeId, boolean stepMode) { logger.log("Resume breakpoint " + nodeId); - if (!stepMode && singleStepExchangeId != null) { - debugger.stopSingleStepExchange(singleStepExchangeId); - singleStepExchangeId = null; + if (!stepMode && !singleStepExchangeId.isEmpty()) { + singleStepExchangeId.forEach(debugger::stopSingleStepExchange); + singleStepExchangeId.clear(); } // remember to remove the dumped message as its no longer in need @@ -565,7 +571,7 @@ public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) { public void resumeAll() { logger.log("Resume all"); // stop single stepping - singleStepExchangeId = null; + singleStepExchangeId.clear(); for (String node : getSuspendedBreakpointNodeIds()) { // remember to remove the dumped message as its no longer in need @@ -594,8 +600,11 @@ public void stepBreakpoint() { String nodeId = msg.getToNode(); NodeBreakpoint breakpoint = breakpoints.get(nodeId); if (breakpoint != null) { - singleStepExchangeId = msg.getExchangeId(); - if (debugger.startSingleStepExchange(singleStepExchangeId, new StepBreakpoint())) { + String tid = !singleStepExchangeId.isEmpty() ? singleStepExchangeId.peek() : null; + if (tid == null || !tid.equals(msg.getExchangeId())) { + singleStepExchangeId.push(msg.getExchangeId()); + } + if (debugger.startSingleStepExchange(msg.getExchangeId(), new StepBreakpoint())) { // now resume resumeBreakpoint(nodeId, true); } @@ -616,8 +625,11 @@ public void stepBreakpoint(String nodeId) { BacklogTracerEventMessage msg = suspendedBreakpointMessages.get(nodeId); NodeBreakpoint breakpoint = breakpoints.get(nodeId); if (msg != null && breakpoint != null) { - singleStepExchangeId = msg.getExchangeId(); - if (debugger.startSingleStepExchange(singleStepExchangeId, new StepBreakpoint())) { + String tid = !singleStepExchangeId.isEmpty() ? singleStepExchangeId.peek() : null; + if (tid == null || !tid.equals(msg.getExchangeId())) { + singleStepExchangeId.push(msg.getExchangeId()); + } + if (debugger.startSingleStepExchange(msg.getExchangeId(), new StepBreakpoint())) { // now resume resumeBreakpoint(nodeId, true); } @@ -1000,13 +1012,13 @@ public void onEvent(Exchange exchange, ExchangeEvent event, NamedNode definition NamedRoute route = getOriginalRoute(exchange); String completedId = event.getExchange().getExchangeId(); try { - if (isSingleStepIncludeStartEnd() && singleStepExchangeId != null - && singleStepExchangeId.equals(completedId)) { + String tid = !singleStepExchangeId.isEmpty() ? singleStepExchangeId.peek() : null; + if (isSingleStepIncludeStartEnd() && completedId.equals(tid)) { doCompleted(exchange, definition, route, cause); } } finally { + singleStepExchangeId.remove(completedId); logger.log("ExchangeId: " + completedId + " is completed, so exiting single step mode."); - singleStepExchangeId = null; } } } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java index b001120c5ac68..4fcdbefc793d4 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/DefaultDebugger.java @@ -52,7 +52,7 @@ public class DefaultDebugger extends ServiceSupport implements Debugger, CamelCo private final EventNotifier debugEventNotifier = new DebugEventNotifier(); private final List breakpoints = new CopyOnWriteArrayList<>(); - private final int maxConcurrentSingleSteps = 1; + private final int maxConcurrentSingleSteps = 100; private final Map singleSteps = new HashMap<>(maxConcurrentSingleSteps); private CamelContext camelContext; @@ -217,7 +217,7 @@ public void stopSingleStepExchange(String exchangeId) { @Override public boolean beforeProcess(Exchange exchange, Processor processor, NamedNode definition) { // is the exchange in single step mode? - Breakpoint singleStep = singleSteps.get(exchange.getExchangeId()); + Breakpoint singleStep = getSingleStepBreakpoint(exchange); if (singleStep != null) { onBeforeProcess(exchange, processor, definition, singleStep); return true; @@ -241,7 +241,7 @@ public boolean beforeProcess(Exchange exchange, Processor processor, NamedNode d @Override public boolean afterProcess(Exchange exchange, Processor processor, NamedNode definition, long timeTaken) { // is the exchange in single step mode? - Breakpoint singleStep = singleSteps.get(exchange.getExchangeId()); + Breakpoint singleStep = getSingleStepBreakpoint(exchange); if (singleStep != null) { onAfterProcess(exchange, processor, definition, timeTaken, singleStep); return true; @@ -265,7 +265,7 @@ public boolean afterProcess(Exchange exchange, Processor processor, NamedNode de @Override public boolean onEvent(Exchange exchange, ExchangeEvent event) { // is the exchange in single step mode? - Breakpoint singleStep = singleSteps.get(exchange.getExchangeId()); + Breakpoint singleStep = getSingleStepBreakpoint(exchange); if (singleStep != null) { onEvent(exchange, event, singleStep); return true; @@ -317,6 +317,18 @@ protected void onEvent(Exchange exchange, ExchangeEvent event, Breakpoint breakp } } + private Breakpoint getSingleStepBreakpoint(Exchange exchange) { + Breakpoint answer = singleSteps.get(exchange.getExchangeId()); + if (answer == null) { + // we may step into an EIP such as split so check via correlation id (parent exchange) + String id = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class); + if (id != null) { + answer = singleSteps.get(id); + } + } + return answer; + } + private boolean matchConditions( Exchange exchange, Processor processor, NamedNode definition, BreakpointConditions breakpoint, boolean before) { for (Condition condition : breakpoint.getConditions()) { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index f9514de06a130..3fd2143fdd864 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -1278,6 +1278,10 @@ protected void removeAggregationStrategyFromExchange(Exchange exchange) { } // remove the strategy using this processor as the key map.remove(this); + // and remove map if its empty + if (map.isEmpty()) { + exchange.removeProperty(ExchangePropertyKey.AGGREGATION_STRATEGY); + } } /**