WiP Fixing bug where roster is called multiple times for single dag #256
Changes from all commits: 90b9985, 3800449, 2ae8ac1, f0fcefa
```diff
@@ -1 +1 @@
-0.1.0+2026.01.26T17.51.31.713Z.b83cda8e.berickson.20260126.blacklist.by.domain
+0.1.0+2026.02.02T16.19.01.249Z.2ae8ac1a.berickson.20260120.comm.protocol.targets
```
```diff
@@ -1 +1 @@
-0.1.0+2026.01.26T17.51.31.713Z.b83cda8e.berickson.20260126.blacklist.by.domain
+0.1.0+2026.02.02T16.19.01.249Z.2ae8ac1a.berickson.20260120.comm.protocol.targets
```
```diff
@@ -521,11 +521,11 @@ async def _handle_dependent_dags(query):
     return query


-async def _prepare_dag_as_generator(client_query, query, target, request):
+async def _prepare_dag_as_generators(client_query, query, targets, request):
     '''
     Prepares the query for execution, sets up client parameters and runtime.
     '''
-    target_exports = [target]
+    target_exports = list(targets)

     # Prepare the DAG execution function
     query_func = learning_observer.communication_protocol.integration.prepare_dag_execution(query, target_exports)
```
@@ -535,12 +535,41 @@ async def _prepare_dag_as_generator(client_query, query, target, request): | |
| runtime = learning_observer.runtime.Runtime(request) | ||
| client_parameters['runtime'] = runtime | ||
|
|
||
| # Execute the query and return the first value from the generator | ||
| # Execute the query and return generators keyed by export targets. | ||
| generator_dictionary = await query_func(**client_parameters) | ||
| return next(iter(generator_dictionary.values())) | ||
| target_nodes_to_targets = {} | ||
| exports = query.get('exports', {}) | ||
| execution_nodes = query.get('execution_dag', {}) | ||
| for target in target_exports: | ||
| if target in exports: | ||
| node = exports[target].get('returns') | ||
| if node not in execution_nodes: | ||
| node = f'__missing_export__:{target}' | ||
| else: | ||
|
Comment on lines
+544
to
+548
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When an export exists but its Useful? React with 👍 / 👎. |
||
| node = f'__missing_export__:{target}' | ||
| target_nodes_to_targets.setdefault(node, []).append(target) | ||
|
|
||
| generators_by_id = {} | ||
| for node, node_targets in target_nodes_to_targets.items(): | ||
| generator = generator_dictionary.get(node) | ||
| if generator is None: | ||
| debug_log(f'Missing generator for DAG node {node}') | ||
| continue | ||
| generator_id = id(generator) | ||
| if generator_id not in generators_by_id: | ||
| generators_by_id[generator_id] = { | ||
| 'generator': generator, | ||
| 'targets': [] | ||
| } | ||
| generators_by_id[generator_id]['targets'].extend(node_targets) | ||
| return [ | ||
| (entry['targets'], entry['generator']) | ||
| for entry in generators_by_id.values() | ||
| ] | ||
|
|
||
|
|
||
|
|
||
| async def _create_dag_generator(client_query, target, request): | ||
| async def _create_dag_generators(client_query, targets, request): | ||
| dag = client_query['execution_dag'] | ||
| if type(dag) not in DAG_DISPATCH: | ||
| debug_log(await dag_unsupported_type(type(dag))) | ||
|
|
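To make the reviewer's point concrete, here is a minimal, runnable sketch of the failure mode, assuming (as the review states) that the executor keys its error generator by the original `returns` node name. The export and node names are hypothetical:

```python
# An export whose 'returns' points at a node absent from the execution DAG
# (e.g., a typo in the dashboard module's export declaration).
exports = {'doc_text': {'returns': 'doc_text_node'}}
execution_dag = {'some_other_node': None}

# Per the review, the executor would emit its error generator under the
# original node name, not under the placeholder.
generator_dictionary = {'doc_text_node': '<error generator>'}

# The new mapping rewrites the key before the lookup...
node = exports['doc_text']['returns']
if node not in execution_dag:
    node = '__missing_export__:doc_text'

# ...so the error generator is never found, and the error is silently dropped.
assert generator_dictionary.get(node) is None
```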
@@ -552,7 +581,7 @@ async def _create_dag_generator(client_query, target, request): | |
| debug_log('The submitted query failed.') | ||
| return | ||
| query = await _handle_dependent_dags(query) | ||
| return await _prepare_dag_as_generator(client_query, query, target, request) | ||
| return await _prepare_dag_as_generators(client_query, query, targets, request) | ||
|
|
||
|
|
||
| def _scope_segment_for_provenance_key(key): | ||
|
|
@@ -847,7 +876,7 @@ async def _send_pending_updates_to_client(): | |
| # TODO this ought to be pulled from somewhere | ||
| await asyncio.sleep(1) | ||
|
|
||
| async def _execute_dag(dag_query, target, params): | ||
| async def _execute_dag(dag_query, targets, params): | ||
| '''This method creates the DAG generator and drives it. | ||
| Once finished, we wait until rescheduling it. If the parameters | ||
| change, we exit before creating and driving the generator. | ||
|
|
@@ -857,8 +886,16 @@ async def _execute_dag(dag_query, target, params): | |
| return | ||
|
|
||
| # Create DAG generator and drive | ||
| generator = await _create_dag_generator(dag_query, target, request) | ||
| await _drive_generator(generator, dag_query['kwargs'], target=target) | ||
| generators = await _create_dag_generators(dag_query, targets, request) | ||
| if generators is None: | ||
| return | ||
| drive_tasks = [] | ||
| for target_group, generator in generators: | ||
| drive_tasks.append(asyncio.create_task( | ||
| _drive_generator(generator, dag_query['kwargs'], targets=target_group) | ||
| )) | ||
| if drive_tasks: | ||
| await asyncio.gather(*drive_tasks) | ||
|
|
||
| # Handle rescheduling the execution of the DAG for fresh data | ||
| # TODO add some way to specify specific endpoint delays | ||
|
|
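The hunk above fans out one task per (targets, generator) pair and awaits them together. A minimal standalone sketch of that pattern, using a stub generator and made-up target names rather than the real `_drive_generator`:

```python
import asyncio

async def stub_generator(name):
    # Stand-in for a DAG generator; yields a couple of fake items.
    for i in range(2):
        yield f'{name}-item-{i}'

async def drive(generator, targets):
    # Stand-in for _drive_generator: consume items on behalf of all targets.
    async for item in generator:
        print(targets, item)

async def main():
    # One generator may serve several export targets after deduplication.
    generators = [
        (['docs_target'], stub_generator('docs')),
        (['roster_target_a', 'roster_target_b'], stub_generator('roster')),
    ]
    tasks = [asyncio.create_task(drive(gen, targets)) for targets, gen in generators]
    await asyncio.gather(*tasks)

asyncio.run(main())
```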
@@ -867,26 +904,30 @@ async def _execute_dag(dag_query, target, params): | |
| # if dag_delay is negative, we skip repeated execution | ||
| return | ||
| await asyncio.sleep(dag_delay) | ||
| await _execute_dag(dag_query, target, params) | ||
| await _execute_dag(dag_query, targets, params) | ||
|
|
||
| async def _drive_generator(generator, dag_kwargs, target=None): | ||
| async def _drive_generator(generator, dag_kwargs, targets=None): | ||
| '''For each item in the generator, this method creates | ||
| an update to send to the client. | ||
| ''' | ||
| target_exports = targets or [None] | ||
| async for item in generator: | ||
| scope = _find_student_or_resource(item) | ||
| update_path = ".".join(scope) | ||
| if 'option_hash' in dag_kwargs and target is not None: | ||
| item[f'option_hash_{target}'] = dag_kwargs['option_hash'] | ||
| # TODO this ought to be flag - we might want to see the provenance in some settings | ||
| item_without_provenance = learning_observer.communication_protocol.executor.strip_provenance(item) | ||
| update_payload = {'op': 'update', 'path': update_path, 'value': item_without_provenance} | ||
| _log_protocol_event( | ||
| 'update_enqueued', | ||
| payload=update_payload, | ||
| target_export=target | ||
| ) | ||
| await _queue_update(update_payload) | ||
| for target in target_exports: | ||
| item_payload = item | ||
| if 'option_hash' in dag_kwargs and target is not None and isinstance(item, dict): | ||
| item_payload = dict(item) | ||
| item_payload[f'option_hash_{target}'] = dag_kwargs['option_hash'] | ||
| # TODO this ought to be flag - we might want to see the provenance in some settings | ||
| item_without_provenance = learning_observer.communication_protocol.executor.strip_provenance(item_payload) | ||
| update_payload = {'op': 'update', 'path': update_path, 'value': item_without_provenance} | ||
| _log_protocol_event( | ||
| 'update_enqueued', | ||
| payload=update_payload, | ||
| target_export=target | ||
| ) | ||
| await _queue_update(update_payload) | ||
|
|
||
| send_batches_task = asyncio.create_task(_send_pending_updates_to_client()) | ||
| background_tasks.add(send_batches_task) | ||
|
|
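One detail worth noting in the new loop: `item_payload = dict(item)` takes a shallow copy per target before stamping the option hash. A small sketch of why that matters, with hypothetical values:

```python
item = {'value': 42}

payloads = []
for target in ['docs', 'roster']:
    item_payload = dict(item)  # per-target shallow copy, as in the diff
    item_payload[f'option_hash_{target}'] = 'abc123'
    payloads.append(item_payload)

# Without the copy, every target would mutate the same shared dict, so the
# first target's payload would also carry option_hash_roster.
assert 'option_hash_roster' not in payloads[0]
assert 'option_hash_docs' in payloads[0]
```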
@@ -932,15 +973,11 @@ async def _drive_generator(generator, dag_kwargs, target=None): | |
|
|
||
| if client_query != previous_client_query: | ||
| previous_client_query = copy.deepcopy(client_query) | ||
| # HACK even though we can specify multiple targets for a | ||
| # single DAG, this creates a new DAG for each. This eventually | ||
| # allows us to specify different parameters (such as the | ||
| # reschedule timeout). | ||
| for k, v in client_query.items(): | ||
| for target in v.get('target_exports', []): | ||
| execute_dag_task = asyncio.create_task(_execute_dag(v, target, client_query)) | ||
| background_tasks.add(execute_dag_task) | ||
| execute_dag_task.add_done_callback(background_tasks.discard) | ||
| targets = v.get('target_exports', []) | ||
| execute_dag_task = asyncio.create_task(_execute_dag(v, targets, client_query)) | ||
| background_tasks.add(execute_dag_task) | ||
| execute_dag_task.add_done_callback(background_tasks.discard) | ||
|
|
||
| # Various ways we might encounter an exception | ||
| except asyncio.CancelledError: | ||
|
|
||
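The task bookkeeping here (`background_tasks.add(...)` plus `add_done_callback(background_tasks.discard)`) is the standard asyncio idiom for fire-and-forget tasks: the event loop holds only weak references, so the set keeps each task alive until it finishes. A minimal sketch:

```python
import asyncio

background_tasks = set()

async def job():
    await asyncio.sleep(0)

async def main():
    task = asyncio.create_task(job())
    background_tasks.add(task)                        # keep a strong reference
    task.add_done_callback(background_tasks.discard)  # drop it once finished
    await asyncio.sleep(0.1)                          # let the task complete

asyncio.run(main())
```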
returnsnode is missing from the execution DAG,execute_dagemits an error generator keyed by the missing node name (not by__missing_export__). This new mapping rewrites that case to__missing_export__, sogenerator_dictionary.get(node)returnsNoneand the error update is silently dropped. This means a misconfigured export (e.g., a typo inreturns) no longer surfaces an error to the dashboard client. Consider keeping the original node name here or aligning the executor’s error key to the same placeholder so the error generator is still driven.Useful? React with 👍 / 👎.