Skip to content

Commit

Permalink
retry to reconnect once to pubsub on not connected error
Browse files Browse the repository at this point in the history
  • Loading branch information
vieiralucas committed Jan 9, 2025
1 parent ac35209 commit a003d63
Showing 1 changed file with 54 additions and 12 deletions.
66 changes: 54 additions & 12 deletions packages/database/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Prisma, PrismaClient } from '@prisma/client'
import { ITXClientDenyList } from '@prisma/client/runtime/library.js'
import PQueue from 'p-queue'
import pg from 'pg'
import { z } from 'zod'

export type {
Document,
Expand Down Expand Up @@ -133,7 +134,7 @@ async function getPubSubClient(dbOptions: InitOptions): Promise<pg.Client> {

pubSubClient.on('error', (err) => {
console.error('Got an error from the PG pubSubClient', err)
reconnectPubSub()
reconnectPubSub({ retryForever: true })
})

return pubSubClient
Expand Down Expand Up @@ -210,7 +211,7 @@ export async function getPGInstance(): Promise<{
}

let reconnectingToPubSub = false
async function reconnectPubSub() {
async function reconnectPubSub({ retryForever }: { retryForever: boolean }) {
if (!dbOptions) {
throw new Error(
'Unable to reconnect to PG PubSub because dbOptions is not set'
Expand Down Expand Up @@ -243,7 +244,15 @@ async function reconnectPubSub() {
break
} catch (err) {
console.error('[reconnecting] Error reconnecting to PG:', err)
if (!retryForever) {
console.error(
'[reconnecting] Not retrying because retryForever is false'
)
break
}

console.error('[reconnecting] Retrying in 1 second')

await new Promise((resolve) => setTimeout(resolve, 1000))
}
}
Expand All @@ -270,19 +279,34 @@ export async function subscribe(

// This ensures only one `LISTEN` setup happens at a time for this channel
await queue.add(async () => {
const { pubSubClient } = await getPGInstance()
let retryOnNotConnected = true
while (true) {
const { pubSubClient } = await getPGInstance()

const subs = subscribers[channel]
if (subs) {
subs.add(onNotification)
return
}

const subs = subscribers[channel]
if (subs) {
subs.add(onNotification)
} else {
subscribers[channel] = new Set([onNotification])

try {
await pubSubClient.query(`LISTEN ${JSON.stringify(channel)}`)
} catch (e) {
subscribers[channel].delete(onNotification)
throw e
break
} catch (err) {
if (
retryOnNotConnected &&
z.object({ message: z.literal('Not connected') }).safeParse(err)
.success
) {
// try reconnecting and then retry once
await reconnectPubSub({ retryForever: false })
retryOnNotConnected = false
} else {
subscribers[channel].delete(onNotification)
throw err
}
}
}
})
Expand All @@ -309,8 +333,26 @@ export async function subscribe(
}

export async function publish(channel: string, message: string): Promise<void> {
const { pubSubClient } = await getPGInstance()
await pubSubClient.query('SELECT pg_notify($1, $2)', [channel, message])
let retryOnNotConnected = true

while (true) {
const { pubSubClient } = await getPGInstance()
try {
await pubSubClient.query('SELECT pg_notify($1, $2)', [channel, message])
break
} catch (err) {
if (
retryOnNotConnected &&
z.object({ message: z.literal('Not connected') }).safeParse(err).success
) {
// try reconnecting and then retry once
await reconnectPubSub({ retryForever: false })
retryOnNotConnected = false
} else {
throw err
}
}
}
}

export default prisma

0 comments on commit a003d63

Please sign in to comment.