Skip to content

Commit 6c2616b

Browse files
fix: raise TimeoutError on ZMQ retry exhaustion instead of returning None (#393)
- recv_json_with_retry() now raises TimeoutError after 5 failed attempts instead of returning None - send_json_with_retry() now raises TimeoutError after 5 failed attempts instead of silently returning None - read() catches TimeoutError explicitly and returns (default_return_val, False) - write() catches TimeoutError explicitly and logs error without crashing - Patched time.sleep in retry tests to avoid ~2.5s real sleeping per test - Fixed ruff formatting in test files Closes #393
1 parent b6cfcaa commit 6c2616b

4 files changed

Lines changed: 194 additions & 17 deletions

File tree

concore_base.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,7 @@ def send_json_with_retry(self, message):
5959
except zmq.Again:
6060
logger.warning(f"Send timeout (attempt {attempt + 1}/5)")
6161
time.sleep(0.5)
62-
logger.error("Failed to send after retries.")
63-
return
62+
raise TimeoutError(f"ZMQ send failed after 5 retries on {self.address}")
6463

6564
def recv_json_with_retry(self):
6665
"""Receive JSON message with retries if timeout occurs."""
@@ -70,8 +69,7 @@ def recv_json_with_retry(self):
7069
except zmq.Again:
7170
logger.warning(f"Receive timeout (attempt {attempt + 1}/5)")
7271
time.sleep(0.5)
73-
logger.error("Failed to receive after retries.")
74-
return None
72+
raise TimeoutError(f"ZMQ recv failed after 5 retries on {self.address}")
7573

7674

7775
def init_zmq_port(mod, port_name, port_type, address, socket_type_str):
@@ -282,6 +280,10 @@ def read(mod, port_identifier, name, initstr_val):
282280
return message[1:], True
283281
last_read_status = "SUCCESS"
284282
return message, True
283+
except TimeoutError as e:
284+
logger.error(f"ZMQ recv timeout on port {port_identifier} (name: {name}): {e}. Returning default.")
285+
last_read_status = "TIMEOUT"
286+
return default_return_val, False
285287
except zmq.error.ZMQError as e:
286288
logger.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
287289
last_read_status = "TIMEOUT"
@@ -384,6 +386,8 @@ def write(mod, port_identifier, name, val, delta=0):
384386
# Mutation breaks cross-language determinism (see issue #385).
385387
else:
386388
zmq_p.send_json_with_retry(zmq_val)
389+
except TimeoutError as e:
390+
logger.error(f"ZMQ send timeout on port {port_identifier} (name: {name}): {e}")
387391
except zmq.error.ZMQError as e:
388392
logger.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
389393
except Exception as e:

tests/test_concore.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import pytest
22
import os
33
import numpy as np
4+
from unittest.mock import patch
45

56

67
class TestSafeLiteralEval:
@@ -450,3 +451,97 @@ def test_write_timestamp_matches_cpp_semantics(self, temp_dir):
450451
"After 3 writes with delta=1 simtime must remain 0 "
451452
"(matching C++/MATLAB/Verilog); got %s" % concore.simtime
452453
)
454+
455+
456+
# ===================================================================
457+
# ZMQ Retry Exhaustion Tests (Issue #393)
458+
# ===================================================================
459+
460+
461+
class TestZMQRetryExhaustion:
462+
"""Tests for issue #393 — TimeoutError on retry exhaustion."""
463+
464+
@pytest.fixture(autouse=True)
465+
def reset_zmq_ports(self):
466+
import concore
467+
468+
original_ports = concore.zmq_ports.copy()
469+
yield
470+
concore.zmq_ports.clear()
471+
concore.zmq_ports.update(original_ports)
472+
473+
@pytest.fixture(autouse=True)
474+
def reset_simtime(self):
475+
import concore
476+
477+
old_simtime = concore.simtime
478+
yield
479+
concore.simtime = old_simtime
480+
481+
@patch("concore_base.time.sleep")
482+
def test_recv_json_with_retry_raises_timeout_error(self, mock_sleep):
483+
"""recv_json_with_retry must raise TimeoutError after 5 failed attempts."""
484+
from concore import ZeroMQPort
485+
from unittest.mock import MagicMock, patch
486+
import zmq
487+
488+
with patch.object(ZeroMQPort, "__init__", lambda self, *a, **kw: None):
489+
port = ZeroMQPort.__new__(ZeroMQPort)
490+
port.socket = MagicMock()
491+
port.socket.recv_json.side_effect = zmq.Again()
492+
port.address = "tcp://test:5555"
493+
494+
with pytest.raises(TimeoutError, match="ZMQ recv failed after 5 retries"):
495+
port.recv_json_with_retry()
496+
497+
assert port.socket.recv_json.call_count == 5
498+
499+
@patch("concore_base.time.sleep")
500+
def test_send_json_with_retry_raises_timeout_error(self, mock_sleep):
501+
"""send_json_with_retry must raise TimeoutError after 5 failed attempts."""
502+
from concore import ZeroMQPort
503+
from unittest.mock import MagicMock, patch
504+
import zmq
505+
506+
with patch.object(ZeroMQPort, "__init__", lambda self, *a, **kw: None):
507+
port = ZeroMQPort.__new__(ZeroMQPort)
508+
port.socket = MagicMock()
509+
port.socket.send_json.side_effect = zmq.Again()
510+
port.address = "tcp://test:5555"
511+
512+
with pytest.raises(TimeoutError, match="ZMQ send failed after 5 retries"):
513+
port.send_json_with_retry({"test": "data"})
514+
515+
assert port.socket.send_json.call_count == 5
516+
517+
def test_read_returns_default_on_zmq_timeout(self):
518+
"""read() must return default_return_val when recv exhausts retries, not None."""
519+
import concore
520+
521+
class MockZMQPort:
522+
def recv_json_with_retry(self):
523+
raise TimeoutError("ZMQ recv failed after 5 retries on tcp://test:5555")
524+
525+
concore.zmq_ports["test_timeout_port"] = MockZMQPort()
526+
concore.simtime = 0
527+
528+
result, ok = concore.read("test_timeout_port", "test_name", "[1.0, 2.0]")
529+
530+
assert result == [1.0, 2.0], (
531+
"read() must return default_return_val on TimeoutError, got %s" % result
532+
)
533+
assert ok is False
534+
535+
def test_write_does_not_crash_on_zmq_send_timeout(self):
536+
"""write() must handle TimeoutError from send gracefully."""
537+
import concore
538+
539+
class MockZMQPort:
540+
def send_json_with_retry(self, message):
541+
raise TimeoutError("ZMQ send failed after 5 retries on tcp://test:5555")
542+
543+
concore.zmq_ports["test_timeout_port"] = MockZMQPort()
544+
concore.simtime = 0
545+
546+
# Should not raise — just log the error
547+
concore.write("test_timeout_port", "test_name", [1.0, 2.0])

