From b25a85806f84d41475e75c47ed2b58586496e69e Mon Sep 17 00:00:00 2001 From: Yue Chao Qin Date: Tue, 17 Feb 2026 08:29:59 -0800 Subject: [PATCH] feat: API pipeline run get has execution summary --- cloud_pipelines_backend/api_server_sql.py | 32 ++++++++--- tests/test_api_server_sql.py | 67 +++++++++++++++++++++++ 2 files changed, 91 insertions(+), 8 deletions(-) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 3ca1b0f..d5f4202 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -130,11 +130,19 @@ def create( session.refresh(pipeline_run) return PipelineRunResponse.from_db(pipeline_run) - def get(self, session: orm.Session, id: bts.IdType) -> PipelineRunResponse: + def get( + self, + session: orm.Session, + id: bts.IdType, + include_execution_stats: bool = False, + ) -> PipelineRunResponse: pipeline_run = session.get(bts.PipelineRun, id) if not pipeline_run: raise ItemNotFoundError(f"Pipeline run {id} not found.") - return PipelineRunResponse.from_db(pipeline_run) + response = PipelineRunResponse.from_db(pipeline_run) + if include_execution_stats: + response = self._populate_execution_stats(session=session, response=response) + return response def terminate( self, @@ -258,12 +266,7 @@ def create_pipeline_run_response( pipeline_name = component_spec.name response.pipeline_name = pipeline_name if include_execution_stats: - stats, summary = self._get_execution_stats_and_summary( - session=session, - root_execution_id=pipeline_run.root_execution_id, - ) - response.execution_status_stats = stats - response.execution_summary = summary + response = self._populate_execution_stats(session=session, response=response) return response return ListPipelineJobsResponse( @@ -274,6 +277,19 @@ def create_pipeline_run_response( next_page_token=next_page_token, ) + def _populate_execution_stats( + self, + session: orm.Session, + response: PipelineRunResponse, + ) -> PipelineRunResponse: + stats, summary = self._get_execution_stats_and_summary( + session=session, + root_execution_id=response.root_execution_id, + ) + response.execution_status_stats = stats + response.execution_summary = summary + return response + def _get_execution_stats_and_summary( self, session: orm.Session, diff --git a/tests/test_api_server_sql.py b/tests/test_api_server_sql.py index d7c092f..e8a5eb1 100644 --- a/tests/test_api_server_sql.py +++ b/tests/test_api_server_sql.py @@ -1,3 +1,4 @@ +import pytest from sqlalchemy import orm from cloud_pipelines_backend import backend_types_sql as bts @@ -6,6 +7,7 @@ ExecutionStatusSummary, PipelineRunsApiService_Sql, ) +from cloud_pipelines_backend.errors import ItemNotFoundError def _initialize_db_and_get_session_factory(): @@ -162,3 +164,68 @@ def test_list_with_execution_stats(self): assert summary.total_executions == 2 assert summary.ended_executions == 1 assert summary.has_ended is False + + +class TestPipelineRunServiceGet: + def test_get_not_found(self): + session_factory = _initialize_db_and_get_session_factory() + service = PipelineRunsApiService_Sql() + with session_factory() as session: + with pytest.raises(ItemNotFoundError): + service.get(session=session, id="nonexistent-id") + + def test_get_returns_pipeline_run(self): + session_factory = _initialize_db_and_get_session_factory() + service = PipelineRunsApiService_Sql() + with session_factory() as session: + root = _create_execution_node(session) + root_id = root.id + run = _create_pipeline_run(session, root, created_by="user1") + run_id = run.id + session.commit() + + with session_factory() as session: + result = service.get(session=session, id=run_id) + assert result.id == run_id + assert result.root_execution_id == root_id + assert result.created_by == "user1" + assert result.execution_status_stats is None + assert result.execution_summary is None + + def test_get_with_execution_stats(self): + session_factory = _initialize_db_and_get_session_factory() + service = PipelineRunsApiService_Sql() + with session_factory() as session: + root = _create_execution_node(session) + root_id = root.id + child1 = _create_execution_node( + session, + parent=root, + status=bts.ContainerExecutionStatus.SUCCEEDED, + ) + child2 = _create_execution_node( + session, + parent=root, + status=bts.ContainerExecutionStatus.RUNNING, + ) + _link_ancestor(session, child1, root) + _link_ancestor(session, child2, root) + run = _create_pipeline_run(session, root) + run_id = run.id + session.commit() + + with session_factory() as session: + result = service.get( + session=session, id=run_id, include_execution_stats=True + ) + assert result.id == run_id + assert result.root_execution_id == root_id + stats = result.execution_status_stats + assert stats is not None + assert stats["SUCCEEDED"] == 1 + assert stats["RUNNING"] == 1 + summary = result.execution_summary + assert summary is not None + assert summary.total_executions == 2 + assert summary.ended_executions == 1 + assert summary.has_ended is False