From 8489c2f47ea3a619c3b430edffb00f3cabeb2e1e Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Tue, 21 Nov 2023 14:02:22 -0500 Subject: [PATCH] fix!: reconnect, missing and duped events, remove max reconnect (#660) Signed-off-by: Todd Baert --- .github/workflows/ci.yml | 12 +- libs/providers/flagd/README.md | 25 ++-- libs/providers/flagd/flagd-testbed | 2 +- libs/providers/flagd/package.json | 2 +- libs/providers/flagd/src/e2e/jest.config.ts | 1 + .../src/e2e/setup-in-process-provider.ts | 11 +- .../flagd/src/e2e/setup-rpc-provider.ts | 12 +- .../e2e/step-definitions/evaluation.spec.ts | 6 +- .../flagd-json-evaluator.spec.ts | 6 +- .../flagd-reconnect.unstable.spec.ts | 45 +++++++ .../src/e2e/step-definitions/flagd.spec.ts | 6 +- libs/providers/flagd/src/e2e/tear-down.ts | 8 ++ .../flagd/src/lib/configuration.spec.ts | 5 - libs/providers/flagd/src/lib/configuration.ts | 12 -- libs/providers/flagd/src/lib/constants.ts | 2 +- .../flagd/src/lib/flagd-provider.spec.ts | 69 +++-------- .../providers/flagd/src/lib/flagd-provider.ts | 10 +- .../src/lib/service/grpc/grpc-service.ts | 117 +++++++----------- .../src/lib/service/in-process/data-fetch.ts | 2 +- .../in-process/grpc/grpc-fetch.spec.ts | 28 +++-- .../lib/service/in-process/grpc/grpc-fetch.ts | 85 +++++-------- 21 files changed, 207 insertions(+), 259 deletions(-) create mode 100644 libs/providers/flagd/src/e2e/step-definitions/flagd-reconnect.unstable.spec.ts create mode 100644 libs/providers/flagd/src/e2e/tear-down.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7431edca6..eefc5bd65 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,13 +38,21 @@ jobs: services: flagd: - image: ghcr.io/open-feature/flagd-testbed:v0.4.4 + image: ghcr.io/open-feature/flagd-testbed:v0.4.6 ports: - 8013:8013 + flagd-unstable: + image: ghcr.io/open-feature/flagd-testbed-unstable:v0.4.6 + ports: + - 8014:8013 sync: - image: ghcr.io/open-feature/sync-testbed:v0.4.4 + image: ghcr.io/open-feature/sync-testbed:v0.4.6 ports: - 9090:9090 + sync-unstable: + image: ghcr.io/open-feature/sync-testbed-unstable:v0.4.6 + ports: + - 9091:9090 steps: - uses: actions/checkout@v4 diff --git a/libs/providers/flagd/README.md b/libs/providers/flagd/README.md index 928db9ca5..48c49201c 100644 --- a/libs/providers/flagd/README.md +++ b/libs/providers/flagd/README.md @@ -28,17 +28,16 @@ Options can be defined in the constructor or as environment variables. Construct ### Available Configuration Options -| Option name | Environment variable name | Type | Default | Supported values | -|-----------------------|--------------------------------|---------|-----------|------------------| -| host | FLAGD_HOST | string | localhost | | -| port | FLAGD_PORT | number | 8013 | | -| tls | FLAGD_TLS | boolean | false | | -| socketPath | FLAGD_SOCKET_PATH | string | - | | -| resolverType | FLAGD_SOURCE_RESOLVER | string | rpc | rpc, in-process | -| selector | FLAGD_SOURCE_SELECTOR | string | - | | -| cache | FLAGD_CACHE | string | lru | lru,disabled | -| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | | -| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | | +| Option name | Environment variable name | Type | Default | Supported values | +| -------------------------------------- | ------------------------------ | ------- | --------- | ---------------- | +| host | FLAGD_HOST | string | localhost | | +| port | FLAGD_PORT | number | 8013 | | +| tls | FLAGD_TLS | boolean | false | | +| socketPath | FLAGD_SOCKET_PATH | string | - | | +| resolverType | FLAGD_SOURCE_RESOLVER | string | rpc | rpc, in-process | +| selector | FLAGD_SOURCE_SELECTOR | string | - | | +| cache | FLAGD_CACHE | string | lru | lru,disabled | +| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | | Below are examples of usage patterns. @@ -80,7 +79,7 @@ In the above example, the provider expects a flag sync service implementation to The flagd provider emits `PROVIDER_READY`, `PROVIDER_ERROR` and `PROVIDER_CONFIGURATION_CHANGED` events. | SDK event | Originating action in flagd | -|----------------------------------|---------------------------------------------------------------------------------| +| -------------------------------- | ------------------------------------------------------------------------------- | | `PROVIDER_READY` | The streaming connection with flagd has been established. | | `PROVIDER_ERROR` | The streaming connection with flagd has been broken. | | `PROVIDER_CONFIGURATION_CHANGED` | A flag configuration (default value, targeting rule, etc) in flagd has changed. | @@ -90,7 +89,7 @@ For general information on events, see the [official documentation](https://open ### Flag Metadata | Field | Type | Value | -|---------|--------|---------------------------------------------------| +| ------- | ------ | ------------------------------------------------- | | `scope` | string | "selector" set for the associated source in flagd | ## Building diff --git a/libs/providers/flagd/flagd-testbed b/libs/providers/flagd/flagd-testbed index 5b428c951..3d7d0a09c 160000 --- a/libs/providers/flagd/flagd-testbed +++ b/libs/providers/flagd/flagd-testbed @@ -1 +1 @@ -Subproject commit 5b428c951bbcfb7aa041f195ac38400cbc6ccd75 +Subproject commit 3d7d0a09c9a6ab0907f37113c799789761e722a9 diff --git a/libs/providers/flagd/package.json b/libs/providers/flagd/package.json index 8909e296e..3e0ba9986 100644 --- a/libs/providers/flagd/package.json +++ b/libs/providers/flagd/package.json @@ -9,7 +9,7 @@ "@openfeature/flagd-core": ">=0.1.1" }, "peerDependencies": { - "@grpc/grpc-js": "^1.6.0", + "@grpc/grpc-js": "~1.8.0", "@openfeature/server-sdk": ">=1.6.0" } } diff --git a/libs/providers/flagd/src/e2e/jest.config.ts b/libs/providers/flagd/src/e2e/jest.config.ts index 560e2e6a9..45d8924ba 100644 --- a/libs/providers/flagd/src/e2e/jest.config.ts +++ b/libs/providers/flagd/src/e2e/jest.config.ts @@ -7,4 +7,5 @@ export default { moduleNameMapper: { '@openfeature/flagd-core': ['/../../../../shared/flagd-core/src'], }, + globalTeardown: './tear-down.ts', }; diff --git a/libs/providers/flagd/src/e2e/setup-in-process-provider.ts b/libs/providers/flagd/src/e2e/setup-in-process-provider.ts index 0edeeb882..2c2ffa166 100644 --- a/libs/providers/flagd/src/e2e/setup-in-process-provider.ts +++ b/libs/providers/flagd/src/e2e/setup-in-process-provider.ts @@ -7,10 +7,13 @@ const FLAGD_NAME = 'flagd Provider'; // register the flagd provider before the tests. console.log('Setting flagd provider...'); OpenFeature.setProvider( + 'e2e', new FlagdProvider({ cache: 'disabled', resolverType: 'in-process', host: 'localhost', port: 9090 }), ); -assert( - OpenFeature.providerMetadata.name === FLAGD_NAME, - new Error(`Expected ${FLAGD_NAME} provider to be configured, instead got: ${OpenFeature.providerMetadata.name}`), -); +OpenFeature.setProvider('unstable', new FlagdProvider({ resolverType: 'in-process', host: 'localhost', port: 9091 })); +// TODO: update with correct assertions once we have ability to get providerMetadata for any provider +// assert( +// OpenFeature.providerMetadata.name === FLAGD_NAME, +// new Error(`Expected ${FLAGD_NAME} provider to be configured, instead got: ${OpenFeature.providerMetadata.name}`), +// ); console.log('flagd provider configured!'); diff --git a/libs/providers/flagd/src/e2e/setup-rpc-provider.ts b/libs/providers/flagd/src/e2e/setup-rpc-provider.ts index cce640f46..6974b2cb2 100644 --- a/libs/providers/flagd/src/e2e/setup-rpc-provider.ts +++ b/libs/providers/flagd/src/e2e/setup-rpc-provider.ts @@ -6,9 +6,11 @@ const FLAGD_NAME = 'flagd Provider'; // register the flagd provider before the tests. console.log('Setting flagd provider...'); -OpenFeature.setProvider(new FlagdProvider({ cache: 'disabled' })); -assert( - OpenFeature.providerMetadata.name === FLAGD_NAME, - new Error(`Expected ${FLAGD_NAME} provider to be configured, instead got: ${OpenFeature.providerMetadata.name}`), -); +OpenFeature.setProvider('e2e', new FlagdProvider({ cache: 'disabled' })); +OpenFeature.setProvider('unstable', new FlagdProvider({ cache: 'disabled', port: 8014 })); +// TODO: update with correct assertions once we have ability to get providerMetadata for any provider +// assert( +// OpenFeature.providerMetadata.name === FLAGD_NAME, +// new Error(`Expected ${FLAGD_NAME} provider to be configured, instead got: ${OpenFeature.providerMetadata.name}`), +// ); console.log('flagd provider configured!'); diff --git a/libs/providers/flagd/src/e2e/step-definitions/evaluation.spec.ts b/libs/providers/flagd/src/e2e/step-definitions/evaluation.spec.ts index 8f36ddbe2..ebcabbce2 100644 --- a/libs/providers/flagd/src/e2e/step-definitions/evaluation.spec.ts +++ b/libs/providers/flagd/src/e2e/step-definitions/evaluation.spec.ts @@ -14,7 +14,7 @@ import { defineFeature, loadFeature } from 'jest-cucumber'; const feature = loadFeature('features/evaluation.feature'); // get a client (flagd provider registered in setup) -const client = OpenFeature.getClient(); +const client = OpenFeature.getClient('e2e'); const givenAnOpenfeatureClientIsRegistered = ( given: (stepMatcher: string, stepDefinitionCallback: () => void) => void, @@ -29,10 +29,6 @@ defineFeature(feature, (test) => { }); }); - afterAll(async () => { - await OpenFeature.close(); - }); - test('Resolves boolean value', ({ given, when, then }) => { let value: boolean; let flagKey: string; diff --git a/libs/providers/flagd/src/e2e/step-definitions/flagd-json-evaluator.spec.ts b/libs/providers/flagd/src/e2e/step-definitions/flagd-json-evaluator.spec.ts index 4a56e521e..6bc133dc6 100644 --- a/libs/providers/flagd/src/e2e/step-definitions/flagd-json-evaluator.spec.ts +++ b/libs/providers/flagd/src/e2e/step-definitions/flagd-json-evaluator.spec.ts @@ -6,7 +6,7 @@ import { StepsDefinitionCallbackFunction } from 'jest-cucumber/dist/src/feature- const feature = loadFeature('features/flagd-json-evaluator.feature'); // get a client (flagd provider registered in setup) -const client = OpenFeature.getClient(); +const client = OpenFeature.getClient('e2e'); const aFlagProviderIsSet = (given: (stepMatcher: string, stepDefinitionCallback: () => void) => void) => { given('a flagd provider is set', () => undefined); @@ -39,10 +39,6 @@ defineFeature(feature, (test) => { }); }); - afterAll(async () => { - await OpenFeature.close(); - }); - test('Evaluator reuse', evaluateStringFlagWithContext); test('Fractional operator', ({ given, when, and, then }) => { diff --git a/libs/providers/flagd/src/e2e/step-definitions/flagd-reconnect.unstable.spec.ts b/libs/providers/flagd/src/e2e/step-definitions/flagd-reconnect.unstable.spec.ts new file mode 100644 index 000000000..d3349594d --- /dev/null +++ b/libs/providers/flagd/src/e2e/step-definitions/flagd-reconnect.unstable.spec.ts @@ -0,0 +1,45 @@ +import { OpenFeature, ProviderEvents } from '@openfeature/server-sdk'; +import { defineFeature, loadFeature } from 'jest-cucumber'; + +jest.setTimeout(30000); + +// load the feature file. +const feature = loadFeature('features/flagd-reconnect.feature'); + +// get a client (flagd provider registered in setup) +const client = OpenFeature.getClient('unstable'); + +defineFeature(feature, (test) => { + let readyRunCount = 0; + let errorRunCount = 0; + + beforeAll((done) => { + client.addHandler(ProviderEvents.Ready, async () => { + readyRunCount++; + done(); + }); + }); + + test('Provider reconnection', ({ given, when, then, and }) => { + given('a flagd provider is set', () => { + // handled in beforeAll + }); + when('a PROVIDER_READY handler and a PROVIDER_ERROR handler are added', () => { + client.addHandler(ProviderEvents.Error, () => { + errorRunCount++; + }); + }); + then('the PROVIDER_READY handler must run when the provider connects', async () => { + // should already be at 1 from `beforeAll` + expect(readyRunCount).toEqual(1); + }); + and("the PROVIDER_ERROR handler must run when the provider's connection is lost", async () => { + await new Promise((resolve) => setTimeout(resolve, 10000)); + expect(errorRunCount).toBeGreaterThan(0); + }); + and('when the connection is reestablished the PROVIDER_READY handler must run again', async () => { + await new Promise((resolve) => setTimeout(resolve, 10000)); + expect(readyRunCount).toBeGreaterThan(1); + }); + }); +}); diff --git a/libs/providers/flagd/src/e2e/step-definitions/flagd.spec.ts b/libs/providers/flagd/src/e2e/step-definitions/flagd.spec.ts index c33c54dd3..424d5bfb7 100644 --- a/libs/providers/flagd/src/e2e/step-definitions/flagd.spec.ts +++ b/libs/providers/flagd/src/e2e/step-definitions/flagd.spec.ts @@ -5,7 +5,7 @@ import { defineFeature, loadFeature } from 'jest-cucumber'; const feature = loadFeature('features/flagd.feature'); // get a client (flagd provider registered in setup) -const client = OpenFeature.getClient(); +const client = OpenFeature.getClient('e2e'); const aFlagProviderIsSet = (given: (stepMatcher: string, stepDefinitionCallback: () => void) => void) => { given('a flagd provider is set', () => undefined); @@ -18,10 +18,6 @@ defineFeature(feature, (test) => { }); }); - afterAll(async () => { - await OpenFeature.close(); - }); - test('Provider ready event', ({ given, when, then }) => { let ran = false; diff --git a/libs/providers/flagd/src/e2e/tear-down.ts b/libs/providers/flagd/src/e2e/tear-down.ts new file mode 100644 index 000000000..19ace441c --- /dev/null +++ b/libs/providers/flagd/src/e2e/tear-down.ts @@ -0,0 +1,8 @@ +import { OpenFeature } from '@openfeature/server-sdk'; + +const tearDownTests = async () => { + console.log('Shutting down OpenFeature...'); + await OpenFeature.close(); +}; + +export default tearDownTests; diff --git a/libs/providers/flagd/src/lib/configuration.spec.ts b/libs/providers/flagd/src/lib/configuration.spec.ts index e66e49ac0..f6696179c 100644 --- a/libs/providers/flagd/src/lib/configuration.spec.ts +++ b/libs/providers/flagd/src/lib/configuration.spec.ts @@ -15,7 +15,6 @@ describe('Configuration', () => { port: 8013, tls: false, maxCacheSize: DEFAULT_MAX_CACHE_SIZE, - maxEventStreamRetries: DEFAULT_MAX_EVENT_STREAM_RETRIES, cache: 'lru', resolverType: 'rpc', selector: '', @@ -28,7 +27,6 @@ describe('Configuration', () => { const tls = true; const socketPath = '/tmp/flagd.socks'; const maxCacheSize = 333; - const maxEventStreamRetries = 10; const cache = 'disabled'; const resolverType = 'in-process'; const selector = 'app=weather'; @@ -39,7 +37,6 @@ describe('Configuration', () => { process.env['FLAGD_SOCKET_PATH'] = socketPath; process.env['FLAGD_CACHE'] = cache; process.env['FLAGD_MAX_CACHE_SIZE'] = `${maxCacheSize}`; - process.env['FLAGD_MAX_EVENT_STREAM_RETRIES'] = `${maxEventStreamRetries}`; process.env['FLAGD_SOURCE_SELECTOR'] = `${selector}`; process.env['FLAGD_RESOLVER'] = `${resolverType}`; @@ -49,7 +46,6 @@ describe('Configuration', () => { tls, socketPath, maxCacheSize, - maxEventStreamRetries, cache, resolverType, selector, @@ -62,7 +58,6 @@ describe('Configuration', () => { port: 3000, tls: true, maxCacheSize: 1000, - maxEventStreamRetries: 5, cache: 'lru', resolverType: 'rpc', selector: '', diff --git a/libs/providers/flagd/src/lib/configuration.ts b/libs/providers/flagd/src/lib/configuration.ts index 73e0acf07..4fe37a7bf 100644 --- a/libs/providers/flagd/src/lib/configuration.ts +++ b/libs/providers/flagd/src/lib/configuration.ts @@ -62,13 +62,6 @@ export interface Config { * @default 1000 */ maxCacheSize?: number; - - /** - * Amount of times to attempt to reconnect to the event stream. - * - * @default 5 - */ - maxEventStreamRetries?: number; } export type FlagdProviderOptions = Partial; @@ -81,7 +74,6 @@ const DEFAULT_CONFIG: Config = { selector: '', cache: 'lru', maxCacheSize: DEFAULT_MAX_CACHE_SIZE, - maxEventStreamRetries: DEFAULT_MAX_EVENT_STREAM_RETRIES, }; enum ENV_VAR { @@ -91,7 +83,6 @@ enum ENV_VAR { FLAGD_SOCKET_PATH = 'FLAGD_SOCKET_PATH', FLAGD_CACHE = 'FLAGD_CACHE', FLAGD_MAX_CACHE_SIZE = 'FLAGD_MAX_CACHE_SIZE', - FLAGD_MAX_EVENT_STREAM_RETRIES = 'FLAGD_MAX_EVENT_STREAM_RETRIES', FLAGD_SOURCE_SELECTOR = 'FLAGD_SOURCE_SELECTOR', FLAGD_RESOLVER = 'FLAGD_RESOLVER', } @@ -115,9 +106,6 @@ const getEnvVarConfig = (): Partial => ({ ...(process.env[ENV_VAR.FLAGD_MAX_CACHE_SIZE] && { maxCacheSize: Number(process.env[ENV_VAR.FLAGD_MAX_CACHE_SIZE]), }), - ...(process.env[ENV_VAR.FLAGD_MAX_EVENT_STREAM_RETRIES] && { - maxEventStreamRetries: Number(process.env[ENV_VAR.FLAGD_MAX_EVENT_STREAM_RETRIES]), - }), ...(process.env[ENV_VAR.FLAGD_SOURCE_SELECTOR] && { selector: process.env[ENV_VAR.FLAGD_SOURCE_SELECTOR], }), diff --git a/libs/providers/flagd/src/lib/constants.ts b/libs/providers/flagd/src/lib/constants.ts index 9259e159d..6858a0352 100644 --- a/libs/providers/flagd/src/lib/constants.ts +++ b/libs/providers/flagd/src/lib/constants.ts @@ -1,5 +1,5 @@ export const BASE_EVENT_STREAM_RETRY_BACKOFF_MS = 1000; -export const DEFAULT_MAX_EVENT_STREAM_RETRIES = 5; +export const DEFAULT_MAX_EVENT_STREAM_RETRIES = Infinity; export const EVENT_CONFIGURATION_CHANGE = 'configuration_change'; export const EVENT_PROVIDER_READY = 'provider_ready'; export const DEFAULT_MAX_CACHE_SIZE = 1000; diff --git a/libs/providers/flagd/src/lib/flagd-provider.spec.ts b/libs/providers/flagd/src/lib/flagd-provider.spec.ts index ca3a630b3..0c1dc3122 100644 --- a/libs/providers/flagd/src/lib/flagd-provider.spec.ts +++ b/libs/providers/flagd/src/lib/flagd-provider.spec.ts @@ -28,6 +28,7 @@ import { import { EVENT_CONFIGURATION_CHANGE, EVENT_PROVIDER_READY } from './constants'; import { FlagdProvider } from './flagd-provider'; import { FlagChangeMessage, GRPCService } from './service/grpc/grpc-service'; +import { ConnectivityState } from '@grpc/grpc-js/build/src/connectivity-state'; const REASON = StandardResolutionReasons.STATIC; const ERROR_REASON = StandardResolutionReasons.ERROR; @@ -261,11 +262,19 @@ describe(FlagdProvider.name, () => { cancel: jest.fn(), }; + const mockChannel = { + getConnectivityState: jest.fn(() => ConnectivityState.READY), + watchConnectivityState: jest.fn(), + }; + // mock ServiceClient to inject const streamingServiceClientMock = { eventStream: jest.fn(() => { return streamMock; }), + getChannel: jest.fn(() => { + return mockChannel; + }), close: jest.fn(), resolveBoolean: jest.fn( ( @@ -485,73 +494,23 @@ describe(FlagdProvider.name, () => { }); describe('connection/re-connection', () => { - it('should attempt to inital connection multiple times', (done) => { + it('should watch channel for reconnect after error', () => { const provider = new FlagdProvider( undefined, undefined, new GRPCService({ host: '', port: 123, tls: false, cache: 'lru' }, streamingServiceClientMock), ); - provider.initialize(); - - // fake some errors - registeredOnErrorCallback(); - registeredOnErrorCallback(); - registeredOnErrorCallback(); - - // status should be ERROR - expect(provider.status).toEqual(ProviderStatus.ERROR); - - // connect finally - registeredOnMessageCallback({ type: EVENT_PROVIDER_READY, data: {} }); - - new Promise((resolve) => setTimeout(resolve, 4000)).then(() => { - try { - // within 3 seconds, we should have seen at least 3 connect attempts, and provider should be READY. - expect( - (streamingServiceClientMock.eventStream as jest.MockedFn).mock.calls.length, - ).toBeGreaterThanOrEqual(3); - expect(provider.status).toEqual(ProviderStatus.READY); - done(); - } catch (err) { - done(err); - } + provider.initialize().catch(() => { + // ignore }); - }); - - it('should attempt re-connection multiple times', (done) => { - const provider = new FlagdProvider( - undefined, - undefined, - new GRPCService({ host: '', port: 123, tls: false, cache: 'lru' }, streamingServiceClientMock), - ); - provider.initialize(); - - // connect without issue initially - registeredOnMessageCallback({ type: EVENT_PROVIDER_READY, data: {} }); - - // status should be READY - expect(provider.status).toEqual(ProviderStatus.READY); // fake some errors registeredOnErrorCallback(); - registeredOnErrorCallback(); - registeredOnErrorCallback(); // status should be ERROR expect(provider.status).toEqual(ProviderStatus.ERROR); - - new Promise((resolve) => setTimeout(resolve, 4000)).then(() => { - try { - // within 4 seconds, we should have seen at least 3 connect attempts and status should be READY. - expect( - (streamingServiceClientMock.eventStream as jest.MockedFn).mock.calls.length, - ).toBeGreaterThanOrEqual(3); - expect(provider.status).toEqual(ProviderStatus.READY); - done(); - } catch (err) { - done(err); - } - }); + expect(streamingServiceClientMock.getChannel().getConnectivityState).toHaveBeenCalledWith(true); + expect(streamingServiceClientMock.getChannel().watchConnectivityState).toHaveBeenCalled(); }); }); }); diff --git a/libs/providers/flagd/src/lib/flagd-provider.ts b/libs/providers/flagd/src/lib/flagd-provider.ts index 311e2d851..66cc99bab 100644 --- a/libs/providers/flagd/src/lib/flagd-provider.ts +++ b/libs/providers/flagd/src/lib/flagd-provider.ts @@ -55,7 +55,7 @@ export class FlagdProvider implements Provider { initialize(): Promise { return this._service - .connect(this.setReady.bind(this), this.emitChanged.bind(this), this.setError.bind(this)) + .connect(this.handleReconnect.bind(this), this.handleChanged.bind(this), this.handleError.bind(this)) .then(() => { this.logger?.debug(`${this.metadata.name}: ready`); this._status = ProviderStatus.READY; @@ -122,15 +122,17 @@ export class FlagdProvider implements Provider { throw err; }; - private setReady(): void { + private handleReconnect(): void { this._status = ProviderStatus.READY; + this._events.emit(ProviderEvents.Ready); } - private setError(): void { + private handleError(): void { this._status = ProviderStatus.ERROR; + this._events.emit(ProviderEvents.Error); } - private emitChanged(flagsChanged: string[]): void { + private handleChanged(flagsChanged: string[]): void { this._events.emit(ProviderEvents.ConfigurationChanged, { flagsChanged }); } } diff --git a/libs/providers/flagd/src/lib/service/grpc/grpc-service.ts b/libs/providers/flagd/src/lib/service/grpc/grpc-service.ts index 5da2d6c86..791f33f1c 100644 --- a/libs/providers/flagd/src/lib/service/grpc/grpc-service.ts +++ b/libs/providers/flagd/src/lib/service/grpc/grpc-service.ts @@ -1,4 +1,5 @@ import { ClientReadableStream, ClientUnaryCall, ServiceError, credentials, status } from '@grpc/grpc-js'; +import { ConnectivityState } from '@grpc/grpc-js/build/src/connectivity-state'; import { EvaluationContext, FlagNotFoundError, @@ -28,13 +29,7 @@ import { ServiceClient, } from '../../../proto/ts/schema/v1/schema'; import { Config } from '../../configuration'; -import { - BASE_EVENT_STREAM_RETRY_BACKOFF_MS, - DEFAULT_MAX_CACHE_SIZE, - DEFAULT_MAX_EVENT_STREAM_RETRIES, - EVENT_CONFIGURATION_CHANGE, - EVENT_PROVIDER_READY, -} from '../../constants'; +import { DEFAULT_MAX_CACHE_SIZE, EVENT_CONFIGURATION_CHANGE, EVENT_PROVIDER_READY } from '../../constants'; import { FlagdProvider } from '../../flagd-provider'; import { Service } from '../service'; @@ -73,14 +68,10 @@ export class GRPCService implements Service { private _client: ServiceClient; private _cache: LRUCache> | undefined; private _cacheEnabled = false; - private _streamAlive = false; - private _streamConnectAttempt = 0; - private _stream: ClientReadableStream | undefined = undefined; - private _streamConnectBackoff = BASE_EVENT_STREAM_RETRY_BACKOFF_MS; - private _maxEventStreamRetries; + private _eventStream: ClientReadableStream | undefined = undefined; private get _cacheActive() { // the cache is "active" (able to be used) if the config enabled it, AND the gRPC stream is live - return this._cacheEnabled && this._streamAlive; + return this._cacheEnabled && this._client.getChannel().getConnectivityState(false) === ConnectivityState.READY; } constructor( @@ -89,7 +80,6 @@ export class GRPCService implements Service { private logger?: Logger, ) { const { host, port, tls, socketPath } = config; - this._maxEventStreamRetries = config.maxEventStreamRetries ?? DEFAULT_MAX_EVENT_STREAM_RETRIES; this._client = client ? client : new ServiceClient( @@ -104,16 +94,18 @@ export class GRPCService implements Service { } connect( - connectCallback: () => void, + reconnectCallback: () => void, changedCallback: (flagsChanged: string[]) => void, disconnectCallback: () => void, ): Promise { - return this.connectStream(connectCallback, changedCallback, disconnectCallback); + return new Promise((resolve, reject) => + this.listen(reconnectCallback, changedCallback, disconnectCallback, resolve, reject), + ); } async disconnect(): Promise { // cancel the stream and close the connection - this._stream?.cancel(); + this._eventStream?.cancel(); this._client.close(); } @@ -153,42 +145,33 @@ export class GRPCService implements Service { return this.resolve(this._client.resolveObject, flagKey, context, logger); } - private connectStream( - connectCallback: () => void, + private listen( + reconnectCallback: () => void, changedCallback: (flagsChanged: string[]) => void, disconnectCallback: () => void, - ): Promise { - return new Promise((resolve, reject) => { - this.logger?.debug(`${FlagdProvider.name}: connecting stream, attempt ${this._streamConnectAttempt}...`); - const stream = this._client.eventStream({}, {}); - stream.on('error', (err: ServiceError | undefined) => { - if (err?.code === status.CANCELLED) { - this.logger?.debug(`${FlagdProvider.name}: stream cancelled, will not be re-established`); + resolveConnect?: () => void, + rejectConnect?: (reason: Error) => void, + ) { + this.logger?.debug(`${FlagdProvider.name}: connecting stream...`); + const stream = this._client.eventStream({}, {}); + stream.on('error', (err: Error) => { + rejectConnect?.(err); + this.handleError(reconnectCallback, changedCallback, disconnectCallback); + }); + stream.on('data', (message) => { + if (message.type === EVENT_PROVIDER_READY) { + this.logger?.debug(`${FlagdProvider.name}: streaming connection established with flagd`); + // if resolveConnect is undefined, this is a reconnection; we only want to fire the reconnect callback in that case + if (resolveConnect) { + resolveConnect(); } else { - this.handleError(reject, connectCallback, changedCallback, disconnectCallback); - } - }); - stream.on('close', () => { - this.handleClose(); - }); - stream.on('data', (message) => { - if (message.type === EVENT_PROVIDER_READY) { - this.handleProviderReady(resolve, connectCallback); - } else if (message.type === EVENT_CONFIGURATION_CHANGE) { - this.handleFlagsChanged(message, changedCallback); + reconnectCallback(); } - }); - this._stream = stream; + } else if (message.type === EVENT_CONFIGURATION_CHANGE) { + this.handleFlagsChanged(message, changedCallback); + } }); - } - - private handleProviderReady(resolve: () => void, connectCallback: () => void) { - connectCallback(); - this.logger?.info(`${FlagdProvider.name}: streaming connection established with flagd`); - this._streamAlive = true; - this._streamConnectAttempt = 0; - this._streamConnectBackoff = BASE_EVENT_STREAM_RETRY_BACKOFF_MS; - resolve(); + this._eventStream = stream; } private handleFlagsChanged(message: EventStreamResponse, changedCallback: (flagsChanged: string[]) => void) { @@ -209,38 +192,26 @@ export class GRPCService implements Service { } } + private reconnect( + reconnectCallback: () => void, + changedCallback: (flagsChanged: string[]) => void, + disconnectCallback: () => void, + ) { + const channel = this._client.getChannel(); + channel.watchConnectivityState(channel.getConnectivityState(true), Infinity, () => { + this.listen(reconnectCallback, changedCallback, disconnectCallback); + }); + } + private handleError( - reject: (reason?: Error) => void, - connectCallback: () => void, + reconnectCallback: () => void, changedCallback: (flagsChanged: string[]) => void, disconnectCallback: () => void, ) { disconnectCallback(); this.logger?.error(`${FlagdProvider.name}: streaming connection error, will attempt reconnect...`); this._cache?.clear(); - this._streamAlive = false; - - // if we haven't reached max attempt, reconnect after backoff - if (this._streamConnectAttempt <= this._maxEventStreamRetries) { - this._streamConnectAttempt++; - setTimeout(() => { - this._streamConnectBackoff = this._streamConnectBackoff * 2; - this.connectStream(connectCallback, changedCallback, disconnectCallback).catch(() => { - // empty catch to avoid unhandled promise rejection - }); - }, this._streamConnectBackoff); - } else { - // after max attempts, give up - const errorMessage = `${FlagdProvider.name}: max stream connect attempts (${this._maxEventStreamRetries} reached)`; - this.logger?.error(errorMessage); - reject(new Error(errorMessage)); - } - } - - private handleClose() { - this.logger?.info(`${FlagdProvider.name}: streaming connection closed`); - this._cache?.clear(); - this._streamAlive = false; + this.reconnect(reconnectCallback, changedCallback, disconnectCallback); } private async resolve( diff --git a/libs/providers/flagd/src/lib/service/in-process/data-fetch.ts b/libs/providers/flagd/src/lib/service/in-process/data-fetch.ts index 726402809..b8962fcc1 100644 --- a/libs/providers/flagd/src/lib/service/in-process/data-fetch.ts +++ b/libs/providers/flagd/src/lib/service/in-process/data-fetch.ts @@ -4,7 +4,7 @@ export interface DataFetch { connect( dataFillCallback: (flags: string) => void, - connectCallback: () => void, + reconnectCallback: () => void, changedCallback: (flagsChanged: string[]) => void, disconnectCallback: () => void, ): Promise; diff --git a/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.spec.ts b/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.spec.ts index dd3dff9ff..0676bb9db 100644 --- a/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.spec.ts +++ b/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.spec.ts @@ -1,6 +1,7 @@ -import { GrpcFetch, initBackOffMs } from './grpc-fetch'; +import { GrpcFetch } from './grpc-fetch'; import { Config } from '../../../configuration'; import { FlagSyncServiceClient, SyncFlagsResponse, SyncState } from '../../../../proto/ts/sync/v1/sync_service'; +import { ConnectivityState } from '@grpc/grpc-js/build/src/connectivity-state'; describe('grpc fetch', () => { const cfg: Config = { host: 'localhost', port: 8000, tls: false, socketPath: '' }; @@ -40,14 +41,23 @@ describe('grpc fetch', () => { // then expect(dataFillCallback).toBeCalledTimes(1); - expect(connectCallback).toBeCalledTimes(1); expect(disconnectCallback).toBeCalledTimes(0); expect(callBackResponse).toBe(flagResponse); }); - it('should handle error and attempt to reconnect', (done) => { + it('should handle error and watch channel for reconnect', () => { + const mockChannel = { + getConnectivityState: jest.fn(() => ConnectivityState.READY), + watchConnectivityState: jest.fn(() => { + // empty + }), + }; + // given const serviceMock: FlagSyncServiceClient = { + getChannel: jest.fn(() => { + return mockChannel; + }), syncFlags: jest.fn(() => { return { on: jest.fn((event: string, callback: (err: Error) => void) => { @@ -64,15 +74,13 @@ describe('grpc fetch', () => { // when const fetch = new GrpcFetch(cfg, serviceMock); - fetch.connect(jest.fn(), jest.fn(), jest.fn(), disconnectCallback); + fetch.connect(jest.fn(), jest.fn(), jest.fn(), disconnectCallback).catch(() => { + // do nothing + }); // then expect(disconnectCallback).toBeCalledTimes(1); - - // wait for initial backoff - setTimeout(() => { - expect(disconnectCallback).toBeCalledTimes(2); - done(); - }, initBackOffMs); + expect(serviceMock.getChannel().getConnectivityState).toHaveBeenCalledWith(true); + expect(serviceMock.getChannel().watchConnectivityState).toHaveBeenCalled(); }); }); diff --git a/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.ts b/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.ts index f52ac9f43..aa6e5a90d 100644 --- a/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.ts +++ b/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.ts @@ -1,19 +1,14 @@ -import { DataFetch } from '../data-fetch'; -import { Config } from '../../../configuration'; -import { FlagSyncServiceClient, SyncFlagsRequest, SyncFlagsResponse } from '../../../../proto/ts/sync/v1/sync_service'; -import { ClientReadableStream, ServiceError, credentials, status } from '@grpc/grpc-js'; +import { ClientReadableStream, ServiceError, credentials } from '@grpc/grpc-js'; import { Logger } from '@openfeature/core'; - -export const initBackOffMs = 2 * 1000; -const maxStartupDeadlineMs = 500; -const maxBackOffMs = 120 * 1000; +import { GeneralError } from '@openfeature/server-sdk'; +import { FlagSyncServiceClient, SyncFlagsRequest, SyncFlagsResponse } from '../../../../proto/ts/sync/v1/sync_service'; +import { Config } from '../../../configuration'; +import { DataFetch } from '../data-fetch'; /** * Implements the gRPC sync contract to fetch flag data. */ export class GrpcFetch implements DataFetch { - private _connecting = false; - private _nextBackoff = initBackOffMs; private _syncClient: FlagSyncServiceClient; private _syncStream: ClientReadableStream | undefined; private readonly _request: SyncFlagsRequest; @@ -35,85 +30,61 @@ export class GrpcFetch implements DataFetch { connect( dataFillCallback: (flags: string) => void, - connectCallback: () => void, + reconnectCallback: () => void, changedCallback: (flagsChanged: string[]) => void, disconnectCallback: () => void, ): Promise { // note that we never reject the promise as sync is a long-running operation - return new Promise((resolve) => - this.listen(resolve, dataFillCallback, connectCallback, changedCallback, disconnectCallback), + return new Promise((resolve, reject) => + this.listen(dataFillCallback, reconnectCallback, changedCallback, disconnectCallback, resolve, reject), ); } disconnect() { this._logger?.debug('Disconnecting gRPC sync connection'); - this._syncStream?.cancel(); + this._syncStream?.destroy(); this._syncClient.close(); } private listen( - resolveConnect: () => void, dataFillCallback: (flags: string) => void, - connectCallback: () => void, + reconnectCallback: () => void, changedCallback: (flagsChanged: string[]) => void, disconnectCallback: () => void, + resolveConnect?: () => void, + rejectConnect?: (reason: Error) => void, ) { this._syncStream = this._syncClient.syncFlags(this._request); - this._connecting = false; this._syncStream.on('data', (data: SyncFlagsResponse) => { this._logger?.debug('Received sync payload'); dataFillCallback(data.flagConfiguration); - connectCallback(); changedCallback([]); // flags changed list not supported - resolveConnect(); - this._nextBackoff = initBackOffMs; + // if resolveConnect is undefined, this is a reconnection; we only want to fire the reconnect callback in that case + if (resolveConnect) { + resolveConnect(); + } else { + reconnectCallback(); + } }); this._syncStream.on('error', (err: ServiceError | undefined) => { - if (err?.code === status.CANCELLED) { - this._logger?.debug('Stream cancelled, will not be re-established'); - } else { - this._logger?.error('Connection error, attempting to reconnect', err); - disconnectCallback(); - this.reconnectWithBackoff( - resolveConnect, - dataFillCallback, - connectCallback, - changedCallback, - disconnectCallback, - ); - } + this._logger?.error('Connection error, attempting to reconnect', err); + disconnectCallback(); + rejectConnect?.(new GeneralError('Failed to connect stream')); + this.reconnect(dataFillCallback, reconnectCallback, changedCallback, disconnectCallback); }); } - private reconnectWithBackoff( - resolver: () => void, + private reconnect( dataFillCallback: (flags: string) => void, reconnectCallback: () => void, changedCallback: (flagsChanged: string[]) => void, disconnectCallback: () => void, - ): void { - // avoid reattempts if already connecting - // see - https://github.com/grpc/grpc-node/issues/2377 - if (this._connecting) { - return; - } - - this._logger?.debug(`Attempting to reconnection after ${this._nextBackoff}ms`); - this._connecting = true; - - if (this._nextBackoff > maxStartupDeadlineMs) { - resolver(); - } - - setTimeout(() => { - this._nextBackoff = this._nextBackoff * 2; - if (this._nextBackoff > maxBackOffMs) { - this._nextBackoff = maxBackOffMs; - } - - this.listen(resolver, dataFillCallback, reconnectCallback, changedCallback, disconnectCallback); - }, this._nextBackoff); + ) { + const channel = this._syncClient.getChannel(); + channel.watchConnectivityState(channel.getConnectivityState(true), Infinity, () => { + this.listen(dataFillCallback, reconnectCallback, changedCallback, disconnectCallback); + }); } }