From 339b255b46e4b4406a40663c54fed95f69876daa Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 17 Feb 2026 23:56:09 -0800 Subject: [PATCH 1/3] Add support for more timeouts to Nexus operations --- .github/workflows/ci.yml | 3 + .../worker/workflow_instance/nexus_client.rb | 5 +- .../outbound_implementation.rb | 2 + .../lib/temporalio/worker/interceptor.rb | 2 + .../lib/temporalio/workflow/nexus_client.rb | 16 ++++- .../sig/temporalio/worker/interceptor.rbs | 4 ++ .../sig/temporalio/workflow/nexus_client.rbs | 4 ++ temporalio/test/test.rb | 1 + temporalio/test/worker_workflow_nexus_test.rb | 59 +++++++++++++++++++ 9 files changed, 93 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a38369e0..42f70b07 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -101,6 +101,9 @@ jobs: TEMPORAL_CLOUD_OPS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }} TEMPORAL_CLOUD_OPS_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }} TEMPORAL_CLOUD_OPS_TEST_API_VERSION: 2024-05-13-00 + + # Pin the dev server version used for local testing + TEMPORAL_DEV_SERVER_VERSION: v1.6.1-server-1.31.0-150.0 run: bundle exec rake TESTOPTS="--verbose" - name: Deploy docs diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_client.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_client.rb index 873dded9..04c77c7f 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_client.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/nexus_client.rb @@ -17,7 +17,8 @@ def initialize(endpoint:, service:, outbound:) # rubocop:disable Lint/MissingSup @outbound = outbound end - def start_operation(operation, arg, schedule_to_close_timeout: nil, cancellation_type: nil, summary: nil, + def start_operation(operation, arg, schedule_to_close_timeout: nil, schedule_to_start_timeout: nil, + start_to_close_timeout: nil, cancellation_type: nil, summary: nil, cancellation: Workflow.cancellation, arg_hint: nil, result_hint: nil) @outbound.start_nexus_operation( Temporalio::Worker::Interceptor::Workflow::StartNexusOperationInput.new( @@ -26,6 +27,8 @@ def start_operation(operation, arg, schedule_to_close_timeout: nil, cancellation operation: operation.to_s, arg:, schedule_to_close_timeout:, + schedule_to_start_timeout:, + start_to_close_timeout:, cancellation_type:, summary:, cancellation:, diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb index db5c5c55..bde64cd3 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb @@ -446,6 +446,8 @@ def start_nexus_operation(input) operation: input.operation, input: @instance.payload_converter.to_payload(input.arg, hint: input.arg_hint), schedule_to_close_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_close_timeout), + schedule_to_start_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_start_timeout), + start_to_close_timeout: ProtoUtils.seconds_to_duration(input.start_to_close_timeout), nexus_header: input.headers, cancellation_type: input.cancellation_type ), diff --git a/temporalio/lib/temporalio/worker/interceptor.rb b/temporalio/lib/temporalio/worker/interceptor.rb index 5ca385f5..763a0764 100644 --- a/temporalio/lib/temporalio/worker/interceptor.rb +++ b/temporalio/lib/temporalio/worker/interceptor.rb @@ -309,6 +309,8 @@ def handle_update(input) :operation, :arg, :schedule_to_close_timeout, + :schedule_to_start_timeout, + :start_to_close_timeout, :cancellation_type, :summary, :cancellation, diff --git a/temporalio/lib/temporalio/workflow/nexus_client.rb b/temporalio/lib/temporalio/workflow/nexus_client.rb index 6c946326..64da87a8 100644 --- a/temporalio/lib/temporalio/workflow/nexus_client.rb +++ b/temporalio/lib/temporalio/workflow/nexus_client.rb @@ -28,6 +28,10 @@ def service # @param operation [Symbol, String] Operation name. # @param arg [Object] Argument for the operation. # @param schedule_to_close_timeout [Float, nil] Total timeout for the operation in seconds. + # @param schedule_to_start_timeout [Float, nil] Timeout in seconds for the operation to start executing. If the + # operation has not started within this window, a SCHEDULE_TO_START timeout error is raised. + # @param start_to_close_timeout [Float, nil] Timeout in seconds for an async operation to complete after it has + # started. If the operation does not complete within this window, a START_TO_CLOSE timeout error is raised. # @param cancellation_type [NexusOperationCancellationType] How the operation will react to cancellation. # @param summary [String, nil] Optional summary for the operation (appears in UI/CLI). # @param cancellation [Cancellation] Cancellation for the operation. @@ -38,6 +42,8 @@ def start_operation( operation, arg, schedule_to_close_timeout: nil, + schedule_to_start_timeout: nil, + start_to_close_timeout: nil, cancellation_type: NexusOperationCancellationType::WAIT_CANCELLATION_COMPLETED, summary: nil, cancellation: Workflow.cancellation, @@ -54,6 +60,10 @@ def start_operation( # @param operation [Symbol, String] Operation name. # @param arg [Object] Argument for the operation. # @param schedule_to_close_timeout [Float, nil] Total timeout for the operation in seconds. + # @param schedule_to_start_timeout [Float, nil] Timeout in seconds for the operation to start executing. If the + # operation has not started within this window, a SCHEDULE_TO_START timeout error is raised. + # @param start_to_close_timeout [Float, nil] Timeout in seconds for an async operation to complete after it has + # started. If the operation does not complete within this window, a START_TO_CLOSE timeout error is raised. # @param cancellation_type [NexusOperationCancellationType] How the operation will react to cancellation. # @param summary [String, nil] Optional summary for the operation (appears in UI/CLI). # @param cancellation [Cancellation] Cancellation for the operation. @@ -65,6 +75,8 @@ def execute_operation( operation, arg, schedule_to_close_timeout: nil, + schedule_to_start_timeout: nil, + start_to_close_timeout: nil, cancellation_type: NexusOperationCancellationType::WAIT_CANCELLATION_COMPLETED, summary: nil, cancellation: Workflow.cancellation, @@ -72,8 +84,8 @@ def execute_operation( result_hint: nil ) start_operation( - operation, arg, schedule_to_close_timeout:, cancellation_type:, summary:, cancellation:, - arg_hint:, result_hint: + operation, arg, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, + cancellation_type:, summary:, cancellation:, arg_hint:, result_hint: ).result end end diff --git a/temporalio/sig/temporalio/worker/interceptor.rbs b/temporalio/sig/temporalio/worker/interceptor.rbs index 214b5597..0219c510 100644 --- a/temporalio/sig/temporalio/worker/interceptor.rbs +++ b/temporalio/sig/temporalio/worker/interceptor.rbs @@ -318,6 +318,8 @@ module Temporalio attr_reader operation: String attr_reader arg: Object? attr_reader schedule_to_close_timeout: duration? + attr_reader schedule_to_start_timeout: duration? + attr_reader start_to_close_timeout: duration? attr_reader cancellation_type: Temporalio::Workflow::NexusOperationCancellationType::enum attr_reader summary: String? attr_reader cancellation: Cancellation @@ -331,6 +333,8 @@ module Temporalio operation: String, arg: Object?, schedule_to_close_timeout: duration?, + schedule_to_start_timeout: duration?, + start_to_close_timeout: duration?, cancellation_type: Temporalio::Workflow::NexusOperationCancellationType::enum, summary: String?, cancellation: Cancellation, diff --git a/temporalio/sig/temporalio/workflow/nexus_client.rbs b/temporalio/sig/temporalio/workflow/nexus_client.rbs index 94d86121..fbef0123 100644 --- a/temporalio/sig/temporalio/workflow/nexus_client.rbs +++ b/temporalio/sig/temporalio/workflow/nexus_client.rbs @@ -8,6 +8,8 @@ module Temporalio Symbol | String operation, Object? arg, ?schedule_to_close_timeout: duration?, + ?schedule_to_start_timeout: duration?, + ?start_to_close_timeout: duration?, ?cancellation_type: NexusOperationCancellationType::enum, ?summary: String?, ?cancellation: Cancellation, @@ -19,6 +21,8 @@ module Temporalio Symbol | String operation, Object? arg, ?schedule_to_close_timeout: duration?, + ?schedule_to_start_timeout: duration?, + ?start_to_close_timeout: duration?, ?cancellation_type: NexusOperationCancellationType::enum, ?summary: String?, ?cancellation: Cancellation, diff --git a/temporalio/test/test.rb b/temporalio/test/test.rb index 14df676b..7453c96b 100644 --- a/temporalio/test/test.rb +++ b/temporalio/test/test.rb @@ -147,6 +147,7 @@ def initialize if target_host.empty? @server = Temporalio::Testing::WorkflowEnvironment.start_local( logger: Logger.new($stdout), + dev_server_download_version: ENV.fetch('TEMPORAL_DEV_SERVER_VERSION', 'default'), dev_server_extra_args: [ # Allow continue as new to be immediate '--dynamic-config-value', 'history.workflowIdReuseMinimalInterval="0s"', diff --git a/temporalio/test/worker_workflow_nexus_test.rb b/temporalio/test/worker_workflow_nexus_test.rb index b5b62d0f..e6e71f53 100644 --- a/temporalio/test/worker_workflow_nexus_test.rb +++ b/temporalio/test/worker_workflow_nexus_test.rb @@ -426,6 +426,65 @@ def test_nexus_operation_summary_in_history end end + # Test schedule_to_start_timeout: operation never picked up, times out before starting + class NexusScheduleToStartTimeoutWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + client.execute_operation( + 'workflow-operation', + { 'action' => 'wait-for-cancel' }, + schedule_to_start_timeout: 0.1 + ) + end + end + + def test_nexus_schedule_to_start_timeout + # Create a nexus endpoint pointing to a task queue with no worker so no one + # picks up the nexus task, triggering a SCHEDULE_TO_START timeout. + nexus_task_queue = "tq-#{SecureRandom.uuid}" + endpoint_name = "nexus-endpoint-#{nexus_task_queue}" + endpoint = env.server.create_nexus_endpoint(name: endpoint_name, task_queue: nexus_task_queue) + + begin + err = assert_raises(Temporalio::Error::WorkflowFailedError) do + execute_workflow(NexusScheduleToStartTimeoutWorkflow, endpoint_name) + end + + assert_instance_of Temporalio::Error::NexusOperationError, err.cause + assert_instance_of Temporalio::Error::TimeoutError, err.cause.cause + assert_equal Temporalio::Error::TimeoutError::TimeoutType::SCHEDULE_TO_START, err.cause.cause.type + ensure + env.server.delete_nexus_endpoint(endpoint) + end + end + + # Test start_to_close_timeout: operation starts (async token returned) but never completes + class NexusStartToCloseTimeoutWorkflow < Temporalio::Workflow::Definition + def execute(endpoint) + client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') + handle = client.start_operation( + 'workflow-operation', + { 'action' => 'wait-for-cancel' }, + start_to_close_timeout: 0.1 + ) + handle.result + end + end + + def test_nexus_start_to_close_timeout + env.with_kitchen_sink_worker(nexus: true) do |task_queue| + endpoint = "nexus-endpoint-#{task_queue}" + + err = assert_raises(Temporalio::Error::WorkflowFailedError) do + execute_workflow(NexusStartToCloseTimeoutWorkflow, endpoint) + end + + assert_instance_of Temporalio::Error::NexusOperationError, err.cause + assert_instance_of Temporalio::Error::TimeoutError, err.cause.cause + assert_equal Temporalio::Error::TimeoutError::TimeoutType::START_TO_CLOSE, err.cause.cause.type + end + end + class NexusOperationTracingWorkflow < Temporalio::Workflow::Definition def execute(endpoint) client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service') From b7e64ab952864a70fc8e69b1da6046cdc6d5f36f Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 18 Feb 2026 08:54:27 -0800 Subject: [PATCH 2/3] hardcode dev server --- temporalio/test/test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/test/test.rb b/temporalio/test/test.rb index 7453c96b..c42e6273 100644 --- a/temporalio/test/test.rb +++ b/temporalio/test/test.rb @@ -147,7 +147,7 @@ def initialize if target_host.empty? @server = Temporalio::Testing::WorkflowEnvironment.start_local( logger: Logger.new($stdout), - dev_server_download_version: ENV.fetch('TEMPORAL_DEV_SERVER_VERSION', 'default'), + dev_server_download_version: 'v1.6.1-server-1.31.0-150.0', dev_server_extra_args: [ # Allow continue as new to be immediate '--dynamic-config-value', 'history.workflowIdReuseMinimalInterval="0s"', From fbe6f89d208ab625f81a2620c17630be676a55b9 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 18 Feb 2026 09:03:00 -0800 Subject: [PATCH 3/3] Forgot to commit this --- .github/workflows/ci.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 42f70b07..04257d37 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -102,8 +102,6 @@ jobs: TEMPORAL_CLOUD_OPS_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }} TEMPORAL_CLOUD_OPS_TEST_API_VERSION: 2024-05-13-00 - # Pin the dev server version used for local testing - TEMPORAL_DEV_SERVER_VERSION: v1.6.1-server-1.31.0-150.0 run: bundle exec rake TESTOPTS="--verbose" - name: Deploy docs