tests/test_concoredocker.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,3 +247,61 @@ def recv_json_with_retry(self):
247247

248248
assert result == original
249249
assert ok is True
250+
251+
252+
# ===================================================================
253+
# ZMQ Retry Exhaustion Tests (Issue #393)
254+
# ===================================================================
255+
256+
257+
class TestZMQRetryExhaustion:
258+
"""Tests for issue #393 — TimeoutError on retry exhaustion via concoredocker."""
259+
260+
@pytest.fixture(autouse=True)
261+
def reset_zmq_ports(self):
262+
import concoredocker
263+
264+
original_ports = concoredocker.zmq_ports.copy()
265+
yield
266+
concoredocker.zmq_ports.clear()
267+
concoredocker.zmq_ports.update(original_ports)
268+
269+
@pytest.fixture(autouse=True)
270+
def reset_simtime(self):
271+
import concoredocker
272+
273+
old_simtime = concoredocker.simtime
274+
yield
275+
concoredocker.simtime = old_simtime
276+
277+
def test_read_returns_default_on_zmq_timeout(self):
278+
"""read() must return default_return_val when recv exhausts retries, not None."""
279+
import concoredocker
280+
281+
class MockZMQPort:
282+
def recv_json_with_retry(self):
283+
raise TimeoutError("ZMQ recv failed after 5 retries on tcp://test:5555")
284+
285+
concoredocker.zmq_ports["test_timeout_port"] = MockZMQPort()
286+
concoredocker.simtime = 0
287+
288+
result, ok = concoredocker.read("test_timeout_port", "test_name", "[1.0, 2.0]")
289+
290+
assert result == [1.0, 2.0], (
291+
"read() must return default_return_val on TimeoutError, got %s" % result
292+
)
293+
assert ok is False
294+
295+
def test_write_does_not_crash_on_zmq_send_timeout(self):
296+
"""write() must handle TimeoutError from send gracefully."""
297+
import concoredocker
298+
299+
class MockZMQPort:
300+
def send_json_with_retry(self, message):
301+
raise TimeoutError("ZMQ send failed after 5 retries on tcp://test:5555")
302+
303+
concoredocker.zmq_ports["test_timeout_port"] = MockZMQPort()
304+
concoredocker.simtime = 0
305+
306+
# Should not raise — just log the error
307+
concoredocker.write("test_timeout_port", "test_name", [1.0, 2.0])

