Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions scenarios/bulk_add_flight_declarations.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name: bulk_add_flight_declarations
description: Runs the bulk add flight declaration scenario.
steps:
- step: Setup Two Flight Declarations
- id: teardown_first_declaration
step: Teardown Flight Declaration
arguments:
flight_declaration_id: ${{ steps.Setup Two Flight Declarations.result.declarations[0].id }}
- id: teardown_second_declaration
step: Teardown Flight Declaration
arguments:
flight_declaration_id: ${{ steps.Setup Two Flight Declarations.result.declarations[1].id }}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name: bulk_add_flight_declarations_via_operational_intents
description: Runs the bulk add flight declaration scenario by adding operational intents.
steps:
- step: Setup Two Operational Intents
- id: teardown_first_declaration
step: Teardown Flight Declaration
arguments:
flight_declaration_id: ${{ steps.Setup Two Operational Intents.result.declarations[0].id }}
- id: teardown_second_declaration
step: Teardown Flight Declaration
arguments:
flight_declaration_id: ${{ steps.Setup Two Operational Intents.result.declarations[1].id }}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def _request(
self,
method: str,
endpoint: str,
json: dict | None = None,
json: dict | list | None = None,
silent_status: list[int] | None = None,
) -> httpx.Response:
url = f"{self.base_url}{endpoint}"
Expand All @@ -48,7 +48,7 @@ async def _request(
async def get(self, endpoint: str, silent_status: list[int] | None = None) -> httpx.Response:
return await self._request("GET", endpoint, silent_status=silent_status)

async def post(self, endpoint: str, json: dict, silent_status: list[int] | None = None) -> httpx.Response:
async def post(self, endpoint: str, json: list | dict, silent_status: list[int] | None = None) -> httpx.Response:
return await self._request("POST", endpoint, json=json, silent_status=silent_status)

async def put(self, endpoint: str, json: dict, silent_status: list[int] | None = None) -> httpx.Response:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
StepResult,
)
from openutm_verification.models import (
BulkFlightDeclarationCreationResult,
FlightBlenderError,
FlightDeclarationCreationResult,
HeartbeatMessage,
OperationState,
SDSPHeartbeatMessage,
Expand Down Expand Up @@ -103,6 +105,7 @@ def __init__(
self.latest_geo_fence_id: str | None = None
# Context: store the most recently created flight declaration id for teardown/steps
self.latest_flight_declaration_id: str | None = None
self.all_flight_declaration_ids: list[str] = [] # When bulk uploading, store all created declaration ids for cleanup
# Context: store the generated telemetry states for the current scenario
self.telemetry_states: list[RIDAircraftState] | None = None

Expand Down Expand Up @@ -264,6 +267,51 @@ async def upload_flight_declaration(self, declaration: str | BaseModel) -> dict[

return response_json

@scenario_step("Bulk Upload Flight Declarations")
async def upload_multiple_flight_declarations(self, declarations: list[BaseModel]) -> dict[str, Any]:
"""
Upload multiple flight declarations to Flight Blender.
Serializes and sends the provided declarations in a single bulk POST to the
"/flight_declaration_ops/set_flight_declarations_bulk" endpoint. If the response
contains per-declaration IDs, they are appended to self.all_flight_declaration_ids.
Args:
declarations (list[BaseModel]): List of Pydantic BaseModel instances representing
flight declarations to upload.
Returns:
list[dict[str, Any]]: Parsed JSON response from the service — typically a list
of per-declaration result objects (each containing fields such as "id",
"is_approved", and "state").
Raises:
FlightBlenderError: If any declaration in the response is not approved
(an entry has "is_approved" == False). The error message includes the
operation state name derived from the "state" value.
Exception: Propagates exceptions raised by the underlying HTTP client or JSON parsing.
Side effects:
- Logs debug/info/error messages about upload progress and approval status.
- Extends self.all_flight_declaration_ids with returned declaration IDs when present.
"""

endpoint = "/flight_declaration_ops/set_flight_declarations_bulk"

all_declarations = []
for declaration in declarations:
all_declarations.append(declaration.model_dump(mode="json"))
logger.debug(f"Uploading multiple flight declarations: {len(all_declarations)} declarations")
logger.info(f"Uploading multiple flight declarations to {endpoint}")
response = await self.post(endpoint, json=all_declarations)
logger.info(f"Bulk Flight declaration upload response: {response.status_code}")

response_json = response.json()
if response_json["submitted"] != len(all_declarations):
logger.error(f"Submitted count {response_json['submitted']} does not match expected count {len(all_declarations)}")
raise FlightBlenderError(f"Submitted count {response_json['submitted']} does not match expected count {len(all_declarations)}")
try:
all_flight_declaration_ids = [declaration_response.get("id") for declaration_response in response_json["results"]]
self.all_flight_declaration_ids.extend(all_flight_declaration_ids)
except AttributeError:
logger.warning("Failed to extract flight declaration IDs from response")
return response_json

@scenario_step("Upload Flight Declaration Via Operational Intent")
async def upload_flight_declaration_via_operational_intent(self, declaration: str | BaseModel) -> dict[str, Any]:
"""Upload a flight declaration to the Flight Blender API.
Expand Down Expand Up @@ -317,6 +365,30 @@ async def upload_flight_declaration_via_operational_intent(self, declaration: st

return response_json

@scenario_step("Upload two Flight Declarations Via Operational Intent")
async def upload_multiple_flight_declarations_via_operational_intents(self, declarations: list[BaseModel]) -> dict[str, Any]:
endpoint = "/flight_declaration_ops/set_operational_intents_bulk"

all_declarations = []
for declaration in declarations:
all_declarations.append(declaration.model_dump(mode="json"))
logger.debug(f"Uploading multiple flight operational intents: {len(all_declarations)} operational intents")
logger.info(f"Uploading multiple flight declarations to {endpoint}")
response = await self.post(endpoint, json=all_declarations)
logger.info(f"Bulk Flight declaration upload response: {response.status_code}")

response_json = response.json()
logger.info(f"Bulk upload response: {json.dumps(response_json, indent=2)}")
if response_json["submitted"] != len(all_declarations):
logger.error(f"Submitted count {response_json['submitted']} does not match expected count {len(all_declarations)}")
raise FlightBlenderError(f"Submitted count {response_json['submitted']} does not match expected count {len(all_declarations)}")
try:
all_flight_declaration_ids = [declaration_response.get("id") for declaration_response in response_json["results"]]
self.all_flight_declaration_ids.extend(all_flight_declaration_ids)
except AttributeError:
logger.warning("Failed to extract flight declaration IDs from response")
return response_json

@scenario_step("Wait for User Input")
async def wait_for_user_input(self, prompt: str = "Press Enter to continue...") -> str:
"""Wait for user input to proceed.
Expand Down Expand Up @@ -356,6 +428,36 @@ async def update_operation_state(self, state: OperationState, duration: str | in
await asyncio.sleep(duration_seconds)
return response.json()

@scenario_step("Update Operation State of declaration")
async def update_operation_state_of_declaration(
self, state: OperationState, declaration_id: str, duration: str | int | float = 0
) -> dict[str, Any]:
"""Update the state of a flight operation.

Posts the new state and optionally waits for the specified duration.

Args:
state: The new OperationState to set.
duration: Optional duration to sleep after update (default 0). Can be a number (seconds) or a string (e.g., "5s", "1m").

Returns:
The JSON response from the API.

Raises:
FlightBlenderError: If the update request fails.
"""
duration_seconds = parse_duration(duration)
endpoint = f"/flight_declaration_ops/flight_declaration_state/{declaration_id}"
logger.debug(f"Updating operation {declaration_id} to state {state.name}")
payload = {"state": state.value, "submitted_by": "hh@auth.com"}

response = await self.put(endpoint, json=payload)
logger.info(f"Operation state updated for {declaration_id} to {state.name}")
if duration_seconds > 0:
logger.debug(f"Sleeping for {duration_seconds} seconds after state update")
await asyncio.sleep(duration_seconds)
return response.json()

def _load_telemetry_file(self, filename: str) -> list[RIDAircraftState]:
"""Load telemetry states from a JSON file.

Expand Down Expand Up @@ -1074,9 +1176,9 @@ async def close_heartbeat_websocket_connection(self, ws_connection: ClientConnec
await ws_connection.close()

@scenario_step("Teardown Flight Declaration")
async def teardown_flight_declaration(self):
async def teardown_flight_declaration(self, flight_declaration_id: str | None = None) -> dict[str, Any]:
logger.info("Tearing down flight declaration...")
await self.delete_flight_declaration()
await self.delete_flight_declaration(flight_declaration_id=flight_declaration_id)

@scenario_step("Setup Flight Declaration via Operational Intent")
async def setup_flight_declaration_via_operational_intent(
Expand Down Expand Up @@ -1184,6 +1286,134 @@ async def setup_flight_declaration(
"end_datetime": flight_declaration.end_datetime,
}

@scenario_step("Setup Two Flight Declarations")
async def setup_two_flight_declarations(
self,
flight_declaration_path: str | None = None,
trajectory_path: str | None = None,
) -> dict[str, Any]:
"""Generates data and uploads flight declaration.

Returns:
Dictionary with flight declaration info including 'id'.
"""

from openutm_verification.scenarios.common import (
generate_flight_declaration,
)

# Use instance attributes if arguments are not provided
flight_declaration_path = flight_declaration_path or self.flight_declaration_path
trajectory_path = trajectory_path or self.trajectory_path

if not flight_declaration_path:
raise ValueError("flight_declaration_path not provided and not found in config")

if not trajectory_path:
raise ValueError("trajectory_path not provided and not found in config")

# Synchronize start times
now = arrow.now()
start_time = now.shift(seconds=2)
end_time = now.shift(minutes=60)

second_start_time = now.shift(seconds=20)
second_end_time = now.shift(minutes=10)

first_flight_declaration = generate_flight_declaration(flight_declaration_path)
second_flight_declaration = generate_flight_declaration(flight_declaration_path)
# Update flight declaration times to match synchronized start time
first_flight_declaration.start_datetime = start_time.isoformat()
first_flight_declaration.end_datetime = end_time.isoformat()
second_flight_declaration.start_datetime = second_start_time.isoformat()
second_flight_declaration.end_datetime = second_end_time.isoformat()
upload_step_result = await self.upload_multiple_flight_declarations(declarations=[first_flight_declaration, second_flight_declaration])
if upload_step_result.status == Status.FAIL:
raise FlightBlenderError("Failed to upload one or more flight declarations during setup_two_flight_declarations")

all_declaration_details = BulkFlightDeclarationCreationResult(
declarations=[
FlightDeclarationCreationResult(
id=self.all_flight_declaration_ids[0],
start_datetime=first_flight_declaration.start_datetime,
end_datetime=first_flight_declaration.end_datetime,
),
FlightDeclarationCreationResult(
id=self.all_flight_declaration_ids[1],
start_datetime=second_flight_declaration.start_datetime,
end_datetime=second_flight_declaration.end_datetime,
),
]
)
# Return flight declaration info for use in subsequent steps

return asdict(all_declaration_details)

@scenario_step("Setup Two Operational Intents")
async def setup_two_flight_declarations_via_operational_intents(
self,
flight_declaration_via_operational_intent_path: str | None = None,
trajectory_path: str | None = None,
) -> dict[str, Any]:
"""Generates data and uploads flight declaration via Operational Intent.

Returns:
Dictionary with flight declaration info including 'id'.
"""
from openutm_verification.scenarios.common import (
generate_flight_declaration_via_operational_intent,
)

# Use instance attributes if arguments are not provided
flight_declaration_via_operational_intent_path = (
flight_declaration_via_operational_intent_path or self.flight_declaration_via_operational_intent
)
trajectory_path = trajectory_path or self.trajectory_path

if not flight_declaration_via_operational_intent_path:
raise ValueError("flight_declaration_via_operational_intent_path not provided and not found in config")

if not trajectory_path:
raise ValueError("trajectory_path not provided and not found in config")

# Synchronize start times
now = arrow.now()
start_time = now.shift(seconds=2)
end_time = now.shift(minutes=60)

second_start_time = now.shift(seconds=20)
second_end_time = now.shift(minutes=10)

first_flight_declaration = generate_flight_declaration_via_operational_intent(flight_declaration_via_operational_intent_path)
second_flight_declaration = generate_flight_declaration_via_operational_intent(flight_declaration_via_operational_intent_path)
# Update flight declaration times to match synchronized start time
first_flight_declaration.start_datetime = start_time.isoformat()
first_flight_declaration.end_datetime = end_time.isoformat()
second_flight_declaration.start_datetime = second_start_time.isoformat()
second_flight_declaration.end_datetime = second_end_time.isoformat()

upload_step_result = await self.upload_multiple_flight_declarations_via_operational_intents(
declarations=[first_flight_declaration, second_flight_declaration]
)
if upload_step_result.status == Status.FAIL:
raise FlightBlenderError("Failed to upload one or more flight declarations during setup_two_flight_declarations")

all_declaration_details = BulkFlightDeclarationCreationResult(
declarations=[
FlightDeclarationCreationResult(
id=self.all_flight_declaration_ids[0],
start_datetime=first_flight_declaration.start_datetime,
end_datetime=first_flight_declaration.end_datetime,
),
FlightDeclarationCreationResult(
id=self.all_flight_declaration_ids[1],
start_datetime=second_flight_declaration.start_datetime,
end_datetime=second_flight_declaration.end_datetime,
),
]
)
return asdict(all_declaration_details)

@asynccontextmanager
async def create_flight_declaration(self):
"""Context manager to setup and teardown a flight operation based on scenario config."""
Expand Down
12 changes: 12 additions & 0 deletions src/openutm_verification/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,15 @@ class HeartbeatMessage:
class SDSPHeartbeatMessage:
message: HeartbeatMessage
timestamp: str


@dataclass
class FlightDeclarationCreationResult:
id: str
start_datetime: str
end_datetime: str


@dataclass
class BulkFlightDeclarationCreationResult:
declarations: list[FlightDeclarationCreationResult]
24 changes: 23 additions & 1 deletion src/openutm_verification/server/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,29 @@ def _resolve_ref(self, ref: str, loop_context: Dict[str, Any] | None = None) ->
for part in remaining_parts:
if not part:
continue
if isinstance(current_value, dict):

# Handle array indexing, e.g. "result[0]" or "declarations[1]"
index_match = re.match(r"^(\w+)\[(\d+)\]$", part)
if index_match:
attr_name = index_match.group(1)
index = int(index_match.group(2))
# First resolve the attribute
if isinstance(current_value, dict):
current_value = current_value.get(attr_name)
elif hasattr(current_value, attr_name):
current_value = getattr(current_value, attr_name)
else:
raise ValueError(
f"Could not resolve '{attr_name}' in '{ref}'."
f"Available keys: {list(current_value.keys()) if isinstance(current_value, dict) else dir(current_value)}"
)
# Then index into it
if not isinstance(current_value, (list, tuple)):
raise ValueError(f"Cannot index into '{attr_name}' in '{ref}': expected a list, got {type(current_value).__name__}")
if index >= len(current_value):
raise ValueError(f"Index {index} out of range for '{attr_name}' in '{ref}' (length {len(current_value)})")
current_value = current_value[index]
elif isinstance(current_value, dict):
current_value = current_value.get(part)
elif hasattr(current_value, part):
current_value = getattr(current_value, part)
Expand Down