Skip to content

Comments

Air Traffic Client Unification phase 1#90

Open
atti92 wants to merge 9 commits intomainfrom
air-traffic-unification-p1
Open

Air Traffic Client Unification phase 1#90
atti92 wants to merge 9 commits intomainfrom
air-traffic-unification-p1

Conversation

@atti92
Copy link
Contributor

@atti92 atti92 commented Jan 31, 2026

No description provided.

Base automatically changed from fix-pyproject to main January 31, 2026 12:51
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a unified interface for air traffic streaming operations, implementing "Air Traffic Client Unification phase 1". The changes replace multiple provider-specific scenario steps with a single "Stream Air Traffic" step that supports multiple data sources (GeoJSON, BlueSky, Bayesian, OpenSky) and delivery targets (Flight Blender, AMQP, none).

Changes:

  • Introduces a provider/streamer architecture with protocol definitions and factory functions
  • Implements four air traffic providers (GeoJSON, BlueSky, Bayesian, OpenSky) that wrap existing clients
  • Implements three streamers (NullStreamer, FlightBlenderStreamer, AMQPStreamer) for data delivery
  • Adds a unified AirTrafficStepClient with a single stream_air_traffic step
  • Includes comprehensive unit tests for factories and data models
  • Provides an example YAML scenario demonstrating the new unified interface

Reviewed changes

Copilot reviewed 20 out of 21 changed files in this pull request and generated 19 comments.

Show a summary per file
File Description
src/openutm_verification/core/providers/protocol.py Defines the AirTrafficProvider protocol for data sources
src/openutm_verification/core/providers/geojson_provider.py GeoJSON provider wrapping AirTrafficClient
src/openutm_verification/core/providers/bluesky_provider.py BlueSky simulator provider wrapping BlueSkyClient
src/openutm_verification/core/providers/bayesian_provider.py Bayesian track generation provider wrapping BayesianTrafficClient
src/openutm_verification/core/providers/opensky_provider.py OpenSky Network live data provider wrapping OpenSkyClient
src/openutm_verification/core/providers/factory.py Factory function for creating providers by type
src/openutm_verification/core/providers/__init__.py Module exports for providers
src/openutm_verification/core/streamers/protocol.py Defines AirTrafficStreamer protocol and StreamResult dataclass
src/openutm_verification/core/streamers/null_streamer.py Null streamer for data collection without delivery
src/openutm_verification/core/streamers/flight_blender_streamer.py Flight Blender API streamer wrapping FlightBlenderClient
src/openutm_verification/core/streamers/amqp_streamer.py AMQP streamer (placeholder implementation)
src/openutm_verification/core/streamers/factory.py Factory function for creating streamers by type
src/openutm_verification/core/streamers/__init__.py Module exports for streamers
src/openutm_verification/core/steps/air_traffic_step.py Unified AirTrafficStepClient with stream_air_traffic method
src/openutm_verification/core/steps/__init__.py Module exports for steps
src/openutm_verification/core/execution/dependencies.py Registers AirTrafficStepClient as a dependency
tests/test_stream_air_traffic.py Unit tests for factories, protocols, and step registration
scenarios/stream_air_traffic_example.yaml Example scenario demonstrating the unified interface

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@atti92 atti92 self-assigned this Jan 31, 2026
@atti92 atti92 marked this pull request as ready for review February 11, 2026 20:44
@atti92 atti92 requested a review from Copilot February 13, 2026 19:56
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 20 out of 21 changed files in this pull request and generated 11 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +166 to +173
logger.error(f"Flight Blender streaming failed: {e}")
return self._make_result(
success=False,
provider_name=provider.name,
duration_seconds=duration_seconds,
errors=[str(e)],
observations=observations,
)
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching Exception here converts submission failures into a StreamResult and prevents exceptions from propagating to the @scenario_step wrapper. That can cause the scenario step to be reported as PASS even when Flight Blender submission fails. Prefer letting exceptions bubble up (or re-raising FlightBlenderError), and only returning success=False if the step client converts that into a FAIL StepResult/exception.

Suggested change
logger.error(f"Flight Blender streaming failed: {e}")
return self._make_result(
success=False,
provider_name=provider.name,
duration_seconds=duration_seconds,
errors=[str(e)],
observations=observations,
)
# Let exceptions propagate so that the scenario_step wrapper can
# correctly mark this step as failed, rather than converting all
# failures into a StreamResult.
logger.exception(f"Flight Blender streaming failed: {e}")
raise

Copilot uses AI. Check for mistakes.
total_observations=total_observations,
total_batches=total_batches,
errors=errors or [],
observations=observations or [],
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamResult.observations is optional (... | None) so downstream steps can distinguish 'not captured' vs 'captured but empty'. _make_result currently forces observations to [] even when the caller doesn't intend to include observations, which changes that meaning (and diverges from the dataclass default of None). Consider preserving None unless observations are explicitly provided.

Suggested change
observations=observations or [],
observations=observations if observations is not None else None,

Copilot uses AI. Check for mistakes.
Comment on lines +96 to +99
return await client.generate_simulated_air_traffic_data(
config_path=self._config_path,
duration=effective_duration,
)
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AirTrafficClient.generate_simulated_air_traffic_data is decorated with @scenario_step, so calling it returns a StepResult, not a raw list[list[FlightObservationSchema]]. Returning that value here will break downstream consumers (e.g., streamers summing len(batch)), and will also wrap nested step results unexpectedly. Consider calling the undecorated implementation (e.g., via .__wrapped__) or refactoring the client to expose a non-step async method for providers to use.

