Skip to content

Zacharyb/fix job dispatch status #443

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions apps/event-worker/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,13 @@
import { logger } from "@ctrlplane/logger";

import { register } from "./instrumentation.js";
import { createDispatchExecutionJobWorker } from "./job-dispatch/index.js";
import { redis } from "./redis.js";
import { createReleaseNewVersionWorker } from "./releases/new-version/index.js";
import { createReleaseVariableChangeWorker } from "./releases/variable-change/index.js";
import { createResourceScanWorker } from "./resource-scan/index.js";
import { workers } from "./workers/index.js";

console.log("Registering instrumentation...");
await register();

const allWorkers = [
createResourceScanWorker(),
createDispatchExecutionJobWorker(),
createReleaseNewVersionWorker(),
createReleaseVariableChangeWorker(),
...Object.values(workers),
];
const allWorkers = Object.values(workers);

const shutdown = () => {
logger.warn("Exiting...");
Expand Down
57 changes: 0 additions & 57 deletions apps/event-worker/src/job-dispatch/index.ts

This file was deleted.

69 changes: 0 additions & 69 deletions apps/event-worker/src/job-sync/index.ts

This file was deleted.

72 changes: 72 additions & 0 deletions apps/event-worker/src/releases/create-job-from-release.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import type { Tx } from "@ctrlplane/db";
import _ from "lodash";

import { eq, takeFirstOrNull } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { JobStatus } from "@ctrlplane/validators/jobs";

const copyReleaseVariables = async (
tx: Tx,
jobId: string,
releaseId: string,
): Promise<void> => {
const releaseVars = await tx.query.releaseVariable.findMany({
where: eq(schema.releaseVariable.releaseId, releaseId),
});

if (releaseVars.length === 0) return;

await tx.insert(schema.jobVariable).values(
releaseVars.map((v) => ({
jobId,
..._.pick(v, ["key", "value", "sensitive"]),
})),
);
};

const getJobAgentConfig = async (tx: Tx, versionId: string) => {
const { job_agent: jobAgent } =
(await tx
.select()
.from(schema.deploymentVersion)
.innerJoin(
schema.deployment,
eq(schema.deploymentVersion.deploymentId, schema.deployment.id),
)
.innerJoin(
schema.jobAgent,
eq(schema.deployment.jobAgentId, schema.jobAgent.id),
)
.where(eq(schema.deploymentVersion.id, versionId))
.then(takeFirstOrNull)) ?? {};

if (jobAgent == null)
throw new Error(`Job agent not found for version ${versionId}`);
return {
jobAgentId: jobAgent.id,
jobAgentConfig: _.merge({}, jobAgent.config),
};
};

export const createJobFromRelease = async (
release: typeof schema.release.$inferSelect,
) => {
return db.transaction(async (tx) => {
// Create the job
const job = await tx
.insert(schema.job)
.values({
status: JobStatus.Pending,
...(await getJobAgentConfig(tx, release.versionId)),
})
.returning()
.then(takeFirstOrNull);

if (job == null) return null;

await copyReleaseVariables(tx, job.id, release.id);

return job;
});
};
39 changes: 31 additions & 8 deletions apps/event-worker/src/releases/create-release.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import type { ReleaseRepository } from "@ctrlplane/rule-engine";
import { Queue } from "bullmq";

import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { Channel, getQueue } from "@ctrlplane/events";
import { ReleaseManager } from "@ctrlplane/release-manager";
import { Channel } from "@ctrlplane/validators/events";

import { redis } from "../redis.js";
import { ReleaseRepositoryMutex } from "./mutex.js";

const evaluate = new Queue(Channel.ReleaseEvaluate, {
connection: redis,
});
const policyEvaluateQueue = getQueue(Channel.PolicyEvaluate);

export const createAndEvaluateRelease = async (
const createReleaseWithLock = async (
repo: ReleaseRepository,
versionId?: string,
) => {
Expand All @@ -31,9 +29,34 @@ export const createAndEvaluateRelease = async (
// it causes issues.
await releaseManager.setDesiredRelease(release.id);

await evaluate.add(release.id, repo);
await policyEvaluateQueue.add(release.id, repo);
} finally {
// Always release the mutex lock
await mutex.unlock();
}
};

export const createReleases = async (
releaseTargets: (typeof schema.releaseTarget.$inferInsert & {
versionId?: string;
})[],
) => {
// First upsert all release targets
await db
.insert(schema.releaseTarget)
.values(releaseTargets)
.onConflictDoNothing();

// Create releases and evaluate for each target
await Promise.all(
releaseTargets.map(async (target) => {
const repo = {
deploymentId: target.deploymentId,
environmentId: target.environmentId,
resourceId: target.resourceId,
};

await createReleaseWithLock(repo, target.versionId);
}),
);
};
49 changes: 0 additions & 49 deletions apps/event-worker/src/releases/deployment-resources.ts

This file was deleted.

50 changes: 0 additions & 50 deletions apps/event-worker/src/releases/evaluate/index.ts

This file was deleted.

Empty file.
Loading
Loading