Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ghost/core/core/server/services/jobs/job-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,26 @@ const models = require('../../models');
const sentry = require('../../../shared/sentry');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');
const WorkerModelEventBridge = require('./worker-model-event-bridge');
const errorHandler = (error, workerMeta) => {
logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`);
logging.error(error);
sentry.captureException(error);
};
const events = require('../../lib/common/events');
const workerModelEventBridge = new WorkerModelEventBridge({models, events, logging, sentry});

const workerMessageHandler = ({name, message}) => {
if (workerModelEventBridge.isModelEventMessage(message)) {
workerModelEventBridge.handle({...message}, {jobName: name});

// @tryghost/job-manager treats any object message with an `event` property as
// a raw domain event. Model events are handled above and should not go
// through that path.
delete message.event;
return;
}

if (typeof message === 'string') {
logging.info(`Worker for job ${name} sent a message: ${message}`);
}
Expand Down
135 changes: 135 additions & 0 deletions ghost/core/core/server/services/jobs/worker-model-event-bridge.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
const moment = require('moment');

const MODEL_EVENT_TYPE = 'model-event';
const SUPPORTED_MODEL_EVENTS = {
'member.edited': {
model: 'Member'
}
};

class WorkerModelEventBridge {
constructor({models, events, logging, sentry}) {
this.models = models;
this.events = events;
this.logging = logging;
this.sentry = sentry;
}

isModelEventMessage(message) {
return isObject(message) && message.type === MODEL_EVENT_TYPE;
}

async handle(message, meta = {}) {
const validationError = this.validate(message);

if (validationError) {
this.logging.warn(`Ignoring invalid worker model event from job ${meta.jobName || 'unknown'}: ${validationError}`);
return false;
}

try {
return await this.emitModelEvent(message);
} catch (err) {
this.logging.error(err);
this.sentry.captureException(err);
return false;
}
}

validate(message) {
if (!this.isModelEventMessage(message)) {
return 'Unexpected message type';
}

const supportedEvent = SUPPORTED_MODEL_EVENTS[message.event];

if (!supportedEvent || supportedEvent.model !== message.model) {
return `Unsupported model event ${message.event || 'unknown'} for model ${message.model || 'unknown'}`;
}

if (!message.id || typeof message.id !== 'string') {
return 'Missing model id';
}

if (!isObject(message.previous)) {
return 'Missing previous attributes';
}

if (!isObject(message.changed) || !Object.keys(message.changed).length) {
return 'Missing changed attributes';
}

if (message.options && !isObject(message.options)) {
return 'Invalid options';
}
Comment on lines +62 to +64

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Reject options: null during validation to avoid false handler-error reporting

Line 62 uses a truthy guard, so options: null bypasses validation and later throws at Line 103 when reading options.context, which gets reported as an internal handler error instead of an invalid payload.

Proposed fix
-        if (message.options && !isObject(message.options)) {
+        if (message.options !== undefined && !isObject(message.options)) {
             return 'Invalid options';
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (message.options && !isObject(message.options)) {
return 'Invalid options';
}
if (message.options !== undefined && !isObject(message.options)) {
return 'Invalid options';
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@ghost/core/core/server/services/jobs/worker-model-event-bridge.js` around
lines 62 - 64, The current validation uses a truthy guard so message.options ===
null slips through and later causes an internal handler error when accessing
options.context; update the validation around message.options (using the
existing isObject helper) to explicitly reject non-objects and null (e.g.,
change the check to ensure isObject(message.options) is true or explicitly test
message.options == null) so that the function returns 'Invalid options' for
null/invalid payloads before reaching the code that reads options.context.

}

async emitModelEvent(message) {
const Model = this.models[message.model];
let model;

try {
model = await Model.findOne({
id: message.id
}, {
require: true,
context: {internal: true}
});
} catch (err) {
if (isNotFoundError(err)) {
this.logging.warn(`Could not emit worker model event ${message.event}: ${message.model} ${message.id} was not found`);
return false;
}

throw err;
}

model._previousAttributes = normalizeDates({
...model.attributes,
...message.previous
});
model._changed = normalizeDates(message.changed);

const options = normalizeOptions(message.options);
this.events.emit(message.event, model, options);
return true;
}
}

function normalizeOptions(options = {}) {
return {
...options,
context: {
...options.context,
internal: true
}
};
}

