Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5981): read preference not applied to commands properly #4010

Merged
merged 31 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b5d0f02
fix(NODE-5817): read preference not applied to commands properly
alenakhineika Feb 28, 2024
1e4c377
test: remove a test case
alenakhineika Feb 28, 2024
541b5d5
docs: update comments
alenakhineika Feb 28, 2024
3304317
feat: pass directConnection from connection pool
alenakhineika Feb 29, 2024
8b875ef
test: remove only
alenakhineika Feb 29, 2024
2d033a3
fix: import type
alenakhineika Mar 1, 2024
46f49ee
Merge branch 'main' into NODE-5981-fix-applying-read-preference-to-co…
alenakhineika Mar 1, 2024
325cb58
fix: add options to unit tests
alenakhineika Mar 1, 2024
d65cd3a
test: sync run-command spec tests
alenakhineika Mar 1, 2024
f591d2e
fix: do not check direct connection
alenakhineika Mar 1, 2024
9883c45
docs: update comment
alenakhineika Mar 1, 2024
2de4e4d
fix: check for primary for all topologies
alenakhineika Mar 1, 2024
2c20053
Merge branch 'main' into NODE-5981-fix-applying-read-preference-to-co…
alenakhineika Mar 5, 2024
4193f7d
refactor: try to revert query
alenakhineika Mar 5, 2024
af78187
Merge branch 'main' into NODE-5981-fix-applying-read-preference-to-co…
alenakhineika Mar 5, 2024
40a4847
test: exclude arbiter
alenakhineika Mar 5, 2024
418513e
Merge branch 'NODE-5981-fix-applying-read-preference-to-command' of g…
alenakhineika Mar 5, 2024
00978cb
Merge branch 'main' into NODE-5981-fix-applying-read-preference-to-co…
alenakhineika Mar 5, 2024
1806f30
test: debug
alenakhineika Mar 5, 2024
a4685d9
Merge branch 'NODE-5981-fix-applying-read-preference-to-command' of g…
alenakhineika Mar 5, 2024
bb79b9b
test: skip when is not ReplicaSetWithPrimary
alenakhineika Mar 5, 2024
250451e
test: debug
alenakhineika Mar 5, 2024
5e57787
Merge branch 'main' into NODE-5981-fix-applying-read-preference-to-co…
alenakhineika Mar 5, 2024
cd9ecdf
test: compare with primary as a string
alenakhineika Mar 5, 2024
c9f5e10
Merge branch 'NODE-5981-fix-applying-read-preference-to-command' of g…
alenakhineika Mar 5, 2024
4c82bb8
test: adress pr comments
alenakhineika Mar 7, 2024
f782630
Merge remote-tracking branch 'origin/main' into NODE-5981-fix-applyin…
alenakhineika Mar 7, 2024
4c41b4e
fix: type
alenakhineika Mar 7, 2024
61635b0
refactor(no-story): don't store directConnection on the connection po…
baileympearson Mar 11, 2024
9b32b77
Merge branch 'main' into NODE-5981-fix-applying-read-preference-to-co…
baileympearson Mar 11, 2024
4bee076
undo whitespace changes
nbbeeken Mar 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { BSONSerializeOptions, Document, Long } from '../bson';
import * as BSON from '../bson';
import { MongoInvalidArgumentError, MongoRuntimeError } from '../error';
import { ReadPreference } from '../read_preference';
import { type ReadPreference } from '../read_preference';
import type { ClientSession } from '../sessions';
import type { CommandOptions } from './connection';
import {
Expand Down Expand Up @@ -51,7 +51,6 @@ export interface OpQueryOptions extends CommandOptions {
requestId?: number;
moreToCome?: boolean;
exhaustAllowed?: boolean;
readPreference?: ReadPreference;
}

/**************************************************************
Expand All @@ -77,7 +76,6 @@ export class OpQueryRequest {
awaitData: boolean;
exhaust: boolean;
partial: boolean;
documentsReturnedIn?: string;

constructor(public databaseName: string, public query: Document, options: OpQueryOptions) {
// Basic options needed to be passed in
Expand Down Expand Up @@ -503,10 +501,6 @@ export class OpMsgRequest {
// Basic options
this.command.$db = databaseName;

if (options.readPreference && options.readPreference.mode !== ReadPreference.PRIMARY) {
this.command.$readPreference = options.readPreference.toJSON();
}

// Ensure empty options
this.options = options ?? {};

Expand Down
46 changes: 33 additions & 13 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import type { ServerApi, SupportedNodeConnectionOptions } from '../mongo_client'
import { type MongoClientAuthProviders } from '../mongo_client_auth_providers';
import { MongoLoggableComponent, type MongoLogger, SeverityLevel } from '../mongo_logger';
import { type CancellationToken, TypedEventEmitter } from '../mongo_types';
import type { ReadPreferenceLike } from '../read_preference';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { ServerType } from '../sdam/common';
import { applySession, type ClientSession, updateSessionFromResponse } from '../sessions';
import {
BufferPool,
Expand Down Expand Up @@ -83,6 +84,8 @@ export interface CommandOptions extends BSONSerializeOptions {
willRetryWrite?: boolean;

writeConcern?: WriteConcern;

directConnection?: boolean;
}

/** @public */
Expand Down Expand Up @@ -371,16 +374,34 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
cmd.$clusterTime = clusterTime;
}

