diff --git a/src/change_stream.ts b/src/change_stream.ts index 403c464edd..ed847519e8 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -205,6 +205,16 @@ export interface ChangeStreamDocumentCommon { splitEvent?: ChangeStreamSplitEvent; } +/** @public */ +export interface ChangeStreamDocumentWallTime { + /** + * The server date and time of the database operation. + * wallTime differs from clusterTime in that clusterTime is a timestamp taken from the oplog entry associated with the database operation event. + * @sinceServerVersion 6.0.0 + */ + wallTime?: Date; +} + /** @public */ export interface ChangeStreamDocumentCollectionUUID { /** @@ -239,7 +249,8 @@ export interface ChangeStreamDocumentOperationDescription { export interface ChangeStreamInsertDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentKey, - ChangeStreamDocumentCollectionUUID { + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'insert'; /** This key will contain the document being inserted */ @@ -255,7 +266,8 @@ export interface ChangeStreamInsertDocument export interface ChangeStreamUpdateDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentKey, - ChangeStreamDocumentCollectionUUID { + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'update'; /** @@ -285,7 +297,8 @@ export interface ChangeStreamUpdateDocument */ export interface ChangeStreamReplaceDocument extends ChangeStreamDocumentCommon, - ChangeStreamDocumentKey { + ChangeStreamDocumentKey, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'replace'; /** The fullDocument of a replace event represents the document after the insert of the replacement document */ @@ -309,7 +322,8 @@ export interface ChangeStreamReplaceDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentKey, - ChangeStreamDocumentCollectionUUID { + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'delete'; /** Namespace the delete event occurred on */ @@ -330,7 +344,8 @@ export interface ChangeStreamDeleteDocument */ export interface ChangeStreamDropDocument extends ChangeStreamDocumentCommon, - ChangeStreamDocumentCollectionUUID { + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'drop'; /** Namespace the drop event occurred on */ @@ -343,7 +358,8 @@ export interface ChangeStreamDropDocument */ export interface ChangeStreamRenameDocument extends ChangeStreamDocumentCommon, - ChangeStreamDocumentCollectionUUID { + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'rename'; /** The new name for the `ns.coll` collection */ @@ -356,7 +372,9 @@ export interface ChangeStreamRenameDocument * @public * @see https://www.mongodb.com/docs/manual/reference/change-events/#dropdatabase-event */ -export interface ChangeStreamDropDatabaseDocument extends ChangeStreamDocumentCommon { +export interface ChangeStreamDropDatabaseDocument + extends ChangeStreamDocumentCommon, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'dropDatabase'; /** The database dropped */ @@ -367,7 +385,9 @@ export interface ChangeStreamDropDatabaseDocument extends ChangeStreamDocumentCo * @public * @see https://www.mongodb.com/docs/manual/reference/change-events/#invalidate-event */ -export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentCommon { +export interface ChangeStreamInvalidateDocument + extends ChangeStreamDocumentCommon, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'invalidate'; } @@ -380,7 +400,8 @@ export interface ChangeStreamInvalidateDocument extends ChangeStreamDocumentComm export interface ChangeStreamCreateIndexDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID, - ChangeStreamDocumentOperationDescription { + ChangeStreamDocumentOperationDescription, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'createIndexes'; } @@ -393,7 +414,8 @@ export interface ChangeStreamCreateIndexDocument export interface ChangeStreamDropIndexDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID, - ChangeStreamDocumentOperationDescription { + ChangeStreamDocumentOperationDescription, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'dropIndexes'; } @@ -405,7 +427,8 @@ export interface ChangeStreamDropIndexDocument */ export interface ChangeStreamCollModDocument extends ChangeStreamDocumentCommon, - ChangeStreamDocumentCollectionUUID { + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'modify'; } @@ -416,7 +439,8 @@ export interface ChangeStreamCollModDocument */ export interface ChangeStreamCreateDocument extends ChangeStreamDocumentCommon, - ChangeStreamDocumentCollectionUUID { + ChangeStreamDocumentCollectionUUID, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'create'; @@ -435,7 +459,8 @@ export interface ChangeStreamCreateDocument export interface ChangeStreamShardCollectionDocument extends ChangeStreamDocumentCommon, ChangeStreamDocumentCollectionUUID, - ChangeStreamDocumentOperationDescription { + ChangeStreamDocumentOperationDescription, + ChangeStreamDocumentWallTime { /** Describes the type of operation represented in this change notification */ operationType: 'shardCollection'; } diff --git a/src/index.ts b/src/index.ts index b886504295..0021dde7ad 100644 --- a/src/index.ts +++ b/src/index.ts @@ -208,6 +208,7 @@ export type { ChangeStreamDocumentCommon, ChangeStreamDocumentKey, ChangeStreamDocumentOperationDescription, + ChangeStreamDocumentWallTime, ChangeStreamDropDatabaseDocument, ChangeStreamDropDocument, ChangeStreamDropIndexDocument, diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 7a188b2e33..1e9ac09901 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -170,6 +170,26 @@ describe('Change Streams', function () { } }); + it('contains a wallTime date property on the change', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=6.0.0' } }, + async test() { + const collection = db.collection('wallTimeTest'); + const changeStream = collection.watch(pipeline); + + const willBeChanges = on(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + + await collection.insertOne({ d: 4 }); + + const change = (await willBeChanges.next()).value[0]; + + await changeStream.close(); + + expect(change).to.have.property('wallTime'); + expect(change.wallTime).to.be.instanceOf(Date); + } + }); + it('should create a ChangeStream on a collection and emit change events', { metadata: { requires: { topology: 'replicaset' } }, async test() { diff --git a/test/types/change_stream.test-d.ts b/test/types/change_stream.test-d.ts index 02815cefd6..ea689a61ea 100644 --- a/test/types/change_stream.test-d.ts +++ b/test/types/change_stream.test-d.ts @@ -75,6 +75,7 @@ declare const crudChange: CrudChangeDoc; expectType ? true : false>(true); expectType(crudChange.documentKey._id); // _id will get typed expectType(crudChange.documentKey.blah); // shard keys could be anything +expectType(crudChange.wallTime); // ChangeStreamFullNameSpace expectType(crudChange.ns); @@ -87,12 +88,14 @@ switch (change.operationType) { expectType(change.documentKey._id); expectType(change.documentKey.blah); expectType(change.fullDocument); + expectType(change.wallTime); break; } case 'update': { expectType>(change); expectType<'update'>(change.operationType); expectType(change.fullDocument); // Update only attaches fullDocument if configured + expectType(change.wallTime); expectType>(change.updateDescription); expectType | undefined>(change.updateDescription.updatedFields); expectType(change.updateDescription.removedFields); @@ -104,23 +107,27 @@ switch (change.operationType) { case 'replace': { expectType>(change); expectType<'replace'>(change.operationType); + expectType(change.wallTime); expectType(change.fullDocument); break; } case 'delete': { expectType>(change); + expectType(change.wallTime); expectType<'delete'>(change.operationType); break; } case 'drop': { expectType(change); expectType<'drop'>(change.operationType); + expectType(change.wallTime); expectType<{ db: string; coll: string }>(change.ns); break; } case 'rename': { expectType(change); expectType<'rename'>(change.operationType); + expectType(change.wallTime); expectType<{ db: string; coll: string }>(change.ns); expectType<{ db: string; coll: string }>(change.to); break; @@ -128,37 +135,44 @@ switch (change.operationType) { case 'dropDatabase': { expectType(change); expectType<'dropDatabase'>(change.operationType); + expectType(change.wallTime); expectError(change.ns.coll); break; } case 'invalidate': { expectType(change); expectType<'invalidate'>(change.operationType); + expectType(change.wallTime); break; } case 'create': { expectType(change); expectType<'create'>(change.operationType); + expectType(change.wallTime); break; } case 'modify': { expectType(change); expectType<'modify'>(change.operationType); + expectType(change.wallTime); break; } case 'createIndexes': { expectType(change); expectType<'createIndexes'>(change.operationType); + expectType(change.wallTime); break; } case 'dropIndexes': { expectType(change); expectType<'dropIndexes'>(change.operationType); + expectType(change.wallTime); break; } case 'shardCollection': { expectType(change); expectType<'shardCollection'>(change.operationType); + expectType(change.wallTime); break; } case 'reshardCollection': {