function normalizeDates(attributes) {
const normalized = {...attributes};

for (const [key, value] of Object.entries(normalized)) {
if (key.endsWith('_at') && value && !(value instanceof Date)) {
normalized[key] = moment.utc(value).toDate();
}
}

return normalized;
}

function isObject(value) {
return !!value && typeof value === 'object' && !Array.isArray(value);
}

function isNotFoundError(err) {
return err && (
err.errorType === 'NotFoundError' ||
err.name === 'NotFoundError' ||
err.message === 'NotFound' ||
err.message === 'EmptyResponse'
);
}

module.exports = WorkerModelEventBridge;
module.exports.MODEL_EVENT_TYPE = MODEL_EVENT_TYPE;
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,23 @@ if (parentPort) {
.andWhere('status', 'comped');

const updateMemberIds = membersToUpdate.map(d => d.id);
const now = new Date();

// Update all comped members to free
updatedMembers = await db.knex('members')
.whereIn('id', updateMemberIds)
.update({
status: 'free',
updated_at: db.knex.raw('CURRENT_TIMESTAMP')
updated_at: now
});

const statusEvents = membersToUpdate.map((member) => {
const now = db.knex.raw('CURRENT_TIMESTAMP');

return {
id: ObjectId().toHexString(),
member_id: member.id,
from_status: member.status,
to_status: 'free',
created_at: now
created_at: db.knex.raw('CURRENT_TIMESTAMP')
};
});

Expand All @@ -88,6 +87,28 @@ if (parentPort) {
for (const chunk of chunks) {
await db.knex('members_status_events').insert(chunk);
}

if (parentPort) {
for (const member of membersToUpdate) {
parentPort.postMessage({
type: 'model-event',
event: 'member.edited',
model: 'Member',
id: member.id,
previous: {
status: member.status,
updated_at: member.updated_at
},
changed: {
status: 'free',
updated_at: now
},
options: {
context: {internal: true}
}
});
}
}
}

let cleanupEndDate = new Date();
Expand Down
101 changes: 101 additions & 0 deletions ghost/core/test/unit/server/services/jobs/job-service.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
const assert = require('node:assert/strict');
const Module = require('module');
const sinon = require('sinon');

describe('JobService', function () {
const jobServicePath = '../../../../../core/server/services/jobs/job-service';
let originalLoad;
let workerMessageHandler;
let handleModelEvent;

beforeEach(function () {
originalLoad = Module._load;
handleModelEvent = sinon.stub().resolves(true);

Module._load = function (request, parent, isMain) {
if (request === '@tryghost/job-manager') {
return class JobManager {
constructor(options) {
workerMessageHandler = options.workerMessageHandler;
}
};
}

if (request === './worker-model-event-bridge') {
return class WorkerModelEventBridge {
isModelEventMessage(message) {
return message && message.type === 'model-event';
}

handle(message, meta) {
return handleModelEvent(message, meta);
}
};
}

if (request === '@tryghost/logging') {
return {
info: sinon.stub(),
warn: sinon.stub(),
error: sinon.stub()
};
}

if (request === '../../models') {
return {Job: {}};
}

if (request === '../../../shared/sentry') {
return {captureException: sinon.stub()};
}

if (request === '@tryghost/domain-events') {
return {};
}

if (request === '../../../shared/config') {
return {};
}

if (request === '../../lib/common/events') {
return {emit: sinon.stub()};
}

return originalLoad.call(this, request, parent, isMain);
};

delete require.cache[require.resolve(jobServicePath)];
require(jobServicePath);
});

afterEach(function () {
Module._load = originalLoad;
delete require.cache[require.resolve(jobServicePath)];
sinon.restore();
});

it('routes model-event worker messages without leaving them as raw domain events', function () {
const message = {
type: 'model-event',
event: 'member.edited',
model: 'Member',
id: 'member-id',
previous: {status: 'comped'},
changed: {status: 'free'}
};

workerMessageHandler({name: 'clean-expired-comped', message});

sinon.assert.calledOnceWithExactly(handleModelEvent, {
type: 'model-event',
event: 'member.edited',
model: 'Member',
id: 'member-id',
previous: {status: 'comped'},
changed: {status: 'free'}
}, {
jobName: 'clean-expired-comped'
});
assert.equal(message.event, undefined);
});
});
Loading
Loading