You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
My service gets messages from users on a thread. Any time a message comes in, the server starts the orchestrator workflow. Orchestrator workflows need to make sure that only one instance of the orchestrator agent is running at a given time, and that all messages get handled by the agent.
I identified a couple of points in this workflow where there might be resource contention or out-of-order events. My solution is just to loop and do that part of the workflow again until an iteration exits clean.
I guess what I want to know is, is there a cleaner way to design this thing that I'm not thinking of? Also, is the way I'm using workflows even valid? I'm not sure I understand all the particular rules about what you can and can't do with workflows/steps/hooks, yet.
I checked #301 and while it seems useful for some other things, it doesn't seem relevant here because I want a leader/follower pattern, not a lock that delays incoming messages.
I don't think I can really explain my approach better than the mockup I made:
exportasyncfunctionorchestrator(params: {incomingMessages: IncomingMessage[],thread_uid: string,}){"use workflow"constincomingMessageHookToken=`orchestrator:${params.thread_uid}:incoming-messages`;letpendingMessages: IncomingMessage[]=[...params.incomingMessages];// strategy: use a named hook as basically a lock on the thread.// Only one orchestrator runs its agent at a time.// New incoming messages get sent to that run, and injected into the agent's messages array as it iterates.// At points where concurrency interferes with this run's attempt to do things, loop.while(pendingMessages.length){letincomingMessageHook=null;// step 1: either acquire the incoming message hook, or pass the messages on to the run that has it.while(!incomingMessageHook){try{// try to become leaderincomingMessageHook=incomingMessageHookDef.create({token: incomingMessageHookToken})}catch(err){if(HookConflictError.is(err)){// fallback to follower mode: try to pass the incoming message to the leader run, via the hook:try{awaitincomingMessageHookDef.resume(incomingMessageHookToken,{messages: pendingMessages});return;}catch(err){if(!HookNotFoundError.is(err)){throwerr;// re-throw unexpected error}}}else{throwerr;// re-throw unexpected error}}// concurrency edge case?? leader run disposes the hook just as this run tries to resume it.// this run loops again to decide whether it's a leader or follower.}// step 2: this run has the hook. It will run the agent and listen for new messages.letdone=false;// listen for new messages in a background loopvoid(async()=>{while(!done){constpayload=awaitincomingMessageHook;pendingMessages.concat(payload.messages);}})();// TODO run DurableAgent here, handle pending messages in the prepareStep callback.done=true;incomingMessageHook.dispose();// concurrency edge case?? the agent completes while a new message comes in, this run disposes the hook, pending message left unhandled.// loop back to the start, and process any new messages as if they were just passed to this workflow.}}
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
My service gets messages from users on a thread. Any time a message comes in, the server starts the orchestrator workflow. Orchestrator workflows need to make sure that only one instance of the orchestrator agent is running at a given time, and that all messages get handled by the agent.
I identified a couple of points in this workflow where there might be resource contention or out-of-order events. My solution is just to loop and do that part of the workflow again until an iteration exits clean.
I guess what I want to know is, is there a cleaner way to design this thing that I'm not thinking of? Also, is the way I'm using workflows even valid? I'm not sure I understand all the particular rules about what you can and can't do with workflows/steps/hooks, yet.
I checked #301 and while it seems useful for some other things, it doesn't seem relevant here because I want a leader/follower pattern, not a lock that delays incoming messages.
I don't think I can really explain my approach better than the mockup I made:
Beta Was this translation helpful? Give feedback.
All reactions