Skip to content

fix(core): properly propagate stream cancellation on disconnect (#1349)#1354

Open
Sigmabrogz wants to merge 4 commits intovercel:mainfrom
Sigmabrogz:fix-1349-stream-cancel
Open

fix(core): properly propagate stream cancellation on disconnect (#1349)#1354
Sigmabrogz wants to merge 4 commits intovercel:mainfrom
Sigmabrogz:fix-1349-stream-cancel

Conversation

@Sigmabrogz
Copy link

Fixes #1349.

This PR addresses the issue where run.getReadable() and run.readable do not correctly propagate cancellation when the client disconnects, leading to leaked stream listeners.

Changes:

  • Added the cancel(reason) method on WorkflowServerReadableStream so it delegates cancellation to its inner reader (this.#reader.cancel(reason)).
  • Modified flushablePipe to propagate cancellation to the source stream via reader.cancel(err) in its catch and finally paths, instead of simply releasing locks.
  • flushablePipe also actively waits for writer.closed rejection during reading to notice abrupt connection closes faster.

This ensures the source correctly receives a cancellation signal rather than dangling.

Fixes vercel#1349.

This commit addresses the issue where `run.getReadable()`/`run.readable` do not
properly propagate cancellation on disconnect. It implements the `cancel(reason)`
method on `WorkflowServerReadableStream` to ensure it delegates cancellation
to its inner reader.

Additionally, this ensures that `flushablePipe` properly propagates the cancellation
to the source stream instead of only releasing locks, thereby cleaning up listeners.

Signed-off-by: Sigmabrogz <bnb1000bnb@gmail.com>
@Sigmabrogz Sigmabrogz requested a review from a team as a code owner March 12, 2026 19:07
@changeset-bot
Copy link

changeset-bot bot commented Mar 12, 2026

🦋 Changeset detected

Latest commit: 3b0cf54

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 16 packages
Name Type
@workflow/core Patch
@workflow/builders Patch
@workflow/cli Patch
@workflow/next Patch
@workflow/nitro Patch
@workflow/vitest Patch
@workflow/web-shared Patch
workflow Patch
@workflow/world-testing Patch
@workflow/astro Patch
@workflow/nest Patch
@workflow/rollup Patch
@workflow/sveltekit Patch
@workflow/vite Patch
@workflow/nuxt Patch
@workflow/ai Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@vercel
Copy link
Contributor

vercel bot commented Mar 12, 2026

@Sigmabrogz is attempting to deploy a commit to the Vercel Labs Team on Vercel.

A member of the Team first needs to authorize it.

@Sigmabrogz
Copy link
Author

I've verified the PR contents, though it looks like it just needs Vercel deployment authorization (which requires a team member) and a changeset file. Should I add the changeset on my branch or let the maintainers handle it during merge?

Copy link
Member

@VaguelySerious VaguelySerious left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM generally, two things we'd like to see before we can merge this:

  • Could you add a test to packages/core/src/serialization.test.ts packages/core/src/flushable-stream.test.ts or packages/core/src/serialization.test.ts packages/core/src/flushable-stream.test.ts
  • Could you add a changset with a patch to core? Run pnpm changeset to do this. The message you add will show up in the package changelog later

Also, we don't run vercel e2e tests on community PRs for security reasons, but I will ensure tests pass on our end before this PR gets merged

// on `state.reject(err)` for error handling.

// Attempt to cancel the upstream reader so the source knows it should stop generating data.
reader.cancel(err).catch(() => {});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should all of these .catch(() => {}); statements instead log a warning? Presumably closing should work in most cases, and we'd like to know if it's not possible to close cleanly

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's log a warning for any cancel failures

@Sigmabrogz
Copy link
Author

Hi team, the PR is ready for review. The Vercel deployment just needs authorization from a team member. Thanks!

Signed-off-by: Sigmabrogz <sigmabrogz@gmail.com>
@Sigmabrogz
Copy link
Author

Hi @VaguelySerious, thank you for the review! I've added a unit test to packages/core/src/flushable-stream.test.ts that specifically asserts the abort signal propagates back up flushablePipe when the reader disconnects early. I've also run pnpm changeset to add the patch note for @workflow/core!

Ready for another look when you have a moment.

// Simulate a stream error / drop on the readable side (which aborts the pipe)
const error = new Error('Client disconnected');
readable.cancel(error);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test calls readable.cancel(error) on a ReadableStream that is locked by flushablePipe, causing the cancellation to throw rather than test the intended disconnect behavior. Additionally, pipePromise is not awaited or caught, risking unhandled promise rejections.

Fix on Vercel

@VaguelySerious
Copy link
Member

Re-running e2e tests in #1407

Copy link
Member

@VaguelySerious VaguelySerious left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the added unit tests are failing, see inline suggestions for a fix

Comment on lines +113 to +139
const chunks: string[] = [];
let sinkAborted = false;

// Create a sink that aborts (representing a dropped connection)
const mockSink = new WritableStream<string>({
write(chunk) {
chunks.push(chunk);
},
abort(reason) {
sinkAborted = true;
},
});

const { readable, writable } = new TransformStream<string, string>();
const state = createFlushableState();

// Start piping in background
const pipePromise = flushablePipe(readable, mockSink, state);

pollWritableLock(writable, state);

const userWriter = writable.getWriter();
await userWriter.write('valid chunk');

// Simulate a stream error / drop on the readable side (which aborts the pipe)
const error = new Error('Client disconnected');
readable.cancel(error);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const chunks: string[] = [];
let sinkAborted = false;
// Create a sink that aborts (representing a dropped connection)
const mockSink = new WritableStream<string>({
write(chunk) {
chunks.push(chunk);
},
abort(reason) {
sinkAborted = true;
},
});
const { readable, writable } = new TransformStream<string, string>();
const state = createFlushableState();
// Start piping in background
const pipePromise = flushablePipe(readable, mockSink, state);
pollWritableLock(writable, state);
const userWriter = writable.getWriter();
await userWriter.write('valid chunk');
// Simulate a stream error / drop on the readable side (which aborts the pipe)
const error = new Error('Client disconnected');
readable.cancel(error);
const chunks: string[] = [];
let sinkAborted = false;
// Create a sink that tracks writes and aborts (representing the response stream)
const mockSink = new WritableStream<string>({
write(chunk) {
chunks.push(chunk);
},
});
// Use a custom ReadableStream with a controller so we can error it
// externally. This simulates the source stream breaking (e.g., a client
// disconnect that causes the readable side of the pipe to error).
// Note: We cannot call readable.cancel() on a locked ReadableStream
// (flushablePipe locks it via getReader()), so we use controller.error()
// which propagates through the internal reader.
let sourceController!: ReadableStreamDefaultController<string>;
const source = new ReadableStream<string>({
start(controller) {
sourceController = controller;
},
});
const state = createFlushableState();
// Start piping in background
const pipePromise = flushablePipe(source, mockSink, state).catch(() => {
// Errors handled via state.reject
});
// Enqueue a valid chunk through the source
sourceController.enqueue('valid chunk');
// Allow the pipe to process the chunk
await new Promise((r) => setTimeout(r, 50));
// Simulate a stream error / client disconnect on the source side.
// controller.error() propagates to the internal reader held by flushablePipe,
// causing reader.read() to reject, which triggers the catch block.
sourceController.error(new Error('Client disconnected'));

Comment on lines +141 to +142
// Write should fail because the underlying pipe broke
await expect(userWriter.write('another')).rejects.toThrow();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Write should fail because the underlying pipe broke
await expect(userWriter.write('another')).rejects.toThrow();
// Wait for the pipe to process the error
await pipePromise;
// State promise should reject with the disconnection error
await expect(state.promise).rejects.toThrow('Client disconnected');

Comment on lines +143 to +148

// State promise should reject with the cancellation error
await expect(state.promise).rejects.toThrow('Client disconnected');

// Ensure the sink received the abort signal
expect(sinkAborted).toBe(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// State promise should reject with the cancellation error
await expect(state.promise).rejects.toThrow('Client disconnected');
// Ensure the sink received the abort signal
expect(sinkAborted).toBe(true);
// The first chunk should have been written before the error
expect(chunks).toContain('valid chunk');
// Ensure the stream ended
expect(state.streamEnded).toBe(true);

// on `state.reject(err)` for error handling.

// Attempt to cancel the upstream reader so the source knows it should stop generating data.
reader.cancel(err).catch(() => {});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's log a warning for any cancel failures

@Sigmabrogz
Copy link
Author

I've pushed updates that address the test logic (the unit test correctly uses controller.error() rather than trying to cancel a locked readable, resolving the hanging promises), and I've updated the empty .catch(() => {}) statements to log warnings when a reader fails to cancel, per the feedback. Thanks for reviewing! Let me know if everything else looks good.

refactor(core): log warnings on reader cancel failures instead of swallowing

Signed-off-by: Sigmabrogz <sigmabrogz@users.noreply.github.com>
Made-with: Cursor
@Sigmabrogz Sigmabrogz force-pushed the fix-1349-stream-cancel branch from c2ce779 to 3b0cf54 Compare March 17, 2026 12:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

run.getReadable()/run.readable do not propagate cancel on disconnect, leaking stream listeners

2 participants