Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/telnet-dev-logs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add a `@trigger.dev/core/v3/telnetLogServer` module: the shared `TelnetLogServer` (localhost-only, backpressure-safe), `formatLogLine`, and `stripAnsi` helpers, plus an optional static `Logger.onLog` / `SimpleStructuredLogger.onLog` sink used to fan structured logs out to a local dev-only telnet/TCP stream.
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres?schema=publi
# See: https://www.prisma.io/docs/reference/api-reference/prisma-schema-reference#fields:~:text=the%20shadow%20database.-,directUrl,-No
DIRECT_URL=${DATABASE_URL}
REMIX_APP_PORT=3030
# Dev-only: stream the webapp's logs over a local telnet/TCP socket (nc localhost 6767). Uncomment to enable.
# WEBAPP_TELNET_LOGS_PORT=6767
APP_ENV=development
APP_ORIGIN=http://localhost:3030
ELECTRIC_ORIGIN=http://localhost:3060
Expand Down
6 changes: 6 additions & 0 deletions .server-changes/supervisor-telnet-logs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: supervisor
type: feature
---

Add an opt-in, dev-only telnet log stream: set `SUPERVISOR_TELNET_LOGS_PORT` (e.g. 6769) to tail this process's logs as plain text over a local TCP socket (`nc localhost 6769`). Bound to localhost; off unless the port is set.
6 changes: 6 additions & 0 deletions .server-changes/webapp-telnet-logs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add an opt-in, dev-only telnet log stream: set `WEBAPP_TELNET_LOGS_PORT` (e.g. 6767) to tail this process's logs as plain text over a local TCP socket (`nc localhost 6767`). Bound to localhost; off unless the port is set.
5 changes: 4 additions & 1 deletion apps/supervisor/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:3030/otel

# Optional settings
DEBUG=1
TRIGGER_DEQUEUE_INTERVAL_MS=1000
TRIGGER_DEQUEUE_INTERVAL_MS=1000

# Dev-only: stream this process's logs over a local telnet/TCP socket (nc localhost 6769). Uncomment to enable.
# SUPERVISOR_TELNET_LOGS_PORT=6769
3 changes: 3 additions & 0 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ export const Env = z
TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()),
TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30),

// Opt-in, dev-only: stream this process's logs over a local telnet/TCP socket on this port.
SUPERVISOR_TELNET_LOGS_PORT: z.coerce.number().optional(),

// Required settings
TRIGGER_API_URL: z.string().url(),
TRIGGER_WORKER_TOKEN: z.string(), // accepts file:// path to read from a file
Expand Down
10 changes: 10 additions & 0 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { SupervisorSession } from "@trigger.dev/core/v3/workers";
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
import { formatLogLine, startTelnetLogServer } from "@trigger.dev/core/v3/telnetLogServer";
import { env } from "./env.js";
import { WorkloadServer } from "./workloadServer/index.js";
import type { WorkloadManagerOptions, WorkloadManager } from "./workloadManager/types.js";
Expand Down Expand Up @@ -749,5 +750,14 @@ class ManagedSupervisor {
}
}

// Opt-in, dev-only: mirror this process's structured logs to a local telnet/TCP stream.
if (env.SUPERVISOR_TELNET_LOGS_PORT && env.SUPERVISOR_TELNET_LOGS_PORT > 0) {
const telnetLogServer = startTelnetLogServer({
port: env.SUPERVISOR_TELNET_LOGS_PORT,
name: "supervisor",
});
SimpleStructuredLogger.onLog = (log) => telnetLogServer.broadcast(formatLogLine(log));
}

const worker = new ManagedSupervisor();
worker.start();
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ const EnvironmentSchema = z
.optional(),
ADMIN_EMAILS: z.string().refine(isValidRegex, "ADMIN_EMAILS must be a valid regex.").optional(),
REMIX_APP_PORT: z.string().optional(),
// Opt-in, dev-only: stream this process's logs over a local telnet/TCP socket on this port.
// Read directly from process.env in server.ts (before this schema loads); declared here for discoverability.
WEBAPP_TELNET_LOGS_PORT: z.coerce.number().optional(),
LOGIN_ORIGIN: z.string().default("http://localhost:3030"),
LOGIN_RATE_LIMITS_ENABLED: BoolEnv.default(true),
APP_ORIGIN: z.string().default("http://localhost:3030"),
Expand Down
17 changes: 17 additions & 0 deletions apps/webapp/app/services/logger.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { LogLevel } from "@trigger.dev/core/logger";
import { Logger } from "@trigger.dev/core/logger";
import { patchConsoleToTelnet, startTelnetLogServer } from "@trigger.dev/core/v3/telnetLogServer";
import { sensitiveDataReplacer } from "./sensitiveDataReplacer";
import { AsyncLocalStorage } from "async_hooks";
import { getHttpContext } from "./httpAsyncStorage.server";
Expand Down Expand Up @@ -79,3 +80,19 @@ export const socketLogger = new Logger(
return fields ? { ...fields } : {};
}
);

