Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 81 additions & 3 deletions src/snowflake/connector/aio/_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from snowflake.connector.cursor import SnowflakeCursorBase as SnowflakeCursorBaseSync
from snowflake.connector.cursor import T
from snowflake.connector.errorcode import (
ER_CURSOR_EXECUTE_IN_PROGRESS,
ER_CURSOR_IS_CLOSED,
ER_FAILED_PROCESSING_PYFORMAT,
ER_FAILED_TO_REWRITE_MULTI_ROW_INSERT,
Expand Down Expand Up @@ -85,6 +86,7 @@ def __init__(
self._lock_canceling = asyncio.Lock()
self._timebomb: asyncio.Task | None = None
self._prefetch_hook: typing.Callable[[], typing.Awaitable] | None = None
self._executing: bool = False

def __aiter__(self):
return self
Expand Down Expand Up @@ -552,16 +554,92 @@ async def execute(
_force_qmark_paramstyle: bool = False,
_dataframe_ast: str | None = None,
) -> Self | dict[str, Any] | None:
if _exec_async:
_no_results = True
logger.debug("executing SQL/command")
if self.is_closed():
Error.errorhandler_wrapper(
self.connection,
self,
InterfaceError,
{"msg": "Cursor is closed in execute.", "errno": ER_CURSOR_IS_CLOSED},
)
if self._executing:
Error.errorhandler_wrapper(
self.connection,
self,
InterfaceError,
{
"msg": "Another execute is already in progress on this cursor. "
"Async cursors are not safe for concurrent use by multiple coroutines. "
"Use a separate cursor for each concurrent operation.",
"errno": ER_CURSOR_EXECUTE_IN_PROGRESS,
},
)

self._executing = True
try:
return await self._execute_impl(
command=command,
params=params,
_bind_stage=_bind_stage,
timeout=timeout,
_exec_async=_exec_async,
_no_retry=_no_retry,
_do_reset=_do_reset,
_put_callback=_put_callback,
_put_azure_callback=_put_azure_callback,
_put_callback_output_stream=_put_callback_output_stream,
_get_callback=_get_callback,
_get_azure_callback=_get_azure_callback,
_get_callback_output_stream=_get_callback_output_stream,
_show_progress_bar=_show_progress_bar,
_statement_params=_statement_params,
_is_internal=_is_internal,
_describe_only=_describe_only,
_no_results=_no_results,
_is_put_get=_is_put_get,
_raise_put_get_error=_raise_put_get_error,
_force_put_overwrite=_force_put_overwrite,
_skip_upload_on_content_match=_skip_upload_on_content_match,
file_stream=file_stream,
num_statements=num_statements,
_force_qmark_paramstyle=_force_qmark_paramstyle,
_dataframe_ast=_dataframe_ast,
)
finally:
self._executing = False

async def _execute_impl(
self,
command: str,
params: Sequence[Any] | dict[Any, Any] | None = None,
_bind_stage: str | None = None,
timeout: int | None = None,
_exec_async: bool = False,
_no_retry: bool = False,
_do_reset: bool = True,
_put_callback: SnowflakeProgressPercentage = None,
_put_azure_callback: SnowflakeProgressPercentage = None,
_put_callback_output_stream: IO[str] = sys.stdout,
_get_callback: SnowflakeProgressPercentage = None,
_get_azure_callback: SnowflakeProgressPercentage = None,
_get_callback_output_stream: IO[str] = sys.stdout,
_show_progress_bar: bool = True,
_statement_params: dict[str, str] | None = None,
_is_internal: bool = False,
_describe_only: bool = False,
_no_results: bool = False,
_is_put_get: bool | None = None,
_raise_put_get_error: bool = True,
_force_put_overwrite: bool = False,
_skip_upload_on_content_match: bool = False,
file_stream: IO[bytes] | None = None,
num_statements: int | None = None,
_force_qmark_paramstyle: bool = False,
_dataframe_ast: str | None = None,
) -> Self | dict[str, Any] | None:
"""Internal implementation of execute, called after concurrency checks."""
if _exec_async:
_no_results = True
logger.debug("executing SQL/command")

if _do_reset:
self.reset()
Expand Down
1 change: 1 addition & 0 deletions src/snowflake/connector/errorcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
ER_CHUNK_DOWNLOAD_FAILED = 252010
ER_NOT_IMPLICITY_SNOWFLAKE_DATATYPE = 252011
ER_FAILED_PROCESSING_QMARK = 252012
ER_CURSOR_EXECUTE_IN_PROGRESS = 252013

# file_transfer
ER_INVALID_STAGE_FS = 253001
Expand Down
Loading