Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions concore.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,12 @@ def read(port_identifier, name, initstr_val):
zmq_p = zmq_ports[port_identifier]
try:
message = zmq_p.recv_json_with_retry()
# Strip simtime prefix if present (mirroring file-based read behavior)
if isinstance(message, list) and len(message) > 0:
first_element = message[0]
if isinstance(first_element, (int, float)):
simtime = max(simtime, first_element)
return message[1:]
return message
except zmq.error.ZMQError as e:
logging.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
Expand Down Expand Up @@ -365,7 +371,7 @@ def read(port_identifier, name, initstr_val):
def write(port_identifier, name, val, delta=0):
"""
Write data either to ZMQ port or file.
`val` must be list (with simtime prefix) or string.
`val` is the data payload (list or string); write() prepends [simtime + delta] internally.
"""
global simtime

Expand All @@ -375,7 +381,13 @@ def write(port_identifier, name, val, delta=0):
try:
# Keep ZMQ payloads JSON-serializable by normalizing numpy types.
zmq_val = convert_numpy_to_python(val)
zmq_p.send_json_with_retry(zmq_val)
if isinstance(zmq_val, list):
# Prepend simtime to match file-based write behavior
payload = [simtime + delta] + zmq_val
zmq_p.send_json_with_retry(payload)
simtime += delta
else:
zmq_p.send_json_with_retry(zmq_val)
except zmq.error.ZMQError as e:
logging.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
except Exception as e:
Expand Down
38 changes: 35 additions & 3 deletions tests/test_concore.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,43 @@ def send_json_with_retry(self, message):
dummy = DummyPort()
concore.zmq_ports["test_zmq"] = dummy

# Reset simtime for predictable test behavior
concore.simtime = 0

payload = [np.int64(7), np.float64(3.5), {"x": np.float32(1.25)}]
concore.write("test_zmq", "data", payload)

assert dummy.sent is not None
assert dummy.sent == [7, 3.5, {"x": 1.25}]
assert not isinstance(dummy.sent[0], np.generic)
# ZMQ write now prepends simtime (0 in this case) to match file-based write behavior
assert dummy.sent == [0, 7, 3.5, {"x": 1.25}]
# Data values (after simtime) should be converted from numpy types
assert not isinstance(dummy.sent[1], np.generic)
assert not isinstance(dummy.sent[2]["x"], np.generic)
assert not isinstance(dummy.sent[2], np.generic)
assert not isinstance(dummy.sent[3]["x"], np.generic)

def test_zmq_write_read_roundtrip(self):
"""Test that ZMQ write+read returns original data without simtime prefix."""
import concore

class DummyZMQPort:
def __init__(self):
self.buffer = None

def send_json_with_retry(self, message):
self.buffer = message

def recv_json_with_retry(self):
return self.buffer

dummy = DummyZMQPort()
concore.zmq_ports["roundtrip_test"] = dummy

# Reset simtime for predictable test behavior
concore.simtime = 0

original_data = [1.5, 2.5, 3.5]
concore.write("roundtrip_test", "data", original_data)

# Read should return original data (simtime stripped)
result = concore.read("roundtrip_test", "data", "[]")
assert result == original_data