Skip to content

Commit

Permalink
feat: add onConnectionStateChange callback for flux subscription (#2782)
Browse files Browse the repository at this point in the history
* Add onConnectionStateChange callback for flux subscription

Fixes: #2769

* Add onConnectionStateChange callback for flux subscription

Fixes: #2769

* Fix tests

* Fix tests

* Fix signals test
Fixes: #2769

* Fix formatting
Fixes: #2769

---------

Co-authored-by: Anton Platonov <[email protected]>
  • Loading branch information
krissvaa and platosha authored Oct 16, 2024
1 parent 0daf4dc commit 9bafcb7
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 5 deletions.
9 changes: 8 additions & 1 deletion packages/ts/frontend/src/Connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import {
UnauthorizedResponseError,
type ValidationErrorData,
} from './EndpointErrors.js';
import { type ActionOnLostSubscription, FluxConnection } from './FluxConnection.js';
import {
type ActionOnLostSubscription,
FluxConnection,
type FluxSubscriptionStateChangeEvent,
} from './FluxConnection.js';
import type { VaadinWindow } from './types.js';

const $wnd = window as VaadinWindow;
Expand Down Expand Up @@ -43,6 +47,9 @@ export interface Subscription<T> {
/** Called when a new value is available. */
onNext(callback: (value: T) => void): Subscription<T>;

/** Called when the subscription state changes. */
onConnectionStateChange(callback: (event: FluxSubscriptionStateChangeEvent) => void): Subscription<T>;

/**
* Called when the connection is restored, but there's no longer a valid subscription. If the callback returns
* `ActionOnLostSubscription.RESUBSCRIBE`, the subscription will be re-established by connecting to the same
Expand Down
60 changes: 60 additions & 0 deletions packages/ts/frontend/src/FluxConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,29 @@ export enum ActionOnLostSubscription {
REMOVE = 'remove',
}

/**
* Possible states of a flux subscription.
*/
export enum FluxSubscriptionState {
/**
* The subscription is not connected and is trying to connect.
*/
CONNECTING = 'connecting',
/**
* The subscription is connected and receiving updates.
*/
CONNECTED = 'connected',
/**
* The subscription is closed and is not trying to reconnect.
*/
CLOSED = 'closed',
}

/**
* Event wrapper for flux subscription connection state change callback
*/
export type FluxSubscriptionStateChangeEvent = CustomEvent<{ state: FluxSubscriptionState }>;

type EndpointInfo = {
endpointName: string;
methodName: string;
Expand All @@ -58,6 +81,8 @@ export class FluxConnection extends EventTarget {
readonly #onCompleteCallbacks = new Map<string, () => void>();
readonly #onErrorCallbacks = new Map<string, () => void>();
readonly #onNextCallbacks = new Map<string, (value: any) => void>();
readonly #onStateChangeCallbacks = new Map<string, (event: FluxSubscriptionStateChangeEvent) => void>();
readonly #statusOfSubscriptions = new Map<string, FluxSubscriptionState>();
#pendingMessages: ServerMessage[] = [];
#socket?: Atmosphere.Request;

Expand All @@ -82,6 +107,7 @@ export class FluxConnection extends EventTarget {
const msg: ServerConnectMessage = { '@type': 'subscribe', endpointName, id, methodName, params };
this.#send(msg);
this.#endpointInfos.set(id, { endpointName, methodName, params });
this.#setSubscriptionConnState(id, FluxSubscriptionState.CONNECTING);
const hillaSubscription: Subscription<any> = {
cancel: () => {
if (!this.#endpointInfos.has(id)) {
Expand Down Expand Up @@ -121,6 +147,13 @@ export class FluxConnection extends EventTarget {
}
return hillaSubscription;
},
onConnectionStateChange: (callback: (event: FluxSubscriptionStateChangeEvent) => void): Subscription<any> => {
this.#onStateChangeCallbacks.set(id, callback);
callback(
new CustomEvent('subscription-state-change', { detail: { state: this.#statusOfSubscriptions.get(id)! } }),
);
return hillaSubscription;
},
};
return hillaSubscription;
}
Expand Down Expand Up @@ -193,18 +226,41 @@ export class FluxConnection extends EventTarget {
onReconnect: () => {
if (this.state !== State.RECONNECTING) {
this.state = State.RECONNECTING;
this.#endpointInfos.forEach((endpointInfo, id) => {
if (endpointInfo.reconnect?.() === ActionOnLostSubscription.RESUBSCRIBE) {
this.#setSubscriptionConnState(id, FluxSubscriptionState.CONNECTING);
} else {
this.#setSubscriptionConnState(id, FluxSubscriptionState.CLOSED);
}
});
}
},
onFailureToReconnect: () => {
if (this.state !== State.INACTIVE) {
this.state = State.INACTIVE;
this.dispatchEvent(new CustomEvent('state-changed', { detail: { active: false } }));
this.#endpointInfos.forEach((_, id) => this.#setSubscriptionConnState(id, FluxSubscriptionState.CLOSED));
}
},
...atmosphereOptions,
} satisfies Atmosphere.Request);
}

#setSubscriptionConnState(id: string, state: FluxSubscriptionState) {
const currentState = this.#statusOfSubscriptions.get(id);
if (!currentState) {
this.#statusOfSubscriptions.set(id, state);
this.#onStateChangeCallbacks.get(id)?.(
new CustomEvent('subscription-state-change', { detail: { state: this.#statusOfSubscriptions.get(id)! } }),
);
} else if (currentState !== state) {
this.#statusOfSubscriptions.set(id, state);
this.#onStateChangeCallbacks.get(id)?.(
new CustomEvent('subscription-state-change', { detail: { state: this.#statusOfSubscriptions.get(id)! } }),
);
}
}

#handleMessage(message: unknown) {
if (isClientMessage(message)) {
const { id } = message;
Expand All @@ -215,6 +271,7 @@ export class FluxConnection extends EventTarget {
if (callback) {
callback(message.item);
}
this.#setSubscriptionConnState(id, FluxSubscriptionState.CONNECTED);
} else if (message['@type'] === 'complete') {
this.#onCompleteCallbacks.get(id)?.();
this.#removeSubscription(id);
Expand All @@ -238,6 +295,9 @@ export class FluxConnection extends EventTarget {
}

#removeSubscription(id: string) {
this.#setSubscriptionConnState(id, FluxSubscriptionState.CLOSED);
this.#statusOfSubscriptions.delete(id);
this.#onStateChangeCallbacks.delete(id);
this.#onNextCallbacks.delete(id);
this.#onCompleteCallbacks.delete(id);
this.#onErrorCallbacks.delete(id);
Expand Down
61 changes: 57 additions & 4 deletions packages/ts/frontend/test/FluxConnection.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { expect } from '@esm-bundle/chai';
import type { ReactiveController, ReactiveControllerHost } from '@lit/reactive-element';
import sinon from 'sinon';
import { FluxSubscriptionState } from '../FluxConnection';
import { ActionOnLostSubscription, FluxConnection, State } from '../src/FluxConnection.js';
import type {
AbstractMessage,
Expand Down Expand Up @@ -80,7 +81,7 @@ describe('@vaadin/hilla-frontend', () => {

it('should immediately return a Subscription when subscribing', () => {
const sub = fluxConnection.subscribe('MyEndpoint', 'myMethod');
for (const name of ['onNext', 'onComplete', 'onError']) {
for (const name of ['onNext', 'onComplete', 'onError', 'onConnectionStateChange', 'onSubscriptionLost']) {
expect(sub).to.have.property(name).which.is.a('function');
}
});
Expand Down Expand Up @@ -373,7 +374,7 @@ describe('@vaadin/hilla-frontend', () => {
sub.onSubscriptionLost(resubscribe);
getSubscriptionEventSpies()?.onReconnect?.();
getSubscriptionEventSpies()?.onReopen?.();
expect(resubscribe).to.have.been.calledOnce;
expect(resubscribe).to.have.been.calledTwice;
expect(getSubscriptionEventSpies()?.push).to.have.been.calledWith(
JSON.stringify({
'@type': 'subscribe',
Expand All @@ -394,7 +395,7 @@ describe('@vaadin/hilla-frontend', () => {
getSubscriptionEventSpies()?.onReopen?.();
getSubscriptionEventSpies()?.onReconnect?.();
getSubscriptionEventSpies()?.onReopen?.();
expect(resubscribe).to.have.been.calledOnce;
expect(resubscribe).to.have.been.calledTwice;
});

it('should remove subscription information when callback has no return value', () => {
Expand All @@ -405,7 +406,59 @@ describe('@vaadin/hilla-frontend', () => {
getSubscriptionEventSpies()?.onReopen?.();
getSubscriptionEventSpies()?.onReconnect?.();
getSubscriptionEventSpies()?.onReopen?.();
expect(resubscribe).to.have.been.calledOnce;
expect(resubscribe).to.have.been.calledTwice;
});

describe('flux subscription state', () => {
it('calls change subscription connection state callback', () => {
const sub = fluxConnection.subscribe('MyEndpoint', 'myMethod');
let subState;
sub.onConnectionStateChange((event) => {
subState = event.detail.state;
});
expect(subState).to.be.eq(FluxSubscriptionState.CONNECTING);
emitMessage({ '@type': 'update', id: '0' });
expect(subState).to.be.eq(FluxSubscriptionState.CONNECTED);
getSubscriptionEventSpies()?.onClose?.();
expect(subState).to.be.eq(FluxSubscriptionState.CONNECTED);
getSubscriptionEventSpies()?.onReconnect?.();
expect(subState).to.be.eq(FluxSubscriptionState.CLOSED);
getSubscriptionEventSpies()?.onReopen?.();
expect(subState).to.be.eq(FluxSubscriptionState.CLOSED);
});

it('calls change subscription connection state callback with re-subscription', () => {
const sub = fluxConnection.subscribe('MyEndpoint', 'myMethod');
let subState;
const resubscribe = sinon.stub();
resubscribe.returns(ActionOnLostSubscription.RESUBSCRIBE);
sub.onSubscriptionLost(resubscribe);
sub.onConnectionStateChange((event) => {
subState = event.detail.state;
});
expect(subState).to.be.eq(FluxSubscriptionState.CONNECTING);
emitMessage({ '@type': 'update', id: '0' });
expect(subState).to.be.eq(FluxSubscriptionState.CONNECTED);
getSubscriptionEventSpies()?.onClose?.();
expect(subState).to.be.eq(FluxSubscriptionState.CONNECTED);
getSubscriptionEventSpies()?.onReconnect?.();
expect(subState).to.be.eq(FluxSubscriptionState.CONNECTING);
getSubscriptionEventSpies()?.onReopen?.();
expect(subState).to.be.eq(FluxSubscriptionState.CONNECTING);
emitMessage({ '@type': 'update', id: '0' });
expect(subState).to.be.eq(FluxSubscriptionState.CONNECTED);
});

it('should close subscription connection state if failed to reconnect', () => {
const sub = fluxConnection.subscribe('MyEndpoint', 'myMethod');
let subState;
sub.onConnectionStateChange((event) => {
subState = event.detail.state;
});
expect(subState).to.be.eq(FluxSubscriptionState.CONNECTING);
getSubscriptionEventSpies()?.onFailureToReconnect?.();
expect(subState).to.be.eq(FluxSubscriptionState.CLOSED);
});
});
});
});
3 changes: 3 additions & 0 deletions packages/ts/react-signals/test/FullStackSignal.spec.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ describe('@vaadin/hilla-react-signals', () => {
onSubscriptionLost() {
return this;
},
onConnectionStateChange() {
return this;
},
});
// Mock the subscribe method
client.subscribe.returns(subscription);
Expand Down
3 changes: 3 additions & 0 deletions packages/ts/react-signals/test/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ export function createSubscriptionStub<T>(): sinon.SinonSpiedInstance<Subscripti
onSubscriptionLost() {
return this;
},
onConnectionStateChange() {
return this;
},
});
}

Expand Down

0 comments on commit 9bafcb7

Please sign in to comment.