WiP Fixing bug where roster is called multiple times for single dag #256

Merged
bradley-erickson merged 4 commits into master from berickson/20260120-comm-protocol-targets on Feb 2, 2026

Conversation

@bradley-erickson
Collaborator

Currently this is built so that we call each target individually, but we ought to call them together in a single call when we can.


@chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2d8375b7c0

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines 34 to 38
async def _pump(self):
    try:
        async for item in self._source:
            async with self._condition:
                self._buffer.append(item)


P2: Add backpressure or eviction for shared async buffer

Because _pump() runs in a background task and appends every yielded item to _buffer without any eviction, the shared iterable will eagerly drain the source and retain all items in memory. For long‑running or high‑volume DAG generators (e.g., streaming dashboard updates), this means memory grows without bound and the producer keeps running even if consumers are slow or cancel, which can lead to OOM or unnecessary work. Consider coupling consumption to demand (backpressure) or trimming the buffer once all consumers have advanced past an item.
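
One way to couple consumption to demand is to bound the buffer and have the producer wait on the same condition the consumers use. A minimal sketch, not the project's implementation: the _max_buffer and _finished attributes are hypothetical additions, and consumers would need to notify the condition as they advance.

import asyncio

class _SharedAsyncIterable:
    # Producer side only; consumer bookkeeping is omitted from this sketch.
    def __init__(self, source, max_buffer=256):
        self._source = source
        self._buffer = []
        self._condition = asyncio.Condition()
        self._max_buffer = max_buffer  # hypothetical bound on buffered items
        self._finished = False         # hypothetical end-of-stream flag

    async def _pump(self):
        try:
            async for item in self._source:
                async with self._condition:
                    # Block the producer while the buffer is full so slow
                    # consumers exert backpressure instead of unbounded growth;
                    # consumers must notify the condition as they advance.
                    while len(self._buffer) >= self._max_buffer:
                        await self._condition.wait()
                    self._buffer.append(item)
                    self._condition.notify_all()
        finally:
            async with self._condition:
                self._finished = True
                self._condition.notify_all()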


@bradley-erickson
Collaborator Author

@codex review


@chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0b3edc5e01


Comment on lines +545 to +547
node = exports[target].get('returns')
if node not in execution_nodes:
    node = f'__missing_export__:{target}'


P2: Preserve missing-return error mapping

When an export exists but its returns node is missing from the execution DAG, execute_dag emits an error generator keyed by the missing node name (not by __missing_export__). This new mapping rewrites that case to __missing_export__, so generator_dictionary.get(node) returns None and the error update is silently dropped. This means a misconfigured export (e.g., a typo in returns) no longer surfaces an error to the dashboard client. Consider keeping the original node name here or aligning the executor’s error key to the same placeholder so the error generator is still driven.
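
A hypothetical helper showing how the lookup key could be preserved; it assumes execute_dag stores its error generator under the missing returns node name, as described above, and the helper name itself is illustrative rather than code from this PR.

def _resolve_target_node(target, exports, execution_nodes):
    # Map a protocol target to the DAG node whose generator should drive it,
    # without losing the executor's error key.
    export = exports.get(target)
    if export is None:
        # Unknown target: fall back to the placeholder so the client still
        # receives a clear missing-export error.
        return f'__missing_export__:{target}'
    node = export.get('returns')
    if node is None:
        # Export declared with no returns node at all.
        return f'__missing_export__:{target}'
    # Even if `node` is absent from execution_nodes, return the original name:
    # execute_dag keys its error generator by that missing node, so
    # generator_dictionary.get(node) still surfaces the error to the client.
    return node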


@bradley-erickson force-pushed the berickson/20260120-comm-protocol-targets branch from 0b3edc5 to 3800449 on January 29, 2026 at 18:19
@bradley-erickson
Collaborator Author

@codex review


@chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3800449848


Comment on lines 999 to 1005
node_result = await visit(e)
if isinstance(node_result, collections.abc.AsyncIterable):
    cached = async_generator_cache.get(id(node_result))
    if cached is None:
        cached = _clean_json_via_generator(node_result)
        async_generator_cache[id(node_result)] = cached
    out[e] = cached


P2: Return independent generators per target

When multiple targets resolve to the same async iterable, this cache returns the same async generator instance for every target. Async generators are single-consumer; if a caller iterates each target independently (e.g., prepare_dag_execution(..., targets=[...]) in communication_protocol/integration.py can return multiple targets), the first consumer will drain it and the others will see an already‑exhausted generator or raise RuntimeError about concurrent iteration. Before this change, each target got its own _clean_json_via_generator, allowing independent consumption over the shared _SharedAsyncIterable source. Consider caching only the underlying shared iterable and still creating a new _clean_json_via_generator per target.
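
A sketch of the suggested split, assuming _SharedAsyncIterable and _clean_json_via_generator behave as described in this thread; the helper name and shared_source_cache are illustrative, not code from the PR.

import collections.abc

def _generator_for_target(node_result, shared_source_cache):
    # Non-iterable results pass through unchanged.
    if not isinstance(node_result, collections.abc.AsyncIterable):
        return node_result
    # Cache only the shared, multi-consumer source, keyed by the identity of
    # the underlying iterable, so duplicate targets reuse a single pump...
    shared = shared_source_cache.get(id(node_result))
    if shared is None:
        shared = _SharedAsyncIterable(node_result)
        shared_source_cache[id(node_result)] = shared
    # ...but wrap it in a fresh cleaning generator per target, because async
    # generators are single-consumer and raise on concurrent iteration.
    return _clean_json_via_generator(shared)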


@bradley-erickson
Collaborator Author


Doesn't the shared async iterable take care of this?

@bradley-erickson
Collaborator Author

@codex review


@chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 2ae8ac1aa2


Comment on lines +544 to +548
if target in exports:
    node = exports[target].get('returns')
    if node not in execution_nodes:
        node = f'__missing_export__:{target}'
else:


P2: Use missing return node key to keep error generator

When an export exists but its returns node is missing, execute_dag stores the error generator under the original target_node name (see execute_dag around lines 904-909 in communication_protocol/executor.py). Here you rewrite those cases to __missing_export__:{target} (lines 544-548), so generator_dictionary.get(node) won’t find the error generator and the dashboard silently drops the failure. This means clients won’t see execution errors for exports whose returns node is missing; it only happens for that specific invalid DAG shape but makes debugging much harder.
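
If the integration side keeps the placeholder key instead, the executor would have to store its error generator under that same key. A hypothetical sketch of that alternative, reusing the names from this review (exports, execution_nodes, generator_dictionary) with an assumed error-generator helper; it is not the PR's actual execute_dag code.

async def _missing_export_error(target, node):
    # Assumed shape: a single error update for the dashboard client.
    yield {'error': f"Export '{target}' returns unknown node '{node}'"}

for target, export in exports.items():
    node = export.get('returns')
    if node not in execution_nodes:
        # Store the error under the same placeholder key the integration layer
        # computes, so generator_dictionary.get() finds it on either side.
        generator_dictionary[f'__missing_export__:{target}'] = \
            _missing_export_error(target, node)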


@bradley-erickson merged commit b1e0dd6 into master on Feb 2, 2026
1 of 2 checks passed