diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index d7fad06..9341db4 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -475,6 +475,22 @@ class ArtifactNodeIdResponse: id: bts.IdType +@dataclasses.dataclass(kw_only=True) +class ExecutionStatusSummary: + total_executions: int = 0 + ended_executions: int = 0 + has_ended: bool = False + + def count_execution_status( + self, *, status: bts.ContainerExecutionStatus, count: int + ) -> None: + self.total_executions += count + if status in bts.CONTAINER_STATUSES_ENDED: + self.ended_executions += count + + self.has_ended = self.ended_executions == self.total_executions + + @dataclasses.dataclass class GetGraphExecutionStateResponse: child_execution_status_stats: dict[bts.IdType, dict[str, int]] diff --git a/tests/test_api_server_sql.py b/tests/test_api_server_sql.py new file mode 100644 index 0000000..c0f26a7 --- /dev/null +++ b/tests/test_api_server_sql.py @@ -0,0 +1,57 @@ +from cloud_pipelines_backend import backend_types_sql as bts +from cloud_pipelines_backend.api_server_sql import ExecutionStatusSummary + + +class TestExecutionStatusSummary: + def test_initial_state(self): + summary = ExecutionStatusSummary() + assert summary.total_executions == 0 + assert summary.ended_executions == 0 + assert summary.has_ended is False + + def test_accumulate_all_ended_statuses(self): + """Add each ended status with 2^i count for robust uniqueness.""" + summary = ExecutionStatusSummary() + ended_statuses = sorted(bts.CONTAINER_STATUSES_ENDED, key=lambda s: s.value) + expected_total = 0 + expected_ended = 0 + for i, status in enumerate(ended_statuses): + count = 2**i + summary.count_execution_status(status=status, count=count) + expected_total += count + expected_ended += count + assert summary.total_executions == expected_total + assert summary.ended_executions == expected_ended + assert summary.has_ended is True + + def test_accumulate_all_in_progress_statuses(self): + """Add each in-progress status with 2^i count for robust uniqueness.""" + summary = ExecutionStatusSummary() + in_progress_statuses = sorted( + set(bts.ContainerExecutionStatus) - bts.CONTAINER_STATUSES_ENDED, + key=lambda s: s.value, + ) + expected_total = 0 + for i, status in enumerate(in_progress_statuses): + count = 2**i + summary.count_execution_status(status=status, count=count) + expected_total += count + assert summary.total_executions == expected_total + assert summary.ended_executions == 0 + assert summary.has_ended is False + + def test_accumulate_all_statuses(self): + """Add every status with 2^i count. Summary math must be exact.""" + summary = ExecutionStatusSummary() + all_statuses = sorted(bts.ContainerExecutionStatus, key=lambda s: s.value) + expected_total = 0 + expected_ended = 0 + for i, status in enumerate(all_statuses): + count = 2**i + expected_total += count + if status in bts.CONTAINER_STATUSES_ENDED: + expected_ended += count + summary.count_execution_status(status=status, count=count) + assert summary.total_executions == expected_total + assert summary.ended_executions == expected_ended + assert summary.has_ended == (expected_ended == expected_total)