From b671fc9db0ee6deecda8400e65018d65700ca698 Mon Sep 17 00:00:00 2001 From: Don Jayamanne Date: Fri, 6 Oct 2023 11:21:29 +1100 Subject: [PATCH 1/2] Remove chaining execution --- .../common/chainingExecuteRequester.ts | 55 ++++++++++--------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/src/kernels/common/chainingExecuteRequester.ts b/src/kernels/common/chainingExecuteRequester.ts index 5b7a2a653fa..c093357b5b1 100644 --- a/src/kernels/common/chainingExecuteRequester.ts +++ b/src/kernels/common/chainingExecuteRequester.ts @@ -3,15 +3,15 @@ import type { JSONObject } from '@lumino/coreutils'; import type { Kernel, KernelMessage } from '@jupyterlab/services'; -import { DelayedFutureExecute } from './delayedFutureExecute'; +// import { DelayedFutureExecute } from './delayedFutureExecute'; // Class that makes sure when doing a requestExecute on an IKernelConnection, that only one request happens // at a time. export class ChainingExecuteRequester { - private previousExecute: - | Kernel.IShellFuture - | undefined; - private previousKernel: Kernel.IKernelConnection | undefined; + // private previousExecute: + // | Kernel.IShellFuture + // | undefined; + // private previousKernel: Kernel.IKernelConnection | undefined; public requestExecute( kernel: Kernel.IKernelConnection, content: KernelMessage.IExecuteRequestMsg['content'], @@ -25,27 +25,28 @@ export class ChainingExecuteRequester { if (!content.store_history) { return kernel.requestExecute(content, disposeOnDone, metadata); } - // Wrap execute in a delay so we don't queue up more than one of these at a time. - // Make sure for same kernel though. Otherwise the previous execute may never return. - const nextExecute = - this.previousExecute && this.previousKernel?.id === kernel.id - ? new DelayedFutureExecute(kernel, this.previousExecute, content, disposeOnDone, metadata) - : kernel.requestExecute(content, disposeOnDone, metadata); - this.previousExecute = nextExecute; - this.previousKernel = kernel; - nextExecute.done - .then(() => { - if (this.previousExecute == nextExecute) { - this.previousExecute = undefined; - this.previousKernel = undefined; - } - }) - .catch(() => { - if (this.previousExecute == nextExecute) { - this.previousExecute = undefined; - this.previousKernel = undefined; - } - }); - return nextExecute; + return kernel.requestExecute(content, disposeOnDone, metadata); + // // Wrap execute in a delay so we don't queue up more than one of these at a time. + // // Make sure for same kernel though. Otherwise the previous execute may never return. + // const nextExecute = + // this.previousExecute && this.previousKernel?.id === kernel.id + // ? new DelayedFutureExecute(kernel, this.previousExecute, content, disposeOnDone, metadata) + // : kernel.requestExecute(content, disposeOnDone, metadata); + // this.previousExecute = nextExecute; + // this.previousKernel = kernel; + // nextExecute.done + // .then(() => { + // if (this.previousExecute == nextExecute) { + // this.previousExecute = undefined; + // this.previousKernel = undefined; + // } + // }) + // .catch(() => { + // if (this.previousExecute == nextExecute) { + // this.previousExecute = undefined; + // this.previousKernel = undefined; + // } + // }); + // return nextExecute; } } From f82341446d30736f1102db84c8fe611500ed932c Mon Sep 17 00:00:00 2001 From: Don Jayamanne Date: Fri, 27 Oct 2023 14:00:17 +1100 Subject: [PATCH 2/2] Remove ChainingExecuteRequester class from kernel connections --- .../common/chainingExecuteRequester.ts | 52 ----- src/kernels/common/delayedFutureExecute.ts | 179 ------------------ .../jupyter/baseKernelConnectionWrapper.ts | 4 +- .../raw/session/rawKernelConnection.node.ts | 4 +- 4 files changed, 2 insertions(+), 237 deletions(-) delete mode 100644 src/kernels/common/chainingExecuteRequester.ts delete mode 100644 src/kernels/common/delayedFutureExecute.ts diff --git a/src/kernels/common/chainingExecuteRequester.ts b/src/kernels/common/chainingExecuteRequester.ts deleted file mode 100644 index c093357b5b1..00000000000 --- a/src/kernels/common/chainingExecuteRequester.ts +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -import type { JSONObject } from '@lumino/coreutils'; -import type { Kernel, KernelMessage } from '@jupyterlab/services'; -// import { DelayedFutureExecute } from './delayedFutureExecute'; - -// Class that makes sure when doing a requestExecute on an IKernelConnection, that only one request happens -// at a time. -export class ChainingExecuteRequester { - // private previousExecute: - // | Kernel.IShellFuture - // | undefined; - // private previousKernel: Kernel.IKernelConnection | undefined; - public requestExecute( - kernel: Kernel.IKernelConnection, - content: KernelMessage.IExecuteRequestMsg['content'], - disposeOnDone?: boolean, - metadata?: JSONObject - ): Kernel.IShellFuture { - // There is no need to queue the requests we send out (i.e. hidden requests, that are not directly sent by the user). - // I.e. where possible we shouldn't have to queue requests unnecessarily. - // Ensures we don't run into situations where we're waiting for a previous request to complete, which could result in a dead lock. - // See here for such an example https://github.com/microsoft/vscode-jupyter/issues/10510 - if (!content.store_history) { - return kernel.requestExecute(content, disposeOnDone, metadata); - } - return kernel.requestExecute(content, disposeOnDone, metadata); - // // Wrap execute in a delay so we don't queue up more than one of these at a time. - // // Make sure for same kernel though. Otherwise the previous execute may never return. - // const nextExecute = - // this.previousExecute && this.previousKernel?.id === kernel.id - // ? new DelayedFutureExecute(kernel, this.previousExecute, content, disposeOnDone, metadata) - // : kernel.requestExecute(content, disposeOnDone, metadata); - // this.previousExecute = nextExecute; - // this.previousKernel = kernel; - // nextExecute.done - // .then(() => { - // if (this.previousExecute == nextExecute) { - // this.previousExecute = undefined; - // this.previousKernel = undefined; - // } - // }) - // .catch(() => { - // if (this.previousExecute == nextExecute) { - // this.previousExecute = undefined; - // this.previousKernel = undefined; - // } - // }); - // return nextExecute; - } -} diff --git a/src/kernels/common/delayedFutureExecute.ts b/src/kernels/common/delayedFutureExecute.ts deleted file mode 100644 index ddae2fe1ffb..00000000000 --- a/src/kernels/common/delayedFutureExecute.ts +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -import type { JSONObject } from '@lumino/coreutils'; -import type { Kernel, KernelMessage } from '@jupyterlab/services'; -import { traceInfoIfCI } from '../../platform/logging'; -import { createDeferred } from '../../platform/common/utils/async'; -import { CancellationError } from 'vscode'; -import { noop } from '../../platform/common/utils/misc'; - -// Wraps a future so that a requestExecute on a session will wait for the previous future to finish before actually executing -export class DelayedFutureExecute - implements Kernel.IShellFuture -{ - private doneDeferred = createDeferred(); - private requestFuture: - | Kernel.IShellFuture - | undefined; - private pendingOnReply: ((msg: KernelMessage.IExecuteReplyMsg) => void | PromiseLike) | undefined; - private pendingOnIOPub: - | ((msg: KernelMessage.IIOPubMessage) => void | PromiseLike) - | undefined; - private pendingOnStdin: - | ((msg: KernelMessage.IStdinMessage) => void | PromiseLike) - | undefined; - private pendingHooks: (( - msg: KernelMessage.IIOPubMessage - ) => boolean | PromiseLike)[] = []; - private pendingInputReplies: ( - | KernelMessage.IReplyErrorContent - | KernelMessage.IReplyAbortContent - | KernelMessage.IInputReply - )[] = []; - private disposed = false; - private statusChangedHandler: (_session: Kernel.IKernelConnection, status: KernelMessage.Status) => void; - constructor( - private kernelConnection: Kernel.IKernelConnection, - previousLink: Kernel.IShellFuture, - private content: KernelMessage.IExecuteRequestMsg['content'], - private disposeOnDone?: boolean, - private metadata?: JSONObject - ) { - // Ensure we don't have any unhandled promises. - this.doneDeferred.promise.catch(noop); - // Setup our request based on the previous link finishing - previousLink.done.then(() => this.requestExecute()).catch((e) => this.doneDeferred.reject(e)); - - // If the kernel dies, finish our future - this.statusChangedHandler = (_session: Kernel.IKernelConnection, status: KernelMessage.Status) => { - if (status === 'unknown' || status === 'restarting' || status === 'dead' || status === 'autorestarting') { - this.doneDeferred.reject(new CancellationError()); - } - }; - kernelConnection.statusChanged?.connect(this.statusChangedHandler); - - // Run the handler now to check - this.statusChangedHandler(kernelConnection, kernelConnection.status); - } - public get msg(): KernelMessage.IExecuteRequestMsg { - if (this.requestFuture) { - return this.requestFuture.msg; - } - return { - content: this.content, - channel: 'shell', - header: { - date: Date.now.toString(), - msg_id: '1', - session: this.kernelConnection.id, - msg_type: 'execute_request', - username: '', - version: '1' - }, - parent_header: {}, - metadata: {} - }; - } - public get done(): Promise { - return this.doneDeferred.promise; - } - public set onReply(value: (msg: KernelMessage.IExecuteReplyMsg) => void | PromiseLike) { - if (this.requestFuture) { - this.requestFuture.onReply = value; - } else { - this.pendingOnReply = value; - } - } - public set onIOPub( - value: (msg: KernelMessage.IIOPubMessage) => void | PromiseLike - ) { - if (this.requestFuture) { - this.requestFuture.onIOPub = value; - } else { - this.pendingOnIOPub = value; - } - } - public set onStdin( - value: (msg: KernelMessage.IStdinMessage) => void | PromiseLike - ) { - if (this.requestFuture) { - this.requestFuture.onStdin = value; - } else { - this.pendingOnStdin = value; - } - } - public registerMessageHook( - hook: (msg: KernelMessage.IIOPubMessage) => boolean | PromiseLike - ): void { - if (this.requestFuture) { - this.requestFuture.registerMessageHook(hook); - } else { - this.pendingHooks.push(hook); - } - } - public removeMessageHook( - hook: (msg: KernelMessage.IIOPubMessage) => boolean | PromiseLike - ): void { - this.pendingHooks = this.pendingHooks.filter((h) => h != hook); - if (this.requestFuture) { - this.requestFuture.removeMessageHook(hook); - } - } - public sendInputReply( - content: KernelMessage.IReplyErrorContent | KernelMessage.IReplyAbortContent | KernelMessage.IInputReply - ): void { - if (this.requestFuture) { - this.requestFuture.sendInputReply(content); - } else { - this.pendingInputReplies.push(content); - } - } - public get isDisposed(): boolean { - return this.disposed; - } - dispose(): void { - this.disposed = true; - this.kernelConnection.statusChanged.disconnect(this.statusChangedHandler); - if (this.requestFuture) { - this.requestFuture.dispose(); - this.requestFuture = undefined; - this.clear(); - } - } - - private clear(): void { - this.pendingInputReplies = []; - this.pendingHooks = []; - this.pendingOnReply = undefined; - this.pendingOnIOPub = undefined; - this.pendingOnStdin = undefined; - } - - private requestExecute() { - if (this.requestFuture) { - throw new Error(`ChainedFuture already executed. Can't execute more than once.`); - } - traceInfoIfCI(`DelayedFuture is starting request now for ${this.content}.`); - this.requestFuture = this.kernelConnection.requestExecute(this.content, this.disposeOnDone, this.metadata); - if (this.requestFuture) { - if (this.pendingOnReply) { - this.requestFuture.onReply = this.pendingOnReply; - } - if (this.pendingOnIOPub) { - this.requestFuture.onIOPub = this.pendingOnIOPub; - } - if (this.pendingOnStdin) { - this.requestFuture.onStdin = this.pendingOnStdin; - } - if (this.pendingHooks.length) { - this.pendingHooks.forEach((h) => this.requestFuture?.registerMessageHook(h)); - } - if (this.pendingInputReplies) { - this.pendingInputReplies.forEach((r) => this.requestFuture?.sendInputReply(r)); - } - this.requestFuture.done.then((r) => this.doneDeferred.resolve(r)).catch((e) => this.doneDeferred.reject(e)); - this.clear(); - } - } -} diff --git a/src/kernels/jupyter/baseKernelConnectionWrapper.ts b/src/kernels/jupyter/baseKernelConnectionWrapper.ts index 69233538a46..15de864cbf6 100644 --- a/src/kernels/jupyter/baseKernelConnectionWrapper.ts +++ b/src/kernels/jupyter/baseKernelConnectionWrapper.ts @@ -35,13 +35,11 @@ import type { JSONObject } from '@lumino/coreutils'; import { Signal } from '@lumino/signaling'; import { Disposable } from 'vscode'; import { IDisposable } from '../../platform/common/types'; -import { ChainingExecuteRequester } from '../common/chainingExecuteRequester'; /** * Wrapper around a Kernel.IKernelConnection. */ export abstract class BaseKernelConnectionWrapper implements Kernel.IKernelConnection { - private chainingExecute = new ChainingExecuteRequester(); public readonly statusChanged = new Signal(this); public readonly connectionStatusChanged = new Signal(this); public readonly iopubMessage = new Signal>(this); @@ -159,7 +157,7 @@ export abstract class BaseKernelConnectionWrapper implements Kernel.IKernelConne disposeOnDone?: boolean, metadata?: JSONObject ): Kernel.IShellFuture { - return this.chainingExecute.requestExecute(this.getKernelConnection(), content, disposeOnDone, metadata); + return this.getKernelConnection()!.requestExecute(content, disposeOnDone, metadata); } requestDebug( content: { diff --git a/src/kernels/raw/session/rawKernelConnection.node.ts b/src/kernels/raw/session/rawKernelConnection.node.ts index 46db862d149..98c2b236aac 100644 --- a/src/kernels/raw/session/rawKernelConnection.node.ts +++ b/src/kernels/raw/session/rawKernelConnection.node.ts @@ -20,7 +20,6 @@ import { IKernelSocket, LocalKernelConnectionMetadata } from '../../types'; import { suppressShutdownErrors } from '../../common/baseJupyterSession'; import { Signal } from '@lumino/signaling'; import type { IIOPubMessage, IMessage, IOPubMessageType, MessageType } from '@jupyterlab/services/lib/kernel/messages'; -import { ChainingExecuteRequester } from '../../common/chainingExecuteRequester'; import { CancellationError, CancellationToken, CancellationTokenSource, Uri } from 'vscode'; import { KernelProgressReporter } from '../../../platform/progress/kernelProgressReporter'; import { DataScience } from '../../../platform/common/utils/localize'; @@ -45,7 +44,6 @@ to a raw IPython kernel running on the local machine. RawKernel is in charge of input request, translating them, sending them to an IPython kernel over ZMQ, then passing back the messages */ export class RawKernelConnection implements Kernel.IKernelConnection { - private chainingExecute = new ChainingExecuteRequester(); public readonly statusChanged = new Signal(this); public readonly connectionStatusChanged = new Signal(this); public readonly iopubMessage = new Signal>(this); @@ -378,7 +376,7 @@ export class RawKernelConnection implements Kernel.IKernelConnection { disposeOnDone?: boolean, metadata?: import('@lumino/coreutils').JSONObject ): Kernel.IShellFuture { - return this.chainingExecute.requestExecute(this.realKernel!, content, disposeOnDone, metadata); + return this.realKernel!.requestExecute(content, disposeOnDone, metadata); } public requestDebug( // eslint-disable-next-line no-caller,no-eval