feat(connectors): add universal source and destination using PyAirbyte #969
Conversation
This adds a new universal destination connector that can write to any PyAirbyte-supported cache backend (DuckDB, Postgres, Snowflake, BigQuery, MotherDuck). The destination implements the Airbyte CDK Destination interface and uses PyAirbyte's cache system to handle the actual writing. Configuration allows selecting the destination type and passing backend-specific settings. Co-Authored-By: AJ Steers <aj@airbyte.io>
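For illustration only, a config for this destination might look like the sketch below. The destination_type selector values and the per-backend sub-objects are assumptions inferred from the review discussion further down (e.g. config.get("snowflake", {})); they are not the final spec.

# Hypothetical example configs (field names inferred, not authoritative):
duckdb_config = {
    "destination_type": "duckdb",                    # assumed selector value
    "duckdb": {"db_path": "/local/airbyte.duckdb"},  # assumed backend settings
}

snowflake_config = {
    "destination_type": "snowflake",                 # assumed selector value
    "snowflake": {
        "account": "my_account",
        "username": "my_user",
        "password": "********",
        "warehouse": "COMPUTE_WH",
        "database": "ANALYTICS",
        "schema_name": "PUBLIC",
        "role": "LOADER",
    },
}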
🤖 Devin AI Engineer
I'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.
💡 Show Tips and Tricks
Testing This PyAirbyte Version
You can test this version of PyAirbyte using the following:
# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653021-destination-pyairbyte-universal' pyairbyte --help
# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1769653021-destination-pyairbyte-universal'
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
📚 Show Repo Guidance
Helpful Resources
Community Support
Questions? Join the #pyairbyte channel in our Slack workspace.
📝 Walkthrough
This pull request introduces a new CLI-based universal destination connector for PyAirbyte, enabling data streaming to multiple configurable cache backends (DuckDB, Postgres, Snowflake, BigQuery, MotherDuck) with built-in schema management and state handling. Supporting infrastructure includes a Docker container definition and console script registration.
Sequence Diagram
sequenceDiagram
participant CLI as CLI Entry
participant Dest as DestinationPyAirbyteUniversal
participant Cache as Cache Backend<br/>(DuckDB/Postgres/etc.)
participant Engine as SQL Engine
CLI->>Dest: check(config)
Dest->>Cache: instantiate cache
Dest->>Engine: verify connectivity
alt Connection OK
Engine-->>Dest: success
Dest-->>CLI: ConnectionStatus(success)
else Connection Failed
Engine-->>Dest: error
Dest-->>CLI: ConnectionStatus(failed)
end
CLI->>Dest: write(config, catalog, messages)
loop For each RECORD message
Dest->>Dest: buffer record<br/>(add ID, timestamp, JSON)
end
alt STATE message received
Dest->>Dest: flush_buffer()
Dest->>Engine: INSERT buffered records<br/>to stream table
Engine-->>Dest: commit
Dest-->>CLI: emit STATE
end
alt On completion
Dest->>Dest: final flush_buffer()
Dest->>Engine: INSERT remaining records
Dest->>Cache: close connection
end
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In `@airbyte/destinations/universal/destination.py`:
- Around line 135-214: The write() implementation currently calls cache.close()
only at the end, so exceptions can leak resources; wrap the main body after
obtaining cache = self._get_cache(config) in a try/finally (or
try/except/finally) so that cache.close() is always executed in the finally
block; ensure the finally runs after any early yields (STATE handling) by moving
the try to encompass the loop over input_messages and the final buffer flush,
referencing the write() function, the cache variable, and the existing
_flush_buffer calls to locate where to wrap.
- Around line 107-123: The check() path currently acquires cache =
self._get_cache(config) but only releases it on success; wrap the cache usage in
a try/finally so the cache is always closed on both success and failure (e.g.
call cache.close() in the finally block—if your cache API uses a different
method name, call that release/dispose method). Keep the existing
engine.connect() context manager; move the return of AirbyteConnectionStatus
into the try and ensure the finally calls
cache.close()/cache.dispose()/cache.release() so connections are always
released.
- Around line 141-167: The code interpolates schema_name and table_name directly
into raw SQL, which risks SQL injection and broken identifiers; update the logic
that uses cache.get_sql_engine() and the loop over configured_catalog.streams
(where table_name is built as f"_airbyte_raw_{name}" and you check
DestinationSyncMode.overwrite) to quote identifiers using the engine's dialect
preparer (engine.dialect.identifier_preparer.quote(name)) before embedding them
in CREATE SCHEMA, DROP TABLE, and CREATE TABLE statements so all schema/table
identifiers are safely escaped per the database dialect.
- Around line 148-192: Stream identity currently uses only
configured_stream.stream.name which can collide across namespaces; update stream
matching and table naming to incorporate configured_stream.stream.namespace
(e.g., include configured_stream.stream.namespace when building table_name and
when building the key used to populate and lookup in streams and buffer), and
adjust uses in the loop that checks "if stream_name not in streams" and in
_flush_buffer calls to use the combined namespace+name key (or a normalized
identifier) so that records are routed to the correct per-namespace table;
ensure the same composite key logic is applied when creating tables (table_name)
and when resetting/iterating buffer and streams to avoid collisions while
preserving current performance characteristics.
- Around line 195-198: The code currently sets _airbyte_emitted_at to the
destination time; change it to use the incoming record's emitted_at when present
by converting record.emitted_at (epoch ms) to an ISO UTC string (e.g.,
datetime.fromtimestamp(record.emitted_at/1000, tz=timezone.utc).isoformat()) and
append that to buffer[stream_name]["_airbyte_emitted_at"]; if record.emitted_at
is missing or falsy, fall back to
datetime.datetime.now(datetime.timezone.utc).isoformat() so the behavior is
safe. Ensure you reference the record variable used where
buffer[stream_name]["_airbyte_ab_id"] is appended and update only the
_airbyte_emitted_at assignment.
🧹 Nitpick comments (1)
airbyte/destinations/universal/destination.py (1)
216-248: Batch inserts instead of per-row loop for throughput.
Could we use executemany to reduce round-trips when flushing? SQLAlchemy 2.0 supports passing a list of dicts to conn.execute(), so we could build a payload list once and execute it in a single call instead of looping—wdyt?

🚀 Proposed refactor

     with engine.connect() as conn:
-        for i in range(len(entries["_airbyte_ab_id"])):
-            insert_sql = text(f"""
-                INSERT INTO {schema_name}.{table_name}
-                (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
-                VALUES (:ab_id, :emitted_at, :data)
-            """)
-            conn.execute(
-                insert_sql,
-                {
-                    "ab_id": entries["_airbyte_ab_id"][i],
-                    "emitted_at": entries["_airbyte_emitted_at"][i],
-                    "data": entries["_airbyte_data"][i],
-                },
-            )
+        insert_sql = text(f"""
+            INSERT INTO {schema_name}.{table_name}
+            (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
+            VALUES (:ab_id, :emitted_at, :data)
+        """)
+        payload = [
+            {
+                "ab_id": entries["_airbyte_ab_id"][i],
+                "emitted_at": entries["_airbyte_emitted_at"][i],
+                "data": entries["_airbyte_data"][i],
+            }
+            for i in range(len(entries["_airbyte_ab_id"]))
+        ]
+        conn.execute(insert_sql, payload)
     conn.commit()  # pyrefly: ignore[missing-attribute]
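For reference, SQLAlchemy 2.0 executes an executemany-style batch when Connection.execute() receives a list of parameter dictionaries. A minimal self-contained sketch of that behavior, using an in-memory SQLite engine and a stand-in table rather than the connector's real schema:

from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")

with engine.connect() as conn:
    # Stand-in table for the demo; the connector's real tables differ.
    conn.execute(text("CREATE TABLE raw_demo (ab_id TEXT, emitted_at TEXT, data TEXT)"))
    insert_sql = text(
        "INSERT INTO raw_demo (ab_id, emitted_at, data) VALUES (:ab_id, :emitted_at, :data)"
    )
    # Passing a list of dicts triggers a single batched (executemany) call.
    conn.execute(
        insert_sql,
        [
            {"ab_id": "1", "emitted_at": "2024-01-01T00:00:00Z", "data": "{}"},
            {"ab_id": "2", "emitted_at": "2024-01-01T00:00:01Z", "data": "{}"},
        ],
    )
    conn.commit()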
    def check(
        self,
        logger: logging.Logger,  # noqa: ARG002
        config: Mapping[str, Any],
    ) -> AirbyteConnectionStatus:
        """Test the connection to the destination."""
        try:
            cache = self._get_cache(config)
            engine = cache.get_sql_engine()
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            return AirbyteConnectionStatus(status=Status.SUCCEEDED)
        except Exception as e:
            return AirbyteConnectionStatus(
                status=Status.FAILED, message=f"Connection failed: {e!r}"
            )
Close the cache on both success and failure in check().
Could we wrap cache usage in a try/finally so connections are always released, wdyt?
🔧 Proposed fix
- try:
- cache = self._get_cache(config)
- engine = cache.get_sql_engine()
- with engine.connect() as conn:
- conn.execute(text("SELECT 1"))
- return AirbyteConnectionStatus(status=Status.SUCCEEDED)
- except Exception as e:
- return AirbyteConnectionStatus(
- status=Status.FAILED, message=f"Connection failed: {e!r}"
- )
+ cache = None
+ try:
+ cache = self._get_cache(config)
+ engine = cache.get_sql_engine()
+ with engine.connect() as conn:
+ conn.execute(text("SELECT 1"))
+ return AirbyteConnectionStatus(status=Status.SUCCEEDED)
+ except Exception as e:
+ return AirbyteConnectionStatus(
+ status=Status.FAILED, message=f"Connection failed: {e!r}"
+ )
+ finally:
+ if cache is not None:
+ cache.close()
        cache = self._get_cache(config)
        streams = {s.stream.name for s in configured_catalog.streams}
        schema_name = cache.schema_name

        logger.info(f"Starting write to PyAirbyte Universal with {len(streams)} streams")

        # Get SQL engine and ensure schema exists
        engine = cache.get_sql_engine()
        with engine.connect() as conn:
            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
            conn.commit()  # pyrefly: ignore[missing-attribute]

        # Create tables for each stream
        for configured_stream in configured_catalog.streams:
            name = configured_stream.stream.name
            table_name = f"_airbyte_raw_{name}"

            with engine.connect() as conn:
                if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
                    logger.info(f"Dropping table for overwrite: {table_name}")
                    conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}"))
                    conn.commit()  # pyrefly: ignore[missing-attribute]

                # Create the raw table if needed
                create_sql = f"""
                    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
                        _airbyte_ab_id VARCHAR(36) PRIMARY KEY,
                        _airbyte_emitted_at TIMESTAMP,
                        _airbyte_data JSON
                    )
                """
                conn.execute(text(create_sql))
                conn.commit()  # pyrefly: ignore[missing-attribute]

        # Buffer for records
        buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))

        for message in input_messages:
            if message.type == Type.STATE:
                # Flush the buffer before yielding state
                for stream_name in list(buffer.keys()):
                    self._flush_buffer(
                        engine=engine,
                        buffer=buffer,
                        schema_name=schema_name,
                        stream_name=stream_name,
                    )
                buffer = defaultdict(lambda: defaultdict(list))
                yield message

            elif message.type == Type.RECORD:
                record = message.record
                if record is None:
                    continue
                stream_name = record.stream
                if stream_name not in streams:
                    logger.debug(f"Stream {stream_name} not in configured streams, skipping")
                    continue

                # Add to buffer
                buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4()))
                buffer[stream_name]["_airbyte_emitted_at"].append(
                    datetime.datetime.now(datetime.timezone.utc).isoformat()
                )
                buffer[stream_name]["_airbyte_data"].append(json.dumps(record.data))

            else:
                logger.debug(f"Message type {message.type} not handled, skipping")

        # Flush any remaining records
        for stream_name in list(buffer.keys()):
            self._flush_buffer(
                engine=engine,
                buffer=buffer,
                schema_name=schema_name,
                stream_name=stream_name,
            )

        # Close the cache
        cache.close()
Ensure cache.close() runs on exceptions in write().
Could we wrap the write body in try/finally so the cache is always closed, wdyt?
🧹 Proposed fix
- cache = self._get_cache(config)
- streams = {s.stream.name for s in configured_catalog.streams}
- schema_name = cache.schema_name
+ cache = self._get_cache(config)
+ try:
+ streams = {s.stream.name for s in configured_catalog.streams}
+ schema_name = cache.schema_name
@@
- # Close the cache
- cache.close()
+ finally:
+ cache.close()
        # Get SQL engine and ensure schema exists
        engine = cache.get_sql_engine()
        with engine.connect() as conn:
            conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
            conn.commit()  # pyrefly: ignore[missing-attribute]

        # Create tables for each stream
        for configured_stream in configured_catalog.streams:
            name = configured_stream.stream.name
            table_name = f"_airbyte_raw_{name}"

            with engine.connect() as conn:
                if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
                    logger.info(f"Dropping table for overwrite: {table_name}")
                    conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}"))
                    conn.commit()  # pyrefly: ignore[missing-attribute]

                # Create the raw table if needed
                create_sql = f"""
                    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
                        _airbyte_ab_id VARCHAR(36) PRIMARY KEY,
                        _airbyte_emitted_at TIMESTAMP,
                        _airbyte_data JSON
                    )
                """
                conn.execute(text(create_sql))
                conn.commit()  # pyrefly: ignore[missing-attribute]
🧩 Analysis chain
🏁 Script executions (locating airbyte/destinations/universal/destination.py in the repository) preceded the web query below; raw output omitted.
🌐 Web query:
SQLAlchemy 2.0 quote identifier dialect preparer best practice
💡 Result:
Best practice (SQLAlchemy 2.0) for quoting identifiers
1) Prefer modeling the identifier as a schema object, not manually quoting strings.
If the “real” table/column/schema name needs quoting (reserved word, mixed case, special chars), declare it on the construct:
- Table(..., quote=True, quote_schema=True) (and similarly Column(..., quote=True)), which forces quoting on/off; otherwise SQLAlchemy applies backend-specific quoting rules automatically. [1]
- Or pass a quoted_name("MyName", True) where a “name” is accepted, to force quoting (and to prevent “name normalize” behavior on some backends like Oracle/DB2/Firebird). [2]
2) If you must quote an identifier programmatically, use the dialect’s IdentifierPreparer.
Use engine.dialect.identifier_preparer.quote("identifier") to apply the dialect’s conditional quoting rules (reserved words, special characters, or quoted_name(..., True)). [3]
(And don’t rely on the old force= argument—SQLAlchemy notes it’s deprecated/unused and points you to quoted_name instead. [3])
Minimal examples
from sqlalchemy import Table, Column, Integer, MetaData
from sqlalchemy.sql import quoted_name
t = Table(
    "User", MetaData(),                     # mixed case -> typically quoted automatically
    Column("select", Integer, quote=True),  # force quoting for a column name
    quote=True,                             # force quoting for table name
)

# quoting a “raw” identifier string using the dialect rules
prep = engine.dialect.identifier_preparer
safe = prep.quote("User")  # e.g. "User" on many dialects

Rule of thumb: use quote= / quoted_name() when defining or reflecting schema; use engine.dialect.identifier_preparer.quote() only when you truly need to transform a plain identifier string into its correctly-quoted SQL form for that specific dialect. [1][2][3]
Sources: SQLAlchemy 2.0 docs on Table(..., quote=..., quote_schema=...) [1], quoted_name [2], and IdentifierPreparer.quote() / quote_identifier() behavior and deprecations [3].
References:
[1] (docs.sqlalchemy.org)
[2] (docs.sqlalchemy.org)
[3] (docs.sqlalchemy.org)
Citations:
- 1: https://docs.sqlalchemy.org/20/core/metadata.html?utm_source=openai
- 2: https://docs.sqlalchemy.org/20/core/sqlelement.html?utm_source=openai
- 3: https://docs.sqlalchemy.org/20/core/internals.html
🏁 Further script executions (inspecting destination.py lines 130-175, its imports, the _get_cache() helper, and searching airbyte/destinations/universal/ for existing identifier quoting or write tests) found no existing quoting or sanitization of schema/table identifiers; raw output omitted.
Quote schema and table identifiers to prevent SQL injection.
Schema and table names from config and catalog are directly interpolated into SQL without quoting. Reserved words, special characters, or maliciously-crafted identifiers could lead to SQL injection. Would you consider using SQLAlchemy's engine.dialect.identifier_preparer.quote() to safely escape these identifiers per the database dialect, wdyt?
The fix would look something like:
def _quote_ident(self, engine: Engine, name: str) -> str:
    return engine.dialect.identifier_preparer.quote(name)

Then apply it to schema_name and table_name before the SQL statements (lines 144, 155, 160).
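A rough, standalone sketch of the quoting idea follows; SQLite is used purely for illustration, and the real code would pass the connector's engine and the actual schema/table names:

from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")
preparer = engine.dialect.identifier_preparer

schema_name = "main"                # SQLite's built-in schema, for the demo only
table_name = "_airbyte_raw_users"   # example raw-table name

# Identifiers are escaped per the dialect before being embedded in DDL.
quoted_schema = preparer.quote(schema_name)
quoted_table = preparer.quote(table_name)

with engine.connect() as conn:
    conn.execute(
        text(
            f"CREATE TABLE IF NOT EXISTS {quoted_schema}.{quoted_table} "
            "(_airbyte_ab_id VARCHAR(36) PRIMARY KEY)"
        )
    )
    conn.commit()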
        for configured_stream in configured_catalog.streams:
            name = configured_stream.stream.name
            table_name = f"_airbyte_raw_{name}"

            with engine.connect() as conn:
                if configured_stream.destination_sync_mode == DestinationSyncMode.overwrite:
                    logger.info(f"Dropping table for overwrite: {table_name}")
                    conn.execute(text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}"))
                    conn.commit()  # pyrefly: ignore[missing-attribute]

                # Create the raw table if needed
                create_sql = f"""
                    CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} (
                        _airbyte_ab_id VARCHAR(36) PRIMARY KEY,
                        _airbyte_emitted_at TIMESTAMP,
                        _airbyte_data JSON
                    )
                """
                conn.execute(text(create_sql))
                conn.commit()  # pyrefly: ignore[missing-attribute]

        # Buffer for records
        buffer: dict[str, dict[str, list[Any]]] = defaultdict(lambda: defaultdict(list))

        for message in input_messages:
            if message.type == Type.STATE:
                # Flush the buffer before yielding state
                for stream_name in list(buffer.keys()):
                    self._flush_buffer(
                        engine=engine,
                        buffer=buffer,
                        schema_name=schema_name,
                        stream_name=stream_name,
                    )
                buffer = defaultdict(lambda: defaultdict(list))
                yield message

            elif message.type == Type.RECORD:
                record = message.record
                if record is None:
                    continue
                stream_name = record.stream
                if stream_name not in streams:
                    logger.debug(f"Stream {stream_name} not in configured streams, skipping")
                    continue
🧩 Analysis chain
🏁 Script executions (inspecting destination.py lines 140-210 and _flush_buffer, and searching the codebase, protocol models, tests, and docs for namespace handling) found no namespace usage in this connector, but did surface notes in _connector_base.py and _message_iterators.py indicating that PyAirbyte intentionally omits namespace handling; raw output omitted.
Consider the namespace handling trade-off before including namespace in stream identity.
Airbyte streams can share names across namespaces; incorporating stream.namespace into table names and stream matching would prevent potential collisions. However, note that PyAirbyte intentionally omits namespace handling for performance reasons (as documented in _connector_base.py and _message_iterators.py). If this destination needs to support multiple namespaces, would it be worth revisiting that performance trade-off, or should namespace handling remain out of scope?
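If namespace support were ever pursued, a minimal sketch of the composite-key idea might look like the following. The stream_key helper is hypothetical and not part of this PR; the suffix format is an arbitrary choice for illustration.

def stream_key(namespace: str | None, name: str) -> str:
    """Hypothetical helper: build a stable identifier from namespace and stream name."""
    return f"{namespace}__{name}" if namespace else name


# Two streams named "users" in different namespaces map to distinct raw tables.
print(f"_airbyte_raw_{stream_key('crm', 'users')}")   # _airbyte_raw_crm__users
print(f"_airbyte_raw_{stream_key(None, 'users')}")    # _airbyte_raw_users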
                buffer[stream_name]["_airbyte_ab_id"].append(str(uuid.uuid4()))
                buffer[stream_name]["_airbyte_emitted_at"].append(
                    datetime.datetime.now(datetime.timezone.utc).isoformat()
                )
🧩 Analysis chain
🏁 Script executions (inspecting the buffering code in destination.py and searching for emitted_at usage, including the existing conversion in airbyte/records.py) preceded the web query below; raw output omitted.
🌐 Web query:
airbyte_cdk AirbyteRecordMessage emitted_at field documentation
💡 Result:
In the Airbyte Python CDK (airbyte_cdk.models.AirbyteRecordMessage), the emitted_at field is required and represents when the source extracted/emitted the record, encoded as an epoch timestamp in milliseconds (integer). [1]
When data is written to destinations, this value is commonly surfaced as the metadata column _airbyte_extracted_at (older destinations may call it _airbyte_emitted_at). [2]
Sources: Airbyte Protocol docs [1]; Airbyte metadata fields docs [2].
Use the record's emitted_at to preserve canonical source timing.
The record.emitted_at field (epoch milliseconds) is available in this context—would preserving the source's original timestamp instead of the destination's current time be better for tracking when records were actually extracted? [1]
[1] emitted_at is already used in airbyte/records.py with the same conversion pattern (datetime.fromtimestamp(record_message.emitted_at / 1000, tz=timezone.utc)).
⏱️ Proposed fix
- buffer[stream_name]["_airbyte_emitted_at"].append(
- datetime.datetime.now(datetime.timezone.utc).isoformat()
- )
+ emitted_at = (
+ datetime.datetime.fromtimestamp(
+ record.emitted_at / 1000, tz=datetime.timezone.utc
+ )
+ if record.emitted_at is not None
+ else datetime.datetime.now(datetime.timezone.utc)
+ )
+ buffer[stream_name]["_airbyte_emitted_at"].append(emitted_at.isoformat())
…or/ and add CLI entrypoint Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@airbyte/cli/universal_connector/destination.py`:
- Around line 76-87: The code currently constructs a SnowflakeCache when
destination_type == DESTINATION_TYPE_SNOWFLAKE without rejecting empty strings
for required fields; add early validation to fail fast by checking required
Snowflake fields (account, username, warehouse, database, role) are non-empty
before constructing SnowflakeCache (or implement the same checks in
SnowflakeConfig/SnowflakeCache initializer/constructor), and raise a clear
ValueError indicating which field is missing; preserve the existing SecretString
wrapping for password and include that validation logic near the SnowflakeCache
creation site or inside SnowflakeCache/SnowflakeConfig (functions/classes to
touch: destination_type check, SnowflakeCache constructor, or SnowflakeConfig
class) so misconfigurations are caught immediately with explicit error messages.
In `@airbyte/cli/universal_connector/Dockerfile`:
- Line 5: The Dockerfile's COPY instruction currently references a nonexistent
poetry.lock; update the COPY line that mentions "poetry.lock" to instead
reference "uv.lock" so it reads as copying pyproject.toml and uv.lock into the
image (ensure uv.lock exists at repo root); this change is in the Dockerfile's
COPY command.
🧹 Nitpick comments (2)
airbyte/cli/universal_connector/destination.py (1)
233-248: Consider batch inserts for better performance, wdyt? The current implementation executes one INSERT per record. For large buffers, this can be significantly slower than using executemany or SQLAlchemy's bulk insert capabilities.

⚡ Proposed batch insert approach

     with engine.connect() as conn:
-        for i in range(len(entries["_airbyte_ab_id"])):
-            insert_sql = text(f"""
-                INSERT INTO {schema_name}.{table_name}
-                (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
-                VALUES (:ab_id, :emitted_at, :data)
-            """)
-            conn.execute(
-                insert_sql,
-                {
-                    "ab_id": entries["_airbyte_ab_id"][i],
-                    "emitted_at": entries["_airbyte_emitted_at"][i],
-                    "data": entries["_airbyte_data"][i],
-                },
-            )
+        insert_sql = text(f"""
+            INSERT INTO {schema_name}.{table_name}
+            (_airbyte_ab_id, _airbyte_emitted_at, _airbyte_data)
+            VALUES (:ab_id, :emitted_at, :data)
+        """)
+        params = [
+            {
+                "ab_id": entries["_airbyte_ab_id"][i],
+                "emitted_at": entries["_airbyte_emitted_at"][i],
+                "data": entries["_airbyte_data"][i],
+            }
+            for i in range(len(entries["_airbyte_ab_id"]))
+        ]
+        conn.execute(insert_sql, params)
     conn.commit()  # pyrefly: ignore[missing-attribute]

airbyte/cli/universal_connector/Dockerfile (1)

1-11: Consider adding a spec.json for connector registration, wdyt? The PR objectives note that spec.json may be required for connector registration in the airbytehq/airbyte repository. The Dockerfile looks good otherwise—nice use of SHA pinning for reproducibility!

Would you like me to help generate a spec.json file based on the configuration schema used in _get_cache()?
        if destination_type == DESTINATION_TYPE_SNOWFLAKE:
            sf_config = config.get("snowflake", {})
            password = sf_config.get("password")
            return SnowflakeCache(
                account=sf_config.get("account", ""),
                username=sf_config.get("username", ""),
                password=SecretString(password) if password else None,
                warehouse=sf_config.get("warehouse", ""),
                database=sf_config.get("database", ""),
                schema_name=sf_config.get("schema_name", "PUBLIC"),
                role=sf_config.get("role", ""),
            )
🧩 Analysis chain
🏁 Script executions (locating the SnowflakeCache and SnowflakeConfig definitions, checking for existing field validation, of which none was found, and reviewing _get_cache() in destination.py) preceded the conclusion below; raw output omitted.
Add early validation to reject empty strings for required Snowflake fields, wdyt?
The constructor accepts all these parameters, but passing empty strings for required fields like account, username, warehouse, database, and role will slip through validation and only fail at runtime during connection. Consider adding validation in SnowflakeConfig to catch these misconfigurations early with clear error messages.
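A minimal standalone sketch of the early-validation idea; the real fix could equally live in SnowflakeConfig as the comment suggests, and the helper name here is illustrative:

from collections.abc import Mapping
from typing import Any

REQUIRED_SNOWFLAKE_FIELDS = ("account", "username", "warehouse", "database", "role")


def validate_snowflake_config(sf_config: Mapping[str, Any]) -> None:
    """Raise a clear error naming any missing or empty required fields."""
    missing = [
        field
        for field in REQUIRED_SNOWFLAKE_FIELDS
        if not str(sf_config.get(field, "")).strip()
    ]
    if missing:
        raise ValueError(
            f"Missing required Snowflake configuration field(s): {', '.join(missing)}"
        )


# Example: fails fast instead of surfacing an opaque connection error later.
try:
    validate_snowflake_config({"account": "acme", "username": "loader"})
except ValueError as exc:
    print(exc)  # Missing required Snowflake configuration field(s): warehouse, database, role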
- Add SourcePyAirbyteUniversal class that wraps any PyAirbyte source
- Add CLI entrypoint 'source-pyairbyte' for the universal source
- Fix Dockerfile to use uv.lock instead of poetry.lock
- Use proper CDK AirbyteEntrypoint.launch() pattern for sources

Co-Authored-By: AJ Steers <aj@airbyte.io>
…iping Co-Authored-By: AJ Steers <aj@airbyte.io>
            # Skip STATE messages for now - they require per-stream format conversion
            # which is complex and not needed for basic full-refresh syncs
Add a TK-TODO comment here, and import the TK-TODO CI check from the ops repo.
Done! I've added the TK-TODO comment for STATE message handling and imported the TK-TODO CI check workflow from the ops repo.
The TK-TODO check is now failing as expected - this is the intended behavior to track this tech debt until STATE message handling is implemented for incremental sync support.
Co-Authored-By: AJ Steers <aj@airbyte.io>
Summary
Adds universal source and destination connectors that leverage PyAirbyte as a backend:
Universal Destination (destination-pyairbyte): Writes to any PyAirbyte-supported cache backend (DuckDB, Postgres, Snowflake, BigQuery, MotherDuck). Implements the Airbyte CDK Destination interface with spec(), check(), write(), and support for overwrite/append sync modes.

Universal Source (source-pyairbyte): Wraps any PyAirbyte-supported source connector, acting as a proxy that delegates to the underlying source based on configuration. Implements the CDK Source interface with spec(), check(), discover(), and read() methods.

Both connectors are registered as CLI entrypoints and include a Dockerfile for containerization.
Updates since last revision
Review & Testing Checklist for Human
- The state parameter in read() is ignored and STATE messages are skipped. Incremental syncs will not work - only full refresh is supported. This is tracked with a TK-TODO marker.
- The source relies on the private _read_with_catalog() method, which may change without notice.
- Connectors are created with install_if_missing=True, which will install connectors on first use. Consider implications for production environments.

Recommended test plan:
- Verify that written records land in the _airbyte_raw_* tables
- With a simple source (e.g. source-faker), run discover and read operations
- Pipe the two connectors end to end: source-pyairbyte read --config source.json --catalog catalog.json | destination-pyairbyte write --config dest.json --catalog catalog.json

Notes
This is part 1 of 2 PRs - a companion PR will register these connectors in the airbytehq/airbyte repo.
Requested by: @aaronsteers (AJ)
Devin session: https://app.devin.ai/sessions/46cd19c4793a4e1ea32bb660de9c1fe0