-
Notifications
You must be signed in to change notification settings - Fork 2.2k
protofsm: extend EmittedEvents with new Outbox field #10346
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: actor
Are you sure you want to change the base?
Conversation
In this commit, we add the actor module as a dependency to support request-response patterns in the protofsm state machine. The actor module provides a Future/Promise abstraction that enables asynchronous operations with result handling. We use a local replace directive to point to the ./actor subdirectory, as the actor package is being developed alongside protofsm. This allows us to iterate on both packages simultaneously while maintaining clean module boundaries.
In this commit, we extend the EmittedEvent structure with an Outbox field that can hold events to be returned to the caller. This enables nested state machines to emit events that bubble up to their parent state machine for further processing. The Outbox field serves a different purpose than InternalEvent and ExternalEvents. InternalEvent loops events back into the same state machine, while ExternalEvents triggers I/O operations like sending messages or broadcasting transactions. The Outbox, on the other hand, carries events outward to be processed by a parent or calling context. This pattern is particularly useful for composed state machines where a child state machine needs to communicate state changes or results back to its parent without tight coupling between the two machines.
In this commit, we introduce the AskEvent method that implements the Ask pattern from actor systems, complementing the existing fire-and- forget SendEvent method. The key distinction is that AskEvent returns a Future that resolves with the accumulated outbox events after the state machine fully processes the event. The implementation uses the Promise/Future abstraction from the actor module to enable asynchronous result delivery. When a caller invokes AskEvent, we create a Promise and send both the event and the promise to the state machine via a dedicated syncEvents channel. The state machine's driveMachine goroutine will process the event and complete the promise with either the accumulated outbox events or an error.
In this commit, we complete the outbox pattern implementation by modifying applyEvents to accumulate outbox events and extending driveMachine to handle AskEvent requests. The applyEvents function now maintains an outbox slice that accumulates events from every EmittedEvent encountered during the event processing loop. This includes events from the initial state transition as well as all nested internal events, ensuring callers receive the complete set of outbox messages regardless of how deeply nested the state transitions become. The function signature changes from returning (State, error) to (State, []Event, error) to carry this information. The driveMachine event loop now handles the syncEvents channel alongside the existing events channel. When processing an AskEvent request, we call applyEvents and capture the returned outbox slice. If processing succeeds, we complete the promise with the accumulated events using fn.Ok(outbox). If an error occurs, we complete the promise with the error using fn.Err[[]Event](err) before shutting down the state machine, maintaining our invariant that processing errors are fatal. The regular SendEvent path remains unchanged except for discarding the outbox return value with an underscore, preserving backward compatibility for fire-and-forget event processing.
Summary of ChangesHello @Roasbeef, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the "protofsm" package by introducing an "outbox" mechanism for state machines. This new feature allows state machines to emit events that are collected and returned synchronously to the caller, enabling a more flexible and composable architecture. The "AskEvent" method provides a structured way to interact with state machines, receiving a future that resolves with these emitted outbox events, thereby supporting patterns like nested state machines and actor-like communication. Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new Outbox field to EmittedEvents and a corresponding AskEvent method to the state machine. This allows state machines to emit events for external processing, which is a great feature for composition. The implementation is sound and is accompanied by a comprehensive set of tests covering various scenarios, including nested events, context cancellation, and shutdown. My feedback includes a suggestion to refactor the new AskEvent function for improved conciseness and maintainability by reducing some duplicated logic. Overall, this is a well-executed enhancement to the protofsm package.
|
|
||
| // Send the request to the state machine. If we can't send it due to | ||
| // context cancellation or shutdown, complete the promise with an error. | ||
| select { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe I don't fully grok the future stuff, but wouldn't we start a goroutine here and return the future early? then complete it once anything from this select here happens?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good q. That's another way to write it, but then you need the extra book keeping around the goroutine. One benefit as you note is that things return instantly and you wait for the response later.
I wrote it like this to be mostly single threaded when sending. The main executor itself is already single threaded itself, so it must process messages one by one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
without the goroutine wouldn't this be essentially a forced .await() on the caller though? With the goroutine the caller could still just use await() to have it blocking until the goroutine finished thoughr right?
In this commit, we implement the actor.ActorBehavior interface for StateMachine. This enables the state machine executor to be registered as an actor, and have messages be sent to it via a unique ServiceKey that a concrete instance will set.
In this commit, we modify the Receive method that implements the actor.ActorBehavior interface to use the AskEvent pattern instead of the fire-and-forget SendEvent approach. This fundamental change enables proper event propagation through actor hierarchies when state machines are composed. Previously, Receive would send events directly to the events channel and return a boolean indicating success. This provided no visibility into what happened during event processing or what events were emitted as a result. The fire-and-forget nature meant that parent actors had no way to receive outbox events from nested state machines. With this change, Receive now delegates to AskEvent and awaits the Future, returning fn.Result[[]Event] instead of fn.Result[bool]. This means the actor system can now propagate events upward through the actor hierarchy. When a state machine is embedded as an actor and processes a message, any outbox events it emits are returned to the calling actor for further processing. The implementation becomes remarkably simple as AskEvent already handles all the complexity of context cancellation, shutdown detection, and promise completion. We simply invoke AskEvent with the actor's context and the incoming event, then await and return the result directly. This change completes the outbox pattern implementation by ensuring that state machines used as actors can participate in event propagation, enabling clean composition of state machines in actor-based systems.
| // processing and returned to the caller for processing into the main | ||
| // state machine. This enables nested state machines to emit events that | ||
| // bubble up to their parent. | ||
| Outbox []Event |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think Outbox events should be a new type, similiar to DaemonEvents
This is useful when you want to be able to take a message type A from an actor, and adapt it to message type B.
In this PR, we extend
EmittedEventswith a new outbox field which is similar to the existingInternalEventsfield. The existing internal events is used to break up a given state machine by allowing it to send new events to itself in a future state. The outbox enables composition of state machines in a loosely coupled manner. A state machine can process an input event, then generate messages to send to destinations (in the abstract). To use this new API, callers use theAskEventmethod which now returnsactor.Future[[]Event](future of potential events). Callers can then take those new outbox events, then send them to other state machines or directly to clients, etc, etc.