Skip to content

Commit

Permalink
Merge pull request #323 from briefercloud/wait-idle
Browse files Browse the repository at this point in the history
only run python on idle kernels
  • Loading branch information
vieiralucas authored Jan 31, 2025
2 parents 4f6cd61 + 5a30bb3 commit e53c6e3
Showing 1 changed file with 89 additions and 25 deletions.
114 changes: 89 additions & 25 deletions apps/api/src/python/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Output, PythonErrorOutput } from '@briefer/types'
import * as services from '@jupyterlab/services'
import PQueue from 'p-queue'

import { logger } from '../logger.js'
import { getJupyterManager } from '../jupyter/index.js'
import prisma, { decrypt } from '@briefer/database'
import { config } from '../config/index.js'
import { acquireLock } from '../lock.js'

export class PythonExecutionError extends Error {
constructor(
Expand Down Expand Up @@ -56,43 +56,34 @@ const getManager = async (workspaceId: string) => {
return { kernelManager, sessionManager }
}

const executionQueues = new Map<string, PQueue>()
export async function executeCode(
workspaceId: string,
sessionId: string,
code: string,
onOutputs: (outputs: Output[]) => void,
opts: { storeHistory: boolean }
) {
const queueKey = `${workspaceId}-${sessionId}`
let queue = executionQueues.get(queueKey)
if (!queue) {
queue = new PQueue({ concurrency: 1 })
executionQueues.set(queueKey, queue)
}

let aborted = false
let executing = false
logger().debug(
{ workspaceId, sessionId, queueSize: queue.size },
'Adding code to execution queue'
)
const promise = queue.add(async () => {
if (aborted) {
return
}
const promise = acquireLock(
`executeCode:${workspaceId}:${sessionId}`,
async () => {
if (aborted) {
return
}

executing = true
await innerExecuteCode(workspaceId, sessionId, code, onOutputs, opts)
})
executing = true
await innerExecuteCode(workspaceId, sessionId, code, onOutputs, opts)
}
)

return {
async abort() {
aborted = true

if (executing) {
const { kernel } = await getSession(workspaceId, sessionId)
await kernel.interrupt()
await waitForKernelToBecomeIdle(workspaceId, sessionId, kernel)
return
}
},
Expand All @@ -116,18 +107,25 @@ async function innerExecuteCode(
logger().trace({ workspaceId, sessionId }, 'Jupyter is up.')

const { kernel } = await getSession(workspaceId, sessionId)

await waitForKernelToBecomeIdle(workspaceId, sessionId, kernel)

const future = kernel.requestExecute({
code,
allow_stdin: true,
store_history: storeHistory,
})

let kernelRestarted = false
kernel.statusChanged.connect((_, status) => {
const onKernelRestarted = (
_: services.Kernel.IKernelConnection,
status: services.Kernel.Status
) => {
if (status === 'restarting' || status === 'autorestarting') {
kernelRestarted = true
}
})
}
kernel.statusChanged.connect(onKernelRestarted)

future.onIOPub = (message) => {
switch (message.header.msg_type) {
Expand Down Expand Up @@ -334,8 +332,12 @@ async function innerExecuteCode(
}
})

await Promise.race([future.done, idlePromise])
done = true
try {
await Promise.race([future.done, idlePromise])
done = true
} finally {
kernel.statusChanged.disconnect(onKernelRestarted)
}
} catch (err) {
if (kernelRestarted) {
onOutputs([
Expand Down Expand Up @@ -573,3 +575,65 @@ export async function disposeAll(workspaceId: string) {
)
sessions.clear()
}

async function waitForKernelToBecomeIdle(
workspaceId: string,
sessionId: string,
kernel: services.Kernel.IKernelConnection
) {
const startTime = Date.now()

let kernelStatus = kernel.status
const onStatusChanged = (
_: services.Kernel.IKernelConnection,
status: services.Kernel.Status
) => {
kernelStatus = status
}
kernel.statusChanged.connect(onStatusChanged)

while (kernelStatus !== 'idle') {
// stuck trying to get an idle kernel to run code for more than a minute
if (Date.now() - startTime > 60000) {
logger().error(
{
workspaceId,
sessionId,
kernelStatus: kernel.status,
},
'Spent more than 1 minute attempting to make the kernel be idle. Crashing.'
)
throw new Error('Failed to get an idle kernel')
}

// stuck trying to interrupt a non idle kernel for more than 10 seconds
// we'll restart the kernel
if (Date.now() - startTime > 10000) {
logger().warn(
{
workspaceId,
sessionId,
kernelStatus: kernel.status,
},
'Spent more than 10 seconds trying to interrupt a non idle kernel. Restarting kernel instead.'
)
await kernel.restart()
await new Promise((resolve) => setTimeout(resolve, 500))
continue
}

// since we make sure that only a single code execution is running at a time
// if we found a non idle kernel, we first interrupt it
logger().warn(
{
workspaceId,
sessionId,
kernelStatus: kernel.status,
},
'Found non idle kernel before attempting to execute code. Interrupting first.'
)
await kernel.interrupt()
await new Promise((resolve) => setTimeout(resolve, 500))
}
kernel.statusChanged.disconnect(onStatusChanged)
}

0 comments on commit e53c6e3

Please sign in to comment.