Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4783): handle orphaned operation descriptions #3463

Merged
merged 14 commits into from
Nov 16, 2022
32 changes: 30 additions & 2 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
MongoMissingDependencyError,
MongoNetworkError,
MongoNetworkTimeoutError,
MongoRuntimeError,
MongoServerError,
MongoWriteConcernError
} from '../error';
Expand Down Expand Up @@ -68,6 +69,8 @@ const kAutoEncrypter = Symbol('autoEncrypter');
/** @internal */
const kDelayedTimeoutId = Symbol('delayedTimeoutId');

const INVALID_QUEUE_SIZE = 'Connection internal queue contains more than 1 operation description';

/** @internal */
export interface CommandOptions extends BSONSerializeOptions {
command?: boolean;
Expand Down Expand Up @@ -369,7 +372,29 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

// always emit the message, in case we are streaming
this.emit('message', message);
const operationDescription = this[kQueue].get(message.responseTo);
let operationDescription = this[kQueue].get(message.responseTo);

// Protect against multiplexing.
if (this[kQueue].size > 1) {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE));
return;
}

if (!operationDescription && this.isMonitoringConnection) {
jyemin marked this conversation as resolved.
Show resolved Hide resolved
// This is how we recover when the initial hello's requestId is not
// the responseTo when hello responses have been skipped:

// Get the first orphaned operation description.
const entry = this[kQueue].entries().next();
dariakp marked this conversation as resolved.
Show resolved Hide resolved
if (entry) {
const [requestId, orphaned]: [number, OperationDescription] = entry.value;
// If the orphaned operation description exists then set it.
operationDescription = orphaned;
// Remove the entry with the bad request id from the queue.
this[kQueue].delete(requestId);
}
}

if (!operationDescription) {
return;
durran marked this conversation as resolved.
Show resolved Hide resolved
}
Expand All @@ -381,7 +406,10 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
// making the `responseTo` change on each response
this[kQueue].delete(message.responseTo);
if ('moreToCome' in message && message.moreToCome) {
// requeue the callback for next synthetic request
// If the operation description check above does find an orphaned
// description and sets the operationDescription then this line will put one
// back in the queue with the correct requestId and will resolve not being able
// to find the next one via the responseTo of the next streaming hello.
this[kQueue].set(message.requestId, operationDescription);
} else if (operationDescription.socketTimeoutOverride) {
this[kStream].setTimeout(this.socketTimeoutMS);
Expand Down
13 changes: 13 additions & 0 deletions test/tools/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { EJSON } from 'bson';
import * as BSON from 'bson';
import { expect } from 'chai';
import { Readable } from 'stream';
import { setTimeout } from 'timers';
import { inspect, promisify } from 'util';

Expand Down Expand Up @@ -354,6 +355,18 @@ export class TestBuilder {
}
}

export function bufferToStream(buffer) {
durran marked this conversation as resolved.
Show resolved Hide resolved
const stream = new Readable();
if (Array.isArray(buffer)) {
buffer.forEach(b => stream.push(b));
} else {
stream.push(buffer);
}

stream.push(null);
return stream;
}

