Skip to content

Commit

Permalink
Merge pull request #1287 from murgatroid99/grpc-js_server_uds
Browse files Browse the repository at this point in the history
grpc-js: Use resolver to bind server ports
  • Loading branch information
murgatroid99 authored Mar 12, 2020
2 parents 0956c24 + 96d4d6a commit 7ebf530
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 43 deletions.
189 changes: 149 additions & 40 deletions packages/grpc-js/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { AddressInfo, ListenOptions } from 'net';
import { URL } from 'url';

import { ServiceError } from './call';
import { Status } from './constants';
import { Status, LogVerbosity } from './constants';
import { Deserialize, Serialize, ServiceDefinition } from './make-client';
import { Metadata } from './metadata';
import {
Expand All @@ -46,6 +46,14 @@ import {
} from './server-call';
import { ServerCredentials } from './server-credentials';
import { ChannelOptions } from './channel-options';
import { createResolver, ResolverListener } from './resolver';
import { log } from './logging';
import { SubchannelAddress, TcpSubchannelAddress, isTcpSubchannelAddress } from './subchannel';

interface BindResult {
port: number;
count: number;
}

function noop(): void {}

Expand Down Expand Up @@ -104,10 +112,8 @@ function getDefaultHandler(handlerType: HandlerType, methodName: string) {
// tslint:enable:no-any

export class Server {
private http2Server:
| http2.Http2Server
| http2.Http2SecureServer
| null = null;
private http2ServerList: (http2.Http2Server | http2.Http2SecureServer)[] = [];

private handlers: Map<string, UntypedHandler> = new Map<
string,
UntypedHandler
Expand Down Expand Up @@ -217,49 +223,152 @@ export class Server {
throw new TypeError('callback must be a function');
}

const url = new URL(`http://${port}`);
const options: ListenOptions = { host: url.hostname, port: +url.port };
const serverOptions: http2.ServerOptions = {};
if ('grpc.max_concurrent_streams' in this.options) {
serverOptions.settings = {
maxConcurrentStreams: this.options['grpc.max_concurrent_streams'],
};
}

if (creds._isSecure()) {
const secureServerOptions = Object.assign(
serverOptions,
creds._getSettings()!
);
this.http2Server = http2.createSecureServer(secureServerOptions);
} else {
this.http2Server = http2.createServer(serverOptions);
const setupServer = (): http2.Http2Server | http2.Http2SecureServer => {
let http2Server: http2.Http2Server | http2.Http2SecureServer;
if (creds._isSecure()) {
const secureServerOptions = Object.assign(
serverOptions,
creds._getSettings()!
);
http2Server = http2.createSecureServer(secureServerOptions);
} else {
http2Server = http2.createServer(serverOptions);
}

http2Server.setTimeout(0, noop);
this._setupHandlers(http2Server);
return http2Server;
}

this.http2Server.setTimeout(0, noop);
this._setupHandlers();
const bindSpecificPort = (addressList: SubchannelAddress[], portNum: number, previousCount: number): Promise<BindResult> => {
if (addressList.length === 0) {
return Promise.resolve({port: portNum, count: previousCount});
}
return Promise.all(addressList.map(address => {
let addr: SubchannelAddress;
if (isTcpSubchannelAddress(address)) {
addr = {
host: (address as TcpSubchannelAddress).host,
port: portNum
};
} else {
addr = address
}

const http2Server = setupServer();
return new Promise<number|Error>((resolve, reject) => {
function onError(err: Error): void {
resolve(err);
}

function onError(err: Error): void {
callback(err, -1);
http2Server.once('error', onError);

http2Server.listen(addr, () => {
this.http2ServerList.push(http2Server);
const boundAddress = http2Server.address()!;
if (typeof boundAddress === 'string') {
resolve(portNum);
} else {
resolve(boundAddress.port);
}
http2Server.removeListener('error', onError);
});
})
})).then(results => {
let count = 0;
for (const result of results) {
if (typeof result === 'number') {
count += 1;
if (result !== portNum) {
throw new Error('Invalid state: multiple port numbers added from single address');
}
}
}
return {
port: portNum,
count: count + previousCount
};
});
}

this.http2Server.once('error', onError);
const bindWildcardPort = (addressList: SubchannelAddress[]): Promise<BindResult> => {
if (addressList.length === 0) {
return Promise.resolve<BindResult>({port: 0, count: 0});
}
const address = addressList[0];
const http2Server = setupServer();
return new Promise<BindResult>((resolve, reject) => {
function onError(err: Error): void {
resolve(bindWildcardPort(addressList.slice(1)));
}

http2Server.once('error', onError);

this.http2Server.listen(options, () => {
const server = this.http2Server as
| http2.Http2Server
| http2.Http2SecureServer;
const port = (server.address() as AddressInfo).port;
http2Server.listen(address, () => {
this.http2ServerList.push(http2Server);
resolve(bindSpecificPort(addressList.slice(1), (http2Server.address() as AddressInfo).port, 1));
http2Server.removeListener('error', onError);
});
});
}

server.removeListener('error', onError);
callback(null, port);
});
const resolverListener: ResolverListener = {
onSuccessfulResolution: (addressList, serviceConfig, serviceConfigError) => {
if (addressList.length === 0) {
callback(new Error(`No addresses resolved for port ${port}`), 0);
return;
}
let bindResultPromise: Promise<BindResult>;
if (isTcpSubchannelAddress(addressList[0])) {
if (addressList[0].port === 0) {
bindResultPromise = bindWildcardPort(addressList);
} else {
bindResultPromise = bindSpecificPort(addressList, addressList[0].port, 0);
}
} else{
// Use an arbitrary non-zero port for non-TCP addresses
bindResultPromise = bindSpecificPort(addressList, 1, 0);
}
bindResultPromise.then(bindResult => {
if (bindResult.count === 0) {
const errorString = `No address added out of total ${addressList.length} resolved`;
log(LogVerbosity.ERROR, errorString);
callback(new Error(errorString), 0);
} else {
if (bindResult.count < addressList.length) {
log(LogVerbosity.INFO, `WARNING Only ${bindResult.count} addresses added out of total ${addressList.length} resolved`);
}
callback(null, bindResult.port);
}
}, (error) => {
const errorString = `No address added out of total ${addressList.length} resolved`;
log(LogVerbosity.ERROR, errorString);
callback(new Error(errorString), 0);
});
},
onError: (error) => {
callback(new Error(error.details), 0);
}
};

const resolver = createResolver(port, resolverListener);
resolver.updateResolution();
}

forceShutdown(): void {
// Close the server if it is still running.
if (this.http2Server && this.http2Server.listening) {
this.http2Server.close();

for (const http2Server of this.http2ServerList) {
if (http2Server.listening) {
http2Server.close();
}
}

this.started = false;
Expand Down Expand Up @@ -296,7 +405,7 @@ export class Server {
}

start(): void {
if (this.http2Server === null || this.http2Server.listening !== true) {
if (this.http2ServerList.length === 0 || this.http2ServerList.every(http2Server => http2Server.listening !== true)) {
throw new Error('server must be bound in order to start');
}

Expand All @@ -321,18 +430,18 @@ export class Server {
// Close the server if necessary.
this.started = false;

if (this.http2Server && this.http2Server.listening) {
pendingChecks++;
this.http2Server.close(maybeCallback);
for (const http2Server of this.http2ServerList) {
if (http2Server.listening) {
pendingChecks++;
http2Server.close(maybeCallback);
}
}

// If any sessions are active, close them gracefully.
pendingChecks += this.sessions.size;
this.sessions.forEach(session => {
session.close(maybeCallback);
});

// If the server is closed and there are no active sessions, just call back.
if (pendingChecks === 0) {
callback();
}
Expand All @@ -342,12 +451,12 @@ export class Server {
throw new Error('Not yet implemented');
}

private _setupHandlers(): void {
if (this.http2Server === null) {
private _setupHandlers(http2Server: http2.Http2Server | http2.Http2SecureServer): void {
if (http2Server === null) {
return;
}

this.http2Server.on(
http2Server.on(
'stream',
(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => {
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
Expand Down Expand Up @@ -416,7 +525,7 @@ export class Server {
}
);

this.http2Server.on('session', session => {
http2Server.on('session', session => {
if (!this.started) {
session.destroy();
return;
Expand Down
6 changes: 3 additions & 3 deletions packages/grpc-native-core/src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,7 @@ Server.prototype.addProtoService = util.deprecate(function(service,
* "address:port"
* @param {grpc.ServerCredentials} creds Server credential object to be used for
* SSL. Pass an insecure credentials object for an insecure port.
* @return {number} The bound port number. Negative if binding the port failed.
* @return {number} The bound port number. Zero if binding the port failed.
*/
Server.prototype.bind = function(port, creds) {
if (this.started) {
Expand All @@ -984,7 +984,7 @@ Server.prototype.bind = function(port, creds) {
* @callback grpc.Server~bindCallback
* @param {Error=} error If non-null, indicates that binding the port failed.
* @param {number} port The bound port number. If binding the port fails, this
* will be negative to match the output of bind.
* will be zero to match the output of bind.
*/

/**
Expand All @@ -1000,7 +1000,7 @@ Server.prototype.bindAsync = function(port, creds, callback) {
* incorrect use of the function, which should not be surfaced asynchronously
*/
const result = this.bind(port, creds)
if (result < 0) {
if (result === 0) {
setImmediate(callback, new Error('Failed to bind port'), result);
} else {
setImmediate(callback, null, result);
Expand Down

0 comments on commit 7ebf530

Please sign in to comment.