Skip to content

Commit f2babbf

Browse files
authored
Internal packages (testcontainers, redis-worker and zod-worker) (#1392)
* Some notes on the new run engine * lockfile with setup for the run engine * Documenting where TaskRun is currently mutated, to try figure out the shape of the new system * Added notes about how triggering currently works * Details about when triggering happens * Lots of notes about waitpoints * Started scaffolding the RunEngine * Sketch of Prisma waitpoint schema while it’s fresh in my mind * Got Prisma working with testcontainers * Use beforeEach/afterEach * Simple Prisma and Redis test * Return Redis options instead of a client * Simplified things * A very simple FIFO pull-based queue to check the tests working properly * Use vitest extend * Separate redis, postgres and combined tests for faster testing * Some fixes and test improvements * Pass a logger into the queue * A queue processor that processes items from the given queue as fast as it can * Test for retrying an item that wasn’t processed * First draft of waitpoints in the Prisma schema * Remove the custom logger from the test * Added a completedAt to Waitpoint * Notes on the flow for an execution starting * Added redlock, moved some files around * Starting point for the TaskRunExecutionSnapshot table * Added relationships to TaskRunExecutionSnapshot * Change some tsconfig * Moved some things around * Added some packages * WIP on the RunQueue * Fix for some imports * Key producer with some tests * Removed the nv type from the keys… it’s not useful to do global queries * Passing unit tests for all the public key producer functions * Some basic tests passing for the RunQueue * Simple enqueue test working * Enqueue and dequeue for dev is working * Don’t log everything during the tests * Enqueuing/dequeuing from the shared queue is working * Tests for getting a shared queue * The key producer sharedQueue can now be named, to allow multiple separate queues * The key producer uses the name of the queue as the input * Extra info in the Prisma schema * Dequeuing a message gets the payload and sets the task concurrency all in one Lua script * Adding more keys so we can read the concurrency from the queue * Setting the concurrency with dequeue and enquque is working * Improved the tests and fixed some bugs * Acking is resetting the concurrencies * Check the key has been removed after acking * Nacking is working * Changed the package to CommonJS + Node10 so it works with Redlock * Moved the database, otel and emails packages to be in internal-packages * Moved some Prisma code to the database package * Started using the RunEngine for triggering * Progress on run engine triggering, first waitpoint code * Create a delay waitpoint * Moved ZodWorker to an internal package so it can be used in the run engine as well as the webapp * Web app now uses the zod worker package * Added parseNaturalLanguageDuration to core/apps * internal-packages/zod-worker in the lockfile * Pass in the master queue, remove old rebalance workers code * Add masterQueue to TaskRun * Fixed the tests * Moved waitpoint code into the run engine, also the zod worker * Completing waitpoints * An experiment to create a new test container with environment * More changes to triggering * Started testing triggering * Test for a run getting triggered and being enqueued * Removed dequeueMessageInEnv * Update dev queue tests to use the shared queue function * Schema changes for TaskRunExecutionSnapshot * First execution snapshot when the run is created. Dequeue run function added to the engine * Separate internal package for testcontainers so they can be used elsewhere * Remove the simple queue and testcontainers from the run-engine. They’re going to be separate * Fix for the wrong path to the Prisma schem,a * Added the testcontainers package to the run-engine * redis-worker package, just a copy of the simple queue for now * The queue now uses Lua to enqueue dequeue * The queue now has a catalog and an invisible period after dequeuing * Added a visibility timeout and acking, with tests * Added more Redis connection logging, deleted todos * Visibility timeouts are now defined on the catalog and can be overridden when enqueuing * Dequeue multiple items at once * Test for dequeuing multiple items * Export some types to be used elsewhere * Partial refactor of the processor * First stab at a worker with concurrency and NodeWorkers * Don’t have a default visibility timeout in the queue * Worker setup and processing items in a simple test * Process jobs in parallel with retrying * Get the attempt when dequeuing * Workers do exponential backoff * Moved todos * DLQ functionality * DLQ tests * Same cluster for all keys in the same queue * Added DLQ tests * Whitespace * Redis pubsub to redrive from the worker * Fixed database paths * Fix for path to zod-worker * Fixes for typecheck errors, mostly with TS versions and module resolution * Redlock required a patch * Moved the new DB migrations to the new database package folder * Remove the run-engine package * Remove the RunEngine prisma schema changes * Delete triggerTaskV2 * Remove zodworker test script (no tests) * Update test-containers readme * Generate the client first * Use a specific version of the prisma package * Generate the prisma client before running the unit tests
1 parent fc60947 commit f2babbf

File tree

648 files changed

+2925
-768
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

648 files changed

+2925
-768
lines changed

.github/workflows/unit-tests.yml

+3
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,8 @@ jobs:
2727
- name: 📥 Download deps
2828
run: pnpm install --frozen-lockfile
2929

30+
- name: 📀 Generate Prisma Client
31+
run: pnpm run generate
32+
3033
- name: 🧪 Run Unit Tests
3134
run: pnpm run test

.gitmodules

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
[submodule "packages/otlp-importer/protos"]
2-
path = packages/otlp-importer/protos
1+
[submodule "internal-packages/otlp-importer/protos"]
2+
path = internal-packages/otlp-importer/protos
33
url = https://github.com/open-telemetry/opentelemetry-proto.git

apps/webapp/app/db.server.ts

+21-47
Original file line numberDiff line numberDiff line change
@@ -1,77 +1,51 @@
1-
import { Prisma, PrismaClient } from "@trigger.dev/database";
1+
import {
2+
Prisma,
3+
PrismaClient,
4+
PrismaClientOrTransaction,
5+
PrismaReplicaClient,
6+
PrismaTransactionClient,
7+
PrismaTransactionOptions,
8+
} from "@trigger.dev/database";
29
import invariant from "tiny-invariant";
310
import { z } from "zod";
411
import { env } from "./env.server";
512
import { logger } from "./services/logger.server";
613
import { isValidDatabaseUrl } from "./utils/db";
714
import { singleton } from "./utils/singleton";
15+
import { $transaction as transac } from "@trigger.dev/database";
816

9-
export type PrismaTransactionClient = Omit<
10-
PrismaClient,
11-
"$connect" | "$disconnect" | "$on" | "$transaction" | "$use" | "$extends"
12-
>;
13-
14-
export type PrismaClientOrTransaction = PrismaClient | PrismaTransactionClient;
15-
16-
function isTransactionClient(prisma: PrismaClientOrTransaction): prisma is PrismaTransactionClient {
17-
return !("$transaction" in prisma);
18-
}
19-
20-
function isPrismaKnownError(error: unknown): error is Prisma.PrismaClientKnownRequestError {
21-
return (
22-
typeof error === "object" && error !== null && "code" in error && typeof error.code === "string"
23-
);
24-
}
25-
26-
export type PrismaTransactionOptions = {
27-
/** The maximum amount of time (in ms) Prisma Client will wait to acquire a transaction from the database. The default value is 2000ms. */
28-
maxWait?: number;
29-
30-
/** The maximum amount of time (in ms) the interactive transaction can run before being canceled and rolled back. The default value is 5000ms. */
31-
timeout?: number;
32-
33-
/** Sets the transaction isolation level. By default this is set to the value currently configured in your database. */
34-
isolationLevel?: Prisma.TransactionIsolationLevel;
35-
36-
swallowPrismaErrors?: boolean;
17+
export type {
18+
PrismaTransactionClient,
19+
PrismaClientOrTransaction,
20+
PrismaTransactionOptions,
21+
PrismaReplicaClient,
3722
};
3823

3924
export async function $transaction<R>(
4025
prisma: PrismaClientOrTransaction,
4126
fn: (prisma: PrismaTransactionClient) => Promise<R>,
4227
options?: PrismaTransactionOptions
4328
): Promise<R | undefined> {
44-
if (isTransactionClient(prisma)) {
45-
return fn(prisma);
46-
}
47-
48-
try {
49-
return await (prisma as PrismaClient).$transaction(fn, options);
50-
} catch (error) {
51-
if (isPrismaKnownError(error)) {
29+
return transac(
30+
prisma,
31+
fn,
32+
(error) => {
5233
logger.error("prisma.$transaction error", {
5334
code: error.code,
5435
meta: error.meta,
5536
stack: error.stack,
5637
message: error.message,
5738
name: error.name,
5839
});
59-
60-
if (options?.swallowPrismaErrors) {
61-
return;
62-
}
63-
}
64-
65-
throw error;
66-
}
40+
},
41+
options
42+
);
6743
}
6844

6945
export { Prisma };
7046

7147
export const prisma = singleton("prisma", getClient);
7248

73-
export type PrismaReplicaClient = Omit<PrismaClient, "$transaction">;
74-
7549
export const $replica: PrismaReplicaClient = singleton(
7650
"replica",
7751
() => getReplicaClient() ?? prisma

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.environments/ConfigureEndpointSheet.tsx

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import { Paragraph } from "~/components/primitives/Paragraph";
2222
import { Sheet, SheetBody, SheetContent, SheetHeader } from "~/components/primitives/Sheet";
2323
import { ClientEndpoint } from "~/presenters/EnvironmentsPresenter.server";
2424
import { endpointStreamingPath } from "~/utils/pathBuilder";
25-
import { EndpointIndexStatus, RuntimeEnvironmentType } from "../../../../../packages/database/src";
25+
import { EndpointIndexStatus, RuntimeEnvironmentType } from "@trigger.dev/database";
2626
import { bodySchema } from "../resources.environments.$environmentParam.endpoint";
2727

2828
type ConfigureEndpointSheetProps = {

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.environments/route.tsx

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import {
3939
projectEnvironmentsStreamingPath,
4040
} from "~/utils/pathBuilder";
4141
import { requestUrl } from "~/utils/requestUrl.server";
42-
import { RuntimeEnvironmentType } from "../../../../../packages/database/src";
42+
import { RuntimeEnvironmentType } from "@trigger.dev/database";
4343
import { ConfigureEndpointSheet } from "./ConfigureEndpointSheet";
4444
import { FirstEndpointSheet } from "./FirstEndpointSheet";
4545
import { BookOpenIcon } from "@heroicons/react/20/solid";

apps/webapp/app/services/runExecutionRateLimiter.server.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
import { JobHelpers, Task } from "graphile-worker";
1212
import { singleton } from "~/utils/singleton";
1313
import { logger } from "./logger.server";
14-
import { ZodWorkerRateLimiter } from "~/platform/zodWorker.server";
14+
import { ZodWorkerRateLimiter } from "@internal/zod-worker";
1515
import {
1616
ConcurrencyLimitGroup,
1717
JobRun,
@@ -117,7 +117,7 @@ if currentSize < maxSize then
117117
return true
118118
else
119119
redis.call('SADD', forbiddenFlagsKey, forbiddenFlag)
120-
120+
121121
return false
122122
end
123123
`,

apps/webapp/app/services/worker.server.ts

+10-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { DeliverEmailSchema } from "@/../../packages/emails/src";
1+
import { DeliverEmailSchema } from "emails";
22
import { ScheduledPayloadSchema, addMissingVersionField } from "@trigger.dev/core";
3+
import { ZodWorker } from "@internal/zod-worker";
34
import { z } from "zod";
4-
import { prisma } from "~/db.server";
5+
import { $replica, prisma } from "~/db.server";
56
import { env } from "~/env.server";
6-
import { ZodWorker } from "~/platform/zodWorker.server";
77
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
88
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
99
import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server";
@@ -54,6 +54,7 @@ import {
5454
CancelDevSessionRunsService,
5555
CancelDevSessionRunsServiceOptions,
5656
} from "~/v3/services/cancelDevSessionRuns.server";
57+
import { logger } from "./logger.server";
5758

5859
const workerCatalog = {
5960
indexEndpoint: z.object({
@@ -279,6 +280,7 @@ function getWorkerQueue() {
279280
return new ZodWorker({
280281
name: "workerQueue",
281282
prisma,
283+
replica: $replica,
282284
runnerOptions: {
283285
connectionString: env.DATABASE_URL,
284286
concurrency: env.WORKER_CONCURRENCY,
@@ -287,6 +289,7 @@ function getWorkerQueue() {
287289
schema: env.WORKER_SCHEMA,
288290
maxPoolSize: env.WORKER_CONCURRENCY + 1,
289291
},
292+
logger: logger,
290293
shutdownTimeoutInMs: env.GRACEFUL_SHUTDOWN_TIMEOUT,
291294
schema: workerCatalog,
292295
recurringTasks: {
@@ -732,6 +735,8 @@ function getExecutionWorkerQueue() {
732735
return new ZodWorker({
733736
name: "executionWorker",
734737
prisma,
738+
replica: $replica,
739+
logger: logger,
735740
runnerOptions: {
736741
connectionString: env.DATABASE_URL,
737742
concurrency: env.EXECUTION_WORKER_CONCURRENCY,
@@ -786,6 +791,8 @@ function getTaskOperationWorkerQueue() {
786791
return new ZodWorker({
787792
name: "taskOperationWorker",
788793
prisma,
794+
replica: $replica,
795+
logger: logger,
789796
runnerOptions: {
790797
connectionString: env.DATABASE_URL,
791798
concurrency: env.TASK_OPERATION_WORKER_CONCURRENCY,
+1-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1 @@
1-
import { customAlphabet } from "nanoid";
2-
3-
const idGenerator = customAlphabet("123456789abcdefghijkmnopqrstuvwxyz", 21);
4-
5-
export function generateFriendlyId(prefix: string, size?: number) {
6-
return `${prefix}_${idGenerator(size)}`;
7-
}
1+
export { generateFriendlyId } from "@trigger.dev/core/v3/apps";

apps/webapp/app/v3/services/enqueueDelayedRun.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/apps";
12
import { $transaction } from "~/db.server";
23
import { logger } from "~/services/logger.server";
34
import { marqs } from "~/v3/marqs/index.server";
45
import { BaseService } from "./baseService.server";
56
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
6-
import { parseNaturalLanguageDuration } from "./triggerTask.server";
77

88
export class EnqueueDelayedRunService extends BaseService {
99
public async call(runId: string) {

apps/webapp/app/v3/services/triggerTask.server.ts

+1-52
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
2121
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
2222
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
2323
import { handleMetadataPacket } from "~/utils/packets";
24+
import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/apps";
2425
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
2526
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";
2627
import { clampMaxDuration } from "../utils/maxDuration";
@@ -646,58 +647,6 @@ export async function parseDelay(value?: string | Date): Promise<Date | undefine
646647
}
647648
}
648649

649-
export function parseNaturalLanguageDuration(duration: string): Date | undefined {
650-
const regexPattern = /^(\d+w)?(\d+d)?(\d+h)?(\d+m)?(\d+s)?$/;
651-
652-
const result: Date = new Date();
653-
let hasMatch = false;
654-
655-
const elements = duration.match(regexPattern);
656-
if (elements) {
657-
if (elements[1]) {
658-
const weeks = Number(elements[1].slice(0, -1));
659-
if (weeks >= 0) {
660-
result.setDate(result.getDate() + 7 * weeks);
661-
hasMatch = true;
662-
}
663-
}
664-
if (elements[2]) {
665-
const days = Number(elements[2].slice(0, -1));
666-
if (days >= 0) {
667-
result.setDate(result.getDate() + days);
668-
hasMatch = true;
669-
}
670-
}
671-
if (elements[3]) {
672-
const hours = Number(elements[3].slice(0, -1));
673-
if (hours >= 0) {
674-
result.setHours(result.getHours() + hours);
675-
hasMatch = true;
676-
}
677-
}
678-
if (elements[4]) {
679-
const minutes = Number(elements[4].slice(0, -1));
680-
if (minutes >= 0) {
681-
result.setMinutes(result.getMinutes() + minutes);
682-
hasMatch = true;
683-
}
684-
}
685-
if (elements[5]) {
686-
const seconds = Number(elements[5].slice(0, -1));
687-
if (seconds >= 0) {
688-
result.setSeconds(result.getSeconds() + seconds);
689-
hasMatch = true;
690-
}
691-
}
692-
}
693-
694-
if (hasMatch) {
695-
return result;
696-
}
697-
698-
return undefined;
699-
}
700-
701650
function stringifyDuration(seconds: number): string | undefined {
702651
if (seconds <= 0) {
703652
return;

apps/webapp/package.json

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
"@electric-sql/react": "^0.3.5",
5050
"@headlessui/react": "^1.7.8",
5151
"@heroicons/react": "^2.0.12",
52+
"@internal/run-engine": "workspace:*",
53+
"@internal/zod-worker": "workspace:*",
5254
"@internationalized/date": "^3.5.1",
5355
"@lezer/highlight": "^1.1.6",
5456
"@opentelemetry/api": "1.9.0",

apps/webapp/tsconfig.json

+10-6
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,18 @@
2424
"@trigger.dev/sdk/*": ["../../packages/trigger-sdk/src/*"],
2525
"@trigger.dev/core": ["../../packages/core/src/index"],
2626
"@trigger.dev/core/*": ["../../packages/core/src/*"],
27-
"@trigger.dev/database": ["../../packages/database/src/index"],
28-
"@trigger.dev/database/*": ["../../packages/database/src/*"],
27+
"@trigger.dev/database": ["../../internal-packages/database/src/index"],
28+
"@trigger.dev/database/*": ["../../internal-packages/database/src/*"],
2929
"@trigger.dev/yalt": ["../../packages/yalt/src/index"],
3030
"@trigger.dev/yalt/*": ["../../packages/yalt/src/*"],
31-
"@trigger.dev/otlp-importer": ["../../packages/otlp-importer/src/index"],
32-
"@trigger.dev/otlp-importer/*": ["../../packages/otlp-importer/src/*"],
33-
"emails": ["../../packages/emails/src/index"],
34-
"emails/*": ["../../packages/emails/src/*"]
31+
"@trigger.dev/otlp-importer": ["../../internal-packages/otlp-importer/src/index"],
32+
"@trigger.dev/otlp-importer/*": ["../../internal-packages/otlp-importer/src/*"],
33+
"emails": ["../../internal-packages/emails/src/index"],
34+
"emails/*": ["../../internal-packages/emails/src/*"],
35+
"@internal/run-engine": ["../../internal-packages/run-engine/src/index"],
36+
"@internal/run-engine/*": ["../../internal-packages/run-engine/src/*"],
37+
"@internal/zod-worker": ["../../internal-packages/zod-worker/src/index"],
38+
"@internal/zod-worker/*": ["../../internal-packages/zod-worker/src/*"]
3539
},
3640
"noEmit": true
3741
}
File renamed without changes.
File renamed without changes.

0 commit comments

Comments
 (0)