|
1 | 1 | import { CreateBackgroundWorkerRequestBody, TaskResource } from "@trigger.dev/core/v3";
|
2 | 2 | import type { BackgroundWorker } from "@trigger.dev/database";
|
3 |
| -import { PrismaClientOrTransaction } from "~/db.server"; |
| 3 | +import { Prisma, PrismaClientOrTransaction } from "~/db.server"; |
4 | 4 | import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
|
5 | 5 | import { logger } from "~/services/logger.server";
|
6 | 6 | import { generateFriendlyId } from "../friendlyIdentifiers";
|
@@ -89,46 +89,83 @@ export async function createBackgroundTasks(
|
89 | 89 | prisma: PrismaClientOrTransaction
|
90 | 90 | ) {
|
91 | 91 | for (const task of tasks) {
|
92 |
| - await prisma.backgroundWorkerTask.create({ |
93 |
| - data: { |
94 |
| - friendlyId: generateFriendlyId("task"), |
95 |
| - projectId: worker.projectId, |
96 |
| - runtimeEnvironmentId: worker.runtimeEnvironmentId, |
97 |
| - workerId: worker.id, |
98 |
| - slug: task.id, |
99 |
| - filePath: task.filePath, |
100 |
| - exportName: task.exportName, |
101 |
| - retryConfig: task.retry, |
102 |
| - queueConfig: task.queue, |
103 |
| - }, |
104 |
| - }); |
| 92 | + try { |
| 93 | + await prisma.backgroundWorkerTask.create({ |
| 94 | + data: { |
| 95 | + friendlyId: generateFriendlyId("task"), |
| 96 | + projectId: worker.projectId, |
| 97 | + runtimeEnvironmentId: worker.runtimeEnvironmentId, |
| 98 | + workerId: worker.id, |
| 99 | + slug: task.id, |
| 100 | + filePath: task.filePath, |
| 101 | + exportName: task.exportName, |
| 102 | + retryConfig: task.retry, |
| 103 | + queueConfig: task.queue, |
| 104 | + }, |
| 105 | + }); |
105 | 106 |
|
106 |
| - const queueName = task.queue?.name ?? `task/${task.id}`; |
| 107 | + const queueName = task.queue?.name ?? `task/${task.id}`; |
107 | 108 |
|
108 |
| - const taskQueue = await prisma.taskQueue.upsert({ |
109 |
| - where: { |
110 |
| - runtimeEnvironmentId_name: { |
111 |
| - runtimeEnvironmentId: worker.runtimeEnvironmentId, |
| 109 | + const taskQueue = await prisma.taskQueue.upsert({ |
| 110 | + where: { |
| 111 | + runtimeEnvironmentId_name: { |
| 112 | + runtimeEnvironmentId: worker.runtimeEnvironmentId, |
| 113 | + name: queueName, |
| 114 | + }, |
| 115 | + }, |
| 116 | + update: { |
| 117 | + concurrencyLimit: task.queue?.concurrencyLimit, |
| 118 | + rateLimit: task.queue?.rateLimit, |
| 119 | + }, |
| 120 | + create: { |
| 121 | + friendlyId: generateFriendlyId("queue"), |
112 | 122 | name: queueName,
|
| 123 | + concurrencyLimit: task.queue?.concurrencyLimit, |
| 124 | + runtimeEnvironmentId: worker.runtimeEnvironmentId, |
| 125 | + projectId: worker.projectId, |
| 126 | + rateLimit: task.queue?.rateLimit, |
| 127 | + type: task.queue?.name ? "NAMED" : "VIRTUAL", |
113 | 128 | },
|
114 |
| - }, |
115 |
| - update: { |
116 |
| - concurrencyLimit: task.queue?.concurrencyLimit, |
117 |
| - rateLimit: task.queue?.rateLimit, |
118 |
| - }, |
119 |
| - create: { |
120 |
| - friendlyId: generateFriendlyId("queue"), |
121 |
| - name: queueName, |
122 |
| - concurrencyLimit: task.queue?.concurrencyLimit, |
123 |
| - runtimeEnvironmentId: worker.runtimeEnvironmentId, |
124 |
| - projectId: worker.projectId, |
125 |
| - rateLimit: task.queue?.rateLimit, |
126 |
| - type: task.queue?.name ? "NAMED" : "VIRTUAL", |
127 |
| - }, |
128 |
| - }); |
| 129 | + }); |
129 | 130 |
|
130 |
| - if (taskQueue.concurrencyLimit) { |
131 |
| - await marqs?.updateQueueConcurrency(env, taskQueue.name, taskQueue.concurrencyLimit); |
| 131 | + if (taskQueue.concurrencyLimit) { |
| 132 | + await marqs?.updateQueueConcurrency(env, taskQueue.name, taskQueue.concurrencyLimit); |
| 133 | + } |
| 134 | + } catch (error) { |
| 135 | + if (error instanceof Prisma.PrismaClientKnownRequestError) { |
| 136 | + // The error code for unique constraint violation in Prisma is P2002 |
| 137 | + if (error.code === "P2002") { |
| 138 | + logger.warn("Task already exists", { |
| 139 | + task, |
| 140 | + worker, |
| 141 | + }); |
| 142 | + } else { |
| 143 | + logger.error("Prisma Error creating background worker task", { |
| 144 | + error: { |
| 145 | + code: error.code, |
| 146 | + message: error.message, |
| 147 | + }, |
| 148 | + task, |
| 149 | + worker, |
| 150 | + }); |
| 151 | + } |
| 152 | + } else if (error instanceof Error) { |
| 153 | + logger.error("Error creating background worker task", { |
| 154 | + error: { |
| 155 | + name: error.name, |
| 156 | + message: error.message, |
| 157 | + stack: error.stack, |
| 158 | + }, |
| 159 | + task, |
| 160 | + worker, |
| 161 | + }); |
| 162 | + } else { |
| 163 | + logger.error("Unknown error creating background worker task", { |
| 164 | + error, |
| 165 | + task, |
| 166 | + worker, |
| 167 | + }); |
| 168 | + } |
132 | 169 | }
|
133 | 170 | }
|
134 | 171 | }
|
0 commit comments