Suggested change
return await client.generate_simulated_air_traffic_data(
config_path=self._config_path,
duration=effective_duration,
)
# Call the undecorated implementation to return raw observations,
# not a StepResult produced by the @scenario_step decorator.
raw_observations = await client.generate_simulated_air_traffic_data.__wrapped__( # type: ignore[attr-defined]
client,
config_path=self._config_path,
duration=effective_duration,
)
return raw_observations

Copilot uses AI. Check for mistakes.
Comment on lines +156 to +157
return self._make_result(
success=result.get("success", False),
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FlightBlenderClient.submit_simulated_air_traffic is decorated with @scenario_step, so result here will be a StepResult, not a dict. Using result.get(...) will raise an AttributeError at runtime. Consider using the raw result (result.result) or bypassing the decorator (e.g., via .__wrapped__), or adding a non-step client method for streamer usage.

Suggested change
return self._make_result(
success=result.get("success", False),
# `submit_simulated_air_traffic` may be decorated with @scenario_step,
# in which case it returns a StepResult wrapper with the actual
# result available as `.result`. Fallback to `result` itself if
# no such attribute exists.
raw_result = getattr(result, "result", result)
return self._make_result(
success=raw_result.get("success", False),

Copilot uses AI. Check for mistakes.
Comment on lines +26 to +30
@scenario_step("Stream Air Traffic")
async def stream_air_traffic(
self,
provider: ProviderType,
duration: int,
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because this method is decorated with @scenario_step, callers receive a StepResult wrapper at runtime, not a raw StreamResult. The return type annotation currently advertises StreamResult, which is misleading for API consumers and static typing. Consider updating the annotation to a StepResult[StreamResult] (or returning an explicit StepResult).

Copilot uses AI. Check for mistakes.
Comment on lines +39 to +41
self._viewport = viewport or DEFAULT_SWITZERLAND_VIEWPORT
self._duration = duration or 30

Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._duration is set in __init__, but get_observations() always performs a single fetch_data() call and never uses either the instance duration or the duration override parameter. This makes the argument and docstring misleading. Either implement duration-based polling/aggregation, or remove/rename the parameter to reflect the actual behavior.

Copilot uses AI. Check for mistakes.
Comment on lines +40 to +44
self._config_path = config_path or ""
self._number_of_aircraft = number_of_aircraft or 2
self._duration = duration or 30
self._sensor_ids = sensor_ids or []
self._session_ids = session_ids or []
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like GeoJSONProvider, this defaults config_path to "" and stores it in _config_path, so omitting config_path will override any configured defaults and likely fail when the underlying BlueSky client loads the scenario. Consider validating config_path early with a helpful error, or sourcing a default path from application config when omitted.

Copilot uses AI. Check for mistakes.
)

async with BlueSkyClient(settings) as client:
return await client.generate_bluesky_sim_air_traffic_data(
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlueSkyClient.generate_bluesky_sim_air_traffic_data is a @scenario_step, so this call returns a StepResult wrapper rather than the observation list. That will break the provider/streamer interface (and likely causes type errors at runtime). Consider calling the undecorated method (via .__wrapped__) or adding a non-step helper on the client for providers to call.

Suggested change
return await client.generate_bluesky_sim_air_traffic_data(
generate_raw = client.generate_bluesky_sim_air_traffic_data.__wrapped__
return await generate_raw(
client,

Copilot uses AI. Check for mistakes.
)

async with BayesianTrafficClient(settings) as client:
result = await client.generate_bayesian_sim_air_traffic_data(
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BayesianTrafficClient.generate_bayesian_sim_air_traffic_data is decorated with @scenario_step, so it returns a StepResult wrapper rather than the observation list. That wrapper is truthy, so result if result else [] will return the StepResult and break downstream processing. Consider bypassing the wrapper (e.g., .__wrapped__) or exposing a non-step client method for providers.

Suggested change
result = await client.generate_bayesian_sim_air_traffic_data(
# Bypass the @scenario_step wrapper to get the raw observation list
generate_fn = getattr(
BayesianTrafficClient.generate_bayesian_sim_air_traffic_data,
"__wrapped__",
BayesianTrafficClient.generate_bayesian_sim_air_traffic_data,
)
result = await generate_fn(
client,

Copilot uses AI. Check for mistakes.
Comment on lines +85 to +88
return await streamer_instance.stream_from_provider(
provider=provider_instance,
duration_seconds=duration,
)
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The @scenario_step wrapper always marks the step as PASS unless an exception is raised or a StepResult with FAIL is returned. As written, this step returns StreamResult even when StreamResult.success is False, which will still produce a PASS step and can hide streaming failures. Consider raising an appropriate exception when streaming fails (e.g., FlightBlenderError/OpenSkyError where relevant) or returning an explicit StepResult with FAIL status based on the StreamResult.success flag.

Suggested change
return await streamer_instance.stream_from_provider(
provider=provider_instance,
duration_seconds=duration,
)
stream_result = await streamer_instance.stream_from_provider(
provider=provider_instance,
duration_seconds=duration,
)
# If streaming failed, raise an exception so the scenario step is marked as failed.
if not stream_result.success:
raise RuntimeError(f"Air traffic streaming failed: {stream_result}")
return stream_result

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant