Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4735): fix change stream consecutive resumability #3453

Merged
merged 8 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 27 additions & 28 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -648,21 +648,21 @@ export class ChangeStream<
hasNext(callback?: Callback): Promise<boolean> | void {
this._setIsIterator();
return maybeCallback(async () => {
try {
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
for (;;) {
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
dariakp marked this conversation as resolved.
Show resolved Hide resolved
try {
await this._processErrorIteratorMode(error);
const hasNext = await this.cursor.hasNext();
return hasNext;
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
await this._processErrorIteratorMode(error);
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
throw error;
}
}
}, callback);
Expand All @@ -675,23 +675,22 @@ export class ChangeStream<
next(callback?: Callback<TChange>): Promise<TChange> | void {
this._setIsIterator();
return maybeCallback(async () => {
try {
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
for (;;) {
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.next();
const processedChange = this._processChange(change ?? null);
return processedChange;
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
await this._processErrorIteratorMode(error);
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
throw error;
}
}
}, callback);
Expand All @@ -706,21 +705,21 @@ export class ChangeStream<
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
this._setIsIterator();
return maybeCallback(async () => {
try {
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
for (;;) {
baileympearson marked this conversation as resolved.
Show resolved Hide resolved
try {
await this._processErrorIteratorMode(error);
const change = await this.cursor.tryNext();
return change ?? null;
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
await this._processErrorIteratorMode(error);
} catch (error) {
try {
await this.close();
} catch {
// We are not concerned with errors from close()
}
throw error;
}
throw error;
}
}
}, callback);
Expand Down
197 changes: 147 additions & 50 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { promisify } from 'util';
import {
AbstractCursor,
ChangeStream,
ChangeStreamDocument,
ChangeStreamOptions,
Collection,
CommandStartedEvent,
Expand Down Expand Up @@ -1037,56 +1038,6 @@ describe('Change Streams', function () {
});

describe('Change Stream Resume Error Tests', function () {
describe('TODO(NODE-4670): fix consecutive resumes unified tests', function () {
let client: MongoClient;
let changeStream: ChangeStream;

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect();
});

afterEach(async function () {
await changeStream.close();
await client.close();
});

it('should support consecutive resumes', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=4.2' } },
async test() {
const failCommand: FailPoint = {
configureFailPoint: 'failCommand',
mode: {
times: 2
},
data: {
failCommands: ['getMore'],
closeConnection: true
}
};

await client.db('admin').command(failCommand);

const collection = client.db('test_consecutive_resume').collection('collection');

changeStream = collection.watch([], { batchSize: 1 });

await initIteratorMode(changeStream);

await collection.insertOne({ name: 'bumpy' });
await collection.insertOne({ name: 'bumpy' });
await collection.insertOne({ name: 'bumpy' });

await sleep(1000);

for (let i = 0; i < 3; ++i) {
const change = await changeStream.next();
expect(change).not.to.be.null;
}
}
});
});

it.skip('should continue piping changes after a resumable error', {
metadata: { requires: { topology: 'replicaset' } },
test: done => {
Expand Down Expand Up @@ -1767,7 +1718,44 @@ describe('ChangeStream resumability', function () {
expect(aggregateEvents).to.have.lengthOf(2);
}
);

it(
`supports consecutive resumes on error code ${code} ${error}`,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note that these test additions will likely result in merge conflicts for #3454 and we should be extra careful in that PR, once it's rebased, that all these tests are pulled in

{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 5 },
data: {
failCommands: ['getMore'],
errorCode: code,
errmsg: message
}
} as FailPoint);

// There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
// resuming a change stream don't return the change event. So we defer the insert until a period of time
// after the change stream has started listening for a change. 2000ms is long enough for the change
// stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
const [, value] = await Promise.allSettled([
sleep(2000).then(() => collection.insertOne({ name: 'bailey' })),
changeStream.next()
]);

const change = (value as PromiseFulfilledResult<ChangeStreamDocument>).value;

expect(change).to.have.property('operationType', 'insert');

expect(aggregateEvents).to.have.lengthOf(6);
}
);
}

for (const { error, code, message } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
Expand Down Expand Up @@ -1896,6 +1884,42 @@ describe('ChangeStream resumability', function () {
expect(aggregateEvents).to.have.lengthOf(2);
}
);

it(
`supports consecutive resumes on error code ${code} ${error}`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 5 },
data: {
failCommands: ['getMore'],
errorCode: code,
errmsg: message
}
} as FailPoint);

// There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
// resuming a change stream don't return the change event. So we defer the insert until a period of time
// after the change stream has started listening for a change. 2000ms is long enough for the change
// stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
const [, value] = await Promise.allSettled([
sleep(2000).then(() => collection.insertOne({ name: 'bailey' })),
changeStream.hasNext()
]);

const change = (value as PromiseFulfilledResult<boolean>).value;

expect(change).to.be.true;

expect(aggregateEvents).to.have.lengthOf(6);
}
);
}

