From 5014f69adbb4bf26f10b773b5f2dfe3707355e02 Mon Sep 17 00:00:00 2001 From: Yue Chao Qin Date: Tue, 17 Feb 2026 07:49:46 -0800 Subject: [PATCH] feat: API pipeline run list has execution summary --- cloud_pipelines_backend/api_server_sql.py | 48 +++++++++++++---------- tests/test_api_server_sql.py | 10 ++++- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index adc7dcb..3ca1b0f 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -32,6 +32,22 @@ def _get_current_time() -> datetime.datetime: from .errors import ItemNotFoundError +@dataclasses.dataclass(kw_only=True) +class ExecutionStatusSummary: + total_executions: int = 0 + ended_executions: int = 0 + has_ended: bool = False + + def count_execution_status( + self, *, status: bts.ContainerExecutionStatus, count: int + ) -> None: + self.total_executions += count + if status in bts.CONTAINER_STATUSES_ENDED: + self.ended_executions += count + + self.has_ended = self.ended_executions == self.total_executions + + # ==== PipelineJobService @dataclasses.dataclass(kw_only=True) class PipelineRunResponse: @@ -43,6 +59,7 @@ class PipelineRunResponse: created_at: datetime.datetime | None = None pipeline_name: str | None = None execution_status_stats: dict[str, int] | None = None + execution_summary: ExecutionStatusSummary | None = None @classmethod def from_db(cls, pipeline_run: bts.PipelineRun) -> "PipelineRunResponse": @@ -241,10 +258,12 @@ def create_pipeline_run_response( pipeline_name = component_spec.name response.pipeline_name = pipeline_name if include_execution_stats: - response.execution_status_stats = self._get_execution_status_stats( + stats, summary = self._get_execution_stats_and_summary( session=session, root_execution_id=pipeline_run.root_execution_id, ) + response.execution_status_stats = stats + response.execution_summary = summary return response return ListPipelineJobsResponse( @@ -255,15 +274,20 @@ def create_pipeline_run_response( next_page_token=next_page_token, ) - def _get_execution_status_stats( + def _get_execution_stats_and_summary( self, session: orm.Session, root_execution_id: bts.IdType, - ) -> dict[str, int]: + ) -> tuple[dict[str, int], ExecutionStatusSummary]: stats = self._calculate_execution_status_stats( session=session, root_execution_id=root_execution_id ) - return {status.value: count for status, count in stats.items()} + summary = ExecutionStatusSummary() + status_stats: dict[str, int] = {} + for status, count in stats.items(): + summary.count_execution_status(status=status, count=count) + status_stats[status.value] = count + return status_stats, summary def _calculate_execution_status_stats( self, session: orm.Session, root_execution_id: bts.IdType @@ -482,22 +506,6 @@ class ArtifactNodeIdResponse: id: bts.IdType -@dataclasses.dataclass(kw_only=True) -class ExecutionStatusSummary: - total_executions: int = 0 - ended_executions: int = 0 - has_ended: bool = False - - def count_execution_status( - self, *, status: bts.ContainerExecutionStatus, count: int - ) -> None: - self.total_executions += count - if status in bts.CONTAINER_STATUSES_ENDED: - self.ended_executions += count - - self.has_ended = self.ended_executions == self.total_executions - - @dataclasses.dataclass class GetGraphExecutionStateResponse: child_execution_status_stats: dict[bts.IdType, dict[str, int]] diff --git a/tests/test_api_server_sql.py b/tests/test_api_server_sql.py index 8120575..d7c092f 100644 --- a/tests/test_api_server_sql.py +++ b/tests/test_api_server_sql.py @@ -151,8 +151,14 @@ def test_list_with_execution_stats(self): with session_factory() as session: result = service.list(session=session, include_execution_stats=True) assert len(result.pipeline_runs) == 1 - assert result.pipeline_runs[0].root_execution_id == root_id - stats = result.pipeline_runs[0].execution_status_stats + run = result.pipeline_runs[0] + assert run.root_execution_id == root_id + stats = run.execution_status_stats assert stats is not None assert stats["SUCCEEDED"] == 1 assert stats["RUNNING"] == 1 + summary = run.execution_summary + assert summary is not None + assert summary.total_executions == 2 + assert summary.ended_executions == 1 + assert summary.has_ended is False