Skip to content

Commit

Permalink
camel-jbang - Debug to work with split eip (#16643)
Browse files Browse the repository at this point in the history
* CAMEL-21223: Debugger should support stepping inside Split EIP

* camel-core: multicast/split EIP should remove internal exchange property for aggregation strategy if its no longer in use

* CAMEL-21223: Debugger should support stepping inside Split EIP
  • Loading branch information
davsclaus authored Dec 23, 2024
1 parent e4ee152 commit 31a2ea9
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.camel.impl.debugger;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -75,7 +77,7 @@ public final class DefaultBacklogDebugger extends ServiceSupport implements Back
private final ConcurrentMap<String, BacklogTracerEventMessage> suspendedBreakpointMessages = new ConcurrentHashMap<>();

private final AtomicReference<CountDownLatch> suspend = new AtomicReference<>();
private volatile String singleStepExchangeId;
private final Deque<String> singleStepExchangeId = new ArrayDeque<>();

private boolean suspendMode;
private String initialBreakpoints;
Expand Down Expand Up @@ -232,7 +234,11 @@ public boolean isSuspendMode() {

@Override
public boolean isSingleStepMode() {
return singleStepExchangeId != null;
return !singleStepExchangeId.isEmpty();
}

private boolean isSingleStepMode(String exchangeId) {
return singleStepExchangeId.contains(exchangeId);
}

@Override
Expand Down Expand Up @@ -350,7 +356,7 @@ public void removeBreakpoint(String nodeId) {
@Override
public void removeAllBreakpoints() {
// stop single stepping
singleStepExchangeId = null;
singleStepExchangeId.clear();

for (String nodeId : getSuspendedBreakpointNodeIds()) {
removeBreakpoint(nodeId);
Expand All @@ -371,9 +377,9 @@ public void resumeBreakpoint(String nodeId) {
public void resumeBreakpoint(String nodeId, boolean stepMode) {
logger.log("Resume breakpoint " + nodeId);

if (!stepMode && singleStepExchangeId != null) {
debugger.stopSingleStepExchange(singleStepExchangeId);
singleStepExchangeId = null;
if (!stepMode && !singleStepExchangeId.isEmpty()) {
singleStepExchangeId.forEach(debugger::stopSingleStepExchange);
singleStepExchangeId.clear();
}

// remember to remove the dumped message as its no longer in need
Expand Down Expand Up @@ -565,7 +571,7 @@ public void removeMessageHeaderOnBreakpoint(String nodeId, String headerName) {
public void resumeAll() {
logger.log("Resume all");
// stop single stepping
singleStepExchangeId = null;
singleStepExchangeId.clear();

for (String node : getSuspendedBreakpointNodeIds()) {
// remember to remove the dumped message as its no longer in need
Expand Down Expand Up @@ -594,8 +600,11 @@ public void stepBreakpoint() {
String nodeId = msg.getToNode();
NodeBreakpoint breakpoint = breakpoints.get(nodeId);
if (breakpoint != null) {
singleStepExchangeId = msg.getExchangeId();
if (debugger.startSingleStepExchange(singleStepExchangeId, new StepBreakpoint())) {
String tid = !singleStepExchangeId.isEmpty() ? singleStepExchangeId.peek() : null;
if (tid == null || !tid.equals(msg.getExchangeId())) {
singleStepExchangeId.push(msg.getExchangeId());
}
if (debugger.startSingleStepExchange(msg.getExchangeId(), new StepBreakpoint())) {
// now resume
resumeBreakpoint(nodeId, true);
}
Expand All @@ -616,8 +625,11 @@ public void stepBreakpoint(String nodeId) {
BacklogTracerEventMessage msg = suspendedBreakpointMessages.get(nodeId);
NodeBreakpoint breakpoint = breakpoints.get(nodeId);
if (msg != null && breakpoint != null) {
singleStepExchangeId = msg.getExchangeId();
if (debugger.startSingleStepExchange(singleStepExchangeId, new StepBreakpoint())) {
String tid = !singleStepExchangeId.isEmpty() ? singleStepExchangeId.peek() : null;
if (tid == null || !tid.equals(msg.getExchangeId())) {
singleStepExchangeId.push(msg.getExchangeId());
}
if (debugger.startSingleStepExchange(msg.getExchangeId(), new StepBreakpoint())) {
// now resume
resumeBreakpoint(nodeId, true);
}
Expand Down Expand Up @@ -1000,13 +1012,13 @@ public void onEvent(Exchange exchange, ExchangeEvent event, NamedNode definition
NamedRoute route = getOriginalRoute(exchange);
String completedId = event.getExchange().getExchangeId();
try {
if (isSingleStepIncludeStartEnd() && singleStepExchangeId != null
&& singleStepExchangeId.equals(completedId)) {
String tid = !singleStepExchangeId.isEmpty() ? singleStepExchangeId.peek() : null;
if (isSingleStepIncludeStartEnd() && completedId.equals(tid)) {
doCompleted(exchange, definition, route, cause);
}
} finally {
singleStepExchangeId.remove(completedId);
logger.log("ExchangeId: " + completedId + " is completed, so exiting single step mode.");
singleStepExchangeId = null;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class DefaultDebugger extends ServiceSupport implements Debugger, CamelCo

private final EventNotifier debugEventNotifier = new DebugEventNotifier();
private final List<BreakpointConditions> breakpoints = new CopyOnWriteArrayList<>();
private final int maxConcurrentSingleSteps = 1;
private final int maxConcurrentSingleSteps = 100;
private final Map<String, Breakpoint> singleSteps = new HashMap<>(maxConcurrentSingleSteps);
private CamelContext camelContext;

Expand Down Expand Up @@ -217,7 +217,7 @@ public void stopSingleStepExchange(String exchangeId) {
@Override
public boolean beforeProcess(Exchange exchange, Processor processor, NamedNode definition) {
// is the exchange in single step mode?
Breakpoint singleStep = singleSteps.get(exchange.getExchangeId());
Breakpoint singleStep = getSingleStepBreakpoint(exchange);
if (singleStep != null) {
onBeforeProcess(exchange, processor, definition, singleStep);
return true;
Expand All @@ -241,7 +241,7 @@ public boolean beforeProcess(Exchange exchange, Processor processor, NamedNode d
@Override
public boolean afterProcess(Exchange exchange, Processor processor, NamedNode definition, long timeTaken) {
// is the exchange in single step mode?
Breakpoint singleStep = singleSteps.get(exchange.getExchangeId());
Breakpoint singleStep = getSingleStepBreakpoint(exchange);
if (singleStep != null) {
onAfterProcess(exchange, processor, definition, timeTaken, singleStep);
return true;
Expand All @@ -265,7 +265,7 @@ public boolean afterProcess(Exchange exchange, Processor processor, NamedNode de
@Override
public boolean onEvent(Exchange exchange, ExchangeEvent event) {
// is the exchange in single step mode?
Breakpoint singleStep = singleSteps.get(exchange.getExchangeId());
Breakpoint singleStep = getSingleStepBreakpoint(exchange);
if (singleStep != null) {
onEvent(exchange, event, singleStep);
return true;
Expand Down Expand Up @@ -317,6 +317,18 @@ protected void onEvent(Exchange exchange, ExchangeEvent event, Breakpoint breakp
}
}

private Breakpoint getSingleStepBreakpoint(Exchange exchange) {
Breakpoint answer = singleSteps.get(exchange.getExchangeId());
if (answer == null) {
// we may step into an EIP such as split so check via correlation id (parent exchange)
String id = exchange.getProperty(ExchangePropertyKey.CORRELATION_ID, String.class);
if (id != null) {
answer = singleSteps.get(id);
}
}
return answer;
}

private boolean matchConditions(
Exchange exchange, Processor processor, NamedNode definition, BreakpointConditions breakpoint, boolean before) {
for (Condition condition : breakpoint.getConditions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1278,6 +1278,10 @@ protected void removeAggregationStrategyFromExchange(Exchange exchange) {
}
// remove the strategy using this processor as the key
map.remove(this);
// and remove map if its empty
if (map.isEmpty()) {
exchange.removeProperty(ExchangePropertyKey.AGGREGATION_STRATEGY);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ private void doRead(Console c, AtomicBoolean quit) {
logUpdated.set(true);
}
}
sendDebugCommand(spawnPid, "step", line, null);
sendDebugCommand(spawnPid, "step", null);
}
// user have pressed ENTER so continue
waitForUser.set(false);
Expand Down Expand Up @@ -309,7 +309,7 @@ protected int doWatch() {
return 0;
}

private void sendDebugCommand(long pid, String command, String argument, String breakpoint) {
private void sendDebugCommand(long pid, String command, String breakpoint) {
// ensure output file is deleted before executing action
File outputFile = getOutputFile(Long.toString(pid));
FileUtil.deleteFile(outputFile);
Expand All @@ -319,14 +319,6 @@ private void sendDebugCommand(long pid, String command, String argument, String
if (command != null) {
root.put("command", command);
}
if (argument != null && !argument.isBlank()) {
if ("i".equals(argument)) {
argument = "into";
} else if ("o".equals(argument)) {
argument = "over";
}
root.put("argument", argument);
}
if (breakpoint != null) {
root.put("breakpoint", breakpoint);
}
Expand Down Expand Up @@ -479,7 +471,7 @@ private void printDebugStatus(long pid, StringWriter buffer) {
}
}

String msg = " Breakpoint suspended. Press ENTER to continue (i = step into (default), o = step over).";
String msg = " Breakpoint suspended. Press ENTER to continue.";
if (loggingColor) {
AnsiConsole.out().println(Ansi.ansi().a(Ansi.Attribute.INTENSITY_BOLD).a(msg).reset());
} else {
Expand Down

0 comments on commit 31a2ea9

Please # to comment.