diff --git a/lib/core/connection/pool.js b/lib/core/connection/pool.js index a3ded9ee1a..4c51ed70b7 100644 --- a/lib/core/connection/pool.js +++ b/lib/core/connection/pool.js @@ -19,6 +19,7 @@ const apm = require('./apm'); const Buffer = require('safe-buffer').Buffer; const connect = require('./connect'); const updateSessionFromResponse = require('../sessions').updateSessionFromResponse; +const eachAsync = require('../utils').eachAsync; var DISCONNECTED = 'disconnected'; var CONNECTING = 'connecting'; @@ -635,42 +636,35 @@ Pool.prototype.unref = function() { // Destroy the connections function destroy(self, connections, options, callback) { - let connectionCount = connections.length; - function connectionDestroyed() { - connectionCount--; - if (connectionCount > 0) { - return; - } - - // clear all pool state - self.inUseConnections = []; - self.availableConnections = []; - self.connectingConnections = 0; - self.executing = false; - self.queue = []; - self.reconnectConnection = null; - self.numberOfConsecutiveTimeouts = 0; - self.connectionIndex = 0; - self.retriesLeft = self.options.reconnectTries; - self.reconnectId = null; - - // Set state to destroyed - stateTransition(self, DESTROYED); - if (typeof callback === 'function') { - callback(null, null); - } - } + eachAsync( + connections, + (conn, cb) => { + CONNECTION_EVENTS.forEach(eventName => conn.removeAllListeners(eventName)); + conn.destroy(options, cb); + }, + err => { + if (err) { + if (typeof callback === 'function') callback(err, null); + return; + } - if (connectionCount === 0) { - connectionDestroyed(); - return; - } + // clear all pool state + self.inUseConnections = []; + self.availableConnections = []; + self.connectingConnections = 0; + self.executing = false; + self.queue = []; + self.reconnectConnection = null; + self.numberOfConsecutiveTimeouts = 0; + self.connectionIndex = 0; + self.retriesLeft = self.options.reconnectTries; + self.reconnectId = null; - // Destroy all connections - connections.forEach(conn => { - CONNECTION_EVENTS.forEach(eventName => conn.removeAllListeners(eventName)); - conn.destroy(options, connectionDestroyed); - }); + // Set state to destroyed + stateTransition(self, DESTROYED); + if (typeof callback === 'function') callback(null, null); + } + ); } /** @@ -755,43 +749,39 @@ Pool.prototype.destroy = function(force, callback) { */ Pool.prototype.reset = function(callback) { const connections = this.availableConnections.concat(this.inUseConnections); - let connectionCount = connections.length; - const connectionDestroyed = () => { - connectionCount--; - if (connectionCount > 0) { - return; - } + eachAsync( + connections, + (conn, cb) => { + CONNECTION_EVENTS.forEach(eventName => conn.removeAllListeners(eventName)); + conn.destroy({ force: true }, cb); + }, + err => { + if (err) { + if (typeof callback === 'function') { + callback(err, null); + return; + } + } - // clear all pool state - this.inUseConnections = []; - this.availableConnections = []; - this.connectingConnections = 0; - this.executing = false; - this.reconnectConnection = null; - this.numberOfConsecutiveTimeouts = 0; - this.connectionIndex = 0; - this.retriesLeft = this.options.reconnectTries; - this.reconnectId = null; - - // create an initial connection, and kick off execution again - _createConnection(this); - - if (typeof callback === 'function') { - callback(null, null); + // clear all pool state + this.inUseConnections = []; + this.availableConnections = []; + this.connectingConnections = 0; + this.executing = false; + this.reconnectConnection = null; + this.numberOfConsecutiveTimeouts = 0; + this.connectionIndex = 0; + this.retriesLeft = this.options.reconnectTries; + this.reconnectId = null; + + // create an initial connection, and kick off execution again + _createConnection(this); + + if (typeof callback === 'function') { + callback(null, null); + } } - }; - - // if we already have no connections, just reset state and callback - if (connectionCount === 0) { - connectionDestroyed(); - return; - } - - // destroy all connections - connections.forEach(conn => { - CONNECTION_EVENTS.forEach(eventName => conn.removeAllListeners(eventName)); - conn.destroy({ force: true }, connectionDestroyed); - }); + ); }; // Prepare the buffer that Pool.prototype.write() uses to send to the server diff --git a/lib/core/utils.js b/lib/core/utils.js index 73adaddadf..bc7c22f8dc 100644 --- a/lib/core/utils.js +++ b/lib/core/utils.js @@ -116,6 +116,37 @@ function isPromiseLike(maybePromise) { return maybePromise && typeof maybePromise.then === 'function'; } +/** + * Applies the function `eachFn` to each item in `arr`, in parallel. + * + * @param {array} arr an array of items to asynchronusly iterate over + * @param {function} eachFn A function to call on each item of the array. The callback signature is `(item, callback)`, where the callback indicates iteration is complete. + * @param {function} callback The callback called after every item has been iterated + */ +function eachAsync(arr, eachFn, callback) { + if (arr.length === 0) { + callback(null); + return; + } + + const length = arr.length; + let completed = 0; + function eachCallback(err) { + if (err) { + callback(err, null); + return; + } + + if (++completed === length) { + callback(null); + } + } + + for (let idx = 0; idx < length; ++idx) { + eachFn(arr[idx], eachCallback); + } +} + module.exports = { uuidV4, calculateDurationInMs, @@ -124,5 +155,6 @@ module.exports = { retrieveEJSON, retrieveKerberos, maxWireVersion, - isPromiseLike + isPromiseLike, + eachAsync };