Skip to content

Commit

Permalink
Update schema to support response parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Wundero committed Feb 19, 2025
1 parent 2dd31ba commit 309cf79
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 150 deletions.
4 changes: 2 additions & 2 deletions apps/deployment/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
} from "drizzle-orm/sqlite-core";
import { v7 } from "uuid";

import type { ChannelMessageSchema } from "@sinkr/validators";
import type { ChannelMessagesSendRequestSchema } from "@sinkr/validators";

export const apps = sqliteTable(
"app",
Expand Down Expand Up @@ -161,7 +161,7 @@ export const storedChannelMessages = sqliteTable(
.notNull()
.default(sql`(CURRENT_TIMESTAMP)`),
data: blob({ mode: "json" })
.$type<z.infer<typeof ChannelMessageSchema>>()
.$type<z.infer<typeof ChannelMessagesSendRequestSchema>["request"]>()
.notNull(),
},
(storedChannelMessage) => [
Expand Down
7 changes: 3 additions & 4 deletions apps/deployment/src/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { v7 } from "uuid";

import {
ClientRequestStoredMessagesSchema,
ServerEndpointSchema,
ServerRequestSchema,
} from "@sinkr/validators";

import { peers } from "./db/schema";
Expand Down Expand Up @@ -73,7 +73,7 @@ export const hooks = {
data: unknown;
id: string;
}>();
const parsed = ServerEndpointSchema.safeParse(body.data);
const parsed = ServerRequestSchema.safeParse(body.data);
if (!parsed.success) {
peer.send({
status: 400,
Expand All @@ -84,9 +84,8 @@ export const hooks = {
}
const res = await handleSource(body.id, parsed.data, peerInfo.appId);
peer.send({
status: res.status,
id: body.id,
data: res.data,
...res,
});
},
async close(peer) {
Expand Down
51 changes: 33 additions & 18 deletions apps/deployment/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import { AsyncLocalStorage } from "async_hooks";
import type { z } from "zod";
import { DurableObject } from "cloudflare:workers";

import { ServerEndpointSchema } from "@sinkr/validators";
import type {
RouteRequestSchema,
RouteResponseSchema,
ServerRoute,
} from "@sinkr/validators";
import { ServerRequestSchema } from "@sinkr/validators";

import { getCoordinatorInstance, getPeers, handleSource, ws } from "./server";
import { getDB, init } from "./utils";
Expand Down Expand Up @@ -45,18 +49,17 @@ export class ObjectCoordinator extends DurableObject<Env> {
return obj;
}

async distribute({
async distribute<TRoute extends ServerRoute>({
id,
appId,
data,
}: {
id: string;
appId: string;
data: z.infer<typeof ServerEndpointSchema>;
}) {
data: RouteRequestSchema<TRoute>;
}): Promise<({ id: string } & RouteResponseSchema<TRoute>["response"])[]> {
const cursor = this.sql.exec<{ id: string }>("SELECT id FROM handler;");
const promises: Promise<{ status: number; data?: unknown; id: string }>[] =
[];
const promises = [];
for (const { id: handlerId } of cursor) {
const handler = this.getBinding(handlerId);
promises.push(handler.process({ id, appId, data }));
Expand Down Expand Up @@ -143,9 +146,15 @@ export class ObjectCoordinator extends DurableObject<Env> {
id: string;
};
const { data: body, id } = bodyBuf;
const parsed = ServerEndpointSchema.safeParse(body);
if (!parsed.success) {
return new Response("Invalid request", { status: 400 });
const parsed = ServerRequestSchema.safeParse(body);
if (!parsed.success || "response" in parsed.data) {
return new Response(
JSON.stringify({
success: false,
error: "Invalid request",
}),
{ status: 400 },
);
}
const data = parsed.data;
const info = await handleSource(id, data, appId);
Expand All @@ -155,7 +164,7 @@ export class ObjectCoordinator extends DurableObject<Env> {
...info,
}),
{
status: info.status,
status: info.success ? 200 : 400,
},
);
}
Expand Down Expand Up @@ -204,16 +213,16 @@ export class SocketHandler extends DurableObject<Env> {
await this.coordinator.updateConnections(this.ctx.id.toString(), conns);
}

async process({
async process<TRoute extends ServerRoute>({
id,
data,
appId,
}: {
id: string;
appId: string;
data: z.infer<typeof ServerEndpointSchema>;
}) {
const info = await handleSource(id, data, appId);
data: RouteRequestSchema<TRoute>;
}): Promise<RouteResponseSchema<TRoute>["response"] & { id: string }> {
const info = await handleSource<TRoute>(id, data, appId);
return {
id,
...info,
Expand Down Expand Up @@ -260,14 +269,20 @@ export class SocketHandler extends DurableObject<Env> {
id: string;
};
const { data: body, id } = bodyBuf;
const parsed = ServerEndpointSchema.safeParse(body);
const parsed = ServerRequestSchema.safeParse(body);
if (!parsed.success) {
return new Response("Invalid request", { status: 400 });
return new Response(
JSON.stringify({
success: false,
error: "Invalid request",
}),
{ status: 400 },
);
}
const data = parsed.data;
const info = await this.process({ id, data, appId });
return new Response(JSON.stringify(info), {
status: info.status,
status: info.success ? 200 : 400,
});
}

Expand Down
Loading

0 comments on commit 309cf79

Please sign in to comment.