Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/clgraph/orchestrators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@
- Airflow (2.x and 3.x)
- Dagster (1.x)
- Prefect (2.x and 3.x)
- Kestra (YAML-based declarative workflows)

Example:
from clgraph import Pipeline
from clgraph.orchestrators import AirflowOrchestrator, DagsterOrchestrator, PrefectOrchestrator
from clgraph.orchestrators import (
AirflowOrchestrator,
DagsterOrchestrator,
PrefectOrchestrator,
KestraOrchestrator,
)

pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")

Expand All @@ -26,16 +32,22 @@
# Generate Prefect flow
prefect = PrefectOrchestrator(pipeline)
flow = prefect.to_flow(executor=execute_sql, flow_name="my_pipeline")

# Generate Kestra flow YAML
kestra = KestraOrchestrator(pipeline)
yaml_content = kestra.to_flow(flow_id="my_pipeline", namespace="clgraph")
"""

from .airflow import AirflowOrchestrator
from .base import BaseOrchestrator
from .dagster import DagsterOrchestrator
from .kestra import KestraOrchestrator
from .prefect import PrefectOrchestrator

# Public API of the orchestrators subpackage: the abstract base plus one
# concrete orchestrator per supported engine (Airflow, Dagster, Kestra, Prefect).
__all__ = [
    "BaseOrchestrator",
    "AirflowOrchestrator",
    "DagsterOrchestrator",
    "KestraOrchestrator",
    "PrefectOrchestrator",
]
226 changes: 226 additions & 0 deletions src/clgraph/orchestrators/kestra.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
"""
Kestra orchestrator integration for clgraph.

