Skip to content

Commit

Permalink
feat: separate frontend backend counting (#9167)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwasniew authored Jan 29, 2025
1 parent 7ca8cc2 commit cbe0ac4
Show file tree
Hide file tree
Showing 8 changed files with 381 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { IUniqueConnectionStore } from '../../types';
import type {
BucketId,
TimedUniqueConnections,
UniqueConnections,
} from './unique-connection-store-type';
Expand All @@ -16,7 +17,7 @@ export class FakeUniqueConnectionStore implements IUniqueConnectionStore {
}

async get(
id: 'current' | 'previous',
id: BucketId,
): Promise<(UniqueConnections & { updatedAt: Date }) | null> {
return this.uniqueConnectionsRecord[id] || null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
export interface IUniqueConnectionReadModel {
getStats(): Promise<{ previous: number; current: number }>;
getStats(): Promise<{
previous: number;
current: number;
previousBackend: number;
currentBackend: number;
previousFrontend: number;
currentFrontend: number;
}>;
}
50 changes: 48 additions & 2 deletions src/lib/features/unique-connection/unique-connection-read-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,20 @@ export class UniqueConnectionReadModel implements IUniqueConnectionReadModel {
}

async getStats() {
const [previous, current] = await Promise.all([
const [
previous,
current,
previousFrontend,
currentFrontend,
previousBackend,
currentBackend,
] = await Promise.all([
this.uniqueConnectionStore.get('previous'),
this.uniqueConnectionStore.get('current'),
this.uniqueConnectionStore.get('previousFrontend'),
this.uniqueConnectionStore.get('currentFrontend'),
this.uniqueConnectionStore.get('previousBackend'),
this.uniqueConnectionStore.get('currentBackend'),
]);
const previousHll = HyperLogLog(REGISTERS_EXPONENT);
if (previous) {
Expand All @@ -26,6 +37,41 @@ export class UniqueConnectionReadModel implements IUniqueConnectionReadModel {
if (current) {
currentHll.merge({ n: REGISTERS_EXPONENT, buckets: current.hll });
}
return { previous: previousHll.count(), current: currentHll.count() };
const previousFrontendHll = HyperLogLog(REGISTERS_EXPONENT);
if (previousFrontend) {
previousFrontendHll.merge({
n: REGISTERS_EXPONENT,
buckets: previousFrontend.hll,
});
}
const currentFrontendHll = HyperLogLog(REGISTERS_EXPONENT);
if (currentFrontend) {
currentFrontendHll.merge({
n: REGISTERS_EXPONENT,
buckets: currentFrontend.hll,
});
}
const previousBackendHll = HyperLogLog(REGISTERS_EXPONENT);
if (previousBackend) {
previousBackendHll.merge({
n: REGISTERS_EXPONENT,
buckets: previousBackend.hll,
});
}
const currentBackendHll = HyperLogLog(REGISTERS_EXPONENT);
if (currentBackend) {
currentBackendHll.merge({
n: REGISTERS_EXPONENT,
buckets: currentBackend.hll,
});
}
return {
previous: previousHll.count(),
current: currentHll.count(),
previousFrontend: previousFrontendHll.count(),
currentFrontend: currentFrontendHll.count(),
previousBackend: previousBackendHll.count(),
currentBackend: currentBackendHll.count(),
};
}
}
231 changes: 200 additions & 31 deletions src/lib/features/unique-connection/unique-connection-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,38 @@ test('sync first current bucket', async () => {
);
uniqueConnectionService.listen();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection1',
type: 'backend',
});
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection1',
type: 'backend',
});
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection2',
type: 'backend',
});
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection2',
type: 'backend',
});
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection2',
type: 'backend',
});

await uniqueConnectionService.sync();

const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 0, current: 2 });
expect(stats).toEqual({
previous: 0,
current: 2,
previousBackend: 0,
currentBackend: 2,
previousFrontend: 0,
currentFrontend: 0,
});
});

test('sync first previous bucket', async () => {
Expand All @@ -51,17 +73,33 @@ test('sync first previous bucket', async () => {
uniqueConnectionStore,
);

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection1');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection2');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection1',
type: 'backend',
});
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection2',
type: 'backend',
});

await uniqueConnectionService.sync();

eventBus.emit(SDK_CONNECTION_ID_RECEIVED, 'connection3');
eventBus.emit(SDK_CONNECTION_ID_RECEIVED, {
connectionId: 'connection3',
type: 'backend',
});

await uniqueConnectionService.sync(addHours(new Date(), 1));

