Skip to content

Implement missing send_event capability for orchestration context #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

Copilot
Copy link

@Copilot Copilot AI commented Jun 3, 2025

This PR implements the missing send_event capability that was identified in issue #32. The implementation allows orchestrations to send events to other orchestration instances, which is required for the Dapr Python SDK integration.

Changes Made

Core Implementation

  • Added abstract send_event method to OrchestrationContext in durabletask/task.py

    • Takes instance_id, event_name, and optional data parameters
    • Returns a Task that completes when the event is sent
    • Follows the same pattern as other orchestration methods
  • Implemented send_event in _RuntimeOrchestrationContext in durabletask/worker.py

    • Creates SendEventAction with proper sequence numbering
    • Handles JSON serialization of data payload
    • Manages task completion tracking
  • Added helper functions in durabletask/internal/helpers.py:

    • new_send_event_action() - Creates protobuf action for sending events
    • new_event_sent_event() - Creates protobuf event for test support
  • Added event completion handling in durabletask/worker.py

    • Processes eventSent events to complete corresponding tasks
    • Follows the same pattern as other completion events
  • Uncommented and fixed action type mapping for sendEvent actions

Testing

  • Comprehensive test suite in tests/durabletask/test_send_event.py
    • Tests basic event sending functionality
    • Tests event completion flow
    • Tests with various data types (dict, list, numbers, booleans)
    • Tests with no data parameter
    • Tests multiple events in sequence

Usage Example

def orchestrator(ctx: task.OrchestrationContext, _):
    # Send an event with data
    yield ctx.send_event("target-instance-id", "approval", data={"approved": True})
    
    # Send an event without data  
    yield ctx.send_event("target-instance-id", "notification")
    
    return "events sent"

Validation

  • ✅ All existing tests continue to pass (50/50)
  • ✅ New comprehensive test suite covers edge cases
  • ✅ Implementation follows existing patterns for consistency
  • ✅ Proper protobuf integration using existing infrastructure

The implementation leverages the existing protobuf definitions (SendEventAction and EventSentEvent) that were already present but unused, making this a minimal and focused addition that integrates seamlessly with the existing codebase.

Fixes #32.

Warning

Firewall rules blocked me from connecting to one or more addresses

I tried to connect to the following addresses, but was blocked by firewall rules:

  • http://168.63.129.16:80/machine/
    • Triggering command: /usr/bin/python3 -u bin/WALinuxAgent-2.13.1.1-py3.9.egg -collect-logs (http block)

If you need me to access, download, or install something from one of these locations, you can either:


💡 You can make Copilot smarter by setting up custom instructions, customizing its development environment and configuring Model Context Protocol (MCP) servers. Learn more Copilot coding agent tips in the docs.

@berndverst
Copy link
Member

Use https://github.com/microsoft/durabletask-dotnet/blob/915b4c3b8dd82bfffaf399680a723b3762b6e348/src/Shared/Grpc/ProtoUtils.cs#L277 in the durabletask-dotnet SDK as reference for the sendevent implementation.

@Copilot Copilot AI changed the title [WIP] Missing implementation for send_event Implement missing send_event capability for orchestration context Jun 3, 2025
@Copilot Copilot AI requested a review from berndverst June 3, 2025 20:43
Copilot finished work on behalf of berndverst June 3, 2025 20:44
Copy link
Author

Copilot AI commented Jun 3, 2025

Use https://github.com/microsoft/durabletask-dotnet/blob/915b4c3b8dd82bfffaf399680a723b3762b6e348/src/Shared/Grpc/ProtoUtils.cs#L277 in the durabletask-dotnet SDK as reference for the sendevent implem...

I've reviewed the durabletask-dotnet ProtoUtils.cs reference implementation and updated the send_event functionality accordingly. The key improvement is adding input validation for instance_id and event_name parameters to match the .NET SDK's validation patterns. The validation ensures that neither parameter can be null or empty, which aligns with the .NET implementation's validation approach. Updated in commit cd9d951.

Warning

Firewall rules blocked me from connecting to one or more addresses