Converts clgraph pipelines to Kestra YAML flows.
Kestra is a declarative orchestration platform using YAML-based workflow definitions.
"""

from typing import TYPE_CHECKING, Any, Dict, Optional

import yaml

from .base import BaseOrchestrator

if TYPE_CHECKING:
pass


class KestraOrchestrator(BaseOrchestrator):
    """
    Converts clgraph pipelines to Kestra YAML flows.

    Kestra uses YAML-based workflow definitions. Tasks are executed sequentially
    in the order they're defined. This orchestrator generates tasks in topological
    order to ensure dependencies are respected.

    Example:
        from clgraph.orchestrators import KestraOrchestrator

        orchestrator = KestraOrchestrator(pipeline)
        yaml_content = orchestrator.to_flow(
            flow_id="my_pipeline",
            namespace="clgraph.production",
        )

        # Save to file
        with open("flows/my_pipeline.yml", "w") as f:
            f.write(yaml_content)
    """

    # Fallback JDBC connection settings. Any keys the caller omits from
    # connection_config are filled in from here, so a partial config
    # (e.g. only {"url": ...}) no longer raises KeyError.
    _DEFAULT_CONNECTION: Dict[str, str] = {
        "url": "jdbc:clickhouse://clickhouse:8123/default",
        "username": "default",
        "password": "",
    }

    def to_flow(
        self,
        flow_id: str,
        namespace: str,
        description: Optional[str] = None,
        connection_config: Optional[Dict[str, str]] = None,
        retry_attempts: int = 3,
        retry_delay: str = "PT1M",
        labels: Optional[Dict[str, str]] = None,
        **kwargs,
    ) -> str:
        """
        Generate Kestra flow YAML from pipeline.

        Args:
            flow_id: Unique identifier for the flow
            namespace: Kestra namespace (e.g., "clgraph.production")
            description: Optional flow description (auto-generated if not provided)
            connection_config: Database connection config; missing keys fall
                back to ``_DEFAULT_CONNECTION``:
                {
                    "url": "jdbc:clickhouse://host:port/db",
                    "username": "default",
                    "password": ""
                }
            retry_attempts: Number of retry attempts (default: 3)
            retry_delay: ISO 8601 duration between retries (default: PT1M)
            labels: Optional key-value labels for flow
            **kwargs: Additional flow-level configuration merged into the
                top-level flow mapping (may override generated keys)

        Returns:
            YAML string representing Kestra flow

        Examples:
            # Basic usage
            yaml_content = orchestrator.to_flow(
                flow_id="my_pipeline",
                namespace="clgraph.production"
            )

            # With custom connection and labels
            yaml_content = orchestrator.to_flow(
                flow_id="daily_analytics",
                namespace="clgraph.analytics",
                connection_config={
                    "url": "jdbc:clickhouse://localhost:8123/default",
                    "username": "default",
                    "password": "",
                },
                labels={"env": "production", "team": "analytics"},
            )

        Note:
            - Use to_flow_with_triggers() to add scheduling
            - Tasks use io.kestra.plugin.jdbc.clickhouse.Query type
            - Tasks are ordered topologically to respect dependencies
        """
        # NOTE(review): table_graph and _sanitize_name are inherited from
        # BaseOrchestrator (defined elsewhere) — assumed to expose queries,
        # tables, and topological_sort(); verify against base class.
        table_graph = self.table_graph

        # Auto-generate a description summarizing the pipeline size.
        if description is None:
            query_count = len(table_graph.queries)
            table_count = len(table_graph.tables)
            description = (
                f"Pipeline with {query_count} queries operating on "
                f"{table_count} tables. Generated by clgraph."
            )

        # Merge caller-supplied connection settings over the defaults so a
        # partial dict is valid (previously any missing key raised KeyError).
        conn = {**self._DEFAULT_CONNECTION, **(connection_config or {})}

        # Build tasks list in topological order.
        # Kestra executes tasks sequentially in the order they're defined,
        # so topological ordering ensures dependencies are respected.
        tasks = []
        for query_id in table_graph.topological_sort():
            query = table_graph.queries[query_id]
            task_id = self._sanitize_name(query_id)

            task: Dict[str, Any] = {
                "id": task_id,
                "type": "io.kestra.plugin.jdbc.clickhouse.Query",
                "url": conn["url"],
                "username": conn["username"],
                "password": conn["password"],
                "sql": query.sql,
                "retry": {
                    "type": "constant",
                    "maxAttempt": retry_attempts,
                    "interval": retry_delay,
                },
            }
            tasks.append(task)

        # Build the top-level flow structure.
        flow: Dict[str, Any] = {
            "id": flow_id,
            "namespace": namespace,
            "description": description,
            "labels": labels or {"generator": "clgraph"},
            "tasks": tasks,
        }

        # Any additional kwargs become flow-level keys (caller may override).
        flow.update(kwargs)

        # sort_keys=False keeps the conventional Kestra key order (id first).
        return yaml.dump(flow, default_flow_style=False, sort_keys=False)

    def to_flow_with_triggers(
        self,
        flow_id: str,
        namespace: str,
        cron: Optional[str] = None,
        **kwargs,
    ) -> str:
        """
        Generate Kestra flow YAML with schedule triggers.

        Args:
            flow_id: Unique identifier for the flow
            namespace: Kestra namespace
            cron: Cron expression for scheduling (e.g., "0 0 * * *" for daily);
                if None, the output is identical to to_flow()
            **kwargs: Additional arguments passed to to_flow()

        Returns:
            YAML string with triggers configured

        Examples:
            # Daily at midnight
            yaml_content = orchestrator.to_flow_with_triggers(
                flow_id="daily_pipeline",
                namespace="clgraph.production",
                cron="0 0 * * *"
            )

            # Every hour
            yaml_content = orchestrator.to_flow_with_triggers(
                flow_id="hourly_pipeline",
                namespace="clgraph.production",
                cron="0 * * * *"
            )
        """
        # Round-trip through the dict form so the trigger can be appended
        # to the already-validated flow structure.
        flow_dict = self.to_flow_dict(flow_id=flow_id, namespace=namespace, **kwargs)

        if cron:
            flow_dict["triggers"] = [
                {
                    "id": "schedule",
                    "type": "io.kestra.plugin.core.trigger.Schedule",
                    "cron": cron,
                }
            ]

        return yaml.dump(flow_dict, default_flow_style=False, sort_keys=False)

    def to_flow_dict(
        self,
        flow_id: str,
        namespace: str,
        **kwargs,
    ) -> Dict[str, Any]:
        """
        Generate Kestra flow as a Python dictionary.

        Useful when you need to manipulate the flow structure programmatically
        before serializing to YAML.

        Args:
            flow_id: Unique identifier for the flow
            namespace: Kestra namespace
            **kwargs: Additional arguments passed to to_flow()

        Returns:
            Dictionary representing Kestra flow structure
        """
        yaml_content = self.to_flow(flow_id=flow_id, namespace=namespace, **kwargs)
        return yaml.safe_load(yaml_content)


__all__ = ["KestraOrchestrator"]
94 changes: 94 additions & 0 deletions src/clgraph/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2501,6 +2501,100 @@ def to_prefect_deployment(
**kwargs,
)

# ========================================================================
# Orchestrator Methods - Kestra
# ========================================================================

def to_kestra_flow(
    self,
    flow_id: str,
    namespace: str,
    description: Optional[str] = None,
    connection_config: Optional[Dict[str, str]] = None,
    cron: Optional[str] = None,
    retry_attempts: int = 3,
    retry_delay: str = "PT1M",
    labels: Optional[Dict[str, str]] = None,
    **kwargs,
) -> str:
    """
    Generate Kestra flow YAML from this pipeline.

    Kestra is a declarative orchestration platform using YAML-based
    workflow definitions. This method generates a complete flow file.

    Args:
        flow_id: Unique identifier for the flow
        namespace: Kestra namespace (e.g., "clgraph.production")
        description: Optional flow description (auto-generated if not provided)
        connection_config: Database connection config dict:
            {
                "url": "jdbc:clickhouse://host:port/db",
                "username": "default",
                "password": "",
            }
        cron: Optional cron expression for scheduling (e.g., "0 0 * * *")
        retry_attempts: Number of retry attempts per task (default: 3)
        retry_delay: ISO 8601 duration between retries (default: "PT1M")
        labels: Optional key-value labels for the flow
        **kwargs: Additional flow configuration

    Returns:
        YAML string representing Kestra flow

    Examples:
        # Basic usage
        yaml_content = pipeline.to_kestra_flow(
            flow_id="enterprise_pipeline",
            namespace="clgraph.production"
        )

        # Save to file
        with open("flows/enterprise_pipeline.yml", "w") as f:
            f.write(yaml_content)

        # With schedule and custom connection
        yaml_content = pipeline.to_kestra_flow(
            flow_id="daily_analytics",
            namespace="clgraph.analytics",
            cron="0 0 * * *",  # Daily at midnight
            connection_config={
                "url": "jdbc:clickhouse://localhost:8123/default",
                "username": "default",
                "password": "",
            },
            labels={"env": "production", "team": "analytics"},
        )

    Note:
        - Kestra uses io.kestra.plugin.jdbc.clickhouse.Query task type
        - Tasks are emitted in topological order and executed sequentially,
          which is how dependencies are respected (no dependsOn fields
          are generated)
        - Install Kestra ClickHouse plugin for database connectivity
    """
    # Imported lazily so the optional Kestra integration (and its yaml
    # dependency) is only loaded when actually used.
    from .orchestrators import KestraOrchestrator

    orchestrator = KestraOrchestrator(self)

    # Both code paths share the same options; only the scheduled variant
    # additionally receives the cron expression.
    flow_options = dict(
        flow_id=flow_id,
        namespace=namespace,
        description=description,
        connection_config=connection_config,
        retry_attempts=retry_attempts,
        retry_delay=retry_delay,
        labels=labels,
        **kwargs,
    )

    if cron:
        return orchestrator.to_flow_with_triggers(cron=cron, **flow_options)
    return orchestrator.to_flow(**flow_options)

# ========================================================================
# Validation Methods
# ========================================================================
Expand Down
Loading
Loading