Skip to content

Commit

Permalink
Merge pull request #1346 from murgatroid99/grpc-js_max_message_size
Browse files Browse the repository at this point in the history
grpc-js: Add max message size enforcement
  • Loading branch information
murgatroid99 authored Apr 13, 2020
2 parents 4ec023c + 70b2a95 commit 227a35e
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 9 deletions.
2 changes: 2 additions & 0 deletions PACKAGE-COMPARISON.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ In addition, all channel arguments defined in [this header file](https://github.
- `grpc.initial_reconnect_backoff_ms`
- `grpc.max_reconnect_backoff_ms`
- `grpc.use_local_subchannel_pool`
- `grpc.max_send_message_length`
- `grpc.max_receive_message_length`
- `channelOverride`
- `channelFactoryOverride`
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "0.7.9",
"version": "0.8.0",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
4 changes: 4 additions & 0 deletions packages/grpc-js/src/channel-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ export interface ChannelOptions {
'grpc.initial_reconnect_backoff_ms'?: number;
'grpc.max_reconnect_backoff_ms'?: number;
'grpc.use_local_subchannel_pool'?: number;
'grpc.max_send_message_length'?: number;
'grpc.max_receive_message_length'?: number;
[key: string]: string | number | undefined;
}

Expand All @@ -49,6 +51,8 @@ export const recognizedOptions = {
'grpc.initial_reconnect_backoff_ms': true,
'grpc.max_reconnect_backoff_ms': true,
'grpc.use_local_subchannel_pool': true,
'grpc.max_send_message_length': true,
'grpc.max_receive_message_length': true,
};

export function channelOptionsEqual(
Expand Down
2 changes: 2 additions & 0 deletions packages/grpc-js/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import { getDefaultAuthority } from './resolver';
import { ServiceConfig, validateServiceConfig } from './service-config';
import { trace, log } from './logging';
import { SubchannelAddress } from './subchannel';
import { MaxMessageSizeFilterFactory } from './max-message-size-filter';

export enum ConnectivityState {
CONNECTING,
Expand Down Expand Up @@ -213,6 +214,7 @@ export class ChannelImplementation implements Channel {
this.filterStackFactory = new FilterStackFactory([
new CallCredentialsFilterFactory(this),
new DeadlineFilterFactory(this),
new MaxMessageSizeFilterFactory(this.options),
new CompressionFilterFactory(this),
]);
// TODO(murgatroid99): Add more centralized handling of channel options
Expand Down
6 changes: 6 additions & 0 deletions packages/grpc-js/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,9 @@ export enum LogVerbosity {
INFO,
ERROR,
}

// -1 means unlimited
export const DEFAULT_MAX_SEND_MESSAGE_LENGTH = -1;

// 4 MB default
export const DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024;
78 changes: 78 additions & 0 deletions packages/grpc-js/src/max-message-size-filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { BaseFilter, Filter, FilterFactory } from "./filter";
import { Call, WriteObject } from "./call-stream";
import { Status, DEFAULT_MAX_SEND_MESSAGE_LENGTH, DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH } from "./constants";
import { ChannelOptions } from "./channel-options";

export class MaxMessageSizeFilter extends BaseFilter implements Filter {
private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;
constructor(
private readonly options: ChannelOptions,
private readonly callStream: Call
) {
super();
if ('grpc.max_send_message_length' in options) {
this.maxSendMessageSize = options['grpc.max_send_message_length']!;
}
if ('grpc.max_receive_message_length' in options) {
this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
}
}

async sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
/* A configured size of -1 means that there is no limit, so skip the check
* entirely */
if (this.maxSendMessageSize === -1) {
return message;
} else {
const concreteMessage = await message;
if (concreteMessage.message.length > this.maxSendMessageSize) {
this.callStream.cancelWithStatus(Status.RESOURCE_EXHAUSTED, `Sent message larger than max (${concreteMessage.message.length} vs. ${this.maxSendMessageSize})`);
return Promise.reject<WriteObject>('Message too large');
} else {
return concreteMessage;
}
}
}

async receiveMessage(message: Promise<Buffer>): Promise<Buffer> {
/* A configured size of -1 means that there is no limit, so skip the check
* entirely */
if (this.maxReceiveMessageSize === -1) {
return message;
} else {
const concreteMessage = await message;
if (concreteMessage.length > this.maxReceiveMessageSize) {
this.callStream.cancelWithStatus(Status.RESOURCE_EXHAUSTED, `Received message larger than max (${concreteMessage.length} vs. ${this.maxReceiveMessageSize})`);
return Promise.reject<Buffer>('Message too large');
} else {
return concreteMessage;
}
}
}
}

export class MaxMessageSizeFilterFactory implements FilterFactory<MaxMessageSizeFilter> {
constructor(private readonly options: ChannelOptions) {}

createFilter(callStream: Call): MaxMessageSizeFilter {
return new MaxMessageSizeFilter(this.options, callStream);
}
}
37 changes: 35 additions & 2 deletions packages/grpc-js/src/server-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import * as http2 from 'http2';
import { Duplex, Readable, Writable } from 'stream';

import { StatusObject } from './call-stream';
import { Status } from './constants';
import { Status, DEFAULT_MAX_SEND_MESSAGE_LENGTH, DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH } from './constants';
import { Deserialize, Serialize } from './make-client';
import { Metadata } from './metadata';
import { StreamDecoder } from './stream-decoder';
import { ObjectReadable, ObjectWritable } from './object-stream';
import { ChannelOptions } from './channel-options';

interface DeadlineUnitIndexSignature {
[name: string]: number;
Expand Down Expand Up @@ -338,10 +339,13 @@ export class Http2ServerCallStream<
private isPushPending = false;
private bufferedMessages: Array<Buffer | null> = [];
private messagesToPush: Array<RequestType | null> = [];
private maxSendMessageSize: number = DEFAULT_MAX_SEND_MESSAGE_LENGTH;
private maxReceiveMessageSize: number = DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH;

constructor(
private stream: http2.ServerHttp2Stream,
private handler: Handler<RequestType, ResponseType>
private handler: Handler<RequestType, ResponseType>,
private options: ChannelOptions
) {
super();

Expand All @@ -361,6 +365,13 @@ export class Http2ServerCallStream<
this.stream.on('drain', () => {
this.emit('drain');
});

if ('grpc.max_send_message_length' in options) {
this.maxSendMessageSize = options['grpc.max_send_message_length']!;
}
if ('grpc.max_receive_message_length' in options) {
this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
}
}

private checkCancelled(): boolean {
Expand Down Expand Up @@ -435,6 +446,13 @@ export class Http2ServerCallStream<
stream.once('end', async () => {
try {
const requestBytes = Buffer.concat(chunks, totalLength);
if (this.maxReceiveMessageSize !== -1 && requestBytes.length > this.maxReceiveMessageSize) {
this.sendError({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${requestBytes.length} vs. ${this.maxReceiveMessageSize})`
});
resolve();
}

resolve(await this.deserializeMessage(requestBytes));
} catch (err) {
Expand Down Expand Up @@ -555,6 +573,14 @@ export class Http2ServerCallStream<
return;
}

if (this.maxSendMessageSize !== -1 && chunk.length > this.maxSendMessageSize) {
this.sendError({
code: Status.RESOURCE_EXHAUSTED,
details: `Sent message larger than max (${chunk.length} vs. ${this.maxSendMessageSize})`
});
return;
}

this.sendMetadata();
return this.stream.write(chunk);
}
Expand All @@ -581,6 +607,13 @@ export class Http2ServerCallStream<
const messages = decoder.write(data);

for (const message of messages) {
if (this.maxReceiveMessageSize !== -1 && message.length > this.maxReceiveMessageSize) {
this.sendError({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${message.length} vs. ${this.maxReceiveMessageSize})`
});
return;
}
this.pushOrBufferMessage(readable, message);
}
});
Expand Down
4 changes: 2 additions & 2 deletions packages/grpc-js/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ export class Server {
throw getUnimplementedStatusResponse(path);
}

const call = new Http2ServerCallStream(stream, handler);
const call = new Http2ServerCallStream(stream, handler, this.options);
const metadata: Metadata = call.receiveMetadata(headers) as Metadata;
switch (handler.type) {
case 'unary':
Expand Down Expand Up @@ -555,7 +555,7 @@ export class Server {
throw new Error(`Unknown handler type: ${handler.type}`);
}
} catch (err) {
const call = new Http2ServerCallStream(stream, null!);
const call = new Http2ServerCallStream(stream, null!, this.options);

if (err.code === undefined) {
err.code = Status.INTERNAL;
Expand Down
Loading

0 comments on commit 227a35e

Please sign in to comment.