diff --git a/features/activity/shutdown/feature.cs b/features/activity/shutdown/feature.cs index 38b014be..5bc89181 100644 --- a/features/activity/shutdown/feature.cs +++ b/features/activity/shutdown/feature.cs @@ -38,15 +38,20 @@ class MyWorkflow [WorkflowRun] public async Task RunAsync() { - var options = new ActivityOptions + var gracefulOptions = new ActivityOptions + { + ScheduleToCloseTimeout = TimeSpan.FromSeconds(30), + RetryPolicy = new() { MaximumAttempts = 1 }, + }; + var ignoreOptions = new ActivityOptions { ScheduleToCloseTimeout = TimeSpan.FromMilliseconds(300), RetryPolicy = new() { MaximumAttempts = 1 }, }; - var fut = Workflow.ExecuteActivityAsync((MyActivities act) => act.CancelSuccess(), options); - var fut1 = Workflow.ExecuteActivityAsync((MyActivities act) => act.CancelFailure(), options); - var fut2 = Workflow.ExecuteActivityAsync((MyActivities act) => act.CancelIgnore(), options); + var fut = Workflow.ExecuteActivityAsync((MyActivities act) => act.CancelSuccess(), gracefulOptions); + var fut1 = Workflow.ExecuteActivityAsync((MyActivities act) => act.CancelFailure(), gracefulOptions); + var fut2 = Workflow.ExecuteActivityAsync((MyActivities act) => act.CancelIgnore(), ignoreOptions); await fut; diff --git a/features/activity/shutdown/feature.go b/features/activity/shutdown/feature.go index 3dc8604d..3abce465 100644 --- a/features/activity/shutdown/feature.go +++ b/features/activity/shutdown/feature.go @@ -47,16 +47,22 @@ func Execute(ctx context.Context, r *harness.Runner) (client.WorkflowRun, error) } func Workflow(ctx workflow.Context) (string, error) { - ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + gracefulCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ScheduleToCloseTimeout: 30 * time.Second, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 1, + }, + }) + ignoreCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ ScheduleToCloseTimeout: 300 * time.Millisecond, RetryPolicy: &temporal.RetryPolicy{ MaximumAttempts: 1, }, }) - fut := workflow.ExecuteActivity(ctx, activities.CancelSuccess) - fut1 := workflow.ExecuteActivity(ctx, activities.CancelFailure) - fut2 := workflow.ExecuteActivity(ctx, activities.CancelIgnore) + fut := workflow.ExecuteActivity(gracefulCtx, activities.CancelSuccess) + fut1 := workflow.ExecuteActivity(gracefulCtx, activities.CancelFailure) + fut2 := workflow.ExecuteActivity(ignoreCtx, activities.CancelIgnore) err := fut.Get(ctx, nil) if err != nil { diff --git a/features/activity/shutdown/feature.java b/features/activity/shutdown/feature.java index 0689800b..776b4bc2 100644 --- a/features/activity/shutdown/feature.java +++ b/features/activity/shutdown/feature.java @@ -41,7 +41,14 @@ class Impl implements feature, ShutdownWorkflow { @Override public String workflow() { - var activities = + var gracefulActivities = + activities( + feature.class, + builder -> + builder + .setScheduleToCloseTimeout(Duration.ofSeconds(30)) + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())); + var ignoringActivities = activities( feature.class, builder -> @@ -49,10 +56,10 @@ public String workflow() { .setScheduleToCloseTimeout(Duration.ofMillis(300)) .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())); - activities.cancelSuccess(); + gracefulActivities.cancelSuccess(); try { - activities.cancelFailure(); + gracefulActivities.cancelFailure(); throw ApplicationFailure.newFailure("expected failure", "NoError"); } catch (ActivityFailure e) { if (!(e.getCause() instanceof ApplicationFailure) @@ -62,7 +69,7 @@ public String workflow() { } try { - activities.cancelIgnore(); + ignoringActivities.cancelIgnore(); throw ApplicationFailure.newFailure("expected timeout", "NoError"); } catch (ActivityFailure e) { if (e.getCause() instanceof TimeoutFailure) { diff --git a/features/activity/shutdown/feature.py b/features/activity/shutdown/feature.py index 3df80f4e..090bd60f 100644 --- a/features/activity/shutdown/feature.py +++ b/features/activity/shutdown/feature.py @@ -20,12 +20,12 @@ class Workflow: async def run(self) -> str: handle = workflow.start_activity( cancel_success, - schedule_to_close_timeout=timedelta(milliseconds=300), + schedule_to_close_timeout=timedelta(seconds=30), retry_policy=RetryPolicy(maximum_attempts=1), ) handle1 = workflow.start_activity( cancel_failure, - schedule_to_close_timeout=timedelta(milliseconds=300), + schedule_to_close_timeout=timedelta(seconds=30), retry_policy=RetryPolicy(maximum_attempts=1), ) handle2 = workflow.start_activity( diff --git a/features/activity/shutdown/feature.ts b/features/activity/shutdown/feature.ts index 23cfa642..623667e3 100644 --- a/features/activity/shutdown/feature.ts +++ b/features/activity/shutdown/feature.ts @@ -1,4 +1,3 @@ -import { Context } from '@temporalio/activity'; import { Feature } from '@temporalio/harness'; import * as wf from '@temporalio/workflow'; import { ApplicationFailure, TimeoutFailure, TimeoutType } from '@temporalio/common'; @@ -27,19 +26,26 @@ const activitiesImpl = { throw new Error('worker is shutting down'); }, async cancelIgnore(): Promise { - await Context.current().sleep(15000); + // Use a plain setTimeout that doesn't respond to activity cancellation, + // so the worker must abandon this activity on shutdown. + await new Promise((resolve) => setTimeout(resolve, 15000)); }, }; -const activities = wf.proxyActivities({ +const gracefulActivities = wf.proxyActivities({ + scheduleToCloseTimeout: '30s', + retry: { maximumAttempts: 1 }, +}); + +const ignoringActivities = wf.proxyActivities({ scheduleToCloseTimeout: '300ms', retry: { maximumAttempts: 1 }, }); export async function workflow(): Promise { - const fut = activities.cancelSuccess(); - const fut1 = activities.cancelFailure(); - const fut2 = activities.cancelIgnore(); + const fut = gracefulActivities.cancelSuccess(); + const fut1 = gracefulActivities.cancelFailure(); + const fut2 = ignoringActivities.cancelIgnore(); await fut;