From b047424648c510a2543b23016c46ea0667fb9b2a Mon Sep 17 00:00:00 2001 From: Steve Larson <9larsons@gmail.com> Date: Tue, 9 Jun 2026 11:21:37 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Fixed=20expired=20complimentary?= =?UTF-8?q?=20members=20not=20triggering=20webhooks?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit no ref Direct DB updates in worker jobs bypass model lifecycle events, so expiring a comped member updated access without emitting member.edited. This keeps workers model-free and adds a narrow worker-to-host bridge instead of initializing Ghost models inside the worker runtime. --- .../core/server/services/jobs/job-service.js | 12 ++ .../jobs/worker-model-event-bridge.js | 135 ++++++++++++++ .../members/jobs/clean-expired-comped.js | 29 ++- .../server/services/jobs/job-service.test.js | 101 +++++++++++ .../jobs/worker-model-event-bridge.test.js | 167 ++++++++++++++++++ .../members/clean-expired-comped.test.js | 38 +++- .../services/webhooks/serialize.test.js | 40 +++++ 7 files changed, 510 insertions(+), 12 deletions(-) create mode 100644 ghost/core/core/server/services/jobs/worker-model-event-bridge.js create mode 100644 ghost/core/test/unit/server/services/jobs/job-service.test.js create mode 100644 ghost/core/test/unit/server/services/jobs/worker-model-event-bridge.test.js diff --git a/ghost/core/core/server/services/jobs/job-service.js b/ghost/core/core/server/services/jobs/job-service.js index 376d1b23a8c..20265f1f11a 100644 --- a/ghost/core/core/server/services/jobs/job-service.js +++ b/ghost/core/core/server/services/jobs/job-service.js @@ -9,14 +9,26 @@ const models = require('../../models'); const sentry = require('../../../shared/sentry'); const domainEvents = require('@tryghost/domain-events'); const config = require('../../../shared/config'); +const WorkerModelEventBridge = require('./worker-model-event-bridge'); const errorHandler = (error, workerMeta) => { logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`); logging.error(error); sentry.captureException(error); }; const events = require('../../lib/common/events'); +const workerModelEventBridge = new WorkerModelEventBridge({models, events, logging, sentry}); const workerMessageHandler = ({name, message}) => { + if (workerModelEventBridge.isModelEventMessage(message)) { + workerModelEventBridge.handle({...message}, {jobName: name}); + + // @tryghost/job-manager treats any object message with an `event` property as + // a raw domain event. Model events are handled above and should not go + // through that path. + delete message.event; + return; + } + if (typeof message === 'string') { logging.info(`Worker for job ${name} sent a message: ${message}`); } diff --git a/ghost/core/core/server/services/jobs/worker-model-event-bridge.js b/ghost/core/core/server/services/jobs/worker-model-event-bridge.js new file mode 100644 index 00000000000..85c67653247 --- /dev/null +++ b/ghost/core/core/server/services/jobs/worker-model-event-bridge.js @@ -0,0 +1,135 @@ +const moment = require('moment'); + +const MODEL_EVENT_TYPE = 'model-event'; +const SUPPORTED_MODEL_EVENTS = { + 'member.edited': { + model: 'Member' + } +}; + +class WorkerModelEventBridge { + constructor({models, events, logging, sentry}) { + this.models = models; + this.events = events; + this.logging = logging; + this.sentry = sentry; + } + + isModelEventMessage(message) { + return isObject(message) && message.type === MODEL_EVENT_TYPE; + } + + async handle(message, meta = {}) { + const validationError = this.validate(message); + + if (validationError) { + this.logging.warn(`Ignoring invalid worker model event from job ${meta.jobName || 'unknown'}: ${validationError}`); + return false; + } + + try { + return await this.emitModelEvent(message); + } catch (err) { + this.logging.error(err); + this.sentry.captureException(err); + return false; + } + } + + validate(message) { + if (!this.isModelEventMessage(message)) { + return 'Unexpected message type'; + } + + const supportedEvent = SUPPORTED_MODEL_EVENTS[message.event]; + + if (!supportedEvent || supportedEvent.model !== message.model) { + return `Unsupported model event ${message.event || 'unknown'} for model ${message.model || 'unknown'}`; + } + + if (!message.id || typeof message.id !== 'string') { + return 'Missing model id'; + } + + if (!isObject(message.previous)) { + return 'Missing previous attributes'; + } + + if (!isObject(message.changed) || !Object.keys(message.changed).length) { + return 'Missing changed attributes'; + } + + if (message.options && !isObject(message.options)) { + return 'Invalid options'; + } + } + + async emitModelEvent(message) { + const Model = this.models[message.model]; + let model; + + try { + model = await Model.findOne({ + id: message.id + }, { + require: true, + context: {internal: true} + }); + } catch (err) { + if (isNotFoundError(err)) { + this.logging.warn(`Could not emit worker model event ${message.event}: ${message.model} ${message.id} was not found`); + return false; + } + + throw err; + } + + model._previousAttributes = normalizeDates({ + ...model.attributes, + ...message.previous + }); + model._changed = normalizeDates(message.changed); + + const options = normalizeOptions(message.options); + this.events.emit(message.event, model, options); + return true; + } +} + +function normalizeOptions(options = {}) { + return { + ...options, + context: { + ...options.context, + internal: true + } + }; +} + +function normalizeDates(attributes) { + const normalized = {...attributes}; + + for (const [key, value] of Object.entries(normalized)) { + if (key.endsWith('_at') && value && !(value instanceof Date)) { + normalized[key] = moment.utc(value).toDate(); + } + } + + return normalized; +} + +function isObject(value) { + return !!value && typeof value === 'object' && !Array.isArray(value); +} + +function isNotFoundError(err) { + return err && ( + err.errorType === 'NotFoundError' || + err.name === 'NotFoundError' || + err.message === 'NotFound' || + err.message === 'EmptyResponse' + ); +} + +module.exports = WorkerModelEventBridge; +module.exports.MODEL_EVENT_TYPE = MODEL_EVENT_TYPE; diff --git a/ghost/core/core/server/services/members/jobs/clean-expired-comped.js b/ghost/core/core/server/services/members/jobs/clean-expired-comped.js index 8c79ddfae12..16d37a818fa 100644 --- a/ghost/core/core/server/services/members/jobs/clean-expired-comped.js +++ b/ghost/core/core/server/services/members/jobs/clean-expired-comped.js @@ -57,24 +57,23 @@ if (parentPort) { .andWhere('status', 'comped'); const updateMemberIds = membersToUpdate.map(d => d.id); + const now = new Date(); // Update all comped members to free updatedMembers = await db.knex('members') .whereIn('id', updateMemberIds) .update({ status: 'free', - updated_at: db.knex.raw('CURRENT_TIMESTAMP') + updated_at: now }); const statusEvents = membersToUpdate.map((member) => { - const now = db.knex.raw('CURRENT_TIMESTAMP'); - return { id: ObjectId().toHexString(), member_id: member.id, from_status: member.status, to_status: 'free', - created_at: now + created_at: db.knex.raw('CURRENT_TIMESTAMP') }; }); @@ -88,6 +87,28 @@ if (parentPort) { for (const chunk of chunks) { await db.knex('members_status_events').insert(chunk); } + + if (parentPort) { + for (const member of membersToUpdate) { + parentPort.postMessage({ + type: 'model-event', + event: 'member.edited', + model: 'Member', + id: member.id, + previous: { + status: member.status, + updated_at: member.updated_at + }, + changed: { + status: 'free', + updated_at: now + }, + options: { + context: {internal: true} + } + }); + } + } } let cleanupEndDate = new Date(); diff --git a/ghost/core/test/unit/server/services/jobs/job-service.test.js b/ghost/core/test/unit/server/services/jobs/job-service.test.js new file mode 100644 index 00000000000..533a0a35c40 --- /dev/null +++ b/ghost/core/test/unit/server/services/jobs/job-service.test.js @@ -0,0 +1,101 @@ +const assert = require('node:assert/strict'); +const Module = require('module'); +const sinon = require('sinon'); + +describe('JobService', function () { + const jobServicePath = '../../../../../core/server/services/jobs/job-service'; + let originalLoad; + let workerMessageHandler; + let handleModelEvent; + + beforeEach(function () { + originalLoad = Module._load; + handleModelEvent = sinon.stub().resolves(true); + + Module._load = function (request, parent, isMain) { + if (request === '@tryghost/job-manager') { + return class JobManager { + constructor(options) { + workerMessageHandler = options.workerMessageHandler; + } + }; + } + + if (request === './worker-model-event-bridge') { + return class WorkerModelEventBridge { + isModelEventMessage(message) { + return message && message.type === 'model-event'; + } + + handle(message, meta) { + return handleModelEvent(message, meta); + } + }; + } + + if (request === '@tryghost/logging') { + return { + info: sinon.stub(), + warn: sinon.stub(), + error: sinon.stub() + }; + } + + if (request === '../../models') { + return {Job: {}}; + } + + if (request === '../../../shared/sentry') { + return {captureException: sinon.stub()}; + } + + if (request === '@tryghost/domain-events') { + return {}; + } + + if (request === '../../../shared/config') { + return {}; + } + + if (request === '../../lib/common/events') { + return {emit: sinon.stub()}; + } + + return originalLoad.call(this, request, parent, isMain); + }; + + delete require.cache[require.resolve(jobServicePath)]; + require(jobServicePath); + }); + + afterEach(function () { + Module._load = originalLoad; + delete require.cache[require.resolve(jobServicePath)]; + sinon.restore(); + }); + + it('routes model-event worker messages without leaving them as raw domain events', function () { + const message = { + type: 'model-event', + event: 'member.edited', + model: 'Member', + id: 'member-id', + previous: {status: 'comped'}, + changed: {status: 'free'} + }; + + workerMessageHandler({name: 'clean-expired-comped', message}); + + sinon.assert.calledOnceWithExactly(handleModelEvent, { + type: 'model-event', + event: 'member.edited', + model: 'Member', + id: 'member-id', + previous: {status: 'comped'}, + changed: {status: 'free'} + }, { + jobName: 'clean-expired-comped' + }); + assert.equal(message.event, undefined); + }); +}); diff --git a/ghost/core/test/unit/server/services/jobs/worker-model-event-bridge.test.js b/ghost/core/test/unit/server/services/jobs/worker-model-event-bridge.test.js new file mode 100644 index 00000000000..857cfbcb28c --- /dev/null +++ b/ghost/core/test/unit/server/services/jobs/worker-model-event-bridge.test.js @@ -0,0 +1,167 @@ +const assert = require('node:assert/strict'); +const sinon = require('sinon'); + +const WorkerModelEventBridge = require('../../../../../core/server/services/jobs/worker-model-event-bridge'); + +describe('WorkerModelEventBridge', function () { + let models; + let events; + let logging; + let sentry; + let bridge; + + beforeEach(function () { + models = { + Member: { + findOne: sinon.stub() + } + }; + events = { + emit: sinon.stub() + }; + logging = { + warn: sinon.stub(), + error: sinon.stub() + }; + sentry = { + captureException: sinon.stub() + }; + bridge = new WorkerModelEventBridge({models, events, logging, sentry}); + }); + + it('emits a reconstructed member.edited event', async function () { + const currentUpdatedAt = new Date('2026-05-29T00:00:00.000Z'); + const model = { + attributes: { + id: 'member-id', + email: 'member@example.com', + status: 'free', + updated_at: currentUpdatedAt + } + }; + + models.Member.findOne.resolves(model); + + const result = await bridge.handle({ + type: 'model-event', + event: 'member.edited', + model: 'Member', + id: 'member-id', + previous: { + status: 'comped', + updated_at: '2026-04-28T15:55:45.000Z' + }, + changed: { + status: 'free', + updated_at: currentUpdatedAt + }, + options: { + context: {internal: true} + } + }, { + jobName: 'clean-expired-comped' + }); + + assert.equal(result, true); + sinon.assert.calledOnceWithExactly(models.Member.findOne, { + id: 'member-id' + }, { + require: true, + context: {internal: true} + }); + + assert.equal(model._previousAttributes.id, 'member-id'); + assert.equal(model._previousAttributes.email, 'member@example.com'); + assert.equal(model._previousAttributes.status, 'comped'); + assert.deepEqual(model._previousAttributes.updated_at, new Date('2026-04-28T15:55:45.000Z')); + assert.equal(model._changed.status, 'free'); + assert.deepEqual(model._changed.updated_at, currentUpdatedAt); + sinon.assert.calledOnceWithExactly(events.emit, 'member.edited', model, { + context: {internal: true} + }); + sinon.assert.notCalled(logging.warn); + sinon.assert.notCalled(logging.error); + sinon.assert.notCalled(sentry.captureException); + }); + + it('logs and ignores unsupported model events', async function () { + const result = await bridge.handle({ + type: 'model-event', + event: 'post.edited', + model: 'Post', + id: 'post-id', + previous: {title: 'Old title'}, + changed: {title: 'New title'} + }, { + jobName: 'some-job' + }); + + assert.equal(result, false); + sinon.assert.calledOnce(logging.warn); + sinon.assert.notCalled(models.Member.findOne); + sinon.assert.notCalled(events.emit); + sinon.assert.notCalled(logging.error); + sinon.assert.notCalled(sentry.captureException); + }); + + it('logs and ignores malformed model event messages', async function () { + const result = await bridge.handle({ + type: 'model-event', + event: 'member.edited', + model: 'Member', + id: 'member-id', + previous: {status: 'comped'}, + changed: {} + }, { + jobName: 'clean-expired-comped' + }); + + assert.equal(result, false); + sinon.assert.calledOnce(logging.warn); + sinon.assert.notCalled(models.Member.findOne); + sinon.assert.notCalled(events.emit); + }); + + it('logs missing models without crashing', async function () { + const notFoundError = new Error('EmptyResponse'); + models.Member.findOne.rejects(notFoundError); + + const result = await bridge.handle({ + type: 'model-event', + event: 'member.edited', + model: 'Member', + id: 'missing-member-id', + previous: {status: 'comped'}, + changed: {status: 'free'} + }, { + jobName: 'clean-expired-comped' + }); + + assert.equal(result, false); + sinon.assert.calledOnce(logging.warn); + sinon.assert.notCalled(events.emit); + sinon.assert.notCalled(logging.error); + sinon.assert.notCalled(sentry.captureException); + }); + + it('captures handler errors without crashing', async function () { + const error = new Error('database unavailable'); + models.Member.findOne.rejects(error); + + const result = await bridge.handle({ + type: 'model-event', + event: 'member.edited', + model: 'Member', + id: 'member-id', + previous: {status: 'comped'}, + changed: {status: 'free'} + }, { + jobName: 'clean-expired-comped' + }); + + assert.equal(result, false); + sinon.assert.calledOnceWithExactly(logging.error, error); + sinon.assert.calledOnceWithExactly(sentry.captureException, error); + sinon.assert.notCalled(events.emit); + }); +}); diff --git a/ghost/core/test/unit/server/services/members/clean-expired-comped.test.js b/ghost/core/test/unit/server/services/members/clean-expired-comped.test.js index f6430a58cbe..9155e433c74 100644 --- a/ghost/core/test/unit/server/services/members/clean-expired-comped.test.js +++ b/ghost/core/test/unit/server/services/members/clean-expired-comped.test.js @@ -2,6 +2,7 @@ const assert = require('node:assert/strict'); const Module = require('node:module'); const rawTimestamp = {raw: 'CURRENT_TIMESTAMP'}; +const previousUpdatedAt = new Date('2026-04-28T15:55:45.000Z'); describe('Job: Clean expired comped members', function () { const jobPath = '../../../../../core/server/services/members/jobs/clean-expired-comped'; @@ -10,12 +11,15 @@ describe('Job: Clean expired comped members', function () { delete require.cache[require.resolve(jobPath)]; }); - it('bumps updated_at when expiring comped members', async function () { + it('bumps updated_at and posts model-event messages when expiring comped members', async function () { const updateCalls = []; + const messages = []; const done = new Promise((resolve) => { const parentPort = { once() {}, postMessage(message) { + messages.push(message); + if (message === 'done') { resolve(); } @@ -36,14 +40,31 @@ describe('Job: Clean expired comped members', function () { await done; - assert.deepEqual(updateCalls, [{ - tableName: 'members', - ids: ['member-id'], - data: { + assert.equal(updateCalls.length, 1); + assert.equal(updateCalls[0].tableName, 'members'); + assert.deepEqual(updateCalls[0].ids, ['member-id']); + assert.equal(updateCalls[0].data.status, 'free'); + assert.ok(updateCalls[0].data.updated_at instanceof Date); + + const modelEventMessage = messages.find(message => message && message.type === 'model-event'); + assert.deepEqual(modelEventMessage, { + type: 'model-event', + event: 'member.edited', + model: 'Member', + id: 'member-id', + previous: { + status: 'comped', + updated_at: previousUpdatedAt + }, + changed: { status: 'free', - updated_at: rawTimestamp + updated_at: updateCalls[0].data.updated_at + }, + options: { + context: {internal: true} } - }]); + }); + assert.ok(messages.indexOf(modelEventMessage) < messages.indexOf('done')); }); }); @@ -76,7 +97,8 @@ function createQuery(tableName, updateCalls) { andWhere() { return Promise.resolve([{ id: 'member-id', - status: 'comped' + status: 'comped', + updated_at: previousUpdatedAt }]); }, diff --git a/ghost/core/test/unit/server/services/webhooks/serialize.test.js b/ghost/core/test/unit/server/services/webhooks/serialize.test.js index d63383c207c..502d47c496a 100644 --- a/ghost/core/test/unit/server/services/webhooks/serialize.test.js +++ b/ghost/core/test/unit/server/services/webhooks/serialize.test.js @@ -1,4 +1,5 @@ const assert = require('node:assert/strict'); +const sinon = require('sinon'); const models = require('../../../../../core/server/models'); @@ -20,6 +21,7 @@ describe('WebhookService - Serialize', function () { afterEach(function () { tiersService.api = null; + sinon.restore(); }); it('rejects with no arguments', async function () { @@ -84,4 +86,42 @@ describe('WebhookService - Serialize', function () { assert.equal(result.post.current.title, 'A brand new title', 'The updated title should be present'); assert.equal(result.post.previous.title, 'Ghostly Kitchen Sink', 'The previous title should also be present'); }); + + it('can serialize reconstructed member.edited model event state', async function () { + const previousUpdatedAt = new Date('2026-04-28T15:55:45.000Z'); + const currentUpdatedAt = new Date('2026-05-29T00:00:00.000Z'); + const memberModel = new models.Member({ + id: 'member-id', + uuid: 'member-uuid', + email: 'member@example.com', + status: 'free', + created_at: previousUpdatedAt, + updated_at: currentUpdatedAt + }); + + sinon.stub(memberModel, 'load').resolves(memberModel); + memberModel._previousAttributes = { + ...memberModel.attributes, + status: 'comped', + updated_at: previousUpdatedAt + }; + memberModel._changed = { + status: 'free', + updated_at: currentUpdatedAt + }; + + const result = await serialize('member.edited', memberModel); + + sinon.assert.calledOnceWithExactly(memberModel.load, [ + 'labels', + 'products', + 'newsletters' + ]); + assert.equal(result.member.current.status, 'free'); + assert.equal(result.member.current.comped, false); + assert.deepEqual(result.member.current.updated_at, currentUpdatedAt); + assert.equal(result.member.previous.status, 'comped'); + assert.deepEqual(result.member.previous.updated_at, previousUpdatedAt); + assert.deepEqual(Object.keys(result.member.previous).sort(), ['status', 'updated_at']); + }); });