diff --git a/src/snowflake/connector/aio/_cursor.py b/src/snowflake/connector/aio/_cursor.py index 5466d7045..4e8f9ef9f 100644 --- a/src/snowflake/connector/aio/_cursor.py +++ b/src/snowflake/connector/aio/_cursor.py @@ -47,6 +47,7 @@ from snowflake.connector.cursor import SnowflakeCursorBase as SnowflakeCursorBaseSync from snowflake.connector.cursor import T from snowflake.connector.errorcode import ( + ER_CURSOR_EXECUTE_IN_PROGRESS, ER_CURSOR_IS_CLOSED, ER_FAILED_PROCESSING_PYFORMAT, ER_FAILED_TO_REWRITE_MULTI_ROW_INSERT, @@ -85,6 +86,7 @@ def __init__( self._lock_canceling = asyncio.Lock() self._timebomb: asyncio.Task | None = None self._prefetch_hook: typing.Callable[[], typing.Awaitable] | None = None + self._executing: bool = False def __aiter__(self): return self @@ -552,9 +554,6 @@ async def execute( _force_qmark_paramstyle: bool = False, _dataframe_ast: str | None = None, ) -> Self | dict[str, Any] | None: - if _exec_async: - _no_results = True - logger.debug("executing SQL/command") if self.is_closed(): Error.errorhandler_wrapper( self.connection, @@ -562,6 +561,85 @@ async def execute( InterfaceError, {"msg": "Cursor is closed in execute.", "errno": ER_CURSOR_IS_CLOSED}, ) + if self._executing: + Error.errorhandler_wrapper( + self.connection, + self, + InterfaceError, + { + "msg": "Another execute is already in progress on this cursor. " + "Async cursors are not safe for concurrent use by multiple coroutines. " + "Use a separate cursor for each concurrent operation.", + "errno": ER_CURSOR_EXECUTE_IN_PROGRESS, + }, + ) + + self._executing = True + try: + return await self._execute_impl( + command=command, + params=params, + _bind_stage=_bind_stage, + timeout=timeout, + _exec_async=_exec_async, + _no_retry=_no_retry, + _do_reset=_do_reset, + _put_callback=_put_callback, + _put_azure_callback=_put_azure_callback, + _put_callback_output_stream=_put_callback_output_stream, + _get_callback=_get_callback, + _get_azure_callback=_get_azure_callback, + _get_callback_output_stream=_get_callback_output_stream, + _show_progress_bar=_show_progress_bar, + _statement_params=_statement_params, + _is_internal=_is_internal, + _describe_only=_describe_only, + _no_results=_no_results, + _is_put_get=_is_put_get, + _raise_put_get_error=_raise_put_get_error, + _force_put_overwrite=_force_put_overwrite, + _skip_upload_on_content_match=_skip_upload_on_content_match, + file_stream=file_stream, + num_statements=num_statements, + _force_qmark_paramstyle=_force_qmark_paramstyle, + _dataframe_ast=_dataframe_ast, + ) + finally: + self._executing = False + + async def _execute_impl( + self, + command: str, + params: Sequence[Any] | dict[Any, Any] | None = None, + _bind_stage: str | None = None, + timeout: int | None = None, + _exec_async: bool = False, + _no_retry: bool = False, + _do_reset: bool = True, + _put_callback: SnowflakeProgressPercentage = None, + _put_azure_callback: SnowflakeProgressPercentage = None, + _put_callback_output_stream: IO[str] = sys.stdout, + _get_callback: SnowflakeProgressPercentage = None, + _get_azure_callback: SnowflakeProgressPercentage = None, + _get_callback_output_stream: IO[str] = sys.stdout, + _show_progress_bar: bool = True, + _statement_params: dict[str, str] | None = None, + _is_internal: bool = False, + _describe_only: bool = False, + _no_results: bool = False, + _is_put_get: bool | None = None, + _raise_put_get_error: bool = True, + _force_put_overwrite: bool = False, + _skip_upload_on_content_match: bool = False, + file_stream: IO[bytes] | None = None, + num_statements: int | None = None, + _force_qmark_paramstyle: bool = False, + _dataframe_ast: str | None = None, + ) -> Self | dict[str, Any] | None: + """Internal implementation of execute, called after concurrency checks.""" + if _exec_async: + _no_results = True + logger.debug("executing SQL/command") if _do_reset: self.reset() diff --git a/src/snowflake/connector/errorcode.py b/src/snowflake/connector/errorcode.py index e5f07e0a4..f65f060c3 100644 --- a/src/snowflake/connector/errorcode.py +++ b/src/snowflake/connector/errorcode.py @@ -50,6 +50,7 @@ ER_CHUNK_DOWNLOAD_FAILED = 252010 ER_NOT_IMPLICITY_SNOWFLAKE_DATATYPE = 252011 ER_FAILED_PROCESSING_QMARK = 252012 +ER_CURSOR_EXECUTE_IN_PROGRESS = 252013 # file_transfer ER_INVALID_STAGE_FS = 253001