for (const { error, code, message } of resumableErrorCodes) {
Expand Down Expand Up @@ -2033,6 +2057,42 @@ describe('ChangeStream resumability', function () {
expect(aggregateEvents).to.have.lengthOf(2);
}
);

it(
`supports consecutive resumes on error code ${code} ${error}`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);

await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 5 },
data: {
failCommands: ['getMore'],
errorCode: code,
errmsg: message
}
} as FailPoint);

try {
// tryNext is not blocking and on sharded clusters we don't have control of when
// the actual change event will be ready on the change stream pipeline. This introduces
// a race condition, where sometimes we receive the change event and sometimes
// we don't when we call tryNext, depending on the timing of the sharded cluster.

// Since we really only care about the resumability, it's enough for this test to throw
// if tryNext ever throws and assert on the number of aggregate events.
await changeStream.tryNext();
} catch (err) {
expect.fail(`expected tryNext to resume, received error instead: ${err}`);
}

expect(aggregateEvents).to.have.lengthOf(6);
}
);
}

for (const { error, code, message } of resumableErrorCodes) {
Expand Down Expand Up @@ -2171,6 +2231,43 @@ describe('ChangeStream resumability', function () {
expect(aggregateEvents).to.have.lengthOf(2);
}
);

it(
`supports consecutive resumes on error code ${code} (${error})`,
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

await client.db('admin').command({
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 5 },
data: {
failCommands: ['getMore'],
errorCode: code,
errmsg: message
}
} as FailPoint);

const changes = once(changeStream, 'change');
await once(changeStream.cursor, 'init');

// There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when
// resuming a change stream don't return the change event. So we defer the insert until a period of time
// after the change stream has started listening for a change. 2000ms is long enough for the change
// stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding.
const [, value] = await Promise.allSettled([
sleep(2000).then(() => collection.insertOne({ name: 'bailey' })),
changes
]);

const [change] = (value as PromiseFulfilledResult<ChangeStreamDocument[]>).value;
expect(change).to.have.property('operationType', 'insert');

expect(aggregateEvents).to.have.lengthOf(6);
}
);
}

it(
Expand Down
6 changes: 1 addition & 5 deletions test/integration/change-streams/change_streams.spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,5 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

describe('Change Streams Spec - Unified', function () {
runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')), test =>
test.description === 'Test consecutive resume'
? 'TODO(NODE-4670): fix consecutive resume change stream test'
: false
);
runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')));
});
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ const filter: TestFilter = ({ description }) => {
return 'TODO(NODE-3308): failures due unnecessary getMore and killCursors calls in 5.0';
}

if (description === 'Test consecutive resume') {
return 'TODO(NODE-4670): fix consecutive resume change stream test';
}

if (
process.env.AUTH === 'auth' &&
[
Expand Down