Skip to content

Commit

Permalink
let's make the scoping part of the consumers instead of the Events se…
Browse files Browse the repository at this point in the history
…rvice.
  • Loading branch information
patroza committed Nov 10, 2024
1 parent caa9b78 commit 72264e2
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 16 deletions.
2 changes: 1 addition & 1 deletion api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
"@azure/cosmos": "^4.1.1",
"@azure/service-bus": "^7.9.5",
"@azure/storage-blob": "^12.25.0",
"@effect-app/infra": "2.24.1",
"@effect-app/infra": "2.25.0",
"effect-app": "^2.13.1",
"@effect/platform": "^0.69.20",
"@effect/opentelemetry": "^0.39.13",
Expand Down
10 changes: 6 additions & 4 deletions api/src/lib/middleware/events.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { makeSSE } from "@effect-app/infra/api/middlewares"
import { Events } from "api/services.js"
import { Effect, Stream } from "effect-app"
import { Effect } from "effect-app"
import { ClientEvents } from "resources.js"

export const makeEvents = Events.pipe(
Effect.map((events) => makeSSE(Stream.unwrapScoped(events.stream), ClientEvents))
)
export const makeEvents = Events
.use((_) => _.stream)
.pipe(
Effect.map(makeSSE(ClientEvents))
)
2 changes: 1 addition & 1 deletion api/src/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Events } from "./services.js"

class RootAppRouter extends HttpRouter.Tag("RootAppRouter")<RootAppRouter>() {}
const AllRoutes = RootAppRouter
.use((router) =>
.useScoped((router) =>
Effect.gen(function*() {
const cfg = yield* BaseConfig
yield* router.get("/events", yield* MW.makeEvents)
Expand Down
5 changes: 1 addition & 4 deletions api/src/services/Events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ import type { ClientEvents } from "resources.js"

export class Events extends Effect.Service<Events>()("Events", {
accessors: true,
scoped: Effect.gen(function*() {
effect: Effect.gen(function*() {
const q = yield* PubSub.unbounded<{ evt: ClientEvents; namespace: string }>()
// we would prefer the http server interrupting the stream processing as part of its shutdown
// but that's not happening, and this is the next best thing
yield* Effect.addFinalizer(() => q.shutdown)
const svc = {
publish: (...evts: NonEmptyReadonlyArray<ClientEvents>) =>
storeId.pipe(FiberRef.get, Effect.andThen((namespace) => q.offerAll(evts.map((evt) => ({ evt, namespace }))))),
Expand Down
12 changes: 6 additions & 6 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 72264e2

Please sign in to comment.