Skip to content

fix(NODE-6955): add missing wallTime property to applicable change stream documents #4541

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
51 changes: 38 additions & 13 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ export interface ChangeStreamDocumentCommon {
splitEvent?: ChangeStreamSplitEvent;
}

/** @public */
export interface ChangeStreamDocumentWallTime {
/**
* The server date and time of the database operation.
* wallTime differs from clusterTime in that clusterTime is a timestamp taken from the oplog entry associated with the database operation event.
* @sinceServerVersion 6.0.0
*/
wallTime?: Date;
}

/** @public */
export interface ChangeStreamDocumentCollectionUUID {
/**
Expand Down Expand Up @@ -239,7 +249,8 @@ export interface ChangeStreamDocumentOperationDescription {
export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'insert';
/** This key will contain the document being inserted */
Expand All @@ -255,7 +266,8 @@ export interface ChangeStreamInsertDocument<TSchema extends Document = Document>
export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'update';
/**
Expand Down Expand Up @@ -285,7 +297,8 @@ export interface ChangeStreamUpdateDocument<TSchema extends Document = Document>
*/
export interface ChangeStreamReplaceDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema> {
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'replace';
/** The fullDocument of a replace event represents the document after the insert of the replacement document */
Expand All @@ -309,7 +322,8 @@ export interface ChangeStreamReplaceDocument<TSchema extends Document = Document
export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentKey<TSchema>,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'delete';
/** Namespace the delete event occurred on */
Expand All @@ -330,7 +344,8 @@ export interface ChangeStreamDeleteDocument<TSchema extends Document = Document>
*/
export interface ChangeStreamDropDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'drop';
/** Namespace the drop event occurred on */
Expand All @@ -343,7 +358,8 @@ export interface ChangeStreamDropDocument
*/
export interface ChangeStreamRenameDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'rename';
/** The new name for the `ns.coll` collection */
Expand All @@ -356,7 +372,9 @@ export interface ChangeStreamRenameDocument
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#dropdatabase-event
*/
export interface ChangeStreamDropDatabaseDocument extends ChangeStreamDocumentCommon {
export interface ChangeStreamDropDatabaseDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'dropDatabase';
/** The database dropped */
Expand All @@ -367,7 +385,9 @@ export interface ChangeStreamDropDatabaseDocument extends ChangeStreamDocumentCo
* @public
* @see https://www.mongodb.com/docs/manual/reference/change-events/#invalidate-event
*/
export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentCommon {
export interface ChangeStreamInvalidateDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'invalidate';
}
Expand All @@ -380,7 +400,8 @@ export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentComm
export interface ChangeStreamCreateIndexDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'createIndexes';
}
Expand All @@ -393,7 +414,8 @@ export interface ChangeStreamCreateIndexDocument
export interface ChangeStreamDropIndexDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'dropIndexes';
}
Expand All @@ -405,7 +427,8 @@ export interface ChangeStreamDropIndexDocument
*/
export interface ChangeStreamCollModDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'modify';
}
Expand All @@ -416,7 +439,8 @@ export interface ChangeStreamCollModDocument
*/
export interface ChangeStreamCreateDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID {
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'create';

Expand All @@ -435,7 +459,8 @@ export interface ChangeStreamCreateDocument
export interface ChangeStreamShardCollectionDocument
extends ChangeStreamDocumentCommon,
ChangeStreamDocumentCollectionUUID,
ChangeStreamDocumentOperationDescription {
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime {
/** Describes the type of operation represented in this change notification */
operationType: 'shardCollection';
}
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ export type {
ChangeStreamDocumentCommon,
ChangeStreamDocumentKey,
ChangeStreamDocumentOperationDescription,
ChangeStreamDocumentWallTime,
ChangeStreamDropDatabaseDocument,
ChangeStreamDropDocument,
ChangeStreamDropIndexDocument,
Expand Down
20 changes: 20 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,26 @@ describe('Change Streams', function () {
}
});

it('contains a wallTime date property on the change', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=6.0.0' } },
async test() {
const collection = db.collection('wallTimeTest');
const changeStream = collection.watch(pipeline);

const willBeChanges = on(changeStream, 'change');
await once(changeStream.cursor, 'init');

await collection.insertOne({ d: 4 });

const change = (await willBeChanges.next()).value[0];

await changeStream.close();

expect(change).to.have.property('wallTime');
expect(change.wallTime).to.be.instanceOf(Date);
}
});

it('should create a ChangeStream on a collection and emit change events', {
metadata: { requires: { topology: 'replicaset' } },
async test() {
Expand Down
14 changes: 14 additions & 0 deletions test/types/change_stream.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ declare const crudChange: CrudChangeDoc;
expectType<CrudChangeDoc extends ChangeStreamDocumentKey<Schema> ? true : false>(true);
expectType<number>(crudChange.documentKey._id); // _id will get typed
expectType<any>(crudChange.documentKey.blah); // shard keys could be anything
expectType<Date | undefined>(crudChange.wallTime);

// ChangeStreamFullNameSpace
expectType<ChangeStreamNameSpace>(crudChange.ns);
Expand All @@ -87,12 +88,14 @@ switch (change.operationType) {
expectType<number>(change.documentKey._id);
expectType<any>(change.documentKey.blah);
expectType<Schema>(change.fullDocument);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'update': {
expectType<ChangeStreamUpdateDocument<Schema>>(change);
expectType<'update'>(change.operationType);
expectType<Schema | undefined>(change.fullDocument); // Update only attaches fullDocument if configured
expectType<Date | undefined>(change.wallTime);
expectType<UpdateDescription<Schema>>(change.updateDescription);
expectType<Partial<Schema> | undefined>(change.updateDescription.updatedFields);
expectType<string[] | undefined>(change.updateDescription.removedFields);
Expand All @@ -104,61 +107,72 @@ switch (change.operationType) {
case 'replace': {
expectType<ChangeStreamReplaceDocument<Schema>>(change);
expectType<'replace'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectType<Schema>(change.fullDocument);
break;
}
case 'delete': {
expectType<ChangeStreamDeleteDocument<Schema>>(change);
expectType<Date | undefined>(change.wallTime);
expectType<'delete'>(change.operationType);
break;
}
case 'drop': {
expectType<ChangeStreamDropDocument>(change);
expectType<'drop'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectType<{ db: string; coll: string }>(change.ns);
break;
}
case 'rename': {
expectType<ChangeStreamRenameDocument>(change);
expectType<'rename'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectType<{ db: string; coll: string }>(change.ns);
expectType<{ db: string; coll: string }>(change.to);
break;
}
case 'dropDatabase': {
expectType<ChangeStreamDropDatabaseDocument>(change);
expectType<'dropDatabase'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
expectError(change.ns.coll);
break;
}
case 'invalidate': {
expectType<ChangeStreamInvalidateDocument>(change);
expectType<'invalidate'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'create': {
expectType<ChangeStreamCreateDocument>(change);
expectType<'create'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'modify': {
expectType<ChangeStreamCollModDocument>(change);
expectType<'modify'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'createIndexes': {
expectType<ChangeStreamCreateIndexDocument>(change);
expectType<'createIndexes'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'dropIndexes': {
expectType<ChangeStreamDropIndexDocument>(change);
expectType<'dropIndexes'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'shardCollection': {
expectType<ChangeStreamShardCollectionDocument>(change);
expectType<'shardCollection'>(change.operationType);
expectType<Date | undefined>(change.wallTime);
break;
}
case 'reshardCollection': {
Expand Down