diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index 9341db4..adc7dcb 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -241,13 +241,10 @@ def create_pipeline_run_response( pipeline_name = component_spec.name response.pipeline_name = pipeline_name if include_execution_stats: - execution_status_stats = self._calculate_execution_status_stats( - session=session, root_execution_id=pipeline_run.root_execution_id + response.execution_status_stats = self._get_execution_status_stats( + session=session, + root_execution_id=pipeline_run.root_execution_id, ) - response.execution_status_stats = { - status.value: count - for status, count in execution_status_stats.items() - } return response return ListPipelineJobsResponse( @@ -258,6 +255,16 @@ def create_pipeline_run_response( next_page_token=next_page_token, ) + def _get_execution_status_stats( + self, + session: orm.Session, + root_execution_id: bts.IdType, + ) -> dict[str, int]: + stats = self._calculate_execution_status_stats( + session=session, root_execution_id=root_execution_id + ) + return {status.value: count for status, count in stats.items()} + def _calculate_execution_status_stats( self, session: orm.Session, root_execution_id: bts.IdType ) -> dict[bts.ContainerExecutionStatus, int]: diff --git a/tests/test_api_server_sql.py b/tests/test_api_server_sql.py index c0f26a7..8120575 100644 --- a/tests/test_api_server_sql.py +++ b/tests/test_api_server_sql.py @@ -1,5 +1,50 @@ +from sqlalchemy import orm + from cloud_pipelines_backend import backend_types_sql as bts -from cloud_pipelines_backend.api_server_sql import ExecutionStatusSummary +from cloud_pipelines_backend import database_ops +from cloud_pipelines_backend.api_server_sql import ( + ExecutionStatusSummary, + PipelineRunsApiService_Sql, +) + + +def _initialize_db_and_get_session_factory(): + db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://") + return lambda: orm.Session(bind=db_engine) + + +def _create_execution_node(session, task_spec=None, status=None, parent=None): + """Helper to create an ExecutionNode with optional status and parent.""" + node = bts.ExecutionNode(task_spec=task_spec or {}) + if parent is not None: + node.parent_execution = parent + if status is not None: + node.container_execution_status = status + session.add(node) + session.flush() + return node + + +def _link_ancestor(session, execution_node, ancestor_node): + """Create an ExecutionToAncestorExecutionLink.""" + link = bts.ExecutionToAncestorExecutionLink( + ancestor_execution=ancestor_node, + execution=execution_node, + ) + session.add(link) + session.flush() + + +def _create_pipeline_run(session, root_execution, created_by=None, annotations=None): + """Helper to create a PipelineRun linked to a root execution node.""" + run = bts.PipelineRun(root_execution=root_execution) + if created_by: + run.created_by = created_by + if annotations: + run.annotations = annotations + session.add(run) + session.flush() + return run class TestExecutionStatusSummary: @@ -55,3 +100,59 @@ def test_accumulate_all_statuses(self): assert summary.total_executions == expected_total assert summary.ended_executions == expected_ended assert summary.has_ended == (expected_ended == expected_total) + + +class TestPipelineRunServiceList: + def test_list_empty(self): + session_factory = _initialize_db_and_get_session_factory() + service = PipelineRunsApiService_Sql() + with session_factory() as session: + result = service.list(session=session) + assert result.pipeline_runs == [] + assert result.next_page_token is None + + def test_list_returns_pipeline_runs(self): + session_factory = _initialize_db_and_get_session_factory() + service = PipelineRunsApiService_Sql() + with session_factory() as session: + root = _create_execution_node(session) + root_id = root.id + _create_pipeline_run(session, root, created_by="user1") + session.commit() + + with session_factory() as session: + result = service.list(session=session) + assert len(result.pipeline_runs) == 1 + assert result.pipeline_runs[0].root_execution_id == root_id + assert result.pipeline_runs[0].created_by == "user1" + assert result.pipeline_runs[0].execution_status_stats is None + + def test_list_with_execution_stats(self): + session_factory = _initialize_db_and_get_session_factory() + service = PipelineRunsApiService_Sql() + with session_factory() as session: + root = _create_execution_node(session) + root_id = root.id + child1 = _create_execution_node( + session, + parent=root, + status=bts.ContainerExecutionStatus.SUCCEEDED, + ) + child2 = _create_execution_node( + session, + parent=root, + status=bts.ContainerExecutionStatus.RUNNING, + ) + _link_ancestor(session, child1, root) + _link_ancestor(session, child2, root) + _create_pipeline_run(session, root) + session.commit() + + with session_factory() as session: + result = service.list(session=session, include_execution_stats=True) + assert len(result.pipeline_runs) == 1 + assert result.pipeline_runs[0].root_execution_id == root_id + stats = result.pipeline_runs[0].execution_status_stats + assert stats is not None + assert stats["SUCCEEDED"] == 1 + assert stats["RUNNING"] == 1