Skip to content

Add new options to reset/update activities #808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: next-server
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ require (
github.com/spf13/pflag v1.0.6
github.com/stretchr/testify v1.10.0
github.com/temporalio/ui-server/v2 v2.36.0
go.temporal.io/api v1.49.0
go.temporal.io/api v1.49.2-0.20250530030647-68ab55b29576
go.temporal.io/sdk v1.34.0
go.temporal.io/server v1.28.0-133.1
go.temporal.io/server v1.28.0-132.0.0.20250602225915-1bf96bd729d7
google.golang.org/grpc v1.71.0
google.golang.org/protobuf v1.36.6
gopkg.in/yaml.v3 v3.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -388,12 +388,12 @@ go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
go.temporal.io/api v1.49.0 h1:aL+zfrdZC6iRU0Lqc1Qds83oMEj1DwhmPUdfiIenGE4=
go.temporal.io/api v1.49.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/api v1.49.2-0.20250530030647-68ab55b29576 h1:sPkf+uwFnHLo1UnlCvcGwKpsxtqXHX8dWIm33iADb30=
go.temporal.io/api v1.49.2-0.20250530030647-68ab55b29576/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM=
go.temporal.io/sdk v1.34.0 h1:VLg/h6ny7GvLFVoQPqz2NcC93V9yXboQwblkRvZ1cZE=
go.temporal.io/sdk v1.34.0/go.mod h1:iE4U5vFrH3asOhqpBBphpj9zNtw8btp8+MSaf5A0D3w=
go.temporal.io/server v1.28.0-133.1 h1:BctAiuSCHislZXOtDNzFi5emfyQUCqZ0KwUNlresvus=
go.temporal.io/server v1.28.0-133.1/go.mod h1:vC3iscwLvmDrF0MQqI9/bvhCKWD9BCnwodIB1XVvXqU=
go.temporal.io/server v1.28.0-132.0.0.20250602225915-1bf96bd729d7 h1:clOHBhc21oL12cvGm7n1UuA/oaV50vmOD+G3oh1bXow=
go.temporal.io/server v1.28.0-132.0.0.20250602225915-1bf96bd729d7/go.mod h1:H7t/69esmtw9VpAsW1J4xfaisXaAcYDbmpU2pUkMmlA=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
go.temporal.io/version v0.3.0/go.mod h1:UA9S8/1LaKYae6TyD9NaPMJTZb911JcbqghI2CBSP78=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
52 changes: 34 additions & 18 deletions temporalcli/commands.activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ func (c *TemporalActivityUpdateOptionsCommand) run(cctx *CommandContext, args []
UpdateMask: &fieldmaskpb.FieldMask{
Paths: updatePath,
},
Identity: c.Identity,
Identity: c.Identity,
RestoreOriginal: c.RestoreOriginal,
})
if err != nil {
return fmt.Errorf("unable to update Activity options: %w", err)
Expand Down Expand Up @@ -225,12 +226,6 @@ func (c *TemporalActivityPauseCommand) run(cctx *CommandContext, args []string)
}

func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

opts := SingleWorkflowOrBatchOptions{
WorkflowId: c.WorkflowId,
RunId: c.RunId,
Expand All @@ -239,11 +234,30 @@ func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string
Yes: c.Yes,
Rps: c.Rps,
}

exec, batchReq, err := opts.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, singleOrBatchOverrides{
overrides := singleOrBatchOverrides{
// You're allowed to specify a reason when terminating a workflow
AllowReasonWithWorkflowID: true,
})
}

if c.Query == "" {
// single operation.
if c.ActivityType == "" && c.ActivityId == "" {
return fmt.Errorf("either Activity Type or Activity Id must be provided")
}
} else {
// batch operation.
if c.ActivityType == "" && !c.MatchAll {
return fmt.Errorf("either Activity Type must be provided or MatchAll must be set to true")
}
}

cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

exec, batchReq, err := opts.workflowExecOrBatch(cctx, c.Parent.Namespace, cl, overrides)
if err != nil {
return err
}
Expand Down Expand Up @@ -301,21 +315,17 @@ func (c *TemporalActivityUnpauseCommand) run(cctx *CommandContext, args []string
}

