From ddb8d90850b58b238a668ce7c71f22aba3c80dc9 Mon Sep 17 00:00:00 2001 From: Dan Aprahamian Date: Thu, 7 Mar 2019 12:31:58 -0500 Subject: [PATCH] fix(aggregate): do not send batchSize for aggregation with $out Sending a batchSize of 0 will break the cursor, and in general we should not be sending batchSize for aggregation pipelines that contain a $out stage. Fixes NODE-1850 --- lib/operations/aggregate.js | 10 ++--- test/functional/aggregation_tests.js | 67 ++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/lib/operations/aggregate.js b/lib/operations/aggregate.js index 28405bd788..a4252a3a8b 100644 --- a/lib/operations/aggregate.js +++ b/lib/operations/aggregate.js @@ -25,13 +25,13 @@ function aggregate(db, coll, pipeline, options, callback) { const isDbAggregate = typeof coll === 'string'; const target = isDbAggregate ? db : coll; const topology = target.s.topology; - let ignoreReadConcern = false; + let hasOutStage = false; if (typeof options.out === 'string') { pipeline = pipeline.concat({ $out: options.out }); - ignoreReadConcern = true; + hasOutStage = true; } else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) { - ignoreReadConcern = true; + hasOutStage = true; } let command; @@ -52,7 +52,7 @@ function aggregate(db, coll, pipeline, options, callback) { const takesWriteConcern = topology.capabilities().commandsTakeWriteConcern; - if (!ignoreReadConcern) { + if (!hasOutStage) { decorateWithReadConcern(command, target, options); } @@ -96,7 +96,7 @@ function aggregate(db, coll, pipeline, options, callback) { } options.cursor = options.cursor || {}; - if (options.batchSize) options.cursor.batchSize = options.batchSize; + if (options.batchSize && !hasOutStage) options.cursor.batchSize = options.batchSize; command.cursor = options.cursor; // promiseLibrary diff --git a/test/functional/aggregation_tests.js b/test/functional/aggregation_tests.js index 3fb693d124..a1581cfff3 100644 --- a/test/functional/aggregation_tests.js +++ b/test/functional/aggregation_tests.js @@ -1354,4 +1354,71 @@ describe('Aggregation', function() { // DOC_END } }); + + it('should not send a batchSize for aggregations with an out stage', { + metadata: { requires: { topology: ['single', 'replicaset'] } }, + test: function(done) { + const databaseName = this.configuration.db; + const client = this.configuration.newClient(this.configuration.writeConcernMax(), { + poolSize: 1, + monitorCommands: true + }); + + let err; + let coll1; + let coll2; + const events = []; + + client.on('commandStarted', e => { + if (e.commandName === 'aggregate') { + events.push(e); + } + }); + + client + .connect() + .then(() => { + coll1 = client.db(databaseName).collection('coll1'); + coll2 = client.db(databaseName).collection('coll2'); + + return Promise.all([coll1.remove({}), coll2.remove({})]); + }) + .then(() => { + const docs = Array.from({ length: 10 }).map(() => ({ a: 1 })); + + return coll1.insertMany(docs); + }) + .then(() => { + return Promise.all( + [ + coll1.aggregate([{ $out: 'coll2' }]), + coll1.aggregate([{ $out: 'coll2' }], { batchSize: 0 }), + coll1.aggregate([{ $out: 'coll2' }], { batchSize: 1 }), + coll1.aggregate([{ $out: 'coll2' }], { batchSize: 30 }), + coll1.aggregate([{ $match: { a: 1 } }, { $out: 'coll2' }]), + coll1.aggregate([{ $match: { a: 1 } }, { $out: 'coll2' }], { batchSize: 0 }), + coll1.aggregate([{ $match: { a: 1 } }, { $out: 'coll2' }], { batchSize: 1 }), + coll1.aggregate([{ $match: { a: 1 } }, { $out: 'coll2' }], { batchSize: 30 }) + ].map(cursor => cursor.toArray()) + ); + }) + .then(() => { + expect(events) + .to.be.an('array') + .with.a.lengthOf(8); + events.forEach(event => { + expect(event).to.have.property('commandName', 'aggregate'); + expect(event) + .to.have.property('command') + .that.has.property('cursor') + .that.does.not.have.property('batchSize'); + }); + }) + .catch(_err => { + err = _err; + }) + .then(() => client.close()) + .then(() => done(err)); + } + }); });