WIP: Fixing bug where roster is called multiple times for a single DAG #256
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2d8375b7c0
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```python
async def _pump(self):
    try:
        async for item in self._source:
            async with self._condition:
                self._buffer.append(item)
```
Add backpressure or eviction for shared async buffer
Because _pump() runs in a background task and appends every yielded item to _buffer without any eviction, the shared iterable will eagerly drain the source and retain all items in memory. For long‑running or high‑volume DAG generators (e.g., streaming dashboard updates), this means memory grows without bound and the producer keeps running even if consumers are slow or cancel, which can lead to OOM or unnecessary work. Consider coupling consumption to demand (backpressure) or trimming the buffer once all consumers have advanced past an item.
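A minimal sketch of one way to add backpressure, assuming a shared-iterable class shaped like the snippet above; `MAX_BUFFER`, the completion flag, and the consumer-side notification are illustrative, not part of the existing implementation:

```python
import asyncio
from typing import Any, AsyncIterator, List

class SharedAsyncIterable:
    """Sketch only: a pump that waits for consumers instead of draining eagerly."""

    MAX_BUFFER = 256  # illustrative cap on unconsumed items

    def __init__(self, source: AsyncIterator[Any]) -> None:
        self._source = source
        self._buffer: List[Any] = []
        self._condition = asyncio.Condition()
        self._done = False

    async def _pump(self) -> None:
        try:
            async for item in self._source:
                async with self._condition:
                    # Backpressure: stop pulling from the source while the
                    # buffer is full; consumers would notify the condition
                    # after advancing (and trim items everyone has passed).
                    while len(self._buffer) >= self.MAX_BUFFER:
                        await self._condition.wait()
                    self._buffer.append(item)
                    self._condition.notify_all()
        finally:
            async with self._condition:
                self._done = True
                self._condition.notify_all()
```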
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0b3edc5e01
```python
node = exports[target].get('returns')
if node not in execution_nodes:
    node = f'__missing_export__:{target}'
```
Preserve missing-return error mapping
When an export exists but its returns node is missing from the execution DAG, execute_dag emits an error generator keyed by the missing node name (not by __missing_export__). This new mapping rewrites that case to __missing_export__, so generator_dictionary.get(node) returns None and the error update is silently dropped. This means a misconfigured export (e.g., a typo in returns) no longer surfaces an error to the dashboard client. Consider keeping the original node name here or aligning the executor’s error key to the same placeholder so the error generator is still driven.
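A minimal sketch of the first option, keeping the original returns name so the executor's error generator is still found; the function and parameter names are illustrative, following the snippet above rather than the actual repo code:

```python
from typing import Any, Mapping, Set

def resolve_export_node(target: str,
                        exports: Mapping[str, Mapping[str, Any]],
                        execution_nodes: Set[str]) -> str:
    """Sketch only: map an export target to the key used for generator lookup."""
    node = exports[target].get('returns')
    if node is None:
        # No 'returns' configured at all: fall back to a placeholder key.
        return f'__missing_export__:{target}'
    # A 'returns' node that is absent from the execution DAG keeps its original
    # name, so generator_dictionary.get(node) still finds the error generator
    # that execute_dag stored under that name.
    return node
```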
…once per run so other nodes can share those values
0b3edc5 to 3800449
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3800449848
```python
node_result = await visit(e)
if isinstance(node_result, collections.abc.AsyncIterable):
    cached = async_generator_cache.get(id(node_result))
    if cached is None:
        cached = _clean_json_via_generator(node_result)
        async_generator_cache[id(node_result)] = cached
    out[e] = cached
```
Return independent generators per target
When multiple targets resolve to the same async iterable, this cache returns the same async generator instance for every target. Async generators are single-consumer; if a caller iterates each target independently (e.g., prepare_dag_execution(..., targets=[...]) in communication_protocol/integration.py can return multiple targets), the first consumer will drain it and the others will see an already‑exhausted generator or raise RuntimeError about concurrent iteration. Before this change, each target got its own _clean_json_via_generator, allowing independent consumption over the shared _SharedAsyncIterable source. Consider caching only the underlying shared iterable and still creating a new _clean_json_via_generator per target.
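A minimal sketch of that suggestion, with `clean` standing in for `_clean_json_via_generator`; the cache holds only the shared source, and each target receives its own wrapper:

```python
import collections.abc
from typing import Any, Callable, Dict

def wrap_for_target(node_result: Any,
                    shared_sources: Dict[int, Any],
                    clean: Callable[[Any], Any]) -> Any:
    """Sketch only: cache the shared async iterable, not the cleaning generator."""
    if isinstance(node_result, collections.abc.AsyncIterable):
        # Reuse the underlying shared iterable across targets...
        shared = shared_sources.setdefault(id(node_result), node_result)
        # ...but build a fresh cleaning generator per target, since async
        # generators are single-consumer.
        return clean(shared)
    return node_result
```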
Doesn't the shared async iterable take care of this?
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 2ae8ac1aa2
```python
if target in exports:
    node = exports[target].get('returns')
    if node not in execution_nodes:
        node = f'__missing_export__:{target}'
else:
```
Use missing return node key to keep error generator
When an export exists but its returns node is missing, execute_dag stores the error generator under the original target_node name (see execute_dag around lines 904-909 in communication_protocol/executor.py). Here you rewrite those cases to __missing_export__:{target} (lines 544-548), so generator_dictionary.get(node) won’t find the error generator and the dashboard silently drops the failure. This means clients won’t see execution errors for exports whose returns node is missing; it only happens for that specific invalid DAG shape but makes debugging much harder.
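A minimal sketch of the alternative fix mentioned in the earlier review note, keying the executor's error generator with the same placeholder so both sides agree; every name here is illustrative rather than taken from execute_dag:

```python
from typing import Any, AsyncIterator, Dict, Set

async def _missing_returns_error(target: str, returns_node: str) -> AsyncIterator[dict]:
    """Sketch only: a one-shot error payload for a misconfigured export."""
    yield {'target': target,
           'error': f"export '{target}' returns unknown node '{returns_node}'"}

def register_missing_return_error(target: str,
                                  returns_node: str,
                                  execution_nodes: Set[str],
                                  generator_dictionary: Dict[str, Any]) -> None:
    """Sketch only: store the error generator under the same
    '__missing_export__:{target}' key the dashboard mapping above looks up."""
    if returns_node not in execution_nodes:
        key = f'__missing_export__:{target}'
        generator_dictionary[key] = _missing_returns_error(target, returns_node)
```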
Currently this is built so we call each target individually, but we ought to call them together as one when we can.