Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,19 @@ def create(
session.refresh(pipeline_run)
return PipelineRunResponse.from_db(pipeline_run)

def get(self, session: orm.Session, id: bts.IdType) -> PipelineRunResponse:
def get(
self,
session: orm.Session,
id: bts.IdType,
include_execution_stats: bool = False,
) -> PipelineRunResponse:
pipeline_run = session.get(bts.PipelineRun, id)
if not pipeline_run:
raise ItemNotFoundError(f"Pipeline run {id} not found.")
return PipelineRunResponse.from_db(pipeline_run)
response = PipelineRunResponse.from_db(pipeline_run)
if include_execution_stats:
response = self._populate_execution_stats(session=session, response=response)
return response

def terminate(
self,
Expand Down Expand Up @@ -258,12 +266,7 @@ def create_pipeline_run_response(
pipeline_name = component_spec.name
response.pipeline_name = pipeline_name
if include_execution_stats:
stats, summary = self._get_execution_stats_and_summary(
session=session,
root_execution_id=pipeline_run.root_execution_id,
)
response.execution_status_stats = stats
response.execution_summary = summary
response = self._populate_execution_stats(session=session, response=response)
return response

return ListPipelineJobsResponse(
Expand All @@ -274,6 +277,19 @@ def create_pipeline_run_response(
next_page_token=next_page_token,
)

def _populate_execution_stats(
self,
session: orm.Session,
response: PipelineRunResponse,
) -> PipelineRunResponse:
stats, summary = self._get_execution_stats_and_summary(
session=session,
root_execution_id=response.root_execution_id,
)
response.execution_status_stats = stats
response.execution_summary = summary
return response

def _get_execution_stats_and_summary(
self,
session: orm.Session,
Expand Down
67 changes: 67 additions & 0 deletions tests/test_api_server_sql.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest
from sqlalchemy import orm

from cloud_pipelines_backend import backend_types_sql as bts
Expand All @@ -6,6 +7,7 @@
ExecutionStatusSummary,
PipelineRunsApiService_Sql,
)
from cloud_pipelines_backend.errors import ItemNotFoundError


def _initialize_db_and_get_session_factory():
Expand Down Expand Up @@ -162,3 +164,68 @@ def test_list_with_execution_stats(self):
assert summary.total_executions == 2
assert summary.ended_executions == 1
assert summary.has_ended is False


class TestPipelineRunServiceGet:
def test_get_not_found(self):
session_factory = _initialize_db_and_get_session_factory()
service = PipelineRunsApiService_Sql()
with session_factory() as session:
with pytest.raises(ItemNotFoundError):
service.get(session=session, id="nonexistent-id")

def test_get_returns_pipeline_run(self):
session_factory = _initialize_db_and_get_session_factory()
service = PipelineRunsApiService_Sql()
with session_factory() as session:
root = _create_execution_node(session)
root_id = root.id
run = _create_pipeline_run(session, root, created_by="user1")
run_id = run.id
session.commit()

with session_factory() as session:
result = service.get(session=session, id=run_id)
assert result.id == run_id
assert result.root_execution_id == root_id
assert result.created_by == "user1"
assert result.execution_status_stats is None
assert result.execution_summary is None

def test_get_with_execution_stats(self):
session_factory = _initialize_db_and_get_session_factory()
service = PipelineRunsApiService_Sql()
with session_factory() as session:
root = _create_execution_node(session)
root_id = root.id
child1 = _create_execution_node(
session,
parent=root,
status=bts.ContainerExecutionStatus.SUCCEEDED,
)
child2 = _create_execution_node(
session,
parent=root,
status=bts.ContainerExecutionStatus.RUNNING,
)
_link_ancestor(session, child1, root)
_link_ancestor(session, child2, root)
run = _create_pipeline_run(session, root)
run_id = run.id
session.commit()

with session_factory() as session:
result = service.get(
session=session, id=run_id, include_execution_stats=True
)
assert result.id == run_id
assert result.root_execution_id == root_id
stats = result.execution_status_stats
assert stats is not None
assert stats["SUCCEEDED"] == 1
assert stats["RUNNING"] == 1
summary = result.execution_summary
assert summary is not None
assert summary.total_executions == 2
assert summary.ended_executions == 1
assert summary.has_ended is False