Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.Map;
import java.util.Set;

import org.apache.camel.ContextEvents;
import org.apache.camel.api.management.ManagedCamelContext;
import org.apache.camel.api.management.mbean.ManagedCamelContextMBean;
import org.apache.camel.clock.Clock;
import org.apache.camel.spi.ReloadStrategy;
import org.apache.camel.spi.ResourceReloadStrategy;
import org.apache.camel.spi.annotations.DevConsole;
Expand Down Expand Up @@ -54,6 +56,10 @@ protected String doCallText(Map<String, Object> options) {
if (getCamelContext().getDescription() != null) {
sb.append(String.format("%n %s", getCamelContext().getDescription()));
}
Clock startClock = getCamelContext().getClock().get(ContextEvents.START);
if (startClock != null) {
sb.append(String.format("%n Started: %s", startClock.asDate()));
}
sb.append("\n");

ManagedCamelContext mcc = getCamelContext().getCamelContextExtension().getContextPlugin(ManagedCamelContext.class);
Expand Down Expand Up @@ -135,7 +141,12 @@ protected JsonObject doCallJson(Map<String, Object> options) {
root.put("version", getCamelContext().getVersion());
root.put("state", getCamelContext().getStatus().name());
root.put("phase", getCamelContext().getCamelContextExtension().getStatusPhase());
root.put("uptime", getCamelContext().getUptime().toMillis());
long uptimeMillis = getCamelContext().getUptime().toMillis();
Clock startClock = getCamelContext().getClock().get(ContextEvents.START);
if (startClock != null) {
root.put("startTimestamp", startClock.getCreated());
}
root.put("uptime", uptimeMillis);
root.put("devMode", getCamelContext().hasService(ResourceReloadStrategy.class) != null);

ManagedCamelContext mcc = getCamelContext().getCamelContextExtension().getContextPlugin(ManagedCamelContext.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.camel.spi.ResourceLoader;
import org.apache.camel.spi.ResourceReloadStrategy;
import org.apache.camel.spi.RoutesLoader;
import org.apache.camel.spi.RuntimeEndpointRegistry;
import org.apache.camel.spi.ShutdownPrepared;
import org.apache.camel.support.LoadOnDemandReloadStrategy;
import org.apache.camel.support.MessageHelper;
Expand Down Expand Up @@ -918,6 +919,10 @@ private void doActionResetStatsTask() throws Exception {
if (mcc != null) {
mcc.getManagedCamelContext().reset(true);
}
RuntimeEndpointRegistry reg = camelContext.getRuntimeEndpointRegistry();
if (reg != null) {
reg.reset();
}
}

private void doActionDebugTask(JsonObject root) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ $ camel get endpoint --verbose
To sort endpoints by body size (largest first):

```sh
$ camel get endpoint --sort -size
$ camel get endpoint --sort=-size
```

### How it works
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,16 @@ class ActionsPopup {
private static final int ACTION_CLASSPATH = 8;
private static final int ACTION_MCP_INFO = 9;
private static final int ACTION_MCP_LOG = 10;
private static final int ACTION_STOP_ALL = 11;
private static final int ACTION_RESET_STATS = 11;
private static final int ACTION_STOP_ALL = 12;

private final Supplier<Set<String>> runningNames;
private final Supplier<List<IntegrationInfo>> integrations;
private final Runnable screenshotAction;
private final Runnable toggleKeystrokes;
private final Supplier<Boolean> keystrokesEnabled;
private final Runnable toggleTapeRecording;
private Runnable resetStatsAction;
private final Supplier<Boolean> tapeRecordingActive;
private MonitorContext ctx;
private boolean mcpEnabled;
Expand Down Expand Up @@ -133,6 +135,10 @@ void setContext(MonitorContext ctx) {
this.ctx = ctx;
}

void setResetStatsAction(Runnable resetStatsAction) {
this.resetStatsAction = resetStatsAction;
}

void setMcpEnabled(
boolean enabled, int port, Supplier<String> connectedClient, Supplier<List<TuiMcpServer.LogEntry>> activityLog) {
this.mcpEnabled = enabled;
Expand All @@ -143,7 +149,7 @@ void setMcpEnabled(
}

private int actionCount() {
return mcpEnabled ? 12 : 10;
return mcpEnabled ? 13 : 11;
}

boolean isVisible() {
Expand Down Expand Up @@ -195,6 +201,7 @@ List<String> getActionLabels() {
labels.add("Tape Recording Guide");
labels.add("Run Doctor");
labels.add("Show Classpath");
labels.add("Reset Stats");
if (mcpEnabled) {
labels.add("MCP Info");
labels.add("MCP Log");
Expand Down Expand Up @@ -348,6 +355,11 @@ boolean handleKeyEvent(KeyEvent ke) {
} else if (action == ACTION_MCP_LOG) {
showActionsMenu = false;
openMcpLog();
} else if (action == ACTION_RESET_STATS) {
showActionsMenu = false;
if (resetStatsAction != null) {
resetStatsAction.run();
}
} else if (action == ACTION_STOP_ALL) {
showActionsMenu = false;
stopAllPopup.open();
Expand Down Expand Up @@ -485,6 +497,7 @@ private void renderActionsMenu(Frame frame, Rect area) {
items.add(ListItem.from(" 📄 Tape Recording Guide"));
items.add(ListItem.from(" 🩺 Run Doctor"));
items.add(ListItem.from(" 📦 Show Classpath"));
items.add(ListItem.from(" 🔄 Reset Stats"));
if (mcpEnabled) {
items.add(ListItem.from(" 🤖 MCP Info"));
items.add(ListItem.from(" 📋 MCP Log"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,17 @@ public class CamelMonitor extends CamelCommand {
private final Map<String, LinkedList<long[]>> endpointRemoteStubSamples = new ConcurrentHashMap<>();
private final Map<String, Long> previousEndpointRemoteStubTime = new ConcurrentHashMap<>();

// Endpoint payload size (mean body size) history per PID — for sparkline
private final Map<String, LinkedList<Long>> endpointInSizeHistory = new ConcurrentHashMap<>();
private final Map<String, LinkedList<Long>> endpointOutSizeHistory = new ConcurrentHashMap<>();
private final Map<String, Long> previousEndpointSizeTime = new ConcurrentHashMap<>();

// Per-endpoint in/out rate history — keyed by pid + "|" + uri
private final Map<String, LinkedList<Long>> perEndpointInHistory = new ConcurrentHashMap<>();
private final Map<String, LinkedList<Long>> perEndpointOutHistory = new ConcurrentHashMap<>();
private final Map<String, LinkedList<long[]>> perEndpointSamples = new ConcurrentHashMap<>();
private final Map<String, Long> previousPerEndpointTime = new ConcurrentHashMap<>();

// Circuit breaker throughput history per PID/cbId (success + fail, one point per second)
private final Map<String, LinkedList<Long>> cbSuccessHistory = new ConcurrentHashMap<>();
private final Map<String, LinkedList<Long>> cbFailHistory = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -285,14 +296,17 @@ public Integer doCall() throws Exception {
// Create shared context and tab instances
ctx = new MonitorContext(data, infraData);
actionsPopup.setContext(ctx);
actionsPopup.setResetStatsAction(this::resetStats);
logTab = new LogTab(ctx);
routesTab = new RoutesTab(ctx);
consumersTab = new ConsumersTab(ctx);
endpointsTab = new EndpointsTab(
ctx,
endpointInHistory, endpointOutHistory,
endpointRemoteInHistory, endpointRemoteOutHistory,
endpointRemoteStubInHistory, endpointRemoteStubOutHistory);
endpointRemoteStubInHistory, endpointRemoteStubOutHistory,
endpointInSizeHistory, endpointOutSizeHistory,
perEndpointInHistory, perEndpointOutHistory);
httpTab = new HttpTab(ctx);
healthTab = new HealthTab(ctx);
historyTab = new HistoryTab(ctx, traces, traceFilePositions);
Expand Down Expand Up @@ -1610,6 +1624,44 @@ private void stopSelectedProcess(boolean forceKill) {
}
}

private void resetStats() {
IntegrationInfo info = ctx.findSelectedIntegration();
if (info == null) {
return;
}
String pid = info.pid;
JsonObject root = new JsonObject();
root.put("action", "reset-stats");
Path actionFile = ctx.getActionFile(pid);
PathUtils.writeTextSafely(root.toJson(), actionFile);
// Clear local sparkline history — overview
throughputHistory.remove(pid);
failedHistory.remove(pid);
throughputSamples.remove(pid);
previousExchangesTime.remove(pid);
// Clear local sparkline history — endpoints
endpointInHistory.remove(pid);
endpointOutHistory.remove(pid);
endpointSamples.remove(pid);
previousEndpointTime.remove(pid);
endpointRemoteInHistory.remove(pid);
endpointRemoteOutHistory.remove(pid);
endpointRemoteSamples.remove(pid);
previousEndpointRemoteTime.remove(pid);
endpointRemoteStubInHistory.remove(pid);
endpointRemoteStubOutHistory.remove(pid);
endpointRemoteStubSamples.remove(pid);
previousEndpointRemoteStubTime.remove(pid);
endpointInSizeHistory.remove(pid);
endpointOutSizeHistory.remove(pid);
previousEndpointSizeTime.remove(pid);
String perEpPrefix = pid + "|";
perEndpointInHistory.keySet().removeIf(k -> k.startsWith(perEpPrefix));
perEndpointOutHistory.keySet().removeIf(k -> k.startsWith(perEpPrefix));
perEndpointSamples.keySet().removeIf(k -> k.startsWith(perEpPrefix));
previousPerEndpointTime.keySet().removeIf(k -> k.startsWith(perEpPrefix));
}

private void sendRouteCommand(String pid, String routeId, String command) {
JsonObject root = new JsonObject();
root.put("action", "route");
Expand Down Expand Up @@ -1840,14 +1892,23 @@ private void refreshDataSync() {
endpointRemoteStubInHistory.remove(entry.getKey());
endpointRemoteStubOutHistory.remove(entry.getKey());
endpointRemoteStubSamples.remove(entry.getKey());

endpointInSizeHistory.remove(entry.getKey());
endpointOutSizeHistory.remove(entry.getKey());
previousEndpointSizeTime.remove(entry.getKey());
previousEndpointRemoteStubTime.remove(entry.getKey());
cpuLoadAvg.remove(entry.getKey());
prevCpuSample.remove(entry.getKey());
String vanishPid = entry.getKey() + "/";
cbSuccessHistory.keySet().removeIf(k -> k.startsWith(vanishPid));
cbFailHistory.keySet().removeIf(k -> k.startsWith(vanishPid));
cbThroughputSamples.keySet().removeIf(k -> k.startsWith(vanishPid));
previousCbTime.keySet().removeIf(k -> k.startsWith(vanishPid));
String vanishCbPrefix = entry.getKey() + "/";
cbSuccessHistory.keySet().removeIf(k -> k.startsWith(vanishCbPrefix));
cbFailHistory.keySet().removeIf(k -> k.startsWith(vanishCbPrefix));
cbThroughputSamples.keySet().removeIf(k -> k.startsWith(vanishCbPrefix));
previousCbTime.keySet().removeIf(k -> k.startsWith(vanishCbPrefix));
String vanishEpPrefix = entry.getKey() + "|";
perEndpointInHistory.keySet().removeIf(k -> k.startsWith(vanishEpPrefix));
perEndpointOutHistory.keySet().removeIf(k -> k.startsWith(vanishEpPrefix));
perEndpointSamples.keySet().removeIf(k -> k.startsWith(vanishEpPrefix));
previousPerEndpointTime.keySet().removeIf(k -> k.startsWith(vanishEpPrefix));
} else if (!livePids.contains(entry.getKey())) {
IntegrationInfo ghost = entry.getValue().info;
ghost.vanishing = true;
Expand Down Expand Up @@ -2097,6 +2158,49 @@ private void updateEndpointHistory(IntegrationInfo info) {
recordEndpointSample(pid, now, inRemoteStub, outRemoteStub,
endpointRemoteStubSamples, previousEndpointRemoteStubTime,
endpointRemoteStubInHistory, endpointRemoteStubOutHistory);

// Record payload size snapshots (mean body size per direction)
long inMeanSize = info.endpoints.stream()
.filter(ep -> "in".equals(ep.direction) && ep.meanBodySize >= 0)
.mapToLong(ep -> ep.meanBodySize).max().orElse(0);
long outMeanSize = info.endpoints.stream()
.filter(ep -> "out".equals(ep.direction) && ep.meanBodySize >= 0)
.mapToLong(ep -> ep.meanBodySize).max().orElse(0);
Long lastSizeTime = previousEndpointSizeTime.get(pid);
if (lastSizeTime == null || now - lastSizeTime >= 1000) {
previousEndpointSizeTime.put(pid, now);
LinkedList<Long> inSizeHist = endpointInSizeHistory.computeIfAbsent(pid, k -> new LinkedList<>());
inSizeHist.add(inMeanSize);
while (inSizeHist.size() > MAX_ENDPOINT_CHART_POINTS) {
inSizeHist.remove(0);
}
LinkedList<Long> outSizeHist = endpointOutSizeHistory.computeIfAbsent(pid, k -> new LinkedList<>());
outSizeHist.add(outMeanSize);
while (outSizeHist.size() > MAX_ENDPOINT_CHART_POINTS) {
outSizeHist.remove(0);
}
}

// Per-endpoint rate history (keyed by pid|uri)
Map<String, long[]> perUri = new LinkedHashMap<>();
for (EndpointInfo ep : info.endpoints) {
if (ep.uri == null) {
continue;
}
long[] inOut = perUri.computeIfAbsent(ep.uri, k -> new long[2]);
if ("in".equals(ep.direction)) {
inOut[0] += ep.hits;
} else if ("out".equals(ep.direction)) {
inOut[1] += ep.hits;
}
}
for (Map.Entry<String, long[]> entry : perUri.entrySet()) {
String key = pid + "|" + entry.getKey();
long[] inOut = entry.getValue();
recordEndpointSample(key, now, inOut[0], inOut[1],
perEndpointSamples, previousPerEndpointTime,
perEndpointInHistory, perEndpointOutHistory);
}
}

private void recordEndpointSample(
Expand Down Expand Up @@ -2844,6 +2948,12 @@ private IntegrationInfo parseIntegration(ProcessHandle ph, JsonObject root) {
ep.hits = TuiHelper.objToLong(ej.get("hits"));
ep.stub = Boolean.TRUE.equals(ej.get("stub"));
ep.remote = !Boolean.FALSE.equals(ej.get("remote"));
ep.minBodySize = TuiHelper.objToLong(ej.get("minBodySize"));
ep.maxBodySize = TuiHelper.objToLong(ej.get("maxBodySize"));
ep.meanBodySize = TuiHelper.objToLong(ej.get("meanBodySize"));
ep.minHeadersSize = TuiHelper.objToLong(ej.get("minHeadersSize"));
ep.maxHeadersSize = TuiHelper.objToLong(ej.get("maxHeadersSize"));
ep.meanHeadersSize = TuiHelper.objToLong(ej.get("meanHeadersSize"));
// Extract component from URI (e.g., "timer://tick" -> "timer")
if (ep.uri != null) {
int idx = ep.uri.indexOf(':');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

class ConsumersTab implements MonitorTab {

private static final String[] SORT_COLUMNS = { "id", "status", "type", "inflight", "total", "uri" };
private static final String[] SORT_COLUMNS = { "id", "status", "type", "inflight", "polls", "uri" };

private final MonitorContext ctx;
private final TableState tableState = new TableState();
Expand Down Expand Up @@ -131,7 +131,7 @@ public void render(Frame frame, Rect area) {
Cell.from(Span.styled(sortLabel("STATUS", "status"), sortStyle("status"))),
Cell.from(Span.styled(sortLabel("TYPE", "type"), sortStyle("type"))),
rightCell(sortLabel("INFLIGHT", "inflight"), 8, sortStyle("inflight")),
rightCell(sortLabel("TOTAL", "total"), 8, sortStyle("total")),
rightCell(sortLabel("POLLS", "polls"), 8, sortStyle("polls")),
rightCell("PERIOD", 10, Style.EMPTY.bold()),
Cell.from(Span.styled("SINCE-LAST", Style.EMPTY.bold())),
Cell.from(Span.styled(sortLabel("URI", "uri"), sortStyle("uri")))))
Expand Down Expand Up @@ -179,7 +179,7 @@ private int sortConsumer(ConsumerInfo a, ConsumerInfo b) {
yield ta.compareToIgnoreCase(tb);
}
case "inflight" -> Integer.compare(b.inflight, a.inflight);
case "total" -> {
case "polls" -> {
long la = a.totalCounter != null ? a.totalCounter : 0;
long lb = b.totalCounter != null ? b.totalCounter : 0;
yield Long.compare(lb, la);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ class EndpointInfo {
long hits;
boolean stub;
boolean remote;
long minBodySize = -1;
long maxBodySize = -1;
long meanBodySize = -1;
long minHeadersSize = -1;
long maxHeadersSize = -1;
long meanHeadersSize = -1;
}
Loading