From f827ae2b048fed45a7c5226b18a2c5073745747f Mon Sep 17 00:00:00 2001 From: Ming-Jer Lee Date: Mon, 12 Jan 2026 18:03:52 -0800 Subject: [PATCH 1/2] feat: Add Kestra orchestrator support - Add KestraOrchestrator class in orchestrators/kestra.py - Add to_kestra_flow() method to Pipeline class - Generate YAML flows with automatic task dependencies - Support cron schedules, custom connection config, and labels - Add comprehensive test suite (24 tests) The Kestra orchestrator generates YAML workflow definitions compatible with Kestra's declarative orchestration platform. Co-Authored-By: Claude Opus 4.5 --- src/clgraph/orchestrators/__init__.py | 14 +- src/clgraph/orchestrators/kestra.py | 236 +++++++++++++ src/clgraph/pipeline.py | 94 +++++ tests/test_kestra_integration.py | 471 ++++++++++++++++++++++++++ 4 files changed, 814 insertions(+), 1 deletion(-) create mode 100644 src/clgraph/orchestrators/kestra.py create mode 100644 tests/test_kestra_integration.py diff --git a/src/clgraph/orchestrators/__init__.py b/src/clgraph/orchestrators/__init__.py index 32cb64c..b98bc7b 100644 --- a/src/clgraph/orchestrators/__init__.py +++ b/src/clgraph/orchestrators/__init__.py @@ -8,10 +8,16 @@ - Airflow (2.x and 3.x) - Dagster (1.x) - Prefect (2.x and 3.x) +- Kestra (YAML-based declarative workflows) Example: from clgraph import Pipeline - from clgraph.orchestrators import AirflowOrchestrator, DagsterOrchestrator, PrefectOrchestrator + from clgraph.orchestrators import ( + AirflowOrchestrator, + DagsterOrchestrator, + PrefectOrchestrator, + KestraOrchestrator, + ) pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery") @@ -26,16 +32,22 @@ # Generate Prefect flow prefect = PrefectOrchestrator(pipeline) flow = prefect.to_flow(executor=execute_sql, flow_name="my_pipeline") + + # Generate Kestra flow YAML + kestra = KestraOrchestrator(pipeline) + yaml_content = kestra.to_flow(flow_id="my_pipeline", namespace="clgraph") """ from .airflow import AirflowOrchestrator from .base import BaseOrchestrator from .dagster import DagsterOrchestrator +from .kestra import KestraOrchestrator from .prefect import PrefectOrchestrator __all__ = [ "BaseOrchestrator", "AirflowOrchestrator", "DagsterOrchestrator", + "KestraOrchestrator", "PrefectOrchestrator", ] diff --git a/src/clgraph/orchestrators/kestra.py b/src/clgraph/orchestrators/kestra.py new file mode 100644 index 0000000..11f5660 --- /dev/null +++ b/src/clgraph/orchestrators/kestra.py @@ -0,0 +1,236 @@ +""" +Kestra orchestrator integration for clgraph. + +Converts clgraph pipelines to Kestra YAML flows. +Kestra is a declarative orchestration platform using YAML-based workflow definitions. +""" + +from typing import TYPE_CHECKING, Any, Dict, Optional + +import yaml + +from .base import BaseOrchestrator + +if TYPE_CHECKING: + pass + + +class KestraOrchestrator(BaseOrchestrator): + """ + Converts clgraph pipelines to Kestra YAML flows. + + Kestra uses YAML-based workflow definitions with tasks that can have + dependencies via the `dependsOn` field. + + Example: + from clgraph.orchestrators import KestraOrchestrator + + orchestrator = KestraOrchestrator(pipeline) + yaml_content = orchestrator.to_flow( + flow_id="my_pipeline", + namespace="clgraph.production", + ) + + # Save to file + with open("flows/my_pipeline.yml", "w") as f: + f.write(yaml_content) + """ + + def to_flow( + self, + flow_id: str, + namespace: str, + description: Optional[str] = None, + connection_config: Optional[Dict[str, str]] = None, + retry_attempts: int = 3, + retry_delay: str = "PT1M", + labels: Optional[Dict[str, str]] = None, + **kwargs, + ) -> str: + """ + Generate Kestra flow YAML from pipeline. + + Args: + flow_id: Unique identifier for the flow + namespace: Kestra namespace (e.g., "clgraph.production") + description: Optional flow description (auto-generated if not provided) + connection_config: Database connection config: + { + "url": "jdbc:clickhouse://host:port/db", + "username": "default", + "password": "" + } + retry_attempts: Number of retry attempts (default: 3) + retry_delay: ISO 8601 duration between retries (default: PT1M) + labels: Optional key-value labels for flow + **kwargs: Additional flow-level configuration + + Returns: + YAML string representing Kestra flow + + Examples: + # Basic usage + yaml_content = orchestrator.to_flow( + flow_id="my_pipeline", + namespace="clgraph.production" + ) + + # With custom connection and schedule + yaml_content = orchestrator.to_flow( + flow_id="daily_analytics", + namespace="clgraph.analytics", + connection_config={ + "url": "jdbc:clickhouse://localhost:8123/default", + "username": "default", + "password": "", + }, + labels={"env": "production", "team": "analytics"}, + ) + + Note: + - Use to_flow_with_triggers() to add scheduling + - Tasks use io.kestra.plugin.jdbc.clickhouse.Query type + - Dependencies are managed via dependsOn field + """ + table_graph = self.table_graph + + # Auto-generate description + if description is None: + query_count = len(table_graph.queries) + table_count = len(table_graph.tables) + description = ( + f"Pipeline with {query_count} queries operating on " + f"{table_count} tables. Generated by clgraph." + ) + + # Default connection config + if connection_config is None: + connection_config = { + "url": "jdbc:clickhouse://clickhouse:8123/default", + "username": "default", + "password": "", + } + + # Build tasks list + tasks = [] + task_id_mapping: Dict[str, str] = {} # query_id -> task_id + + for query_id in table_graph.topological_sort(): + query = table_graph.queries[query_id] + task_id = self._sanitize_name(query_id) + task_id_mapping[query_id] = task_id + + # Find dependencies + depends_on = [] + for source_table in query.source_tables: + if source_table in table_graph.tables: + table_node = table_graph.tables[source_table] + if table_node.created_by and table_node.created_by in task_id_mapping: + depends_on.append(task_id_mapping[table_node.created_by]) + + task: Dict[str, Any] = { + "id": task_id, + "type": "io.kestra.plugin.jdbc.clickhouse.Query", + "url": connection_config["url"], + "username": connection_config["username"], + "password": connection_config["password"], + "sql": query.sql, + "retry": { + "type": "constant", + "maxAttempt": retry_attempts, + "interval": retry_delay, + }, + } + + if depends_on: + task["dependsOn"] = depends_on + + tasks.append(task) + + # Build flow structure + flow: Dict[str, Any] = { + "id": flow_id, + "namespace": namespace, + "description": description, + "labels": labels or {"generator": "clgraph"}, + "tasks": tasks, + } + + # Add any additional kwargs + flow.update(kwargs) + + return yaml.dump(flow, default_flow_style=False, sort_keys=False) + + def to_flow_with_triggers( + self, + flow_id: str, + namespace: str, + cron: Optional[str] = None, + **kwargs, + ) -> str: + """ + Generate Kestra flow YAML with schedule triggers. + + Args: + flow_id: Unique identifier for the flow + namespace: Kestra namespace + cron: Cron expression for scheduling (e.g., "0 0 * * *" for daily) + **kwargs: Additional arguments passed to to_flow() + + Returns: + YAML string with triggers configured + + Examples: + # Daily at midnight + yaml_content = orchestrator.to_flow_with_triggers( + flow_id="daily_pipeline", + namespace="clgraph.production", + cron="0 0 * * *" + ) + + # Every hour + yaml_content = orchestrator.to_flow_with_triggers( + flow_id="hourly_pipeline", + namespace="clgraph.production", + cron="0 * * * *" + ) + """ + flow_yaml = self.to_flow(flow_id=flow_id, namespace=namespace, **kwargs) + flow_dict = yaml.safe_load(flow_yaml) + + if cron: + flow_dict["triggers"] = [ + { + "id": "schedule", + "type": "io.kestra.plugin.core.trigger.Schedule", + "cron": cron, + } + ] + + return yaml.dump(flow_dict, default_flow_style=False, sort_keys=False) + + def to_flow_dict( + self, + flow_id: str, + namespace: str, + **kwargs, + ) -> Dict[str, Any]: + """ + Generate Kestra flow as a Python dictionary. + + Useful when you need to manipulate the flow structure programmatically + before serializing to YAML. + + Args: + flow_id: Unique identifier for the flow + namespace: Kestra namespace + **kwargs: Additional arguments passed to to_flow() + + Returns: + Dictionary representing Kestra flow structure + """ + yaml_content = self.to_flow(flow_id=flow_id, namespace=namespace, **kwargs) + return yaml.safe_load(yaml_content) + + +__all__ = ["KestraOrchestrator"] diff --git a/src/clgraph/pipeline.py b/src/clgraph/pipeline.py index 42bc559..edc6192 100644 --- a/src/clgraph/pipeline.py +++ b/src/clgraph/pipeline.py @@ -2501,6 +2501,100 @@ def to_prefect_deployment( **kwargs, ) + # ======================================================================== + # Orchestrator Methods - Kestra + # ======================================================================== + + def to_kestra_flow( + self, + flow_id: str, + namespace: str, + description: Optional[str] = None, + connection_config: Optional[Dict[str, str]] = None, + cron: Optional[str] = None, + retry_attempts: int = 3, + labels: Optional[Dict[str, str]] = None, + **kwargs, + ) -> str: + """ + Generate Kestra flow YAML from this pipeline. + + Kestra is a declarative orchestration platform using YAML-based + workflow definitions. This method generates a complete flow file. + + Args: + flow_id: Unique identifier for the flow + namespace: Kestra namespace (e.g., "clgraph.production") + description: Optional flow description (auto-generated if not provided) + connection_config: Database connection config dict: + { + "url": "jdbc:clickhouse://host:port/db", + "username": "default", + "password": "", + } + cron: Optional cron expression for scheduling (e.g., "0 0 * * *") + retry_attempts: Number of retry attempts (default: 3) + labels: Optional key-value labels for the flow + **kwargs: Additional flow configuration + + Returns: + YAML string representing Kestra flow + + Examples: + # Basic usage + yaml_content = pipeline.to_kestra_flow( + flow_id="enterprise_pipeline", + namespace="clgraph.production" + ) + + # Save to file + with open("flows/enterprise_pipeline.yml", "w") as f: + f.write(yaml_content) + + # With schedule and custom connection + yaml_content = pipeline.to_kestra_flow( + flow_id="daily_analytics", + namespace="clgraph.analytics", + cron="0 0 * * *", # Daily at midnight + connection_config={ + "url": "jdbc:clickhouse://localhost:8123/default", + "username": "default", + "password": "", + }, + labels={"env": "production", "team": "analytics"}, + ) + + Note: + - Kestra uses io.kestra.plugin.jdbc.clickhouse.Query task type + - Dependencies are managed via dependsOn field + - Install Kestra ClickHouse plugin for database connectivity + """ + from .orchestrators import KestraOrchestrator + + orchestrator = KestraOrchestrator(self) + + if cron: + return orchestrator.to_flow_with_triggers( + flow_id=flow_id, + namespace=namespace, + description=description, + connection_config=connection_config, + cron=cron, + retry_attempts=retry_attempts, + labels=labels, + **kwargs, + ) + else: + return orchestrator.to_flow( + flow_id=flow_id, + namespace=namespace, + description=description, + connection_config=connection_config, + retry_attempts=retry_attempts, + labels=labels, + **kwargs, + ) + # ======================================================================== # Validation Methods # ======================================================================== diff --git a/tests/test_kestra_integration.py b/tests/test_kestra_integration.py new file mode 100644 index 0000000..8408818 --- /dev/null +++ b/tests/test_kestra_integration.py @@ -0,0 +1,471 @@ +""" +Tests for Kestra orchestrator integration. + +Tests the to_kestra_flow() method and KestraOrchestrator class. +""" + +import yaml + +from clgraph import Pipeline + + +def mock_executor(sql: str) -> None: + """Mock executor for testing.""" + pass + + +class TestToKestraFlowBasic: + """Basic tests for to_kestra_flow method.""" + + def test_basic_flow_generation(self): + """Test basic Kestra flow YAML generation.""" + pipeline = Pipeline( + [ + ("staging", "CREATE TABLE staging AS SELECT 1 as id"), + ("analytics", "CREATE TABLE analytics AS SELECT * FROM staging"), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="test_flow", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + + assert flow["id"] == "test_flow" + assert flow["namespace"] == "clgraph.test" + assert len(flow["tasks"]) == 2 + + def test_flow_with_description(self): + """Test flow with custom description.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="desc_flow", + namespace="clgraph.test", + description="Custom description for testing", + ) + + flow = yaml.safe_load(yaml_content) + assert flow["description"] == "Custom description for testing" + + def test_flow_auto_description(self): + """Test auto-generated description.""" + pipeline = Pipeline( + [ + ("q1", "CREATE TABLE t1 AS SELECT 1"), + ("q2", "CREATE TABLE t2 AS SELECT * FROM t1"), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="auto_desc", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + assert "2 queries" in flow["description"] + assert "clgraph" in flow["description"] + + def test_task_structure(self): + """Test task structure contains required fields.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow(flow_id="task_test", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + task = flow["tasks"][0] + + assert task["id"] == "query1" + assert task["type"] == "io.kestra.plugin.jdbc.clickhouse.Query" + assert "url" in task + assert "username" in task + assert "sql" in task + assert "retry" in task + + def test_task_sql_content(self): + """Test that SQL is correctly embedded in tasks.""" + sql = "CREATE TABLE test_table AS SELECT 42 as value" + pipeline = Pipeline([("query1", sql)]) + + yaml_content = pipeline.to_kestra_flow(flow_id="sql_test", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + assert flow["tasks"][0]["sql"] == sql + + +class TestToKestraFlowDependencies: + """Test Kestra flow dependency handling.""" + + def test_linear_dependencies(self): + """Test linear dependency chain.""" + pipeline = Pipeline( + [ + ("step1", "CREATE TABLE step1 AS SELECT 1"), + ("step2", "CREATE TABLE step2 AS SELECT * FROM step1"), + ("step3", "CREATE TABLE step3 AS SELECT * FROM step2"), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="linear_deps", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + tasks_by_id = {t["id"]: t for t in flow["tasks"]} + + # step1 has no dependencies + assert "dependsOn" not in tasks_by_id["step1"] + # step2 depends on step1 + assert tasks_by_id["step2"]["dependsOn"] == ["step1"] + # step3 depends on step2 + assert tasks_by_id["step3"]["dependsOn"] == ["step2"] + + def test_diamond_dependencies(self): + """Test diamond pattern dependencies.""" + pipeline = Pipeline( + [ + ("source", "CREATE TABLE source AS SELECT 1 as id"), + ("left_branch", "CREATE TABLE left_branch AS SELECT * FROM source"), + ("right_branch", "CREATE TABLE right_branch AS SELECT * FROM source"), + ( + "final", + "CREATE TABLE final AS SELECT * FROM left_branch, right_branch", + ), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="diamond_deps", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + tasks_by_id = {t["id"]: t for t in flow["tasks"]} + + # source has no dependencies + assert "dependsOn" not in tasks_by_id["source"] + # left and right depend on source + assert tasks_by_id["left_branch"]["dependsOn"] == ["source"] + assert tasks_by_id["right_branch"]["dependsOn"] == ["source"] + # final depends on both left and right + final_deps = set(tasks_by_id["final"]["dependsOn"]) + assert final_deps == {"left_branch", "right_branch"} + + def test_no_dependencies_for_source_queries(self): + """Test that source queries have no dependsOn field.""" + pipeline = Pipeline( + [ + ("source1", "CREATE TABLE source1 AS SELECT 1"), + ("source2", "CREATE TABLE source2 AS SELECT 2"), + ( + "combined", + "CREATE TABLE combined AS SELECT * FROM source1, source2", + ), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="multi_source", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + tasks_by_id = {t["id"]: t for t in flow["tasks"]} + + assert "dependsOn" not in tasks_by_id["source1"] + assert "dependsOn" not in tasks_by_id["source2"] + + +class TestToKestraFlowScheduling: + """Test Kestra flow scheduling.""" + + def test_flow_with_cron(self): + """Test flow with cron schedule.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="scheduled_flow", namespace="clgraph.test", cron="0 0 * * *" + ) + + flow = yaml.safe_load(yaml_content) + + assert "triggers" in flow + assert len(flow["triggers"]) == 1 + trigger = flow["triggers"][0] + assert trigger["type"] == "io.kestra.plugin.core.trigger.Schedule" + assert trigger["cron"] == "0 0 * * *" + + def test_flow_without_schedule(self): + """Test flow without schedule has no triggers.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow(flow_id="unscheduled_flow", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + assert "triggers" not in flow + + def test_hourly_cron(self): + """Test hourly cron schedule.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="hourly_flow", namespace="clgraph.test", cron="0 * * * *" + ) + + flow = yaml.safe_load(yaml_content) + assert flow["triggers"][0]["cron"] == "0 * * * *" + + +class TestToKestraFlowConfiguration: + """Test Kestra flow configuration options.""" + + def test_custom_connection_config(self): + """Test custom connection configuration.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="custom_conn", + namespace="clgraph.test", + connection_config={ + "url": "jdbc:clickhouse://custom-host:8123/mydb", + "username": "myuser", + "password": "mypassword", + }, + ) + + flow = yaml.safe_load(yaml_content) + task = flow["tasks"][0] + + assert task["url"] == "jdbc:clickhouse://custom-host:8123/mydb" + assert task["username"] == "myuser" + assert task["password"] == "mypassword" + + def test_default_connection_config(self): + """Test default connection configuration.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow(flow_id="default_conn", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + task = flow["tasks"][0] + + assert task["url"] == "jdbc:clickhouse://clickhouse:8123/default" + assert task["username"] == "default" + assert task["password"] == "" + + def test_custom_retry_config(self): + """Test custom retry configuration.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="retry_test", namespace="clgraph.test", retry_attempts=5 + ) + + flow = yaml.safe_load(yaml_content) + task = flow["tasks"][0] + + assert task["retry"]["maxAttempt"] == 5 + + def test_custom_labels(self): + """Test custom labels.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="labels_test", + namespace="clgraph.test", + labels={"env": "production", "team": "analytics"}, + ) + + flow = yaml.safe_load(yaml_content) + + assert flow["labels"]["env"] == "production" + assert flow["labels"]["team"] == "analytics" + + def test_default_labels(self): + """Test default labels include generator tag.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow(flow_id="default_labels", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + assert flow["labels"]["generator"] == "clgraph" + + +class TestToKestraFlowOutput: + """Test Kestra flow output format.""" + + def test_valid_yaml_output(self): + """Test that output is valid, parseable YAML.""" + pipeline = Pipeline( + [ + ("q1", "CREATE TABLE t1 AS SELECT 1"), + ("q2", "CREATE TABLE t2 AS SELECT * FROM t1"), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="valid_yaml", namespace="clgraph.test") + + # Should not raise + flow = yaml.safe_load(yaml_content) + assert isinstance(flow, dict) + + def test_yaml_is_string(self): + """Test that to_kestra_flow returns a string.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + result = pipeline.to_kestra_flow(flow_id="string_test", namespace="clgraph.test") + + assert isinstance(result, str) + + def test_yaml_has_required_fields(self): + """Test YAML has all required Kestra fields.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow(flow_id="required_fields", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + + # Required Kestra flow fields + assert "id" in flow + assert "namespace" in flow + assert "tasks" in flow + + +class TestKestraOrchestrator: + """Test KestraOrchestrator class directly.""" + + def test_orchestrator_initialization(self): + """Test KestraOrchestrator initialization.""" + from clgraph.orchestrators import KestraOrchestrator + + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + orchestrator = KestraOrchestrator(pipeline) + + assert orchestrator.pipeline == pipeline + assert orchestrator.table_graph == pipeline.table_graph + + def test_to_flow_dict(self): + """Test to_flow_dict returns dictionary.""" + from clgraph.orchestrators import KestraOrchestrator + + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + orchestrator = KestraOrchestrator(pipeline) + + result = orchestrator.to_flow_dict(flow_id="dict_test", namespace="clgraph.test") + + assert isinstance(result, dict) + assert result["id"] == "dict_test" + assert result["namespace"] == "clgraph.test" + + def test_to_flow_with_triggers(self): + """Test to_flow_with_triggers adds triggers.""" + from clgraph.orchestrators import KestraOrchestrator + + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + orchestrator = KestraOrchestrator(pipeline) + + yaml_content = orchestrator.to_flow_with_triggers( + flow_id="trigger_test", namespace="clgraph.test", cron="0 0 * * *" + ) + + flow = yaml.safe_load(yaml_content) + assert "triggers" in flow + assert flow["triggers"][0]["cron"] == "0 0 * * *" + + +class TestKestraFlowComplexPipeline: + """Test Kestra flow generation with complex pipelines.""" + + def test_enterprise_like_pipeline(self): + """Test with a pipeline similar to enterprise demo.""" + pipeline = Pipeline( + [ + ( + "raw_sales", + """ + CREATE TABLE raw_sales AS + SELECT + toDate('2024-01-01') + number as date, + number % 100 as product_id, + number % 10 as region_id, + rand() % 1000 as amount + FROM numbers(1000) + """, + ), + ( + "raw_products", + """ + CREATE TABLE raw_products AS + SELECT + number as product_id, + concat('Product ', toString(number)) as product_name, + rand() % 5 as category_id + FROM numbers(100) + """, + ), + ( + "sales_with_products", + """ + CREATE TABLE sales_with_products AS + SELECT + s.date, + s.product_id, + p.product_name, + s.region_id, + s.amount + FROM raw_sales s + JOIN raw_products p ON s.product_id = p.product_id + """, + ), + ( + "daily_summary", + """ + CREATE TABLE daily_summary AS + SELECT + date, + count() as num_sales, + sum(amount) as total_amount + FROM sales_with_products + GROUP BY date + """, + ), + ], + dialect="clickhouse", + ) + + yaml_content = pipeline.to_kestra_flow( + flow_id="enterprise_pipeline", + namespace="clgraph.enterprise", + labels={"env": "production", "source": "clgraph"}, + ) + + flow = yaml.safe_load(yaml_content) + + assert flow["id"] == "enterprise_pipeline" + assert flow["namespace"] == "clgraph.enterprise" + assert len(flow["tasks"]) == 4 + + tasks_by_id = {t["id"]: t for t in flow["tasks"]} + + # Verify dependencies + assert "dependsOn" not in tasks_by_id["raw_sales"] + assert "dependsOn" not in tasks_by_id["raw_products"] + assert set(tasks_by_id["sales_with_products"]["dependsOn"]) == { + "raw_sales", + "raw_products", + } + assert tasks_by_id["daily_summary"]["dependsOn"] == ["sales_with_products"] + + def test_many_queries_pipeline(self): + """Test with many queries.""" + queries = [] + for i in range(10): + if i == 0: + queries.append((f"step_{i}", f"CREATE TABLE step_{i} AS SELECT {i}")) + else: + queries.append( + ( + f"step_{i}", + f"CREATE TABLE step_{i} AS SELECT * FROM step_{i - 1}", + ) + ) + + pipeline = Pipeline(queries) + + yaml_content = pipeline.to_kestra_flow(flow_id="many_queries", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + assert len(flow["tasks"]) == 10 + + # Verify chain dependencies + tasks_by_id = {t["id"]: t for t in flow["tasks"]} + for i in range(1, 10): + assert tasks_by_id[f"step_{i}"]["dependsOn"] == [f"step_{i - 1}"] From 8032c1c63f18db5e7fc879e383251164ddc47ab3 Mon Sep 17 00:00:00 2001 From: Ming-Jer Lee Date: Mon, 12 Jan 2026 18:41:34 -0800 Subject: [PATCH 2/2] fix: Remove dependsOn from Kestra tasks (not supported by JDBC plugins) Kestra's io.kestra.plugin.jdbc.clickhouse.Query task type doesn't support the dependsOn field. Instead, we rely on topological ordering of tasks, since Kestra executes tasks sequentially in the order they're defined. Changes: - Remove dependsOn field generation from KestraOrchestrator - Update tests to verify topological ordering instead of dependsOn - Update docstrings to reflect actual behavior Co-Authored-By: Claude Opus 4.5 --- src/clgraph/orchestrators/kestra.py | 24 +++------- tests/test_kestra_integration.py | 69 +++++++++++++---------------- 2 files changed, 38 insertions(+), 55 deletions(-) diff --git a/src/clgraph/orchestrators/kestra.py b/src/clgraph/orchestrators/kestra.py index 11f5660..259aa8e 100644 --- a/src/clgraph/orchestrators/kestra.py +++ b/src/clgraph/orchestrators/kestra.py @@ -19,8 +19,9 @@ class KestraOrchestrator(BaseOrchestrator): """ Converts clgraph pipelines to Kestra YAML flows. - Kestra uses YAML-based workflow definitions with tasks that can have - dependencies via the `dependsOn` field. + Kestra uses YAML-based workflow definitions. Tasks are executed sequentially + in the order they're defined. This orchestrator generates tasks in topological + order to ensure dependencies are respected. Example: from clgraph.orchestrators import KestraOrchestrator @@ -90,7 +91,7 @@ def to_flow( Note: - Use to_flow_with_triggers() to add scheduling - Tasks use io.kestra.plugin.jdbc.clickhouse.Query type - - Dependencies are managed via dependsOn field + - Tasks are ordered topologically to respect dependencies """ table_graph = self.table_graph @@ -111,22 +112,14 @@ def to_flow( "password": "", } - # Build tasks list + # Build tasks list in topological order + # Kestra executes tasks sequentially in the order they're defined, + # so topological ordering ensures dependencies are respected tasks = [] - task_id_mapping: Dict[str, str] = {} # query_id -> task_id for query_id in table_graph.topological_sort(): query = table_graph.queries[query_id] task_id = self._sanitize_name(query_id) - task_id_mapping[query_id] = task_id - - # Find dependencies - depends_on = [] - for source_table in query.source_tables: - if source_table in table_graph.tables: - table_node = table_graph.tables[source_table] - if table_node.created_by and table_node.created_by in task_id_mapping: - depends_on.append(task_id_mapping[table_node.created_by]) task: Dict[str, Any] = { "id": task_id, @@ -142,9 +135,6 @@ def to_flow( }, } - if depends_on: - task["dependsOn"] = depends_on - tasks.append(task) # Build flow structure diff --git a/tests/test_kestra_integration.py b/tests/test_kestra_integration.py index 8408818..3b31c99 100644 --- a/tests/test_kestra_integration.py +++ b/tests/test_kestra_integration.py @@ -90,10 +90,10 @@ def test_task_sql_content(self): class TestToKestraFlowDependencies: - """Test Kestra flow dependency handling.""" + """Test Kestra flow dependency handling via topological ordering.""" def test_linear_dependencies(self): - """Test linear dependency chain.""" + """Test linear dependency chain - tasks should be in topological order.""" pipeline = Pipeline( [ ("step1", "CREATE TABLE step1 AS SELECT 1"), @@ -105,17 +105,14 @@ def test_linear_dependencies(self): yaml_content = pipeline.to_kestra_flow(flow_id="linear_deps", namespace="clgraph.test") flow = yaml.safe_load(yaml_content) - tasks_by_id = {t["id"]: t for t in flow["tasks"]} + task_ids = [t["id"] for t in flow["tasks"]] - # step1 has no dependencies - assert "dependsOn" not in tasks_by_id["step1"] - # step2 depends on step1 - assert tasks_by_id["step2"]["dependsOn"] == ["step1"] - # step3 depends on step2 - assert tasks_by_id["step3"]["dependsOn"] == ["step2"] + # Tasks should be in topological order: step1 before step2 before step3 + assert task_ids.index("step1") < task_ids.index("step2") + assert task_ids.index("step2") < task_ids.index("step3") def test_diamond_dependencies(self): - """Test diamond pattern dependencies.""" + """Test diamond pattern dependencies - tasks should be in valid topological order.""" pipeline = Pipeline( [ ("source", "CREATE TABLE source AS SELECT 1 as id"), @@ -131,19 +128,17 @@ def test_diamond_dependencies(self): yaml_content = pipeline.to_kestra_flow(flow_id="diamond_deps", namespace="clgraph.test") flow = yaml.safe_load(yaml_content) - tasks_by_id = {t["id"]: t for t in flow["tasks"]} - - # source has no dependencies - assert "dependsOn" not in tasks_by_id["source"] - # left and right depend on source - assert tasks_by_id["left_branch"]["dependsOn"] == ["source"] - assert tasks_by_id["right_branch"]["dependsOn"] == ["source"] - # final depends on both left and right - final_deps = set(tasks_by_id["final"]["dependsOn"]) - assert final_deps == {"left_branch", "right_branch"} - - def test_no_dependencies_for_source_queries(self): - """Test that source queries have no dependsOn field.""" + task_ids = [t["id"] for t in flow["tasks"]] + + # source should come before left and right branches + assert task_ids.index("source") < task_ids.index("left_branch") + assert task_ids.index("source") < task_ids.index("right_branch") + # final should come after both branches + assert task_ids.index("left_branch") < task_ids.index("final") + assert task_ids.index("right_branch") < task_ids.index("final") + + def test_no_depends_on_field(self): + """Test that tasks don't have dependsOn field (Kestra uses sequential execution).""" pipeline = Pipeline( [ ("source1", "CREATE TABLE source1 AS SELECT 1"), @@ -158,10 +153,10 @@ def test_no_dependencies_for_source_queries(self): yaml_content = pipeline.to_kestra_flow(flow_id="multi_source", namespace="clgraph.test") flow = yaml.safe_load(yaml_content) - tasks_by_id = {t["id"]: t for t in flow["tasks"]} - assert "dependsOn" not in tasks_by_id["source1"] - assert "dependsOn" not in tasks_by_id["source2"] + # No task should have dependsOn field + for task in flow["tasks"]: + assert "dependsOn" not in task class TestToKestraFlowScheduling: @@ -433,16 +428,14 @@ def test_enterprise_like_pipeline(self): assert flow["namespace"] == "clgraph.enterprise" assert len(flow["tasks"]) == 4 - tasks_by_id = {t["id"]: t for t in flow["tasks"]} + task_ids = [t["id"] for t in flow["tasks"]] - # Verify dependencies - assert "dependsOn" not in tasks_by_id["raw_sales"] - assert "dependsOn" not in tasks_by_id["raw_products"] - assert set(tasks_by_id["sales_with_products"]["dependsOn"]) == { - "raw_sales", - "raw_products", - } - assert tasks_by_id["daily_summary"]["dependsOn"] == ["sales_with_products"] + # Verify topological ordering (no dependsOn field, just correct order) + # raw_sales and raw_products should come before sales_with_products + assert task_ids.index("raw_sales") < task_ids.index("sales_with_products") + assert task_ids.index("raw_products") < task_ids.index("sales_with_products") + # sales_with_products should come before daily_summary + assert task_ids.index("sales_with_products") < task_ids.index("daily_summary") def test_many_queries_pipeline(self): """Test with many queries.""" @@ -465,7 +458,7 @@ def test_many_queries_pipeline(self): flow = yaml.safe_load(yaml_content) assert len(flow["tasks"]) == 10 - # Verify chain dependencies - tasks_by_id = {t["id"]: t for t in flow["tasks"]} + # Verify topological ordering (each step should come after its predecessor) + task_ids = [t["id"] for t in flow["tasks"]] for i in range(1, 10): - assert tasks_by_id[f"step_{i}"]["dependsOn"] == [f"step_{i - 1}"] + assert task_ids.index(f"step_{i - 1}") < task_ids.index(f"step_{i}")