Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PECO-729] Improve retry behavior #230

Merged
merged 12 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
useArrowNativeTypes: true,
socketTimeout: 15 * 60 * 1000, // 15 minutes

retryMaxAttempts: 30,
retriesTimeout: 900 * 1000,
retryDelayMin: 1 * 1000,
retryDelayMax: 60 * 1000,
retryMaxAttempts: 5,
retriesTimeout: 15 * 60 * 1000, // 15 minutes
retryDelayMin: 1 * 1000, // 1 second
retryDelayMax: 60 * 1000, // 60 seconds (1 minute)

useCloudFetch: false,
cloudFetchConcurrentDownloads: 10,
Expand Down
6 changes: 3 additions & 3 deletions lib/connection/connections/HttpConnection.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import thrift from 'thrift';
import https from 'https';
import http from 'http';
import { HeadersInit, Response } from 'node-fetch';
import { HeadersInit } from 'node-fetch';
import { ProxyAgent } from 'proxy-agent';

import IConnectionProvider from '../contracts/IConnectionProvider';
import IConnectionProvider, { HttpTransactionDetails } from '../contracts/IConnectionProvider';
import IConnectionOptions, { ProxyOptions } from '../contracts/IConnectionOptions';
import IClientContext from '../../contracts/IClientContext';

Expand Down Expand Up @@ -120,7 +120,7 @@ export default class HttpConnection implements IConnectionProvider {
return this.connection;
}

public async getRetryPolicy(): Promise<IRetryPolicy<Response>> {
public async getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>> {
return new HttpRetryPolicy(this.context);
}
}
99 changes: 59 additions & 40 deletions lib/connection/connections/HttpRetryPolicy.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
import { Response } from 'node-fetch';
import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy';
import { HttpTransactionDetails } from '../contracts/IConnectionProvider';
import IClientContext, { ClientConfig } from '../../contracts/IClientContext';
import RetryError, { RetryErrorCode } from '../../errors/RetryError';

/**
 * Computes the delay to wait before the next retry attempt.
 *
 * The minimal delay from config is multiplied by `1.5 ** (attempt - 1)`; the multiplier
 * is never allowed to drop below 1, and the result is capped at the maximal delay from config.
 *
 * @param attempt - number of the current attempt
 * @param config - client config providing `retryDelayMin` and `retryDelayMax`
 * @returns delay, in the same units as `retryDelayMin` / `retryDelayMax`
 */
function getRetryDelay(attempt: number, config: ClientConfig): number {
  const { retryDelayMin, retryDelayMax } = config;
  // For attempts below 1 the power is less than 1, so floor the multiplier at 1
  const multiplier = Math.max(1, 1.5 ** (attempt - 1));
  const uncapped = retryDelayMin * multiplier;
  return Math.min(uncapped, retryDelayMax);
}

/**
 * Returns a promise that is resolved (with no value) once a timer
 * of the given duration fires.
 *
 * @param milliseconds - timer duration passed to `setTimeout`
 */
function delay(milliseconds: number): Promise<void> {
  return new Promise<void>((done) => {
    setTimeout(done, milliseconds);
  });
}

export default class HttpRetryPolicy implements IRetryPolicy<Response> {
export default class HttpRetryPolicy implements IRetryPolicy<HttpTransactionDetails> {
private context: IClientContext;

private readonly startTime: number; // in milliseconds
Expand All @@ -27,53 +22,77 @@ export default class HttpRetryPolicy implements IRetryPolicy<Response> {
this.attempt = 0;
}

public async shouldRetry(response: Response): Promise<ShouldRetryResult> {
if (!response.ok) {
switch (response.status) {
// On these status codes it's safe to retry the request. However,
// both error codes mean that server is overwhelmed or even down.
// Therefore, we need to add some delay between attempts so
// server can recover and more likely handle next request
case 429: // Too Many Requests
case 503: // Service Unavailable
this.attempt += 1;

const clientConfig = this.context.getConfig();
public async shouldRetry(details: HttpTransactionDetails): Promise<ShouldRetryResult> {
if (this.isRetryable(details)) {
const clientConfig = this.context.getConfig();

// Delay interval depends on current attempt - the more attempts we do
// the longer the interval will be
// TODO: Respect `Retry-After` header (PECO-729)
const retryDelay = getRetryDelay(this.attempt, clientConfig);

const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new RetryError(RetryErrorCode.AttemptsExceeded, response);
}
// Don't retry if overall retry timeout exceeded
const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new RetryError(RetryErrorCode.TimeoutExceeded, details);
}

const timeoutExceeded = Date.now() - this.startTime + retryDelay >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new RetryError(RetryErrorCode.TimeoutExceeded, response);
}
this.attempt += 1;

return { shouldRetry: true, retryAfter: retryDelay };
// Don't retry if max attempts count reached
const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new RetryError(RetryErrorCode.AttemptsExceeded, details);
}

// TODO: Here we should handle other error types (see PECO-730)
// Try to use retry delay from `Retry-After` header if available and valid, otherwise fall back to backoff
const retryAfter =
this.getRetryAfterHeader(details, clientConfig) ?? this.getBackoffDelay(this.attempt, clientConfig);

// no default
}
return { shouldRetry: true, retryAfter };
}

return { shouldRetry: false };
}

