Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 28 additions & 20 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ def _get_current_time() -> datetime.datetime:
from .errors import ItemNotFoundError


@dataclasses.dataclass(kw_only=True)
class ExecutionStatusSummary:
total_executions: int = 0
ended_executions: int = 0
has_ended: bool = False

def count_execution_status(
self, *, status: bts.ContainerExecutionStatus, count: int
) -> None:
self.total_executions += count
if status in bts.CONTAINER_STATUSES_ENDED:
self.ended_executions += count

self.has_ended = self.ended_executions == self.total_executions


# ==== PipelineJobService
@dataclasses.dataclass(kw_only=True)
class PipelineRunResponse:
Expand All @@ -43,6 +59,7 @@ class PipelineRunResponse:
created_at: datetime.datetime | None = None
pipeline_name: str | None = None
execution_status_stats: dict[str, int] | None = None
execution_summary: ExecutionStatusSummary | None = None

@classmethod
def from_db(cls, pipeline_run: bts.PipelineRun) -> "PipelineRunResponse":
Expand Down Expand Up @@ -241,10 +258,12 @@ def create_pipeline_run_response(
pipeline_name = component_spec.name
response.pipeline_name = pipeline_name
if include_execution_stats:
response.execution_status_stats = self._get_execution_status_stats(
stats, summary = self._get_execution_stats_and_summary(
session=session,
root_execution_id=pipeline_run.root_execution_id,
)
response.execution_status_stats = stats
response.execution_summary = summary
return response

return ListPipelineJobsResponse(
Expand All @@ -255,15 +274,20 @@ def create_pipeline_run_response(
next_page_token=next_page_token,
)

def _get_execution_status_stats(
def _get_execution_stats_and_summary(
self,
session: orm.Session,
root_execution_id: bts.IdType,
) -> dict[str, int]:
) -> tuple[dict[str, int], ExecutionStatusSummary]:
stats = self._calculate_execution_status_stats(
session=session, root_execution_id=root_execution_id
)
return {status.value: count for status, count in stats.items()}
summary = ExecutionStatusSummary()
status_stats: dict[str, int] = {}
for status, count in stats.items():
summary.count_execution_status(status=status, count=count)
status_stats[status.value] = count
return status_stats, summary

def _calculate_execution_status_stats(
self, session: orm.Session, root_execution_id: bts.IdType
Expand Down Expand Up @@ -482,22 +506,6 @@ class ArtifactNodeIdResponse:
id: bts.IdType


@dataclasses.dataclass(kw_only=True)
class ExecutionStatusSummary:
total_executions: int = 0
ended_executions: int = 0
has_ended: bool = False

def count_execution_status(
self, *, status: bts.ContainerExecutionStatus, count: int
) -> None:
self.total_executions += count
if status in bts.CONTAINER_STATUSES_ENDED:
self.ended_executions += count

self.has_ended = self.ended_executions == self.total_executions


@dataclasses.dataclass
class GetGraphExecutionStateResponse:
child_execution_status_stats: dict[bts.IdType, dict[str, int]]
Expand Down
10 changes: 8 additions & 2 deletions tests/test_api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,14 @@ def test_list_with_execution_stats(self):
with session_factory() as session:
result = service.list(session=session, include_execution_stats=True)
assert len(result.pipeline_runs) == 1
assert result.pipeline_runs[0].root_execution_id == root_id
stats = result.pipeline_runs[0].execution_status_stats
run = result.pipeline_runs[0]
assert run.root_execution_id == root_id
stats = run.execution_status_stats
assert stats is not None
assert stats["SUCCEEDED"] == 1
assert stats["RUNNING"] == 1
summary = run.execution_summary
assert summary is not None
assert summary.total_executions == 2
assert summary.ended_executions == 1
assert summary.has_ended is False