if (
isSharded(this) &&
!this.supportsOpMsg &&
readPreference &&
readPreference.mode !== 'primary'
) {
cmd = {
$query: cmd,
$readPreference: readPreference.toJSON()
};
// For standalone, drivers MUST NOT set $readPreference.
if (this.description.type !== ServerType.Standalone) {
if (
!isSharded(this) &&
!this.description.loadBalanced &&
this.supportsOpMsg &&
options.directConnection === true &&
readPreference?.mode === 'primary'
) {
// For mongos and load balancers with 'primary' mode, drivers MUST NOT set $readPreference.
// For all other types with a direct connection, if the read preference is 'primary'
// (driver sets 'primary' as default if no read preference is configured),
// the $readPreference MUST be set to 'primaryPreferred'
// to ensure that any server type can handle the request.
cmd.$readPreference = ReadPreference.primaryPreferred.toJSON();
} else if (isSharded(this) && !this.supportsOpMsg && readPreference?.mode !== 'primary') {
// When sending a read operation via OP_QUERY and the $readPreference modifier,
// the query MUST be provided using the $query modifier.
cmd = {
$query: cmd,
$readPreference: readPreference.toJSON()
};
} else if (readPreference?.mode !== 'primary') {
// For mode 'primary', drivers MUST NOT set $readPreference.
// For all other read preference modes (i.e. 'secondary', 'primaryPreferred', ...),
// drivers MUST set $readPreference
cmd.$readPreference = readPreference.toJSON();
}
}

const commandOptions = {
Expand All @@ -389,8 +410,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
checkKeys: false,
// This value is not overridable
secondaryOk: readPreference.secondaryOk(),
...options,
readPreference // ensure we pass in ReadPreference instance
...options
};

const message = this.supportsOpMsg
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/stream_description.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export interface StreamDescriptionOptions {
/** @public */
export class StreamDescription {
address: string;
type: string;
type: ServerType;
minWireVersion?: number;
maxWireVersion?: number;
maxBsonObjectSize: number;
Expand Down
8 changes: 2 additions & 6 deletions src/cmap/wire_protocol/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@ export interface ReadPreferenceOption {
}

export function getReadPreference(options?: ReadPreferenceOption): ReadPreference {
// Default to command version of the readPreference
// Default to command version of the readPreference.
let readPreference = options?.readPreference ?? ReadPreference.primary;
// If we have an option readPreference override the command one
if (options?.readPreference) {
readPreference = options.readPreference;
}

if (typeof readPreference === 'string') {
readPreference = ReadPreference.fromString(readPreference);
Expand All @@ -43,7 +39,7 @@ export function isSharded(topologyOrServer?: Topology | Server | Connection): bo
}

// NOTE: This is incredibly inefficient, and should be removed once command construction
// happens based on `Server` not `Topology`.
// happens based on `Server` not `Topology`.
if (topologyOrServer.description && topologyOrServer.description instanceof TopologyDescription) {
const servers: ServerDescription[] = Array.from(topologyOrServer.description.servers.values());
return servers.some((server: ServerDescription) => server.type === ServerType.Mongos);
Expand Down
1 change: 1 addition & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ export interface MongoOptions
readPreference: ReadPreference;
readConcern: ReadConcern;
loadBalanced: boolean;
directConnection: boolean;
serverApi: ServerApi;
compressors: CompressorName[];
writeConcern: WriteConcern;
Expand Down
5 changes: 4 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}

// Clone the options
const finalOptions = Object.assign({}, options, { wireProtocolCommand: false });
const finalOptions = Object.assign({}, options, {
wireProtocolCommand: false,
directConnection: this.topology.s.options.directConnection
});

// There are cases where we need to flag the read preference not to get sent in
// the command, such as pre-5.0 servers attempting to perform an aggregate write
Expand Down
126 changes: 49 additions & 77 deletions test/integration/max-staleness/max_staleness.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('Max Staleness', function () {
// Primary server states
const serverIsPrimary = [Object.assign({}, defaultFields)];
server.setMessageHandler(request => {
var doc = request.document;
const doc = request.document;
if (isHello(doc)) {
request.reply(serverIsPrimary[0]);
return;
Expand Down Expand Up @@ -46,71 +46,53 @@ describe('Max Staleness', function () {
metadata: {
requires: {
generators: true,
topology: 'single'
topology: 'replicaset'
}
},

test: function (done) {
var self = this;
test: async function () {
const self = this;
const configuration = this.configuration;
const client = configuration.newClient(
`mongodb://${test.server.uri()}/test?readPreference=secondary&maxStalenessSeconds=250`,
{ serverApi: null } // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
);

client.connect(function (err, client) {
expect(err).to.not.exist;
var db = client.db(self.configuration.db);

db.collection('test')
.find({})
.toArray(function (err) {
expect(err).to.not.exist;
expect(test.checkCommand).to.containSubset({
$query: { find: 'test', filter: {} },
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});

client.close(done);
});
await client.connect();
const db = client.db(self.configuration.db);
await db.collection('test').find({}).toArray();
expect(test.checkCommand).to.containSubset({
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});
await client.close();
}
});

it('should correctly set maxStalenessSeconds on Mongos query using db level readPreference', {
metadata: {
requires: {
generators: true,
topology: 'single'
topology: 'replicaset'
}
},

test: function (done) {
test: async function () {
const configuration = this.configuration;
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
client.connect(function (err, client) {
expect(err).to.not.exist;

// Get a db with a new readPreference
var db1 = client.db('test', {
readPreference: new ReadPreference('secondary', null, { maxStalenessSeconds: 250 })
});
await client.connect();

db1
.collection('test')
.find({})
.toArray(function (err) {
expect(err).to.not.exist;
expect(test.checkCommand).to.containSubset({
$query: { find: 'test', filter: {} },
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});

client.close(done);
});
// Get a db with a new readPreference
const db1 = client.db('test', {
readPreference: new ReadPreference('secondary', null, { maxStalenessSeconds: 250 })
});
await db1.collection('test').find({}).toArray();
expect(test.checkCommand).to.containSubset({
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});
await client.close();
}
});

Expand All @@ -120,35 +102,31 @@ describe('Max Staleness', function () {
metadata: {
requires: {
generators: true,
topology: 'single'
topology: 'replicaset'
}
},

test: function (done) {
var self = this;
test: async function () {
const self = this;
const configuration = this.configuration;
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
client.connect(function (err, client) {
expect(err).to.not.exist;
var db = client.db(self.configuration.db);

// Get a db with a new readPreference
db.collection('test', {
await client.connect();
const db = client.db(self.configuration.db);

// Get a db with a new readPreference
await db
.collection('test', {
readPreference: new ReadPreference('secondary', null, { maxStalenessSeconds: 250 })
})
.find({})
.toArray(function (err) {
expect(err).to.not.exist;
expect(test.checkCommand).to.containSubset({
$query: { find: 'test', filter: {} },
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});

client.close(done);
});
.find({})
.toArray();
expect(test.checkCommand).to.containSubset({
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});
await client.close();
}
}
);
Expand All @@ -157,35 +135,29 @@ describe('Max Staleness', function () {
metadata: {
requires: {
generators: true,
topology: 'single'
topology: 'replicaset'
}
},

test: function (done) {
var self = this;
test: async function () {
const self = this;
const configuration = this.configuration;
const client = configuration.newClient(`mongodb://${test.server.uri()}/test`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
client.connect(function (err, client) {
expect(err).to.not.exist;
var db = client.db(self.configuration.db);
var readPreference = new ReadPreference('secondary', null, { maxStalenessSeconds: 250 });

// Get a db with a new readPreference
db.collection('test')
.find({})
.withReadPreference(readPreference)
.toArray(function (err) {
expect(err).to.not.exist;
expect(test.checkCommand).to.containSubset({
$query: { find: 'test', filter: {} },
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});

client.close(done);
});
await client.connect();
const db = client.db(self.configuration.db);
const readPreference = new ReadPreference('secondary', null, { maxStalenessSeconds: 250 });

// Get a db with a new readPreference
await db.collection('test').find({}).withReadPreference(readPreference).toArray();

expect(test.checkCommand).to.containSubset({
$query: { find: 'test', filter: {} },
$readPreference: { mode: 'secondary', maxStalenessSeconds: 250 }
});
await client.close();
}
});
});
7 changes: 1 addition & 6 deletions test/integration/run-command/run_command.spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,5 @@ import { loadSpecTests } from '../../spec';
import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner';

describe('RunCommand spec', () => {
runUnifiedSuite(loadSpecTests('run-command'), test => {
if (test.description === 'does not attach $readPreference to given command on standalone') {
return 'TODO(NODE-5263): Do not send $readPreference to standalone servers';
}
return false;
});
runUnifiedSuite(loadSpecTests('run-command'));
});
Loading