Skip to content

Commit

Permalink
Add graceful shutdown pattern to worker
Browse files Browse the repository at this point in the history
  • Loading branch information
maxmcd committed May 9, 2024
1 parent 8581fff commit efcc200
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 27 deletions.
41 changes: 25 additions & 16 deletions deno-bootstrap/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,30 @@ if (typeof handler.default !== "function") {
}

// Use an empty onListen callback to prevent Deno from logging
Deno.serve({ path: socketFile, onListen: () => {} }, (req: Request) => {
const url = new URL(req.url);
url.host = req.headers.get("X-Deno-Worker-Host") || url.host;
url.port = req.headers.get("X-Deno-Worker-Port") || url.port;
// Setting url.protocol did not replace the protocol correctly for a unix
// socket. Replacing the href value seems to work well.
url.href = url.href.replace(
/^http\+unix:/,
req.headers.get("X-Deno-Worker-Protocol") || url.protocol
);
// Deno Request headers are immutable so we must make a new Request in order to delete our headers
req = new Request(url.toString(), req);
req.headers.delete("X-Deno-Worker-Host");
req.headers.delete("X-Deno-Worker-Protocol");
req.headers.delete("X-Deno-Worker-Port");
const server = Deno.serve(
{ path: socketFile, onListen: () => {} },
(req: Request) => {
const url = new URL(req.url);
url.host = req.headers.get("X-Deno-Worker-Host") || url.host;
url.port = req.headers.get("X-Deno-Worker-Port") || url.port;
// Setting url.protocol did not replace the protocol correctly for a unix
// socket. Replacing the href value seems to work well.
url.href = url.href.replace(
/^http\+unix:/,
req.headers.get("X-Deno-Worker-Protocol") || url.protocol
);
// Deno Request headers are immutable so we must make a new Request in order to delete our headers
req = new Request(url.toString(), req);
req.headers.delete("X-Deno-Worker-Host");
req.headers.delete("X-Deno-Worker-Protocol");
req.headers.delete("X-Deno-Worker-Port");

return handler.default(req);
return handler.default(req);
}
);

Deno.addSignalListener("SIGINT", async () => {
// On interrupt we only shut down the server. We will wait for all other
// unresolved promises before exiting.
await server.shutdown();
});
31 changes: 31 additions & 0 deletions src/DenoHTTPWorker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,37 @@ describe("DenoHTTPWorker", { timeout: 1000 }, () => {
worker.terminate();
});

it("shutdown gracefully", async () => {
let worker = await newDenoHTTPWorker(
`
export default async function (req: Request): Promise<Response> {
new Promise((resolve) => setTimeout(() => {resolve(); console.log("hi")}, 200));
return Response.json({ ok: req.url })
}
`,
{ printOutput: true }
);

let logs = "";
worker.stderr.on("data", (data) => (logs += data));
worker.stdout.on("data", (data) => (logs += data));

await new Promise<void>(async (resolve) => {
worker.addEventListener("exit", (code, signal) => {
expect(code).toEqual(0);
expect(logs).toContain("hi");
resolve();
});
let json = await worker.client
.get("https://localhost/hello?isee=you", { headers: {} })
.json();
expect(json).toEqual({
ok: "https://localhost/hello?isee=you",
});
worker.shutdown();
});
});

describe("runFlags editing", () => {
it.each([
"--allow-read",
Expand Down
41 changes: 30 additions & 11 deletions src/DenoHTTPWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ const DEFAULT_DENO_BOOTSTRAP_SCRIPT_PATH = __dirname.endsWith("src")
? resolve(__dirname, "../deno-bootstrap/index.ts")
: resolve(__dirname, "../../deno-bootstrap/index.ts");

interface OnExitListener {
(exitCode: number, signal: string): void;
}

export interface DenoWorkerOptions {
/**
* The path to the executable that should be use when spawning the subprocess.
Expand Down Expand Up @@ -254,12 +258,13 @@ export const newDenoHTTPWorker = async (
export type { DenoHTTPWorker };

class DenoHTTPWorker {
#httpSession: http2.ClientHttp2Session;
#got: Got;
#socketFile: string;
#httpSession: http2.ClientHttp2Session;
#onexitListeners: OnExitListener[];
#process: ChildProcess;
#stdout: Readable;
#socketFile: string;
#stderr: Readable;
#stdout: Readable;
#terminated: Boolean = false;

constructor(
Expand All @@ -270,12 +275,13 @@ class DenoHTTPWorker {
stdout: Readable,
stderr: Readable
) {
this.#httpSession = httpSession;
this.#socketFile = socketFile;
this.#got = got;
this.#httpSession = httpSession;
this.#onexitListeners = [];
this.#process = process;
this.#stdout = stdout;
this.#socketFile = socketFile;
this.#stderr = stderr;
this.#stdout = stdout;
}

get client(): Got {
Expand All @@ -287,14 +293,24 @@ class DenoHTTPWorker {
return;
}
this.#terminated = true;
this.onexit(code || this.#process.exitCode || 0, signal || "");
if (this.#process && this.#process.exitCode === null) {
// TODO: do we need to SIGINT first to make sure we allow the process to do
// any cleanup?
forceKill(this.#process.pid!);
}
fs.rm(this.#socketFile);
this.#httpSession.close();
fs.rm(this.#socketFile);
for (let onexit of this.#onexitListeners) {
onexit(code ?? 1, signal ?? "");
}
}

/**
* Gracefully shuts down the worker process and waits for any unresolved
* promises to exit.
*/
shutdown() {
this.#process.kill("SIGINT");
}

get stdout() {
Expand All @@ -306,10 +322,13 @@ class DenoHTTPWorker {
}

/**
* Represents an event handler for the "exit" event. That is, a function to be
* called when the Deno worker process is terminated.
* Adds the given listener for the "exit" event.
* @param type The type of the event. (Always "exit")
* @param listener The listener to add for the event.
*/
onexit: (code: number, signal: string) => void = () => {};
addEventListener(type: "exit", listener: OnExitListener): void {
this.#onexitListeners.push(listener as OnExitListener);
}
}

/**
Expand Down

0 comments on commit efcc200

Please sign in to comment.