Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,10 @@ def create_pipeline_run_response(
pipeline_name = component_spec.name
response.pipeline_name = pipeline_name
if include_execution_stats:
execution_status_stats = self._calculate_execution_status_stats(
session=session, root_execution_id=pipeline_run.root_execution_id
response.execution_status_stats = self._get_execution_status_stats(
session=session,
root_execution_id=pipeline_run.root_execution_id,
)
response.execution_status_stats = {
status.value: count
for status, count in execution_status_stats.items()
}
return response

return ListPipelineJobsResponse(
Expand All @@ -258,6 +255,16 @@ def create_pipeline_run_response(
next_page_token=next_page_token,
)

def _get_execution_status_stats(
self,
session: orm.Session,
root_execution_id: bts.IdType,
) -> dict[str, int]:
stats = self._calculate_execution_status_stats(
session=session, root_execution_id=root_execution_id
)
return {status.value: count for status, count in stats.items()}

def _calculate_execution_status_stats(
self, session: orm.Session, root_execution_id: bts.IdType
) -> dict[bts.ContainerExecutionStatus, int]:
Expand Down
103 changes: 102 additions & 1 deletion tests/test_api_server_sql.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,50 @@
from sqlalchemy import orm

from cloud_pipelines_backend import backend_types_sql as bts
from cloud_pipelines_backend.api_server_sql import ExecutionStatusSummary
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend.api_server_sql import (
ExecutionStatusSummary,
PipelineRunsApiService_Sql,
)


def _initialize_db_and_get_session_factory():
db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://")
return lambda: orm.Session(bind=db_engine)


def _create_execution_node(session, task_spec=None, status=None, parent=None):
"""Helper to create an ExecutionNode with optional status and parent."""
node = bts.ExecutionNode(task_spec=task_spec or {})
if parent is not None:
node.parent_execution = parent
if status is not None:
node.container_execution_status = status
session.add(node)
session.flush()
return node


def _link_ancestor(session, execution_node, ancestor_node):
"""Create an ExecutionToAncestorExecutionLink."""
link = bts.ExecutionToAncestorExecutionLink(
ancestor_execution=ancestor_node,
execution=execution_node,
)
session.add(link)
session.flush()


def _create_pipeline_run(session, root_execution, created_by=None, annotations=None):
"""Helper to create a PipelineRun linked to a root execution node."""
run = bts.PipelineRun(root_execution=root_execution)
if created_by:
run.created_by = created_by
if annotations:
run.annotations = annotations
session.add(run)
session.flush()
return run


class TestExecutionStatusSummary:
Expand Down Expand Up @@ -55,3 +100,59 @@ def test_accumulate_all_statuses(self):
assert summary.total_executions == expected_total
assert summary.ended_executions == expected_ended
assert summary.has_ended == (expected_ended == expected_total)


class TestPipelineRunServiceList:
def test_list_empty(self):
session_factory = _initialize_db_and_get_session_factory()
service = PipelineRunsApiService_Sql()
with session_factory() as session:
result = service.list(session=session)
assert result.pipeline_runs == []
assert result.next_page_token is None

def test_list_returns_pipeline_runs(self):
session_factory = _initialize_db_and_get_session_factory()
service = PipelineRunsApiService_Sql()
with session_factory() as session:
root = _create_execution_node(session)
root_id = root.id
_create_pipeline_run(session, root, created_by="user1")
session.commit()

with session_factory() as session:
result = service.list(session=session)
assert len(result.pipeline_runs) == 1
assert result.pipeline_runs[0].root_execution_id == root_id
assert result.pipeline_runs[0].created_by == "user1"
assert result.pipeline_runs[0].execution_status_stats is None

def test_list_with_execution_stats(self):
session_factory = _initialize_db_and_get_session_factory()
service = PipelineRunsApiService_Sql()
with session_factory() as session:
root = _create_execution_node(session)
root_id = root.id
child1 = _create_execution_node(
session,
parent=root,
status=bts.ContainerExecutionStatus.SUCCEEDED,
)
child2 = _create_execution_node(
session,
parent=root,
status=bts.ContainerExecutionStatus.RUNNING,
)
_link_ancestor(session, child1, root)
_link_ancestor(session, child2, root)
_create_pipeline_run(session, root)
session.commit()

with session_factory() as session:
result = service.list(session=session, include_execution_stats=True)
assert len(result.pipeline_runs) == 1
assert result.pipeline_runs[0].root_execution_id == root_id
stats = result.pipeline_runs[0].execution_status_stats
assert stats is not None
assert stats["SUCCEEDED"] == 1
assert stats["RUNNING"] == 1