public async invokeWithRetry(operation: RetryableOperation<Response>): Promise<Response> {
public async invokeWithRetry(operation: RetryableOperation<HttpTransactionDetails>): Promise<HttpTransactionDetails> {
for (;;) {
const response = await operation(); // eslint-disable-line no-await-in-loop
const status = await this.shouldRetry(response); // eslint-disable-line no-await-in-loop
const details = await operation(); // eslint-disable-line no-await-in-loop
const status = await this.shouldRetry(details); // eslint-disable-line no-await-in-loop
if (!status.shouldRetry) {
return response;
return details;
}
await delay(status.retryAfter); // eslint-disable-line no-await-in-loop
}
}

/**
 * Decides, based only on the HTTP status code of the response,
 * whether the request is a candidate for retrying.
 *
 * @returns `true` for status codes below 100, for `429`, and for codes
 *          starting from 500 except `501`; `false` otherwise
 */
protected isRetryable({ response }: HttpTransactionDetails): boolean {
  const { status } = response;

  // Retry on all codes below 100
  if (status < 100) {
    return true;
  }

  // ...and on `429 Too Many Requests`
  if (status === 429) {
    return true;
  }

  // ...and on all `5xx` codes except for `501 Not Implemented`
  return status >= 500 && status !== 501;
}

/**
 * Extracts a retry delay from the `Retry-After` response header.
 *
 * `Retry-After` may contain either a date after which to retry, or a delay in seconds.
 * Only the delay-seconds form is supported. The header value is used when:
 * 1. it's available and is non-empty
 * 2. it could be parsed as a finite number, and is greater than zero
 * 3. additionally, it is clamped to not be smaller than the minimal retry delay
 *
 * The header carries seconds, while `config.retryDelayMin` and the delay consumed by
 * the retry loop (`setTimeout`) are expressed in milliseconds, so the parsed value is
 * converted to milliseconds before being compared and returned.
 *
 * @param details - HTTP transaction whose response headers are inspected
 * @param config - client config providing `retryDelayMin` (milliseconds)
 * @returns delay in milliseconds, or `undefined` if the header is missing or unusable
 */
protected getRetryAfterHeader({ response }: HttpTransactionDetails, config: ClientConfig): number | undefined {
  const header = response.headers.get('Retry-After') || '';
  if (header !== '') {
    // Non-numeric values (e.g. an HTTP date) produce `NaN` and are rejected below
    const seconds = Number(header);
    if (Number.isFinite(seconds) && seconds > 0) {
      const milliseconds = seconds * 1000;
      return Math.max(config.retryDelayMin, milliseconds);
    }
  }
  return undefined;
}

/**
 * Computes an exponential backoff delay: the minimal retry delay is doubled
 * `attempt` times, and the result is capped at the maximal retry delay.
 *
 * @param attempt - number of the current attempt (used as the power of two)
 * @param config - client config providing `retryDelayMin` and `retryDelayMax`
 */
protected getBackoffDelay(attempt: number, config: ClientConfig): number {
  const { retryDelayMin, retryDelayMax } = config;
  const exponential = retryDelayMin * 2 ** attempt;
  return Math.min(exponential, retryDelayMax);
}
}
13 changes: 13 additions & 0 deletions lib/connection/connections/NullRetryPolicy.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import IRetryPolicy, { ShouldRetryResult, RetryableOperation } from '../contracts/IRetryPolicy';

/**
 * Retry policy that never retries: `shouldRetry` always answers "no",
 * and `invokeWithRetry` runs the operation exactly once.
 */