// Opt-in, dev-only: mirror this process's stdout to a local telnet/TCP stream.
// We patch console (rather than the static Logger.onLog sink) so the stream also captures logs
// from separate/bundled copies of the Logger — e.g. the enterprise SSO plugin, which bundles its
// own @trigger.dev/core and logs via its own console.log, invisible to the webapp's onLog hook.
const telnetLogsPort = process.env.WEBAPP_TELNET_LOGS_PORT
? Number(process.env.WEBAPP_TELNET_LOGS_PORT)
: undefined;
if (telnetLogsPort && Number.isFinite(telnetLogsPort) && telnetLogsPort > 0) {
const telnetGlobal = globalThis as typeof globalThis & { __webappTelnetLogs?: boolean };
if (!telnetGlobal.__webappTelnetLogs) {
telnetGlobal.__webappTelnetLogs = true;
const telnetLogServer = startTelnetLogServer({ port: telnetLogsPort, name: "webapp" });
patchConsoleToTelnet(telnetLogServer, { pretty: true });
}
}
15 changes: 15 additions & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"./v3/utils/omit": "./src/v3/utils/omit.ts",
"./v3/utils/retries": "./src/v3/utils/retries.ts",
"./v3/utils/structuredLogger": "./src/v3/utils/structuredLogger.ts",
"./v3/telnetLogServer": "./src/v3/telnetLogServer.ts",
"./v3/test": "./src/v3/test/index.ts",
"./v3/zodfetch": "./src/v3/zodfetch.ts",
"./v3/zodMessageHandler": "./src/v3/zodMessageHandler.ts",
Expand Down Expand Up @@ -124,6 +125,9 @@
"v3/utils/structuredLogger": [
"dist/commonjs/v3/utils/structuredLogger.d.ts"
],
"v3/telnetLogServer": [
"dist/commonjs/v3/telnetLogServer.d.ts"
],
"v3/zodfetch": [
"dist/commonjs/v3/zodfetch.d.ts"
],
Expand Down Expand Up @@ -490,6 +494,17 @@
"default": "./dist/commonjs/v3/utils/structuredLogger.js"
}
},
"./v3/telnetLogServer": {
"import": {
"@triggerdotdev/source": "./src/v3/telnetLogServer.ts",
"types": "./dist/esm/v3/telnetLogServer.d.ts",
"default": "./dist/esm/v3/telnetLogServer.js"
},
"require": {
"types": "./dist/commonjs/v3/telnetLogServer.d.ts",
"default": "./dist/commonjs/v3/telnetLogServer.js"
}
},
"./v3/test": {
"import": {
"@triggerdotdev/source": "./src/v3/test/index.ts",
Expand Down
12 changes: 12 additions & 0 deletions packages/core/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ export class Logger {
// Add a static "onError" method that will be called when an error is logged
static onError: (message: string, ...args: Array<Record<string, unknown> | undefined>) => void;

// Optional static sink called with the fully-structured log for every emitted line.
// Used (e.g.) to fan logs out to a dev-only telnet stream. Must not re-enter the Logger.
static onLog?: (structuredLog: Record<string, unknown>) => void;

constructor(
name: string,
level: LogLevel = "info",
Expand Down Expand Up @@ -134,6 +138,14 @@ export class Logger {
structuredLog.skipForwarding = true;
}

if (Logger.onLog) {
try {
Logger.onLog(structuredLog);
} catch {
// A sink must never break logging — and must never re-enter the Logger.
}
}

loggerFunction(JSON.stringify(structuredLog, this.#jsonReplacer));
}
}
Expand Down
162 changes: 162 additions & 0 deletions packages/core/src/v3/telnetLogServer.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import net from "node:net";
import { afterEach, describe, expect, test } from "vitest";
import { formatConsoleLine, formatLogLine, stripAnsi, TelnetLogServer } from "./telnetLogServer.js";

const servers: TelnetLogServer[] = [];

afterEach(() => {
while (servers.length) {
servers.pop()?.close();
}
});

/** Grab a port the OS just told us is free, then hand it to a TelnetLogServer. */
async function startServerOnFreePort(
name = "test"
): Promise<{ server: TelnetLogServer; port: number }> {
const probe = net.createServer();
const port = await listening(probe, 0);
await new Promise<void>((resolve) => probe.close(() => resolve()));

// A banner lets clients deterministically wait until the server has registered them
// (it's written in the connection handler), removing the connect/register race.
const server = new TelnetLogServer({ port, name, banner: "ready" });
servers.push(server);
server.start();
await delay(30);
return { server, port };
}

/** Connects and resolves only once the first bytes (the banner) arrive — server-side socket is registered by then. */
function connectAndCollect(port: number): Promise<{ socket: net.Socket; lines: () => string }> {
return new Promise((resolve, reject) => {
let buffer = "";
const socket = net.connect(port, "127.0.0.1");
socket.setEncoding("utf8");
socket.on("data", (chunk) => {
const first = buffer === "";
buffer += chunk;
if (first) {
resolve({ socket, lines: () => buffer });
}
});
socket.on("error", reject);
});
}

function listening(server: net.Server, port: number, host = "127.0.0.1"): Promise<number> {
return new Promise((resolve) => {
server.listen(port, host, () => {
const address = server.address();
resolve(typeof address === "object" && address ? address.port : port);
});
});
}

describe("stripAnsi", () => {
test("removes color escape codes", () => {
const colored = "2026-06-11 ERROR boom";
expect(stripAnsi(colored)).toBe("2026-06-11 ERROR boom");
});
});

describe("formatLogLine", () => {
test("formats a core Logger-shaped object (level/name)", () => {
const line = formatLogLine({
timestamp: new Date("2026-06-11T12:00:00.000Z"),
name: "webapp",
level: "info",
message: "queue drained",
count: 3,
});
expect(line).toBe("2026-06-11T12:00:00.000Z INFO [webapp] queue drained {count=3}");
});

test("formats a SimpleStructuredLogger-shaped object ($level/$name)", () => {
const line = formatLogLine({
timestamp: new Date("2026-06-11T12:00:00.000Z"),
$name: "supervisor",
$level: "warn",
message: "retrying",
attempt: 2,
});
expect(line).toBe("2026-06-11T12:00:00.000Z WARN [supervisor] retrying {attempt=2}");
});

test("omits empty name and extras", () => {
const line = formatLogLine({
timestamp: "2026-06-11T12:00:00.000Z",
level: "log",
message: "hello",
});
expect(line).toBe("2026-06-11T12:00:00.000Z LOG hello");
});
});

describe("formatConsoleLine", () => {
test("pretty-formats a JSON structured-log line", () => {
const raw = JSON.stringify({
timestamp: "2026-06-11T12:00:00.000Z",
name: "sso-plugin",
level: "info",
message: "sso.webhook.connection.activated: connection marked active",
connId: "conn_123",
});
expect(formatConsoleLine(raw)).toBe(
"2026-06-11T12:00:00.000Z INFO [sso-plugin] sso.webhook.connection.activated: connection marked active {connId=conn_123}"
);
});

test("passes non-JSON console output through unchanged", () => {
expect(formatConsoleLine("GET /healthcheck 200 1.2 ms")).toBe("GET /healthcheck 200 1.2 ms");
});

test("passes JSON that isn't a structured log through unchanged", () => {
const raw = JSON.stringify({ foo: "bar" });
expect(formatConsoleLine(raw)).toBe(raw);
});
});

describe("TelnetLogServer", () => {
test("broadcasts a line to a connected client", async () => {
const { server, port } = await startServerOnFreePort();
const client = await connectAndCollect(port);
server.broadcast("first line");
await delay(50);
expect(client.lines()).toContain("first line\r\n");
client.socket.destroy();
});

test("close() ends connected sockets", async () => {
const { server, port } = await startServerOnFreePort();
const client = await connectAndCollect(port);
server.broadcast("alive");
await delay(30);
expect(client.lines()).toContain("alive\r\n");

let closed = false;
client.socket.on("close", () => {
closed = true;
});
server.close();
await delay(30);
expect(closed).toBe(true);
});

test("EADDRINUSE does not throw", async () => {
const blocker = net.createServer();
const port = await listening(blocker, 0);

const server = new TelnetLogServer({ port, name: "test" });
servers.push(server);
// Must not throw even though the port is taken.
expect(() => server.start()).not.toThrow();
await delay(30);

blocker.close();
});
});

function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
Loading
Loading