export function generateOpMsgBuffer(document: Document): Buffer {
const header = Buffer.alloc(4 * 4 + 4);

Expand Down
229 changes: 207 additions & 22 deletions test/unit/cmap/connection.test.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import { expect } from 'chai';
import { EventEmitter } from 'events';
import { EventEmitter, once } from 'events';
import { Socket } from 'net';
import * as sinon from 'sinon';
import { Readable } from 'stream';
import { setTimeout } from 'timers';

import { BinMsg } from '../../../src/cmap/commands';
import { connect } from '../../../src/cmap/connect';
import { Connection, hasSessionSupport } from '../../../src/cmap/connection';
import { MessageStream } from '../../../src/cmap/message_stream';
import { MongoNetworkTimeoutError } from '../../../src/error';
import { MongoNetworkTimeoutError, MongoRuntimeError } from '../../../src/error';
import { isHello, ns } from '../../../src/utils';
import * as mock from '../../tools/mongodb-mock/index';
import { getSymbolFrom } from '../../tools/utils';
import { generateOpMsgBuffer, getSymbolFrom } from '../../tools/utils';
import { createTimerSandbox } from '../timer_sandbox';

const connectionOptionsDefaults = {
Expand All @@ -22,6 +24,25 @@ const connectionOptionsDefaults = {
loadBalanced: false
};

/** The absolute minimum socket API needed by Connection as of writing this test */
class FakeSocket extends EventEmitter {
address() {
// is never called
}
pipe() {
// does not need to do anything
}
destroy() {
// is called, has no side effects
}
get remoteAddress() {
return 'iLoveJavaScript';
}
get remotePort() {
return 123;
}
}

describe('new Connection()', function () {
let server;
after(() => mock.cleanup());
Expand Down Expand Up @@ -137,6 +158,189 @@ describe('new Connection()', function () {
});
});

describe('#onMessage', function () {
context('when the connection is a monitoring connection', function () {
let queue: Map<number, OperationDescription>;
let driverSocket: FakeSocket;
let connection: Connection;

beforeEach(function () {
driverSocket = sinon.spy(new FakeSocket());
});

context('when multiple hellos exist on the stream', function () {
durran marked this conversation as resolved.
Show resolved Hide resolved
let callbackSpy;
const inputStream = new Readable();
const document = { ok: 1 };
const last = { isWritablePrimary: true };

beforeEach(function () {
callbackSpy = sinon.spy();
const firstHello = generateOpMsgBuffer(document);
dariakp marked this conversation as resolved.
Show resolved Hide resolved
const secondHello = generateOpMsgBuffer(document);
const thirdHello = generateOpMsgBuffer(last);
const buffer = Buffer.concat([firstHello, secondHello, thirdHello]);

connection = sinon.spy(new Connection(inputStream, connectionOptionsDefaults));
connection.isMonitoringConnection = true;
const queueSymbol = getSymbolFrom(connection, 'queue');
queue = connection[queueSymbol];

// Create the operation description.
const operationDescription: OperationDescription = {
requestId: 1,
cb: callbackSpy
};

// Stick an operation description in the queue.
queue.set(1, operationDescription);

// Push the buffer of 3 hellos to the input stream
inputStream.push(buffer);
inputStream.push(null);
});

it('calls the callback with the last hello document', async function () {
const messages = await once(connection, 'message');
expect(messages[0].responseTo).to.equal(0);
expect(callbackSpy).to.be.calledOnceWith(undefined, last);
dariakp marked this conversation as resolved.
Show resolved Hide resolved
});
});

context('when requestId/responseTo do not match', function () {
let callbackSpy;
const document = { ok: 1 };

beforeEach(function () {
callbackSpy = sinon.spy();

// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
connection.isMonitoringConnection = true;
const queueSymbol = getSymbolFrom(connection, 'queue');
queue = connection[queueSymbol];

// Create the operation description.
const operationDescription: OperationDescription = {
requestId: 1,
cb: callbackSpy
};

// Stick an operation description in the queue.
queue.set(1, operationDescription);
// Emit a message that won't match the existing operation description.
const msg = generateOpMsgBuffer(document);
const msgHeader: MessageHeader = {
length: msg.readInt32LE(0),
requestId: 1,
responseTo: 0, // This will not match.
opCode: msg.readInt32LE(12)
};
const msgBody = msg.subarray(16);

const message = new BinMsg(msg, msgHeader, msgBody);
connection.onMessage(message);
});

it('calls the operation description callback with the document', function () {
expect(callbackSpy).to.be.calledOnceWith(undefined, document);
});
});

context('when requestId/reponseTo match', function () {
let callbackSpy;
const document = { ok: 1 };

beforeEach(function () {
callbackSpy = sinon.spy();

// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
connection.isMonitoringConnection = true;
const queueSymbol = getSymbolFrom(connection, 'queue');
queue = connection[queueSymbol];

// Create the operation description.
const operationDescription: OperationDescription = {
requestId: 1,
cb: callbackSpy
};

// Stick an operation description in the queue.
queue.set(1, operationDescription);
// Emit a message that matches the existing operation description.
const msg = generateOpMsgBuffer(document);
const msgHeader: MessageHeader = {
length: msg.readInt32LE(0),
requestId: 2,
responseTo: 1,
durran marked this conversation as resolved.
Show resolved Hide resolved
opCode: msg.readInt32LE(12)
};
const msgBody = msg.subarray(16);

const message = new BinMsg(msg, msgHeader, msgBody);
connection.onMessage(message);
});

it('calls the operation description callback with the document', function () {
expect(callbackSpy).to.be.calledOnceWith(undefined, document);
});
});

context('when more than one operation description is in the queue', function () {
let spyOne;
let spyTwo;
const document = { ok: 1 };

beforeEach(function () {
spyOne = sinon.spy();
spyTwo = sinon.spy();

// @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay
connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults));
connection.isMonitoringConnection = true;
const queueSymbol = getSymbolFrom(connection, 'queue');
queue = connection[queueSymbol];

// Create the operation descriptions.
const descriptionOne: OperationDescription = {
requestId: 1,
cb: spyOne
};
const descriptionTwo: OperationDescription = {
requestId: 2,
cb: spyTwo
};

// Stick an operation description in the queue.
queue.set(2, descriptionOne);
queue.set(3, descriptionTwo);
// Emit a message that matches the existing operation description.
const msg = generateOpMsgBuffer(document);
const msgHeader: MessageHeader = {
length: msg.readInt32LE(0),
requestId: 2,
responseTo: 1,
opCode: msg.readInt32LE(12)
};
const msgBody = msg.subarray(16);

const message = new BinMsg(msg, msgHeader, msgBody);
connection.onMessage(message);
});

it('calls all operation description callbacks with an error', function () {
expect(spyOne).to.be.calledOnce;
expect(spyTwo).to.be.calledOnce;
const errorOne = spyOne.firstCall.args[0];
const errorTwo = spyTwo.firstCall.args[0];
expect(errorOne).to.be.instanceof(MongoRuntimeError);
expect(errorTwo).to.be.instanceof(MongoRuntimeError);
});
});
});
});

