-
Notifications
You must be signed in to change notification settings - Fork 3
Implement decision state machine #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
MUTABLE_SIDE_EFFECT_MARKER_NAME = "MutableSideEffect" | ||
|
||
|
||
def decision_state_to_string(state: DecisionState) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move to class method
return f"DecisionType: {decision_type_to_string(self.decision_type)}, ID: {self.id}" | ||
|
||
|
||
class DecisionStateMachine: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: BaseDecisionStateMachine
def get_id(self) -> str: | ||
raise NotImplementedError | ||
|
||
# Typed handlers (Go-parity naming semantics) | ||
def handle_initiated_event(self, event: history.HistoryEvent) -> None: | ||
pass | ||
|
||
def handle_started_event(self, event: history.HistoryEvent) -> None: | ||
pass | ||
|
||
def handle_completion_event(self, event: history.HistoryEvent) -> None: | ||
pass | ||
|
||
def handle_initiation_failed_event(self, event: history.HistoryEvent) -> None: | ||
pass | ||
|
||
def handle_cancel_initiated_event(self, event: history.HistoryEvent) -> None: | ||
pass | ||
|
||
def handle_cancel_failed_event(self, event: history.HistoryEvent) -> None: | ||
pass | ||
|
||
def handle_canceled_event(self, event: history.HistoryEvent) -> None: | ||
pass | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add TODOs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should just return NotImplemented Error for those method. What do you think about this?
def get_id(self) -> str: | ||
return self.activity_id | ||
|
||
def handle_initiated_event(self, event: history.HistoryEvent) -> None: | ||
a = getattr(event, "activity_task_scheduled_event_attributes", None) | ||
if a is not None and a.activity_id == self.activity_id: | ||
self.status = MachineStatus.SCHEDULED | ||
self.scheduled_event_id = event.event_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at least these two can be moved to Base class. check go sdk to get the full list
TIMED_OUT = auto() | ||
|
||
|
||
class DecisionState(Enum): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is never used
) | ||
|
||
|
||
class MachineStatus(Enum): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change to DecisionState
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you.
# Event name constants (used by Go state machine; provided for parity/testing/logs) | ||
EVENT_CANCEL = "cancel" | ||
EVENT_DECISION_SENT = "handleDecisionSent" | ||
EVENT_INITIATED = "handleInitiatedEvent" | ||
EVENT_INITIATION_FAILED = "handleInitiationFailedEvent" | ||
EVENT_STARTED = "handleStartedEvent" | ||
EVENT_COMPLETION = "handleCompletionEvent" | ||
EVENT_CANCEL_INITIATED = "handleCancelInitiatedEvent" | ||
EVENT_CANCEL_FAILED = "handleCancelFailedEvent" | ||
EVENT_CANCELED = "handleCanceledEvent" | ||
|
||
|
||
# Marker names | ||
SIDE_EFFECT_MARKER_NAME = "SideEffect" | ||
VERSION_MARKER_NAME = "Version" | ||
LOCAL_ACTIVITY_MARKER_NAME = "LocalActivity" | ||
MUTABLE_SIDE_EFFECT_MARKER_NAME = "MutableSideEffect" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure how these constants are used
return mapping.get(state, "Unknown") | ||
|
||
|
||
def decision_type_to_string(dt: DecisionType) -> str: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move to class method
What changed?
Implemented decision state machine system for Cadence Python client including:
DecisionStateMachine
class for workflow decision lifecycle trackingActivityDecisionMachine
for activity task scheduling, execution, and cancellationTimerDecisionMachine
for timer operationsChildWorkflowDecisionMachine
for child workflow lifecycle managementDecisionManager
for aggregating state machines and coordinating decisionsAlso try to use proto types directly and avoid any mapping
(heavily relied on Go client state machine: https://github.com/cadence-workflow/cadence-go-client/blob/master/internal/internal_decision_state_machine.go)
Why?
Workflows need to track what decisions they've made and what state they're in. This state machine ensures decisions are made correctly, in the right order, and handles properly. It's essential for deterministic workflow execution.
How did you test it?
Local unit tests
uv test
Potential risks
Release notes
Documentation Changes