Skip to content

Converter hints #296

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ opinions. Please communicate with us on [Slack](https://t.mp/slack) in the `#rub
- [Cloud Client Using API Key](#cloud-client-using-api-key)
- [Data Conversion](#data-conversion)
- [ActiveModel](#activemodel)
- [Converter Hints](#converter-hints)
- [Workers](#workers)
- [Workflows](#workflows)
- [Workflow Definition](#workflow-definition)
Expand Down Expand Up @@ -336,6 +337,25 @@ Now if `include ActiveModelJSONSupport` is present on any ActiveModel class, on
which will use `as_json` which calls the super `as_json` but also includes the fully qualified class name as the JSON
`create_id` key. On deserialization, Ruby JSON then uses this key to know what class to call `json_create` on.

##### Converter Hints

In most places where objects are converted to payloads or vice versa, a "hint" can be provided to tell the converter
something else about the object/payload to assist conversion. The default converters ignore these hints, but custom
converters can be written to take advantage of them. For example, hints may be used to provide a custom converter the
Ruby type to deserialize a payload into.

These hints manifest themselves various ways throughout the API. The most obvious way is when making definitions. An
activity can define `activity_arg_hint` (which accepts multiple) and/or `activity_result_hint` for activity-level hints.
Similarly, a workflow can define `workflow_arg_hint` and/or `workflow_result_hint` for workflow-level hints.
`workflow_signal`, `workflow_query`, and `workflow_update` all similarly accept `arg_hints` and `result_hint` (except
signal of course). These definition-level hints are passed to converters both from the caller side and the
implementation side.

There are some advanced payload uses in the SDK that do not currently have a way to set hints. These include
workflow/schedule memo, workflow get/upsert memo, activity last heartbeat details, and application error details. In
some cases, users can use `Temporalio::Converters::RawValue` and then manually convert with hints. For others, hints can
be added as needed, please open an issue or otherwise contact Temporal.

### Workers

Workers host workflows and/or activities. Here's how to run a worker:
Expand Down
3 changes: 2 additions & 1 deletion temporalio/lib/temporalio/activity/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def instance
# Users do not have to be concerned with burdening the server by calling this too frequently.
#
# @param details [Array<Object>] Details to record with the heartbeat.
def heartbeat(*details)
# @param detail_hints [Array<Object>, nil] Hints to pass to converter.
def heartbeat(*details, detail_hints: nil)
raise NotImplementedError
end

Expand Down
39 changes: 36 additions & 3 deletions temporalio/lib/temporalio/activity/definition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ def activity_raw_args(value = true) # rubocop:disable Style/OptionalBooleanParam

@activity_raw_args = value
end

# Add activity hints to be passed to converter for activity args.
#
# @param hints [Array<Object>] Hints to add.
def activity_arg_hint(*hints)
@activity_arg_hints ||= []
@activity_arg_hints.concat(hints)
end

# Set activity result hint to be passed to converter for activity result.
#
# @param hint [Object] Hint to set.
def activity_result_hint(hint)
@activity_result_hint = hint
end
end

# @!visibility private
Expand All @@ -96,7 +111,9 @@ def self._activity_definition_details
activity_name:,
activity_executor: @activity_executor || :default,
activity_cancel_raise: @activity_cancel_raise.nil? || @activity_cancel_raise,
activity_raw_args: @activity_raw_args.nil? ? false : @activity_raw_args
activity_raw_args: @activity_raw_args.nil? ? false : @activity_raw_args,
activity_arg_hints: @activity_arg_hints,
activity_result_hint: @activity_result_hint
}
end

Expand Down Expand Up @@ -127,6 +144,12 @@ class Info
# @return [Boolean] Whether to use {Converters::RawValue}s as arguments.
attr_reader :raw_args

# @return [Array<Object>, nil] Argument hints.
attr_reader :arg_hints

# @return [Object, nil] Result hint
attr_reader :result_hint

# Obtain definition info representing the given activity, which can be a class, instance, or definition info.
#
# @param activity [Definition, Class<Definition>, Info] Activity to get info for.
Expand All @@ -147,7 +170,9 @@ def self.from_activity(activity)
instance: proc { activity.new },
executor: details[:activity_executor],
cancel_raise: details[:activity_cancel_raise],
raw_args: details[:activity_raw_args]
raw_args: details[:activity_raw_args],
arg_hints: details[:activity_arg_hints],
result_hint: details[:activity_result_hint]
) { |*args| Context.current.instance&.execute(*args) }
when Definition
details = activity.class._activity_definition_details
Expand All @@ -156,7 +181,9 @@ def self.from_activity(activity)
instance: activity,
executor: details[:activity_executor],
cancel_raise: details[:activity_cancel_raise],
raw_args: details[:activity_raw_args]
raw_args: details[:activity_raw_args],
arg_hints: details[:activity_arg_hints],
result_hint: details[:activity_result_hint]
) { |*args| Context.current.instance&.execute(*args) }
when Info
activity
Expand All @@ -172,13 +199,17 @@ def self.from_activity(activity)
# @param executor [Symbol] Name of the executor.
# @param cancel_raise [Boolean] Whether to raise in thread/fiber on cancellation.
# @param raw_args [Boolean] Whether to use {Converters::RawValue}s as arguments.
# @param arg_hints [Array<Object>, nil] Argument hints.
# @param result_hint [Object, nil] Result hint.
# @yield Use this block as the activity.
def initialize(
name:,
instance: nil,
executor: :default,
cancel_raise: true,
raw_args: false,
arg_hints: nil,
result_hint: nil,
&block
)
@name = name
Expand All @@ -189,6 +220,8 @@ def initialize(
@executor = executor
@cancel_raise = cancel_raise
@raw_args = raw_args
@arg_hints = arg_hints
@result_hint = result_hint
Internal::ProtoUtils.assert_non_reserved_name(name)
end
end
Expand Down
61 changes: 52 additions & 9 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ def operator_service
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
# This is currently experimental.
# @param priority [Priority] Priority of the workflow. This is currently experimental.
# @param arg_hints [Array<Object>, nil] Overrides converter hints for arguments if any. If unset/nil and the
# workflow definition has arg hints, those are used by default.
# @param result_hint [Object, nil] Overrides converter hint for result if any. If unset/nil and the workflow
# definition has result hint, it is used by default.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [WorkflowHandle] A workflow handle to the started workflow.
Expand All @@ -256,10 +260,15 @@ def start_workflow(
request_eager_start: false,
versioning_override: nil,
priority: Priority.default,
arg_hints: nil,
result_hint: nil,
rpc_options: nil
)
# Take hints from definition if there is a definition
workflow, defn_arg_hints, defn_result_hint =
Workflow::Definition._workflow_type_and_hints_from_workflow_parameter(workflow)
@impl.start_workflow(Interceptor::StartWorkflowInput.new(
workflow: Workflow::Definition._workflow_type_from_workflow_parameter(workflow),
workflow:,
args:,
workflow_id: id,
task_queue:,
Expand All @@ -279,6 +288,8 @@ def start_workflow(
headers: {},
versioning_override:,
priority:,
arg_hints: arg_hints || defn_arg_hints,
result_hint: result_hint || defn_result_hint,
rpc_options:
))
end
Expand Down Expand Up @@ -314,6 +325,10 @@ def start_workflow(
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
# This is currently experimental.
# @param priority [Priority] Priority for the workflow. This is currently experimental.
# @param arg_hints [Array<Object>, nil] Overrides converter hints for arguments if any. If unset/nil and the
# workflow definition has arg hints, those are used by default.
# @param result_hint [Object, nil] Overrides converter hint for result if any. If unset/nil and the workflow
# definition has result hint, it is used by default.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [Object] Successful result of the workflow.
Expand All @@ -340,10 +355,11 @@ def execute_workflow(
request_eager_start: false,
versioning_override: nil,
priority: Priority.default,
follow_runs: true,
arg_hints: nil,
result_hint: nil,
rpc_options: nil
)
handle = start_workflow(
start_workflow(
workflow,
*args,
id:,
Expand All @@ -363,9 +379,10 @@ def execute_workflow(
request_eager_start:,
versioning_override:,
priority:,
arg_hints:,
result_hint:,
rpc_options:
)
follow_runs ? handle.result : handle
).result
end

# Get a workflow handle to an existing workflow by its ID.
Expand All @@ -375,14 +392,18 @@ def execute_workflow(
# interactions occur on the latest of the workflow ID.
# @param first_execution_run_id [String, nil] First execution run ID used for some calls like cancellation and
# termination to ensure the affected workflow is only within the same chain as this given run ID.
# @param result_hint [Object, nil] Converter hint for the workflow's result.
#
# @return [WorkflowHandle] The workflow handle.
def workflow_handle(
workflow_id,
run_id: nil,
first_execution_run_id: nil
first_execution_run_id: nil,
result_hint: nil
)
WorkflowHandle.new(client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:)
WorkflowHandle.new(
client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:, result_hint:
)
end

# Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict
Expand All @@ -396,6 +417,10 @@ def workflow_handle(
# @param wait_for_stage [WorkflowUpdateWaitStage] Required stage to wait until returning. ADMITTED is not
# currently supported. See https://docs.temporal.io/workflows#update for more details.
# @param id [String] ID of the update.
# @param arg_hints [Array<Object>, nil] Overrides converter hints for update arguments if any. If unset/nil and the
# update definition has arg hints, those are used by default.
# @param result_hint [Object, nil] Overrides converter hint for update result if any. If unset/nil and the update
# definition has result hint, it is used by default.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [WorkflowUpdateHandle] The update handle.
Expand All @@ -409,15 +434,20 @@ def start_update_with_start_workflow(
start_workflow_operation:,
wait_for_stage:,
id: SecureRandom.uuid,
arg_hints: nil,
result_hint: nil,
rpc_options: nil
)
update, defn_arg_hints, defn_result_hint = Workflow::Definition::Update._name_and_hints_from_parameter(update)
@impl.start_update_with_start_workflow(
Interceptor::StartUpdateWithStartWorkflowInput.new(
update_id: id,
update: Workflow::Definition::Update._name_from_parameter(update),
update:,
args:,
wait_for_stage:,
start_workflow_operation:,
arg_hints: arg_hints || defn_arg_hints,
result_hint: result_hint || defn_result_hint,
headers: {},
rpc_options:
)
Expand All @@ -433,6 +463,10 @@ def start_update_with_start_workflow(
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This must
# have an `id_conflict_policy` set.
# @param id [String] ID of the update.
# @param arg_hints [Array<Object>, nil] Overrides converter hints for update arguments if any. If unset/nil and the
# update definition has arg hints, those are used by default.
# @param result_hint [Object, nil] Overrides converter hint for update result if any. If unset/nil and the update
# definition has result hint, it is used by default.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [Object] Successful update result.
Expand All @@ -446,6 +480,8 @@ def execute_update_with_start_workflow(
*args,
start_workflow_operation:,
id: SecureRandom.uuid,
arg_hints: nil,
result_hint: nil,
rpc_options: nil
)
start_update_with_start_workflow(
Expand All @@ -454,6 +490,8 @@ def execute_update_with_start_workflow(
start_workflow_operation:,
wait_for_stage: WorkflowUpdateWaitStage::COMPLETED,
id:,
arg_hints:,
result_hint:,
rpc_options:
).result
end
Expand All @@ -464,6 +502,8 @@ def execute_update_with_start_workflow(
# @param args [Array<Object>] Signal arguments.
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This may not
# support all `id_conflict_policy` options.
# @param arg_hints [Array<Object>, nil] Overrides converter hints for signal arguments if any. If unset/nil and the
# signal definition has arg hints, those are used by default.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [WorkflowHandle] A workflow handle to the workflow.
Expand All @@ -473,13 +513,16 @@ def signal_with_start_workflow(
signal,
*args,
start_workflow_operation:,
arg_hints: nil,
rpc_options: nil
)
signal, defn_arg_hints = Workflow::Definition::Signal._name_and_hints_from_parameter(signal)
@impl.signal_with_start_workflow(
Interceptor::SignalWithStartWorkflowInput.new(
signal: Workflow::Definition::Signal._name_from_parameter(signal),
signal:,
args:,
start_workflow_operation:,
arg_hints: arg_hints || defn_arg_hints,
rpc_options:
)
)
Expand Down
16 changes: 12 additions & 4 deletions temporalio/lib/temporalio/client/async_activity_handle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,27 @@ def initialize(client:, task_token:, id_reference:)
# Record a heartbeat for the activity.
#
# @param details [Array<Object>] Details of the heartbeat.
# @param detail_hints [Array<Object>, nil] Converter hints for the details.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
def heartbeat(*details, rpc_options: nil)
def heartbeat(*details, detail_hints: nil, rpc_options: nil)
@client._impl.heartbeat_async_activity(Interceptor::HeartbeatAsyncActivityInput.new(
task_token_or_id_reference:,
details:,
detail_hints:,
rpc_options:
))
end

# Complete the activity.
#
# @param result [Object, nil] Result of the activity.
# @param result_hint [Object, nil] Converter hint for the result.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
def complete(result = nil, rpc_options: nil)
def complete(result = nil, result_hint: nil, rpc_options: nil)
@client._impl.complete_async_activity(Interceptor::CompleteAsyncActivityInput.new(
task_token_or_id_reference:,
result:,
result_hint:,
rpc_options:
))
end
Expand All @@ -52,25 +56,29 @@ def complete(result = nil, rpc_options: nil)
#
# @param error [Exception] Error for the activity.
# @param last_heartbeat_details [Array<Object>] Last heartbeat details for the activity.
# @param last_heartbeat_detail_hints [Array<Object>, nil] Converter hints for the last heartbeat details.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
def fail(error, last_heartbeat_details: [], rpc_options: nil)
def fail(error, last_heartbeat_details: [], last_heartbeat_detail_hints: nil, rpc_options: nil)
@client._impl.fail_async_activity(Interceptor::FailAsyncActivityInput.new(
task_token_or_id_reference:,
error:,
last_heartbeat_details:,
last_heartbeat_detail_hints:,
rpc_options:
))
end

# Report the activity as canceled.
#
# @param details [Array<Object>] Cancellation details.
# @param detail_hints [Array<Object>, nil] Converter hints for the details.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
# @raise [AsyncActivityCanceledError] If the activity has been canceled.
def report_cancellation(*details, rpc_options: nil)
def report_cancellation(*details, detail_hints: nil, rpc_options: nil)
@client._impl.report_cancellation_async_activity(Interceptor::ReportCancellationAsyncActivityInput.new(
task_token_or_id_reference:,
details:,
detail_hints:,
rpc_options:
))
end
Expand Down
Loading
Loading