Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,22 @@ class ArtifactNodeIdResponse:
id: bts.IdType


@dataclasses.dataclass(kw_only=True)
class ExecutionStatusSummary:
total_executions: int = 0
ended_executions: int = 0
has_ended: bool = False

def count_execution_status(
self, *, status: bts.ContainerExecutionStatus, count: int
) -> None:
self.total_executions += count
if status in bts.CONTAINER_STATUSES_ENDED:
self.ended_executions += count

self.has_ended = self.ended_executions == self.total_executions


@dataclasses.dataclass
class GetGraphExecutionStateResponse:
child_execution_status_stats: dict[bts.IdType, dict[str, int]]
Expand Down
57 changes: 57 additions & 0 deletions tests/test_api_server_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from cloud_pipelines_backend import backend_types_sql as bts
from cloud_pipelines_backend.api_server_sql import ExecutionStatusSummary


class TestExecutionStatusSummary:
def test_initial_state(self):
summary = ExecutionStatusSummary()
assert summary.total_executions == 0
assert summary.ended_executions == 0
assert summary.has_ended is False

def test_accumulate_all_ended_statuses(self):
"""Add each ended status with 2^i count for robust uniqueness."""
summary = ExecutionStatusSummary()
ended_statuses = sorted(bts.CONTAINER_STATUSES_ENDED, key=lambda s: s.value)
expected_total = 0
expected_ended = 0
for i, status in enumerate(ended_statuses):
count = 2**i
summary.count_execution_status(status=status, count=count)
expected_total += count
expected_ended += count
assert summary.total_executions == expected_total
assert summary.ended_executions == expected_ended
assert summary.has_ended is True

def test_accumulate_all_in_progress_statuses(self):
"""Add each in-progress status with 2^i count for robust uniqueness."""
summary = ExecutionStatusSummary()
in_progress_statuses = sorted(
set(bts.ContainerExecutionStatus) - bts.CONTAINER_STATUSES_ENDED,
key=lambda s: s.value,
)
expected_total = 0
for i, status in enumerate(in_progress_statuses):
count = 2**i
summary.count_execution_status(status=status, count=count)
expected_total += count
assert summary.total_executions == expected_total
assert summary.ended_executions == 0
assert summary.has_ended is False

def test_accumulate_all_statuses(self):
"""Add every status with 2^i count. Summary math must be exact."""
summary = ExecutionStatusSummary()
all_statuses = sorted(bts.ContainerExecutionStatus, key=lambda s: s.value)
expected_total = 0
expected_ended = 0
for i, status in enumerate(all_statuses):
count = 2**i
expected_total += count
if status in bts.CONTAINER_STATUSES_ENDED:
expected_ended += count
summary.count_execution_status(status=status, count=count)
assert summary.total_executions == expected_total
assert summary.ended_executions == expected_ended
assert summary.has_ended == (expected_ended == expected_total)