-
Notifications
You must be signed in to change notification settings - Fork 33
FEAT: [POC] BCP implementation in mssql-python driver - #397
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
01fb45d
2800534
abba999
6e91f3c
4b344e9
8f3d8db
cf438ed
f8102a4
593c405
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,18 +1,102 @@ | ||
| from mssql_python import connect | ||
| from mssql_python.logging import setup_logging | ||
| import os | ||
| from datetime import datetime | ||
|
|
||
# Demo script: connects to SQL Server, lists the databases, then exercises
# cursor.bulk_copy() against a scratch table and prints what was copied.

# Configure driver logging in a single call. "both" is the output mode
# (presumably file + console - confirm against mssql_python.logging).
setup_logging(output="both")

print("=" * 70)
print("SQL Server - Bulk Copy Demo")
print("=" * 70)

# The connection string must come from the environment: credentials (in
# particular an `sa` password) must never be committed as a fallback default.
conn_str = os.getenv("DB_CONNECTION_STRING")
if not conn_str:
    raise SystemExit(
        "DB_CONNECTION_STRING is not set; export a connection string before "
        "running this demo."
    )

print("\n[1] Connecting to database...")
conn = connect(conn_str)
cursor = None
try:
    cursor = conn.cursor()

    # Query databases
    print("[2] Querying sys.databases...")
    cursor.execute("SELECT database_id, name from sys.databases;")
    rows = cursor.fetchall()

    for row in rows:
        print(f" Database ID: {row[0]}, Name: {row[1]}")

    print(f"\n Total databases: {len(rows)}")

    # Demonstrate bulk copy functionality
    print("\n" + "=" * 70)
    print("Bulk Copy with Rust Bindings")
    print("=" * 70)

    try:
        # NOTE(review): #bulk_copy_demo is a session-local temp table, while
        # cursor.bulk_copy() appears to open its own connection through the
        # Rust wrapper - confirm the table is visible to that connection.
        print("\n[3] Creating temporary table for bulk copy...")
        cursor.execute("""
            CREATE TABLE #bulk_copy_demo (
                id INT,
                name NVARCHAR(50),
                value DECIMAL(10, 2),
                created_date DATETIME
            )
        """)
        conn.commit()
        print(" ✓ Table created")

        # Generate 100 rows of test data: [id, name, value, created_date]
        print("\n[4] Generating 100 rows of test data...")
        test_data = [
            [i, f"TestItem_{i}", float(i * 10.5), datetime.now()]
            for i in range(1, 101)
        ]
        print(f" ✓ Generated {len(test_data)} rows")

        # Perform bulk copy using cursor method
        print("\n[5] Performing bulk copy via cursor...")
        result = cursor.bulk_copy('#bulk_copy_demo', test_data)
        print(f" ✓ Bulk copy completed: {result}")

        # Verify the data
        print("\n[6] Verifying bulk copy results...")
        cursor.execute("SELECT COUNT(*) FROM #bulk_copy_demo")
        count = cursor.fetchone()[0]
        print(f" ✓ Total rows copied: {count}")

        # Show sample data
        cursor.execute("SELECT TOP 5 id, name, value FROM #bulk_copy_demo ORDER BY id")
        sample_rows = cursor.fetchall()
        print("\n Sample data:")
        for row in sample_rows:
            print(f" ID: {row[0]}, Name: {row[1]}, Value: {row[2]}")

        print("\n✓ Bulk copy demo completed successfully!")

    except ImportError as e:
        print(f"\n✗ Rust bindings or mssql_core_tds not available: {e}")
    except AttributeError as e:
        print(f"\n⚠ {e}")
        print(" Skipping bulk copy demo")
    except Exception as e:
        # Top-level boundary of a demo: report the failure and fall through
        # to cleanup instead of crashing with a traceback.
        print(f"\n✗ Bulk copy failed: {e}")
    finally:
        # Cleanup runs on every path (success or any failure above), so the
        # scratch table is never left behind.
        cursor.execute("DROP TABLE IF EXISTS #bulk_copy_demo")
        conn.commit()

    print("\n" + "=" * 70)