describe('onTimeout()', () => {
let connection: sinon.SinonSpiedInstance<Connection>;
let clock: sinon.SinonFakeTimers;
Expand All @@ -146,25 +350,6 @@ describe('new Connection()', function () {
let kDelayedTimeoutId: symbol;
let NodeJSTimeoutClass: any;

/** The absolute minimum socket API needed by Connection as of writing this test */
class FakeSocket extends EventEmitter {
address() {
// is never called
}
pipe() {
// does not need to do anything
}
destroy() {
// is called, has no side effects
}
get remoteAddress() {
return 'iLoveJavaScript';
}
get remotePort() {
return 123;
}
}

beforeEach(() => {
timerSandbox = createTimerSandbox();
clock = sinon.useFakeTimers();
Expand Down
14 changes: 1 addition & 13 deletions test/unit/cmap/message_stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,7 @@ const { MessageStream } = require('../../../src/cmap/message_stream');
const { Msg } = require('../../../src/cmap/commands');
const expect = require('chai').expect;
const { LEGACY_HELLO_COMMAND } = require('../../../src/constants');
const { generateOpMsgBuffer } = require('../../tools/utils');

function bufferToStream(buffer) {
const stream = new Readable();
if (Array.isArray(buffer)) {
buffer.forEach(b => stream.push(b));
} else {
stream.push(buffer);
}

stream.push(null);
return stream;
}
const { bufferToStream, generateOpMsgBuffer } = require('../../tools/utils');

describe('MessageStream', function () {
context('when the stream is for a monitoring connection', function () {
Expand Down