Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ suites:
trajectory: "config/bern/trajectory_f1.json"
- name: F2_contingent_path
trajectory: "config/bern/trajectory_f2.json"
astm_f3623_conformance:
scenarios:
- name: openutm_sim_air_traffic_data
- name: sdsp_track
- name: sdsp_heartbeat
extra:
scenarios:
- name: F3_non_conforming_path
Expand All @@ -84,9 +89,6 @@ suites:
- name: opensky_live_data
- name: add_flight_declaration
- name: geo_fence_upload
- name: openutm_sim_air_traffic_data
# - name: sdsp_track
# - name: sdsp_heartbeat

# Reporting configuration
reporting:
Expand Down
10 changes: 10 additions & 0 deletions scenarios/bayesian_sim_air_traffic_data.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
name: bayesian_sim_air_traffic_data
description: Bayesian simulation air traffic data test
steps:
- id: generate_bayesian_simulation_air_traffic_data
step: Generate Bayesian Simulation Air Traffic Data
- step: Fetch Session IDs
- step: Submit Simulated Air Traffic
arguments:
observations: ${{ steps.generate_bayesian_simulation_air_traffic_data.result }}
session_ids: ${{ steps.Fetch Session IDs.result }}
10 changes: 10 additions & 0 deletions scenarios/f3623_T01_bluesky_w_data_latency.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
name: bluesky_sim_air_traffic_data_latency_issues
description: F3623 Blue Sky dataset with latency issues test
steps:
- id: generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues
step: Generate BlueSky Simulation Air Traffic Data with latency issues
- step: Fetch Session IDs
- step: Submit Simulated Air Traffic
arguments:
observations: ${{ steps.generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues.result }}
session_ids: ${{ steps.Fetch Session IDs.result }}
10 changes: 10 additions & 0 deletions scenarios/f3623_T02_bayesian_traffic_w_random_refresh.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
name: bayesian_sim_air_traffic_data
description: Bayesian simulation air traffic data test with varying refresh rates
steps:
- id: generate_bayesian_simulation_air_traffic_data
step: Generate Bayesian Simulation Air Traffic Data
- step: Fetch Session IDs
- step: Submit Simulated Air Traffic at varying refresh rates
arguments:
observations: ${{ steps.generate_bayesian_simulation_air_traffic_data.result }}
session_ids: ${{ steps.Fetch Session IDs.result }}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import random
import uuid
from uuid import UUID

