22 * @since 1.0.0
33 */
44import * as Rpc from "@effect/rpc/Rpc"
5+ import * as RpcServer from "@effect/rpc/RpcServer"
56import { DurableDeferred } from "@effect/workflow"
67import * as Activity from "@effect/workflow/Activity"
78import * as DurableClock from "@effect/workflow/DurableClock"
89import * as Workflow from "@effect/workflow/Workflow"
910import { WorkflowEngine , WorkflowInstance } from "@effect/workflow/WorkflowEngine"
1011import * as Arr from "effect/Array"
12+ import * as Cause from "effect/Cause"
1113import * as Context from "effect/Context"
1214import * as DateTime from "effect/DateTime"
1315import * as Duration from "effect/Duration"
1416import * as Effect from "effect/Effect"
1517import type * as Exit from "effect/Exit"
1618import * as Fiber from "effect/Fiber"
19+ import * as FiberId from "effect/FiberId"
1720import * as Layer from "effect/Layer"
1821import * as Option from "effect/Option"
1922import type * as ParseResult from "effect/ParseResult"
2023import * as PrimaryKey from "effect/PrimaryKey"
2124import * as RcMap from "effect/RcMap"
22- import * as Record from "effect/Record"
25+ import type * as Record from "effect/Record"
2326import * as Runtime from "effect/Runtime"
2427import * as Schedule from "effect/Schedule"
2528import * as Schema from "effect/Schema"
@@ -260,13 +263,12 @@ export const make = Effect.gen(function*() {
260263 return {
261264 run : ( request : Entity . Request < any > ) => {
262265 const instance = WorkflowInstance . initial ( workflow , executionId )
263- let payload = request . payload
266+ const payload = request . payload
264267 let parent : { workflowName : string ; executionId : string } | undefined
265268 if ( payload [ payloadParentKey ] ) {
266269 parent = payload [ payloadParentKey ]
267- payload = Record . remove ( payload , payloadParentKey )
268270 }
269- return execute ( payload , executionId ) . pipe (
271+ return execute ( workflow . payloadSchema . make ( payload ) , executionId ) . pipe (
270272 Effect . ensuring ( Effect . suspend ( ( ) => {
271273 if ( ! instance . suspended ) {
272274 return parent ? ensureSuccess ( sendResumeParent ( parent ) ) : Effect . void
@@ -291,17 +293,17 @@ export const make = Effect.gen(function*() {
291293 ) as any
292294 } ,
293295
294- activity : Effect . fnUntraced (
295- function * ( request : Entity . Request < any > ) {
296- const activityId = `${ executionId } /${ request . payload . name } `
296+ activity ( request : Entity . Request < any > ) {
297+ const activityId = `${ executionId } /${ request . payload . name } `
298+ const instance = WorkflowInstance . initial ( workflow , executionId )
299+ return Effect . gen ( function * ( ) {
297300 let entry = activities . get ( activityId )
298301 while ( ! entry ) {
299302 const latch = Effect . unsafeMakeLatch ( )
300303 activityLatches . set ( activityId , latch )
301304 yield * latch . await
302305 entry = activities . get ( activityId )
303306 }
304- const instance = WorkflowInstance . initial ( workflow , executionId )
305307 const contextMap = new Map ( entry . runtime . context . unsafeMap )
306308 contextMap . set ( Activity . CurrentAttempt . key , request . payload . attempt )
307309 contextMap . set ( WorkflowInstance . key , instance )
@@ -311,23 +313,29 @@ export const make = Effect.gen(function*() {
311313 runtimeFlags : Runtime . defaultRuntimeFlags
312314 } )
313315 return yield * entry . activity . executeEncoded . pipe (
314- Effect . interruptible ,
315- Effect . onInterrupt ( ( ) => {
316- instance . suspended = true
317- return Effect . void
318- } ) ,
319- Workflow . intoResult ,
320- Effect . provide ( runtime ) ,
321- Effect . ensuring ( Effect . sync ( ( ) => {
322- activities . delete ( activityId )
323- } ) )
316+ Effect . provide ( runtime )
324317 )
325- } ,
326- Rpc . wrap ( {
327- fork : true ,
328- uninterruptible : true
329- } )
330- ) ,
318+ } ) . pipe (
319+ Workflow . intoResult ,
320+ Effect . catchAllCause ( ( cause ) => {
321+ const interruptors = Cause . interruptors ( cause )
322+ // we only want to store explicit interrupts
323+ const ids = Array . from ( interruptors , ( id ) => Array . from ( FiberId . ids ( id ) ) ) . flat ( )
324+ const suspend = ids . includes ( RpcServer . fiberIdClientInterrupt . id ) ||
325+ ids . includes ( RpcServer . fiberIdTransientInterrupt . id )
326+ return suspend ? Effect . succeed ( new Workflow . Suspended ( ) ) : Effect . failCause ( cause )
327+ } ) ,
328+ Effect . provideService ( WorkflowInstance , instance ) ,
329+ Effect . provideService ( Activity . CurrentAttempt , request . payload . attempt ) ,
330+ Effect . ensuring ( Effect . sync ( ( ) => {
331+ activities . delete ( activityId )
332+ } ) ) ,
333+ Rpc . wrap ( {
334+ fork : true ,
335+ uninterruptible : true
336+ } )
337+ )
338+ } ,
331339
332340 deferred : Effect . fnUntraced ( function * ( request : Entity . Request < any > ) {
333341 yield * ensureSuccess ( resume ( workflow , executionId ) )
0 commit comments