Skip to content

Commit c8362eb

Browse files
test: resumability for change stream unified tests (#3282)
1 parent 9c1782e commit c8362eb

10 files changed

+2
-7603
lines changed
Lines changed: 1 addition & 317 deletions
Original file line numberDiff line numberDiff line change
@@ -1,324 +1,8 @@
1-
import { expect } from 'chai';
21
import * as path from 'path';
32

4-
import { Document, MongoClient } from '../../../src';
5-
import { LEGACY_HELLO_COMMAND } from '../../../src/constants';
63
import { loadSpecTests } from '../../spec';
74
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';
8-
import { delay, setupDatabase } from '../shared';
95

10-
// TODO(NODE-4126): Fix change stream resumabilty in iterator mode
11-
const skippedResumabilityTests = [
12-
'change stream resumes after HostUnreachable',
13-
'change stream resumes after HostNotFound',
14-
'change stream resumes after NetworkTimeout',
15-
'change stream resumes after ShutdownInProgress',
16-
'change stream resumes after PrimarySteppedDown',
17-
'change stream resumes after ExceededTimeLimit',
18-
'change stream resumes after SocketException',
19-
'change stream resumes after NotWritablePrimary',
20-
'change stream resumes after InterruptedAtShutdown',
21-
'change stream resumes after InterruptedDueToReplStateChange',
22-
'change stream resumes after NotPrimaryNoSecondaryOk',
23-
'change stream resumes after NotPrimaryOrSecondary',
24-
'change stream resumes after StaleShardVersion',
25-
'change stream resumes after StaleEpoch',
26-
'change stream resumes after RetryChangeStream',
27-
'change stream resumes after FailedToSatisfyReadPreference',
28-
'change stream resumes if error contains ResumableChangeStreamError',
29-
'change stream resumes after a network error',
30-
'change stream resumes after CursorNotFound',
31-
'Test consecutive resume'
32-
];
336
describe('Change Streams Spec - Unified', function () {
34-
runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')), skippedResumabilityTests);
35-
});
36-
37-
// TODO(NODE-3819): Unskip flaky MacOS tests.
38-
const maybeDescribe = process.platform === 'darwin' ? describe.skip : describe;
39-
maybeDescribe('Change Stream Spec - v1', function () {
40-
let globalClient;
41-
let ctx;
42-
let events;
43-
44-
const TESTS_TO_SKIP = new Set([]);
45-
46-
before(function () {
47-
const configuration = this.configuration;
48-
return setupDatabase(configuration).then(() => {
49-
globalClient = configuration.newClient();
50-
return globalClient.connect();
51-
});
52-
});
53-
54-
after(function () {
55-
const gc = globalClient;
56-
globalClient = undefined;
57-
return new Promise<void>(r => gc.close(() => r()));
58-
});
59-
60-
loadSpecTests(path.join('change-streams', 'legacy')).forEach(suite => {
61-
const ALL_DBS = [suite.database_name, suite.database2_name];
62-
63-
describe(suite.name, () => {
64-
beforeEach(function () {
65-
const gc = globalClient;
66-
const sDB = suite.database_name;
67-
const sColl = suite.collection_name;
68-
const configuration = this.configuration;
69-
return Promise.all(
70-
ALL_DBS.map(db => gc.db(db).dropDatabase({ writeConcern: { w: 'majority' } }))
71-
)
72-
.then(() => gc.db(sDB).createCollection(sColl))
73-
.then(() => {
74-
if (suite.database2_name && suite.collection2_name) {
75-
return gc.db(suite.database2_name).createCollection(suite.collection2_name);
76-
}
77-
})
78-
.then(() =>
79-
configuration
80-
.newClient({}, { monitorCommands: true, heartbeatFrequencyMS: 100 })
81-
.connect()
82-
)
83-
.then(client => {
84-
ctx = { gc, client };
85-
events = [];
86-
const _events = events;
87-
88-
ctx.database = ctx.client.db(sDB);
89-
ctx.collection = ctx.database.collection(sColl);
90-
ctx.client.on('commandStarted', e => {
91-
if (e.commandName !== LEGACY_HELLO_COMMAND) _events.push(e);
92-
});
93-
});
94-
});
95-
96-
afterEach(function () {
97-
const client = ctx.client;
98-
ctx = undefined;
99-
events = undefined;
100-
101-
client.removeAllListeners('commandStarted');
102-
103-
return client && client.close(true);
104-
});
105-
106-
suite.tests.forEach(test => {
107-
const shouldSkip = test.skip || TESTS_TO_SKIP.has(test.description);
108-
// There's no evidence of test.only being defined in the spec files
109-
// But let's avoid removing it now to just be sure we aren't changing anything
110-
// These tests will eventually be replaced by unified format versions.
111-
const itFn = shouldSkip ? it.skip : test.only ? Reflect.get(it, 'only') : it;
112-
const metadata = generateMetadata(test);
113-
const testFn = generateTestFn(test);
114-
115-
itFn(test.description, { metadata, test: testFn });
116-
});
117-
});
118-
});
119-
120-
// Fn Generator methods
121-
122-
function generateMetadata(test) {
123-
const topology = test.topology;
124-
const requires: MongoDBMetadataUI['requires'] = {};
125-
const versionLimits = [];
126-
if (test.minServerVersion) {
127-
versionLimits.push(`>=${test.minServerVersion}`);
128-
}
129-
if (test.maxServerVersion) {
130-
versionLimits.push(`<=${test.maxServerVersion}`);
131-
}
132-
if (versionLimits.length) {
133-
requires.mongodb = versionLimits.join(' ');
134-
}
135-
136-
if (topology) {
137-
requires.topology = topology;
138-
}
139-
140-
return { requires };
141-
}
142-
143-
function generateTestFn(test) {
144-
const configureFailPoint = makeFailPointCommand(test);
145-
const testFnRunOperations = makeTestFnRunOperations(test);
146-
const testSuccess = makeTestSuccess(test);
147-
const testFailure = makeTestFailure(test);
148-
const testAPM = makeTestAPM(test);
149-
150-
return function testFn() {
151-
return configureFailPoint(ctx)
152-
.then(() => testFnRunOperations(ctx))
153-
.then(testSuccess, testFailure)
154-
.then(() => testAPM(ctx, events));
155-
};
156-
}
157-
158-
function makeFailPointCommand(test) {
159-
if (!test.failPoint) {
160-
return () => Promise.resolve();
161-
}
162-
163-
return function (ctx) {
164-
return ctx.gc.db('admin').command(test.failPoint);
165-
};
166-
}
167-
168-
function makeTestSuccess(test) {
169-
const result = test.result;
170-
171-
return function testSuccess(value) {
172-
if (result.error) {
173-
throw new Error(`Expected test to return error ${result.error}`);
174-
}
175-
176-
if (result.success) {
177-
expect(value).to.have.a.lengthOf(result.success.length);
178-
expect(value).to.matchMongoSpec(result.success);
179-
}
180-
};
181-
}
182-
183-
function makeTestFailure(test) {
184-
const result = test.result;
185-
186-
return function testFailure(err) {
187-
if (!result.error) {
188-
throw err;
189-
}
190-
191-
expect(err).to.matchMongoSpec(result.error);
192-
};
193-
}
194-
195-
function makeTestAPM(test) {
196-
const expectedEvents = test.expectations || [];
197-
198-
return function testAPM(ctx, events) {
199-
expectedEvents
200-
.map(e => e.command_started_event)
201-
.map(normalizeAPMEvent)
202-
.forEach((expected, idx) => {
203-
if (!events[idx]) {
204-
throw new Error(
205-
`Expected there to be an APM event at index ${idx}, but there was none`
206-
);
207-
}
208-
// killCursors events should be skipped
209-
// (see https://github.com/mongodb/specifications/blob/master/source/change-streams/tests/README.rst#spec-test-runner)
210-
if (events[idx].commandName === 'killCursors') {
211-
return;
212-
}
213-
expect(events[idx]).to.matchMongoSpec(expected);
214-
});
215-
};
216-
}
217-
218-
function allSettled(promises) {
219-
let err;
220-
return Promise.all(promises.map(p => p.catch(x => (err = err || x)))).then(args => {
221-
if (err) {
222-
throw err;
223-
}
224-
return args;
225-
});
226-
}
227-
228-
function makeTestFnRunOperations(test) {
229-
const target = test.target;
230-
const operations = test.operations;
231-
const success = test.result.success || [];
232-
233-
return function testFnRunOperations(ctx) {
234-
const changeStreamPipeline = test.changeStreamPipeline;
235-
const changeStreamOptions = test.changeStreamOptions;
236-
ctx.changeStream = ctx[target].watch(changeStreamPipeline, changeStreamOptions);
237-
238-
const changeStreamPromise = readAndCloseChangeStream(ctx.changeStream, success.length);
239-
const operationsPromise = runOperations(ctx.gc, operations);
240-
241-
return allSettled([changeStreamPromise, operationsPromise]).then(args => args[0]);
242-
};
243-
}
244-
245-
function readAndCloseChangeStream(changeStream, numChanges) {
246-
const close = makeChangeStreamCloseFn(changeStream);
247-
let changeStreamPromise = changeStream.next().then(r => [r]);
248-
249-
for (let i = 1; i < numChanges; i += 1) {
250-
changeStreamPromise = changeStreamPromise.then(results => {
251-
return changeStream.next().then(result => {
252-
results.push(result);
253-
return results;
254-
});
255-
});
256-
}
257-
258-
return changeStreamPromise.then(
259-
result => close(null, result),
260-
err => close(err)
261-
);
262-
}
263-
264-
function runOperations(client, operations) {
265-
return operations
266-
.map(op => makeOperation(client, op))
267-
.reduce((p, op) => p.then(op), delay(200));
268-
}
269-
270-
function makeChangeStreamCloseFn(changeStream): (error?: any, value?: any) => Promise<unknown> {
271-
return function close(error, value) {
272-
return new Promise((resolve, reject) => {
273-
changeStream.close(err => {
274-
if (error || err) {
275-
return reject(error || err);
276-
}
277-
return resolve(value);
278-
});
279-
});
280-
};
281-
}
282-
283-
function normalizeAPMEvent(raw) {
284-
const rawKeys = Object.keys(raw);
285-
rawKeys.sort();
286-
expect(rawKeys, 'test runner only supports these keys, is there a new one?').to.deep.equal([
287-
'command',
288-
'command_name',
289-
'database_name'
290-
]);
291-
return {
292-
command: raw.command,
293-
commandName: raw.command_name,
294-
databaseName: raw.database_name
295-
};
296-
}
297-
298-
function makeOperation(client: MongoClient, op: Document) {
299-
const collection = client.db(op.database).collection(op.collection);
300-
switch (op.name) {
301-
case 'insertOne':
302-
expect(op.arguments).to.have.property('document').that.is.an('object');
303-
return () => collection.insertOne(op.arguments.document);
304-
case 'updateOne':
305-
expect(op.arguments).to.have.property('filter').that.is.an('object');
306-
expect(op.arguments).to.have.property('update').that.is.an('object');
307-
return () => collection.updateOne(op.arguments.filter, op.arguments.update);
308-
case 'replaceOne':
309-
expect(op.arguments).to.have.property('filter').that.is.an('object');
310-
expect(op.arguments).to.have.property('replacement').that.is.an('object');
311-
return () => collection.replaceOne(op.arguments.filter, op.arguments.replacement);
312-
case 'deleteOne':
313-
expect(op.arguments).to.have.property('filter').that.is.an('object');
314-
return () => collection.deleteOne(op.arguments.filter);
315-
case 'rename':
316-
expect(op.arguments).to.have.property('to').that.is.a('string');
317-
return () => collection.rename(op.arguments.to);
318-
case 'drop':
319-
return () => collection.drop();
320-
default:
321-
throw new Error(`runner does not support ${op.name}`);
322-
}
323-
}
7+
runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')));
3248
});

0 commit comments

Comments
 (0)