diff --git a/apps/event-worker/src/index.ts b/apps/event-worker/src/index.ts index 3502f387f..e68056179 100644 --- a/apps/event-worker/src/index.ts +++ b/apps/event-worker/src/index.ts @@ -1,23 +1,13 @@ import { logger } from "@ctrlplane/logger"; import { register } from "./instrumentation.js"; -import { createDispatchExecutionJobWorker } from "./job-dispatch/index.js"; import { redis } from "./redis.js"; -import { createReleaseNewVersionWorker } from "./releases/new-version/index.js"; -import { createReleaseVariableChangeWorker } from "./releases/variable-change/index.js"; -import { createResourceScanWorker } from "./resource-scan/index.js"; import { workers } from "./workers/index.js"; console.log("Registering instrumentation..."); await register(); -const allWorkers = [ - createResourceScanWorker(), - createDispatchExecutionJobWorker(), - createReleaseNewVersionWorker(), - createReleaseVariableChangeWorker(), - ...Object.values(workers), -]; +const allWorkers = Object.values(workers); const shutdown = () => { logger.warn("Exiting..."); diff --git a/apps/event-worker/src/job-dispatch/index.ts b/apps/event-worker/src/job-dispatch/index.ts deleted file mode 100644 index cc21381ec..000000000 --- a/apps/event-worker/src/job-dispatch/index.ts +++ /dev/null @@ -1,57 +0,0 @@ -import type { DispatchJobEvent } from "@ctrlplane/validators/events"; -import { Worker } from "bullmq"; - -import { eq, takeFirstOrNull } from "@ctrlplane/db"; -import { db } from "@ctrlplane/db/client"; -import * as schema from "@ctrlplane/db/schema"; -import { updateJob } from "@ctrlplane/job-dispatch"; -import { Channel } from "@ctrlplane/validators/events"; -import { JobAgentType, JobStatus } from "@ctrlplane/validators/jobs"; - -import { redis } from "../redis.js"; -import { dispatchGithubJob } from "./github.js"; - -export const createDispatchExecutionJobWorker = () => - new Worker( - Channel.DispatchJob, - async (job) => { - const { jobId } = job.data; - const je = await db - .select() - .from(schema.job) - .innerJoin( - schema.jobAgent, - eq(schema.job.jobAgentId, schema.jobAgent.id), - ) - .where(eq(schema.job.id, jobId)) - .then(takeFirstOrNull); - - if (je == null) { - job.log(`Job ${jobId} not found`); - return null; - } - - try { - job.log( - `Dispatching job ${je.job.id} --- ${je.job_agent.type}/${je.job_agent.name}`, - ); - if (je.job_agent.type === String(JobAgentType.GithubApp)) { - job.log(`Dispatching to GitHub app`); - await dispatchGithubJob(je.job); - } - } catch (error: unknown) { - await updateJob(db, je.job.id, { - status: JobStatus.Failure, - message: (error as Error).message, - }); - } - - return je; - }, - { - connection: redis, - removeOnComplete: { age: 1 * 60 * 60, count: 100 }, - removeOnFail: { age: 12 * 60 * 60, count: 100 }, - concurrency: 10, - }, - ); diff --git a/apps/event-worker/src/job-sync/index.ts b/apps/event-worker/src/job-sync/index.ts deleted file mode 100644 index 4f3b94b11..000000000 --- a/apps/event-worker/src/job-sync/index.ts +++ /dev/null @@ -1,69 +0,0 @@ -// import type { Job } from "@ctrlplane/db/schema"; -// import type { DispatchJobEvent } from "@ctrlplane/validators/events"; -// import type { Job as JobMq } from "bullmq"; -// import { Queue, Worker } from "bullmq"; - -// import { eq, takeFirstOrNull } from "@ctrlplane/db"; -// import { db } from "@ctrlplane/db/client"; -// import * as schema from "@ctrlplane/db/schema"; -// import { onJobCompletion } from "@ctrlplane/job-dispatch"; -// import { Channel } from "@ctrlplane/validators/events"; -// import { JobAgentType, JobStatus } from "@ctrlplane/validators/jobs"; - -// import { redis } from "../redis.js"; -// import { syncGithubJob } from "./github.js"; - -// const jobSyncQueue = new Queue(Channel.JobSync, { -// connection: redis, -// }); -// const removeJobSyncJob = (job: JobMq) => -// job.repeatJobKey != null -// ? jobSyncQueue.removeRepeatableByKey(job.repeatJobKey) -// : null; - -// type SyncFunction = (je: Job) => Promise; - -// const getSyncFunction = (agentType: string): SyncFunction | null => { -// if (agentType === String(JobAgentType.GithubApp)) return syncGithubJob; -// return null; -// }; - -// export const createjobSyncWorker = () => -// new Worker( -// Channel.JobSync, -// (job) => -// db -// .select() -// .from(schema.job) -// .innerJoin( -// schema.jobAgent, -// eq(schema.job.jobAgentId, schema.jobAgent.id), -// ) -// .where(eq(schema.job.id, job.data.jobId)) -// .then(takeFirstOrNull) -// .then((je) => { -// if (je == null) return; - -// const syncFunction = getSyncFunction(je.job_agent.type); -// if (syncFunction == null) return; - -// try { -// syncFunction(je.job).then(async (isCompleted) => { -// if (!isCompleted) return; -// removeJobSyncJob(job); -// await onJobCompletion(je.job); -// }); -// } catch (error) { -// db.update(schema.job).set({ -// status: JobStatus.Failure, -// message: (error as Error).message, -// }); -// } -// }), -// { -// connection: redis, -// removeOnComplete: { age: 0, count: 0 }, -// removeOnFail: { age: 0, count: 0 }, -// concurrency: 10, -// }, -// ); diff --git a/apps/event-worker/src/releases/create-job-from-release.ts b/apps/event-worker/src/releases/create-job-from-release.ts new file mode 100644 index 000000000..cebd8b202 --- /dev/null +++ b/apps/event-worker/src/releases/create-job-from-release.ts @@ -0,0 +1,72 @@ +import type { Tx } from "@ctrlplane/db"; +import _ from "lodash"; + +import { eq, takeFirstOrNull } from "@ctrlplane/db"; +import { db } from "@ctrlplane/db/client"; +import * as schema from "@ctrlplane/db/schema"; +import { JobStatus } from "@ctrlplane/validators/jobs"; + +const copyReleaseVariables = async ( + tx: Tx, + jobId: string, + releaseId: string, +): Promise => { + const releaseVars = await tx.query.releaseVariable.findMany({ + where: eq(schema.releaseVariable.releaseId, releaseId), + }); + + if (releaseVars.length === 0) return; + + await tx.insert(schema.jobVariable).values( + releaseVars.map((v) => ({ + jobId, + ..._.pick(v, ["key", "value", "sensitive"]), + })), + ); +}; + +const getJobAgentConfig = async (tx: Tx, versionId: string) => { + const { job_agent: jobAgent } = + (await tx + .select() + .from(schema.deploymentVersion) + .innerJoin( + schema.deployment, + eq(schema.deploymentVersion.deploymentId, schema.deployment.id), + ) + .innerJoin( + schema.jobAgent, + eq(schema.deployment.jobAgentId, schema.jobAgent.id), + ) + .where(eq(schema.deploymentVersion.id, versionId)) + .then(takeFirstOrNull)) ?? {}; + + if (jobAgent == null) + throw new Error(`Job agent not found for version ${versionId}`); + return { + jobAgentId: jobAgent.id, + jobAgentConfig: _.merge({}, jobAgent.config), + }; +}; + +export const createJobFromRelease = async ( + release: typeof schema.release.$inferSelect, +) => { + return db.transaction(async (tx) => { + // Create the job + const job = await tx + .insert(schema.job) + .values({ + status: JobStatus.Pending, + ...(await getJobAgentConfig(tx, release.versionId)), + }) + .returning() + .then(takeFirstOrNull); + + if (job == null) return null; + + await copyReleaseVariables(tx, job.id, release.id); + + return job; + }); +}; diff --git a/apps/event-worker/src/releases/create-release.ts b/apps/event-worker/src/releases/create-release.ts index cd529ddaf..c858a2318 100644 --- a/apps/event-worker/src/releases/create-release.ts +++ b/apps/event-worker/src/releases/create-release.ts @@ -1,17 +1,15 @@ import type { ReleaseRepository } from "@ctrlplane/rule-engine"; -import { Queue } from "bullmq"; +import { db } from "@ctrlplane/db/client"; +import * as schema from "@ctrlplane/db/schema"; +import { Channel, getQueue } from "@ctrlplane/events"; import { ReleaseManager } from "@ctrlplane/release-manager"; -import { Channel } from "@ctrlplane/validators/events"; -import { redis } from "../redis.js"; import { ReleaseRepositoryMutex } from "./mutex.js"; -const evaluate = new Queue(Channel.ReleaseEvaluate, { - connection: redis, -}); +const policyEvaluateQueue = getQueue(Channel.PolicyEvaluate); -export const createAndEvaluateRelease = async ( +const createReleaseWithLock = async ( repo: ReleaseRepository, versionId?: string, ) => { @@ -31,9 +29,34 @@ export const createAndEvaluateRelease = async ( // it causes issues. await releaseManager.setDesiredRelease(release.id); - await evaluate.add(release.id, repo); + await policyEvaluateQueue.add(release.id, repo); } finally { // Always release the mutex lock await mutex.unlock(); } }; + +export const createReleases = async ( + releaseTargets: (typeof schema.releaseTarget.$inferInsert & { + versionId?: string; + })[], +) => { + // First upsert all release targets + await db + .insert(schema.releaseTarget) + .values(releaseTargets) + .onConflictDoNothing(); + + // Create releases and evaluate for each target + await Promise.all( + releaseTargets.map(async (target) => { + const repo = { + deploymentId: target.deploymentId, + environmentId: target.environmentId, + resourceId: target.resourceId, + }; + + await createReleaseWithLock(repo, target.versionId); + }), + ); +}; diff --git a/apps/event-worker/src/releases/deployment-resources.ts b/apps/event-worker/src/releases/deployment-resources.ts deleted file mode 100644 index ecaa3a7ef..000000000 --- a/apps/event-worker/src/releases/deployment-resources.ts +++ /dev/null @@ -1,49 +0,0 @@ -import type { Tx } from "@ctrlplane/db"; -import type { ResourceCondition } from "@ctrlplane/validators/resources"; -import _ from "lodash"; - -import { and, eq, isNull } from "@ctrlplane/db"; -import * as schema from "@ctrlplane/db/schema"; - -/** - * Retrieves all resources for a given deployment - */ -export const getDeploymentResources = async ( - tx: Tx, - deployment: { - id: string; - systemId: string; - resourceSelector?: ResourceCondition | null; - }, -) => { - const system = await tx.query.system.findFirst({ - where: eq(schema.system.id, deployment.systemId), - with: { environments: true }, - }); - - if (system == null) throw new Error("System or deployment not found"); - - const { environments } = system; - - // Simplify the chained operations with standard Promise.all - const resources = await Promise.all( - environments.map(async (env) => { - if (env.resourceSelector == null) return []; - - const res = await tx - .select() - .from(schema.resource) - .where( - and( - eq(schema.resource.workspaceId, system.workspaceId), - isNull(schema.resource.deletedAt), - schema.resourceMatchesMetadata(tx, env.resourceSelector), - schema.resourceMatchesMetadata(tx, deployment.resourceSelector), - ), - ); - return res.map((r) => ({ ...r, environment: env })); - }), - ).then((arrays) => arrays.flat()); - - return resources; -}; diff --git a/apps/event-worker/src/releases/evaluate/index.ts b/apps/event-worker/src/releases/evaluate/index.ts deleted file mode 100644 index 461c80c26..000000000 --- a/apps/event-worker/src/releases/evaluate/index.ts +++ /dev/null @@ -1,50 +0,0 @@ -import type { ReleaseEvaluateEvent } from "@ctrlplane/validators/events"; -import { Worker } from "bullmq"; - -import { db } from "@ctrlplane/db/client"; -import { evaluate, getReleasesFromDb } from "@ctrlplane/rule-engine"; -import { createCtx, getApplicablePolicies } from "@ctrlplane/rule-engine/db"; -import { Channel } from "@ctrlplane/validators/events"; - -import { redis } from "../../redis.js"; -import { ReleaseRepositoryMutex } from "../mutex.js"; - -export const createReleaseEvaluateWorker = () => - new Worker( - Channel.ReleaseEvaluate, - async (job) => { - job.log( - `Evaluating release for deployment ${job.data.deploymentId} and resource ${job.data.resourceId}`, - ); - - const mutex = await ReleaseRepositoryMutex.lock(job.data); - - try { - const ctx = await createCtx(db, job.data); - if (ctx == null) { - job.log( - `Resource ${job.data.resourceId} not found for deployment ${job.data.deploymentId} and environment ${job.data.environmentId}`, - ); - return; - } - - const { workspaceId } = ctx.resource; - const policy = await getApplicablePolicies(db, workspaceId, job.data); - - const result = await evaluate(policy, ctx, getReleasesFromDb(db)); - console.log(result); - } catch (error) { - const message = - error instanceof Error ? error.message : "Unknown error"; - job.log(`Error evaluating release: ${message}`); - } finally { - await mutex.unlock(); - } - }, - { - connection: redis, - removeOnComplete: { age: 1 * 60 * 60, count: 5000 }, - removeOnFail: { age: 12 * 60 * 60, count: 5000 }, - concurrency: 100, - }, - ); diff --git a/apps/event-worker/src/releases/latest-release.ts b/apps/event-worker/src/releases/latest-release.ts new file mode 100644 index 000000000..e69de29bb diff --git a/apps/event-worker/src/releases/new-version/index.ts b/apps/event-worker/src/releases/new-version/index.ts deleted file mode 100644 index 1eb13a6bb..000000000 --- a/apps/event-worker/src/releases/new-version/index.ts +++ /dev/null @@ -1,47 +0,0 @@ -import type { ReleaseRepository } from "@ctrlplane/rule-engine"; -import type { ReleaseNewVersionEvent } from "@ctrlplane/validators/events"; -import { Worker } from "bullmq"; -import _ from "lodash"; - -import { eq } from "@ctrlplane/db"; -import { db } from "@ctrlplane/db/client"; -import * as schema from "@ctrlplane/db/schema"; -import { Channel } from "@ctrlplane/validators/events"; - -import { redis } from "../../redis.js"; -import { createAndEvaluateRelease } from "../create-release.js"; -import { getDeploymentResources } from "../deployment-resources.js"; - -export const createReleaseNewVersionWorker = () => - new Worker( - Channel.ReleaseNewVersion, - async (job) => { - const version = await db.query.deploymentVersion.findFirst({ - where: eq(schema.deploymentVersion.id, job.data.versionId), - with: { deployment: true }, - }); - - if (version == null) throw new Error("Version not found"); - - const { deployment } = version; - - const impactedResources = await getDeploymentResources(db, deployment); - const releaseRepos: ReleaseRepository[] = impactedResources.map((r) => ({ - deploymentId: deployment.id, - resourceId: r.id, - environmentId: r.environment.id, - })); - - job.log(`Creating ${releaseRepos.length} releases`); - await Promise.allSettled( - releaseRepos.map((repo) => createAndEvaluateRelease(repo, version.id)), - ); - job.log(`Created ${releaseRepos.length} releases`); - }, - { - connection: redis, - removeOnComplete: { age: 1 * 60 * 60, count: 5000 }, - removeOnFail: { age: 12 * 60 * 60, count: 5000 }, - concurrency: 100, - }, - ); diff --git a/apps/event-worker/src/releases/variable-change/index.ts b/apps/event-worker/src/releases/variable-change/index.ts deleted file mode 100644 index cb21b4419..000000000 --- a/apps/event-worker/src/releases/variable-change/index.ts +++ /dev/null @@ -1,104 +0,0 @@ -import type { SQL } from "@ctrlplane/db"; -import type { ReleaseRepository } from "@ctrlplane/rule-engine"; -import type { ReleaseVariableChangeEvent } from "@ctrlplane/validators/events"; -import { Worker } from "bullmq"; - -import { and, eq, isNull, takeFirstOrNull } from "@ctrlplane/db"; -import { db } from "@ctrlplane/db/client"; -import * as schema from "@ctrlplane/db/schema"; -import { - Channel, - releaseDeploymentVariableChangeEvent, - releaseResourceVariableChangeEvent, - releaseSystemVariableChangeEvent, -} from "@ctrlplane/validators/events"; - -import { redis } from "../../redis.js"; -import { createAndEvaluateRelease } from "../create-release.js"; - -const getResourceReleases = async (where: SQL) => - db - .select() - .from(schema.releaseTarget) - .innerJoin( - schema.resource, - eq(schema.releaseTarget.resourceId, schema.resource.id), - ) - .where(and(where, isNull(schema.resource.deletedAt))) - .then((rows) => - rows.map((r) => ({ ...r.release_target, resource: r.resource })), - ); - -const handleDeploymentVariableChange = async (deploymentVariableId: string) => { - const variable = await db - .select() - .from(schema.deploymentVariable) - .where(eq(schema.deploymentVariable.id, deploymentVariableId)) - .then(takeFirstOrNull); - - if (variable == null) throw new Error("Deployment variable not found"); - - return getResourceReleases( - eq(schema.releaseTarget.deploymentId, variable.deploymentId), - ); -}; - -// const handleSystemVariableChange = (_: string) => { -// const variableSet = await db.query.variableSet.findFirst({ -// where: eq(schema.variableSet.id, systemVariableSetId), -// with: { system: true }, -// }); -// if (variableSet == null) throw new Error("System variable set not found"); - -// const { deployment } = variableSet; -// return getDeploymentResources(db, deployment); -// }; - -const handleResourceVariableChange = async (resourceVariableId: string) => { - return getResourceReleases( - eq(schema.releaseTarget.resourceId, resourceVariableId), - ); -}; - -export const createReleaseVariableChangeWorker = () => - new Worker( - Channel.ReleaseVariableChange, - async (job) => { - const repos: ReleaseRepository[] = []; - - const deploymentResult = releaseDeploymentVariableChangeEvent.safeParse( - job.data, - ); - if (deploymentResult.success) - repos.push( - ...(await handleDeploymentVariableChange( - deploymentResult.data.deploymentVariableId, - )), - ); - - const systemResult = releaseSystemVariableChangeEvent.safeParse(job.data); - if (systemResult.success) throw new Error("Not supported yet"); - - const resourceResult = releaseResourceVariableChangeEvent.safeParse( - job.data, - ); - if (resourceResult.success) - repos.push( - ...(await handleResourceVariableChange( - resourceResult.data.resourceVariableId, - )), - ); - - job.log(`Creating ${repos.length} releases`); - await Promise.allSettled( - repos.map((repo) => createAndEvaluateRelease(repo)), - ); - job.log(`Created ${repos.length} releases`); - }, - { - connection: redis, - removeOnComplete: { age: 1 * 60 * 60, count: 5000 }, - removeOnFail: { age: 12 * 60 * 60, count: 5000 }, - concurrency: 100, - }, - ); diff --git a/apps/event-worker/src/resource-scan/index.ts b/apps/event-worker/src/resource-scan/index.ts deleted file mode 100644 index 3ead7601e..000000000 --- a/apps/event-worker/src/resource-scan/index.ts +++ /dev/null @@ -1,121 +0,0 @@ -import type { ResourceScanEvent } from "@ctrlplane/validators/events"; -import type { Job } from "bullmq"; -import { Queue, Worker } from "bullmq"; - -import { eq, takeFirstOrNull } from "@ctrlplane/db"; -import { db } from "@ctrlplane/db/client"; -import { - resourceProvider, - resourceProviderAws, - resourceProviderAzure, - resourceProviderGoogle, - workspace, -} from "@ctrlplane/db/schema"; -import { upsertResources } from "@ctrlplane/job-dispatch"; -import { logger } from "@ctrlplane/logger"; -import { Channel } from "@ctrlplane/validators/events"; - -import { redis } from "../redis.js"; -import { getEksResources } from "./aws/eks.js"; -import { getVpcResources as getAwsVpcResources } from "./aws/vpc.js"; -import { getAksResources } from "./azure/aks.js"; -import { getGkeResources } from "./google/gke.js"; -import { getGoogleVMResources } from "./google/vm.js"; -import { getVpcResources as getGoogleVpcResources } from "./google/vpc.js"; - -const log = logger.child({ label: "resource-scan" }); - -const resourceScanQueue = new Queue(Channel.ResourceScan, { - connection: redis, -}); - -const removeResourceJob = (job: Job) => - job.repeatJobKey != null - ? resourceScanQueue.removeRepeatableByKey(job.repeatJobKey) - : null; - -const getResources = async (rp: any) => { - if (rp.resource_provider_google != null) { - const [gkeResources, vpcResources, vmResources] = await Promise.all([ - getGkeResources(rp.workspace, rp.resource_provider_google), - getGoogleVpcResources(rp.workspace, rp.resource_provider_google), - getGoogleVMResources(rp.workspace, rp.resource_provider_google), - ]); - return [...gkeResources, ...vpcResources, ...vmResources]; - } - - if (rp.resource_provider_aws != null) { - const [eksResources, vpcResources] = await Promise.all([ - getEksResources(rp.workspace, rp.resource_provider_aws), - getAwsVpcResources(rp.workspace, rp.resource_provider_aws), - ]); - return [...eksResources, ...vpcResources]; - } - - if (rp.resource_provider_azure != null) - return getAksResources(rp.workspace, rp.resource_provider_azure); - throw new Error("Invalid resource provider"); -}; - -export const createResourceScanWorker = () => - new Worker( - Channel.ResourceScan, - async (job) => { - const { resourceProviderId } = job.data; - - const rp = await db - .select() - .from(resourceProvider) - .where(eq(resourceProvider.id, resourceProviderId)) - .innerJoin(workspace, eq(resourceProvider.workspaceId, workspace.id)) - .leftJoin( - resourceProviderGoogle, - eq(resourceProvider.id, resourceProviderGoogle.resourceProviderId), - ) - .leftJoin( - resourceProviderAws, - eq(resourceProvider.id, resourceProviderAws.resourceProviderId), - ) - .leftJoin( - resourceProviderAzure, - eq(resourceProvider.id, resourceProviderAzure.resourceProviderId), - ) - .then(takeFirstOrNull); - - if (rp == null) { - log.error(`Resource provider with ID ${resourceProviderId} not found.`); - await removeResourceJob(job); - return; - } - - log.info( - `Received scanning request for "${rp.resource_provider.name}" (${resourceProviderId}).`, - ); - - try { - const resources = await getResources(rp); - if (resources.length === 0) { - log.info( - `No resources found for provider ${rp.resource_provider.id}, skipping upsert.`, - ); - return; - } - - log.info( - `Upserting ${resources.length} resources for provider ${rp.resource_provider.id}`, - ); - await upsertResources(db, resources); - } catch (error: any) { - log.error( - `Error scanning/upserting resources for provider ${rp.resource_provider.id}: ${error.message}`, - { error }, - ); - } - }, - { - connection: redis, - removeOnComplete: { age: 1 * 60 * 60, count: 5000 }, - removeOnFail: { age: 12 * 60 * 60, count: 5000 }, - concurrency: 10, - }, - ); diff --git a/apps/event-worker/src/workers/change-deployment-variable.ts b/apps/event-worker/src/workers/change-deployment-variable.ts new file mode 100644 index 000000000..6d25d1079 --- /dev/null +++ b/apps/event-worker/src/workers/change-deployment-variable.ts @@ -0,0 +1,32 @@ +import { createReleases } from "src/releases/create-release"; + +import { eq } from "@ctrlplane/db"; +import { db } from "@ctrlplane/db/client"; +import * as schema from "@ctrlplane/db/schema"; +import { Channel, createWorker } from "@ctrlplane/events"; + +/** + * Worker that handles deployment variable changes by triggering evaluations for + * all existing release targets related to that deployment. This keeps the logic + * simple by re-evaluating all affected releases. + * + * Note: This assumes that release targets have been correctly created + * previously. The worker only handles re-evaluation of existing targets. + */ +export const changeDeploymentVariableWorker = createWorker( + Channel.UpdateDeploymentVariable, + async (job) => { + const variable = await db.query.deploymentVariable.findFirst({ + where: eq(schema.deploymentVariable.id, job.data.id), + with: { deployment: true }, + }); + + if (variable == null) throw new Error("Deployment variable not found"); + + const releaseTargets = await db.query.releaseTarget.findMany({ + where: eq(schema.releaseTarget.deploymentId, variable.deploymentId), + }); + + await createReleases(releaseTargets); + }, +); diff --git a/apps/event-worker/src/workers/change-resource-variable.ts b/apps/event-worker/src/workers/change-resource-variable.ts new file mode 100644 index 000000000..b20575ffb --- /dev/null +++ b/apps/event-worker/src/workers/change-resource-variable.ts @@ -0,0 +1,32 @@ +import { createReleases } from "src/releases/create-release"; + +import { eq } from "@ctrlplane/db"; +import { db } from "@ctrlplane/db/client"; +import * as schema from "@ctrlplane/db/schema"; +import { Channel, createWorker } from "@ctrlplane/events"; + +/** + * Worker that handles resource variable changes by triggering evaluations for + * all existing release targets related to that resource. This keeps the logic + * simple by re-evaluating all affected releases. + * + * Note: This assumes that release targets have been correctly created + * previously. The worker only handles re-evaluation of existing targets. + */ +export const changeResourceVariableWorker = createWorker( + Channel.UpdateResourceVariable, + async (job) => { + const variable = await db.query.resourceVariable.findFirst({ + where: eq(schema.resourceVariable.id, job.data.id), + with: { resource: true }, + }); + + if (variable == null) throw new Error("Resource variable not found"); + + const releaseTargets = await db.query.releaseTarget.findMany({ + where: eq(schema.releaseTarget.resourceId, variable.resourceId), + }); + + await createReleases(releaseTargets); + }, +); diff --git a/apps/event-worker/src/workers/index.ts b/apps/event-worker/src/workers/index.ts index a97fe170c..a47763c83 100644 --- a/apps/event-worker/src/workers/index.ts +++ b/apps/event-worker/src/workers/index.ts @@ -3,12 +3,27 @@ import type { Worker } from "bullmq"; import { Channel } from "@ctrlplane/events"; +import { changeDeploymentVariableWorker } from "./change-deployment-variable.js"; +import { changeResourceVariableWorker } from "./change-resource-variable.js"; +import { dispatchJobWorker } from "./job-dispatch/index.js"; +import { newDeploymentVersionWorker } from "./new-deployment-version.js"; +import { newDeploymentWorker } from "./new-deployment.js"; +import { policyEvaluateWorker } from "./policy-evaluate.js"; +import { resourceScanWorker } from "./resource-scan/index.js"; + type Workers = { [K in T]: Worker | null; }; export const workers: Workers = { - [Channel.NewDeployment]: null, - [Channel.NewEnvironment]: null, - [Channel.ReleaseEvaluate]: null, + [Channel.ResourceScan]: resourceScanWorker, + [Channel.DispatchJob]: dispatchJobWorker, + + [Channel.NewDeployment]: newDeploymentWorker, + [Channel.NewDeploymentVersion]: newDeploymentVersionWorker, + + [Channel.UpdateDeploymentVariable]: changeDeploymentVariableWorker, + [Channel.UpdateResourceVariable]: changeResourceVariableWorker, + + [Channel.PolicyEvaluate]: policyEvaluateWorker, }; diff --git a/apps/event-worker/src/job-dispatch/github.ts b/apps/event-worker/src/workers/job-dispatch/github.ts similarity index 92% rename from apps/event-worker/src/job-dispatch/github.ts rename to apps/event-worker/src/workers/job-dispatch/github.ts index be74b1bda..7aca03d74 100644 --- a/apps/event-worker/src/job-dispatch/github.ts +++ b/apps/event-worker/src/workers/job-dispatch/github.ts @@ -9,7 +9,7 @@ import { logger } from "@ctrlplane/logger"; import { configSchema } from "@ctrlplane/validators/github"; import { JobStatus } from "@ctrlplane/validators/jobs"; -import { getInstallationOctokit } from "../github-utils.js"; +import { getInstallationOctokit } from "../../github-utils.js"; const log = logger.child({ module: "github-job-dispatch" }); @@ -174,14 +174,20 @@ export const dispatchGithubJob = async (je: Job) => { }); try { - await octokit.actions.createWorkflowDispatch({ - owner: mergedConfig.owner, - repo: mergedConfig.repo, - workflow_id: mergedConfig.workflowId, - ref, - inputs: { job_id: je.id }, - headers, - }); + await octokit.actions + .createWorkflowDispatch({ + owner: mergedConfig.owner, + repo: mergedConfig.repo, + workflow_id: mergedConfig.workflowId, + ref, + inputs: { job_id: je.id }, + headers, + }) + .then(() => + updateJob(db, je.id, { + status: JobStatus.InProgress, + }), + ); } catch (e) { const error = e instanceof Error ? e.message : String(e); log.error(`Failed to create workflow dispatch for job ${je.id}`, { diff --git a/apps/event-worker/src/workers/job-dispatch/index.ts b/apps/event-worker/src/workers/job-dispatch/index.ts new file mode 100644 index 000000000..4c943cf1a --- /dev/null +++ b/apps/event-worker/src/workers/job-dispatch/index.ts @@ -0,0 +1,41 @@ +import { eq, takeFirstOrNull } from "@ctrlplane/db"; +import { db } from "@ctrlplane/db/client"; +import * as schema from "@ctrlplane/db/schema"; +import { Channel, createWorker } from "@ctrlplane/events"; +import { updateJob } from "@ctrlplane/job-dispatch"; +import { JobAgentType, JobStatus } from "@ctrlplane/validators/jobs"; + +import { dispatchGithubJob } from "./github.js"; + +export const dispatchJobWorker = createWorker( + Channel.DispatchJob, + async (job) => { + const { jobId } = job.data; + const je = await db + .select() + .from(schema.job) + .innerJoin(schema.jobAgent, eq(schema.job.jobAgentId, schema.jobAgent.id)) + .where(eq(schema.job.id, jobId)) + .then(takeFirstOrNull); + + if (je == null) { + job.log(`Job ${jobId} not found`); + return; + } + + try { + job.log( + `Dispatching job ${je.job.id} --- ${je.job_agent.type}/${je.job_agent.name}`, + ); + if (je.job_agent.type === String(JobAgentType.GithubApp)) { + job.log(`Dispatching to GitHub app`); + await dispatchGithubJob(je.job); + } + } catch (error: unknown) { + await updateJob(db, je.job.id, { + status: JobStatus.Failure, + message: (error as Error).message, + }); + } + }, +); diff --git a/apps/event-worker/src/workers/new-deployment-version.ts b/apps/event-worker/src/workers/new-deployment-version.ts new file mode 100644 index 000000000..50bb134bf --- /dev/null +++ b/apps/event-worker/src/workers/new-deployment-version.ts @@ -0,0 +1,76 @@ +import type { Tx } from "@ctrlplane/db"; +import _ from "lodash"; + +import { and, eq, isNull } from "@ctrlplane/db"; +import { db } from "@ctrlplane/db/client"; +import * as schema from "@ctrlplane/db/schema"; +import { Channel, createWorker } from "@ctrlplane/events"; + +import { createReleases } from "../releases/create-release.js"; + +const getDeploymentResources = async ( + tx: Tx, + deployment: schema.Deployment, +) => { + const system = await tx.query.system.findFirst({ + where: eq(schema.system.id, deployment.systemId), + with: { environments: true }, + }); + + if (system == null) throw new Error("System or deployment not found"); + + const { environments } = system; + const resources = await Promise.all( + environments.map(async (env) => { + if (env.resourceSelector == null) return []; + + const res = await tx + .select() + .from(schema.resource) + .where( + and( + eq(schema.resource.workspaceId, system.workspaceId), + isNull(schema.resource.deletedAt), + schema.resourceMatchesMetadata(tx, env.resourceSelector), + schema.resourceMatchesMetadata(tx, deployment.resourceSelector), + ), + ); + return res.map((r) => ({ ...r, environment: env })); + }), + ).then((arrays) => arrays.flat()); + + return resources; +}; + +/** + * Worker that handles new deployment versions. When a new version is created + * for a deployment: + * 1. Finds the associated deployment + * 2. Gets all resources that match both the deployment's and environments' + * resource selectors + * 3. Creates release targets mapping resources to environments for this + * deployment + * 4. Creates releases for all targets with the new version, which will trigger + * policy evaluation + */ +export const newDeploymentVersionWorker = createWorker( + Channel.NewDeploymentVersion, + async ({ data: version }) => { + const deployment = await db.query.deployment.findFirst({ + where: eq(schema.deployment.id, version.deploymentId), + }); + + if (!deployment) throw new Error("Deployment not found"); + + const resources = await getDeploymentResources(db, deployment); + + const releaseTargets = resources.map((resource) => ({ + resourceId: resource.id, + environmentId: resource.environment.id, + deploymentId: version.deploymentId, + versionId: version.id, + })); + + await createReleases(releaseTargets); + }, +); diff --git a/apps/event-worker/src/workers/new-deployment.ts b/apps/event-worker/src/workers/new-deployment.ts index b5005a80c..b3104169d 100644 --- a/apps/event-worker/src/workers/new-deployment.ts +++ b/apps/event-worker/src/workers/new-deployment.ts @@ -1,19 +1,15 @@ import type { Tx } from "@ctrlplane/db"; -import type { ResourceCondition } from "@ctrlplane/validators/resources"; import _ from "lodash"; +import { createReleases } from "src/releases/create-release"; -import { and, eq, isNull } from "@ctrlplane/db"; +import { and, desc, eq, isNull } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; -import { Channel, createWorker, getQueue } from "@ctrlplane/events"; +import { Channel, createWorker } from "@ctrlplane/events"; const getDeploymentResources = async ( tx: Tx, - deployment: { - id: string; - systemId: string; - resourceSelector?: ResourceCondition | null; - }, + deployment: schema.Deployment, ) => { const system = await tx.query.system.findFirst({ where: eq(schema.system.id, deployment.systemId), @@ -23,8 +19,6 @@ const getDeploymentResources = async ( if (system == null) throw new Error("System or deployment not found"); const { environments } = system; - - // Simplify the chained operations with standard Promise.all const resources = await Promise.all( environments.map(async (env) => { if (env.resourceSelector == null) return []; @@ -47,21 +41,25 @@ const getDeploymentResources = async ( return resources; }; -const evaluatedQueue = getQueue(Channel.ReleaseEvaluate); - export const newDeploymentWorker = createWorker( Channel.NewDeployment, - async (job) => { - const resources = await getDeploymentResources(db, job.data); - const jobData = resources.map((r) => { - const resourceId = r.id; - const environmentId = r.environment.id; - const deploymentId = job.data.id; - return { - name: `${resourceId}-${environmentId}-${deploymentId}`, - data: { resourceId, environmentId, deploymentId }, - }; + async ({ data: deployment }) => { + const latestVersion = await db.query.deploymentVersion.findFirst({ + where: eq(schema.deploymentVersion.deploymentId, deployment.id), + orderBy: desc(schema.deploymentVersion.createdAt), }); - await evaluatedQueue.addBulk(jobData); + + if (latestVersion == null) throw new Error("No deployment version found"); + + const resources = await getDeploymentResources(db, deployment); + + const releaseTargets = resources.map((r) => ({ + resourceId: r.id, + environmentId: r.environment.id, + deploymentId: deployment.id, + versionId: latestVersion.id, + })); + + await createReleases(releaseTargets); }, ); diff --git a/apps/event-worker/src/workers/policy-evaluate.ts b/apps/event-worker/src/workers/policy-evaluate.ts new file mode 100644 index 000000000..4c096a1ec --- /dev/null +++ b/apps/event-worker/src/workers/policy-evaluate.ts @@ -0,0 +1,124 @@ +import { and, desc, eq, inArray, takeFirstOrNull } from "@ctrlplane/db"; +import { db } from "@ctrlplane/db/client"; +import * as schema from "@ctrlplane/db/schema"; +import { Channel, createWorker, getQueue } from "@ctrlplane/events"; +import { evaluate, getReleasesFromDb } from "@ctrlplane/rule-engine"; +import { getApplicablePolicies } from "@ctrlplane/rule-engine/db"; +import { exitedStatus } from "@ctrlplane/validators/jobs"; + +import { createJobFromRelease } from "../releases/create-job-from-release.js"; +import { ReleaseRepositoryMutex } from "../releases/mutex.js"; + +const getLastDeployedRelease = async (releaseTargetId: string) => { + const lastDeployedJob = await db + .select() + .from(schema.job) + .innerJoin(schema.releaseJob, eq(schema.job.id, schema.releaseJob.jobId)) + .innerJoin( + schema.release, + eq(schema.releaseJob.releaseId, schema.release.id), + ) + .innerJoin( + schema.releaseTarget, + eq(schema.release.releaseTargetId, schema.releaseTarget.id), + ) + .where( + and( + eq(schema.releaseTarget.id, releaseTargetId), + inArray(schema.job.status, exitedStatus), + ), + ) + .orderBy(desc(schema.job.createdAt)) + .limit(1) + .then(takeFirstOrNull); + + return lastDeployedJob?.release ?? null; +}; + +/** + * Worker that evaluates policies for a release target and creates a job if a + * release passes the policies. + * + * When triggered: + * 1. Finds the release target and associated resource, environment, and + * deployment + * 2. Acquires a mutex lock to prevent concurrent modifications + * 3. Gets applicable policies for the workspace and release target + * 4. Evaluates the policies against the release target + * 5. If a release passes policies and hasn't been deployed yet: + * - Creates a new job for the release + * - Dispatches the job for execution + */ +export const policyEvaluateWorker = createWorker( + Channel.PolicyEvaluate, + async (job) => { + const releaseRepo = job.data; + const mutex = await ReleaseRepositoryMutex.lock(releaseRepo); + + try { + const releaseTarget = await db.query.releaseTarget.findFirst({ + where: eq(schema.releaseTarget.resourceId, releaseRepo.resourceId), + with: { + resource: true, + environment: true, + deployment: true, + }, + }); + + if (releaseTarget == null) { + job.log( + `Release target for resource ${releaseRepo.resourceId} not found`, + ); + return; + } + + if (releaseTarget.resource.deletedAt != null) { + job.log(`Resource ${releaseTarget.resource.id} has been deleted`); + return; + } + + const policies = await getApplicablePolicies( + db, + releaseTarget.resource.workspaceId, + releaseRepo, + ); + + const getReleases = getReleasesFromDb(db); + const result = await evaluate(policies, releaseTarget, getReleases); + if (result.chosenRelease == null) { + job.log(`No passing releases found.`); + return; + } + + const { chosenRelease } = result; + const release = await db.query.release.findFirst({ + where: eq(schema.release.id, chosenRelease.id), + with: { jobs: true }, + }); + if (release == null) { + job.log(`Release ${chosenRelease.id} not found`); + return; + } + + const lastDeployedRelease = await getLastDeployedRelease( + releaseTarget.id, + ); + if (lastDeployedRelease?.id === release.id) { + job.log( + `Release ${chosenRelease.id} is the same as the last deployed release`, + ); + return; + } + + const dbJob = await createJobFromRelease(release); + if (dbJob == null) { + job.log(`Failed to create job for release ${chosenRelease.id}`); + return; + } + + getQueue(Channel.DispatchJob).add(dbJob.id, { jobId: dbJob.id }); + } finally { + await mutex.unlock(); + } + }, +); diff --git a/apps/event-worker/src/resource-scan/aws/aws.ts b/apps/event-worker/src/workers/resource-scan/aws/aws.ts similarity index 100% rename from apps/event-worker/src/resource-scan/aws/aws.ts rename to apps/event-worker/src/workers/resource-scan/aws/aws.ts diff --git a/apps/event-worker/src/resource-scan/aws/eks.ts b/apps/event-worker/src/workers/resource-scan/aws/eks.ts similarity index 99% rename from apps/event-worker/src/resource-scan/aws/eks.ts rename to apps/event-worker/src/workers/resource-scan/aws/eks.ts index 9a4681ca8..5ceca9588 100644 --- a/apps/event-worker/src/resource-scan/aws/eks.ts +++ b/apps/event-worker/src/workers/resource-scan/aws/eks.ts @@ -16,7 +16,7 @@ import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; import { cloudRegionsGeo } from "@ctrlplane/validators/resources"; import type { AwsCredentials } from "./aws.js"; -import { omitNullUndefined } from "../../utils.js"; +import { omitNullUndefined } from "../../../utils.js"; import { assumeRole, assumeWorkspaceRole } from "./aws.js"; const log = logger.child({ label: "resource-scan/eks" }); diff --git a/apps/event-worker/src/resource-scan/aws/vpc.ts b/apps/event-worker/src/workers/resource-scan/aws/vpc.ts similarity index 99% rename from apps/event-worker/src/resource-scan/aws/vpc.ts rename to apps/event-worker/src/workers/resource-scan/aws/vpc.ts index 3890f7782..7ad372a20 100644 --- a/apps/event-worker/src/resource-scan/aws/vpc.ts +++ b/apps/event-worker/src/workers/resource-scan/aws/vpc.ts @@ -14,7 +14,7 @@ import { logger } from "@ctrlplane/logger"; import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; import type { AwsCredentials } from "./aws.js"; -import { omitNullUndefined } from "../../utils.js"; +import { omitNullUndefined } from "../../../utils.js"; import { assumeRole, assumeWorkspaceRole } from "./aws.js"; const log = logger.child({ label: "resource-scan/aws/vpc" }); diff --git a/apps/event-worker/src/resource-scan/azure/aks.ts b/apps/event-worker/src/workers/resource-scan/azure/aks.ts similarity index 97% rename from apps/event-worker/src/resource-scan/azure/aks.ts rename to apps/event-worker/src/workers/resource-scan/azure/aks.ts index 79d76c5ad..f2c6f8399 100644 --- a/apps/event-worker/src/resource-scan/azure/aks.ts +++ b/apps/event-worker/src/workers/resource-scan/azure/aks.ts @@ -7,7 +7,7 @@ import { eq, takeFirstOrNull } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as SCHEMA from "@ctrlplane/db/schema"; -import { env } from "../../config.js"; +import { env } from "../../../config.js"; import { convertManagedClusterToResource } from "./cluster-to-resource.js"; const AZURE_CLIENT_ID = env.AZURE_APP_CLIENT_ID; diff --git a/apps/event-worker/src/resource-scan/azure/cluster-to-resource.ts b/apps/event-worker/src/workers/resource-scan/azure/cluster-to-resource.ts similarity index 99% rename from apps/event-worker/src/resource-scan/azure/cluster-to-resource.ts rename to apps/event-worker/src/workers/resource-scan/azure/cluster-to-resource.ts index 34748797c..27c5eab57 100644 --- a/apps/event-worker/src/resource-scan/azure/cluster-to-resource.ts +++ b/apps/event-worker/src/workers/resource-scan/azure/cluster-to-resource.ts @@ -11,7 +11,7 @@ import { logger } from "@ctrlplane/logger"; import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; import { cloudRegionsGeo } from "@ctrlplane/validators/resources"; -import { omitNullUndefined } from "../../utils.js"; +import { omitNullUndefined } from "../../../utils.js"; const log = logger.child({ module: "resource-scan/azure" }); diff --git a/apps/event-worker/src/resource-scan/google/client.ts b/apps/event-worker/src/workers/resource-scan/google/client.ts similarity index 100% rename from apps/event-worker/src/resource-scan/google/client.ts rename to apps/event-worker/src/workers/resource-scan/google/client.ts diff --git a/apps/event-worker/src/resource-scan/google/cluster-to-resource.ts b/apps/event-worker/src/workers/resource-scan/google/cluster-to-resource.ts similarity index 98% rename from apps/event-worker/src/resource-scan/google/cluster-to-resource.ts rename to apps/event-worker/src/workers/resource-scan/google/cluster-to-resource.ts index 2fb3d90eb..980d8fb6f 100644 --- a/apps/event-worker/src/resource-scan/google/cluster-to-resource.ts +++ b/apps/event-worker/src/workers/resource-scan/google/cluster-to-resource.ts @@ -5,7 +5,7 @@ import { SemVer } from "semver"; import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; import { cloudRegionsGeo } from "@ctrlplane/validators/resources"; -import { omitNullUndefined } from "../../utils.js"; +import { omitNullUndefined } from "../../../utils.js"; export const clusterToResource = ( workspaceId: string, diff --git a/apps/event-worker/src/resource-scan/google/gke.ts b/apps/event-worker/src/workers/resource-scan/google/gke.ts similarity index 100% rename from apps/event-worker/src/resource-scan/google/gke.ts rename to apps/event-worker/src/workers/resource-scan/google/gke.ts diff --git a/apps/event-worker/src/resource-scan/google/kube.ts b/apps/event-worker/src/workers/resource-scan/google/kube.ts similarity index 100% rename from apps/event-worker/src/resource-scan/google/kube.ts rename to apps/event-worker/src/workers/resource-scan/google/kube.ts diff --git a/apps/event-worker/src/resource-scan/google/vcluster.ts b/apps/event-worker/src/workers/resource-scan/google/vcluster.ts similarity index 100% rename from apps/event-worker/src/resource-scan/google/vcluster.ts rename to apps/event-worker/src/workers/resource-scan/google/vcluster.ts diff --git a/apps/event-worker/src/resource-scan/google/vm.ts b/apps/event-worker/src/workers/resource-scan/google/vm.ts similarity index 99% rename from apps/event-worker/src/resource-scan/google/vm.ts rename to apps/event-worker/src/workers/resource-scan/google/vm.ts index 261e07754..3df339a8c 100644 --- a/apps/event-worker/src/resource-scan/google/vm.ts +++ b/apps/event-worker/src/workers/resource-scan/google/vm.ts @@ -7,7 +7,7 @@ import _ from "lodash"; import { logger } from "@ctrlplane/logger"; import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; -import { omitNullUndefined } from "../../utils.js"; +import { omitNullUndefined } from "../../../utils.js"; import { getGoogleClient } from "./client.js"; const log = logger.child({ module: "resource-scan/gke/vm" }); diff --git a/apps/event-worker/src/resource-scan/google/vpc.ts b/apps/event-worker/src/workers/resource-scan/google/vpc.ts similarity index 99% rename from apps/event-worker/src/resource-scan/google/vpc.ts rename to apps/event-worker/src/workers/resource-scan/google/vpc.ts index 45db1a632..8790e3040 100644 --- a/apps/event-worker/src/resource-scan/google/vpc.ts +++ b/apps/event-worker/src/workers/resource-scan/google/vpc.ts @@ -15,7 +15,7 @@ import { isPresent } from "ts-is-present"; import { logger } from "@ctrlplane/logger"; import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; -import { omitNullUndefined } from "../../utils.js"; +import { omitNullUndefined } from "../../../utils.js"; import { getGoogleClient } from "./client.js"; const log = logger.child({ label: "resource-scan/google/vpc" }); diff --git a/apps/event-worker/src/workers/resource-scan/index.ts b/apps/event-worker/src/workers/resource-scan/index.ts new file mode 100644 index 000000000..fa4eea9cd --- /dev/null +++ b/apps/event-worker/src/workers/resource-scan/index.ts @@ -0,0 +1,109 @@ +import type { Job } from "bullmq"; + +import { eq, takeFirstOrNull } from "@ctrlplane/db"; +import { db } from "@ctrlplane/db/client"; +import { + resourceProvider, + resourceProviderAws, + resourceProviderAzure, + resourceProviderGoogle, + workspace, +} from "@ctrlplane/db/schema"; +import { Channel, createWorker, getQueue } from "@ctrlplane/events"; +import { upsertResources } from "@ctrlplane/job-dispatch"; +import { logger } from "@ctrlplane/logger"; + +import { getEksResources } from "./aws/eks.js"; +import { getVpcResources as getAwsVpcResources } from "./aws/vpc.js"; +import { getAksResources } from "./azure/aks.js"; +import { getGkeResources } from "./google/gke.js"; +import { getGoogleVMResources } from "./google/vm.js"; +import { getVpcResources as getGoogleVpcResources } from "./google/vpc.js"; + +const log = logger.child({ label: "resource-scan" }); + +const resourceScanQueue = getQueue(Channel.ResourceScan); + +const removeResourceJob = (job: Job) => + job.repeatJobKey != null + ? resourceScanQueue.removeRepeatableByKey(job.repeatJobKey) + : null; + +const getResources = async (rp: any) => { + if (rp.resource_provider_google != null) { + const [gkeResources, vpcResources, vmResources] = await Promise.all([ + getGkeResources(rp.workspace, rp.resource_provider_google), + getGoogleVpcResources(rp.workspace, rp.resource_provider_google), + getGoogleVMResources(rp.workspace, rp.resource_provider_google), + ]); + return [...gkeResources, ...vpcResources, ...vmResources]; + } + + if (rp.resource_provider_aws != null) { + const [eksResources, vpcResources] = await Promise.all([ + getEksResources(rp.workspace, rp.resource_provider_aws), + getAwsVpcResources(rp.workspace, rp.resource_provider_aws), + ]); + return [...eksResources, ...vpcResources]; + } + + if (rp.resource_provider_azure != null) + return getAksResources(rp.workspace, rp.resource_provider_azure); + throw new Error("Invalid resource provider"); +}; + +export const resourceScanWorker = createWorker( + Channel.ResourceScan, + async (job) => { + const { resourceProviderId } = job.data; + + const rp = await db + .select() + .from(resourceProvider) + .where(eq(resourceProvider.id, resourceProviderId)) + .innerJoin(workspace, eq(resourceProvider.workspaceId, workspace.id)) + .leftJoin( + resourceProviderGoogle, + eq(resourceProvider.id, resourceProviderGoogle.resourceProviderId), + ) + .leftJoin( + resourceProviderAws, + eq(resourceProvider.id, resourceProviderAws.resourceProviderId), + ) + .leftJoin( + resourceProviderAzure, + eq(resourceProvider.id, resourceProviderAzure.resourceProviderId), + ) + .then(takeFirstOrNull); + + if (rp == null) { + log.error(`Resource provider with ID ${resourceProviderId} not found.`); + await removeResourceJob(job); + return; + } + + log.info( + `Received scanning request for "${rp.resource_provider.name}" (${resourceProviderId}).`, + ); + + try { + const resources = await getResources(rp); + if (resources.length === 0) { + log.info( + `No resources found for provider ${rp.resource_provider.id}, skipping upsert.`, + ); + return; + } + + log.info( + `Upserting ${resources.length} resources for provider ${rp.resource_provider.id}`, + ); + await upsertResources(db, resources); + } catch (error: any) { + log.error( + `Error scanning/upserting resources for provider ${rp.resource_provider.id}: ${error.message}`, + { error }, + ); + } + }, +); diff --git a/apps/webservice/Dockerfile b/apps/webservice/Dockerfile index 1812f47a7..8de30b44a 100644 --- a/apps/webservice/Dockerfile +++ b/apps/webservice/Dockerfile @@ -37,6 +37,7 @@ COPY packages/job-dispatch/package.json ./packages/job-dispatch/package.json COPY packages/ui/package.json ./packages/ui/package.json COPY packages/logger/package.json ./packages/logger/package.json COPY packages/secrets/package.json ./packages/secrets/package.json +COPY packages/events/package.json ./packages/events/package.json COPY apps/webservice/package.json ./apps/webservice/package.json diff --git a/apps/webservice/package.json b/apps/webservice/package.json index 367955b5a..769b6e5aa 100644 --- a/apps/webservice/package.json +++ b/apps/webservice/package.json @@ -19,6 +19,7 @@ "@ctrlplane/api": "workspace:*", "@ctrlplane/auth": "workspace:*", "@ctrlplane/db": "workspace:*", + "@ctrlplane/events": "workspace:*", "@ctrlplane/job-dispatch": "workspace:*", "@ctrlplane/logger": "workspace:*", "@ctrlplane/secrets": "workspace:*", diff --git a/apps/webservice/src/app/api/v1/deployment-versions/route.ts b/apps/webservice/src/app/api/v1/deployment-versions/route.ts index 871a65a7b..4808f77b2 100644 --- a/apps/webservice/src/app/api/v1/deployment-versions/route.ts +++ b/apps/webservice/src/app/api/v1/deployment-versions/route.ts @@ -2,7 +2,6 @@ import { NextResponse } from "next/server"; import httpStatus from "http-status"; import { z } from "zod"; -import { releaseNewVersion } from "@ctrlplane/api/queues"; import { and, buildConflictUpdateColumns, @@ -12,6 +11,7 @@ import { } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; +import { Channel, getQueue } from "@ctrlplane/events"; import { cancelOldReleaseJobTriggersOnJobDispatch, createJobApprovals, @@ -110,9 +110,7 @@ export const POST = request() depVersion.status === DeploymentVersionStatus.Ready); if (shouldTrigger) { - releaseNewVersion.add(depVersion.id, { - versionId: depVersion.id, - }); + getQueue(Channel.NewDeploymentVersion).add(depVersion.id, depVersion); await createReleaseJobTriggers(db, "new_version") .causedById(ctx.user.id) diff --git a/packages/api/package.json b/packages/api/package.json index 920e416bf..7a13aed93 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -26,6 +26,7 @@ "@aws-sdk/client-iam": "^3.696.0", "@ctrlplane/auth": "workspace:*", "@ctrlplane/db": "workspace:*", + "@ctrlplane/events": "workspace:*", "@ctrlplane/job-dispatch": "workspace:*", "@ctrlplane/logger": "workspace:*", "@ctrlplane/secrets": "workspace:*", diff --git a/packages/api/src/queues/index.ts b/packages/api/src/queues/index.ts deleted file mode 100644 index 3d4d88159..000000000 --- a/packages/api/src/queues/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from "./releases"; -export * from "./resoure-scan"; diff --git a/packages/api/src/queues/releases.ts b/packages/api/src/queues/releases.ts deleted file mode 100644 index c3fb6eb65..000000000 --- a/packages/api/src/queues/releases.ts +++ /dev/null @@ -1,19 +0,0 @@ -import type { - ReleaseNewVersionEvent, - ReleaseVariableChangeEvent, -} from "@ctrlplane/validators/events"; -import { Queue } from "bullmq"; - -import { Channel } from "@ctrlplane/validators/events"; - -import { redis } from "../redis"; - -export const releaseNewVersion = new Queue( - Channel.ReleaseNewVersion, - { connection: redis }, -); - -export const releaseVariableChange = new Queue( - Channel.ReleaseVariableChange, - { connection: redis }, -); diff --git a/packages/api/src/queues/resoure-scan.ts b/packages/api/src/queues/resoure-scan.ts deleted file mode 100644 index a76516c17..000000000 --- a/packages/api/src/queues/resoure-scan.ts +++ /dev/null @@ -1,11 +0,0 @@ -import type { ResourceScanEvent } from "@ctrlplane/validators/events"; -import { Queue } from "bullmq"; - -import { Channel } from "@ctrlplane/validators/events"; - -import { redis } from "../redis"; - -export const resourceScanQueue = new Queue( - Channel.ResourceScan, - { connection: redis }, -); diff --git a/packages/api/src/router/deployment-version.ts b/packages/api/src/router/deployment-version.ts index 1ab5e4c9d..06840246d 100644 --- a/packages/api/src/router/deployment-version.ts +++ b/packages/api/src/router/deployment-version.ts @@ -19,6 +19,7 @@ import { } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as SCHEMA from "@ctrlplane/db/schema"; +import { Channel, getQueue } from "@ctrlplane/events"; import { cancelOldReleaseJobTriggersOnJobDispatch, createJobApprovals, @@ -42,7 +43,6 @@ import { DeploymentVersionStatus, } from "@ctrlplane/validators/releases"; -import { releaseNewVersion } from "../queues"; import { createTRPCRouter, protectedProcedure } from "../trpc"; import { versionDeployRouter } from "./version-deploy"; import { deploymentVersionMetadataKeysRouter } from "./version-metadata-keys"; @@ -338,7 +338,7 @@ export const versionRouter = createTRPCRouter({ if (versionDeps.length > 0) await db.insert(SCHEMA.versionDependency).values(versionDeps); - await releaseNewVersion.add(rel.id, { versionId: rel.id }); + getQueue(Channel.NewDeploymentVersion).add(rel.id, rel); const releaseJobTriggers = await createReleaseJobTriggers( db, diff --git a/packages/api/src/router/resource-provider.ts b/packages/api/src/router/resource-provider.ts index bf11dca9d..e099f6562 100644 --- a/packages/api/src/router/resource-provider.ts +++ b/packages/api/src/router/resource-provider.ts @@ -23,9 +23,9 @@ import { updateResourceProviderAws, updateResourceProviderGoogle, } from "@ctrlplane/db/schema"; +import { Channel, getQueue } from "@ctrlplane/events"; import { Permission } from "@ctrlplane/validators/auth"; -import { resourceScanQueue } from "../queues"; import { createTRPCRouter, protectedProcedure } from "../trpc"; import { resourceProviderPageRouter } from "./resource-provider-page/router"; @@ -174,7 +174,9 @@ export const resourceProviderRouter = createTRPCRouter({ }) .input(z.string().uuid()) .mutation(({ input }) => - resourceScanQueue.add(input, { resourceProviderId: input }), + getQueue(Channel.ResourceScan).add(input, { + resourceProviderId: input, + }), ), google: createTRPCRouter({ @@ -210,7 +212,7 @@ export const resourceProviderRouter = createTRPCRouter({ .returning() .then(takeFirst); - await resourceScanQueue.add( + await getQueue(Channel.ResourceScan).add( tg.id, { resourceProviderId: tg.id }, { repeat: { every: ms("10m"), immediately: true } }, @@ -254,8 +256,10 @@ export const resourceProviderRouter = createTRPCRouter({ .then(takeFirst); if (input.repeatSeconds != null) { - await resourceScanQueue.remove(input.resourceProviderId); - await resourceScanQueue.add( + await getQueue(Channel.ResourceScan).remove( + input.resourceProviderId, + ); + await getQueue(Channel.ResourceScan).add( input.resourceProviderId, { resourceProviderId: input.resourceProviderId }, { @@ -268,7 +272,7 @@ export const resourceProviderRouter = createTRPCRouter({ return; } - await resourceScanQueue.add(input.resourceProviderId, { + await getQueue(Channel.ResourceScan).add(input.resourceProviderId, { resourceProviderId: input.resourceProviderId, }); }); @@ -308,7 +312,7 @@ export const resourceProviderRouter = createTRPCRouter({ .returning() .then(takeFirst); - await resourceScanQueue.add( + await getQueue(Channel.ResourceScan).add( provider.id, { resourceProviderId: provider.id }, { repeat: { every: ms("10m"), immediately: true } }, @@ -358,8 +362,10 @@ export const resourceProviderRouter = createTRPCRouter({ .then(takeFirst); if (input.repeatSeconds != null) { - await resourceScanQueue.remove(input.resourceProviderId); - await resourceScanQueue.add( + await getQueue(Channel.ResourceScan).remove( + input.resourceProviderId, + ); + await getQueue(Channel.ResourceScan).add( input.resourceProviderId, { resourceProviderId: input.resourceProviderId }, { @@ -371,7 +377,7 @@ export const resourceProviderRouter = createTRPCRouter({ ); return; } - await resourceScanQueue.add(input.resourceProviderId, { + await getQueue(Channel.ResourceScan).add(input.resourceProviderId, { resourceProviderId: input.resourceProviderId, }); }); @@ -450,7 +456,7 @@ export const resourceProviderRouter = createTRPCRouter({ // We should think about the edge case here, if a scan is in progress, // what do we do? - await resourceScanQueue.remove(input.providerId); + await getQueue(Channel.ResourceScan).remove(input.providerId); return deletedProvider; }), diff --git a/packages/events/src/index.ts b/packages/events/src/index.ts index db5c1c7d5..ed44ed8d4 100644 --- a/packages/events/src/index.ts +++ b/packages/events/src/index.ts @@ -14,14 +14,14 @@ export const createWorker = ( removeOnComplete: { age: 1 * 60 * 60, count: 5000 }, removeOnFail: { age: 12 * 60 * 60, count: 5000 }, concurrency: 100, + autorun: true, ...opts, }); const _queues = new Map(); export const getQueue = (name: T) => { - if (!_queues.has(name)) { + if (!_queues.has(name)) _queues.set(name, new Queue(String(name), { connection: bullmqRedis })); - } return _queues.get(name) as Queue; }; diff --git a/packages/events/src/types.ts b/packages/events/src/types.ts index 01545ef7c..b3d56bb15 100644 --- a/packages/events/src/types.ts +++ b/packages/events/src/types.ts @@ -1,29 +1,31 @@ import type * as schema from "@ctrlplane/db/schema"; export enum Channel { - JobSync = "job-sync", DispatchJob = "dispatch-job", - ResourceScan = "resource-scan", - ReleaseNewVersion = "release-new-version", - ReleaseNewRepository = "release-new-repository", - ReleaseVariableChange = "release-variable-change", + ResourceScan = "resource-scan", NewDeployment = "new-deployment", NewEnvironment = "new-environment", - NewRelease = "new-release", - ReleaseEvaluate = "release-evaluate", + NewDeploymentVersion = "new-deployment-version", + + PolicyEvaluate = "policy-evaluate", + UpdateDeploymentVariable = "update-deployment-variable", + UpdateResourceVariable = "update-resource-variable", } -export type ReleaseEvaluateJobData = { +export type PolicyEvaluateJobData = { environmentId: string; resourceId: string; deploymentId: string; }; export type ChannelMap = { - // [Channel.UpsertRelease]: typeof schema.release.$inferInsert; + [Channel.ResourceScan]: { resourceProviderId: string }; + [Channel.NewDeploymentVersion]: typeof schema.deploymentVersion.$inferSelect; [Channel.NewDeployment]: typeof schema.deployment.$inferSelect; - [Channel.NewEnvironment]: typeof schema.environment.$inferSelect; - [Channel.ReleaseEvaluate]: ReleaseEvaluateJobData; + [Channel.PolicyEvaluate]: PolicyEvaluateJobData; + [Channel.DispatchJob]: { jobId: string }; + [Channel.UpdateDeploymentVariable]: typeof schema.deploymentVariable.$inferSelect; + [Channel.UpdateResourceVariable]: typeof schema.resourceVariable.$inferSelect; }; diff --git a/packages/job-dispatch/package.json b/packages/job-dispatch/package.json index 12cf33fef..34c3b8d5b 100644 --- a/packages/job-dispatch/package.json +++ b/packages/job-dispatch/package.json @@ -26,6 +26,7 @@ }, "dependencies": { "@ctrlplane/db": "workspace:*", + "@ctrlplane/events": "workspace:*", "@ctrlplane/logger": "workspace:*", "@ctrlplane/secrets": "workspace:*", "@ctrlplane/validators": "workspace:*", diff --git a/packages/job-dispatch/src/index.ts b/packages/job-dispatch/src/index.ts index a4041555b..ce6b10ba9 100644 --- a/packages/job-dispatch/src/index.ts +++ b/packages/job-dispatch/src/index.ts @@ -8,7 +8,6 @@ export * from "./policy-checker.js"; export * from "./policy-create.js"; export * from "./release-sequencing.js"; export * from "./lock-checker.js"; -export * from "./queue.js"; export * from "./cancel-previous-jobs.js"; export { isDateInTimeWindow } from "./utils.js"; diff --git a/packages/job-dispatch/src/job-dispatch.ts b/packages/job-dispatch/src/job-dispatch.ts index 19077c8bb..514e87a2b 100644 --- a/packages/job-dispatch/src/job-dispatch.ts +++ b/packages/job-dispatch/src/job-dispatch.ts @@ -3,12 +3,12 @@ import _ from "lodash"; import { eq, inArray, takeFirst } from "@ctrlplane/db"; import * as schema from "@ctrlplane/db/schema"; +import { Channel, getQueue } from "@ctrlplane/events"; import { JobStatus } from "@ctrlplane/validators/jobs"; import { createTriggeredRunbookJob } from "./job-creation.js"; import { updateJob } from "./job-update.js"; import { createReleaseVariables } from "./job-variables-deployment/job-variables-deployment.js"; -import { dispatchJobsQueue } from "./queue.js"; export type DispatchFilterFunc = ( db: Tx, @@ -93,18 +93,12 @@ class DispatchBuilder { ); if (validJobsWithResolvedVariables.length > 0) { - await dispatchJobsQueue.addBulk( + await getQueue(Channel.DispatchJob).addBulk( validJobsWithResolvedVariables.map((wf) => ({ name: wf.id, data: { jobId: wf.id }, })), ); - - await Promise.all( - validJobsWithResolvedVariables.map((j) => - updateJob(this.db, j.id, { status: JobStatus.InProgress }), - ), - ); } await Promise.all( @@ -133,6 +127,6 @@ export const dispatchRunbook = async ( .where(eq(schema.runbook.id, runbookId)) .then(takeFirst); const job = await createTriggeredRunbookJob(db, runbook, values); - await dispatchJobsQueue.add(job.id, { jobId: job.id }); + await getQueue(Channel.DispatchJob).add(job.id, { jobId: job.id }); return job; }; diff --git a/packages/job-dispatch/src/queue.ts b/packages/job-dispatch/src/queue.ts index 8d0c90712..9af0baba5 100644 --- a/packages/job-dispatch/src/queue.ts +++ b/packages/job-dispatch/src/queue.ts @@ -1,14 +1,3 @@ -import type { DispatchJobEvent } from "@ctrlplane/validators/events"; -import { Queue } from "bullmq"; -import IORedis from "ioredis"; +import { Channel, getQueue } from "@ctrlplane/events"; -import { Channel } from "@ctrlplane/validators/events"; - -import { env } from "./config.js"; - -const connection = new IORedis(env.REDIS_URL, { maxRetriesPerRequest: null }); - -export const dispatchJobsQueue = new Queue( - Channel.DispatchJob, - { connection }, -); +export const dispatchJobsQueue = getQueue(Channel.DispatchJob); diff --git a/packages/rule-engine/src/evaluate.ts b/packages/rule-engine/src/evaluate.ts index cd21a62ba..82a37311e 100644 --- a/packages/rule-engine/src/evaluate.ts +++ b/packages/rule-engine/src/evaluate.ts @@ -1,4 +1,8 @@ -import type { DeploymentResourceContext, GetReleasesFunc } from "./types"; +import type { + DeploymentResourceContext, + DeploymentResourceSelectionResult, + GetReleasesFunc, +} from "./types"; import type { Policy } from "./types.js"; import { Releases } from "./releases.js"; import { RuleEngine } from "./rule-engine.js"; @@ -33,7 +37,7 @@ export const evaluate = async ( policy: Policy | Policy[] | null, context: DeploymentResourceContext, getReleases: GetReleasesFunc, -) => { +): Promise => { const policies = policy == null ? [] : Array.isArray(policy) ? policy : [policy]; @@ -41,7 +45,8 @@ export const evaluate = async ( if (mergedPolicy == null) return { allowed: false, - release: undefined, + chosenRelease: null, + rejectionReasons: new Map(), }; const rules = [...denyWindows(mergedPolicy)]; diff --git a/packages/rule-engine/src/rule-engine.ts b/packages/rule-engine/src/rule-engine.ts index bab004748..3a168614e 100644 --- a/packages/rule-engine/src/rule-engine.ts +++ b/packages/rule-engine/src/rule-engine.ts @@ -108,6 +108,7 @@ export class RuleEngine { if (result.allowedReleases.isEmpty()) { return { allowed: false, + chosenRelease: null, rejectionReasons: result.rejectionReasons ?? rejectionReasons, }; } @@ -128,6 +129,7 @@ export class RuleEngine { return chosen == null ? { allowed: false, + chosenRelease: null, rejectionReasons, } : { diff --git a/packages/rule-engine/src/types.ts b/packages/rule-engine/src/types.ts index b6e99fe32..b4f9e8d17 100644 --- a/packages/rule-engine/src/types.ts +++ b/packages/rule-engine/src/types.ts @@ -48,7 +48,7 @@ export type DeploymentResourceRuleResult = { export type DeploymentResourceSelectionResult = { allowed: boolean; - chosenRelease?: Release; + chosenRelease: Release | null; rejectionReasons: Map; }; diff --git a/packages/validators/src/events/index.ts b/packages/validators/src/events/index.ts index c7cbc429a..746d17902 100644 --- a/packages/validators/src/events/index.ts +++ b/packages/validators/src/events/index.ts @@ -1,53 +1 @@ -import { z } from "zod"; - export * from "./hooks/index.js"; - -export enum Channel { - JobSync = "job-sync", - DispatchJob = "dispatch-job", - ResourceScan = "resource-scan", - ReleaseEvaluate = "release-evaluate", - ReleaseNewVersion = "release-new-version", - ReleaseNewRepository = "release-new-repository", - ReleaseVariableChange = "release-variable-change", -} - -export const resourceScanEvent = z.object({ resourceProviderId: z.string() }); -export type ResourceScanEvent = z.infer; - -export const dispatchJobEvent = z.object({ - jobId: z.string(), -}); -export type DispatchJobEvent = z.infer; - -export const jobSyncEvent = z.object({ jobId: z.string() }); -export type JobSyncEvent = z.infer; - -export const releaseEvaluateEvent = z.object({ - deploymentId: z.string(), - environmentId: z.string(), - resourceId: z.string(), -}); -export type ReleaseEvaluateEvent = z.infer; - -export const releaseNewVersionEvent = z.object({ versionId: z.string() }); -export type ReleaseNewVersionEvent = z.infer; - -export const releaseResourceVariableChangeEvent = z.object({ - resourceVariableId: z.string().uuid(), -}); -export const releaseDeploymentVariableChangeEvent = z.object({ - deploymentVariableId: z.string().uuid(), -}); -export const releaseSystemVariableChangeEvent = z.object({ - systemVariableSetId: z.string().uuid(), -}); - -export const releaseVariableChangeEvent = z.union([ - releaseResourceVariableChangeEvent, - releaseDeploymentVariableChangeEvent, - releaseSystemVariableChangeEvent, -]); -export type ReleaseVariableChangeEvent = z.infer< - typeof releaseVariableChangeEvent ->; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 03a641294..7e3dc15b9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -448,6 +448,9 @@ importers: '@ctrlplane/db': specifier: workspace:* version: link:../../packages/db + '@ctrlplane/events': + specifier: workspace:* + version: link:../../packages/events '@ctrlplane/job-dispatch': specifier: workspace:* version: link:../../packages/job-dispatch @@ -832,6 +835,9 @@ importers: '@ctrlplane/db': specifier: workspace:* version: link:../db + '@ctrlplane/events': + specifier: workspace:* + version: link:../events '@ctrlplane/job-dispatch': specifier: workspace:* version: link:../job-dispatch @@ -1185,6 +1191,9 @@ importers: '@ctrlplane/db': specifier: workspace:* version: link:../db + '@ctrlplane/events': + specifier: workspace:* + version: link:../events '@ctrlplane/logger': specifier: workspace:* version: link:../logger