Skip to content

Commit

Permalink
feat(transmit): send subscription and unsubscription to the bus
Browse files Browse the repository at this point in the history
Closes #4
  • Loading branch information
RomainLanz committed Mar 17, 2024
1 parent ec9ea9a commit 950bfc7
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 4 deletions.
45 changes: 41 additions & 4 deletions src/transmit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import string from '@poppinss/utils/string'
import { Stream } from './stream.js'
import { StreamChannelRepository } from './stream_channel_repository.js'
import { SecureChannelStore } from './secure_channel_store.js'
import { TransportMessageType } from './transport_message_type.js'
import type { HttpContext } from '@adonisjs/core/http'
import type { Transport } from '@rlanz/bus/types/main'
import type { Broadcastable, TransmitConfig } from './types/main.js'
Expand All @@ -26,6 +27,23 @@ interface TransmitLifecycleHooks {
unsubscribe: { uid: string; channel: string; ctx: HttpContext }
}

type TransmitMessage =
| {
type: typeof TransportMessageType.Broadcast
channel: string
payload: Broadcastable
}
| {
type: typeof TransportMessageType.Subscribe
channel: string
payload: { uid: string }
}
| {
type: typeof TransportMessageType.Unsubscribe
channel: string
payload: { uid: string }
}

export class Transmit {
/**
* The storage bag instance to store all the streams.
Expand Down Expand Up @@ -71,13 +89,19 @@ export class Transmit {
this.#bus = transport ? new Bus(transport, { retryQueue: { enabled: true } }) : null
this.#emittery = new Emittery()

void this.#bus?.subscribe<{ channel: string; payload: Broadcastable }>(
void this.#bus?.subscribe<TransmitMessage>(
// TODO: Create a computed config type
this.#config.transport!.channel!,
(message) => {
const { channel, payload } = message

void this.#broadcastLocally(channel, payload)
const { type, channel, payload } = message

if (type === TransportMessageType.Broadcast) {
void this.#broadcastLocally(channel, payload)
} else if (type === TransportMessageType.Subscribe) {
void this.#storage.addChannelToStream(message.payload.uid, message.channel)
} else if (type === TransportMessageType.Unsubscribe) {
void this.#storage.removeChannelFromStream(message.payload.uid, message.channel)
}
}
)

Expand Down Expand Up @@ -157,12 +181,24 @@ export class Transmit {

void this.#emittery.emit('subscribe', { uid, channel, ctx })

void this.#bus?.publish(this.#config.transport!.channel!, {
type: TransportMessageType.Subscribe,
channel,
payload: { uid },
})

return this.#storage.addChannelToStream(uid, channel)
}

$unsubscribeFromChannel(uid: string, channel: string, ctx: HttpContext): boolean {
void this.#emittery.emit('unsubscribe', { uid, channel, ctx })

void this.#bus?.publish(this.#config.transport!.channel!, {
type: TransportMessageType.Unsubscribe,
channel,
payload: { uid },
})

return this.#storage.removeChannelFromStream(uid, channel)
}

Expand Down Expand Up @@ -200,6 +236,7 @@ export class Transmit {
}

void this.#bus?.publish(this.#config.transport!.channel!, {
type: TransportMessageType.Broadcast,
channel,
payload,
})
Expand Down
5 changes: 5 additions & 0 deletions src/transport_message_type.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const TransportMessageType = {
Broadcast: 1,
Subscribe: 2,
Unsubscribe: 3,
} as const

0 comments on commit 950bfc7

Please sign in to comment.