Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ jobs:
TEMPORAL_CLOUD_OPS_TEST_NAMESPACE: ${{ vars.TEMPORAL_CLIENT_NAMESPACE }}
TEMPORAL_CLOUD_OPS_TEST_API_KEY: ${{ secrets.TEMPORAL_CLIENT_CLOUD_API_KEY }}
TEMPORAL_CLOUD_OPS_TEST_API_VERSION: 2024-05-13-00

run: bundle exec rake TESTOPTS="--verbose"

- name: Deploy docs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ def initialize(endpoint:, service:, outbound:) # rubocop:disable Lint/MissingSup
@outbound = outbound
end

def start_operation(operation, arg, schedule_to_close_timeout: nil, cancellation_type: nil, summary: nil,
def start_operation(operation, arg, schedule_to_close_timeout: nil, schedule_to_start_timeout: nil,
start_to_close_timeout: nil, cancellation_type: nil, summary: nil,
cancellation: Workflow.cancellation, arg_hint: nil, result_hint: nil)
@outbound.start_nexus_operation(
Temporalio::Worker::Interceptor::Workflow::StartNexusOperationInput.new(
Expand All @@ -26,6 +27,8 @@ def start_operation(operation, arg, schedule_to_close_timeout: nil, cancellation
operation: operation.to_s,
arg:,
schedule_to_close_timeout:,
schedule_to_start_timeout:,
start_to_close_timeout:,
cancellation_type:,
summary:,
cancellation:,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ def start_nexus_operation(input)
operation: input.operation,
input: @instance.payload_converter.to_payload(input.arg, hint: input.arg_hint),
schedule_to_close_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_close_timeout),
schedule_to_start_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_start_timeout),
start_to_close_timeout: ProtoUtils.seconds_to_duration(input.start_to_close_timeout),
nexus_header: input.headers,
cancellation_type: input.cancellation_type
),
Expand Down
2 changes: 2 additions & 0 deletions temporalio/lib/temporalio/worker/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ def handle_update(input)
:operation,
:arg,
:schedule_to_close_timeout,
:schedule_to_start_timeout,
:start_to_close_timeout,
:cancellation_type,
:summary,
:cancellation,
Expand Down
16 changes: 14 additions & 2 deletions temporalio/lib/temporalio/workflow/nexus_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ def service
# @param operation [Symbol, String] Operation name.
# @param arg [Object] Argument for the operation.
# @param schedule_to_close_timeout [Float, nil] Total timeout for the operation in seconds.
# @param schedule_to_start_timeout [Float, nil] Timeout in seconds for the operation to start executing. If the
# operation has not started within this window, a SCHEDULE_TO_START timeout error is raised.
# @param start_to_close_timeout [Float, nil] Timeout in seconds for an async operation to complete after it has
# started. If the operation does not complete within this window, a START_TO_CLOSE timeout error is raised.
# @param cancellation_type [NexusOperationCancellationType] How the operation will react to cancellation.
# @param summary [String, nil] Optional summary for the operation (appears in UI/CLI).
# @param cancellation [Cancellation] Cancellation for the operation.
Expand All @@ -38,6 +42,8 @@ def start_operation(
operation,
arg,
schedule_to_close_timeout: nil,
schedule_to_start_timeout: nil,
start_to_close_timeout: nil,
cancellation_type: NexusOperationCancellationType::WAIT_CANCELLATION_COMPLETED,
summary: nil,
cancellation: Workflow.cancellation,
Expand All @@ -54,6 +60,10 @@ def start_operation(
# @param operation [Symbol, String] Operation name.
# @param arg [Object] Argument for the operation.
# @param schedule_to_close_timeout [Float, nil] Total timeout for the operation in seconds.
# @param schedule_to_start_timeout [Float, nil] Timeout in seconds for the operation to start executing. If the
# operation has not started within this window, a SCHEDULE_TO_START timeout error is raised.
# @param start_to_close_timeout [Float, nil] Timeout in seconds for an async operation to complete after it has
# started. If the operation does not complete within this window, a START_TO_CLOSE timeout error is raised.
# @param cancellation_type [NexusOperationCancellationType] How the operation will react to cancellation.
# @param summary [String, nil] Optional summary for the operation (appears in UI/CLI).
# @param cancellation [Cancellation] Cancellation for the operation.
Expand All @@ -65,15 +75,17 @@ def execute_operation(
operation,
arg,
schedule_to_close_timeout: nil,
schedule_to_start_timeout: nil,
start_to_close_timeout: nil,
cancellation_type: NexusOperationCancellationType::WAIT_CANCELLATION_COMPLETED,
summary: nil,
cancellation: Workflow.cancellation,
arg_hint: nil,
result_hint: nil
)
start_operation(
operation, arg, schedule_to_close_timeout:, cancellation_type:, summary:, cancellation:,
arg_hint:, result_hint:
operation, arg, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:,
cancellation_type:, summary:, cancellation:, arg_hint:, result_hint:
).result
end
end
Expand Down
4 changes: 4 additions & 0 deletions temporalio/sig/temporalio/worker/interceptor.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ module Temporalio
attr_reader operation: String
attr_reader arg: Object?
attr_reader schedule_to_close_timeout: duration?
attr_reader schedule_to_start_timeout: duration?
attr_reader start_to_close_timeout: duration?
attr_reader cancellation_type: Temporalio::Workflow::NexusOperationCancellationType::enum
attr_reader summary: String?
attr_reader cancellation: Cancellation
Expand All @@ -331,6 +333,8 @@ module Temporalio
operation: String,
arg: Object?,
schedule_to_close_timeout: duration?,
schedule_to_start_timeout: duration?,
start_to_close_timeout: duration?,
cancellation_type: Temporalio::Workflow::NexusOperationCancellationType::enum,
summary: String?,
cancellation: Cancellation,
Expand Down
4 changes: 4 additions & 0 deletions temporalio/sig/temporalio/workflow/nexus_client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ module Temporalio
Symbol | String operation,
Object? arg,
?schedule_to_close_timeout: duration?,
?schedule_to_start_timeout: duration?,
?start_to_close_timeout: duration?,
?cancellation_type: NexusOperationCancellationType::enum,
?summary: String?,
?cancellation: Cancellation,
Expand All @@ -19,6 +21,8 @@ module Temporalio
Symbol | String operation,
Object? arg,
?schedule_to_close_timeout: duration?,
?schedule_to_start_timeout: duration?,
?start_to_close_timeout: duration?,
?cancellation_type: NexusOperationCancellationType::enum,
?summary: String?,
?cancellation: Cancellation,
Expand Down
1 change: 1 addition & 0 deletions temporalio/test/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def initialize
if target_host.empty?
@server = Temporalio::Testing::WorkflowEnvironment.start_local(
logger: Logger.new($stdout),
dev_server_download_version: 'v1.6.1-server-1.31.0-150.0',
dev_server_extra_args: [
# Allow continue as new to be immediate
'--dynamic-config-value', 'history.workflowIdReuseMinimalInterval="0s"',
Expand Down
59 changes: 59 additions & 0 deletions temporalio/test/worker_workflow_nexus_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,65 @@ def test_nexus_operation_summary_in_history
end
end

# Test schedule_to_start_timeout: operation never picked up, times out before starting
class NexusScheduleToStartTimeoutWorkflow < Temporalio::Workflow::Definition
def execute(endpoint)
client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service')
client.execute_operation(
'workflow-operation',
{ 'action' => 'wait-for-cancel' },
schedule_to_start_timeout: 0.1
)
end
end

def test_nexus_schedule_to_start_timeout
# Create a nexus endpoint pointing to a task queue with no worker so no one
# picks up the nexus task, triggering a SCHEDULE_TO_START timeout.
nexus_task_queue = "tq-#{SecureRandom.uuid}"
endpoint_name = "nexus-endpoint-#{nexus_task_queue}"
endpoint = env.server.create_nexus_endpoint(name: endpoint_name, task_queue: nexus_task_queue)

begin
err = assert_raises(Temporalio::Error::WorkflowFailedError) do
execute_workflow(NexusScheduleToStartTimeoutWorkflow, endpoint_name)
end

assert_instance_of Temporalio::Error::NexusOperationError, err.cause
assert_instance_of Temporalio::Error::TimeoutError, err.cause.cause
assert_equal Temporalio::Error::TimeoutError::TimeoutType::SCHEDULE_TO_START, err.cause.cause.type
ensure
env.server.delete_nexus_endpoint(endpoint)
end
end

# Test start_to_close_timeout: operation starts (async token returned) but never completes
class NexusStartToCloseTimeoutWorkflow < Temporalio::Workflow::Definition
def execute(endpoint)
client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service')
handle = client.start_operation(
'workflow-operation',
{ 'action' => 'wait-for-cancel' },
start_to_close_timeout: 0.1
)
handle.result
end
end

def test_nexus_start_to_close_timeout
env.with_kitchen_sink_worker(nexus: true) do |task_queue|
endpoint = "nexus-endpoint-#{task_queue}"

err = assert_raises(Temporalio::Error::WorkflowFailedError) do
execute_workflow(NexusStartToCloseTimeoutWorkflow, endpoint)
end

assert_instance_of Temporalio::Error::NexusOperationError, err.cause
assert_instance_of Temporalio::Error::TimeoutError, err.cause.cause
assert_equal Temporalio::Error::TimeoutError::TimeoutType::START_TO_CLOSE, err.cause.cause.type
end
end

class NexusOperationTracingWorkflow < Temporalio::Workflow::Definition
def execute(endpoint)
client = Temporalio::Workflow.create_nexus_client(endpoint:, service: 'test-service')
Expand Down
Loading