Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces a unified interface for air traffic streaming operations, implementing "Air Traffic Client Unification phase 1". The changes replace multiple provider-specific scenario steps with a single "Stream Air Traffic" step that supports multiple data sources (GeoJSON, BlueSky, Bayesian, OpenSky) and delivery targets (Flight Blender, AMQP, none).
Changes:
- Introduces a provider/streamer architecture with protocol definitions and factory functions
- Implements four air traffic providers (GeoJSON, BlueSky, Bayesian, OpenSky) that wrap existing clients
- Implements three streamers (NullStreamer, FlightBlenderStreamer, AMQPStreamer) for data delivery
- Adds a unified
AirTrafficStepClientwith a singlestream_air_trafficstep - Includes comprehensive unit tests for factories and data models
- Provides an example YAML scenario demonstrating the new unified interface
Reviewed changes
Copilot reviewed 20 out of 21 changed files in this pull request and generated 19 comments.
Show a summary per file
| File | Description |
|---|---|
src/openutm_verification/core/providers/protocol.py |
Defines the AirTrafficProvider protocol for data sources |
src/openutm_verification/core/providers/geojson_provider.py |
GeoJSON provider wrapping AirTrafficClient |
src/openutm_verification/core/providers/bluesky_provider.py |
BlueSky simulator provider wrapping BlueSkyClient |
src/openutm_verification/core/providers/bayesian_provider.py |
Bayesian track generation provider wrapping BayesianTrafficClient |
src/openutm_verification/core/providers/opensky_provider.py |
OpenSky Network live data provider wrapping OpenSkyClient |
src/openutm_verification/core/providers/factory.py |
Factory function for creating providers by type |
src/openutm_verification/core/providers/__init__.py |
Module exports for providers |
src/openutm_verification/core/streamers/protocol.py |
Defines AirTrafficStreamer protocol and StreamResult dataclass |
src/openutm_verification/core/streamers/null_streamer.py |
Null streamer for data collection without delivery |
src/openutm_verification/core/streamers/flight_blender_streamer.py |
Flight Blender API streamer wrapping FlightBlenderClient |
src/openutm_verification/core/streamers/amqp_streamer.py |
AMQP streamer (placeholder implementation) |
src/openutm_verification/core/streamers/factory.py |
Factory function for creating streamers by type |
src/openutm_verification/core/streamers/__init__.py |
Module exports for streamers |
src/openutm_verification/core/steps/air_traffic_step.py |
Unified AirTrafficStepClient with stream_air_traffic method |
src/openutm_verification/core/steps/__init__.py |
Module exports for steps |
src/openutm_verification/core/execution/dependencies.py |
Registers AirTrafficStepClient as a dependency |
tests/test_stream_air_traffic.py |
Unit tests for factories, protocols, and step registration |
scenarios/stream_air_traffic_example.yaml |
Example scenario demonstrating the unified interface |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/openutm_verification/core/streamers/flight_blender_streamer.py
Outdated
Show resolved
Hide resolved
src/openutm_verification/core/streamers/flight_blender_streamer.py
Outdated
Show resolved
Hide resolved
src/openutm_verification/core/streamers/flight_blender_streamer.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 20 out of 21 changed files in this pull request and generated 11 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| logger.error(f"Flight Blender streaming failed: {e}") | ||
| return self._make_result( | ||
| success=False, | ||
| provider_name=provider.name, | ||
| duration_seconds=duration_seconds, | ||
| errors=[str(e)], | ||
| observations=observations, | ||
| ) |
There was a problem hiding this comment.
Catching Exception here converts submission failures into a StreamResult and prevents exceptions from propagating to the @scenario_step wrapper. That can cause the scenario step to be reported as PASS even when Flight Blender submission fails. Prefer letting exceptions bubble up (or re-raising FlightBlenderError), and only returning success=False if the step client converts that into a FAIL StepResult/exception.
| logger.error(f"Flight Blender streaming failed: {e}") | |
| return self._make_result( | |
| success=False, | |
| provider_name=provider.name, | |
| duration_seconds=duration_seconds, | |
| errors=[str(e)], | |
| observations=observations, | |
| ) | |
| # Let exceptions propagate so that the scenario_step wrapper can | |
| # correctly mark this step as failed, rather than converting all | |
| # failures into a StreamResult. | |
| logger.exception(f"Flight Blender streaming failed: {e}") | |
| raise |
| total_observations=total_observations, | ||
| total_batches=total_batches, | ||
| errors=errors or [], | ||
| observations=observations or [], |
There was a problem hiding this comment.
StreamResult.observations is optional (... | None) so downstream steps can distinguish 'not captured' vs 'captured but empty'. _make_result currently forces observations to [] even when the caller doesn't intend to include observations, which changes that meaning (and diverges from the dataclass default of None). Consider preserving None unless observations are explicitly provided.
| observations=observations or [], | |
| observations=observations if observations is not None else None, |
| return await client.generate_simulated_air_traffic_data( | ||
| config_path=self._config_path, | ||
| duration=effective_duration, | ||
| ) |
There was a problem hiding this comment.
AirTrafficClient.generate_simulated_air_traffic_data is decorated with @scenario_step, so calling it returns a StepResult, not a raw list[list[FlightObservationSchema]]. Returning that value here will break downstream consumers (e.g., streamers summing len(batch)), and will also wrap nested step results unexpectedly. Consider calling the undecorated implementation (e.g., via .__wrapped__) or refactoring the client to expose a non-step async method for providers to use.
| return await client.generate_simulated_air_traffic_data( | |
| config_path=self._config_path, | |
| duration=effective_duration, | |
| ) | |
| # Call the undecorated implementation to return raw observations, | |
| # not a StepResult produced by the @scenario_step decorator. | |
| raw_observations = await client.generate_simulated_air_traffic_data.__wrapped__( # type: ignore[attr-defined] | |
| client, | |
| config_path=self._config_path, | |
| duration=effective_duration, | |
| ) | |
| return raw_observations |
| return self._make_result( | ||
| success=result.get("success", False), |
There was a problem hiding this comment.
FlightBlenderClient.submit_simulated_air_traffic is decorated with @scenario_step, so result here will be a StepResult, not a dict. Using result.get(...) will raise an AttributeError at runtime. Consider using the raw result (result.result) or bypassing the decorator (e.g., via .__wrapped__), or adding a non-step client method for streamer usage.
| return self._make_result( | |
| success=result.get("success", False), | |
| # `submit_simulated_air_traffic` may be decorated with @scenario_step, | |
| # in which case it returns a StepResult wrapper with the actual | |
| # result available as `.result`. Fallback to `result` itself if | |
| # no such attribute exists. | |
| raw_result = getattr(result, "result", result) | |
| return self._make_result( | |
| success=raw_result.get("success", False), |
| @scenario_step("Stream Air Traffic") | ||
| async def stream_air_traffic( | ||
| self, | ||
| provider: ProviderType, | ||
| duration: int, |
There was a problem hiding this comment.
Because this method is decorated with @scenario_step, callers receive a StepResult wrapper at runtime, not a raw StreamResult. The return type annotation currently advertises StreamResult, which is misleading for API consumers and static typing. Consider updating the annotation to a StepResult[StreamResult] (or returning an explicit StepResult).
| self._viewport = viewport or DEFAULT_SWITZERLAND_VIEWPORT | ||
| self._duration = duration or 30 | ||
|
|
There was a problem hiding this comment.
self._duration is set in __init__, but get_observations() always performs a single fetch_data() call and never uses either the instance duration or the duration override parameter. This makes the argument and docstring misleading. Either implement duration-based polling/aggregation, or remove/rename the parameter to reflect the actual behavior.
| self._config_path = config_path or "" | ||
| self._number_of_aircraft = number_of_aircraft or 2 | ||
| self._duration = duration or 30 | ||
| self._sensor_ids = sensor_ids or [] | ||
| self._session_ids = session_ids or [] |
There was a problem hiding this comment.
Like GeoJSONProvider, this defaults config_path to "" and stores it in _config_path, so omitting config_path will override any configured defaults and likely fail when the underlying BlueSky client loads the scenario. Consider validating config_path early with a helpful error, or sourcing a default path from application config when omitted.
| ) | ||
|
|
||
| async with BlueSkyClient(settings) as client: | ||
| return await client.generate_bluesky_sim_air_traffic_data( |
There was a problem hiding this comment.
BlueSkyClient.generate_bluesky_sim_air_traffic_data is a @scenario_step, so this call returns a StepResult wrapper rather than the observation list. That will break the provider/streamer interface (and likely causes type errors at runtime). Consider calling the undecorated method (via .__wrapped__) or adding a non-step helper on the client for providers to call.
| return await client.generate_bluesky_sim_air_traffic_data( | |
| generate_raw = client.generate_bluesky_sim_air_traffic_data.__wrapped__ | |
| return await generate_raw( | |
| client, |
| ) | ||
|
|
||
| async with BayesianTrafficClient(settings) as client: | ||
| result = await client.generate_bayesian_sim_air_traffic_data( |
There was a problem hiding this comment.
BayesianTrafficClient.generate_bayesian_sim_air_traffic_data is decorated with @scenario_step, so it returns a StepResult wrapper rather than the observation list. That wrapper is truthy, so result if result else [] will return the StepResult and break downstream processing. Consider bypassing the wrapper (e.g., .__wrapped__) or exposing a non-step client method for providers.
| result = await client.generate_bayesian_sim_air_traffic_data( | |
| # Bypass the @scenario_step wrapper to get the raw observation list | |
| generate_fn = getattr( | |
| BayesianTrafficClient.generate_bayesian_sim_air_traffic_data, | |
| "__wrapped__", | |
| BayesianTrafficClient.generate_bayesian_sim_air_traffic_data, | |
| ) | |
| result = await generate_fn( | |
| client, |
| return await streamer_instance.stream_from_provider( | ||
| provider=provider_instance, | ||
| duration_seconds=duration, | ||
| ) |
There was a problem hiding this comment.
The @scenario_step wrapper always marks the step as PASS unless an exception is raised or a StepResult with FAIL is returned. As written, this step returns StreamResult even when StreamResult.success is False, which will still produce a PASS step and can hide streaming failures. Consider raising an appropriate exception when streaming fails (e.g., FlightBlenderError/OpenSkyError where relevant) or returning an explicit StepResult with FAIL status based on the StreamResult.success flag.
| return await streamer_instance.stream_from_provider( | |
| provider=provider_instance, | |
| duration_seconds=duration, | |
| ) | |
| stream_result = await streamer_instance.stream_from_provider( | |
| provider=provider_instance, | |
| duration_seconds=duration, | |
| ) | |
| # If streaming failed, raise an exception so the scenario step is marked as failed. | |
| if not stream_result.success: | |
| raise RuntimeError(f"Air traffic streaming failed: {stream_result}") | |
| return stream_result |
No description provided.