Skip to content

Commit

Permalink
fix: delay cleaning user presence (#10855)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Krick <[email protected]>
  • Loading branch information
mattkrick authored Feb 12, 2025
1 parent 369073d commit b208f89
Show file tree
Hide file tree
Showing 14 changed files with 60 additions and 97 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export interface AuthToken {
sub: string
tms: string[]
rol?: 'su' | 'impersonate'
rol?: 'su' | 'impersonate' | null
bet?: 1
iat: number
iss: string
Expand Down
5 changes: 2 additions & 3 deletions packages/client/types/reactHTML4.d.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import 'react'

declare module 'react' {
export interface TdHTMLAttributes<T> {
interface TdHTMLAttributes<T> {
height?: string | number
width?: string | number
bgcolor?: string
}
export interface TableHTMLAttributes<T> {
interface TableHTMLAttributes<T> {
align?: 'center' | 'left' | 'right'
bgcolor?: string
height?: string | number
Expand Down
2 changes: 1 addition & 1 deletion packages/embedder/custom.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {DataLoaderInstance} from '../server/dataloader/RootDataLoader'
import type {DataLoaderInstance} from '../server/dataloader/RootDataLoader'
import type {DB} from '../server/postgres/types/pg'
import {JobQueueError} from './JobQueueError'

Expand Down
6 changes: 5 additions & 1 deletion packages/gql-executor/gqlExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ import tracer from 'dd-trace'
import {ServerChannel} from 'parabol-client/types/constEnums'
import GQLExecutorChannelId from '../client/shared/gqlIds/GQLExecutorChannelId'
import SocketServerChannelId from '../client/shared/gqlIds/SocketServerChannelId'
import sleep from '../client/utils/sleep'
import executeGraphQL from '../server/graphql/executeGraphQL'
import '../server/initSentry'
import '../server/monkeyPatchFetch'
import {GQLRequest} from '../server/types/custom'
import type {GQLRequest} from '../server/types/GQLRequest'
import {Logger} from '../server/utils/Logger'
import RedisInstance from '../server/utils/RedisInstance'
import RedisStream from './RedisStream'
Expand Down Expand Up @@ -46,6 +47,9 @@ const run = async () => {
ServerChannel.GQL_EXECUTOR_CONSUMER_GROUP,
executorChannel
)
// The executor has published SourceStream messages to webserver that include its executorServerId
// It expects the webserver to call it back to make use of its cache. These should resolve within a couple of seconds
await sleep(2000)

setInterval(() => {
if (Date.now() - start >= MAX_SHUTDOWN_TIME) {
Expand Down
16 changes: 0 additions & 16 deletions packages/server/graphql/ResponseStream.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import {ExecutionResult} from 'graphql'
import SubscriptionIterator from '../utils/SubscriptionIterator'
import {getUserId} from '../utils/authorization'
import getGraphQLExecutor from '../utils/getGraphQLExecutor'
import sendToSentry from '../utils/sendToSentry'
import {SubscribeRequest} from './subscribeGraphQL'

const {SERVER_ID} = process.env

export default class ResponseStream implements AsyncIterableIterator<ExecutionResult> {
private sourceStream: SubscriptionIterator
private req: SubscribeRequest
Expand Down Expand Up @@ -44,19 +41,6 @@ export default class ResponseStream implements AsyncIterableIterator<ExecutionRe
}
return {done: false, value: result}
} catch (e) {
const error =
e instanceof Error ? e : new Error(`GQL executor failed to publish. docId: ${docId}`)
sendToSentry(error, {
userId: getUserId(authToken),
tags: {
authToken: JSON.stringify(authToken),
docId: docId || '',
query: query || '',
variables: JSON.stringify(variables),
socketServerId: SERVER_ID!,
executorServerId
}
})
return this.next()
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/server/graphql/executeGraphQL.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import tracer from 'dd-trace'
import {graphql} from 'graphql'
import {FormattedExecutionResult} from 'graphql/execution/execute'
import type {GQLRequest} from '../types/custom'
import type {GQLRequest} from '../types/GQLRequest'
import sendToSentry from '../utils/sendToSentry'
import CompiledQueryCache from './CompiledQueryCache'
import getDataLoader from './getDataLoader'
Expand Down
32 changes: 6 additions & 26 deletions packages/server/graphql/handleGraphQLTrebuchetRequest.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import {OutgoingMessage} from '@mattkrick/graphql-trebuchet-client'
import tracer from 'dd-trace'
import ConnectionContext from '../socketHelpers/ConnectionContext'
import {getUserId} from '../utils/authorization'
import getGraphQLExecutor from '../utils/getGraphQLExecutor'
import relayUnsubscribe from '../utils/relayUnsubscribe'
import sanitizeGraphQLErrors from '../utils/sanitizeGraphQLErrors'
import sendToSentry from '../utils/sendToSentry'
import subscribeGraphQL from './subscribeGraphQL'
export type GraphQLMessageType = 'data' | 'complete' | 'error'

const {SERVER_ID} = process.env

const handleGraphQLTrebuchetRequest = async (
data: OutgoingMessage,
connectionContext: ConnectionContext
Expand Down Expand Up @@ -38,10 +34,10 @@ const handleGraphQLTrebuchetRequest = async (
subscribeGraphQL({docId, query, opId: opId!, variables, connectionContext})
return
}
try {
return tracer.trace('handleGraphQLTrebuchetRequest', async (span) => {
const carrier = {}
tracer.inject(span!, 'http_headers', carrier)
return tracer.trace('handleGraphQLTrebuchetRequest', async (span) => {
const carrier = {}
tracer.inject(span!, 'http_headers', carrier)
try {
const result = await getGraphQLExecutor().publish({
docId,
query,
Expand All @@ -55,30 +51,14 @@ const handleGraphQLTrebuchetRequest = async (
// TODO if multiple results, send GQL_DATA for all but the last
const messageType = result.data ? 'complete' : 'error'
return {type: messageType, id: opId, payload: safeResult} as const
})
} catch (e) {
if (e instanceof Error && e.message === 'TIMEOUT') {
sendToSentry(new Error('GQL executor took too long to respond'), {
userId: getUserId(authToken),
tags: {
authToken: JSON.stringify(authToken),
docId: docId || '',
query: query || '',
variables: JSON.stringify(variables),
socketServerId: SERVER_ID!
}
})
} catch (e) {
return {
type: 'error' as const,
id: opId || '',
payload: {errors: [{message: 'The request took too long'}]}
}
}
const viewerId = getUserId(authToken)
const error =
e instanceof Error ? e : new Error(`GQL executor failed to publish. docId: ${docId}`)
sendToSentry(error, {userId: viewerId})
}
})
} else if (data.type === 'stop' && opId) {
relayUnsubscribe(subs, opId)
}
Expand Down
9 changes: 7 additions & 2 deletions packages/server/listenHandler.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import ms from 'ms'
import {us_listen_socket} from 'uWebSockets.js'
import getGraphQLExecutor from './utils/getGraphQLExecutor'
import {Logger} from './utils/Logger'
Expand All @@ -10,8 +11,12 @@ const listenHandler = (listenSocket: us_listen_socket) => {
if (listenSocket) {
Logger.log(`\n🔥🔥🔥 Server ID: ${SERVER_ID}. Ready for Sockets: Port ${PORT} 🔥🔥🔥`)
getGraphQLExecutor().subscribe()
// Cleaning on startup because shutdowns may be abrupt
serverHealthChecker.cleanUserPresence().catch(sendToSentry)
setTimeout(() => {
// This cleanup is only necessary if the old servers did not shut down cleanly,
// so we wait 3 minutes to let all the old servers shut down gracefully first
serverHealthChecker.cleanUserPresence().catch(sendToSentry)
//
}, ms('3m'))
} else {
Logger.log(`❌❌❌ Port ${PORT} is in use! ❌❌❌`)
}
Expand Down
18 changes: 18 additions & 0 deletions packages/server/types/GQLRequest.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import type {AuthToken} from '../../client/types/AuthToken'

/**
 * Shape of a single GraphQL request: the requester's auth token, the operation
 * to run (docId and/or query, with optional variables), and optional metadata
 * about where the request came from and how it should be executed.
 *
 * Only `authToken` is required; every other field is optional.
 * NOTE(review): presumably a request carries either `docId` or `query` — confirm against callers.
 */
export interface GQLRequest {
// auth token of the requester
authToken: AuthToken
// NOTE(review): presumably the requester's IP address — confirm against callers
ip?: string
// NOTE(review): presumably the id of the originating socket connection — confirm
socketId?: string
// GraphQL variables keyed by name
variables?: {[key: string]: any}
// NOTE(review): presumably identifies a known/persisted query document — confirm
docId?: string
// GraphQL query text
query?: string
rootValue?: {[key: string]: any}
// NOTE(review): presumably the id of an existing dataloader to reuse — confirm
dataLoaderId?: string
// true if the query is on the private schema
isPrivate?: boolean
// true if the query is ad-hoc (e.g. GraphiQL, CLI)
isAdHoc?: boolean
// Datadog opentracing span of the calling server
carrier?: any
}
20 changes: 1 addition & 19 deletions packages/server/types/custom.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {GraphQLSchema} from 'graphql'
import type {GraphQLSchema} from 'graphql'
import type nestGitHubEndpoint from 'nest-graphql-endpoint/lib/nestGitHubEndpoint'
import '../../client/types/reactHTML4'
import type AuthToken from '../database/types/AuthToken'
import type ScheduledJobMeetingStageTimeLimit from '../database/types/ScheduledJobMetingStageTimeLimit'
import type ScheduledTeamLimitsJob from '../database/types/ScheduledTeamLimitsJob'
export interface OAuth2Success {
Expand All @@ -23,23 +22,6 @@ export interface OAuth2Error {
error_description?: string
error_uri?: string
}
export interface GQLRequest {
authToken: AuthToken
ip?: string
socketId?: string
variables?: {[key: string]: any}
docId?: string
query?: string
rootValue?: {[key: string]: any}
dataLoaderId?: string
// true if the query is on the private schema
isPrivate?: boolean
// true if the query is ad-hoc (e.g. GraphiQL, CLI)
isAdHoc?: boolean
// Datadog opentracing span of the calling server
carrier?: any
}

export type ScheduledJobUnion = ScheduledJobMeetingStageTimeLimit | ScheduledTeamLimitsJob

export type RootSchema = GraphQLSchema & {
Expand Down
19 changes: 17 additions & 2 deletions packages/server/utils/PubSubPromise.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import type {ExecutionResult} from 'graphql'
import ms from 'ms'
import GQLExecutorChannelId from '../../client/shared/gqlIds/GQLExecutorChannelId'
import type {GQLRequest} from '../types/GQLRequest'
import {Logger} from './Logger'
import RedisInstance from './RedisInstance'
import numToBase64 from './numToBase64'
import sendToSentry from './sendToSentry'
Expand All @@ -21,7 +24,7 @@ interface BaseRequest {
longRunning?: boolean
}

export default class PubSubPromise<Request extends BaseRequest, Response> {
export default class PubSubPromise {
jobs = {} as {[jobId: string]: Job}
publisher = new RedisInstance('pubsubPromise_pub')
subscriber = new RedisInstance('pubsubPromise_sub')
Expand Down Expand Up @@ -49,14 +52,26 @@ export default class PubSubPromise<Request extends BaseRequest, Response> {
this.subscriber.subscribe(this.subChannel)
}

publish = <NarrowResponse = Response>(request: Request) => {
publish = <NarrowResponse = ExecutionResult>(request: GQLRequest & BaseRequest) => {
return new Promise<NarrowResponse>((resolve, reject) => {
const nextJob = numToBase64(this.jobCounter++)
const jobId = `${SERVER_ID}:${nextJob}`
const {isAdHoc, longRunning} = request
const timeout = isAdHoc ? ADHOC_TIMEOUT : longRunning ? LONG_TIMEOUT : STANDARD_TIMEOUT
const timeoutId = setTimeout(() => {
delete this.jobs[jobId]
const {authToken, docId, query, variables} = request
Logger.error('GQL executor took too long to respond', {
tags: {
userId: authToken?.sub ?? '',
authToken: JSON.stringify(authToken),
docId: docId || '',
query: query?.slice(0, 50) ?? '',
variables: JSON.stringify(variables),
socketServerId: SERVER_ID!,
executorServerId
}
})
reject(new Error('TIMEOUT'))
}, timeout)
const previousJob = this.jobs[jobId]
Expand Down
4 changes: 1 addition & 3 deletions packages/server/utils/getGraphQLExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import {ExecutionResult} from 'graphql'
import {ServerChannel} from 'parabol-client/types/constEnums'
import SocketServerChannelId from '../../client/shared/gqlIds/SocketServerChannelId'
import type {GQLRequest} from '../types/custom'
import PubSubPromise from './PubSubPromise'

let pubsub: PubSubPromise<GQLRequest & {executorServerId?: string}, ExecutionResult>
let pubsub: PubSubPromise
const getGraphQLExecutor = () => {
if (!pubsub) {
pubsub = new PubSubPromise(
Expand Down
19 changes: 0 additions & 19 deletions packages/server/utils/publishInternalGQL.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import AuthToken from '../database/types/AuthToken'
import {getUserId} from './authorization'
import getGraphQLExecutor from './getGraphQLExecutor'
import sendToSentry from './sendToSentry'

interface Options {
socketId: string
Expand All @@ -11,8 +9,6 @@ interface Options {
authToken: AuthToken
}

const {SERVER_ID} = process.env

const publishInternalGQL = async <NarrowResponse>(options: Options) => {
const {socketId, query, ip, authToken, variables} = options
try {
Expand All @@ -25,21 +21,6 @@ const publishInternalGQL = async <NarrowResponse>(options: Options) => {
isPrivate: true
})
} catch (e) {
const viewerId = getUserId(authToken)
const error = e instanceof Error ? e : new Error('GQL executor failed to publish')
if (error.message === 'TIMEOUT') {
sendToSentry(new Error('GQL executor took too long to respond'), {
userId: getUserId(authToken),
tags: {
authToken: JSON.stringify(authToken),
query: query || '',
variables: JSON.stringify(variables),
socketServerId: SERVER_ID!
}
})
} else {
sendToSentry(error, {userId: viewerId})
}
return undefined
}
}
Expand Down
3 changes: 0 additions & 3 deletions packages/server/utils/publishWebhookGQL.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {Variables} from 'relay-runtime'
import ServerAuthToken from '../database/types/ServerAuthToken'
import getGraphQLExecutor from './getGraphQLExecutor'
import sendToSentry from './sendToSentry'

interface PublishOptions {
longRunning?: boolean
Expand All @@ -21,8 +20,6 @@ const publishWebhookGQL = async <NarrowResponse>(
...options
})
} catch (e) {
const error = e instanceof Error ? e : new Error('GQL executor failed to publish')
sendToSentry(error, {tags: {query: query.slice(0, 50)}})
return undefined
}
}
Expand Down

0 comments on commit b208f89

Please sign in to comment.