Skip to content

Commit

Permalink
fix(pool): flush workItems after next tick to avoid dupe selection
Browse files Browse the repository at this point in the history
In the event that a timeout occurs and a server instance is drained,
the work items were cleared before the server could signal it needed
to be cycled. By waiting until the next tick, we can ensure the
event makes it to the `Topology`, and the server is removed from
consideration for server selection.

NODE-2350
  • Loading branch information
mbroadst committed Dec 2, 2019
1 parent 8fe456b commit 3ec49e5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
14 changes: 14 additions & 0 deletions lib/core/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,20 @@ class Connection extends EventEmitter {
this.socket.unref();
}

/**
* Flush all work Items on this connection
*
* @param {*} err The error to propagate to the flushed work items
*/
flush(err) {
while (this.workItems.length > 0) {
const workItem = this.workItems.shift();
if (workItem.cb) {
workItem.cb(err);
}
}
}

/**
* Destroy connection
* @method
Expand Down
18 changes: 11 additions & 7 deletions lib/core/connection/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,20 +251,24 @@ function connectionFailureHandler(pool, event, err, conn) {
// Remove the connection
removeConnection(pool, conn);

// Flush all work Items on this connection
while (conn.workItems.length > 0) {
const workItem = conn.workItems.shift();
if (workItem.cb) workItem.cb(err);
}

if (pool.state !== DRAINING && pool.options.legacyCompatMode === false) {
if (
pool.state !== DRAINING &&
pool.state !== DESTROYED &&
pool.options.legacyCompatMode === false
) {
// since an error/close/timeout means pool invalidation in a
// pre-CMAP world, we will issue a custom `drain` event here to
// signal that the server should be recycled
stateTransition(pool, DRAINING);
pool.emit('drain', err);

// wait to flush work items so this server isn't selected again immediately
process.nextTick(() => conn.flush(err));
return;
}

// flush remaining work items
conn.flush(err);
}

// Did we catch a timeout, increment the numberOfConsecutiveTimeouts
Expand Down

0 comments on commit 3ec49e5

Please sign in to comment.