diff --git a/config/default.yaml b/config/default.yaml index 0aa85f6..7040a95 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -75,6 +75,11 @@ suites: trajectory: "config/bern/trajectory_f1.json" - name: F2_contingent_path trajectory: "config/bern/trajectory_f2.json" + astm_f3623_conformance: + scenarios: + - name: openutm_sim_air_traffic_data + - name: sdsp_track + - name: sdsp_heartbeat extra: scenarios: - name: F3_non_conforming_path @@ -84,9 +89,6 @@ suites: - name: opensky_live_data - name: add_flight_declaration - name: geo_fence_upload - - name: openutm_sim_air_traffic_data -# - name: sdsp_track -# - name: sdsp_heartbeat # Reporting configuration reporting: diff --git a/scenarios/bayesian_sim_air_traffic_data.yaml b/scenarios/bayesian_sim_air_traffic_data.yaml new file mode 100644 index 0000000..2afdfe2 --- /dev/null +++ b/scenarios/bayesian_sim_air_traffic_data.yaml @@ -0,0 +1,10 @@ +name: bayesian_sim_air_traffic_data +description: Bayesian simulation air traffic data test +steps: +- id: generate_bayesian_simulation_air_traffic_data + step: Generate Bayesian Simulation Air Traffic Data +- step: Fetch Session IDs +- step: Submit Simulated Air Traffic + arguments: + observations: ${{ steps.generate_bayesian_simulation_air_traffic_data.result }} + session_ids: ${{ steps.Fetch Session IDs.result }} diff --git a/scenarios/f3623_T01_bluesky_w_data_latency.yaml b/scenarios/f3623_T01_bluesky_w_data_latency.yaml new file mode 100644 index 0000000..c01e373 --- /dev/null +++ b/scenarios/f3623_T01_bluesky_w_data_latency.yaml @@ -0,0 +1,10 @@ +name: bluesky_sim_air_traffic_data_latency_issues +description: F3623 Blue Sky dataset with latency issues test +steps: +- id: generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues + step: Generate BlueSky Simulation Air Traffic Data with latency issues +- step: Fetch Session IDs +- step: Submit Simulated Air Traffic + arguments: + observations: ${{ steps.generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues.result }} + session_ids: ${{ steps.Fetch Session IDs.result }} diff --git a/scenarios/f3623_T02_bayesian_traffic_w_random_refresh.yaml b/scenarios/f3623_T02_bayesian_traffic_w_random_refresh.yaml new file mode 100644 index 0000000..4ad14d3 --- /dev/null +++ b/scenarios/f3623_T02_bayesian_traffic_w_random_refresh.yaml @@ -0,0 +1,10 @@ +name: bayesian_sim_air_traffic_data +description: Bayesian simulation air traffic data test with varying refresh rates +steps: +- id: generate_bayesian_simulation_air_traffic_data + step: Generate Bayesian Simulation Air Traffic Data +- step: Fetch Session IDs +- step: Submit Simulated Air Traffic at varying refresh rates + arguments: + observations: ${{ steps.generate_bayesian_simulation_air_traffic_data.result }} + session_ids: ${{ steps.Fetch Session IDs.result }} diff --git a/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py b/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py index 29a356a..6f8ccea 100644 --- a/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py +++ b/src/openutm_verification/core/clients/air_traffic/air_traffic_client.py @@ -1,4 +1,5 @@ import json +import random import uuid from uuid import UUID @@ -100,3 +101,29 @@ async def generate_simulated_air_traffic_data( except Exception as exc: # noqa: BLE001 logger.error(f"Failed to generate telemetry states from {config_path}: {exc}") raise + + @scenario_step("Generate Simulated Air Traffic Data with Latency") + async def generate_simulated_air_traffic_data_with_latency( + self, + config_path: str | None = None, + duration: int | None = None, + ) -> list[list[FlightObservationSchema]]: + """This method, simulates a adding latency to the flight observations list""" + flight_observations = self.generate_simulated_air_traffic_data(config_path=config_path, duration=duration) + LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues + TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds + + modified_flight_observations = [] + for track_observations in flight_observations: + modified_track_observations = [] + for obs in track_observations: + if random.random() < LATENCY_PROBABILITY: + # Simulate latency by removing some observations + if random.random() < 0.5: # 50% chance to remove observation + continue + # Simulate timestamp shift + shift_seconds = random.uniform(*TIMESTAMP_SHIFT_RANGE_SECONDS) + obs.timestamp += int(shift_seconds * 1000) # Convert seconds to milliseconds + modified_track_observations.append(obs) + modified_flight_observations.append(modified_track_observations) + return modified_flight_observations diff --git a/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py b/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py index 43ddfcc..73f76b6 100644 --- a/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py +++ b/src/openutm_verification/core/clients/air_traffic/bayesian_air_traffic_client.py @@ -175,3 +175,35 @@ def _convert_track_to_observations( logger.info(f"Converted track to observations. Generated {len(observations)} observations.") return observations + + @scenario_step("Generate Bayesian Simulation Air Traffic Data with latency issues") + async def generate_bayesian_sim_air_traffic_data_with_sensor_latency_issues( + self, + config_path: str | None = None, + duration: int | None = None, + ) -> list[list[FlightObservationSchema]]: + """ + This method modifies the retrieved simulation data by changing the timestamp and adding latency to the observed dataset. + Latency is simulated by randomly removing some observations and randomly shifting the timestamps of some observations + to be earlier or later than the actual timestamp, mimicking real-world sensor latency issues. + """ + flight_observations = self.generate_bayesian_sim_air_traffic_data(config_path=config_path, duration=duration) + + LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues + TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds + + modified_flight_observations = [] + for track_observations in flight_observations: + modified_track_observations = [] + for obs in track_observations: + if random.random() < LATENCY_PROBABILITY: + # Simulate latency by removing some observations + if random.random() < 0.5: # 50% chance to remove observation + continue + # Simulate timestamp shift + shift_seconds = random.uniform(*TIMESTAMP_SHIFT_RANGE_SECONDS) + obs.timestamp += int(shift_seconds * 1000) # Convert seconds to milliseconds + modified_track_observations.append(obs) + modified_flight_observations.append(modified_track_observations) + + return modified_flight_observations diff --git a/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py b/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py index 4e2c0dd..c98007a 100644 --- a/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py +++ b/src/openutm_verification/core/clients/air_traffic/blue_sky_client.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +import random import tempfile import uuid from collections.abc import Iterable @@ -133,6 +134,34 @@ async def generate_bluesky_sim_air_traffic_data( # Convert dict -> list[list[FlightObservationSchema]] with stable ordering return [results_by_acid[acid] for acid in sorted(results_by_acid.keys())] + @scenario_step("Generate BlueSky Simulation Air Traffic Data with latency issues") + async def generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues( + self, + config_path: str | None = None, + duration: int | None = None, + ) -> list[list[FlightObservationSchema]]: + """This method generates""" + flight_observations = self.generate_bluesky_sim_air_traffic_data(config_path=config_path, duration=duration) + + # This method modifies the retrieved simulation data by changing the timestamp and adding latency to the observed dataset + LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues + TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds + + modified_flight_observations = [] + for track_observations in flight_observations: + modified_track_observations = [] + for obs in track_observations: + if random.random() < LATENCY_PROBABILITY: + # Simulate latency by removing some observations + if random.random() < 0.5: # 50% chance to remove observation + continue + # Simulate timestamp shift + shift_seconds = random.uniform(*TIMESTAMP_SHIFT_RANGE_SECONDS) + obs.timestamp += int(shift_seconds * 1000) # Convert seconds to milliseconds + modified_track_observations.append(obs) + modified_flight_observations.append(modified_track_observations) + return modified_flight_observations + def _tolist(x: Iterable[float] | object) -> list[float]: """Convert numpy arrays / array-likes to a Python list.""" diff --git a/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py b/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py index 6d97899..8f781d7 100644 --- a/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py +++ b/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py @@ -1,5 +1,6 @@ import asyncio import json +import random import time import uuid from contextlib import asynccontextmanager @@ -728,6 +729,7 @@ async def submit_simulated_air_traffic( current_simulation_time = current_simulation_time.shift(seconds=1) duration_seconds = (arrow.now() - start_time).total_seconds() + return { "success": submission_errors == 0, "aircraft_count": number_of_aircraft, @@ -737,6 +739,142 @@ async def submit_simulated_air_traffic( "simulation_duration_seconds": (simulation_end - simulation_start).total_seconds(), } + @scenario_step("Submit Simulated Air Traffic at varying refresh rates") + async def submit_simulated_air_traffic_at_random_refresh_rates( + self, + observations: list[list[FlightObservationSchema]], + session_ids: list[uuid.UUID] | None = None, + single_or_multiple_sensors: str = "single", + ) -> StepResult: + """Submit simulated air traffic observations with corrupted timestamps to mimic a malfunctioning source. + + Iterates through observations and randomly applies timestamp anomalies: + - Stale timestamps (repeating a previous timestamp instead of advancing) + - Large backward jumps (timestamp shifted into the past) + - Large forward jumps (timestamp shifted into the future) + + Args: + observations: List of observation lists, one per aircraft. + session_ids: Optional list of session UUIDs. + single_or_multiple_sensors: Whether to use single or multiple sensor IDs. + + Returns: + StepResult with submission statistics. + """ + session_ids = session_ids or [uuid.uuid4()] + if not observations: + logger.warning("No air traffic observations to submit.") + return StepResult( + name="Submit Simulated Air Traffic at varying refresh rates", + status=Status.FAIL, + duration=0, + error_message="No air traffic observations provided", + ) + + number_of_aircraft = len(observations) + logger.debug(f"Submitting simulated air traffic (off-nominal timestamps) for {number_of_aircraft} aircraft") + + session_id = str(session_ids[0]) + + # Determine simulation time range + start_times = [] + end_times = [] + for aircraft_obs in observations: + if not aircraft_obs: + continue + start_times.append(arrow.get(aircraft_obs[0].timestamp)) + end_times.append(arrow.get(aircraft_obs[-1].timestamp)) + + if not start_times: + logger.warning("No valid start/end times found in observations.") + return StepResult( + name="Submit Simulated Air Traffic at varying refresh rates", + status=Status.FAIL, + duration=0, + error_message="No valid start/end times found in observations", + ) + + simulation_start = min(start_times) + simulation_end = max(end_times) + + now = arrow.now() + start_time = now + observations_submitted = 0 + submission_errors = 0 + + # Build a flat list of observations with corrupted timestamps per aircraft + corrupted_observations: list[list[FlightObservationSchema]] = [] + for aircraft_obs in observations: + corrupted_aircraft_obs: list[FlightObservationSchema] = [] + last_used_timestamp: int | None = None + for obs in aircraft_obs: + original_timestamp = obs.timestamp + anomaly_roll = random.random() + + if anomaly_roll < 0.3 and last_used_timestamp is not None: + # 30% chance: stale timestamp — repeat the previous timestamp + new_timestamp = last_used_timestamp + logger.debug(f"[off-nominal] Stale timestamp for {obs.icao_address}: kept {new_timestamp} instead of {original_timestamp}") + elif anomaly_roll < 0.5: + # 20% chance: backward jump — shift timestamp 10-60s into the past + offset = random.randint(10, 60) + new_timestamp = original_timestamp - offset + logger.debug(f"[off-nominal] Backward jump for {obs.icao_address}: {original_timestamp} -> {new_timestamp} (−{offset}s)") + elif anomaly_roll < 0.65: + # 15% chance: forward jump — shift timestamp 10-60s into the future + offset = random.randint(10, 60) + new_timestamp = original_timestamp + offset + logger.debug(f"[off-nominal] Forward jump for {obs.icao_address}: {original_timestamp} -> {new_timestamp} (+{offset}s)") + else: + # 35% chance: keep the original timestamp (normal) + new_timestamp = original_timestamp + + corrupted_obs = obs.model_copy(update={"timestamp": new_timestamp}) + corrupted_aircraft_obs.append(corrupted_obs) + last_used_timestamp = new_timestamp + + corrupted_observations.append(corrupted_aircraft_obs) + + # Play back observations in real-time using the original simulation timeline + current_simulation_time = simulation_start + while current_simulation_time < simulation_end: + target_real_time = start_time + (current_simulation_time - simulation_start) + while arrow.now() < target_real_time: + await asyncio.sleep(0.1) + + for aircraft_obs in corrupted_observations: + if not aircraft_obs: + continue + # Find observation closest to current simulation time using original positions + closest_obs = min( + aircraft_obs, + key=lambda obs: abs(arrow.get(obs.timestamp) - current_simulation_time), + ) + endpoint = f"/flight_stream/set_air_traffic/{session_id}" + payload = {"observations": [closest_obs.model_dump(mode="json")]} + ScenarioContext.add_air_traffic_data([closest_obs]) + try: + response = await self.post(endpoint, json=payload) + logger.debug(f"Air traffic submission response: {response.text}") + logger.info( + f"Off-nominal observation submitted for {closest_obs.icao_address} " + f"at sim time {current_simulation_time} with timestamp {closest_obs.timestamp}" + ) + observations_submitted += 1 + except Exception as e: + logger.error(f"Failed to submit off-nominal observation: {e}") + submission_errors += 1 + + current_simulation_time = current_simulation_time.shift(seconds=1) + + duration_seconds = (arrow.now() - start_time).total_seconds() + return StepResult( + name="Submit Simulated Air Traffic at varying refresh rates", + status=Status.PASS if submission_errors == 0 else Status.FAIL, + duration=round(duration_seconds, 2), + error_message=None if submission_errors == 0 else f"{submission_errors} submission errors occurred", + ) + @scenario_step("Submit Air Traffic") async def submit_air_traffic(self, observations: list[FlightObservationSchema], session_id: uuid.UUID = uuid.uuid4()) -> dict[str, Any]: """Submit air traffic observations to the Flight Blender API.