func (c *TemporalActivityResetCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

request := &workflowservice.ResetActivityRequest{
Namespace: c.Parent.Namespace,
Execution: &common.WorkflowExecution{
WorkflowId: c.WorkflowId,
RunId: c.RunId,
},
Identity: c.Identity,
KeepPaused: c.KeepPaused,
ResetHeartbeat: c.ResetHeartbeats,
Identity: c.Identity,
KeepPaused: c.KeepPaused,
ResetHeartbeat: c.ResetHeartbeats,
RestoreOriginalOptions: c.RestoreOriginalOptions,
}

if c.ActivityType != "" {
Expand All @@ -326,6 +336,12 @@ func (c *TemporalActivityResetCommand) run(cctx *CommandContext, args []string)
return fmt.Errorf("either Activity Type or Activity Id must be provided")
}

cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

resp, err := cl.WorkflowService().ResetActivity(cctx, request)
if err != nil {
return fmt.Errorf("unable to reset an Activity: %w", err)
Expand Down
46 changes: 46 additions & 0 deletions temporalcli/commands.activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,45 @@ func (s *SharedServerSuite) TestActivityOptionsUpdate_Partial() {
s.ContainsOnSameLine(out, "BackoffCoefficient", "2")
}

func (s *SharedServerSuite) TestActivityOptionsUpdate_Restore() {
run := s.waitActivityStarted()

// update activity options to some custom values
res := s.Execute(
"activity", "update-options",
"--activity-id", activityId,
"--workflow-id", run.GetID(),
"--run-id", run.GetRunID(),
"--identity", identity,
"--task-queue", "new-task-queue",
"--schedule-to-close-timeout", "41s",
"--address", s.Address(),
)

s.NoError(res.Err)
out := res.Stdout.String()

// updated
s.ContainsOnSameLine(out, "ScheduleToCloseTimeout", "41s")

// restore the original activity options. Call should pass
res = s.Execute(
"activity", "update-options",
"--activity-id", activityId,
"--workflow-id", run.GetID(),
"--run-id", run.GetRunID(),
"--identity", identity,
"--restore_original",
"--address", s.Address(),
)

s.NoError(res.Err)
out = res.Stdout.String()

// restored
s.ContainsOnSameLine(out, "ScheduleToCloseTimeout", "0s")
}

func sendActivityCommand(command string, run client.WorkflowRun, s *SharedServerSuite, extraArgs ...string) *CommandResult {
args := []string{
"activity", command,
Expand Down Expand Up @@ -256,6 +295,13 @@ func (s *SharedServerSuite) TestActivityReset() {
out := res.Stdout.String()
s.ContainsOnSameLine(out, "ServerResponse", "true")

// same but with activity options reset
res = sendActivityCommand("reset", run, s, "--activity-id", activityId, "--restore-original-options")
s.NoError(res.Err)
// make sure we receive a server response
out = res.Stdout.String()
s.ContainsOnSameLine(out, "ServerResponse", "true")

// reset should fail because activity is not found
res = sendActivityCommand("reset", run, s, "--activity-id", "fake_id")
s.Error(res.Err)
Expand Down
28 changes: 16 additions & 12 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ func NewTemporalActivityPauseCommand(cctx *CommandContext, parent *TemporalActiv
s.Command.Long = "Pause an Activity.\n\nIf the Activity is not currently running (e.g. because it previously\nfailed), it will not be run again until it is unpaused.\n\nHowever, if the Activity is currently running, it will run to completion.\nIf the Activity is on its last retry attempt and fails, the failure will\nbe returned to the caller, just as if the Activity had not been paused.\n\nActivities can be specified by their Activity ID or Activity Type. \nOne of those parameters must be provided. If both are provided - Activity\nType will be used, and Activity ID will be ignored.\n\n```\n\nSpecify the Activity and Workflow IDs:\n\n```\ntemporal activity pause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to pause.")
s.Command.Flags().StringVarP(&s.ActivityType, "activity-type", "g", "", "Activity Type to pause.")
s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "i", "", "Activity ID to pause.")
s.Command.Flags().StringVarP(&s.ActivityType, "activity-type", "t", "", "Activity Type to pause.")
s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.")
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
Expand All @@ -498,11 +498,12 @@ type TemporalActivityResetCommand struct {
Parent *TemporalActivityCommand
Command cobra.Command
WorkflowReferenceOptions
ActivityId string
ActivityType string
Identity string
KeepPaused bool
ResetHeartbeats bool
ActivityId string
ActivityType string
Identity string
KeepPaused bool
ResetHeartbeats bool
RestoreOriginalOptions bool
}

func NewTemporalActivityResetCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityResetCommand {
Expand All @@ -517,11 +518,12 @@ func NewTemporalActivityResetCommand(cctx *CommandContext, parent *TemporalActiv
s.Command.Long = "Resetting an activity resets both the number of attempts and the activity \ntimeout. \n\nIf activity is paused and 'keep_paused' flag is not provided - it will be \nunpaused.\nIf activity is paused and 'keep_paused' flag is provided - it will stay \npaused.\nIf activity is waiting for the retry, is will be rescheduled immediately.\nIf the 'reset_heartbeats' flag is set, the activity heartbeat timer and \nheartbeats will be reset.\n\nActivities can be specified by their Activity ID or Activity Type. \nOne of those parameters must be provided. If both are provided - Activity\nType will be used, and Activity ID will be ignored. \n\nSpecify the Activity Type of ID and Workflow IDs:\n\n```\ntemporal activity reset \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --keep-paused\n --reset-heartbeats\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to pause.")
s.Command.Flags().StringVarP(&s.ActivityType, "activity-type", "g", "", "Activity Type to pause.")
s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "i", "", "Activity ID to pause.")
s.Command.Flags().StringVarP(&s.ActivityType, "activity-type", "t", "", "Activity Type to pause.")
s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.")
s.Command.Flags().BoolVar(&s.KeepPaused, "keep-paused", false, "If activity was paused - it will stay paused.")
s.Command.Flags().BoolVar(&s.ResetHeartbeats, "reset-heartbeats", false, "Reset the Activity's heartbeat.")
s.Command.Flags().BoolVar(&s.RestoreOriginalOptions, "restore-original-options", false, "If set, the activity options will be restored to the defaults. Default options are then options activity was created with. They are part of the first SCHEDULE event.")
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
Expand Down Expand Up @@ -556,8 +558,8 @@ func NewTemporalActivityUnpauseCommand(cctx *CommandContext, parent *TemporalAct
s.Command.Long = "Re-schedule a previously-paused Activity for execution.\n\nIf the Activity is not running and is past its retry timeout, it will be\nscheduled immediately. Otherwise, it will be scheduled after its retry\ntimeout expires. \n\nUse `--reset-attempts` to reset the number of previous run attempts to \nzero. For example, if an Activity is near the maximum number of attempts \nN specified in its retry policy, `--reset-attempts` will allow the \nActivity to be retried another N times after unpausing.\n\nUse `--reset-heartbeat` to reset the Activity's heartbeats. \n\nActivities can be specified by their Activity ID or Activity Type. \nOne of those parameters must be provided. If both are provided - Activity\nType will be used, and Activity ID will be ignored.\n\nActivities can be unpaused in bulk via a visibility Query list filter:\n\n```\ntemporal activity unpause \\\n --query YourQuery \\\n --reason YourReasonForTermination\n```\n\n\nSpecify the Activity ID or Type and Workflow IDs:\n\n```\ntemporal activity unpause \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId\n --reset-attempts\n --reset-heartbeats\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "a", "", "Activity ID to unpause. Can only be used without --query.")
s.Command.Flags().StringVarP(&s.ActivityType, "activity-type", "g", "", "Activity Type to unpause.")
s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "i", "", "Activity ID to unpause. Can only be used without --query.")
s.Command.Flags().StringVarP(&s.ActivityType, "activity-type", "t", "", "Activity Type to unpause.")
s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.")
s.Command.Flags().BoolVar(&s.ResetAttempts, "reset-attempts", false, "Also reset the activity attempts.")
s.Command.Flags().BoolVar(&s.ResetHeartbeats, "reset-heartbeats", false, "Reset the Activity's heartbeats. Only works with --reset-attempts.")
Expand Down Expand Up @@ -588,6 +590,7 @@ type TemporalActivityUpdateOptionsCommand struct {
RetryBackoffCoefficient float32
RetryMaximumAttempts int
Identity string
RestoreOriginal bool
}

func NewTemporalActivityUpdateOptionsCommand(cctx *CommandContext, parent *TemporalActivityCommand) *TemporalActivityUpdateOptionsCommand {
Expand All @@ -602,7 +605,7 @@ func NewTemporalActivityUpdateOptionsCommand(cctx *CommandContext, parent *Tempo
s.Command.Long = "Update Activity options. Specify the Activity and Workflow IDs, and\noptions you want to update.\nUpdates are incremental, only changing the specified options.\n\n```\ntemporal activity update-options \\\n --activity-id YourActivityId \\\n --workflow-id YourWorkflowId \\\n --task-queue NewTaskQueueName \\\n --schedule-to-close-timeout DURATION \\\n --schedule-to-start-timeout DURATION \\\n --start-to-close-timeout DURATION \\\n --heartbeat-timeout DURATION \\\n --retry-initial-interval DURATION \\\n --retry-maximum-interval DURATION \\\n --retry-backoff-coefficient NewBackoffCoefficient \\\n --retry-maximum-attempts NewMaximumAttempts\n\n```"
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.ActivityId, "activity-id", "", "Activity ID. Required.")
s.Command.Flags().StringVarP(&s.ActivityId, "activity-id", "i", "", "Activity ID. Required.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "activity-id")
s.Command.Flags().StringVar(&s.TaskQueue, "task-queue", "", "Name of the task queue for the Activity.")
s.ScheduleToCloseTimeout = 0
Expand All @@ -620,6 +623,7 @@ func NewTemporalActivityUpdateOptionsCommand(cctx *CommandContext, parent *Tempo
s.Command.Flags().Float32Var(&s.RetryBackoffCoefficient, "retry-backoff-coefficient", 0, "Coefficient used to calculate the next retry interval. The next retry interval is previous interval multiplied by the backoff coefficient. Must be 1 or larger.")
s.Command.Flags().IntVar(&s.RetryMaximumAttempts, "retry-maximum-attempts", 0, "Maximum number of attempts. When exceeded the retries stop even if not expired yet. Setting this value to 1 disables retries. Setting this value to 0 means unlimited attempts(up to the timeouts).")
s.Command.Flags().StringVar(&s.Identity, "identity", "", "Identity of the user submitting this request.")
s.Command.Flags().BoolVar(&s.RestoreOriginal, "restore_original", false, "If set, the activity options will be restored to the default. Default options are then options activity was created with. They are part of the first SCHEDULE event. This flag cannot be combined with any other option; if you supply restore_original together with other options, the request will be rejected.")
s.WorkflowReferenceOptions.buildFlags(cctx, s.Command.Flags())
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
Expand Down
28 changes: 22 additions & 6 deletions temporalcli/commandsgen/commands.yml
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ commands:
```
options:
- name: activity-id
short: i
type: string
description: Activity ID.
required: true
Expand Down Expand Up @@ -379,6 +380,14 @@ commands:
- name: identity
type: string
description: Identity of the user submitting this request.
- name: restore_original
type: bool
description: |
If set, the activity options will be restored to the default.
Default options are then options activity was created with.
They are part of the first SCHEDULE event.
This flag cannot be combined with any other option; if you supply
restore_original together with other options, the request will be rejected.
option-sets:
- workflow reference

Expand Down Expand Up @@ -409,11 +418,11 @@ commands:
```
options:
- name: activity-id
short: a
short: i
type: string
description: Activity ID to pause.
- name: activity-type
short: g
short: t
type: string
description: Activity Type to pause.
- name: identity
Expand Down Expand Up @@ -462,12 +471,12 @@ commands:
```
options:
- name: activity-id
short: a
short: i
type: string
description: |
Activity ID to unpause. Can only be used without --query.
- name: activity-type
short: g
short: t
type: string
description: Activity Type to unpause.
- name: identity
Expand Down Expand Up @@ -524,11 +533,11 @@ commands:
```
options:
- name: activity-id
short: a
short: i
type: string
description: Activity ID to pause.
- name: activity-type
short: g
short: t
type: string
description: Activity Type to pause.
- name: identity
Expand All @@ -540,6 +549,13 @@ commands:
- name: reset-heartbeats
type: bool
description: Reset the Activity's heartbeat.
- name: restore-original-options
type: bool
description: |
If set, the activity options will be restored to the defaults.
Default options are then options activity was created with.
They are part of the first SCHEDULE event.

option-sets:
- workflow reference

Expand Down
Loading