Skip to content

Commit

Permalink
fix: Fixed multi-server work
Browse files Browse the repository at this point in the history
  • Loading branch information
y9san9 committed Feb 12, 2025
1 parent a93b7d8 commit b651154
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/modules/main/chat/logic/chat-logic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ export function createChatLogic(
setDisplayNickname: (value) => { events.emit({ type: "nickname", value }); },
});
const cancel2 = listenWorkerEvents({
worker, chatId: queueId, nickname: nicknameStateHandle,
worker, url, queueId, nickname: nicknameStateHandle,
getMessages: () => messages, setMessages,
setUpdating,
getLocalNonce: () => localNonce, setLocalNonce: (value) => localNonce = value,
Expand Down
11 changes: 7 additions & 4 deletions src/modules/main/chat/logic/listen-worker-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { Cancellation } from "@/coroutines/cancellation.ts";

export type ListenWorkerEventsOptions = {
worker: WorkerStateHandle;
chatId: string;
url: string;
queueId: string;
nickname: NicknameStateHandle;
getMessages: () => Message[];
setMessages: (value: Message[]) => void;
Expand All @@ -18,7 +19,7 @@ export type ListenWorkerEventsOptions = {

export function listenWorkerEvents(
{
worker, chatId, nickname,
worker, url, queueId, nickname,
getMessages, setMessages,
setUpdating,
getLocalNonce, setLocalNonce,
Expand All @@ -29,7 +30,8 @@ export function listenWorkerEvents(
switch (event.type) {
case "new":
{
if (event.queueId != chatId) return;
if (event.url !== url) return;
if (event.queueId != queueId) return;

const messages: Message[] = [];

Expand Down Expand Up @@ -70,7 +72,8 @@ export function listenWorkerEvents(
break;
}
case "waiting":
if (event.queueId != chatId) break;
if (event.url !== url) break;
if (event.queueId != queueId) break;
setUpdating(!event.waiting);
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/modules/umbrella/logic/logic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type CreateSeedWorkerOptions = {
function createSeedClient() {
const client = seedClient({
engine: {
mainUrl: "https://meetacy.app/seed-kt",
mainUrl: "wss://meetacy.app/seed-kt",
},
});
client.setForeground(true);
Expand Down
2 changes: 2 additions & 0 deletions src/modules/umbrella/logic/worker-state-handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import { SeedPersistence } from "@/modules/umbrella/persistence/seed-persistence

export type WorkStateHandleEvent = {
type: "new";
url: string;
queueId: string;
messages: SeedWorkerMessage[];
} | {
type: "connected";
connected: boolean;
} | {
type: "waiting";
url: string;
queueId: string;
waiting: boolean;
}
Expand Down
16 changes: 15 additions & 1 deletion src/modules/umbrella/persistence/seed-persistence.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import {
Expand All @@ -23,7 +24,7 @@ export interface SeedPersistence {
}

export async function createPersistence(): Promise<SeedPersistence> {
const db = await openDB("persistence", 11, {
const db = await openDB("persistence", 12, {
async upgrade(db, version, _, transaction) {
// Full schema creation
if (version == 0) {
Expand Down Expand Up @@ -104,6 +105,19 @@ export async function createPersistence(): Promise<SeedPersistence> {
}
}
}
if (version <= 11) {
const chatStore = transaction.objectStore("chat");

const cursor = await chatStore.openCursor();
if (cursor) {
for await (const { value: chat } of cursor) {
if (chat.serverUrl.startsWith("https")) {
(chat as Chat).serverUrl = chat.serverUrl.replace("https", "wss");
await chatStore.put(chat);
}
}
}
}
},
});

Expand Down
4 changes: 2 additions & 2 deletions src/sdk-v2/seed-client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createObservable, Observable } from "@/coroutines/observable";
import { createSeedEngine, SeedEngine, SeedEngineDisconnected } from "./seed-engine";
import { createSeedEngine, LOG_LEVEL_INFO, SeedEngine, SeedEngineDisconnected } from "./seed-engine";
import typia from "typia";

export type SeedClientEvent = {
Expand Down Expand Up @@ -98,7 +98,7 @@ export function createSeedClient(
): SeedClient {
const events: Observable<SeedClientEvent> = createObservable();

const engine = createSeedEngine(engineOptions.mainUrl);
const engine = createSeedEngine(engineOptions.mainUrl, LOG_LEVEL_INFO);
const subscribeQueues: Map<string, Set<string>> = new Map();

function setSubscribeQueue(
Expand Down
68 changes: 53 additions & 15 deletions src/sdk-v2/seed-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { launch } from "@/coroutines/launch";
import { createObservable, Observable } from "@/coroutines/observable";
import typia from "typia";

export const LOG_LEVEL_NONE = 0;
export const LOG_LEVEL_INFO = 1;
export const LOG_LEVEL_DEBUG = 2;

export type SeedEngineEvent = {
type: "ready";
ready: boolean;
Expand Down Expand Up @@ -61,6 +65,10 @@ type SeedEngineSentMessage = {
request: unknown;
}

type ForwardResponse = {
status: boolean;
}

type SeedEnginePendingRequest = {
url: string;
resolve: (payload: unknown) => void;
Expand Down Expand Up @@ -101,7 +109,10 @@ export interface SeedEngine {
open(): void;
}

export function createSeedEngine(mainUrl: string): SeedEngine {
export function createSeedEngine(
mainUrl: string,
logLevel: number = LOG_LEVEL_INFO,
): SeedEngine {
const events: Observable<SeedEngineEvent> = createObservable();

let ws: WebSocket | undefined;
Expand Down Expand Up @@ -184,7 +195,9 @@ export function createSeedEngine(mainUrl: string): SeedEngine {
payload: unknown,
checkConnection: boolean = true,
) {
console.log(`>> execute\nServer: ${url}\nRequest:`, payload);
if (logLevel >= LOG_LEVEL_INFO) {
console.log(`>> execute\nServer: ${url}\nRequest:`, payload);
}
return new Promise((resolve, reject) => {
if (!ready) reject(new SeedEngineDisconnected());
if (checkConnection && !connectedUrls.has(url)) {
Expand All @@ -198,31 +211,52 @@ export function createSeedEngine(mainUrl: string): SeedEngine {
},
});

let message: unknown;

if (url === mainUrl) {
message = payload;
} else {
message = {
type: "forward",
url,
request: payload,
} satisfies SeedEngineSentMessage;
const string = JSON.stringify(payload);
if (logLevel >= LOG_LEVEL_DEBUG) {
console.log(">>> debug\n", string);
}
ws?.send(string);
return;
}

ws?.send(JSON.stringify(message));
const request = {
type: "forward",
url,
request: payload,
} satisfies SeedEngineSentMessage;

// checkConnection is false since it's just forward
// request and I don't actually need to call connect
// on mainUrl to make forward requests
executeOrThrow(mainUrl, request, false).then((response) => {
if (!typia.is<ForwardResponse>(response)) {
reject(new Error("Unexpected response from backend"));
return;
}
if (!response.status) {
reject(new SeedEngineDisconnected());
}
// We don't call resolve here, because it will be called
// when message with type 'forward' is received
}, reject);
});
}

function open() {
ws = new WebSocket(mainUrl);

ws.onopen = () => {
console.log("<< ws: onopen");
if (logLevel >= LOG_LEVEL_INFO) {
console.log("<< ws: onopen");
}
setReady(true);
};

ws.onmessage = (message) => {
if (logLevel >= LOG_LEVEL_DEBUG) {
console.log("<<< debug\n", message.data);
}
const data = JSON.parse(message.data as string) as SeedEngineReceivedMessage;
if (!typia.is<SeedEngineReceivedMessage>(data)) {
return;
Expand All @@ -237,7 +271,9 @@ export function createSeedEngine(mainUrl: string): SeedEngine {
forward: data,
};
}
console.log(`<< message\nServer: ${forward.url}\nPayload:`, forward.forward);
if (logLevel >= LOG_LEVEL_INFO) {
console.log(`<< message\nServer: ${forward.url}\nPayload:`, forward.forward);
}
if (forward.forward.type === "response") {
const index = requests.findIndex((request) => request.url === forward.url);
if (index === -1) {
Expand Down Expand Up @@ -265,7 +301,9 @@ export function createSeedEngine(mainUrl: string): SeedEngine {
};

ws.onclose = () => {
console.log("<< ws: onclose");
if (logLevel >= LOG_LEVEL_INFO) {
console.log("<< ws: onclose");
}
setReady(false);
};
}
Expand Down

0 comments on commit b651154

Please sign in to comment.