Expand Down Expand Up @@ -100,3 +101,29 @@ async def generate_simulated_air_traffic_data(
except Exception as exc: # noqa: BLE001
logger.error(f"Failed to generate telemetry states from {config_path}: {exc}")
raise

@scenario_step("Generate Simulated Air Traffic Data with Latency")
async def generate_simulated_air_traffic_data_with_latency(
self,
config_path: str | None = None,
duration: int | None = None,
) -> list[list[FlightObservationSchema]]:
"""This method, simulates a adding latency to the flight observations list"""
flight_observations = self.generate_simulated_air_traffic_data(config_path=config_path, duration=duration)
LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues
TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds

modified_flight_observations = []
for track_observations in flight_observations:
modified_track_observations = []
for obs in track_observations:
if random.random() < LATENCY_PROBABILITY:
# Simulate latency by removing some observations
if random.random() < 0.5: # 50% chance to remove observation
continue
# Simulate timestamp shift
shift_seconds = random.uniform(*TIMESTAMP_SHIFT_RANGE_SECONDS)
obs.timestamp += int(shift_seconds * 1000) # Convert seconds to milliseconds
modified_track_observations.append(obs)
modified_flight_observations.append(modified_track_observations)
return modified_flight_observations
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,35 @@ def _convert_track_to_observations(
logger.info(f"Converted track to observations. Generated {len(observations)} observations.")

return observations

@scenario_step("Generate Bayesian Simulation Air Traffic Data with latency issues")
async def generate_bayesian_sim_air_traffic_data_with_sensor_latency_issues(
self,
config_path: str | None = None,
duration: int | None = None,
) -> list[list[FlightObservationSchema]]:
"""
This method modifies the retrieved simulation data by changing the timestamp and adding latency to the observed dataset.
Latency is simulated by randomly removing some observations and randomly shifting the timestamps of some observations
to be earlier or later than the actual timestamp, mimicking real-world sensor latency issues.
"""
flight_observations = self.generate_bayesian_sim_air_traffic_data(config_path=config_path, duration=duration)

LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues
TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds

modified_flight_observations = []
for track_observations in flight_observations:
modified_track_observations = []
for obs in track_observations:
if random.random() < LATENCY_PROBABILITY:
# Simulate latency by removing some observations
if random.random() < 0.5: # 50% chance to remove observation
continue
# Simulate timestamp shift
shift_seconds = random.uniform(*TIMESTAMP_SHIFT_RANGE_SECONDS)
obs.timestamp += int(shift_seconds * 1000) # Convert seconds to milliseconds
modified_track_observations.append(obs)
modified_flight_observations.append(modified_track_observations)

return modified_flight_observations
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import os
import random
import tempfile
import uuid
from collections.abc import Iterable
Expand Down Expand Up @@ -133,6 +134,34 @@ async def generate_bluesky_sim_air_traffic_data(
# Convert dict -> list[list[FlightObservationSchema]] with stable ordering
return [results_by_acid[acid] for acid in sorted(results_by_acid.keys())]

@scenario_step("Generate BlueSky Simulation Air Traffic Data with latency issues")
async def generate_bluesky_sim_air_traffic_data_with_sensor_latency_issues(
self,
config_path: str | None = None,
duration: int | None = None,
) -> list[list[FlightObservationSchema]]:
"""This method generates"""
flight_observations = self.generate_bluesky_sim_air_traffic_data(config_path=config_path, duration=duration)

# This method modifies the retrieved simulation data by changing the timestamp and adding latency to the observed dataset
LATENCY_PROBABILITY = 0.1 # 10% chance to have latency issues
TIMESTAMP_SHIFT_RANGE_SECONDS = (-1, 2.5) # Shift timestamps by -5 to +5 seconds

modified_flight_observations = []
for track_observations in flight_observations:
modified_track_observations = []
for obs in track_observations:
if random.random() < LATENCY_PROBABILITY:
# Simulate latency by removing some observations
if random.random() < 0.5: # 50% chance to remove observation
continue
# Simulate timestamp shift
shift_seconds = random.uniform(*TIMESTAMP_SHIFT_RANGE_SECONDS)
obs.timestamp += int(shift_seconds * 1000) # Convert seconds to milliseconds
modified_track_observations.append(obs)
modified_flight_observations.append(modified_track_observations)
return modified_flight_observations


def _tolist(x: Iterable[float] | object) -> list[float]:
"""Convert numpy arrays / array-likes to a Python list."""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
import random
import time
import uuid
from contextlib import asynccontextmanager
Expand Down Expand Up @@ -728,6 +729,7 @@ async def submit_simulated_air_traffic(
current_simulation_time = current_simulation_time.shift(seconds=1)

duration_seconds = (arrow.now() - start_time).total_seconds()

return {
"success": submission_errors == 0,
"aircraft_count": number_of_aircraft,
Expand All @@ -737,6 +739,142 @@ async def submit_simulated_air_traffic(
"simulation_duration_seconds": (simulation_end - simulation_start).total_seconds(),
}

@scenario_step("Submit Simulated Air Traffic at varying refresh rates")
async def submit_simulated_air_traffic_at_random_refresh_rates(
self,
observations: list[list[FlightObservationSchema]],
session_ids: list[uuid.UUID] | None = None,
single_or_multiple_sensors: str = "single",
) -> StepResult:
"""Submit simulated air traffic observations with corrupted timestamps to mimic a malfunctioning source.

Iterates through observations and randomly applies timestamp anomalies:
- Stale timestamps (repeating a previous timestamp instead of advancing)
- Large backward jumps (timestamp shifted into the past)
- Large forward jumps (timestamp shifted into the future)

Args:
observations: List of observation lists, one per aircraft.
session_ids: Optional list of session UUIDs.
single_or_multiple_sensors: Whether to use single or multiple sensor IDs.

Returns:
StepResult with submission statistics.
"""
session_ids = session_ids or [uuid.uuid4()]
if not observations:
logger.warning("No air traffic observations to submit.")
return StepResult(
name="Submit Simulated Air Traffic at varying refresh rates",
status=Status.FAIL,
duration=0,
error_message="No air traffic observations provided",
)

number_of_aircraft = len(observations)
logger.debug(f"Submitting simulated air traffic (off-nominal timestamps) for {number_of_aircraft} aircraft")

session_id = str(session_ids[0])

# Determine simulation time range
start_times = []
end_times = []
for aircraft_obs in observations:
if not aircraft_obs:
continue
start_times.append(arrow.get(aircraft_obs[0].timestamp))
end_times.append(arrow.get(aircraft_obs[-1].timestamp))

if not start_times:
logger.warning("No valid start/end times found in observations.")
return StepResult(
name="Submit Simulated Air Traffic at varying refresh rates",
status=Status.FAIL,
duration=0,
error_message="No valid start/end times found in observations",
)

simulation_start = min(start_times)
simulation_end = max(end_times)

now = arrow.now()
start_time = now
observations_submitted = 0
submission_errors = 0

# Build a flat list of observations with corrupted timestamps per aircraft
corrupted_observations: list[list[FlightObservationSchema]] = []
for aircraft_obs in observations:
corrupted_aircraft_obs: list[FlightObservationSchema] = []
last_used_timestamp: int | None = None
for obs in aircraft_obs:
original_timestamp = obs.timestamp
anomaly_roll = random.random()

if anomaly_roll < 0.3 and last_used_timestamp is not None:
# 30% chance: stale timestamp — repeat the previous timestamp
new_timestamp = last_used_timestamp
logger.debug(f"[off-nominal] Stale timestamp for {obs.icao_address}: kept {new_timestamp} instead of {original_timestamp}")
elif anomaly_roll < 0.5:
# 20% chance: backward jump — shift timestamp 10-60s into the past
offset = random.randint(10, 60)
new_timestamp = original_timestamp - offset
logger.debug(f"[off-nominal] Backward jump for {obs.icao_address}: {original_timestamp} -> {new_timestamp} (−{offset}s)")
elif anomaly_roll < 0.65:
# 15% chance: forward jump — shift timestamp 10-60s into the future
offset = random.randint(10, 60)
new_timestamp = original_timestamp + offset
logger.debug(f"[off-nominal] Forward jump for {obs.icao_address}: {original_timestamp} -> {new_timestamp} (+{offset}s)")
else:
# 35% chance: keep the original timestamp (normal)
new_timestamp = original_timestamp

corrupted_obs = obs.model_copy(update={"timestamp": new_timestamp})
corrupted_aircraft_obs.append(corrupted_obs)
last_used_timestamp = new_timestamp

corrupted_observations.append(corrupted_aircraft_obs)

# Play back observations in real-time using the original simulation timeline
current_simulation_time = simulation_start
while current_simulation_time < simulation_end:
target_real_time = start_time + (current_simulation_time - simulation_start)
while arrow.now() < target_real_time:
await asyncio.sleep(0.1)

for aircraft_obs in corrupted_observations:
if not aircraft_obs:
continue
# Find observation closest to current simulation time using original positions
closest_obs = min(
aircraft_obs,
key=lambda obs: abs(arrow.get(obs.timestamp) - current_simulation_time),
)
endpoint = f"/flight_stream/set_air_traffic/{session_id}"
payload = {"observations": [closest_obs.model_dump(mode="json")]}
ScenarioContext.add_air_traffic_data([closest_obs])
try:
response = await self.post(endpoint, json=payload)
logger.debug(f"Air traffic submission response: {response.text}")
logger.info(
f"Off-nominal observation submitted for {closest_obs.icao_address} "
f"at sim time {current_simulation_time} with timestamp {closest_obs.timestamp}"
)
observations_submitted += 1
except Exception as e:
logger.error(f"Failed to submit off-nominal observation: {e}")
submission_errors += 1

current_simulation_time = current_simulation_time.shift(seconds=1)

duration_seconds = (arrow.now() - start_time).total_seconds()
return StepResult(
name="Submit Simulated Air Traffic at varying refresh rates",
status=Status.PASS if submission_errors == 0 else Status.FAIL,
duration=round(duration_seconds, 2),
error_message=None if submission_errors == 0 else f"{submission_errors} submission errors occurred",
)

@scenario_step("Submit Air Traffic")
async def submit_air_traffic(self, observations: list[FlightObservationSchema], session_id: uuid.UUID = uuid.uuid4()) -> dict[str, Any]:
"""Submit air traffic observations to the Flight Blender API.
Expand Down