const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 3, current: 0 });
expect(stats).toEqual({
previous: 3,
current: 0,
previousBackend: 3,
currentBackend: 0,
previousFrontend: 0,
currentFrontend: 0,
});
});

test('sync to existing current bucket from the same service', async () => {
Expand All @@ -77,16 +115,35 @@ test('sync to existing current bucket from the same service', async () => {
uniqueConnectionStore,
);

uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection2');
uniqueConnectionService.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection2',
type: 'backend',
});

await uniqueConnectionService.sync();

uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection3');
uniqueConnectionService.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection3',
type: 'backend',
});

const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 0, current: 3 });
expect(stats).toEqual({
previous: 0,
current: 3,
previousBackend: 0,
currentBackend: 3,
previousFrontend: 0,
currentFrontend: 0,
});
});

test('sync to existing current bucket from another service', async () => {
Expand All @@ -109,16 +166,35 @@ test('sync to existing current bucket from another service', async () => {
uniqueConnectionStore,
);

uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
uniqueConnectionService1.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService1.count({
connectionId: 'connection2',
type: 'backend',
});
await uniqueConnectionService1.sync();

uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
uniqueConnectionService2.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService2.count({
connectionId: 'connection3',
type: 'backend',
});
await uniqueConnectionService2.sync();

const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 0, current: 3 });
expect(stats).toEqual({
previous: 0,
current: 3,
previousBackend: 0,
currentBackend: 3,
previousFrontend: 0,
currentFrontend: 0,
});
});

test('sync to existing previous bucket from another service', async () => {
Expand All @@ -141,16 +217,35 @@ test('sync to existing previous bucket from another service', async () => {
config,
);

uniqueConnectionService1.count('connection1');
uniqueConnectionService1.count('connection2');
uniqueConnectionService1.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService1.count({
connectionId: 'connection2',
type: 'backend',
});
await uniqueConnectionService1.sync(addHours(new Date(), 1));

uniqueConnectionService2.count('connection1');
uniqueConnectionService2.count('connection3');
uniqueConnectionService2.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService2.count({
connectionId: 'connection3',
type: 'backend',
});
await uniqueConnectionService2.sync(addHours(new Date(), 1));

const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 3, current: 0 });
expect(stats).toEqual({
previous: 3,
current: 0,
previousBackend: 3,
currentBackend: 0,
previousFrontend: 0,
currentFrontend: 0,
});
});

test('populate previous and current', async () => {
Expand All @@ -165,20 +260,94 @@ test('populate previous and current', async () => {
uniqueConnectionStore,
);

uniqueConnectionService.count('connection1');
uniqueConnectionService.count('connection2');
uniqueConnectionService.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection2',
type: 'backend',
});
await uniqueConnectionService.sync();
await uniqueConnectionService.sync();

uniqueConnectionService.count('connection3');
uniqueConnectionService.count({
connectionId: 'connection3',
type: 'backend',
});
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call

uniqueConnectionService.count('connection3');
uniqueConnectionService.count('connection4');
uniqueConnectionService.count({
connectionId: 'connection3',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection4',
type: 'backend',
});
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call

const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({ previous: 3, current: 2 });
expect(stats).toEqual({
previous: 3,
current: 2,
previousBackend: 3,
currentBackend: 2,
previousFrontend: 0,
currentFrontend: 0,
});
});

test('populate all buckets', async () => {
const eventBus = new EventEmitter();
const config = { flagResolver: alwaysOnFlagResolver, getLogger, eventBus };
const uniqueConnectionStore = new FakeUniqueConnectionStore();
const uniqueConnectionService = new UniqueConnectionService(
{ uniqueConnectionStore },
config,
);
const uniqueConnectionReadModel = new UniqueConnectionReadModel(
uniqueConnectionStore,
);

uniqueConnectionService.count({
connectionId: 'connection1',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection2',
type: 'frontend',
});
await uniqueConnectionService.sync();
await uniqueConnectionService.sync();

uniqueConnectionService.count({
connectionId: 'connection3',
type: 'backend',
});
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call

uniqueConnectionService.count({
connectionId: 'connection3',
type: 'backend',
});
uniqueConnectionService.count({
connectionId: 'connection4',
type: 'frontend',
});
await uniqueConnectionService.sync(addHours(new Date(), 1));
await uniqueConnectionService.sync(addHours(new Date(), 1)); // deliberate duplicate call

const stats = await uniqueConnectionReadModel.getStats();
expect(stats).toEqual({
previous: 3,
current: 2,
previousBackend: 2,
currentBackend: 1,
previousFrontend: 1,
currentFrontend: 1,
});
});
Loading

0 comments on commit cbe0ac4

Please sign in to comment.