export default class NullRetryPolicy<R> implements IRetryPolicy<R> {
  /**
   * Always reports that the operation should not be retried; the argument is ignored.
   */
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  public async shouldRetry(details: R): Promise<ShouldRetryResult> {
    const verdict: ShouldRetryResult = { shouldRetry: false };
    return verdict;
  }

  /**
   * Invokes the operation a single time and passes its outcome through unchanged.
   */
  public async invokeWithRetry(operation: RetryableOperation<R>): Promise<R> {
    // Single invocation only - no retry loop here
    const pending = operation();
    return pending;
  }
}
66 changes: 58 additions & 8 deletions lib/connection/connections/ThriftHttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

import { EventEmitter } from 'events';
import { TBinaryProtocol, TBufferedTransport, Thrift, TProtocol, TProtocolConstructor, TTransport } from 'thrift';
import fetch, { RequestInit, HeadersInit, Response, FetchError } from 'node-fetch';
import fetch, { RequestInit, HeadersInit, Request, Response, FetchError } from 'node-fetch';
// @ts-expect-error TS7016: Could not find a declaration file for module
import InputBufferUnderrunError from 'thrift/lib/nodejs/lib/thrift/input_buffer_underrun_error';
import IRetryPolicy from '../contracts/IRetryPolicy';
import { HttpTransactionDetails } from '../contracts/IConnectionProvider';
import NullRetryPolicy from './NullRetryPolicy';

