11import type { ZodError } from "zod" ;
22
33import { captureException } from "@sentry/nextjs" ;
4- import { sql } from "kysely" ;
4+ import { Kysely , sql } from "kysely" ;
55
66import type { ProcessedPub } from "contracts" ;
7+ import type { Database } from "db/Database" ;
78import type {
89 ActionInstancesId ,
910 ActionRunsId ,
@@ -17,6 +18,7 @@ import type { BaseActionInstanceConfig, Json } from "db/types";
1718import type { Prettify , XOR } from "utils/types" ;
1819import { ActionRunStatus , Event } from "db/public" ;
1920import { logger } from "logger" ;
21+ import { tryCatch } from "utils/try-catch" ;
2022
2123import type { run as logRun } from "../log/run" ;
2224import type { ActionSuccess } from "../types" ;
@@ -36,7 +38,6 @@ import { ActionConfigBuilder } from "./ActionConfigBuilder";
3638import { evaluateConditions } from "./evaluateConditions" ;
3739import { getActionRunByName } from "./getRuns" ;
3840import { createPubProxy } from "./pubProxy" ;
39- import { scheduleActionInstances } from "./scheduleActionInstance" ;
4041
4142export type ActionInstanceRunResult = ( ClientException | ClientExceptionOptions | ActionSuccess ) & {
4243 stack : ActionRunsId [ ] ;
@@ -185,7 +186,6 @@ const _runActionInstance = async (
185186 actionRunId : args . actionRunId ,
186187 } ) ;
187188
188- const jsonOrPubId = pub ? { pubId : pub . id } : { json : args . json ! } ;
189189 try {
190190 // just hard cast it to one option so we at least have some typesafety
191191 const result = await ( actionRun as typeof logRun ) ( {
@@ -201,37 +201,14 @@ const _runActionInstance = async (
201201 } ) ;
202202
203203 if ( isClientExceptionOptions ( result ) ) {
204- await scheduleActionInstances ( {
205- stageId : args . actionInstance . stageId ,
206- event : Event . actionFailed ,
207- stack,
208- sourceActionInstanceId : args . actionInstance . id ,
209- ...jsonOrPubId ,
210- } ) ;
211204 return { ...result , stack } ;
212205 }
213206
214- await scheduleActionInstances ( {
215- stageId : args . actionInstance . stageId ,
216- event : Event . actionSucceeded ,
217- stack,
218- sourceActionInstanceId : args . actionInstance . id ,
219- ...jsonOrPubId ,
220- } ) ;
221-
222207 return { ...result , stack } ;
223208 } catch ( error ) {
224209 captureException ( error ) ;
225210 logger . error ( error ) ;
226211
227- await scheduleActionInstances ( {
228- stageId : args . actionInstance . stageId ,
229- event : Event . actionFailed ,
230- stack,
231- sourceActionInstanceId : args . actionInstance . id ,
232- ...jsonOrPubId ,
233- } ) ;
234-
235212 return {
236213 title : "Failed to run action" ,
237214 error : error . message ,
@@ -299,26 +276,60 @@ export async function runActionInstance(
299276 stack : args . stack ,
300277 } ;
301278 }
302- console . log ( "BBBBBBBBBBBBbbbbb" , automation ) ;
303-
279+ // check if we need to evaluate conditions at execution time
304280 if ( automation ?. condition ) {
305- const input = pub
306- ? { pub : createPubProxy ( pub , community ?. slug ) }
307- : { json : args . json ?? ( { } as Json ) } ;
308- const res = await evaluateConditions ( automation . condition , input ) ;
309-
310- console . log ( "AAAAAAAAAAAAA" , res ) ;
311- if ( ! res ) {
312- logger . info ( "Automation condition not met" , {
281+ const automationTiming = ( automation as any ) . conditionEvaluationTiming as
282+ | string
283+ | null
284+ | undefined ;
285+ const shouldEvaluateNow =
286+ automationTiming === "onExecution" ||
287+ automationTiming === "both" ||
288+ // if no timing is set, default to evaluating at execution time for backwards compatibility
289+ ! automationTiming ;
290+
291+ if ( shouldEvaluateNow ) {
292+ const input = pub
293+ ? { pub : createPubProxy ( pub , community ?. slug ) }
294+ : { json : args . json ?? ( { } as Json ) } ;
295+ const [ error , res ] = await tryCatch ( evaluateConditions ( automation . condition , input ) ) ;
296+
297+ if ( error ) {
298+ await insertActionRun ( trx , {
299+ actionInstanceId : args . actionInstanceId ,
300+ pubId : pub ?. id ,
301+ json : args . json ,
302+ result : { error : error . message } ,
303+ status : ActionRunStatus . failure ,
304+ event : automation . event ,
305+ communityId : args . communityId ,
306+ stack : args . stack ,
307+ scheduledActionRunId : args . scheduledActionRunId ,
308+ actionInstanceArgs : args . actionInstanceArgs ,
309+ } ) ;
310+
311+ return {
312+ error : error . message ,
313+ stack : args . stack ,
314+ } ;
315+ }
316+
317+ if ( ! res ) {
318+ logger . debug ( "Automation condition not met at execution time" , {
319+ automationId : automation . id ,
320+ conditionEvaluationTiming : automationTiming ,
321+ condition : automation . condition ,
322+ } ) ;
323+ return {
324+ error : "Automation condition not met" ,
325+ stack : args . stack ,
326+ } ;
327+ }
328+
329+ logger . debug ( "Automation condition met at execution time" , {
313330 automationId : automation . id ,
314- condition : automation . condition ,
315- input,
316- res,
331+ conditionEvaluationTiming : automationTiming ,
317332 } ) ;
318- return {
319- error : "Automation condition not met" ,
320- stack : args . stack ,
321- } ;
322333 }
323334 }
324335
@@ -328,39 +339,19 @@ export async function runActionInstance(
328339 // in case the action modifies the pub and needs to pass the lastModifiedBy field
329340 // which in this case would be `action-run:<action-run-id>`
330341
331- const actionRuns = await autoRevalidate (
332- trx
333- . insertInto ( "action_runs" )
334- . values ( ( eb ) => ( {
335- id : args . scheduledActionRunId ,
336- actionInstanceId : args . actionInstanceId ,
337- pubId : pub ?. id ,
338- json : args . json ,
339- userId : isActionUserInitiated ? args . userId : null ,
340- result : { scheduled : `Action to be run immediately` } ,
341- // we are setting it to `scheduled` very briefly
342- status : ActionRunStatus . scheduled ,
343- // this is a bit hacky, would be better to pass this around methinks
344- config :
345- args . actionInstanceArgs ??
346- eb
347- . selectFrom ( "action_instances" )
348- . select ( "config" )
349- . where ( "action_instances.id" , "=" , args . actionInstanceId ) ,
350- params : args ,
351- event : isActionUserInitiated ? undefined : args . event ,
352- sourceActionRunId : args . stack . at ( - 1 ) ,
353- } ) )
354- . returningAll ( )
355- // conflict should only happen if a scheduled action is excecuted
356- // not on user initiated actions or on other events
357- . onConflict ( ( oc ) =>
358- oc . column ( "id" ) . doUpdateSet ( {
359- params : args ,
360- event : "userId" in args ? undefined : args . event ,
361- } )
362- )
363- ) . execute ( ) ;
342+ const actionRuns = await insertActionRun ( trx , {
343+ actionInstanceId : args . actionInstanceId ,
344+ pubId : pub ?. id ,
345+ json : args . json as Json ,
346+ event : args . event as Event ,
347+ communityId : args . communityId ,
348+ stack : args . stack ,
349+ scheduledActionRunId : args . scheduledActionRunId ,
350+ actionInstanceArgs : args . actionInstanceArgs as Record < string , unknown > | null ,
351+ result : { scheduled : `Action to be run immediately` } ,
352+ status : ActionRunStatus . scheduled ,
353+ userId : isActionUserInitiated ? args . userId : undefined ,
354+ } ) ;
364355
365356 if ( actionRuns . length > 1 ) {
366357 const errorMessage : ActionInstanceRunResult = {
@@ -427,26 +418,98 @@ export async function runActionInstance(
427418 return result ;
428419}
429420
421+ export const runAutomationById = async (
422+ args :
423+ | {
424+ automationId : AutomationsId ;
425+ pubId : PubsId ;
426+ json ?: never ;
427+ event : Event ;
428+ communityId : CommunitiesId ;
429+ stack : ActionRunsId [ ] ;
430+ scheduledActionRunId ?: ActionRunsId ;
431+ actionInstanceArgs ?: Record < string , unknown > | null ;
432+ }
433+ | {
434+ automationId : AutomationsId ;
435+ pubId ?: never ;
436+ json : Json ;
437+ event : Event ;
438+ communityId : CommunitiesId ;
439+ stack : ActionRunsId [ ] ;
440+ scheduledActionRunId ?: ActionRunsId ;
441+ actionInstanceArgs ?: Record < string , unknown > | null ;
442+ }
443+ ) : Promise < {
444+ actionInstanceId : ActionInstancesId ;
445+ result : any ;
446+ } > => {
447+ const automation = await getAutomation ( args . automationId ) . executeTakeFirst ( ) ;
448+
449+ if ( ! automation ) {
450+ throw new Error ( `Automation ${ args . automationId } not found` ) ;
451+ }
452+
453+ const runArgs = args . pubId
454+ ? ( {
455+ pubId : args . pubId ,
456+ communityId : args . communityId ,
457+ actionInstanceId : automation . actionInstance . id ,
458+ event : args . event ,
459+ actionInstanceArgs : args . actionInstanceArgs ?? null ,
460+ stack : args . stack ?? [ ] ,
461+ automationId : args . automationId ,
462+ scheduledActionRunId : args . scheduledActionRunId ,
463+ } as const )
464+ : ( {
465+ json : args . json ! ,
466+ communityId : args . communityId ,
467+ actionInstanceId : automation . actionInstance . id ,
468+ event : args . event ,
469+ actionInstanceArgs : args . actionInstanceArgs ?? null ,
470+ stack : args . stack ?? [ ] ,
471+ automationId : args . automationId ,
472+ scheduledActionRunId : args . scheduledActionRunId ,
473+ } as const ) ;
474+
475+ const result = await runActionInstance ( runArgs as any , db ) ;
476+
477+ return {
478+ actionInstanceId : automation . actionInstance . id ,
479+ result,
480+ } ;
481+ } ;
482+
430483export const runInstancesForEvent = async (
431484 pubId : PubsId ,
432- stageId : StagesId ,
485+ stageId : StagesId | null ,
433486 event : Event ,
434487 communityId : CommunitiesId ,
435488 stack : ActionRunsId [ ] ,
489+ automationId ?: AutomationsId ,
436490 trx = db
437491) => {
438- const instances = await trx
492+ let query = trx
439493 . selectFrom ( "action_instances" )
440494 . innerJoin ( "automations" , "automations.actionInstanceId" , "action_instances.id" )
441495 . select ( [
442496 "action_instances.id as actionInstanceId" ,
443497 "automations.config as automationConfig" ,
444498 "automations.id as automationId" ,
445499 "action_instances.name as actionInstanceName" ,
500+ "action_instances.stageId as stageId" ,
446501 ] )
447- . where ( "automations.event" , "=" , event )
448- . where ( "action_instances.stageId" , "=" , stageId )
449- . execute ( ) ;
502+ . where ( "automations.event" , "=" , event ) ;
503+
504+ if ( stageId ) {
505+ query = query . where ( "action_instances.stageId" , "=" , stageId ) ;
506+ }
507+
508+ if ( automationId ) {
509+ query = query . where ( "automations.id" , "=" , automationId ) ;
510+ }
511+
512+ const instances = await query . execute ( ) ;
450513
451514 const results = await Promise . all (
452515 instances . map ( async ( instance ) => {
@@ -472,3 +535,53 @@ export const runInstancesForEvent = async (
472535
473536 return results ;
474537} ;
538+
539+ export function insertActionRun (
540+ trx : Kysely < Database > ,
541+ args : {
542+ actionInstanceId : ActionInstancesId ;
543+ pubId ?: PubsId ;
544+ json ?: Json ;
545+ event : Event ;
546+ communityId : CommunitiesId ;
547+ stack : ActionRunsId [ ] ;
548+ scheduledActionRunId ?: ActionRunsId ;
549+ actionInstanceArgs ?: Record < string , unknown > | null ;
550+ result : Record < string , unknown > ;
551+ status : ActionRunStatus ;
552+ userId ?: UsersId ;
553+ }
554+ ) {
555+ return autoRevalidate (
556+ trx
557+ . insertInto ( "action_runs" )
558+ . values ( ( eb ) => ( {
559+ id : args . scheduledActionRunId ,
560+ actionInstanceId : args . actionInstanceId ,
561+ pubId : args . pubId ,
562+ json : args . json ,
563+ userId : "userId" in args ? ( args . userId as UsersId | null ) : null ,
564+ result : args . result ,
565+ status : args . status ,
566+ // this is a bit hacky, would be better to pass this around methinks
567+ config :
568+ args . actionInstanceArgs ??
569+ eb
570+ . selectFrom ( "action_instances" )
571+ . select ( "config" )
572+ . where ( "action_instances.id" , "=" , args . actionInstanceId ) ,
573+ params : args ,
574+ event : "userId" in args ? undefined : args . event ,
575+ sourceActionRunId : args . stack . at ( - 1 ) ,
576+ } ) )
577+ . returningAll ( )
578+ // conflict should only happen if a scheduled action is excecuted
579+ // not on user initiated actions or on other events
580+ . onConflict ( ( oc ) =>
581+ oc . column ( "id" ) . doUpdateSet ( {
582+ params : args ,
583+ event : "userId" in args ? undefined : args . event ,
584+ } )
585+ )
586+ ) . execute ( ) ;
587+ }
0 commit comments