tests/test_read_status.py

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# Helpers
1414
# ---------------------------------------------------------------------------
1515

16+
1617
class DummyZMQPort:
1718
"""Minimal stand-in for ZeroMQPort used in ZMQ read tests."""
1819

@@ -33,22 +34,24 @@ def recv_json_with_retry(self):
3334
# File-based read tests
3435
# ---------------------------------------------------------------------------
3536

37+
3638
class TestReadFileSuccess:
3739
"""read() on a valid file returns (data, True) with SUCCESS status."""
3840

3941
@pytest.fixture(autouse=True)
4042
def setup(self, temp_dir, monkeypatch):
4143
import concore
44+
4245
self.concore = concore
43-
monkeypatch.setattr(concore, 'delay', 0)
46+
monkeypatch.setattr(concore, "delay", 0)
4447

4548
# Create ./in1/ym with valid data: [simtime, value]
4649
in_dir = os.path.join(temp_dir, "in1")
4750
os.makedirs(in_dir, exist_ok=True)
4851
with open(os.path.join(in_dir, "ym"), "w") as f:
4952
f.write("[10, 3.14]")
5053

51-
monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in"))
54+
monkeypatch.setattr(concore, "inpath", os.path.join(temp_dir, "in"))
5255

5356
def test_returns_data_and_true(self):
5457
data, ok = self.concore.read(1, "ym", "[0, 0.0]")
@@ -66,10 +69,11 @@ class TestReadFileMissing:
6669
@pytest.fixture(autouse=True)
6770
def setup(self, temp_dir, monkeypatch):
6871
import concore
72+
6973
self.concore = concore
70-
monkeypatch.setattr(concore, 'delay', 0)
74+
monkeypatch.setattr(concore, "delay", 0)
7175
# Point to a directory that does NOT have the file
72-
monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in"))
76+
monkeypatch.setattr(concore, "inpath", os.path.join(temp_dir, "in"))
7377

7478
def test_returns_default_and_false(self):
7579
data, ok = self.concore.read(1, "nonexistent", "[0, 0.0]")
@@ -86,15 +90,16 @@ class TestReadFileParseError:
8690
@pytest.fixture(autouse=True)
8791
def setup(self, temp_dir, monkeypatch):
8892
import concore
93+
8994
self.concore = concore
90-
monkeypatch.setattr(concore, 'delay', 0)
95+
monkeypatch.setattr(concore, "delay", 0)
9196

9297
in_dir = os.path.join(temp_dir, "in1")
9398
os.makedirs(in_dir, exist_ok=True)
9499
with open(os.path.join(in_dir, "ym"), "w") as f:
95100
f.write("NOT_VALID_PYTHON{{{")
96101

97-
monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in"))
102+
monkeypatch.setattr(concore, "inpath", os.path.join(temp_dir, "in"))
98103

99104
def test_returns_default_and_false(self):
100105
data, ok = self.concore.read(1, "ym", "[0, 0.0]")
@@ -111,16 +116,17 @@ class TestReadFileRetriesExceeded:
111116
@pytest.fixture(autouse=True)
112117
def setup(self, temp_dir, monkeypatch):
113118
import concore
119+
114120
self.concore = concore
115-
monkeypatch.setattr(concore, 'delay', 0)
121+
monkeypatch.setattr(concore, "delay", 0)
116122

