From faddd38b2be30d8021a37df0628dc8440e010871 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 18 Oct 2022 14:51:45 -0400 Subject: [PATCH 1/8] test: unskip tests --- .../change-streams/change_stream.test.ts | 50 ------------------- .../change_streams.spec.test.ts | 6 +-- .../unified_test_format.spec.test.ts | 4 -- 3 files changed, 1 insertion(+), 59 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index a22e4bcf11..3b31d49972 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1037,56 +1037,6 @@ describe('Change Streams', function () { }); describe('Change Stream Resume Error Tests', function () { - describe('TODO(NODE-4670): fix consecutive resumes unified tests', function () { - let client: MongoClient; - let changeStream: ChangeStream; - - beforeEach(async function () { - client = this.configuration.newClient(); - await client.connect(); - }); - - afterEach(async function () { - await changeStream.close(); - await client.close(); - }); - - it('should support consecutive resumes', { - metadata: { requires: { topology: 'replicaset', mongodb: '>=4.2' } }, - async test() { - const failCommand: FailPoint = { - configureFailPoint: 'failCommand', - mode: { - times: 2 - }, - data: { - failCommands: ['getMore'], - closeConnection: true - } - }; - - await client.db('admin').command(failCommand); - - const collection = client.db('test_consecutive_resume').collection('collection'); - - changeStream = collection.watch([], { batchSize: 1 }); - - await initIteratorMode(changeStream); - - await collection.insertOne({ name: 'bumpy' }); - await collection.insertOne({ name: 'bumpy' }); - await collection.insertOne({ name: 'bumpy' }); - - await sleep(1000); - - for (let i = 0; i < 3; ++i) { - const change = await changeStream.next(); - expect(change).not.to.be.null; - } - } - }); - }); - it.skip('should continue piping changes after a resumable error', { metadata: { requires: { topology: 'replicaset' } }, test: done => { diff --git a/test/integration/change-streams/change_streams.spec.test.ts b/test/integration/change-streams/change_streams.spec.test.ts index 48a56b99df..451acc7a2d 100644 --- a/test/integration/change-streams/change_streams.spec.test.ts +++ b/test/integration/change-streams/change_streams.spec.test.ts @@ -4,9 +4,5 @@ import { loadSpecTests } from '../../spec'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; describe('Change Streams Spec - Unified', function () { - runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')), test => - test.description === 'Test consecutive resume' - ? 'TODO(NODE-4670): fix consecutive resume change stream test' - : false - ); + runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified'))); }); diff --git a/test/integration/unified-test-format/unified_test_format.spec.test.ts b/test/integration/unified-test-format/unified_test_format.spec.test.ts index 9d1699cf54..0af0a0fe68 100644 --- a/test/integration/unified-test-format/unified_test_format.spec.test.ts +++ b/test/integration/unified-test-format/unified_test_format.spec.test.ts @@ -23,10 +23,6 @@ const filter: TestFilter = ({ description }) => { return 'TODO(NODE-3308): failures due unnecessary getMore and killCursors calls in 5.0'; } - if (description === 'Test consecutive resume') { - return 'TODO(NODE-4670): fix consecutive resume change stream test'; - } - if ( process.env.AUTH === 'auth' && [ From f0b4589ed182b7ee429b330b350b2bbf7d034c09 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 20 Oct 2022 11:13:41 -0400 Subject: [PATCH 2/8] fix: loop in cs iterator helpers --- src/change_stream.ts | 55 ++++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 0772bf701e..2781484fa3 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -648,21 +648,21 @@ export class ChangeStream< hasNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - try { - const hasNext = await this.cursor.hasNext(); - return hasNext; - } catch (error) { + for (;;) { try { - await this._processErrorIteratorMode(error); const hasNext = await this.cursor.hasNext(); return hasNext; } catch (error) { try { - await this.close(); - } catch { - // We are not concerned with errors from close() + await this._processErrorIteratorMode(error); + } catch (error) { + try { + await this.close(); + } catch { + // We are not concerned with errors from close() + } + throw error; } - throw error; } } }, callback); @@ -675,23 +675,22 @@ export class ChangeStream< next(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - try { - const change = await this.cursor.next(); - const processedChange = this._processChange(change ?? null); - return processedChange; - } catch (error) { + for (;;) { try { - await this._processErrorIteratorMode(error); const change = await this.cursor.next(); const processedChange = this._processChange(change ?? null); return processedChange; } catch (error) { try { - await this.close(); - } catch { - // We are not concerned with errors from close() + await this._processErrorIteratorMode(error); + } catch (error) { + try { + await this.close(); + } catch { + // We are not concerned with errors from close() + } + throw error; } - throw error; } } }, callback); @@ -706,21 +705,21 @@ export class ChangeStream< tryNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - try { - const change = await this.cursor.tryNext(); - return change ?? null; - } catch (error) { + for (;;) { try { - await this._processErrorIteratorMode(error); const change = await this.cursor.tryNext(); return change ?? null; } catch (error) { try { - await this.close(); - } catch { - // We are not concerned with errors from close() + await this._processErrorIteratorMode(error); + } catch (error) { + try { + await this.close(); + } catch { + // We are not concerned with errors from close() + } + throw error; } - throw error; } } }, callback); From ed39bd7b587b436fe9358b513725d443cbe6ceac Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 20 Oct 2022 14:25:35 -0400 Subject: [PATCH 3/8] test: add consecutive resume tests --- .../change-streams/change_stream.test.ts | 149 +++++++++++++++++- 1 file changed, 148 insertions(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 3b31d49972..fe9528efc8 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -10,6 +10,7 @@ import { promisify } from 'util'; import { AbstractCursor, ChangeStream, + ChangeStreamDocument, ChangeStreamOptions, Collection, CommandStartedEvent, @@ -1610,7 +1611,7 @@ describe('Change Streams', function () { }); }); -describe('ChangeStream resumability', function () { +describe.only('ChangeStream resumability', function () { let client: MongoClient; let collection: Collection; let changeStream: ChangeStream; @@ -1717,7 +1718,44 @@ describe('ChangeStream resumability', function () { expect(aggregateEvents).to.have.lengthOf(2); } ); + + it( + `supports consecutive resumes on error code ${code} ${error}`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 5 }, + data: { + failCommands: ['getMore'], + errorCode: code, + errmsg: message + } + } as FailPoint); + + // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when + // resuming a change stream don't return the change event. So we defer the insert until a period of time + // after the change stream has started listening for a change. 2000ms is long enough for the change + // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding. + const [_, value] = await Promise.allSettled([ + sleep(2000).then(() => collection.insertOne({ name: 'bailey' })), + changeStream.next() + ]); + + const change = (value as PromiseFulfilledResult).value; + + expect(change).to.have.property('operationType', 'insert'); + + expect(aggregateEvents).to.have.lengthOf(6); + } + ); } + for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, @@ -1846,6 +1884,42 @@ describe('ChangeStream resumability', function () { expect(aggregateEvents).to.have.lengthOf(2); } ); + + it( + `supports consecutive resumes on error code ${code} ${error}`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 5 }, + data: { + failCommands: ['getMore'], + errorCode: code, + errmsg: message + } + } as FailPoint); + + // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when + // resuming a change stream don't return the change event. So we defer the insert until a period of time + // after the change stream has started listening for a change. 2000ms is long enough for the change + // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding. + const [_, value] = await Promise.allSettled([ + sleep(2000).then(() => collection.insertOne({ name: 'bailey' })), + changeStream.hasNext() + ]); + + const change = (value as PromiseFulfilledResult).value; + + expect(change).to.be.true; + + expect(aggregateEvents).to.have.lengthOf(6); + } + ); } for (const { error, code, message } of resumableErrorCodes) { @@ -1983,6 +2057,42 @@ describe('ChangeStream resumability', function () { expect(aggregateEvents).to.have.lengthOf(2); } ); + + it( + `supports consecutive resumes on error code ${code} ${error}`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 5 }, + data: { + failCommands: ['getMore'], + errorCode: code, + errmsg: message + } + } as FailPoint); + + try { + // tryNext is not blocking and on sharded clusters we don't have control of when + // the actual change event will be ready on the change stream pipeline. This introduces + // a race condition, where sometimes we receive the change event and sometimes + // we don't when we call tryNext, depending on the timing of the sharded cluster. + + // Since we really only care about the resumability, it's enough for this test to throw + // if tryNext ever throws and assert on the number of aggregate events. + await changeStream.tryNext(); + } catch (err) { + expect.fail(`expected tryNext to resume, received error instead: ${err}`); + } + + expect(aggregateEvents).to.have.lengthOf(6); + } + ); } for (const { error, code, message } of resumableErrorCodes) { @@ -2121,6 +2231,43 @@ describe('ChangeStream resumability', function () { expect(aggregateEvents).to.have.lengthOf(2); } ); + + it( + `supports consecutive resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 5 }, + data: { + failCommands: ['getMore'], + errorCode: code, + errmsg: message + } + } as FailPoint); + + const changes = once(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + + // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when + // resuming a change stream don't return the change event. So we defer the insert until a period of time + // after the change stream has started listening for a change. 2000ms is long enough for the change + // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding. + const [_, value] = await Promise.allSettled([ + sleep(2000).then(() => collection.insertOne({ name: 'bailey' })), + changes + ]); + + const [change] = (value as PromiseFulfilledResult).value; + expect(change).to.have.property('operationType', 'insert'); + + expect(aggregateEvents).to.have.lengthOf(6); + } + ); } it( From 27ca1fdbaca7df9fa889befd5b11c520380d5f2b Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 20 Oct 2022 14:38:43 -0400 Subject: [PATCH 4/8] remove .only --- test/integration/change-streams/change_stream.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index fe9528efc8..16b32b465e 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1611,7 +1611,7 @@ describe('Change Streams', function () { }); }); -describe.only('ChangeStream resumability', function () { +describe('ChangeStream resumability', function () { let client: MongoClient; let collection: Collection; let changeStream: ChangeStream; From 84c4c7862c7d5a7bc3825cdc54f866f0762e5025 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Thu, 20 Oct 2022 16:29:26 -0400 Subject: [PATCH 5/8] fix: lint --- test/integration/change-streams/change_stream.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 16b32b465e..a4f6999e44 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1742,7 +1742,7 @@ describe('ChangeStream resumability', function () { // resuming a change stream don't return the change event. So we defer the insert until a period of time // after the change stream has started listening for a change. 2000ms is long enough for the change // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding. - const [_, value] = await Promise.allSettled([ + const [, value] = await Promise.allSettled([ sleep(2000).then(() => collection.insertOne({ name: 'bailey' })), changeStream.next() ]); @@ -1908,7 +1908,7 @@ describe('ChangeStream resumability', function () { // resuming a change stream don't return the change event. So we defer the insert until a period of time // after the change stream has started listening for a change. 2000ms is long enough for the change // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding. - const [_, value] = await Promise.allSettled([ + const [, value] = await Promise.allSettled([ sleep(2000).then(() => collection.insertOne({ name: 'bailey' })), changeStream.hasNext() ]); @@ -2257,7 +2257,7 @@ describe('ChangeStream resumability', function () { // resuming a change stream don't return the change event. So we defer the insert until a period of time // after the change stream has started listening for a change. 2000ms is long enough for the change // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding. - const [_, value] = await Promise.allSettled([ + const [, value] = await Promise.allSettled([ sleep(2000).then(() => collection.insertOne({ name: 'bailey' })), changes ]); From 31ce275f990e226e215936381513174e128dda40 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 25 Oct 2022 13:25:50 -0400 Subject: [PATCH 6/8] use while(true) instead of for(;;) --- src/change_stream.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 2781484fa3..48e9a253ba 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -648,7 +648,7 @@ export class ChangeStream< hasNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - for (;;) { + while(true) { try { const hasNext = await this.cursor.hasNext(); return hasNext; @@ -675,7 +675,7 @@ export class ChangeStream< next(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - for (;;) { + while(true) { try { const change = await this.cursor.next(); const processedChange = this._processChange(change ?? null); @@ -705,7 +705,7 @@ export class ChangeStream< tryNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - for (;;) { + while(true) { try { const change = await this.cursor.tryNext(); return change ?? null; From fab50dca342e7e73ca69b0e5b1c50d2ce3ecb235 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Wed, 26 Oct 2022 11:16:41 -0400 Subject: [PATCH 7/8] Revert "use while(true) instead of for(;;)" This reverts commit 31ce275f990e226e215936381513174e128dda40. --- src/change_stream.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 48e9a253ba..2781484fa3 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -648,7 +648,7 @@ export class ChangeStream< hasNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - while(true) { + for (;;) { try { const hasNext = await this.cursor.hasNext(); return hasNext; @@ -675,7 +675,7 @@ export class ChangeStream< next(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - while(true) { + for (;;) { try { const change = await this.cursor.next(); const processedChange = this._processChange(change ?? null); @@ -705,7 +705,7 @@ export class ChangeStream< tryNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - while(true) { + for (;;) { try { const change = await this.cursor.tryNext(); return change ?? null; From c9af754caaf69037962a2d448d6121453f98dbb5 Mon Sep 17 00:00:00 2001 From: Bailey Pearson Date: Tue, 25 Oct 2022 13:25:50 -0400 Subject: [PATCH 8/8] use while(true) instead of for(;;) w/ lint disable comments --- src/change_stream.ts | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 2781484fa3..0304a527c3 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -648,7 +648,11 @@ export class ChangeStream< hasNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - for (;;) { + // Change streams must resume indefinitely while each resume event succeeds. + // This loop continues until either a change event is received or until a resume attempt + // fails. + // eslint-disable-next-line no-constant-condition + while (true) { try { const hasNext = await this.cursor.hasNext(); return hasNext; @@ -675,7 +679,11 @@ export class ChangeStream< next(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - for (;;) { + // Change streams must resume indefinitely while each resume event succeeds. + // This loop continues until either a change event is received or until a resume attempt + // fails. + // eslint-disable-next-line no-constant-condition + while (true) { try { const change = await this.cursor.next(); const processedChange = this._processChange(change ?? null); @@ -705,7 +713,11 @@ export class ChangeStream< tryNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - for (;;) { + // Change streams must resume indefinitely while each resume event succeeds. + // This loop continues until either a change event is received or until a resume attempt + // fails. + // eslint-disable-next-line no-constant-condition + while (true) { try { const change = await this.cursor.tryNext(); return change ?? null;