export class THTTPException extends Thrift.TApplicationException {
public readonly statusCode: unknown;
Expand All @@ -32,7 +34,7 @@ interface ThriftHttpConnectionOptions {
url: string;
transport?: TTransportType;
protocol?: TProtocolConstructor;
getRetryPolicy(): Promise<IRetryPolicy<Response>>;
getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>>;
}

// This type describes a shape of internals of Thrift client object.
Expand All @@ -47,29 +49,56 @@ type ThriftClient = {
[key: string]: (input: TProtocol, mtype: Thrift.MessageType, seqId: number) => void;
};

// Names of Thrift methods for which the configured retry policy is used.
// The set is consulted by method name; a request whose method name is not listed here
// (or could not be determined) is sent without retries.
const retryableThriftMethods = new Set([
'GetOperationStatus',
'CancelOperation',
'CloseOperation',
'GetResultSetMetadata',
'CloseSession',
'GetInfo',
'GetTypeInfo',
'GetCatalogs',
'GetSchemas',
'GetTables',
'GetTableTypes',
'GetColumns',
'GetFunctions',
'GetPrimaryKeys',
'GetCrossReference',
]);

export default class ThriftHttpConnection extends EventEmitter {
private readonly url: string;

private config: RequestInit;

private options: ThriftHttpConnectionOptions;

// This field is used by Thrift internally, so name and type are important
private readonly transport: TTransportType;

// This field is used by Thrift internally, so name and type are important
private readonly protocol: TProtocolConstructor;

private readonly getRetryPolicy: () => Promise<IRetryPolicy<Response>>;

// thrift.createClient sets this field internally
public client?: ThriftClient;

constructor(options: ThriftHttpConnectionOptions, config: RequestInit = {}) {
super();
this.url = options.url;
this.config = config;
this.options = options;
this.transport = options.transport ?? TBufferedTransport;
this.protocol = options.protocol ?? TBinaryProtocol;
this.getRetryPolicy = options.getRetryPolicy;
}

/**
 * Picks the retry policy for a request depending on the Thrift method being called.
 *
 * @param thriftMethodName - name of the Thrift method, if it could be determined
 * @returns the policy supplied via connection options when the method is listed in
 *          `retryableThriftMethods`; otherwise a `NullRetryPolicy` (no retries)
 */
protected async getRetryPolicy(thriftMethodName?: string): Promise<IRetryPolicy<HttpTransactionDetails>> {
  const isAllowedToRetry = thriftMethodName ? retryableThriftMethods.has(thriftMethodName) : false;

  // Anything not explicitly allowed (including an unknown method name) is never retried
  if (!isAllowedToRetry) {
    return new NullRetryPolicy();
  }

  // Method is explicitly listed as safe to retry
  return this.options.getRetryPolicy();
}

public setHeaders(headers: HeadersInit) {
Expand All @@ -92,12 +121,16 @@ export default class ThriftHttpConnection extends EventEmitter {
body: data,
};

this.getRetryPolicy()
this.getThriftMethodName(data)
.then((thriftMethod) => this.getRetryPolicy(thriftMethod))
.then((retryPolicy) => {
const makeRequest = () => fetch(this.url, requestConfig);
const makeRequest = () => {
const request = new Request(this.url, requestConfig);
return fetch(request).then((response) => ({ request, response }));
};
return retryPolicy.invokeWithRetry(makeRequest);
})
.then((response) => {
.then(({ response }) => {
if (response.status !== 200) {
throw new THTTPException(response);
}
Expand Down Expand Up @@ -131,6 +164,23 @@ export default class ThriftHttpConnection extends EventEmitter {
});
}

/**
 * Reads the Thrift method name from the header of a serialized Thrift message.
 *
 * The message is fed through a receiver created by the configured transport, and the
 * message header is decoded with the configured protocol.
 *
 * @param thriftMessage - serialized Thrift message
 * @returns promise resolved with the `fname` field of the message header, or with
 *          `undefined` if anything throws while creating or running the receiver
 */
private getThriftMethodName(thriftMessage: Buffer): Promise<string | undefined> {
  return new Promise((resolve) => {
    try {
      const feed = this.transport.receiver((transportWithData) => {
        const ProtocolCtor = this.protocol;
        const { fname } = new ProtocolCtor(transportWithData).readMessageBegin();
        resolve(fname);
      }, 0 /* `seqId` could be any because it's ignored */);

      feed(thriftMessage);
    } catch {
      // Method name could not be extracted - report it as unknown
      resolve(undefined);
    }
  });
}

private handleThriftResponse(transportWithData: TTransport) {
if (!this.client) {
throw new Thrift.TApplicationException(Thrift.TApplicationExceptionType.INTERNAL_ERROR, 'Client not available');
Expand Down
9 changes: 7 additions & 2 deletions lib/connection/contracts/IConnectionProvider.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import http from 'http';
import { HeadersInit, Response } from 'node-fetch';
import { HeadersInit, Request, Response } from 'node-fetch';
import IRetryPolicy from './IRetryPolicy';

/**
 * A pair of an HTTP request and the response received for it.
 * Used as the value type that retry policies operate on.
 */
export interface HttpTransactionDetails {
// Request object that was sent
request: Request;
// Response received for that request
response: Response;
}

export default interface IConnectionProvider {
getThriftConnection(): Promise<any>;

getAgent(): Promise<http.Agent>;

setHeaders(headers: HeadersInit): void;

getRetryPolicy(): Promise<IRetryPolicy<Response>>;
getRetryPolicy(): Promise<IRetryPolicy<HttpTransactionDetails>>;
}
2 changes: 1 addition & 1 deletion lib/connection/contracts/IRetryPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export type ShouldRetryResult =
export type RetryableOperation<R> = () => Promise<R>;

export default interface IRetryPolicy<R> {
shouldRetry(response: R): Promise<ShouldRetryResult>;
shouldRetry(details: R): Promise<ShouldRetryResult>;

invokeWithRetry(operation: RetryableOperation<R>): Promise<R>;
}
10 changes: 9 additions & 1 deletion lib/hive/Commands/BaseCommand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,15 @@ export default abstract class BaseCommand {
return await this.invokeCommand<Response>(request, command);
} catch (error) {
if (error instanceof RetryError) {
const statusCode = error.payload instanceof Response ? error.payload.status : undefined;
let statusCode: number | undefined;
if (
error.payload &&
typeof error.payload === 'object' &&
'response' in error.payload &&
error.payload.response instanceof Response
) {
statusCode = error.payload.response.status;
}

switch (error.errorCode) {
case RetryErrorCode.AttemptsExceeded:
Expand Down
8 changes: 6 additions & 2 deletions lib/result/CloudFetchResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import LZ4 from 'lz4';
import fetch, { RequestInfo, RequestInit } from 'node-fetch';
import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch';
import { TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
Expand Down Expand Up @@ -73,6 +73,10 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<B
const retryPolicy = await connectionProvider.getRetryPolicy();

const requestConfig: RequestInit = { agent, ...init };
return retryPolicy.invokeWithRetry(() => fetch(url, requestConfig));
const result = await retryPolicy.invokeWithRetry(() => {
const request = new Request(url, requestConfig);
return fetch(request).then((response) => ({ request, response }));
});
return result.response;
}
}
Loading
Loading