diff --git a/src/clgraph/orchestrators/__init__.py b/src/clgraph/orchestrators/__init__.py
index 32cb64c..b98bc7b 100644
--- a/src/clgraph/orchestrators/__init__.py
+++ b/src/clgraph/orchestrators/__init__.py
@@ -8,10 +8,16 @@
 - Airflow (2.x and 3.x)
 - Dagster (1.x)
 - Prefect (2.x and 3.x)
+- Kestra (YAML-based declarative workflows)
 
 Example:
     from clgraph import Pipeline
-    from clgraph.orchestrators import AirflowOrchestrator, DagsterOrchestrator, PrefectOrchestrator
+    from clgraph.orchestrators import (
+        AirflowOrchestrator,
+        DagsterOrchestrator,
+        PrefectOrchestrator,
+        KestraOrchestrator,
+    )
 
     pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
 
@@ -26,16 +32,22 @@
     # Generate Prefect flow
     prefect = PrefectOrchestrator(pipeline)
     flow = prefect.to_flow(executor=execute_sql, flow_name="my_pipeline")
+
+    # Generate Kestra flow YAML
+    kestra = KestraOrchestrator(pipeline)
+    yaml_content = kestra.to_flow(flow_id="my_pipeline", namespace="clgraph")
 """
 
 from .airflow import AirflowOrchestrator
 from .base import BaseOrchestrator
 from .dagster import DagsterOrchestrator
+from .kestra import KestraOrchestrator
 from .prefect import PrefectOrchestrator
 
 __all__ = [
     "BaseOrchestrator",
     "AirflowOrchestrator",
     "DagsterOrchestrator",
+    "KestraOrchestrator",
     "PrefectOrchestrator",
 ]
diff --git a/src/clgraph/orchestrators/kestra.py b/src/clgraph/orchestrators/kestra.py
new file mode 100644
index 0000000..259aa8e
--- /dev/null
+++ b/src/clgraph/orchestrators/kestra.py
@@ -0,0 +1,226 @@
+"""
+Kestra orchestrator integration for clgraph.
+
+Converts clgraph pipelines to Kestra YAML flows.
+Kestra is a declarative orchestration platform using YAML-based workflow definitions.
+"""
+
+from typing import TYPE_CHECKING, Any, Dict, Optional
+
+import yaml
+
+from .base import BaseOrchestrator
+
+if TYPE_CHECKING:
+    pass
+
+
+class KestraOrchestrator(BaseOrchestrator):
+    """
+    Converts clgraph pipelines to Kestra YAML flows.
+
+    Kestra uses YAML-based workflow definitions. Tasks are executed sequentially
+    in the order they're defined. This orchestrator generates tasks in topological
+    order to ensure dependencies are respected.
+
+    Example:
+        from clgraph.orchestrators import KestraOrchestrator
+
+        orchestrator = KestraOrchestrator(pipeline)
+        yaml_content = orchestrator.to_flow(
+            flow_id="my_pipeline",
+            namespace="clgraph.production",
+        )
+
+        # Save to file
+        with open("flows/my_pipeline.yml", "w") as f:
+            f.write(yaml_content)
+    """
+
+    def to_flow(
+        self,
+        flow_id: str,
+        namespace: str,
+        description: Optional[str] = None,
+        connection_config: Optional[Dict[str, str]] = None,
+        retry_attempts: int = 3,
+        retry_delay: str = "PT1M",
+        labels: Optional[Dict[str, str]] = None,
+        **kwargs,
+    ) -> str:
+        """
+        Generate Kestra flow YAML from pipeline.
+
+        Args:
+            flow_id: Unique identifier for the flow
+            namespace: Kestra namespace (e.g., "clgraph.production")
+            description: Optional flow description (auto-generated if not provided)
+            connection_config: Database connection config:
+                {
+                    "url": "jdbc:clickhouse://host:port/db",
+                    "username": "default",
+                    "password": ""
+                }
+            retry_attempts: Number of retry attempts (default: 3)
+            retry_delay: ISO 8601 duration between retries (default: PT1M)
+            labels: Optional key-value labels for flow
+            **kwargs: Additional flow-level configuration
+
+        Returns:
+            YAML string representing Kestra flow
+
+        Examples:
+            # Basic usage
+            yaml_content = orchestrator.to_flow(
+                flow_id="my_pipeline",
+                namespace="clgraph.production"
+            )
+
+            # With custom connection and labels
+            yaml_content = orchestrator.to_flow(
+                flow_id="daily_analytics",
+                namespace="clgraph.analytics",
+                connection_config={
+                    "url": "jdbc:clickhouse://localhost:8123/default",
+                    "username": "default",
+                    "password": "",
+                },
+                labels={"env": "production", "team": "analytics"},
+            )
+
+        Note:
+            - Use to_flow_with_triggers() to add scheduling
+            - Tasks use io.kestra.plugin.jdbc.clickhouse.Query type
+            - Tasks are ordered topologically to respect dependencies
+        """
+        table_graph = self.table_graph
+
+        # Auto-generate description
+        if description is None:
+            query_count = len(table_graph.queries)
+            table_count = len(table_graph.tables)
+            description = (
+                f"Pipeline with {query_count} queries operating on "
+                f"{table_count} tables. Generated by clgraph."
+            )
+
+        # Default connection config
+        if connection_config is None:
+            connection_config = {
+                "url": "jdbc:clickhouse://clickhouse:8123/default",
+                "username": "default",
+                "password": "",
+            }
+
+        # Build tasks list in topological order
+        # Kestra executes tasks sequentially in the order they're defined,
+        # so topological ordering ensures dependencies are respected
+        tasks = []
+
+        for query_id in table_graph.topological_sort():
+            query = table_graph.queries[query_id]
+            task_id = self._sanitize_name(query_id)
+
+            task: Dict[str, Any] = {
+                "id": task_id,
+                "type": "io.kestra.plugin.jdbc.clickhouse.Query",
+                "url": connection_config["url"],
+                "username": connection_config["username"],
+                "password": connection_config["password"],
+                "sql": query.sql,
+                "retry": {
+                    "type": "constant",
+                    "maxAttempt": retry_attempts,
+                    "interval": retry_delay,
+                },
+            }
+
+            tasks.append(task)
+
+        # Build flow structure
+        flow: Dict[str, Any] = {
+            "id": flow_id,
+            "namespace": namespace,
+            "description": description,
+            "labels": labels or {"generator": "clgraph"},
+            "tasks": tasks,
+        }
+
+        # Add any additional kwargs
+        flow.update(kwargs)
+
+        return yaml.dump(flow, default_flow_style=False, sort_keys=False)
+
+    def to_flow_with_triggers(
+        self,
+        flow_id: str,
+        namespace: str,
+        cron: Optional[str] = None,
+        **kwargs,
+    ) -> str:
+        """
+        Generate Kestra flow YAML with schedule triggers.
+
+        Args:
+            flow_id: Unique identifier for the flow
+            namespace: Kestra namespace
+            cron: Cron expression for scheduling (e.g., "0 0 * * *" for daily)
+            **kwargs: Additional arguments passed to to_flow()
+
+        Returns:
+            YAML string with triggers configured
+
+        Examples:
+            # Daily at midnight
+            yaml_content = orchestrator.to_flow_with_triggers(
+                flow_id="daily_pipeline",
+                namespace="clgraph.production",
+                cron="0 0 * * *"
+            )
+
+            # Every hour
+            yaml_content = orchestrator.to_flow_with_triggers(
+                flow_id="hourly_pipeline",
+                namespace="clgraph.production",
+                cron="0 * * * *"
+            )
+        """
+        flow_yaml = self.to_flow(flow_id=flow_id, namespace=namespace, **kwargs)
+        flow_dict = yaml.safe_load(flow_yaml)
+
+        if cron:
+            flow_dict["triggers"] = [
+                {
+                    "id": "schedule",
+                    "type": "io.kestra.plugin.core.trigger.Schedule",
+                    "cron": cron,
+                }
+            ]
+
+        return yaml.dump(flow_dict, default_flow_style=False, sort_keys=False)
+
+    def to_flow_dict(
+        self,
+        flow_id: str,
+        namespace: str,
+        **kwargs,
+    ) -> Dict[str, Any]:
+        """
+        Generate Kestra flow as a Python dictionary.
+
+        Useful when you need to manipulate the flow structure programmatically
+        before serializing to YAML.
+
+        Args:
+            flow_id: Unique identifier for the flow
+            namespace: Kestra namespace
+            **kwargs: Additional arguments passed to to_flow()
+
+        Returns:
+            Dictionary representing Kestra flow structure
+        """
+        yaml_content = self.to_flow(flow_id=flow_id, namespace=namespace, **kwargs)
+        return yaml.safe_load(yaml_content)
+
+
+__all__ = ["KestraOrchestrator"]
diff --git a/src/clgraph/pipeline.py b/src/clgraph/pipeline.py
index 42bc559..edc6192 100644
--- a/src/clgraph/pipeline.py
+++ b/src/clgraph/pipeline.py
@@ -2501,6 +2501,100 @@ def to_prefect_deployment(
             **kwargs,
         )
 
+    # ========================================================================
+    # Orchestrator Methods - Kestra
+    # ========================================================================
+
+    def to_kestra_flow(
+        self,
+        flow_id: str,
+        namespace: str,
+        description: Optional[str] = None,
+        connection_config: Optional[Dict[str, str]] = None,
+        cron: Optional[str] = None,
+        retry_attempts: int = 3,
+        labels: Optional[Dict[str, str]] = None,
+        **kwargs,
+    ) -> str:
+        """
+        Generate Kestra flow YAML from this pipeline.
+
+        Kestra is a declarative orchestration platform using YAML-based
+        workflow definitions. This method generates a complete flow file.
+
+        Args:
+            flow_id: Unique identifier for the flow
+            namespace: Kestra namespace (e.g., "clgraph.production")
+            description: Optional flow description (auto-generated if not provided)
+            connection_config: Database connection config dict:
+                {
+                    "url": "jdbc:clickhouse://host:port/db",
+                    "username": "default",
+                    "password": "",
+                }
+            cron: Optional cron expression for scheduling (e.g., "0 0 * * *")
+            retry_attempts: Number of retry attempts (default: 3)
+            labels: Optional key-value labels for the flow
+            **kwargs: Additional flow configuration
+
+        Returns:
+            YAML string representing Kestra flow
+
+        Examples:
+            # Basic usage
+            yaml_content = pipeline.to_kestra_flow(
+                flow_id="enterprise_pipeline",
+                namespace="clgraph.production"
+            )
+
+            # Save to file
+            with open("flows/enterprise_pipeline.yml", "w") as f:
+                f.write(yaml_content)
+
+            # With schedule and custom connection
+            yaml_content = pipeline.to_kestra_flow(
+                flow_id="daily_analytics",
+                namespace="clgraph.analytics",
+                cron="0 0 * * *",  # Daily at midnight
+                connection_config={
+                    "url": "jdbc:clickhouse://localhost:8123/default",
+                    "username": "default",
+                    "password": "",
+                },
+                labels={"env": "production", "team": "analytics"},
+            )
+
+        Note:
+            - Kestra uses io.kestra.plugin.jdbc.clickhouse.Query task type
+            - Tasks are emitted in topological order; no dependsOn field is used
+            - Install Kestra ClickHouse plugin for database connectivity
+        """
+        from .orchestrators import KestraOrchestrator
+
+        orchestrator = KestraOrchestrator(self)
+
+        if cron:
+            return orchestrator.to_flow_with_triggers(
+                flow_id=flow_id,
+                namespace=namespace,
+                description=description,
+                connection_config=connection_config,
+                cron=cron,
+                retry_attempts=retry_attempts,
+                labels=labels,
+                **kwargs,
+            )
+        else:
+            return orchestrator.to_flow(
+                flow_id=flow_id,
+                namespace=namespace,
+                description=description,
+                connection_config=connection_config,
+                retry_attempts=retry_attempts,
+                labels=labels,
+                **kwargs,
+            )
+
     # ========================================================================
     # Validation Methods
     # ========================================================================
diff --git a/tests/test_kestra_integration.py b/tests/test_kestra_integration.py
new file mode 100644
index 0000000..3b31c99
--- /dev/null
+++ b/tests/test_kestra_integration.py
@@ -0,0 +1,464 @@
+"""
+Tests for Kestra orchestrator integration.
+
+Tests the to_kestra_flow() method and KestraOrchestrator class.
+""" + +import yaml + +from clgraph import Pipeline + + +def mock_executor(sql: str) -> None: + """Mock executor for testing.""" + pass + + +class TestToKestraFlowBasic: + """Basic tests for to_kestra_flow method.""" + + def test_basic_flow_generation(self): + """Test basic Kestra flow YAML generation.""" + pipeline = Pipeline( + [ + ("staging", "CREATE TABLE staging AS SELECT 1 as id"), + ("analytics", "CREATE TABLE analytics AS SELECT * FROM staging"), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="test_flow", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + + assert flow["id"] == "test_flow" + assert flow["namespace"] == "clgraph.test" + assert len(flow["tasks"]) == 2 + + def test_flow_with_description(self): + """Test flow with custom description.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="desc_flow", + namespace="clgraph.test", + description="Custom description for testing", + ) + + flow = yaml.safe_load(yaml_content) + assert flow["description"] == "Custom description for testing" + + def test_flow_auto_description(self): + """Test auto-generated description.""" + pipeline = Pipeline( + [ + ("q1", "CREATE TABLE t1 AS SELECT 1"), + ("q2", "CREATE TABLE t2 AS SELECT * FROM t1"), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="auto_desc", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + assert "2 queries" in flow["description"] + assert "clgraph" in flow["description"] + + def test_task_structure(self): + """Test task structure contains required fields.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow(flow_id="task_test", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + task = flow["tasks"][0] + + assert task["id"] == "query1" + assert task["type"] == "io.kestra.plugin.jdbc.clickhouse.Query" + assert "url" in task + assert "username" in task + assert "sql" in task + assert "retry" in task + + def test_task_sql_content(self): + """Test that SQL is correctly embedded in tasks.""" + sql = "CREATE TABLE test_table AS SELECT 42 as value" + pipeline = Pipeline([("query1", sql)]) + + yaml_content = pipeline.to_kestra_flow(flow_id="sql_test", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + assert flow["tasks"][0]["sql"] == sql + + +class TestToKestraFlowDependencies: + """Test Kestra flow dependency handling via topological ordering.""" + + def test_linear_dependencies(self): + """Test linear dependency chain - tasks should be in topological order.""" + pipeline = Pipeline( + [ + ("step1", "CREATE TABLE step1 AS SELECT 1"), + ("step2", "CREATE TABLE step2 AS SELECT * FROM step1"), + ("step3", "CREATE TABLE step3 AS SELECT * FROM step2"), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="linear_deps", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + task_ids = [t["id"] for t in flow["tasks"]] + + # Tasks should be in topological order: step1 before step2 before step3 + assert task_ids.index("step1") < task_ids.index("step2") + assert task_ids.index("step2") < task_ids.index("step3") + + def test_diamond_dependencies(self): + """Test diamond pattern dependencies - tasks should be in valid topological order.""" + pipeline = Pipeline( + [ + ("source", "CREATE TABLE source AS SELECT 1 as id"), + ("left_branch", "CREATE TABLE left_branch AS SELECT * FROM source"), + ("right_branch", "CREATE TABLE right_branch AS SELECT * 
FROM source"), + ( + "final", + "CREATE TABLE final AS SELECT * FROM left_branch, right_branch", + ), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="diamond_deps", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + task_ids = [t["id"] for t in flow["tasks"]] + + # source should come before left and right branches + assert task_ids.index("source") < task_ids.index("left_branch") + assert task_ids.index("source") < task_ids.index("right_branch") + # final should come after both branches + assert task_ids.index("left_branch") < task_ids.index("final") + assert task_ids.index("right_branch") < task_ids.index("final") + + def test_no_depends_on_field(self): + """Test that tasks don't have dependsOn field (Kestra uses sequential execution).""" + pipeline = Pipeline( + [ + ("source1", "CREATE TABLE source1 AS SELECT 1"), + ("source2", "CREATE TABLE source2 AS SELECT 2"), + ( + "combined", + "CREATE TABLE combined AS SELECT * FROM source1, source2", + ), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="multi_source", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + + # No task should have dependsOn field + for task in flow["tasks"]: + assert "dependsOn" not in task + + +class TestToKestraFlowScheduling: + """Test Kestra flow scheduling.""" + + def test_flow_with_cron(self): + """Test flow with cron schedule.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="scheduled_flow", namespace="clgraph.test", cron="0 0 * * *" + ) + + flow = yaml.safe_load(yaml_content) + + assert "triggers" in flow + assert len(flow["triggers"]) == 1 + trigger = flow["triggers"][0] + assert trigger["type"] == "io.kestra.plugin.core.trigger.Schedule" + assert trigger["cron"] == "0 0 * * *" + + def test_flow_without_schedule(self): + """Test flow without schedule has no triggers.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow(flow_id="unscheduled_flow", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + assert "triggers" not in flow + + def test_hourly_cron(self): + """Test hourly cron schedule.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="hourly_flow", namespace="clgraph.test", cron="0 * * * *" + ) + + flow = yaml.safe_load(yaml_content) + assert flow["triggers"][0]["cron"] == "0 * * * *" + + +class TestToKestraFlowConfiguration: + """Test Kestra flow configuration options.""" + + def test_custom_connection_config(self): + """Test custom connection configuration.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="custom_conn", + namespace="clgraph.test", + connection_config={ + "url": "jdbc:clickhouse://custom-host:8123/mydb", + "username": "myuser", + "password": "mypassword", + }, + ) + + flow = yaml.safe_load(yaml_content) + task = flow["tasks"][0] + + assert task["url"] == "jdbc:clickhouse://custom-host:8123/mydb" + assert task["username"] == "myuser" + assert task["password"] == "mypassword" + + def test_default_connection_config(self): + """Test default connection configuration.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow(flow_id="default_conn", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + task = flow["tasks"][0] + + assert task["url"] == 
"jdbc:clickhouse://clickhouse:8123/default" + assert task["username"] == "default" + assert task["password"] == "" + + def test_custom_retry_config(self): + """Test custom retry configuration.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="retry_test", namespace="clgraph.test", retry_attempts=5 + ) + + flow = yaml.safe_load(yaml_content) + task = flow["tasks"][0] + + assert task["retry"]["maxAttempt"] == 5 + + def test_custom_labels(self): + """Test custom labels.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow( + flow_id="labels_test", + namespace="clgraph.test", + labels={"env": "production", "team": "analytics"}, + ) + + flow = yaml.safe_load(yaml_content) + + assert flow["labels"]["env"] == "production" + assert flow["labels"]["team"] == "analytics" + + def test_default_labels(self): + """Test default labels include generator tag.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow(flow_id="default_labels", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + assert flow["labels"]["generator"] == "clgraph" + + +class TestToKestraFlowOutput: + """Test Kestra flow output format.""" + + def test_valid_yaml_output(self): + """Test that output is valid, parseable YAML.""" + pipeline = Pipeline( + [ + ("q1", "CREATE TABLE t1 AS SELECT 1"), + ("q2", "CREATE TABLE t2 AS SELECT * FROM t1"), + ] + ) + + yaml_content = pipeline.to_kestra_flow(flow_id="valid_yaml", namespace="clgraph.test") + + # Should not raise + flow = yaml.safe_load(yaml_content) + assert isinstance(flow, dict) + + def test_yaml_is_string(self): + """Test that to_kestra_flow returns a string.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + result = pipeline.to_kestra_flow(flow_id="string_test", namespace="clgraph.test") + + assert isinstance(result, str) + + def test_yaml_has_required_fields(self): + """Test YAML has all required Kestra fields.""" + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + + yaml_content = pipeline.to_kestra_flow(flow_id="required_fields", namespace="clgraph.test") + + flow = yaml.safe_load(yaml_content) + + # Required Kestra flow fields + assert "id" in flow + assert "namespace" in flow + assert "tasks" in flow + + +class TestKestraOrchestrator: + """Test KestraOrchestrator class directly.""" + + def test_orchestrator_initialization(self): + """Test KestraOrchestrator initialization.""" + from clgraph.orchestrators import KestraOrchestrator + + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + orchestrator = KestraOrchestrator(pipeline) + + assert orchestrator.pipeline == pipeline + assert orchestrator.table_graph == pipeline.table_graph + + def test_to_flow_dict(self): + """Test to_flow_dict returns dictionary.""" + from clgraph.orchestrators import KestraOrchestrator + + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) + orchestrator = KestraOrchestrator(pipeline) + + result = orchestrator.to_flow_dict(flow_id="dict_test", namespace="clgraph.test") + + assert isinstance(result, dict) + assert result["id"] == "dict_test" + assert result["namespace"] == "clgraph.test" + + def test_to_flow_with_triggers(self): + """Test to_flow_with_triggers adds triggers.""" + from clgraph.orchestrators import KestraOrchestrator + + pipeline = Pipeline([("query1", "CREATE TABLE table1 AS SELECT 1")]) 
+        orchestrator = KestraOrchestrator(pipeline)
+
+        yaml_content = orchestrator.to_flow_with_triggers(
+            flow_id="trigger_test", namespace="clgraph.test", cron="0 0 * * *"
+        )
+
+        flow = yaml.safe_load(yaml_content)
+        assert "triggers" in flow
+        assert flow["triggers"][0]["cron"] == "0 0 * * *"
+
+
+class TestKestraFlowComplexPipeline:
+    """Test Kestra flow generation with complex pipelines."""
+
+    def test_enterprise_like_pipeline(self):
+        """Test with a pipeline similar to enterprise demo."""
+        pipeline = Pipeline(
+            [
+                (
+                    "raw_sales",
+                    """
+                    CREATE TABLE raw_sales AS
+                    SELECT
+                        toDate('2024-01-01') + number as date,
+                        number % 100 as product_id,
+                        number % 10 as region_id,
+                        rand() % 1000 as amount
+                    FROM numbers(1000)
+                    """,
+                ),
+                (
+                    "raw_products",
+                    """
+                    CREATE TABLE raw_products AS
+                    SELECT
+                        number as product_id,
+                        concat('Product ', toString(number)) as product_name,
+                        rand() % 5 as category_id
+                    FROM numbers(100)
+                    """,
+                ),
+                (
+                    "sales_with_products",
+                    """
+                    CREATE TABLE sales_with_products AS
+                    SELECT
+                        s.date,
+                        s.product_id,
+                        p.product_name,
+                        s.region_id,
+                        s.amount
+                    FROM raw_sales s
+                    JOIN raw_products p ON s.product_id = p.product_id
+                    """,
+                ),
+                (
+                    "daily_summary",
+                    """
+                    CREATE TABLE daily_summary AS
+                    SELECT
+                        date,
+                        count() as num_sales,
+                        sum(amount) as total_amount
+                    FROM sales_with_products
+                    GROUP BY date
+                    """,
+                ),
+            ],
+            dialect="clickhouse",
+        )
+
+        yaml_content = pipeline.to_kestra_flow(
+            flow_id="enterprise_pipeline",
+            namespace="clgraph.enterprise",
+            labels={"env": "production", "source": "clgraph"},
+        )
+
+        flow = yaml.safe_load(yaml_content)
+
+        assert flow["id"] == "enterprise_pipeline"
+        assert flow["namespace"] == "clgraph.enterprise"
+        assert len(flow["tasks"]) == 4
+
+        task_ids = [t["id"] for t in flow["tasks"]]
+
+        # Verify topological ordering (no dependsOn field, just correct order)
+        # raw_sales and raw_products should come before sales_with_products
+        assert task_ids.index("raw_sales") < task_ids.index("sales_with_products")
+        assert task_ids.index("raw_products") < task_ids.index("sales_with_products")
+        # sales_with_products should come before daily_summary
+        assert task_ids.index("sales_with_products") < task_ids.index("daily_summary")
+
+    def test_many_queries_pipeline(self):
+        """Test with many queries."""
+        queries = []
+        for i in range(10):
+            if i == 0:
+                queries.append((f"step_{i}", f"CREATE TABLE step_{i} AS SELECT {i}"))
+            else:
+                queries.append(
+                    (
+                        f"step_{i}",
+                        f"CREATE TABLE step_{i} AS SELECT * FROM step_{i - 1}",
+                    )
+                )
+
+        pipeline = Pipeline(queries)
+
+        yaml_content = pipeline.to_kestra_flow(flow_id="many_queries", namespace="clgraph.test")
+
+        flow = yaml.safe_load(yaml_content)
+        assert len(flow["tasks"]) == 10
+
+        # Verify topological ordering (each step should come after its predecessor)
+        task_ids = [t["id"] for t in flow["tasks"]]
+        for i in range(1, 10):
+            assert task_ids.index(f"step_{i - 1}") < task_ids.index(f"step_{i}")
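Reviewer note: a minimal end-to-end sketch of the surface this patch adds, assuming the patch is applied and PyYAML is installed. The flow_id, namespace, and output path below are illustrative, not part of the change:

    import yaml

    from clgraph import Pipeline

    # Two-step pipeline: the orchestrator should emit "staging" before "analytics".
    pipeline = Pipeline(
        [
            ("staging", "CREATE TABLE staging AS SELECT 1 AS id"),
            ("analytics", "CREATE TABLE analytics AS SELECT * FROM staging"),
        ]
    )

    # Generate a scheduled flow and sanity-check the YAML before deploying.
    yaml_content = pipeline.to_kestra_flow(
        flow_id="review_demo",       # hypothetical flow id
        namespace="clgraph.dev",     # hypothetical namespace
        cron="0 0 * * *",            # daily at midnight
    )
    flow = yaml.safe_load(yaml_content)

    # Tasks come out in topological order; the trigger carries the cron.
    assert [t["id"] for t in flow["tasks"]] == ["staging", "analytics"]
    assert flow["triggers"][0]["cron"] == "0 0 * * *"

    with open("review_demo.yml", "w") as f:
        f.write(yaml_content)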