diff --git a/lib/io/stream/readable.rb b/lib/io/stream/readable.rb index e627ee1..99b9d17 100644 --- a/lib/io/stream/readable.rb +++ b/lib/io/stream/readable.rb @@ -97,12 +97,19 @@ def read(size = nil, buffer = nil) end if size - until @finished or @read_buffer.bytesize >= size - # Compute the amount of data we need to read from the underlying stream: - read_size = size - @read_buffer.bytesize - - # Don't read less than @minimum_read_size to avoid lots of small reads: - fill_read_buffer(read_size > @minimum_read_size ? read_size : @minimum_read_size) + if @finished or @read_buffer.bytesize >= size + # We have enough data in the read buffer, but we should still flush pending writes: + self.flush + else + while true + # Compute the amount of data we need to read from the underlying stream: + read_size = size - @read_buffer.bytesize + + # Don't read less than @minimum_read_size to avoid lots of small reads: + fill_read_buffer(read_size > @minimum_read_size ? read_size : @minimum_read_size) + + break if @finished or @read_buffer.bytesize >= size + end end else until @finished diff --git a/releases.md b/releases.md index 240fab8..06f3530 100644 --- a/releases.md +++ b/releases.md @@ -3,6 +3,7 @@ ## Unreleased - Remove old OpenSSL method shims. + - Ensure `IO::Stream::Readable#read` calls `#flush` even if buffered data is sufficient to satisfy the read, to maintain consistent behavior. ## v0.11.0 diff --git a/test/io/stream/buffered.rb b/test/io/stream/buffered.rb index d8a9cef..3a447c5 100644 --- a/test/io/stream/buffered.rb +++ b/test/io/stream/buffered.rb @@ -1011,6 +1011,8 @@ def after(error = nil) end describe "Socket.pair" do + include Sus::Fixtures::Async::ReactorContext + let(:sockets) {Socket.pair(:UNIX, :STREAM)} let(:client) {IO::Stream::Buffered.wrap(sockets[0])} let(:server) {IO::Stream::Buffered.wrap(sockets[1])} @@ -1022,6 +1024,29 @@ def after(error = nil) it_behaves_like AUnidirectionalStream it_behaves_like ABidirectionalStream + + it "flushes pending writes even when read buffer already has data" do + # When read-ahead pulls more data than requested, a subsequent read can + # complete from the buffer without calling fill_read_buffer — skipping + # flush. This test verifies that pending writes are still flushed. + task_a = reactor.async do + client.write("A1") + data = client.read_exactly(2) + client.write("A2") + client.read_exactly(2) + end + + task_b = reactor.async do + server.write("B1") + server.read_exactly(2) + server.write("B2") + server.read_exactly(2) + end + + # Without the fix, this deadlocks because A2 is never flushed. + task_a.wait + task_b.wait + end end describe TCPSocket do