22 * @since 1.0.0
33 */
44import * as Rpc from "@effect/rpc/Rpc"
5+ import * as RpcServer from "@effect/rpc/RpcServer"
56import { DurableDeferred } from "@effect/workflow"
67import * as Activity from "@effect/workflow/Activity"
78import * as DurableClock from "@effect/workflow/DurableClock"
89import * as Workflow from "@effect/workflow/Workflow"
910import { WorkflowEngine , WorkflowInstance } from "@effect/workflow/WorkflowEngine"
1011import * as Arr from "effect/Array"
12+ import * as Cause from "effect/Cause"
1113import * as Context from "effect/Context"
1214import * as DateTime from "effect/DateTime"
1315import * as Duration from "effect/Duration"
1416import * as Effect from "effect/Effect"
1517import type * as Exit from "effect/Exit"
1618import * as Fiber from "effect/Fiber"
19+ import * as FiberId from "effect/FiberId"
1720import * as Layer from "effect/Layer"
1821import * as Option from "effect/Option"
1922import type * as ParseResult from "effect/ParseResult"
@@ -291,17 +294,17 @@ export const make = Effect.gen(function*() {
291294 ) as any
292295 } ,
293296
294- activity : Effect . fnUntraced (
295- function * ( request : Entity . Request < any > ) {
296- const activityId = `${ executionId } /${ request . payload . name } `
297+ activity ( request : Entity . Request < any > ) {
298+ const activityId = `${ executionId } /${ request . payload . name } `
299+ const instance = WorkflowInstance . initial ( workflow , executionId )
300+ return Effect . gen ( function * ( ) {
297301 let entry = activities . get ( activityId )
298302 while ( ! entry ) {
299303 const latch = Effect . unsafeMakeLatch ( )
300304 activityLatches . set ( activityId , latch )
301305 yield * latch . await
302306 entry = activities . get ( activityId )
303307 }
304- const instance = WorkflowInstance . initial ( workflow , executionId )
305308 const contextMap = new Map ( entry . runtime . context . unsafeMap )
306309 contextMap . set ( Activity . CurrentAttempt . key , request . payload . attempt )
307310 contextMap . set ( WorkflowInstance . key , instance )
@@ -311,23 +314,29 @@ export const make = Effect.gen(function*() {
311314 runtimeFlags : Runtime . defaultRuntimeFlags
312315 } )
313316 return yield * entry . activity . executeEncoded . pipe (
314- Effect . interruptible ,
315- Effect . onInterrupt ( ( ) => {
316- instance . suspended = true
317- return Effect . void
318- } ) ,
319- Workflow . intoResult ,
320- Effect . provide ( runtime ) ,
321- Effect . ensuring ( Effect . sync ( ( ) => {
322- activities . delete ( activityId )
323- } ) )
317+ Effect . provide ( runtime )
324318 )
325- } ,
326- Rpc . wrap ( {
327- fork : true ,
328- uninterruptible : true
329- } )
330- ) ,
319+ } ) . pipe (
320+ Workflow . intoResult ,
321+ Effect . catchAllCause ( ( cause ) => {
322+ const interruptors = Cause . interruptors ( cause )
323+ // we only want to store explicit interrupts
324+ const ids = Array . from ( interruptors , ( id ) => Array . from ( FiberId . ids ( id ) ) ) . flat ( )
325+ const suspend = ids . includes ( RpcServer . fiberIdClientInterrupt . id ) ||
326+ ids . includes ( RpcServer . fiberIdTransientInterrupt . id )
327+ return suspend ? Effect . succeed ( new Workflow . Suspended ( ) ) : Effect . failCause ( cause )
328+ } ) ,
329+ Effect . provideService ( WorkflowInstance , instance ) ,
330+ Effect . provideService ( Activity . CurrentAttempt , request . payload . attempt ) ,
331+ Effect . ensuring ( Effect . sync ( ( ) => {
332+ activities . delete ( activityId )
333+ } ) ) ,
334+ Rpc . wrap ( {
335+ fork : true ,
336+ uninterruptible : true
337+ } )
338+ )
339+ } ,
331340
332341 deferred : Effect . fnUntraced ( function * ( request : Entity . Request < any > ) {
333342 yield * ensureSuccess ( resume ( workflow , executionId ) )
0 commit comments