From 95b98a60da318b1c9fa4bbb51631dd75c6a71b8c Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Thu, 10 Nov 2022 20:43:53 +0100 Subject: [PATCH 01/14] fix(NODE-4783): handle orphaned operation descriptions --- src/cmap/connection.ts | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 9a617874b3..756dbff7d2 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -369,7 +369,23 @@ export class Connection extends TypedEventEmitter { // always emit the message, in case we are streaming this.emit('message', message); - const operationDescription = this[kQueue].get(message.responseTo); + let operationDescription = this[kQueue].get(message.responseTo); + + if (!operationDescription && this.isMonitoringConnection) { + // NODE-4783: How do we recover from this when the initial hello's requestId is not + // the responseTo when hello responses have been skipped? + // + // Get the first orphaned operation description. + const entry = this[kQueue].entries().next(); + if (entry) { + const [requestId, orphaned]: [number, OperationDescription] = entry.value; + // If the orphaned operation description exists then set it. + operationDescription = orphaned; + // Remove the entry with the bad request id from the queue. + this[kQueue].delete(requestId); + } + } + if (!operationDescription) { return; } @@ -381,7 +397,10 @@ export class Connection extends TypedEventEmitter { // making the `responseTo` change on each response this[kQueue].delete(message.responseTo); if ('moreToCome' in message && message.moreToCome) { - // requeue the callback for next synthetic request + // NODE-4783: If the operation description check above does find an orphaned + // description and sets the operationDescription then this line will put one + // back in the queue with the correct requestId and will resolve not being able + // to find the next one via the responseTo of the next streaming hello. this[kQueue].set(message.requestId, operationDescription); } else if (operationDescription.socketTimeoutOverride) { this[kStream].setTimeout(this.socketTimeoutMS); From fd9c7d38a0df82f4d9d8c02b133bb6e942683bb8 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 15 Nov 2022 16:22:10 +0100 Subject: [PATCH 02/14] test(NODE-4783): add onMessage unit tests --- test/unit/cmap/connection.test.ts | 124 +++++++++++++++++++++++++----- 1 file changed, 104 insertions(+), 20 deletions(-) diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index b1b25db28d..e5997a265b 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -4,13 +4,14 @@ import { Socket } from 'net'; import * as sinon from 'sinon'; import { setTimeout } from 'timers'; +import { BinMsg } from '../../../src/cmap/commands'; import { connect } from '../../../src/cmap/connect'; import { Connection, hasSessionSupport } from '../../../src/cmap/connection'; import { MessageStream } from '../../../src/cmap/message_stream'; import { MongoNetworkTimeoutError } from '../../../src/error'; import { isHello, ns } from '../../../src/utils'; import * as mock from '../../tools/mongodb-mock/index'; -import { getSymbolFrom } from '../../tools/utils'; +import { generateOpMsgBuffer, getSymbolFrom } from '../../tools/utils'; import { createTimerSandbox } from '../timer_sandbox'; const connectionOptionsDefaults = { @@ -22,6 +23,25 @@ const connectionOptionsDefaults = { loadBalanced: false }; +/** The absolute minimum socket API needed by Connection as of writing this test */ +class FakeSocket extends EventEmitter { + address() { + // is never called + } + pipe() { + // does not need to do anything + } + destroy() { + // is called, has no side effects + } + get remoteAddress() { + return 'iLoveJavaScript'; + } + get remotePort() { + return 123; + } +} + describe('new Connection()', function () { let server; after(() => mock.cleanup()); @@ -137,6 +157,89 @@ describe('new Connection()', function () { }); }); + describe('#onMessage', function () { + context('when the connection is a monitoring connection', function () { + let queue: Map; + let driverSocket: FakeSocket; + let connection: Connection; + + beforeEach(function () { + driverSocket = sinon.spy(new FakeSocket()); + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + connection.isMonitoringConnection = true; + const queueSymbol = getSymbolFrom(connection, 'queue'); + queue = connection[queueSymbol]; + }); + + context('when requestId/responseTo do not match', function () { + let callbackSpy; + const document = { ok: 1 }; + + beforeEach(function () { + callbackSpy = sinon.spy(); + // Create the operation description. + const operationDescription: OperationDescription = { + requestId: 1, + cb: callbackSpy + }; + + // Stick an operation description in the queue. + queue.set(1, operationDescription); + // Emit a message that won't match the existing operation description. + const msg = generateOpMsgBuffer(document); + const msgHeader: MessageHeader = { + length: msg.readInt32LE(0), + requestId: msg.readInt32LE(4), + responseTo: msg.readInt32LE(8), + opCode: msg.readInt32LE(12) + }; + const msgBody = msg.subarray(16); + + const message = new BinMsg(msg, msgHeader, msgBody); + connection.onMessage(message); + }); + + it('calls the operation description callback with the document', function () { + expect(callbackSpy).to.be.calledWith(undefined, document); + }); + }); + + context('when requestId/reponseTo match', function () { + let callbackSpy; + const document = { ok: 1 }; + + beforeEach(function () { + callbackSpy = sinon.spy(); + // Create the operation description. + const operationDescription: OperationDescription = { + requestId: 1, + cb: callbackSpy + }; + + // Stick an operation description in the queue. + queue.set(1, operationDescription); + // Emit a message that matches the existing operation description. + const msg = generateOpMsgBuffer(document); + const msgHeader: MessageHeader = { + length: msg.readInt32LE(0), + requestId: 2, + responseTo: 1, + opCode: msg.readInt32LE(12) + }; + const msgBody = msg.subarray(16); + + const message = new BinMsg(msg, msgHeader, msgBody); + connection.onMessage(message); + }); + + it('calls the operation description callback with the document', function () { + expect(callbackSpy).to.be.calledWith(undefined, document); + }); + }); + }); + }); + describe('onTimeout()', () => { let connection: sinon.SinonSpiedInstance; let clock: sinon.SinonFakeTimers; @@ -146,25 +249,6 @@ describe('new Connection()', function () { let kDelayedTimeoutId: symbol; let NodeJSTimeoutClass: any; - /** The absolute minimum socket API needed by Connection as of writing this test */ - class FakeSocket extends EventEmitter { - address() { - // is never called - } - pipe() { - // does not need to do anything - } - destroy() { - // is called, has no side effects - } - get remoteAddress() { - return 'iLoveJavaScript'; - } - get remotePort() { - return 123; - } - } - beforeEach(() => { timerSandbox = createTimerSandbox(); clock = sinon.useFakeTimers(); From a63e9ec13f3b877afbfa918eeb4a625741c970b6 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 15 Nov 2022 20:42:34 +0100 Subject: [PATCH 03/14] Update test/unit/cmap/connection.test.ts Co-authored-by: Bailey Pearson --- test/unit/cmap/connection.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index e5997a265b..b8ccf2dcf3 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -201,7 +201,7 @@ describe('new Connection()', function () { }); it('calls the operation description callback with the document', function () { - expect(callbackSpy).to.be.calledWith(undefined, document); + expect(callbackSpy).to.be.calledExactlyOnceWith(undefined, document); }); }); From f96d22db6b5cb3b21dfb279104256db15369d491 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 15 Nov 2022 20:42:44 +0100 Subject: [PATCH 04/14] Update test/unit/cmap/connection.test.ts Co-authored-by: Bailey Pearson --- test/unit/cmap/connection.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index b8ccf2dcf3..7870c58660 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -234,7 +234,7 @@ describe('new Connection()', function () { }); it('calls the operation description callback with the document', function () { - expect(callbackSpy).to.be.calledWith(undefined, document); + expect(callbackSpy).to.be.calledExactlyOnceWith(undefined, document); }); }); }); From c4f7476319dae1657b7f316425c43665c70cb84b Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 15 Nov 2022 21:28:42 +0100 Subject: [PATCH 05/14] test(NODE-4783): add half integration test --- test/tools/utils.ts | 12 +++++ test/unit/cmap/connection.test.ts | 64 +++++++++++++++++++++++---- test/unit/cmap/message_stream.test.js | 14 +----- 3 files changed, 69 insertions(+), 21 deletions(-) diff --git a/test/tools/utils.ts b/test/tools/utils.ts index 0ecaaea85a..383115fa45 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -354,6 +354,18 @@ export class TestBuilder { } } +export function bufferToStream(buffer) { + const stream = new Readable(); + if (Array.isArray(buffer)) { + buffer.forEach(b => stream.push(b)); + } else { + stream.push(buffer); + } + + stream.push(null); + return stream; +} + export function generateOpMsgBuffer(document: Document): Buffer { const header = Buffer.alloc(4 * 4 + 4); diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index 7870c58660..357ad31d56 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -1,7 +1,8 @@ import { expect } from 'chai'; -import { EventEmitter } from 'events'; +import { EventEmitter, on } from 'events'; import { Socket } from 'net'; import * as sinon from 'sinon'; +import { Readable } from 'stream'; import { setTimeout } from 'timers'; import { BinMsg } from '../../../src/cmap/commands'; @@ -165,11 +166,44 @@ describe('new Connection()', function () { beforeEach(function () { driverSocket = sinon.spy(new FakeSocket()); - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); - connection.isMonitoringConnection = true; - const queueSymbol = getSymbolFrom(connection, 'queue'); - queue = connection[queueSymbol]; + }); + + context('when multiple hellos exist on the stream', function () { + let callbackSpy; + const inputStream = new Readable(); + const document = { ok: 1 }; + + beforeEach(function () { + callbackSpy = sinon.spy(); + const firstHello = generateOpMsgBuffer(document); + const secondHello = generateOpMsgBuffer(document); + const thirdHello = generateOpMsgBuffer(document); + const buffer = Buffer.concat([firstHello, secondHello, thirdHello]); + + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(inputStream, connectionOptionsDefaults)); + connection.isMonitoringConnection = true; + const queueSymbol = getSymbolFrom(connection, 'queue'); + queue = connection[queueSymbol]; + + // Create the operation description. + const operationDescription: OperationDescription = { + requestId: 1, + cb: callbackSpy + }; + + // Stick an operation description in the queue. + queue.set(1, operationDescription); + + // Push the buffer of 3 hellos to the input stream + inputStream.push(buffer); + inputStream.push(null); + }); + + it('calls the operation description callback with the document', async function () { + await on(inputStream, 'message'); + expect(callbackSpy).to.be.calledOnceWith(undefined, document); + }); }); context('when requestId/responseTo do not match', function () { @@ -178,6 +212,13 @@ describe('new Connection()', function () { beforeEach(function () { callbackSpy = sinon.spy(); + + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + connection.isMonitoringConnection = true; + const queueSymbol = getSymbolFrom(connection, 'queue'); + queue = connection[queueSymbol]; + // Create the operation description. const operationDescription: OperationDescription = { requestId: 1, @@ -201,7 +242,7 @@ describe('new Connection()', function () { }); it('calls the operation description callback with the document', function () { - expect(callbackSpy).to.be.calledExactlyOnceWith(undefined, document); + expect(callbackSpy).to.be.calledOnceWith(undefined, document); }); }); @@ -211,6 +252,13 @@ describe('new Connection()', function () { beforeEach(function () { callbackSpy = sinon.spy(); + + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + connection.isMonitoringConnection = true; + const queueSymbol = getSymbolFrom(connection, 'queue'); + queue = connection[queueSymbol]; + // Create the operation description. const operationDescription: OperationDescription = { requestId: 1, @@ -234,7 +282,7 @@ describe('new Connection()', function () { }); it('calls the operation description callback with the document', function () { - expect(callbackSpy).to.be.calledExactlyOnceWith(undefined, document); + expect(callbackSpy).to.be.calledOnceWith(undefined, document); }); }); }); diff --git a/test/unit/cmap/message_stream.test.js b/test/unit/cmap/message_stream.test.js index 3158a9144a..69ffc08496 100644 --- a/test/unit/cmap/message_stream.test.js +++ b/test/unit/cmap/message_stream.test.js @@ -6,19 +6,7 @@ const { MessageStream } = require('../../../src/cmap/message_stream'); const { Msg } = require('../../../src/cmap/commands'); const expect = require('chai').expect; const { LEGACY_HELLO_COMMAND } = require('../../../src/constants'); -const { generateOpMsgBuffer } = require('../../tools/utils'); - -function bufferToStream(buffer) { - const stream = new Readable(); - if (Array.isArray(buffer)) { - buffer.forEach(b => stream.push(b)); - } else { - stream.push(buffer); - } - - stream.push(null); - return stream; -} +const { bufferToStream, generateOpMsgBuffer } = require('../../tools/utils'); describe('MessageStream', function () { context('when the stream is for a monitoring connection', function () { From a10e077d9aa8f565278b6c3cd4fdbd712bc0b506 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 15 Nov 2022 21:36:52 +0100 Subject: [PATCH 06/14] chore: remove ts ignore --- test/unit/cmap/connection.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index 357ad31d56..442a23c6fc 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -180,7 +180,6 @@ describe('new Connection()', function () { const thirdHello = generateOpMsgBuffer(document); const buffer = Buffer.concat([firstHello, secondHello, thirdHello]); - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay connection = sinon.spy(new Connection(inputStream, connectionOptionsDefaults)); connection.isMonitoringConnection = true; const queueSymbol = getSymbolFrom(connection, 'queue'); From 4881e4de32f15156880327d436fd1ef5b7381a3e Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 15 Nov 2022 21:59:28 +0100 Subject: [PATCH 07/14] fix: import readable --- test/tools/utils.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/test/tools/utils.ts b/test/tools/utils.ts index 383115fa45..35b17b5e1c 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -1,6 +1,7 @@ import { EJSON } from 'bson'; import * as BSON from 'bson'; import { expect } from 'chai'; +import { Readable } from 'stream'; import { setTimeout } from 'timers'; import { inspect, promisify } from 'util'; From cf2650e550e6fdec9e3591586472f6403ec72777 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 15 Nov 2022 23:27:33 +0100 Subject: [PATCH 08/14] feat: error on invalid queue size --- src/cmap/connection.ts | 26 ++++++++---- test/unit/cmap/connection.test.ts | 70 +++++++++++++++++++++++++++---- 2 files changed, 79 insertions(+), 17 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 756dbff7d2..3b2383f85c 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -17,6 +17,7 @@ import { MongoMissingDependencyError, MongoNetworkError, MongoNetworkTimeoutError, + MongoRuntimeError, MongoServerError, MongoWriteConcernError } from '../error'; @@ -68,6 +69,8 @@ const kAutoEncrypter = Symbol('autoEncrypter'); /** @internal */ const kDelayedTimeoutId = Symbol('delayedTimeoutId'); +const INVALID_QUEUE_SIZE = 'Connection internal queue contains more than 1 operation description'; + /** @internal */ export interface CommandOptions extends BSONSerializeOptions { command?: boolean; @@ -374,15 +377,20 @@ export class Connection extends TypedEventEmitter { if (!operationDescription && this.isMonitoringConnection) { // NODE-4783: How do we recover from this when the initial hello's requestId is not // the responseTo when hello responses have been skipped? - // - // Get the first orphaned operation description. - const entry = this[kQueue].entries().next(); - if (entry) { - const [requestId, orphaned]: [number, OperationDescription] = entry.value; - // If the orphaned operation description exists then set it. - operationDescription = orphaned; - // Remove the entry with the bad request id from the queue. - this[kQueue].delete(requestId); + + // First check if the map is of invalid size + if (this[kQueue].size > 1) { + this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE)); + } else { + // Get the first orphaned operation description. + const entry = this[kQueue].entries().next(); + if (entry) { + const [requestId, orphaned]: [number, OperationDescription] = entry.value; + // If the orphaned operation description exists then set it. + operationDescription = orphaned; + // Remove the entry with the bad request id from the queue. + this[kQueue].delete(requestId); + } } } diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index 442a23c6fc..a9b95bcea2 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { EventEmitter, on } from 'events'; +import { EventEmitter, once } from 'events'; import { Socket } from 'net'; import * as sinon from 'sinon'; import { Readable } from 'stream'; @@ -9,7 +9,7 @@ import { BinMsg } from '../../../src/cmap/commands'; import { connect } from '../../../src/cmap/connect'; import { Connection, hasSessionSupport } from '../../../src/cmap/connection'; import { MessageStream } from '../../../src/cmap/message_stream'; -import { MongoNetworkTimeoutError } from '../../../src/error'; +import { MongoNetworkTimeoutError, MongoRuntimeError } from '../../../src/error'; import { isHello, ns } from '../../../src/utils'; import * as mock from '../../tools/mongodb-mock/index'; import { generateOpMsgBuffer, getSymbolFrom } from '../../tools/utils'; @@ -172,12 +172,13 @@ describe('new Connection()', function () { let callbackSpy; const inputStream = new Readable(); const document = { ok: 1 }; + const last = { isWritablePrimary: true }; beforeEach(function () { callbackSpy = sinon.spy(); const firstHello = generateOpMsgBuffer(document); const secondHello = generateOpMsgBuffer(document); - const thirdHello = generateOpMsgBuffer(document); + const thirdHello = generateOpMsgBuffer(last); const buffer = Buffer.concat([firstHello, secondHello, thirdHello]); connection = sinon.spy(new Connection(inputStream, connectionOptionsDefaults)); @@ -199,9 +200,10 @@ describe('new Connection()', function () { inputStream.push(null); }); - it('calls the operation description callback with the document', async function () { - await on(inputStream, 'message'); - expect(callbackSpy).to.be.calledOnceWith(undefined, document); + it('calls the callback with the last hello document', async function () { + const messages = await once(connection, 'message'); + expect(messages[0].responseTo).to.equal(0); + expect(callbackSpy).to.be.calledOnceWith(undefined, last); }); }); @@ -230,8 +232,8 @@ describe('new Connection()', function () { const msg = generateOpMsgBuffer(document); const msgHeader: MessageHeader = { length: msg.readInt32LE(0), - requestId: msg.readInt32LE(4), - responseTo: msg.readInt32LE(8), + requestId: 1, + responseTo: 0, // This will not match. opCode: msg.readInt32LE(12) }; const msgBody = msg.subarray(16); @@ -284,6 +286,58 @@ describe('new Connection()', function () { expect(callbackSpy).to.be.calledOnceWith(undefined, document); }); }); + + context('when more than one operation description is in the queue', function () { + let spyOne; + let spyTwo; + const document = { ok: 1 }; + + beforeEach(function () { + spyOne = sinon.spy(); + spyTwo = sinon.spy(); + + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + connection.isMonitoringConnection = true; + const queueSymbol = getSymbolFrom(connection, 'queue'); + queue = connection[queueSymbol]; + + // Create the operation descriptions. + const descriptionOne: OperationDescription = { + requestId: 1, + cb: spyOne + }; + const descriptionTwo: OperationDescription = { + requestId: 2, + cb: spyTwo + }; + + // Stick an operation description in the queue. + queue.set(2, descriptionOne); + queue.set(3, descriptionTwo); + // Emit a message that matches the existing operation description. + const msg = generateOpMsgBuffer(document); + const msgHeader: MessageHeader = { + length: msg.readInt32LE(0), + requestId: 2, + responseTo: 1, + opCode: msg.readInt32LE(12) + }; + const msgBody = msg.subarray(16); + + const message = new BinMsg(msg, msgHeader, msgBody); + connection.onMessage(message); + }); + + it('calls all operation description callbacks with an error', function () { + expect(spyOne).to.be.calledOnce; + expect(spyTwo).to.be.calledOnce; + const errorOne = spyOne.firstCall.args[0]; + const errorTwo = spyTwo.firstCall.args[0]; + expect(errorOne).to.be.instanceof(MongoRuntimeError); + expect(errorTwo).to.be.instanceof(MongoRuntimeError); + }); + }); }); }); From f96e664b05eabc3fdb985366e97dbf35888c7340 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 15 Nov 2022 23:44:30 +0100 Subject: [PATCH 09/14] chore: update comment --- src/cmap/connection.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 3b2383f85c..c7cf5fdae7 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -375,8 +375,8 @@ export class Connection extends TypedEventEmitter { let operationDescription = this[kQueue].get(message.responseTo); if (!operationDescription && this.isMonitoringConnection) { - // NODE-4783: How do we recover from this when the initial hello's requestId is not - // the responseTo when hello responses have been skipped? + // This is how we recover when the initial hello's requestId is not + // the responseTo when hello responses have been skipped: // First check if the map is of invalid size if (this[kQueue].size > 1) { From 64205c3a1b592269b2e1c0e4f4d993da09fdb0e6 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Tue, 15 Nov 2022 23:45:36 +0100 Subject: [PATCH 10/14] chore: update comment --- src/cmap/connection.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index c7cf5fdae7..7aa913fbe4 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -405,7 +405,7 @@ export class Connection extends TypedEventEmitter { // making the `responseTo` change on each response this[kQueue].delete(message.responseTo); if ('moreToCome' in message && message.moreToCome) { - // NODE-4783: If the operation description check above does find an orphaned + // If the operation description check above does find an orphaned // description and sets the operationDescription then this line will put one // back in the queue with the correct requestId and will resolve not being able // to find the next one via the responseTo of the next streaming hello. From c670bda3ef518a31dc56455898092df4128cf337 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Wed, 16 Nov 2022 00:03:57 +0100 Subject: [PATCH 11/14] chore: move logic out of monitoring check --- src/cmap/connection.ts | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 7aa913fbe4..2ae0d5553b 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -374,23 +374,24 @@ export class Connection extends TypedEventEmitter { this.emit('message', message); let operationDescription = this[kQueue].get(message.responseTo); + // Protect against multiplexing. + if (this[kQueue].size > 1) { + this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE)); + return; + } + if (!operationDescription && this.isMonitoringConnection) { // This is how we recover when the initial hello's requestId is not // the responseTo when hello responses have been skipped: - // First check if the map is of invalid size - if (this[kQueue].size > 1) { - this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE)); - } else { - // Get the first orphaned operation description. - const entry = this[kQueue].entries().next(); - if (entry) { - const [requestId, orphaned]: [number, OperationDescription] = entry.value; - // If the orphaned operation description exists then set it. - operationDescription = orphaned; - // Remove the entry with the bad request id from the queue. - this[kQueue].delete(requestId); - } + // Get the first orphaned operation description. + const entry = this[kQueue].entries().next(); + if (entry) { + const [requestId, orphaned]: [number, OperationDescription] = entry.value; + // If the orphaned operation description exists then set it. + operationDescription = orphaned; + // Remove the entry with the bad request id from the queue. + this[kQueue].delete(requestId); } } From 7f977d03b780a4f6f87169941aa2fcf63b909a0c Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Wed, 16 Nov 2022 00:13:50 +0100 Subject: [PATCH 12/14] test: add non monitoring test --- test/unit/cmap/connection.test.ts | 61 +++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index a9b95bcea2..79b3ee7e30 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -159,6 +159,67 @@ describe('new Connection()', function () { }); describe('#onMessage', function () { + context('when the connection is not a monitoring connection', function () { + let queue: Map; + let driverSocket: FakeSocket; + let connection: Connection; + + beforeEach(function () { + driverSocket = sinon.spy(new FakeSocket()); + }); + + context('when more than one operation description is in the queue', function () { + let spyOne; + let spyTwo; + const document = { ok: 1 }; + + beforeEach(function () { + spyOne = sinon.spy(); + spyTwo = sinon.spy(); + + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = new Connection(driverSocket, connectionOptionsDefaults); + const queueSymbol = getSymbolFrom(connection, 'queue'); + queue = connection[queueSymbol]; + + // Create the operation descriptions. + const descriptionOne: OperationDescription = { + requestId: 1, + cb: spyOne + }; + const descriptionTwo: OperationDescription = { + requestId: 2, + cb: spyTwo + }; + + // Stick an operation description in the queue. + queue.set(2, descriptionOne); + queue.set(3, descriptionTwo); + // Emit a message that matches the existing operation description. + const msg = generateOpMsgBuffer(document); + const msgHeader: MessageHeader = { + length: msg.readInt32LE(0), + requestId: 2, + responseTo: 1, + opCode: msg.readInt32LE(12) + }; + const msgBody = msg.subarray(16); + + const message = new BinMsg(msg, msgHeader, msgBody); + connection.onMessage(message); + }); + + it('calls all operation description callbacks with an error', function () { + expect(spyOne).to.be.calledOnce; + expect(spyTwo).to.be.calledOnce; + const errorOne = spyOne.firstCall.args[0]; + const errorTwo = spyTwo.firstCall.args[0]; + expect(errorOne).to.be.instanceof(MongoRuntimeError); + expect(errorTwo).to.be.instanceof(MongoRuntimeError); + }); + }); + }); + context('when the connection is a monitoring connection', function () { let queue: Map; let driverSocket: FakeSocket; From 5887b1f3695311bb3dca25ef9003de90cb80632d Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Wed, 16 Nov 2022 00:29:32 +0100 Subject: [PATCH 13/14] Revert "test: add non monitoring test" This reverts commit 7f977d03b780a4f6f87169941aa2fcf63b909a0c. --- test/unit/cmap/connection.test.ts | 61 ------------------------------- 1 file changed, 61 deletions(-) diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index 79b3ee7e30..a9b95bcea2 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -159,67 +159,6 @@ describe('new Connection()', function () { }); describe('#onMessage', function () { - context('when the connection is not a monitoring connection', function () { - let queue: Map; - let driverSocket: FakeSocket; - let connection: Connection; - - beforeEach(function () { - driverSocket = sinon.spy(new FakeSocket()); - }); - - context('when more than one operation description is in the queue', function () { - let spyOne; - let spyTwo; - const document = { ok: 1 }; - - beforeEach(function () { - spyOne = sinon.spy(); - spyTwo = sinon.spy(); - - // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay - connection = new Connection(driverSocket, connectionOptionsDefaults); - const queueSymbol = getSymbolFrom(connection, 'queue'); - queue = connection[queueSymbol]; - - // Create the operation descriptions. - const descriptionOne: OperationDescription = { - requestId: 1, - cb: spyOne - }; - const descriptionTwo: OperationDescription = { - requestId: 2, - cb: spyTwo - }; - - // Stick an operation description in the queue. - queue.set(2, descriptionOne); - queue.set(3, descriptionTwo); - // Emit a message that matches the existing operation description. - const msg = generateOpMsgBuffer(document); - const msgHeader: MessageHeader = { - length: msg.readInt32LE(0), - requestId: 2, - responseTo: 1, - opCode: msg.readInt32LE(12) - }; - const msgBody = msg.subarray(16); - - const message = new BinMsg(msg, msgHeader, msgBody); - connection.onMessage(message); - }); - - it('calls all operation description callbacks with an error', function () { - expect(spyOne).to.be.calledOnce; - expect(spyTwo).to.be.calledOnce; - const errorOne = spyOne.firstCall.args[0]; - const errorTwo = spyTwo.firstCall.args[0]; - expect(errorOne).to.be.instanceof(MongoRuntimeError); - expect(errorTwo).to.be.instanceof(MongoRuntimeError); - }); - }); - }); - context('when the connection is a monitoring connection', function () { let queue: Map; let driverSocket: FakeSocket; From a0306ce385e5924c94156f654fd3c0e76019ffd4 Mon Sep 17 00:00:00 2001 From: Durran Jordan Date: Wed, 16 Nov 2022 00:30:01 +0100 Subject: [PATCH 14/14] Revert "chore: move logic out of monitoring check" This reverts commit c670bda3ef518a31dc56455898092df4128cf337. --- src/cmap/connection.ts | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 2ae0d5553b..7aa913fbe4 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -374,24 +374,23 @@ export class Connection extends TypedEventEmitter { this.emit('message', message); let operationDescription = this[kQueue].get(message.responseTo); - // Protect against multiplexing. - if (this[kQueue].size > 1) { - this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE)); - return; - } - if (!operationDescription && this.isMonitoringConnection) { // This is how we recover when the initial hello's requestId is not // the responseTo when hello responses have been skipped: - // Get the first orphaned operation description. - const entry = this[kQueue].entries().next(); - if (entry) { - const [requestId, orphaned]: [number, OperationDescription] = entry.value; - // If the orphaned operation description exists then set it. - operationDescription = orphaned; - // Remove the entry with the bad request id from the queue. - this[kQueue].delete(requestId); + // First check if the map is of invalid size + if (this[kQueue].size > 1) { + this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE)); + } else { + // Get the first orphaned operation description. + const entry = this[kQueue].entries().next(); + if (entry) { + const [requestId, orphaned]: [number, OperationDescription] = entry.value; + // If the orphaned operation description exists then set it. + operationDescription = orphaned; + // Remove the entry with the bad request id from the queue. + this[kQueue].delete(requestId); + } } }