Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 31 additions & 5 deletions lib/async/bus/protocol/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,20 @@ def invoke(name, arguments, options, &block)
def accept(object, arguments, options, block_given)
if block_given
result = object.public_send(*arguments, **options) do |*yield_arguments|
self.write(Yield.new(@id, yield_arguments))
begin
self.write(Yield.new(@id, yield_arguments))
rescue IOError, EOFError, RuntimeError => error
# Connection closed or transaction closed - can't send yield
# Break out of the iteration since we can't communicate
break
end

response = self.read
begin
response = self.read
rescue ClosedQueueError, IOError, EOFError
# Connection closed - can't receive response
break
end

case response
when Next
Expand All @@ -137,13 +148,28 @@ def accept(object, arguments, options, block_given)
result = object.public_send(*arguments, **options)
end

self.write(Return.new(@id, result))
begin
self.write(Return.new(@id, result))
rescue IOError, EOFError, RuntimeError => error
# Connection closed or transaction closed - can't send return
# This is expected when connection terminates mid-transaction
end
rescue UncaughtThrowError => error
# UncaughtThrowError has both tag and value attributes
# Store both in the Throw message: result is tag, we'll add value handling
self.write(Throw.new(@id, [error.tag, error.value]))
begin
self.write(Throw.new(@id, [error.tag, error.value]))
rescue IOError, EOFError, RuntimeError
# Connection closed or transaction closed - can't send throw
# This is expected when connection terminates mid-transaction
end
rescue => error
self.write(Error.new(@id, error))
begin
self.write(Error.new(@id, error))
rescue IOError, EOFError, RuntimeError
# Connection closed or transaction closed - can't send error
# This is expected when connection terminates mid-transaction
end
end
end
end
Expand Down
234 changes: 234 additions & 0 deletions test/async/bus/protocol/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -237,5 +237,239 @@ def service.throw_method
end
end
end

with "connection termination during transaction" do
it "demonstrates unhandled RuntimeError when server crashes while client waits for response" do
# This test demonstrates that when a server connection is closed while
# a transaction is in-flight, the server task tries to write Return/Error
# but the transaction is already closed, causing an unhandled RuntimeError.
#
# Expected behavior: When the remote end closes the connection mid-transaction,
# the local end should fail with an exception, but that exception should be
# handled gracefully (not left as an unhandled exception in an async task).
server_connection = nil
server_started = Thread::Queue.new
close_connection = Thread::Queue.new

start_server do |connection|
server_connection = connection
service = Object.new
def service.slow_method
# Signal that server has started processing
@server_started.push(:started)
# Wait for signal to close connection
@close_connection.pop
:result
end

# Store queue references in service
service.instance_variable_set(:@server_started, server_started)
service.instance_variable_set(:@close_connection, close_connection)

connection.bind(:service, service)
end

client.connect do |connection|
# Start the invocation
invoke_task = Async do
connection[:service].slow_method
end

# Wait for server to start processing
server_started.pop

# Abruptly close the server connection
server_connection.close

# Signal server to continue (it will try to write Return/Error)
close_connection.push(:close)

# Wait for processing to complete
begin
invoke_task.wait
rescue
# Expected to fail
end

# Currently, this causes unhandled RuntimeError: "Transaction is closed!"
# in the server task when it tries to write Return/Error after connection closes.
# The test passes but demonstrates the issue: exceptions should be caught and
# handled gracefully, not left as unhandled exceptions in async tasks.
end
end

it "demonstrates unhandled RuntimeError when client crashes while server executes method" do
# This test demonstrates that when a client connection is closed while
# a server transaction is executing, the server task tries to write Return/Error
# but the transaction is already closed, causing an unhandled RuntimeError.
#
# Expected behavior: When the remote end closes the connection mid-transaction,
# the local end should fail with an exception, but that exception should be
# handled gracefully (not left as an unhandled exception in an async task).
client_connection = nil
server_started = Thread::Queue.new
close_connection = Thread::Queue.new

start_server do |connection|
service = Object.new
def service.slow_method
# Signal that server has started processing
@server_started.push(:started)
# Wait for signal to close connection
@close_connection.pop
:result
end

# Store queue references in service
service.instance_variable_set(:@server_started, server_started)
service.instance_variable_set(:@close_connection, close_connection)

connection.bind(:service, service)
end

client.connect do |connection|
client_connection = connection

# Start the invocation
invoke_task = Async do
connection[:service].slow_method
end

# Wait for server to start processing
server_started.pop

# Abruptly close the client connection
client_connection.close

# Signal server to continue (it will try to write Return/Error)
close_connection.push(:close)

# Wait for processing to complete
begin
invoke_task.wait
rescue
# Expected to fail
end

# Currently, this causes unhandled RuntimeError: "Transaction is closed!"
# in the server task when it tries to write Return/Error after connection closes.
# The test passes but demonstrates the issue: exceptions should be caught and
# handled gracefully, not left as unhandled exceptions in async tasks.
end
end

it "demonstrates unhandled RuntimeError when server crashes during yield operation" do
# This test demonstrates that when a server connection is closed during
# a yield operation, the server task tries to write Yield but the transaction
# is already closed, causing an unhandled RuntimeError.
#
# Expected behavior: When the remote end closes the connection mid-transaction,
# the local end should fail with an exception, but that exception should be
# handled gracefully (not left as an unhandled exception in an async task).
server_connection = nil
first_yield_received = Thread::Queue.new

start_server do |connection|
server_connection = connection
service = Object.new
def service.yielding_method
yield 1
# Wait for connection to be closed
@first_yield_received.pop
# This yield will fail because connection closes
yield 2
:done
end

# Store queue reference in service
service.instance_variable_set(:@first_yield_received, first_yield_received)

connection.bind(:service, service)
end

client.connect do |connection|
# Start the invocation with yield
invoke_task = Async do
results = []
connection[:service].yielding_method do |value|
results << value

# Close server connection after first yield
if results.size == 1
server_connection.close
first_yield_received.push(:received)
end

:ack
end
end

# Wait for processing to complete
begin
invoke_task.wait
rescue
# Expected to fail
end

# Currently, this causes unhandled RuntimeError: "Transaction is closed!"
# in the server task when it tries to write Yield after connection closes.
# The test passes but demonstrates the issue: exceptions should be caught and
# handled gracefully, not left as unhandled exceptions in async tasks.
end
end

it "handles client crash during server yield operation" do
client_connection = nil
first_yield_received = Thread::Queue.new

start_server do |connection|
service = Object.new
def service.yielding_method
yield 1
# Wait for connection to be closed
@first_yield_received.pop
# This yield will fail because client closes
yield 2
:done
end

# Store queue reference in service
service.instance_variable_set(:@first_yield_received, first_yield_received)

connection.bind(:service, service)
end

client.connect do |connection|
client_connection = connection

# Start the invocation with yield
invoke_task = Async do
results = []
connection[:service].yielding_method do |value|
results << value

# Close client connection after first yield
if results.size == 1
client_connection.close
first_yield_received.push(:received)
end

:ack
end
end

# Wait for processing to complete
begin
invoke_task.wait
rescue
# Expected to fail
end

# When the remote end closes the connection mid-transaction, the local end
# should fail with an exception, but that exception should be handled gracefully
# (not left as an unhandled exception in an async task).
end
end
end
end

Loading