Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js: Merge 1.10.x branch into master #2792

Merged
merged 35 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ad598ec
Serverside keepalive error detection and cleanups
davidfiala May 28, 2024
334f0dc
remove comment
davidfiala May 28, 2024
d799a7a
unify server and client keepalive matching comments and discussion on…
davidfiala May 29, 2024
577b4b4
add keepalive server trace back in to match channelz vs non-channelz …
davidfiala May 29, 2024
7883164
return imports back to original order
davidfiala May 29, 2024
19cdc12
another missing trace message for parity
davidfiala May 29, 2024
bed5e85
resolve hoisting
davidfiala May 29, 2024
d325b5f
hoist in second location
davidfiala May 29, 2024
a77d94f
Based on grpc/grpc-node#2139 I wrapped http2session.ping in a try-cat…
davidfiala May 29, 2024
c2da436
remove keepaliveDisabled from server.ts. rename keepaliveTimer.
davidfiala May 29, 2024
7719e37
grpc-js: Fix client hang when receiving extra messages for a unary re…
murgatroid99 Jun 6, 2024
3c5ab22
per discussion, avoid tracking keepalive disabled state and instead d…
davidfiala Jun 6, 2024
e64d816
grpc-js: Avoid buffering significantly more than max_receive_message_…
murgatroid99 Jun 5, 2024
98cd87f
ensure that client keepalive timers are always cleared when they trig…
davidfiala Jun 7, 2024
7ecaa2d
grpc-js: Bump to 1.10.9
murgatroid99 Jun 7, 2024
674f4e3
Merge pull request from GHSA-7v5v-9h63-cj86
murgatroid99 Jun 10, 2024
52fe8e9
Merge pull request #2772 from murgatroid99/grpc-js_cardinality_error_…
murgatroid99 Jun 18, 2024
5c0226d
Merge pull request #2760 from davidfiala/@grpc/grpc-js@1.10.x
murgatroid99 Jun 18, 2024
e759029
HTTP CONNECT: handle early server packets
mjameswh May 10, 2024
5ae5514
fix: add decoding for url encoded user credentials
brendan-myers May 29, 2024
cbab4e5
grpc-js: Bump to 1.10.10
murgatroid99 Jun 18, 2024
42844cf
grpc-js: Re-add client-side max send message size checking
murgatroid99 Jun 20, 2024
97c4cda
Merge pull request #2779 from murgatroid99/grpc-js_max_send_message_s…
murgatroid99 Jun 24, 2024
3c55b5b
Merge pull request #2777 from murgatroid99/grpc-js_1.10_backports
murgatroid99 Jun 24, 2024
c934257
Merge pull request #2778 from murgatroid99/grpc-js_1.10.10
murgatroid99 Jun 24, 2024
c1815e0
grpc-js: Fix pick_first reconnecting without active calls
murgatroid99 Jul 3, 2024
e804ad6
grpc-js: Bump to 1.10.11
murgatroid99 Jul 3, 2024
a5fac6f
grpc-js: pick-first: Fix short circuit READY subchannel handling
murgatroid99 Jul 8, 2024
745a451
grpc-js: Increase state change deadline in server idle tests
murgatroid99 Jul 9, 2024
395de4b
grpc-js: Refresh server idle timer if not enough time has passed
murgatroid99 Jul 9, 2024
810e9e6
grpc-js: Ensure pending calls end after channel close
murgatroid99 Jul 9, 2024
fbbc78d
Merge pull request #2790 from murgatroid99/grpc-js_server_idle_test_d…
murgatroid99 Jul 10, 2024
023c1d0
Merge pull request #2791 from murgatroid99/grpc-js_channel_close_pick…
murgatroid99 Jul 10, 2024
f8338c5
Merge pull request #2784 from murgatroid99/grpc-js_pick_first_reconne…
murgatroid99 Jul 10, 2024
9ea4187
Merge remote-tracking branch 'upstream/@grpc/grpc-js@1.10.x' into grp…
murgatroid99 Jul 10, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.10.8",
"version": "1.10.11",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
9 changes: 5 additions & 4 deletions packages/grpc-js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ export class Client {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onReceiveMessage(message: any) {
if (responseMessage !== null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
call.cancelWithStatus(Status.UNIMPLEMENTED, 'Too many responses received');
}
responseMessage = message;
},
Expand All @@ -345,7 +345,7 @@ export class Client {
callProperties.callback!(
callErrorFromStatus(
{
code: Status.INTERNAL,
code: Status.UNIMPLEMENTED,
details: 'No message received',
metadata: status.metadata,
},
Expand Down Expand Up @@ -463,9 +463,10 @@ export class Client {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onReceiveMessage(message: any) {
if (responseMessage !== null) {
call.cancelWithStatus(Status.INTERNAL, 'Too many responses received');
call.cancelWithStatus(Status.UNIMPLEMENTED, 'Too many responses received');
}
responseMessage = message;
call.startRead();
},
onReceiveStatus(status: StatusObject) {
if (receivedStatus) {
Expand All @@ -478,7 +479,7 @@ export class Client {
callProperties.callback!(
callErrorFromStatus(
{
code: Status.INTERNAL,
code: Status.UNIMPLEMENTED,
details: 'No message received',
metadata: status.metadata,
},
Expand Down
75 changes: 59 additions & 16 deletions packages/grpc-js/src/compression-filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { WriteObject, WriteFlags } from './call-interface';
import { Channel } from './channel';
import { ChannelOptions } from './channel-options';
import { CompressionAlgorithms } from './compression-algorithms';
import { LogVerbosity } from './constants';
import { DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH, DEFAULT_MAX_SEND_MESSAGE_LENGTH, LogVerbosity, Status } from './constants';
import { BaseFilter, Filter, FilterFactory } from './filter';
import * as logging from './logging';
import { Metadata, MetadataValue } from './metadata';
Expand Down Expand Up @@ -98,6 +98,10 @@ class IdentityHandler extends CompressionHandler {
}

class DeflateHandler extends CompressionHandler {
constructor(private maxRecvMessageLength: number) {
super();
}

compressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => {
zlib.deflate(message, (err, output) => {
Expand All @@ -112,18 +116,34 @@ class DeflateHandler extends CompressionHandler {

decompressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => {
zlib.inflate(message, (err, output) => {
if (err) {
reject(err);
} else {
resolve(output);
let totalLength = 0;
const messageParts: Buffer[] = [];
const decompresser = zlib.createInflate();
decompresser.on('data', (chunk: Buffer) => {
messageParts.push(chunk);
totalLength += chunk.byteLength;
if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
decompresser.destroy();
reject({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
});
}
});
decompresser.on('end', () => {
resolve(Buffer.concat(messageParts));
});
decompresser.write(message);
decompresser.end();
});
}
}

class GzipHandler extends CompressionHandler {
constructor(private maxRecvMessageLength: number) {
super();
}

compressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => {
zlib.gzip(message, (err, output) => {
Expand All @@ -138,13 +158,25 @@ class GzipHandler extends CompressionHandler {

decompressMessage(message: Buffer) {
return new Promise<Buffer>((resolve, reject) => {
zlib.unzip(message, (err, output) => {
if (err) {
reject(err);
} else {
resolve(output);
let totalLength = 0;
const messageParts: Buffer[] = [];
const decompresser = zlib.createGunzip();
decompresser.on('data', (chunk: Buffer) => {
messageParts.push(chunk);
totalLength += chunk.byteLength;
if (this.maxRecvMessageLength !== -1 && totalLength > this.maxRecvMessageLength) {
decompresser.destroy();
reject({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message that decompresses to a size larger than ${this.maxRecvMessageLength}`
});
}
});
decompresser.on('end', () => {
resolve(Buffer.concat(messageParts));
});
decompresser.write(message);
decompresser.end();
});
}
}
Expand All @@ -169,14 +201,14 @@ class UnknownHandler extends CompressionHandler {
}
}

function getCompressionHandler(compressionName: string): CompressionHandler {
function getCompressionHandler(compressionName: string, maxReceiveMessageSize: number): CompressionHandler {
switch (compressionName) {
case 'identity':
return new IdentityHandler();
case 'deflate':
return new DeflateHandler();
return new DeflateHandler(maxReceiveMessageSize);
case 'gzip':
return new GzipHandler();
return new GzipHandler(maxReceiveMessageSize);
default:
return new UnknownHandler(compressionName);
}
Expand All @@ -186,6 +218,8 @@ export class CompressionFilter extends BaseFilter implements Filter {
private sendCompression: CompressionHandler = new IdentityHandler();
private receiveCompression: CompressionHandler = new IdentityHandler();
private currentCompressionAlgorithm: CompressionAlgorithm = 'identity';
private maxReceiveMessageLength: number;
private maxSendMessageLength: number;

constructor(
channelOptions: ChannelOptions,
Expand All @@ -195,6 +229,8 @@ export class CompressionFilter extends BaseFilter implements Filter {

const compressionAlgorithmKey =
channelOptions['grpc.default_compression_algorithm'];
this.maxReceiveMessageLength = channelOptions['grpc.max_receive_message_length'] ?? DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
this.maxSendMessageLength = channelOptions['grpc.max_send_message_length'] ?? DEFAULT_MAX_SEND_MESSAGE_LENGTH;
if (compressionAlgorithmKey !== undefined) {
if (isCompressionAlgorithmKey(compressionAlgorithmKey)) {
const clientSelectedEncoding = CompressionAlgorithms[
Expand All @@ -215,7 +251,8 @@ export class CompressionFilter extends BaseFilter implements Filter {
) {
this.currentCompressionAlgorithm = clientSelectedEncoding;
this.sendCompression = getCompressionHandler(
this.currentCompressionAlgorithm
this.currentCompressionAlgorithm,
-1
);
}
} else {
Expand Down Expand Up @@ -247,7 +284,7 @@ export class CompressionFilter extends BaseFilter implements Filter {
if (receiveEncoding.length > 0) {
const encoding: MetadataValue = receiveEncoding[0];
if (typeof encoding === 'string') {
this.receiveCompression = getCompressionHandler(encoding);
this.receiveCompression = getCompressionHandler(encoding, this.maxReceiveMessageLength);
}
}
metadata.remove('grpc-encoding');
Expand Down Expand Up @@ -279,6 +316,12 @@ export class CompressionFilter extends BaseFilter implements Filter {
* and the output is a framed and possibly compressed message. For this
* reason, this filter should be at the bottom of the filter stack */
const resolvedMessage: WriteObject = await message;
if (this.maxSendMessageLength !== -1 && resolvedMessage.message.length > this.maxSendMessageLength) {
throw {
code: Status.RESOURCE_EXHAUSTED,
details: `Attempted to send message with a size larger than ${this.maxSendMessageLength}`
};
}
let compress: boolean;
if (this.sendCompression instanceof IdentityHandler) {
compress = false;
Expand Down
33 changes: 29 additions & 4 deletions packages/grpc-js/src/internal-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options';
import { ResolvingLoadBalancer } from './resolving-load-balancer';
import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
import { ChannelControlHelper } from './load-balancer';
import { UnavailablePicker, Picker, QueuePicker } from './picker';
import { UnavailablePicker, Picker, QueuePicker, PickArgs, PickResult, PickResultType } from './picker';
import { Metadata } from './metadata';
import { Status, LogVerbosity, Propagate } from './constants';
import { FilterStackFactory } from './filter-stack';
Expand All @@ -33,7 +33,6 @@ import {
} from './resolver';
import { trace } from './logging';
import { SubchannelAddress } from './subchannel-address';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';
import { mapProxyName } from './http_proxy';
import { GrpcUri, parseUri, uriToString } from './uri-parser';
import { ServerSurfaceCall } from './server-call';
Expand Down Expand Up @@ -144,6 +143,22 @@ class ChannelSubchannelWrapper
}
}

class ShutdownPicker implements Picker {
pick(pickArgs: PickArgs): PickResult {
return {
pickResultType: PickResultType.DROP,
status: {
code: Status.UNAVAILABLE,
details: 'Channel closed before call started',
metadata: new Metadata()
},
subchannel: null,
onCallStarted: null,
onCallEnded: null
}
}
}

export class InternalChannel {
private readonly resolvingLoadBalancer: ResolvingLoadBalancer;
private readonly subchannelPool: SubchannelPool;
Expand Down Expand Up @@ -402,7 +417,6 @@ export class InternalChannel {
}
);
this.filterStackFactory = new FilterStackFactory([
new MaxMessageSizeFilterFactory(this.options),
new CompressionFilterFactory(this, this.options),
]);
this.trace(
Expand Down Expand Up @@ -538,7 +552,9 @@ export class InternalChannel {
}

getConfig(method: string, metadata: Metadata): GetConfigResult {
this.resolvingLoadBalancer.exitIdle();
if (this.connectivityState !== ConnectivityState.SHUTDOWN) {
this.resolvingLoadBalancer.exitIdle();
}
if (this.configSelector) {
return {
type: 'SUCCESS',
Expand Down Expand Up @@ -747,6 +763,15 @@ export class InternalChannel {
close() {
this.resolvingLoadBalancer.destroy();
this.updateState(ConnectivityState.SHUTDOWN);
this.currentPicker = new ShutdownPicker();
for (const call of this.configSelectionQueue) {
call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started');
}
this.configSelectionQueue = [];
for (const call of this.pickQueue) {
call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started');
}
this.pickQueue = [];
clearInterval(this.callRefTimer);
if (this.idleTimer) {
clearTimeout(this.idleTimer);
Expand Down
17 changes: 11 additions & 6 deletions packages/grpc-js/src/load-balancer-pick-first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
PickResultType,
UnavailablePicker,
} from './picker';
import { Endpoint, SubchannelAddress } from './subchannel-address';
import { Endpoint, SubchannelAddress, subchannelAddressToString } from './subchannel-address';
import * as logging from './logging';
import { LogVerbosity } from './constants';
import {
Expand Down Expand Up @@ -348,7 +348,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
if (newState !== ConnectivityState.READY) {
this.removeCurrentPick();
this.calculateAndReportNewState();
this.requestReresolution();
}
return;
}
Expand Down Expand Up @@ -483,6 +482,15 @@ export class PickFirstLoadBalancer implements LoadBalancer {
subchannel: this.channelControlHelper.createSubchannel(address, {}),
hasReportedTransientFailure: false,
}));
trace('connectToAddressList([' + addressList.map(address => subchannelAddressToString(address)) + '])');
for (const { subchannel } of newChildrenList) {
if (subchannel.getConnectivityState() === ConnectivityState.READY) {
this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef());
subchannel.addConnectivityStateListener(this.subchannelStateListener);
this.pickSubchannel(subchannel);
return;
}
}
/* Ref each subchannel before resetting the list, to ensure that
* subchannels shared between the list don't drop to 0 refs during the
* transition. */
Expand All @@ -494,10 +502,6 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.children = newChildrenList;
for (const { subchannel } of this.children) {
subchannel.addConnectivityStateListener(this.subchannelStateListener);
if (subchannel.getConnectivityState() === ConnectivityState.READY) {
this.pickSubchannel(subchannel);
return;
}
}
for (const child of this.children) {
if (
Expand Down Expand Up @@ -527,6 +531,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
const rawAddressList = ([] as SubchannelAddress[]).concat(
...endpointList.map(endpoint => endpoint.addresses)
);
trace('updateAddressList([' + rawAddressList.map(address => subchannelAddressToString(address)) + '])');
if (rawAddressList.length === 0) {
throw new Error('No addresses in endpoint list passed to pick_first');
}
Expand Down
Loading
Loading