117123
# Create an empty file
118124
in_dir = os.path.join(temp_dir, "in1")
119125
os.makedirs(in_dir, exist_ok=True)
120126
with open(os.path.join(in_dir, "ym"), "w") as f:
121127
pass # empty
122128

123-
monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in"))
129+
monkeypatch.setattr(concore, "inpath", os.path.join(temp_dir, "in"))
124130

125131
def test_returns_default_and_false(self):
126132
data, ok = self.concore.read(1, "ym", "[0, 0.0]")
@@ -135,12 +141,14 @@ def test_last_read_status_is_retries_exceeded(self):
135141
# ZMQ read tests
136142
# ---------------------------------------------------------------------------
137143

144+
138145
class TestReadZMQSuccess:
139146
"""Successful ZMQ read returns (data, True)."""
140147

141148
@pytest.fixture(autouse=True)
142149
def setup(self, monkeypatch):
143150
import concore
151+
144152
self.concore = concore
145153
self.original_ports = concore.zmq_ports.copy()
146154
yield
@@ -164,6 +172,7 @@ class TestReadZMQTimeout:
164172
@pytest.fixture(autouse=True)
165173
def setup(self, monkeypatch):
166174
import concore
175+
167176
self.concore = concore
168177
self.original_ports = concore.zmq_ports.copy()
169178
yield
@@ -185,6 +194,7 @@ class TestReadZMQError:
185194
@pytest.fixture(autouse=True)
186195
def setup(self, monkeypatch):
187196
import concore
197+
188198
self.concore = concore
189199
self.original_ports = concore.zmq_ports.copy()
190200
yield
@@ -193,6 +203,7 @@ def setup(self, monkeypatch):
193203

194204
def test_zmq_error_returns_default_and_false(self):
195205
import zmq
206+
196207
dummy = DummyZMQPort(raise_on_recv=zmq.error.ZMQError("test error"))
197208
self.concore.zmq_ports["test_port"] = dummy
198209

@@ -205,21 +216,23 @@ def test_zmq_error_returns_default_and_false(self):
205216
# Backward compatibility
206217
# ---------------------------------------------------------------------------
207218

219+
208220
class TestReadBackwardCompatibility:
209221
"""Legacy callers can use isinstance check on the result."""
210222

211223
@pytest.fixture(autouse=True)
212224
def setup(self, temp_dir, monkeypatch):
213225
import concore
226+
214227
self.concore = concore
215-
monkeypatch.setattr(concore, 'delay', 0)
228+
monkeypatch.setattr(concore, "delay", 0)
216229

217230
in_dir = os.path.join(temp_dir, "in1")
218231
os.makedirs(in_dir, exist_ok=True)
219232
with open(os.path.join(in_dir, "ym"), "w") as f:
220233
f.write("[10, 42.0]")
221234

222-
monkeypatch.setattr(concore, 'inpath', os.path.join(temp_dir, "in"))
235+
monkeypatch.setattr(concore, "inpath", os.path.join(temp_dir, "in"))
223236

224237
def test_legacy_unpack_pattern(self):
225238
"""The recommended migration pattern works correctly."""
@@ -245,17 +258,24 @@ def test_tuple_unpack(self):
245258
# last_read_status exposed on module
246259
# ---------------------------------------------------------------------------
247260

261+
248262
class TestLastReadStatusExposed:
249263
"""concore.last_read_status is publicly accessible."""
250264

251265
def test_attribute_exists(self):
252266
import concore
253-
assert hasattr(concore, 'last_read_status')
267+
268+
assert hasattr(concore, "last_read_status")
254269

255270
def test_initial_value_is_success(self):
256271
import concore
272+
257273
# Before any read, default is SUCCESS
258274
assert concore.last_read_status in (
259-
"SUCCESS", "FILE_NOT_FOUND", "TIMEOUT",
260-
"PARSE_ERROR", "EMPTY_DATA", "RETRIES_EXCEEDED",
275+
"SUCCESS",
276+
"FILE_NOT_FOUND",
277+
"TIMEOUT",
278+
"PARSE_ERROR",
279+
"EMPTY_DATA",
280+
"RETRIES_EXCEEDED",
261281
)

0 commit comments

Comments
 (0)