Flush pending writes before reading from buffer #12
Force-pushed from 221a57b to db27c24.
Replace the hand-rolled SocketIO wrapper and `ZMTP.read_exact` with `IO::Stream::Buffered` from the io-stream gem. This brings read-ahead buffering (fewer syscalls for frame parsing), automatic TCP_NODELAY, and `read_exactly` for exact-byte reads.

Deleted:
- `TCP::SocketIO` class
- `ZMTP.read_exact` / `Mechanism` module
- `String#rsplit_host_port` monkey-patch
- TCP keepalive options (`tcp_keepalive_*`)

TCP throughput improved 20-28% from read-ahead buffering. Uses `minimum_write_size: 0` to work around socketry/io-stream#12.
Force-pushed from db27c24 to 308b84c.
When read-ahead pulls more data than requested, subsequent reads can complete from the buffer without calling fill_read_buffer. This skips the flush call inside fill_read_buffer, leaving pending writes unsent. In bidirectional protocols (e.g. ZMTP handshake), this causes deadlock: fiber A's write sits in the buffer while fiber B blocks waiting for it. Move the flush to the top of #read so it always runs, regardless of whether the read buffer already has enough data.
Force-pushed from 308b84c to d17a0c4.
Pull request overview
This PR fixes a deadlock in bidirectional/Async usage by ensuring buffered writes are flushed even when a `read(size)` can be satisfied entirely from the existing read-ahead buffer.

Changes:
- Update `IO::Stream::Readable#read` to flush pending writes even when the read buffer already contains enough data to satisfy the request.
- Add an Async regression test using `Socket.pair` to cover the "read-ahead leaves extra bytes in buffer" scenario.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `lib/io/stream/readable.rb` | Flushes pending writes on buffer-hit paths in `#read(size)` to prevent write starvation/deadlocks. |
| `test/io/stream/buffered.rb` | Adds a regression test using `Sus::Fixtures::Async::ReactorContext` to reproduce/guard the deadlock scenario. |
```ruby
# Don't read less than @minimum_read_size to avoid lots of small reads:
fill_read_buffer(read_size > @minimum_read_size ? read_size : @minimum_read_size)
if @finished or @read_buffer.bytesize >= size
```
`read(size)` now calls flush when `@finished` is true. That means a buffered read after EOF can raise (e.g. `EPIPE`/`IOError` from flushing pending writes) and prevent returning data that's already in `@read_buffer`, which is a behavior change from before. If the intent is only to fix the "buffer satisfied `size` so `fill_read_buffer` is skipped" case, consider only flushing when `@read_buffer.bytesize >= size` (or otherwise ensure EOF-buffered reads can still succeed without being interrupted by write-side errors).
```diff
-if @finished or @read_buffer.bytesize >= size
+if @read_buffer.bytesize >= size
```
```ruby
while true
	# Compute the amount of data we need to read from the underlying stream:
	read_size = size - @read_buffer.bytesize
	
	# Don't read less than @minimum_read_size to avoid lots of small reads:
	fill_read_buffer(read_size > @minimum_read_size ? read_size : @minimum_read_size)
	
	break if @finished or @read_buffer.bytesize >= size
end
```
This addresses the flush-skipped path for `#read`, but other APIs (e.g. `read_partial`, `peek`, `gets`/`read_until`) can also return purely from `@read_buffer` without calling `fill_read_buffer` (which is where flushing currently happens). That can leave the same "pending writes never flushed" hazard for those methods. Consider centralizing "flush before serving data from the read buffer" in a shared helper (or applying the same flush-on-buffer-hit behavior to the other read entry points) to keep semantics consistent.
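One way to sketch the centralization this comment suggests (illustrative only; `ToyBufferedStream` and `flush_before_read` are hypothetical names, not io-stream's actual code): every read-side entry point calls one shared helper before consulting the buffer, so a pure buffer hit can never skip the flush.

```ruby
require "socket"

# Hypothetical sketch of "flush before serving data from the read buffer"
# centralized in one private helper that every read entry point calls.
class ToyBufferedStream
	def initialize(io)
		@io = io
		@read_buffer = +""
		@write_buffer = +""
	end

	def write(data)
		@write_buffer << data
	end

	def flush
		return if @write_buffer.empty?
		@io.write(@write_buffer)
		@write_buffer.clear
	end

	def read(size)
		flush_before_read
		fill_read_buffer while @read_buffer.bytesize < size
		@read_buffer.slice!(0, size)
	end

	def peek(size)
		flush_before_read
		fill_read_buffer while @read_buffer.bytesize < size
		@read_buffer[0, size]
	end

	private

	# Shared helper: both entry points above call this before touching
	# @read_buffer, so pending writes are sent even on a pure buffer hit.
	def flush_before_read
		flush
	end

	def fill_read_buffer
		@read_buffer << @io.readpartial(4096)
	end
end
```

With this shape, adding a new read-side method cannot reintroduce the hazard as long as it goes through `flush_before_read` first.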
```ruby
task_a.wait
task_b.wait
```
As written, a regression will manifest as a deadlock and this example will hang indefinitely (waiting on task_a/task_b) rather than failing fast. Please wrap the waits in a bounded timeout (using the Async/Sus reactor context utilities) so CI gets a deterministic failure instead of a stuck test run.
```diff
-task_a.wait
-task_b.wait
+reactor.with_timeout(1) do
+	task_a.wait
+	task_b.wait
+end
```
```ruby
# flush. This test verifies that pending writes are still flushed.
task_a = reactor.async do
	client.write("A1")
	data = client.read_exactly(2)
```
`data = client.read_exactly(2)` assigns to `data` but the value is never used. Consider removing the assignment (or asserting on the returned value) to keep the test focused and avoid unused locals.
```diff
-data = client.read_exactly(2)
+client.read_exactly(2)
```
|
I merged this, but I'm having second thoughts about it. Implicit flushing adds overhead, and it also messes with interleaved reads and writes across fibers. Also, as Copilot pointed out, it would have to go in multiple methods. Honestly, it's probably better to terminate messages with an explicit flush.
|
For example, it's entirely possible to have this: It seems like a better direction would be to remove the implicit flush to reduce contention. |
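A sketch of the explicit-flush pattern recommended in the comments above: the writer flushes once per logical message, so correctness never depends on an implicit flush hidden inside the read path. `ToyStream` here is illustrative, not io-stream's API (though io-stream's buffered streams expose a similar `#flush`).

```ruby
require "socket"

# Minimal write-buffering stream: writes accumulate until an explicit flush.
class ToyStream
	def initialize(io)
		@io = io
		@write_buffer = +""
	end

	def write(data)
		@write_buffer << data
	end

	def flush
		@io.write(@write_buffer)
		@write_buffer.clear
	end
end

a, b = UNIXSocket.pair
stream = ToyStream.new(a)

# Build one logical message from several small writes, then flush once at
# the message boundary: a single syscall, and the peer is guaranteed to
# see the complete message without any read-side flushing.
stream.write("HELLO ")
stream.write("WORLD\n")
stream.flush

message = b.gets # "HELLO WORLD\n"
```

The message boundary is the natural flush point: it keeps syscalls batched without ever leaving a peer blocked on data that is still sitting in the write buffer.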
|
This was interesting to read. But I wanna apologize for wasting your time, probably. Me and Claude were a little too PR-happy. Turns out all we had to do was flush after queueing a message send, duh. No workaround needed. Huge perf wins. Sorry again. |
|
You don't need to apologise, I think what we should do is improve the documentation to cover the recommended patterns if not already. |
|
I wrote some documentation: https://socketry.github.io/io-stream/guides/high-performance-io/index |
|
Thanks a lot. Great writeup! In my omq gem I've actually implemented proper batch flushing in the meantime: the send pump waits for messages in the queue, greedily drains up to 64, then flushes, so latency is still good. 3-4x better throughput. It's nuts! And not janky!
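A stdlib sketch of the greedy-drain send pump described in the comment above, using `Thread` and `Queue` in place of Async fibers. All names here (`run_send_pump`, the `:close` sentinel, `batch_size`) are illustrative assumptions, not omq's actual API: the pump blocks for the first message, drains whatever else is already queued up to the batch cap without blocking, and writes (flushes) once per batch.

```ruby
# Batch-flushing send pump: one blocking wait, then a greedy non-blocking
# drain, then a single write per batch instead of one per message.
def run_send_pump(queue, io, batch_size: 64)
	Thread.new do
		closed = false
		until closed
			message = queue.pop # block until at least one message is available
			batch = []
			if message == :close
				closed = true
			else
				batch << message
				# Greedy drain: grab whatever is already queued, up to the cap:
				while batch.size < batch_size
					begin
						extra = queue.pop(true) # non-blocking pop
					rescue ThreadError
						break # queue momentarily empty; send what we have
					end
					if extra == :close
						closed = true
						break
					end
					batch << extra
				end
			end
			# One write (flush) per batch keeps syscall count low while the
			# blocking pop above keeps latency good when traffic is sparse:
			io.write(batch.join) unless batch.empty?
		end
	end
end
```

Because the drain never blocks, a lone message still goes out immediately; batching only kicks in when the queue is actually backed up, which is why throughput improves without hurting latency.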
Problem
When read-ahead pulls more data than a single `read`/`read_exactly` call needs, the excess stays in `@read_buffer`. A subsequent `read(size)` finds `@read_buffer.bytesize >= size` and returns directly from the buffer without entering the `fill_read_buffer` loop, which means `flush` is never called. Any data sitting in the write buffer is silently held back.
Deadlock scenario
In a bidirectional protocol (e.g. ZMTP handshake) where two fibers exchange data over a socket pair under Async:
1. Fiber A: `read_exactly(2)` → `fill_read_buffer` → flushes "A1", `sysread` yields (`:wait_readable`)
2. Fiber B: `read_exactly(2)` → `fill_read_buffer` → flushes "B1", `sysread` reads "A1" → returns
3. Fiber B: `read_exactly(2)` → `fill_read_buffer` → flushes "B2", `sysread` yields
4. Fiber A: `sysread` returns "B1B2" (4 bytes — read-ahead pulled both)
5. Fiber A: `read_exactly(2)` consumes "B1" (2 bytes); "B2" remains in buffer
6. Fiber A: `read_exactly(2)` finds `@read_buffer.bytesize >= size` → loop skipped → `flush` never called → "A2" never sent

Reproduction
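The repository's regression test reproduces this under Async; the sketch below is a dependency-free, single-threaded walk-through of the same state. `PreFixToyStream` is hypothetical (it mimics the pre-fix behavior, where flush only happens on the fill path), and fiber B's side is collapsed into data pre-written on the peer socket.

```ruby
require "socket"

# Toy stream with pre-fix flush placement: a buffer hit skips the flush.
class PreFixToyStream
	attr_reader :write_buffer

	def initialize(io)
		@io = io
		@read_buffer = +""
		@write_buffer = +""
	end

	def write(data)
		@write_buffer << data
	end

	def flush
		@io.write(@write_buffer)
		@write_buffer.clear
	end

	def read_exactly(size)
		# Pre-fix logic: a buffer hit returns immediately; flush is only
		# reached on the fill path below.
		return @read_buffer.slice!(0, size) if @read_buffer.bytesize >= size
		flush # in the real code this happens inside fill_read_buffer
		@read_buffer << @io.readpartial(4096)
		@read_buffer.slice!(0, size)
	end
end

a, b = UNIXSocket.pair
stream_a = PreFixToyStream.new(a)

b.write("B1B2")          # the peer already sent both messages
stream_a.write("A1")
stream_a.read_exactly(2) # flushes "A1"; read-ahead pulls all of "B1B2"
stream_a.write("A2")
stream_a.read_exactly(2) # buffer hit: returns "B2" and skips the flush

stream_a.write_buffer # => "A2", still unsent; a real peer would deadlock here
```

The final state is exactly step 6 above: the read succeeded from the buffer, but "A2" never left the write buffer.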
Fix
Call `flush` at the top of `#read` before checking the buffer, so pending writes are always sent before any read, even when the read buffer already has enough data from a previous read-ahead.

Test plan