finally:
    # Release the cursor and connection even if a query above raised.
    if cursor is not None:
        cursor.close()
    conn.close()
    print("✓ Connection closed")
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2825,3 +2825,81 @@ | |
| are managed automatically by the underlying driver. | ||
| """ | ||
| # This is a no-op - buffer sizes are managed automatically | ||
|
|
||
def bulk_copy(self, table_name: str, data: List[List[Any]]) -> Any:
    """
    Perform a bulk copy operation using the Rust bindings.

    The cursor's connection string is parsed into a parameter dict, which is
    handed to ``mssql_rust_bindings.BulkCopyWrapper``. The wrapper is always
    closed, whether the copy succeeds or fails.

    Connection-string values may be ODBC brace-quoted (``PWD={p;w=d}``):
    the braces are removed, ``}}`` is unescaped to ``}``, and ``;`` / ``=``
    inside the braces are kept as part of the value.

    Args:
        table_name: Target table name for bulk copy
        data: List of rows to insert, where each row is a list of column values

    Returns:
        Result from the underlying bulk_copy operation

    Raises:
        ImportError: If mssql_rust_bindings is not available
        AttributeError: If the wrapper lacks the required bulk_copy support
        ProgrammingError: If cursor or connection is closed
            (presumably raised by ``_check_closed`` - confirm)
        DatabaseError: If bulk copy operation fails

    Example:
        >>> cursor = conn.cursor()
        >>> data = [[1, 'Alice', 100.50], [2, 'Bob', 200.75]]
        >>> cursor.bulk_copy('employees', data)
    """
    # TODO(review feedback): accept an optional positional target-column list
    # so callers can load a subset of columns; accept any Iterable of
    # list/tuple rows so data sets larger than memory can be streamed; and
    # expose common control flags (e.g. enabling/disabling trigger firing).
    import re  # function-scope import keeps this block self-contained

    self._check_closed()

    try:
        import mssql_rust_bindings
    except ImportError as e:
        raise ImportError(
            f"Bulk copy requires mssql_rust_bindings module: {e}"
        ) from e

    # Accepted connection-string keywords (lower-cased) -> wrapper param name.
    keyword_to_param = {
        'server': 'server',
        'data source': 'server',
        'database': 'database',
        'initial catalog': 'database',
        'uid': 'user_name',
        'user id': 'user_name',
        'user': 'user_name',
        'pwd': 'password',
        'password': 'password',
        'trustservercertificate': 'trust_server_certificate',
    }

    # One "key=value" pair per match. Group 2 is the inside of a brace-quoted
    # value (may contain ';' and '=', with '}}' as an escaped '}'); group 3 is
    # a plain value running up to the next ';'.
    pair_pattern = re.compile(
        r"\s*([^=;]+?)\s*=\s*(?:\{((?:[^}]|\}\})*)\}|([^;]*))\s*(?:;|$)"
    )

    params = {}
    for match in pair_pattern.finditer(self._connection.connection_str):
        param = keyword_to_param.get(match.group(1).strip().lower())
        if param is None:
            continue  # keyword not needed by the wrapper

        braced = match.group(2)
        if braced is not None:
            value = braced.replace('}}', '}')
        else:
            value = match.group(3).strip()

        if param == 'server':
            # Remove port if present.
            # NOTE(review): a non-default port is dropped here - confirm
            # whether the wrapper should receive it.
            value = value.split(',')[0]
        params[param] = value

    # Set defaults if not found.
    # NOTE(review): code scanning (devskim) flags the 'localhost' fallback as
    # possible debug code; falling back to localhost/sa with certificate trust
    # enabled can target a different server than this cursor's connection -
    # consider raising instead once the wrapper's contract is settled.
    params.setdefault('server', 'localhost')
    params.setdefault('database', 'master')
    params.setdefault('user_name', 'sa')
    params.setdefault('password', '')
    params.setdefault('trust_server_certificate', 'yes')

    try:
        # BulkCopyWrapper handles mssql_core_tds connection internally
        bulk_wrapper = mssql_rust_bindings.BulkCopyWrapper(params)
        try:
            return bulk_wrapper.bulk_copy(table_name, data)
        finally:
            # Always release the wrapper, even when the copy fails.
            bulk_wrapper.close()
    except AttributeError as e:
        raise AttributeError(
            "bulk_copy method not implemented in mssql_core_tds.DdbcConnection"
        ) from e
    except Exception as e:
        raise DatabaseError(
            driver_error=f"Bulk copy operation failed: {e}",
            ddbc_error=str(e)
        ) from e
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Check notice
Code scanning / devskim
Accessing localhost could indicate debug code, or could hinder scaling. Note