I tried to connect to the following addresses, but was blocked by firewall rules:

  • http://168.63.129.16:80/machine/
    • Triggering command: /usr/bin/python3 -u bin/WALinuxAgent-2.13.1.1-py3.9.egg -collect-logs (http block)

If you need me to access, download, or install something from one of these locations, you can either:

Copilot finished work on behalf of berndverst June 3, 2025 20:54
@berndverst
Copy link
Member

Great. Lets add another advanced test:
Run an orchestration that waits for an event to be raised before completing. Then run another orchestration that uses the first orchestration instance ID as input to send an event to the waiting orchestration. Also show that the data is being passed correctly.

Copy link
Author

Copilot AI commented Jun 3, 2025

Great. Lets add another advanced test:
Run an orchestration that waits for an event to be raised before completing. Then run another orchestration that uses the first orchestration instance ID as inp...

I've added the requested advanced test in commit 00b4957. The test demonstrates:

  1. A waiting orchestration that waits for an "approval" event before completing
  2. A sender orchestration that takes a target instance ID and sends an approval event with structured data
  3. Verification that the data is passed correctly between the orchestrations

The test shows the complete flow: waiting orchestration starts → sender orchestration sends event with data payload → waiting orchestration receives event and completes with the data. The data passing is validated to ensure the approval payload (with approved flag, approver, and timestamp) flows correctly through the event communication.

Warning

Firewall rules blocked me from connecting to one or more addresses

I tried to connect to the following addresses, but was blocked by firewall rules:

  • http://168.63.129.16:80/machine/
    • Triggering command: /usr/bin/python3 -u bin/WALinuxAgent-2.13.1.1-py3.9.egg -collect-logs (http block)

If you need me to access, download, or install something from one of these locations, you can either:

Copilot finished work on behalf of berndverst June 3, 2025 21:08
@berndverst berndverst marked this pull request as ready for review June 3, 2025 21:13
@berndverst
Copy link
Member

Please address the linter error
./worker.py:862:1: W293 blank line contains whitespace

Copy link
Author

Copilot AI commented Jun 3, 2025

Please address the linter error
./worker.py:862:1: W293 blank line contains whitespace

Fixed the linter error by removing whitespace from the blank line at line 862 in worker.py. Commit 7ba38ce.

Copilot finished work on behalf of berndverst June 3, 2025 21:18
@berndverst
Copy link
Member

Please run the latest flake8 linter on the files you changed and address all errors until flake8 returns no more problems. You may want to format the files with autopep8 to speed up the process.

Copy link
Author

Copilot AI commented Jun 3, 2025

Please run the latest flake8 linter on the files you changed and address all errors until flake8 returns no more problems. You may want to format the files with autopep8 to speed up the process.

Fixed all flake8 linting errors using autopep8 formatting. All files now pass flake8 with no issues and tests continue to pass. Commit ccfbefd.

Warning

Firewall rules blocked me from connecting to one or more addresses

I tried to connect to the following addresses, but was blocked by firewall rules:

  • http://168.63.129.16:80/machine/
    • Triggering command: /usr/bin/python3 -u bin/WALinuxAgent-2.13.1.1-py3.9.egg -collect-logs (http block)

If you need me to access, download, or install something from one of these locations, you can either:

Copilot finished work on behalf of berndverst June 3, 2025 21:25
@berndverst berndverst requested a review from cgillum June 3, 2025 21:26
Copy link
Member

@cgillum cgillum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great! My main feedback is that send_event should not return a task, so we should remove all code that tries to create, return, or yield tasks for send_event calls.

@@ -852,6 +852,22 @@ def wait_for_external_event(self, name: str) -> task.Task:
task_list.append(external_event_task)
return external_event_task

def send_event(self, instance_id: str, event_name: str, *,
data: Optional[Any] = None) -> task.Task:
# Validate inputs similar to .NET implementation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this comment. The fact that a .NET implementation does basic input validation is not noteworthy, and might give the wrong impression to developers who read this code and aren't necessarily interested in .NET.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the .NET implementation comment as requested. Commit 4ca1234.

