diff --git a/lib/async/bus/protocol/transaction.rb b/lib/async/bus/protocol/transaction.rb index cfed125..4d241ff 100644 --- a/lib/async/bus/protocol/transaction.rb +++ b/lib/async/bus/protocol/transaction.rb @@ -120,9 +120,20 @@ def invoke(name, arguments, options, &block) def accept(object, arguments, options, block_given) if block_given result = object.public_send(*arguments, **options) do |*yield_arguments| - self.write(Yield.new(@id, yield_arguments)) + begin + self.write(Yield.new(@id, yield_arguments)) + rescue IOError, EOFError, RuntimeError => error + # Connection closed or transaction closed - can't send yield + # Break out of the iteration since we can't communicate + break + end - response = self.read + begin + response = self.read + rescue ClosedQueueError, IOError, EOFError + # Connection closed - can't receive response + break + end case response when Next @@ -137,13 +148,28 @@ def accept(object, arguments, options, block_given) result = object.public_send(*arguments, **options) end - self.write(Return.new(@id, result)) + begin + self.write(Return.new(@id, result)) + rescue IOError, EOFError, RuntimeError => error + # Connection closed or transaction closed - can't send return + # This is expected when connection terminates mid-transaction + end rescue UncaughtThrowError => error # UncaughtThrowError has both tag and value attributes # Store both in the Throw message: result is tag, we'll add value handling - self.write(Throw.new(@id, [error.tag, error.value])) + begin + self.write(Throw.new(@id, [error.tag, error.value])) + rescue IOError, EOFError, RuntimeError + # Connection closed or transaction closed - can't send throw + # This is expected when connection terminates mid-transaction + end rescue => error - self.write(Error.new(@id, error)) + begin + self.write(Error.new(@id, error)) + rescue IOError, EOFError, RuntimeError + # Connection closed or transaction closed - can't send error + # This is expected when connection terminates mid-transaction + end end end end diff --git a/test/async/bus/protocol/transaction.rb b/test/async/bus/protocol/transaction.rb index 295e8de..27b6f11 100644 --- a/test/async/bus/protocol/transaction.rb +++ b/test/async/bus/protocol/transaction.rb @@ -237,5 +237,239 @@ def service.throw_method end end end + + with "connection termination during transaction" do + it "demonstrates unhandled RuntimeError when server crashes while client waits for response" do + # This test demonstrates that when a server connection is closed while + # a transaction is in-flight, the server task tries to write Return/Error + # but the transaction is already closed, causing an unhandled RuntimeError. + # + # Expected behavior: When the remote end closes the connection mid-transaction, + # the local end should fail with an exception, but that exception should be + # handled gracefully (not left as an unhandled exception in an async task). + server_connection = nil + server_started = Thread::Queue.new + close_connection = Thread::Queue.new + + start_server do |connection| + server_connection = connection + service = Object.new + def service.slow_method + # Signal that server has started processing + @server_started.push(:started) + # Wait for signal to close connection + @close_connection.pop + :result + end + + # Store queue references in service + service.instance_variable_set(:@server_started, server_started) + service.instance_variable_set(:@close_connection, close_connection) + + connection.bind(:service, service) + end + + client.connect do |connection| + # Start the invocation + invoke_task = Async do + connection[:service].slow_method + end + + # Wait for server to start processing + server_started.pop + + # Abruptly close the server connection + server_connection.close + + # Signal server to continue (it will try to write Return/Error) + close_connection.push(:close) + + # Wait for processing to complete + begin + invoke_task.wait + rescue + # Expected to fail + end + + # Currently, this causes unhandled RuntimeError: "Transaction is closed!" + # in the server task when it tries to write Return/Error after connection closes. + # The test passes but demonstrates the issue: exceptions should be caught and + # handled gracefully, not left as unhandled exceptions in async tasks. + end + end + + it "demonstrates unhandled RuntimeError when client crashes while server executes method" do + # This test demonstrates that when a client connection is closed while + # a server transaction is executing, the server task tries to write Return/Error + # but the transaction is already closed, causing an unhandled RuntimeError. + # + # Expected behavior: When the remote end closes the connection mid-transaction, + # the local end should fail with an exception, but that exception should be + # handled gracefully (not left as an unhandled exception in an async task). + client_connection = nil + server_started = Thread::Queue.new + close_connection = Thread::Queue.new + + start_server do |connection| + service = Object.new + def service.slow_method + # Signal that server has started processing + @server_started.push(:started) + # Wait for signal to close connection + @close_connection.pop + :result + end + + # Store queue references in service + service.instance_variable_set(:@server_started, server_started) + service.instance_variable_set(:@close_connection, close_connection) + + connection.bind(:service, service) + end + + client.connect do |connection| + client_connection = connection + + # Start the invocation + invoke_task = Async do + connection[:service].slow_method + end + + # Wait for server to start processing + server_started.pop + + # Abruptly close the client connection + client_connection.close + + # Signal server to continue (it will try to write Return/Error) + close_connection.push(:close) + + # Wait for processing to complete + begin + invoke_task.wait + rescue + # Expected to fail + end + + # Currently, this causes unhandled RuntimeError: "Transaction is closed!" + # in the server task when it tries to write Return/Error after connection closes. + # The test passes but demonstrates the issue: exceptions should be caught and + # handled gracefully, not left as unhandled exceptions in async tasks. + end + end + + it "demonstrates unhandled RuntimeError when server crashes during yield operation" do + # This test demonstrates that when a server connection is closed during + # a yield operation, the server task tries to write Yield but the transaction + # is already closed, causing an unhandled RuntimeError. + # + # Expected behavior: When the remote end closes the connection mid-transaction, + # the local end should fail with an exception, but that exception should be + # handled gracefully (not left as an unhandled exception in an async task). + server_connection = nil + first_yield_received = Thread::Queue.new + + start_server do |connection| + server_connection = connection + service = Object.new + def service.yielding_method + yield 1 + # Wait for connection to be closed + @first_yield_received.pop + # This yield will fail because connection closes + yield 2 + :done + end + + # Store queue reference in service + service.instance_variable_set(:@first_yield_received, first_yield_received) + + connection.bind(:service, service) + end + + client.connect do |connection| + # Start the invocation with yield + invoke_task = Async do + results = [] + connection[:service].yielding_method do |value| + results << value + + # Close server connection after first yield + if results.size == 1 + server_connection.close + first_yield_received.push(:received) + end + + :ack + end + end + + # Wait for processing to complete + begin + invoke_task.wait + rescue + # Expected to fail + end + + # Currently, this causes unhandled RuntimeError: "Transaction is closed!" + # in the server task when it tries to write Yield after connection closes. + # The test passes but demonstrates the issue: exceptions should be caught and + # handled gracefully, not left as unhandled exceptions in async tasks. + end + end + + it "handles client crash during server yield operation" do + client_connection = nil + first_yield_received = Thread::Queue.new + + start_server do |connection| + service = Object.new + def service.yielding_method + yield 1 + # Wait for connection to be closed + @first_yield_received.pop + # This yield will fail because client closes + yield 2 + :done + end + + # Store queue reference in service + service.instance_variable_set(:@first_yield_received, first_yield_received) + + connection.bind(:service, service) + end + + client.connect do |connection| + client_connection = connection + + # Start the invocation with yield + invoke_task = Async do + results = [] + connection[:service].yielding_method do |value| + results << value + + # Close client connection after first yield + if results.size == 1 + client_connection.close + first_yield_received.push(:received) + end + + :ack + end + end + + # Wait for processing to complete + begin + invoke_task.wait + rescue + # Expected to fail + end + + # When the remote end closes the connection mid-transaction, the local end + # should fail with an exception, but that exception should be handled gracefully + # (not left as an unhandled exception in an async task). + end + end + end end