Skip to content

Commit

Permalink
fix: cache pod exec websockets (podman-desktop#10165)
Browse files Browse the repository at this point in the history
Fixes podman-desktop#9467

Signed-off-by: Jeff MAURY <[email protected]>
  • Loading branch information
jeffmaury authored Dec 12, 2024
1 parent aa312fe commit e2cfada
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 54 deletions.
34 changes: 34 additions & 0 deletions packages/main/src/plugin/kubernetes/kubernetes-client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1473,6 +1473,40 @@ test('Test should exec into container ', async () => {
execResp.onResize(1, 1);
});

test('Test should exec into container only once', async () => {
const client = createTestClient('default');
makeApiClientMock.mockReturnValue({
getCode: () => Promise.resolve({ body: { gitVersion: 'v1.20.0' } }),
});

const onStdOutFn = vi.fn();

const onStdErrFn = vi.fn();

const onCloseFn = vi.fn();

execMock.mockImplementation(
(
_namespace: string,
_podName: string,
_containerName: string,
_command: string | string[],
_stdout: Writable | null,
_stderr: Writable | null,
_stdin: Readable | null,
_tty: boolean,
_?: (status: V1Status) => void,
) => {
return { on: vi.fn() };
},
);

await client.execIntoContainer('test-pod', 'test-container', onStdOutFn, onStdErrFn, onCloseFn);
await client.execIntoContainer('test-pod', 'test-container', onStdOutFn, onStdErrFn, onCloseFn);
expect(execMock).toHaveBeenCalledOnce();
expect(telemetry.track).toHaveBeenCalledOnce();
});

test('Test should throw an exception during exec command if resize parameters are wrong', async () => {
const client = createTestClient('default');
makeApiClientMock.mockReturnValue({
Expand Down
136 changes: 83 additions & 53 deletions packages/main/src/plugin/kubernetes/kubernetes-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ import {
import { PromiseMiddlewareWrapper } from '@kubernetes/client-node/dist/gen/middleware.js';
import type * as containerDesktopAPI from '@podman-desktop/api';
import * as jsYaml from 'js-yaml';
import type { WebSocket } from 'ws';
import { parseAllDocuments } from 'yaml';

import type { KubernetesPortForwardService } from '/@/plugin/kubernetes/kubernetes-port-forward-service.js';
Expand All @@ -84,7 +85,12 @@ import { Uri } from '../types/uri.js';
import { ContextsManager } from './contexts-manager.js';
import { ContextsManagerExperimental } from './contexts-manager-experimental.js';
import { ContextsStatesDispatcher } from './contexts-states-dispatcher.js';
import { BufferedStreamWriter, ResizableTerminalWriter, StringLineReader } from './kubernetes-exec-transmitter.js';
import {
BufferedStreamWriter,
ExecStreamWriter,
ResizableTerminalWriter,
StringLineReader,
} from './kubernetes-exec-transmitter.js';

interface ContextsManagerInterface {
// indicate to the manager that the kubeconfig has changed
Expand Down Expand Up @@ -203,6 +209,11 @@ export class KubernetesClient {

#portForwardService?: KubernetesPortForwardService;

#execs: Map<
string,
{ stdout: ExecStreamWriter; stderr: ExecStreamWriter; stdin: StringLineReader; conn: WebSocket }
> = new Map();

constructor(
private readonly apiSender: ApiSenderType,
private readonly configurationRegistry: ConfigurationRegistry,
Expand Down Expand Up @@ -536,6 +547,8 @@ export class KubernetesClient {
this.setupKubeWatcher();
this.apiResources.clear();
this.#portForwardService?.dispose();
this.#execs.forEach(entry => entry.conn.close());
this.#execs.clear();
this.#portForwardService = KubernetesClient.portForwardServiceProvider.getService(this, this.apiSender);
await this.fetchAPIGroups();
this.apiSender.send('pod-event');
Expand Down Expand Up @@ -1443,63 +1456,80 @@ export class KubernetesClient {
onStdErr: (data: Buffer) => void,
onClose: () => void,
): Promise<{ onStdIn: (data: string) => void; onResize: (columns: number, rows: number) => void }> {
let telemetryOptions = {};
try {
const ns = this.getCurrentNamespace();
const connected = await this.checkConnection();
if (!ns) {
throw new Error('no active namespace');
}
if (!connected) {
throw new Error('not active connection');
}

const stdout = new ResizableTerminalWriter(new BufferedStreamWriter(onStdOut));
const stderr = new ResizableTerminalWriter(new BufferedStreamWriter(onStdErr));
const stdin = new StringLineReader();

const exec = new Exec(this.kubeConfig);
const conn = await exec.exec(
ns,
podName,
containerName,
['/bin/sh', '-c', 'if command -v bash >/dev/null 2>&1; then bash; else sh; fi'],
stdout,
stderr,
stdin,
true,
(_: V1Status) => {
// need to think, maybe it would be better to pass exit code to the client, but on the other hand
// if connection is idle for 15 minutes, websocket connection closes automatically and this handler
// does not call. also need to separate SIGTERM signal (143) and normally exit signals to be able to
// proper reconnect client terminal. at this moment we ignore status and rely on websocket close event
},
);

//need to handle websocket idling, which causes the connection close which is not passed to the execution status
//approx time for idling before closing socket is 15 minutes. code and reason are always undefined here.
conn.on('close', () => {
let stdin: StringLineReader;
let stdout: ExecStreamWriter;
let stderr: ExecStreamWriter;
const entry = this.#execs.get(`${podName}-${containerName}`);
if (entry) {
stdin = entry.stdin;
stdout = entry.stdout;
stdout.delegate = new ResizableTerminalWriter(new BufferedStreamWriter(onStdOut));
stderr = entry.stderr;
stderr.delegate = new ResizableTerminalWriter(new BufferedStreamWriter(onStdErr));
entry.conn.on('close', () => {
onClose();
});
} else {
let telemetryOptions = {};
try {
const ns = this.getCurrentNamespace();
const connected = await this.checkConnection();
if (!ns) {
throw new Error('no active namespace');
}
if (!connected) {
throw new Error('not active connection');
}

return {
onStdIn: (data: string): void => {
stdin.readLine(data);
},
onResize: (columns: number, rows: number): void => {
if (columns <= 0 || rows <= 0 || isNaN(columns) || isNaN(rows) || columns === Infinity || rows === Infinity) {
throw new Error('resizing must be done using positive cols and rows');
}
stdout = new ExecStreamWriter(new ResizableTerminalWriter(new BufferedStreamWriter(onStdOut)));
stderr = new ExecStreamWriter(new ResizableTerminalWriter(new BufferedStreamWriter(onStdErr)));
stdin = new StringLineReader();

const exec = new Exec(this.kubeConfig);
const conn = await exec.exec(
ns,
podName,
containerName,
['/bin/sh', '-c', 'if command -v bash >/dev/null 2>&1; then bash; else sh; fi'],
stdout,
stderr,
stdin,
true,
(_: V1Status) => {
// need to think, maybe it would be better to pass exit code to the client, but on the other hand
// if connection is idle for 15 minutes, websocket connection closes automatically and this handler
// does not call. also need to separate SIGTERM signal (143) and normally exit signals to be able to
// proper reconnect client terminal. at this moment we ignore status and rely on websocket close event
},
);

(stdout as ResizableTerminalWriter).resize({ width: columns, height: rows });
},
};
} catch (error) {
telemetryOptions = { error: error };
throw this.wrapK8sClientError(error);
} finally {
this.telemetry.track('kubernetesExecIntoContainer', telemetryOptions);
//need to handle websocket idling, which causes the connection close which is not passed to the execution status
//approx time for idling before closing socket is 15 minutes. code and reason are always undefined here.
conn.on('close', () => {
onClose();
this.#execs.delete(`${podName}-${containerName}`);
});
this.#execs.set(`${podName}-${containerName}`, { stdin, stdout, stderr, conn });
} catch (error) {
telemetryOptions = { error: error };
throw this.wrapK8sClientError(error);
} finally {
this.telemetry.track('kubernetesExecIntoContainer', telemetryOptions);
}
}

return {
onStdIn: (data: string): void => {
stdin.readLine(data);
},
onResize: (columns: number, rows: number): void => {
if (columns <= 0 || rows <= 0 || isNaN(columns) || isNaN(rows) || columns === Infinity || rows === Infinity) {
throw new Error('resizing must be done using positive cols and rows');
}

((stdout as ExecStreamWriter).delegate as ResizableTerminalWriter).resize({ width: columns, height: rows });
},
};
}

async restartPod(name: string): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ export class ExecStreamWriter extends Writable {
this.transmitter = transmitter;
}

get delegate(): Writable {
return this.transmitter;
}

set delegate(delegate: Writable) {
this.transmitter = delegate;
}

override _write(chunk: unknown, encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
this.transmitter._write(chunk, encoding, callback);
}
Expand Down
1 change: 0 additions & 1 deletion packages/renderer/src/lib/pod/KubernetesTerminal.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ onMount(async () => {
// and then add a \r\n to the end of the terminal to ensure the cursor is on a new line.
if (savedState?.terminal) {
shellTerminal.write(savedState.terminal);
shellTerminal.write('\r\n');
shellTerminal.focus();
}
});
Expand Down

0 comments on commit e2cfada

Please sign in to comment.