diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 107c2e3ef..4bdc0b78a 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -90,6 +90,16 @@ export type ServerSurfaceCall = { getPath(): string; } & EventEmitter; +export type ServerWritableBuffer = { + _write( + chunk: ResponseType | Buffer, + encoding: string, + callback: Function + ): void; + write(chunk: ResponseType | Buffer, cb?: Function): boolean; + write(chunk: ResponseType | Buffer, encoding?: any, cb?: Function): boolean; +}; + export type ServerUnaryCall = ServerSurfaceCall & { request: RequestType; }; @@ -97,14 +107,15 @@ export type ServerReadableStream = ServerSurfaceCall & ObjectReadable; export type ServerWritableStream = ServerSurfaceCall & - ObjectWritable & { + ObjectWritable & + ServerWritableBuffer & { request: RequestType; end: (metadata?: Metadata) => void; }; export type ServerDuplexStream = ServerSurfaceCall & ObjectReadable & - ObjectWritable & { end: (metadata?: Metadata) => void }; - + ObjectWritable & + ServerWritableBuffer & { end: (metadata?: Metadata) => void }; export class ServerUnaryCallImpl extends EventEmitter implements ServerUnaryCall @@ -222,7 +233,7 @@ export class ServerWritableStreamImpl } _write( - chunk: ResponseType, + chunk: ResponseType | Buffer, encoding: string, // eslint-disable-next-line @typescript-eslint/no-explicit-any callback: (...args: any[]) => void @@ -654,15 +665,39 @@ export class Http2ServerCallStream< } } - serializeMessage(value: ResponseType) { + serializeMessage(value: ResponseType | Buffer) { + // TODO(cjihrig): Call compression aware serializeMessage(). + + if (Buffer.isBuffer(value)) { + return this.serializeMessageHandleBuffer(value); + } + const messageBuffer = this.handler.serialize(value); + return this.addCompressionAndLength(messageBuffer); + } - // TODO(cjihrig): Call compression aware serializeMessage(). - const byteLength = messageBuffer.byteLength; + private serializeMessageHandleBuffer(value: Buffer): Buffer { + const byteLength = value.byteLength; + // checking if this is a protobuf message or a gRPC frame + if ( + byteLength >= 5 && + (value.readUInt8(0) === 0 || value.readUint8(0) === 1) && + value.readUInt32BE(1) === byteLength - 5 + ) { + return value; + } + + return this.addCompressionAndLength(value); + } + + private addCompressionAndLength(value: Buffer, compressed = false) { + const byteLength = value.byteLength; + const compressionByte = compressed ? 1 : 0; const output = Buffer.allocUnsafe(byteLength + 5); output.writeUInt8(0, 0); + output.writeUInt8(compressionByte, 0); output.writeUInt32BE(byteLength, 1); - messageBuffer.copy(output, 5); + value.copy(output, 5); return output; }