fix(core): properly propagate stream cancellation on disconnect (#1349)#1354
fix(core): properly propagate stream cancellation on disconnect (#1349)#1354Sigmabrogz wants to merge 4 commits intovercel:mainfrom
Conversation
Fixes vercel#1349. This commit addresses the issue where `run.getReadable()`/`run.readable` do not properly propagate cancellation on disconnect. It implements the `cancel(reason)` method on `WorkflowServerReadableStream` to ensure it delegates cancellation to its inner reader. Additionally, this ensures that `flushablePipe` properly propagates the cancellation to the source stream instead of only releasing locks, thereby cleaning up listeners. Signed-off-by: Sigmabrogz <bnb1000bnb@gmail.com>
🦋 Changeset detectedLatest commit: 3b0cf54 The changes in this PR will be included in the next version bump. This PR includes changesets to release 16 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
|
@Sigmabrogz is attempting to deploy a commit to the Vercel Labs Team on Vercel. A member of the Team first needs to authorize it. |
|
I've verified the PR contents, though it looks like it just needs Vercel deployment authorization (which requires a team member) and a changeset file. Should I add the changeset on my branch or let the maintainers handle it during merge? |
VaguelySerious
left a comment
There was a problem hiding this comment.
LGTM generally, two things we'd like to see before we can merge this:
- Could you add a test to
packages/core/src/serialization.test.ts packages/core/src/flushable-stream.test.tsorpackages/core/src/serialization.test.ts packages/core/src/flushable-stream.test.ts - Could you add a changset with a patch to
core? Runpnpm changesetto do this. The message you add will show up in the package changelog later
Also, we don't run vercel e2e tests on community PRs for security reasons, but I will ensure tests pass on our end before this PR gets merged
| // on `state.reject(err)` for error handling. | ||
|
|
||
| // Attempt to cancel the upstream reader so the source knows it should stop generating data. | ||
| reader.cancel(err).catch(() => {}); |
There was a problem hiding this comment.
Should all of these .catch(() => {}); statements instead log a warning? Presumably closing should work in most cases, and we'd like to know if it's not possible to close cleanly
There was a problem hiding this comment.
Let's log a warning for any cancel failures
|
Hi team, the PR is ready for review. The Vercel deployment just needs authorization from a team member. Thanks! |
Signed-off-by: Sigmabrogz <sigmabrogz@gmail.com>
|
Hi @VaguelySerious, thank you for the review! I've added a unit test to Ready for another look when you have a moment. |
|
Re-running e2e tests in #1407 |
VaguelySerious
left a comment
There was a problem hiding this comment.
the added unit tests are failing, see inline suggestions for a fix
| const chunks: string[] = []; | ||
| let sinkAborted = false; | ||
|
|
||
| // Create a sink that aborts (representing a dropped connection) | ||
| const mockSink = new WritableStream<string>({ | ||
| write(chunk) { | ||
| chunks.push(chunk); | ||
| }, | ||
| abort(reason) { | ||
| sinkAborted = true; | ||
| }, | ||
| }); | ||
|
|
||
| const { readable, writable } = new TransformStream<string, string>(); | ||
| const state = createFlushableState(); | ||
|
|
||
| // Start piping in background | ||
| const pipePromise = flushablePipe(readable, mockSink, state); | ||
|
|
||
| pollWritableLock(writable, state); | ||
|
|
||
| const userWriter = writable.getWriter(); | ||
| await userWriter.write('valid chunk'); | ||
|
|
||
| // Simulate a stream error / drop on the readable side (which aborts the pipe) | ||
| const error = new Error('Client disconnected'); | ||
| readable.cancel(error); |
There was a problem hiding this comment.
| const chunks: string[] = []; | |
| let sinkAborted = false; | |
| // Create a sink that aborts (representing a dropped connection) | |
| const mockSink = new WritableStream<string>({ | |
| write(chunk) { | |
| chunks.push(chunk); | |
| }, | |
| abort(reason) { | |
| sinkAborted = true; | |
| }, | |
| }); | |
| const { readable, writable } = new TransformStream<string, string>(); | |
| const state = createFlushableState(); | |
| // Start piping in background | |
| const pipePromise = flushablePipe(readable, mockSink, state); | |
| pollWritableLock(writable, state); | |
| const userWriter = writable.getWriter(); | |
| await userWriter.write('valid chunk'); | |
| // Simulate a stream error / drop on the readable side (which aborts the pipe) | |
| const error = new Error('Client disconnected'); | |
| readable.cancel(error); | |
| const chunks: string[] = []; | |
| let sinkAborted = false; | |
| // Create a sink that tracks writes and aborts (representing the response stream) | |
| const mockSink = new WritableStream<string>({ | |
| write(chunk) { | |
| chunks.push(chunk); | |
| }, | |
| }); | |
| // Use a custom ReadableStream with a controller so we can error it | |
| // externally. This simulates the source stream breaking (e.g., a client | |
| // disconnect that causes the readable side of the pipe to error). | |
| // Note: We cannot call readable.cancel() on a locked ReadableStream | |
| // (flushablePipe locks it via getReader()), so we use controller.error() | |
| // which propagates through the internal reader. | |
| let sourceController!: ReadableStreamDefaultController<string>; | |
| const source = new ReadableStream<string>({ | |
| start(controller) { | |
| sourceController = controller; | |
| }, | |
| }); | |
| const state = createFlushableState(); | |
| // Start piping in background | |
| const pipePromise = flushablePipe(source, mockSink, state).catch(() => { | |
| // Errors handled via state.reject | |
| }); | |
| // Enqueue a valid chunk through the source | |
| sourceController.enqueue('valid chunk'); | |
| // Allow the pipe to process the chunk | |
| await new Promise((r) => setTimeout(r, 50)); | |
| // Simulate a stream error / client disconnect on the source side. | |
| // controller.error() propagates to the internal reader held by flushablePipe, | |
| // causing reader.read() to reject, which triggers the catch block. | |
| sourceController.error(new Error('Client disconnected')); |
| // Write should fail because the underlying pipe broke | ||
| await expect(userWriter.write('another')).rejects.toThrow(); |
There was a problem hiding this comment.
| // Write should fail because the underlying pipe broke | |
| await expect(userWriter.write('another')).rejects.toThrow(); | |
| // Wait for the pipe to process the error | |
| await pipePromise; | |
| // State promise should reject with the disconnection error | |
| await expect(state.promise).rejects.toThrow('Client disconnected'); | |
|
|
||
| // State promise should reject with the cancellation error | ||
| await expect(state.promise).rejects.toThrow('Client disconnected'); | ||
|
|
||
| // Ensure the sink received the abort signal | ||
| expect(sinkAborted).toBe(true); |
There was a problem hiding this comment.
| // State promise should reject with the cancellation error | |
| await expect(state.promise).rejects.toThrow('Client disconnected'); | |
| // Ensure the sink received the abort signal | |
| expect(sinkAborted).toBe(true); | |
| // The first chunk should have been written before the error | |
| expect(chunks).toContain('valid chunk'); | |
| // Ensure the stream ended | |
| expect(state.streamEnded).toBe(true); |
| // on `state.reject(err)` for error handling. | ||
|
|
||
| // Attempt to cancel the upstream reader so the source knows it should stop generating data. | ||
| reader.cancel(err).catch(() => {}); |
There was a problem hiding this comment.
Let's log a warning for any cancel failures
|
I've pushed updates that address the test logic (the unit test correctly uses |
refactor(core): log warnings on reader cancel failures instead of swallowing Signed-off-by: Sigmabrogz <sigmabrogz@users.noreply.github.com> Made-with: Cursor
c2ce779 to
3b0cf54
Compare
Fixes #1349.
This PR addresses the issue where
run.getReadable()andrun.readabledo not correctly propagate cancellation when the client disconnects, leading to leaked stream listeners.Changes:
cancel(reason)method onWorkflowServerReadableStreamso it delegates cancellation to its inner reader (this.#reader.cancel(reason)).flushablePipeto propagate cancellation to the source stream viareader.cancel(err)in itscatchandfinallypaths, instead of simply releasing locks.flushablePipealso actively waits forwriter.closedrejection during reading to notice abrupt connection closes faster.This ensures the source correctly receives a cancellation signal rather than dangling.