
Conversation

@aaronsteers (Contributor) commented Jan 29, 2026

Summary

Adds universal source and destination connectors that leverage PyAirbyte as a backend:

Universal Destination (destination-pyairbyte): Writes to any PyAirbyte-supported cache backend (DuckDB, Postgres, Snowflake, BigQuery, MotherDuck). Implements the Airbyte CDK Destination interface with spec(), check(), write(), and support for overwrite/append sync modes.

Universal Source (source-pyairbyte): Wraps any PyAirbyte-supported source connector, acting as a proxy that delegates to the underlying source based on configuration. Implements the CDK Source interface with spec(), check(), discover(), and read() methods.

Both connectors are registered as CLI entrypoints and include a Dockerfile for containerization.
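
For illustration, a destination config might look roughly like the sketch below. Aside from the destination_type key and the nested snowflake block visible in the reviewed code, the key names here are assumptions rather than the connector's documented schema.

# Hypothetical config sketches for destination-pyairbyte (key names are illustrative).
duckdb_config = {
    "destination_type": "duckdb",  # assumed value; selects the PyAirbyte cache backend
    "duckdb": {"db_path": "/tmp/pyairbyte_dest.duckdb"},  # assumed backend-specific block
}

snowflake_config = {
    "destination_type": "snowflake",  # assumed value of DESTINATION_TYPE_SNOWFLAKE
    "snowflake": {  # nested block matching the code reviewed later in this thread
        "account": "my-account",
        "username": "my-user",
        "password": "********",
        "warehouse": "COMPUTE_WH",
        "database": "ANALYTICS",
        "schema_name": "PUBLIC",
        "role": "SYSADMIN",
    },
}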

Updates since last revision

  • Added TK-TODO comment for STATE message handling (tracks tech debt for incremental sync support)
  • Imported TK-TODO CI check workflow from airbyte-ops-mcp repo
  • The TK-TODO check intentionally fails (non-required) to track this tech debt until resolved

Review & Testing Checklist for Human

  • State handling limitation (source): The state parameter in read() is ignored and STATE messages are skipped. Incremental syncs will not work - only full refresh is supported. This is tracked with a TK-TODO marker.
  • SQL injection risk (destination): Schema and table names are interpolated into SQL strings. Verify this is acceptable given these values come from configuration/catalog.
  • Private API usage (source): Uses PyAirbyte's internal _read_with_catalog() method which may change without notice.
  • No automated tests included: Manual testing was performed but unit/integration tests should be added.
  • Source auto-install: Source uses install_if_missing=True which will install connectors on first use. Consider implications for production environments.

Recommended test plan:

  1. Build the Docker image locally
  2. Destination: Run a test sync with DuckDB backend, verify records in _airbyte_raw_* tables (see the verification sketch after this list)
  3. Source: Configure with a simple source (e.g., source-faker), run discover and read operations
  4. Pipe test: source-pyairbyte read --config source.json --catalog catalog.json | destination-pyairbyte write --config dest.json --catalog catalog.json
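
For step 2, one quick way to verify the raw tables is to open the DuckDB file directly. This is only a sketch: the db_path, schema, and stream name below are placeholders, none of which are fixed by this PR.

# Sketch: spot-check destination output in a local DuckDB file (paths and names are placeholders).
import duckdb

con = duckdb.connect("/tmp/pyairbyte_dest.duckdb")
# List tables across all schemas to find the _airbyte_raw_* tables.
print(con.execute("SHOW ALL TABLES").fetchall())
# Count rows in one raw table (schema and stream name are placeholders).
print(con.execute("SELECT COUNT(*) FROM main._airbyte_raw_users").fetchone())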

Notes

This is part 1 of 2 PRs - a companion PR will register these connectors in the airbytehq/airbyte repo.

Requested by: @aaronsteers (AJ)
Devin session: https://app.devin.ai/sessions/46cd19c4793a4e1ea32bb660de9c1fe0

This adds a new universal destination connector that can write to any
PyAirbyte-supported cache backend (DuckDB, Postgres, Snowflake, BigQuery,
MotherDuck).

The destination implements the Airbyte CDK Destination interface and uses
PyAirbyte's cache system to handle the actual writing. Configuration allows
selecting the destination type and passing backend-specific settings.

Co-Authored-By: AJ Steers <aj@airbyte.io>
@devin-ai-integration (Contributor)

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653021-destination-pyairbyte-universal' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653021-destination-pyairbyte-universal'

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /uv-lock - Updates uv.lock file
  • /test-pr - Runs tests with the updated PyAirbyte
  • /prerelease - Builds and publishes a prerelease version to PyPI
📚 Show Repo Guidance

Helpful Resources

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.


github-actions bot commented Jan 29, 2026

PyTest Results (Fast Tests Only, No Creds)

326 tests  ±0   326 ✅ ±0   5m 39s ⏱️ +8s
  1 suites ±0     0 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 4e326cd. ± Comparison against base commit 8418815.

♻️ This comment has been updated with latest results.

coderabbitai bot (Contributor) commented Jan 29, 2026

📝 Walkthrough

This pull request introduces a new CLI-based universal destination connector for PyAirbyte, enabling data streaming to multiple configurable cache backends (DuckDB, Postgres, Snowflake, BigQuery, MotherDuck) with built-in schema management and state handling. Supporting infrastructure includes a Docker container definition and console script registration.

Changes

  • CLI Module Infrastructure (airbyte/cli/__init__.py, airbyte/cli/universal_connector/__init__.py, pyproject.toml): Package initializers and console script registration for the CLI module. Exposes DestinationPyAirbyteUniversal and registers destination-pyairbyte as a CLI entry point.
  • Universal Destination Implementation (airbyte/cli/universal_connector/destination.py): Core destination connector implementation with cache backend selection, schema/table lifecycle management, streaming record buffering with generated IDs and timestamps, per-stream flushing, and state message handling across DuckDB, Postgres, Snowflake, BigQuery, and MotherDuck backends.
  • Entry Point & Containerization (airbyte/cli/universal_connector/run.py, airbyte/cli/universal_connector/Dockerfile): CLI entry point script that instantiates and runs the destination connector, and Docker image configuration using airbyte/python-connector-base:4.0.2 as the base image.

Sequence Diagram

sequenceDiagram
    participant CLI as CLI Entry
    participant Dest as DestinationPyAirbyteUniversal
    participant Cache as Cache Backend<br/>(DuckDB/Postgres/etc.)
    participant Engine as SQL Engine

    CLI->>Dest: check(config)
    Dest->>Cache: instantiate cache
    Dest->>Engine: verify connectivity
    alt Connection OK
        Engine-->>Dest: success
        Dest-->>CLI: ConnectionStatus(success)
    else Connection Failed
        Engine-->>Dest: error
        Dest-->>CLI: ConnectionStatus(failed)
    end

    CLI->>Dest: write(config, catalog, messages)
    
    loop For each RECORD message
        Dest->>Dest: buffer record<br/>(add ID, timestamp, JSON)
    end

    alt STATE message received
        Dest->>Dest: flush_buffer()
        Dest->>Engine: INSERT buffered records<br/>to stream table
        Engine-->>Dest: commit
        Dest-->>CLI: emit STATE
    end

    alt On completion
        Dest->>Dest: final flush_buffer()
        Dest->>Engine: INSERT remaining records
        Dest->>Cache: close connection
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Docstring Coverage: ✅ Passed. Docstring coverage is 100.00%, which is sufficient; the required threshold is 80.00%.
  • Title check: ✅ Passed. The title 'feat(connectors): add universal source and destination using PyAirbyte' accurately and concisely summarizes the main change: adding new universal connectors leveraging PyAirbyte for multiple backend support.
  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.




@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 5

🤖 Fix all issues with AI agents
In `@airbyte/destinations/universal/destination.py`:
- Around line 135-214: The write() implementation currently calls cache.close()
only at the end, so exceptions can leak resources; wrap the main body after
obtaining cache = self._get_cache(config) in a try/finally (or
try/except/finally) so that cache.close() is always executed in the finally
block; ensure the finally runs after any early yields (STATE handling) by moving
the try to encompass the loop over input_messages and the final buffer flush,
referencing the write() function, the cache variable, and the existing
_flush_buffer calls to locate where to wrap.
- Around line 107-123: The check() path currently acquires cache =
self._get_cache(config) but only releases it on success; wrap the cache usage in
a try/finally so the cache is always closed on both success and failure (e.g.
call cache.close() in the finally block—if your cache API uses a different
method name, call that release/dispose method). Keep the existing
engine.connect() context manager; move the return of AirbyteConnectionStatus
into the try and ensure the finally calls
cache.close()/cache.dispose()/cache.release() so connections are always
released.
- Around line 141-167: The code interpolates schema_name and table_name directly
into raw SQL, which risks SQL injection and broken identifiers; update the logic
that uses cache.get_sql_engine() and the loop over configured_catalog.streams
(where table_name is built as f"_airbyte_raw_{name}" and you check
DestinationSyncMode.overwrite) to quote identifiers using the engine's dialect
preparer (engine.dialect.identifier_preparer.quote(name)) before embedding them
in CREATE SCHEMA, DROP TABLE, and CREATE TABLE statements so all schema/table
identifiers are safely escaped per the database dialect.
- Around line 148-192: Stream identity currently uses only
configured_stream.stream.name which can collide across namespaces; update stream
matching and table naming to incorporate configured_stream.stream.namespace
(e.g., include configured_stream.stream.namespace when building table_name and
when building the key used to populate and lookup in streams and buffer), and
adjust uses in the loop that checks "if stream_name not in streams" and in
_flush_buffer calls to use the combined namespace+name key (or a normalized
identifier) so that records are routed to the correct per-namespace table;
ensure the same composite key logic is applied when creating tables (table_name)
and when resetting/iterating buffer and streams to avoid collisions while
preserving current performance characteristics.
- Around line 195-198: The code currently sets _airbyte_emitted_at to the
destination time; change it to use the incoming record's emitted_at when present
by converting record.emitted_at (epoch ms) to an ISO UTC string (e.g.,
datetime.fromtimestamp(record.emitted_at/1000, tz=timezone.utc).isoformat()) and
append that to buffer[stream_name]["_airbyte_emitted_at"]; if record.emitted_at
is missing or falsy, fall back to
datetime.datetime.now(datetime.timezone.utc).isoformat() so the behavior is
safe. Ensure you reference the record variable used where
buffer[stream_name]["_airbyte_ab_id"] is appended and update only the
_airbyte_emitted_at assignment.
🧹 Nitpick comments (1)
airbyte/destinations/universal/destination.py (1)

216-248: Batch inserts instead of per-row loop for throughput.
Could we use executemany to reduce round-trips when flushing? SQLAlchemy 2.0 supports passing a list of dicts to conn.execute(), so we could build a payload list once and execute it in a single call instead of looping—wdyt?

🚀 Proposed refactor
         with engine.connect() as conn:
-            for i in range(len(entries["_airbyte_ab_id"])):
-                insert_sql = text(f"""
-                    INSERT INTO {schema_name}.{table_name}
-                    (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
-                    VALUES (:ab_id, :emitted_at, :data)
-                """)
-                conn.execute(
-                    insert_sql,
-                    {
-                        "ab_id": entries["_airbyte_ab_id"][i],
-                        "emitted_at": entries["_airbyte_emitted_at"][i],
-                        "data": entries["_airbyte_data"][i],
-                    },
-                )
+            insert_sql = text(f"""
+                INSERT INTO {schema_name}.{table_name}
+                (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
+                VALUES (:ab_id, :emitted_at, :data)
+            """)
+            payload = [
+                {
+                    "ab_id": entries["_airbyte_ab_id"][i],
+                    "emitted_at": entries["_airbyte_emitted_at"][i],
+                    "data": entries["_airbyte_data"][i],
+                }
+                for i in range(len(entries["_airbyte_ab_id"]))
+            ]
+            conn.execute(insert_sql, payload)
             conn.commit()  # pyrefly: ignore[missing-attribute]

Comment on lines +107 to +123
    def check(
        self,
        logger: logging.Logger,  # noqa: ARG002
        config: Mapping[str, Any],
    ) -> AirbyteConnectionStatus:
        """Test the connection to the destination."""
        try:
            cache = self._get_cache(config)
            engine = cache.get_sql_engine()
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            return AirbyteConnectionStatus(
                status=Status.FAILED, message=f"Connection failed: {e!r}"
            )


⚠️ Potential issue | 🟠 Major

Close the cache on both success and failure in check().
Could we wrap cache usage in a try/finally so connections are always released, wdyt?

🔧 Proposed fix
-        try:
-            cache = self._get_cache(config)
-            engine = cache.get_sql_engine()
-            with engine.connect() as conn:
-                conn.execute(text("SELECT 1"))
-            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
-        except Exception as e:
-            return AirbyteConnectionStatus(
-                status=Status.FAILED, message=f"Connection failed: {e!r}"
-            )
+        cache = None
+        try:
+            cache = self._get_cache(config)
+            engine = cache.get_sql_engine()
+            with engine.connect() as conn:
+                conn.execute(text("SELECT 1"))
+            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
+        except Exception as e:
+            return AirbyteConnectionStatus(
+                status=Status.FAILED, message=f"Connection failed: {e!r}"
+            )
+        finally:
+            if cache is not None:
+                cache.close()

Comment on lines +135 to +214
        cache = self._get_cache(config)
        streams = {s.stream.name for s in configured_catalog.streams}
        schema_name = cache.schema_name

        logger.info(f"Starting write to PyAirbyte Universal with {len(streams)} streams")

        # Get SQL engine and ensure schema exists
        engine = cache.get_sql_engine()
        with engine.connect() as conn:
            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
            conn.commit()  # pyrefly: ignore[missing-attribute]

        # Create tables for each stream
        for configured_stream in configured_catalog.streams:
            name = configured_stream.stream.name
            table_name = f"_airbyte_raw_{name}"

            with engine.connect() as conn:
                if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
                    logger.info(f"Dropping table for overwrite: {table_name}")
                    conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}"))
                    conn.commit()  # pyrefly: ignore[missing-attribute]

                # Create the raw table if needed
                create_sql = f"""
                    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
                        _airbyte_ab_id VARCHAR(36) PRIMARY KEY,
                        _airbyte_emitted_at TIMESTAMP,
                        _airbyte_data JSON
                    )
                """
                conn.execute(text(create_sql))
                conn.commit()  # pyrefly: ignore[missing-attribute]

        # Buffer for records
        buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))

        for message in input_messages:
            if message.type == Type.STATE:
                # Flush the buffer before yielding state
                for stream_name in list(buffer.keys()):
                    self._flush_buffer(
                        engine=engine,
                        buffer=buffer,
                        schema_name=schema_name,
                        stream_name=stream_name,
                    )
                buffer = defaultdict(lambda: defaultdict(list))
                yield message

            elif message.type == Type.RECORD:
                record = message.record
                if record is None:
                    continue
                stream_name = record.stream
                if stream_name not in streams:
                    logger.debug(f"Stream {stream_name} not in configured streams, skipping")
                    continue

                # Add to buffer
                buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4()))
                buffer[stream_name]["_airbyte_emitted_at"].append(
                    datetime.datetime.now(datetime.timezone.utc).isoformat()
                )
                buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data))

            else:
                logger.debug(f"Message type {message.type} not handled, skipping")

        # Flush any remaining records
        for stream_name in list(buffer.keys()):
            self._flush_buffer(
                engine=engine,
                buffer=buffer,
                schema_name=schema_name,
                stream_name=stream_name,
            )

        # Close the cache
        cache.close()

⚠️ Potential issue | 🟠 Major

Ensure cache.close() runs on exceptions in write().
Could we wrap the write body in try/finally so the cache is always closed, wdyt?

🧹 Proposed fix
-        cache = self._get_cache(config)
-        streams = {s.stream.name for s in configured_catalog.streams}
-        schema_name = cache.schema_name
+        cache = self._get_cache(config)
+        try:
+            streams = {s.stream.name for s in configured_catalog.streams}
+            schema_name = cache.schema_name
@@
-        # Close the cache
-        cache.close()
+        finally:
+            cache.close()

Comment on lines +141 to +167
        # Get SQL engine and ensure schema exists
        engine = cache.get_sql_engine()
        with engine.connect() as conn:
            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
            conn.commit()  # pyrefly: ignore[missing-attribute]

        # Create tables for each stream
        for configured_stream in configured_catalog.streams:
            name = configured_stream.stream.name
            table_name = f"_airbyte_raw_{name}"

            with engine.connect() as conn:
                if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
                    logger.info(f"Dropping table for overwrite: {table_name}")
                    conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}"))
                    conn.commit()  # pyrefly: ignore[missing-attribute]

                # Create the raw table if needed
                create_sql = f"""
                    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
                        _airbyte_ab_id VARCHAR(36) PRIMARY KEY,
                        _airbyte_emitted_at TIMESTAMP,
                        _airbyte_data JSON
                    )
                """
                conn.execute(text(create_sql))
                conn.commit()  # pyrefly: ignore[missing-attribute]

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Ran repository scripts to locate airbyte/destinations/universal/destination.py, inspect the schema/table DDL around lines 130-175, and confirm that the module contains no identifier quoting or stream-name sanitization; also queried the SQLAlchemy 2.0 docs for identifier-quoting best practice.

🌐 Web query result (summary): prefer modeling identifiers as schema objects (Table(..., quote=True, quote_schema=True), or quoted_name(name, True)); to quote a raw identifier string programmatically, use the dialect's IdentifierPreparer via engine.dialect.identifier_preparer.quote(name), which applies the dialect's conditional quoting rules (the old force= argument is deprecated in favor of quoted_name).

Quote schema and table identifiers to prevent SQL injection.

Schema and table names from config and catalog are directly interpolated into SQL without quoting. Reserved words, special characters, or maliciously-crafted identifiers could lead to SQL injection. Would you consider using SQLAlchemy's engine.dialect.identifier_preparer.quote() to safely escape these identifiers per the database dialect, wdyt?

The fix would look something like:

def _quote_ident(self, engine: Engine, name: str) -> str:
    return engine.dialect.identifier_preparer.quote(name)

Then apply it to schema_name and table_name before the SQL statements (lines 144, 155, 160).
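
As a self-contained sketch of how that helper could be applied to the DDL, using only the identifier_preparer API referenced above (placement and names are illustrative, not the reviewed code):

# Sketch: dialect-aware identifier quoting for the raw-table DDL (illustrative only).
from sqlalchemy import text

def create_raw_table(engine, schema_name: str, table_name: str) -> None:
    q = engine.dialect.identifier_preparer.quote  # quotes per the active dialect's rules
    with engine.connect() as conn:
        conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {q(schema_name)}"))
        conn.execute(
            text(
                f"CREATE TABLE IF NOT EXISTS {q(schema_name)}.{q(table_name)} ("
                "_airbyte_ab_id VARCHAR(36) PRIMARY KEY, "
                "_airbyte_emitted_at TIMESTAMP, "
                "_airbyte_data JSON)"
            )
        )
        conn.commit()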


Comment on lines +148 to +192
        for configured_stream in configured_catalog.streams:
            name = configured_stream.stream.name
            table_name = f"_airbyte_raw_{name}"

            with engine.connect() as conn:
                if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
                    logger.info(f"Dropping table for overwrite: {table_name}")
                    conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}"))
                    conn.commit()  # pyrefly: ignore[missing-attribute]

                # Create the raw table if needed
                create_sql = f"""
                    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
                        _airbyte_ab_id VARCHAR(36) PRIMARY KEY,
                        _airbyte_emitted_at TIMESTAMP,
                        _airbyte_data JSON
                    )
                """
                conn.execute(text(create_sql))
                conn.commit()  # pyrefly: ignore[missing-attribute]

        # Buffer for records
        buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))

        for message in input_messages:
            if message.type == Type.STATE:
                # Flush the buffer before yielding state
                for stream_name in list(buffer.keys()):
                    self._flush_buffer(
                        engine=engine,
                        buffer=buffer,
                        schema_name=schema_name,
                        stream_name=stream_name,
                    )
                buffer = defaultdict(lambda: defaultdict(list))
                yield message

            elif message.type == Type.RECORD:
                record = message.record
                if record is None:
                    continue
                stream_name = record.stream
                if stream_name not in streams:
                    logger.debug(f"Stream {stream_name} not in configured streams, skipping")
                    continue

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

Ran repository scripts to inspect airbyte/destinations/universal/destination.py (stream/table naming, the streams set, the record buffer, and _flush_buffer) and to search the codebase, tests, and docs for namespace handling. No namespace-aware logic exists in the destination, and comments in _connector_base.py and _message_iterators.py note that PyAirbyte intentionally omits namespace handling for performance reasons.

Consider the namespace handling trade-off before including namespace in stream identity.

Airbyte streams can share names across namespaces; incorporating stream.namespace into table names and stream matching would prevent potential collisions. However, note that PyAirbyte intentionally omits namespace handling for performance reasons (as documented in _connector_base.py and _message_iterators.py). If this destination needs to support multiple namespaces, would it be worth revisiting that performance trade-off, or should namespace handling remain out of scope?
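
If namespace support were taken on, a minimal sketch of a composite stream key might look like this (purely illustrative; not part of the PR):

# Sketch: namespace-qualified stream key to avoid cross-namespace collisions (illustrative).
def stream_key(namespace: str | None, name: str) -> str:
    return f"{namespace}__{name}" if namespace else name

# e.g. table_name = f"_airbyte_raw_{stream_key(stream.namespace, stream.name)}"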


Comment on lines +195 to +198
buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4()))
buffer[stream_name]["_airbyte_emitted_at"].append(
datetime.datetime.now(datetime.timezone.utc).isoformat()
)

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

Ran repository scripts to inspect the buffering code around lines 195-198 and how emitted_at is handled elsewhere (airbyte/records.py already converts record_message.emitted_at from epoch milliseconds with datetime.fromtimestamp(..., tz=timezone.utc)), then queried the Airbyte CDK docs.

🌐 Web query result (summary): in airbyte_cdk.models.AirbyteRecordMessage, emitted_at is a required field holding an epoch timestamp in milliseconds that records when the source extracted/emitted the record; destinations commonly surface it as _airbyte_extracted_at (older destinations call it _airbyte_emitted_at). Sources: Airbyte Protocol docs; Airbyte metadata fields docs.

Use the record's emitted_at to preserve canonical source timing.
The record.emitted_at field (epoch milliseconds) is available in this context—would preserving the source's original timestamp instead of the destination's current time be better for tracking when records were actually extracted? [1]

[1] emitted_at is already used in airbyte/records.py with the same conversion pattern (datetime.fromtimestamp(record_message.emitted_at / 1000, tz=timezone.utc)).

⏱️ Proposed fix
-                buffer[stream_name]["_airbyte_emitted_at"].append(
-                    datetime.datetime.now(datetime.timezone.utc).isoformat()
-                )
+                emitted_at = (
+                    datetime.datetime.fromtimestamp(
+                        record.emitted_at / 1000, tz=datetime.timezone.utc
+                    )
+                    if record.emitted_at is not None
+                    else datetime.datetime.now(datetime.timezone.utc)
+                )
+                buffer[stream_name]["_airbyte_emitted_at"].append(emitted_at.isoformat())

github-actions bot commented Jan 29, 2026

PyTest Results (Full)

396 tests  ±0   379 ✅ ±0   24m 36s ⏱️ -21s
  1 suites ±0    17 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit 4e326cd. ± Comparison against base commit 8418815.

♻️ This comment has been updated with latest results.

…or/ and add CLI entrypoint

Co-Authored-By: AJ Steers <aj@airbyte.io>
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@airbyte/cli/universal_connector/destination.py`:
- Around line 76-87: The code currently constructs a SnowflakeCache when
destination_type == DESTINATION_TYPE_SNOWFLAKE without rejecting empty strings
for required fields; add early validation to fail fast by checking required
Snowflake fields (account, username, warehouse, database, role) are non-empty
before constructing SnowflakeCache (or implement the same checks in
SnowflakeConfig/SnowflakeCache initializer/constructor), and raise a clear
ValueError indicating which field is missing; preserve the existing SecretString
wrapping for password and include that validation logic near the SnowflakeCache
creation site or inside SnowflakeCache/SnowflakeConfig (functions/classes to
touch: destination_type check, SnowflakeCache constructor, or SnowflakeConfig
class) so misconfigurations are caught immediately with explicit error messages.

In `@airbyte/cli/universal_connector/Dockerfile`:
- Line 5: The Dockerfile's COPY instruction currently references a nonexistent
poetry.lock; update the COPY line that mentions "poetry.lock" to instead
reference "uv.lock" so it reads as copying pyproject.toml and uv.lock into the
image (ensure uv.lock exists at repo root); this change is in the Dockerfile's
COPY command.
🧹 Nitpick comments (2)
airbyte/cli/universal_connector/destination.py (1)

233-248: Consider batch inserts for better performance, wdyt?

The current implementation executes one INSERT per record. For large buffers, this can be significantly slower than using executemany or SQLAlchemy's bulk insert capabilities.

⚡ Proposed batch insert approach
         with engine.connect() as conn:
-            for i in range(len(entries["_airbyte_ab_id"])):
-                insert_sql = text(f"""
-                    INSERT INTO {schema_name}.{table_name}
-                    (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
-                    VALUES (:ab_id, :emitted_at, :data)
-                """)
-                conn.execute(
-                    insert_sql,
-                    {
-                        "ab_id": entries["_airbyte_ab_id"][i],
-                        "emitted_at": entries["_airbyte_emitted_at"][i],
-                        "data": entries["_airbyte_data"][i],
-                    },
-                )
+            insert_sql = text(f"""
+                INSERT INTO {schema_name}.{table_name}
+                (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
+                VALUES (:ab_id, :emitted_at, :data)
+            """)
+            params = [
+                {
+                    "ab_id": entries["_airbyte_ab_id"][i],
+                    "emitted_at": entries["_airbyte_emitted_at"][i],
+                    "data": entries["_airbyte_data"][i],
+                }
+                for i in range(len(entries["_airbyte_ab_id"]))
+            ]
+            conn.execute(insert_sql, params)
             conn.commit()  # pyrefly: ignore[missing-attribute]
airbyte/cli/universal_connector/Dockerfile (1)

1-11: Consider adding a spec.json for connector registration, wdyt?

The PR objectives note that spec.json may be required for connector registration in the airbytehq/airbyte repository. The Dockerfile looks good otherwise—nice use of SHA pinning for reproducibility!

Would you like me to help generate a spec.json file based on the configuration schema used in _get_cache()?

Comment on lines +76 to +87
        if destination_type == DESTINATION_TYPE_SNOWFLAKE:
            sf_config = config.get("snowflake", {})
            password = sf_config.get("password")
            return SnowflakeCache(
                account=sf_config.get("account", ""),
                username=sf_config.get("username", ""),
                password=SecretString(password) if password else None,
                warehouse=sf_config.get("warehouse", ""),
                database=sf_config.get("database", ""),
                schema_name=sf_config.get("schema_name", "PUBLIC"),
                role=sf_config.get("role", ""),
            )

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

Ran repository scripts to locate the SnowflakeCache and SnowflakeConfig definitions and the full _get_cache() method in airbyte/cli/universal_connector/destination.py; found no validators that reject empty strings for the required Snowflake connection parameters.

Add early validation to reject empty strings for required Snowflake fields, wdyt?

The constructor accepts all these parameters, but passing empty strings for required fields like account, username, warehouse, database, and role will slip through validation and only fail at runtime during connection. Consider adding validation in SnowflakeConfig to catch these misconfigurations early with clear error messages.
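
A minimal validation sketch along those lines, with the required-field list taken from this comment (where the check should live is left open):

# Sketch: fail fast on missing/empty required Snowflake fields (illustrative placement).
REQUIRED_SNOWFLAKE_FIELDS = ("account", "username", "warehouse", "database", "role")

def validate_snowflake_config(sf_config: dict) -> None:
    missing = [field for field in REQUIRED_SNOWFLAKE_FIELDS if not sf_config.get(field)]
    if missing:
        raise ValueError(
            f"Missing required Snowflake configuration fields: {', '.join(missing)}"
        )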


- Add SourcePyAirbyteUniversal class that wraps any PyAirbyte source
- Add CLI entrypoint 'source-pyairbyte' for the universal source
- Fix Dockerfile to use uv.lock instead of poetry.lock
- Use proper CDK AirbyteEntrypoint.launch() pattern for sources (see the entrypoint sketch after this commit message)

Co-Authored-By: AJ Steers <aj@airbyte.io>
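
As a rough sketch of the entrypoint pattern mentioned in the last bullet, assuming the CDK's module-level launch helper and an illustrative module path (the actual run.py in this PR may differ):

# Sketch of a CDK-style source entrypoint (module path and class wiring are assumptions).
import sys

from airbyte_cdk.entrypoint import launch

from airbyte.cli.universal_connector.source import SourcePyAirbyteUniversal


def run() -> None:
    launch(SourcePyAirbyteUniversal(), sys.argv[1:])


if __name__ == "__main__":
    run()
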
@devin-ai-integration devin-ai-integration bot changed the title feat(destinations): add universal destination using PyAirbyte caches feat(connectors): add universal source and destination using PyAirbyte Jan 29, 2026
…iping

Co-Authored-By: AJ Steers <aj@airbyte.io>
Comment on lines 195 to 196
# Skip STATE messages for now - they require per-stream format conversion
# which is complex and not needed for basic full-refresh syncs

Contributor Author

Add a TK-TODO comment here, and import the TK-TODO CI check from the ops repo.

Contributor

Done! I've added the TK-TODO comment for STATE message handling and imported the TK-TODO CI check workflow from the ops repo.

The TK-TODO check is now failing as expected - this is the intended behavior to track this tech debt until STATE message handling is implemented for incremental sync support.