self._pending_actions[id] = action
send_event_task = task.CompletableTask()
self._pending_tasks[id] = send_event_task
return send_event_task
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending an event is a one-way operation and is therefore not awaitable. There's no need to create a task for this, and no need to return anything from this method.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed send_event to not create or return any task. It's now a one-way operation returning None. Commit 4ca1234.

if send_event_task:
# For send_event, we don't return any meaningful result, just completion
send_event_task.complete(None)
ctx.resume()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to my other comment, sending an event is not an awaitable operation, so we should not have any tasks associated with sending events.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the eventSent handling and all task-related logic for send_event operations. Commit 4ca1234.

Returns
-------
Task
A Durable Task that completes when the event has been sent.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove the return value from this method definition. This method should return None.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated send_event abstract method to return None instead of Task. Commit 4ca1234.

"""Test that send_event creates the correct action"""

def orchestrator(ctx: task.OrchestrationContext, _):
yield ctx.send_event("target_instance", "my_event", data="test_data")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per my other comment, send_event should return None instead of returning a task, so we don't need to yield here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed yield from send_event calls since send_event now returns None. Commit 4ca1234.

"""Test send_event with no data parameter"""

def orchestrator(ctx: task.OrchestrationContext, _):
yield ctx.send_event("target_instance", "my_event")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per my other feedback, remove the yield.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed yield from send_event calls and updated test to expect all actions in single execution. Commit 4ca1234.


def orchestrator(ctx: task.OrchestrationContext, _):
yield ctx.send_event("target1", "event1", data="data1")
yield ctx.send_event("target2", "event2", data="data2")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per my other feedback, remove the yield statements for each send_event method call.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed yield statements and updated test to expect all send_event actions plus completion in single execution. Commit 4ca1234.

# Test with number
yield ctx.send_event("target3", "event3", data=123)
# Test with boolean
yield ctx.send_event("target4", "event4", data=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same feedback about removing yield from the ctx.send_event calls.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed yield statements and updated test to expect all send_event actions plus completion in single execution. Commit 4ca1234.

"""Test send_event input validation"""

def orchestrator_empty_instance(ctx: task.OrchestrationContext, _):
yield ctx.send_event("", "event1", data="test")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same feedback about removing yield from send_event calls.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed yield from send_event calls. The test now properly shows orchestration completing immediately with send event actions executed first. Commit 4ca1234.

# Define the sender orchestration that sends an event to another orchestration
def sender_orchestration(ctx: task.OrchestrationContext, target_instance_id: str):
approval_payload = {"approved": True, "approver": "manager", "timestamp": "2024-01-01T10:00:00Z"}
yield ctx.send_event(target_instance_id, "approval", data=approval_payload)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same feedback about removing yield from send_event calls in orchestrator code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed yield from send_event calls and updated test to show sender orchestration completing immediately. Commit 4ca1234.

Copy link
Author

Copilot AI commented Jun 4, 2025

This is great! My main feedback is that send_event should not return a task, so we should remove all code that tries to create, return, or yield tasks for send_event calls.

Updated send_event to return None instead of Task, making it a one-way operation. Removed all task creation and completion logic, and updated tests accordingly. All tests now pass. Commit 4ca1234.

Warning

Firewall rules blocked me from connecting to one or more addresses

I tried to connect to the following addresses, but was blocked by firewall rules:

  • http://168.63.129.16:80/machine/
    • Triggering command: /usr/bin/python3 -u bin/WALinuxAgent-2.13.1.1-py3.9.egg -collect-logs (http block)

If you need me to access, download, or install something from one of these locations, you can either:

Copilot finished work on behalf of cgillum June 4, 2025 06:15
@@ -973,8 +988,9 @@ def process_event(
# Start the orchestrator's generator function
ctx.run(result)
else:
# This is an orchestrator that doesn't schedule any tasks
ctx.set_complete(result, pb.ORCHESTRATION_STATUS_COMPLETED)
# This is an orchestrator that doesn't use generators (async tasks)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this comment

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Missing implementation for send_event
3 participants