diff --git a/README.md b/README.md
index 270b22a..ab7b867 100644
--- a/README.md
+++ b/README.md
@@ -43,6 +43,12 @@ Please see each tool's source directory for additional documentation
 * Node.js signal handlers that provide a way to shut down long-running application
   components gracefully on unhandled exceptions or interrupt signals.
 
+### CPU Profiler
+
+* Fastify server that controls a profiler capable of generating:
+  * `.cpuprofile` files for CPU usage analysis
+  * `.heapsnapshot` files for memory usage analysis
+
 ### Logger
 
 * Standardized logger configuration using [pino](https://github.com/pinojs/pino)
diff --git a/package.json b/package.json
index d4b2ff3..ae828f8 100644
--- a/package.json
+++ b/package.json
@@ -9,9 +9,9 @@
     "test": "jest --runInBand",
     "lint:eslint": "eslint . --ext .js,.jsx,.ts,.tsx -f unix",
     "lint:prettier": "prettier --check src/**/*.ts",
-    "testenv:run": "docker-compose -f docker/docker-compose.dev.postgres.yml up",
-    "testenv:stop": "docker-compose -f docker/docker-compose.dev.postgres.yml down -v -t 0",
-    "testenv:logs": "docker-compose -f docker/docker-compose.dev.postgres.yml logs -t -f"
+    "testenv:run": "docker compose -f docker/docker-compose.dev.postgres.yml up",
+    "testenv:stop": "docker compose -f docker/docker-compose.dev.postgres.yml down -v -t 0",
+    "testenv:logs": "docker compose -f docker/docker-compose.dev.postgres.yml logs -t -f"
   },
   "bin": {
     "api-toolkit-git-info": "./bin/api-toolkit-git-info.js"
diff --git a/src/index.ts b/src/index.ts
index fa3c7d9..d510274 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -2,5 +2,6 @@ export * from './fastify';
 export * from './helpers';
 export * from './logger';
 export * from './postgres';
+export * from './profiler';
 export * from './server-version';
 export * from './shutdown-handler';
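The README entry above summarizes the new profiler server. A minimal sketch of how a consumer might stand it up next to an application follows, assuming `buildProfilerServer` from this change; the import path, host, and port are illustrative placeholders rather than values defined by this package.

```ts
import { buildProfilerServer } from './src/profiler/server'; // hypothetical import path for illustration

async function startProfiler() {
  // Build the Fastify instance that exposes the profiler endpoints added in this change.
  const profilerServer = await buildProfilerServer();

  // Bind to a loopback-only address so the profiling endpoints are not publicly reachable.
  // The port is an arbitrary example value.
  await profilerServer.listen({ host: '127.0.0.1', port: 9230 });

  // A CPU profile can then be downloaded as a `.cpuprofile` file, e.g.:
  //   GET http://127.0.0.1:9230/profile/cpu?duration=5
  // Heap snapshots are served at /profile/heap_snapshot, and an in-flight
  // session can be aborted via /profile/cancel.
}

startProfiler().catch(error => {
  console.error(error);
  process.exit(1);
});
```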
diff --git a/src/profiler/__tests__/profiler.test.ts b/src/profiler/__tests__/profiler.test.ts
new file mode 100644
index 0000000..e8c5d30
--- /dev/null
+++ b/src/profiler/__tests__/profiler.test.ts
@@ -0,0 +1,67 @@
+import { FastifyInstance } from 'fastify';
+import { buildProfilerServer } from '../server';
+import { timeout } from '../../helpers';
+
+describe('CPU profiler', () => {
+  let fastify: FastifyInstance;
+
+  beforeAll(async () => {
+    fastify = await buildProfilerServer();
+  });
+
+  test('CPU profiler snapshot bad duration', async () => {
+    const query1 = await fastify.inject({
+      method: 'GET',
+      url: `/profile/cpu?duration=-100`,
+    });
+    expect(query1.statusCode).toBe(400);
+  });
+
+  test('generate CPU profiler snapshot', async () => {
+    const duration = 0.25; // 250 milliseconds
+    const query1 = await fastify.inject({
+      method: 'GET',
+      url: `/profile/cpu?duration=${duration}`,
+    });
+    expect(query1.statusCode).toBe(200);
+    expect(query1.headers['content-type']).toBe('application/json; charset=utf-8');
+    let cpuProfileBody: any;
+    // Ensure the entire profile result was streamed/returned
+    expect(() => {
+      cpuProfileBody = query1.json();
+    }).not.toThrow();
+    // Cursory check for the expected JSON format of a `.cpuprofile` file
+    expect(cpuProfileBody).toEqual(
+      expect.objectContaining({
+        nodes: expect.any(Array),
+        samples: expect.any(Array),
+        timeDeltas: expect.any(Array),
+        startTime: expect.any(Number),
+        endTime: expect.any(Number),
+      })
+    );
+  });
+
+  test('cancel CPU profiler snapshot', async () => {
+    const duration = 150; // 150 seconds
+    // init a CPU profile request, holding on to the promise for reading the request response
+    const promise = fastify.inject({
+      method: 'GET',
+      url: `/profile/cpu?duration=${duration}`,
+    });
+    await timeout(200);
+    // perform a request to cancel the previous profile session
+    const endQuery = await fastify.inject({
+      method: 'GET',
+      url: `/profile/cancel`,
+    });
+    expect(endQuery.statusCode).toBe(200);
+    // ensure the initial request failed
+    const result = await promise;
+    expect(result.statusCode).toBe(500);
+  });
+
+  afterAll(async () => {
+    await fastify.close();
+  });
+});
diff --git a/src/profiler/index.ts b/src/profiler/index.ts
new file mode 100644
index 0000000..cf45f4d
--- /dev/null
+++ b/src/profiler/index.ts
@@ -0,0 +1,2 @@
+export * from './inspector-util';
+export * from './server';
diff --git a/src/profiler/inspector-util.ts b/src/profiler/inspector-util.ts
new file mode 100644
index 0000000..28b075b
--- /dev/null
+++ b/src/profiler/inspector-util.ts
@@ -0,0 +1,275 @@
+import * as inspector from 'inspector';
+import * as stream from 'stream';
+import { stopwatch, Stopwatch } from '../helpers';
+import { logger } from '../logger';
+
+export type CpuProfileResult = inspector.Profiler.Profile;
+
+export interface ProfilerInstance<TStopResult = void> {
+  start: () => Promise<void>;
+  stop: () => Promise<TStopResult>;
+  dispose: () => Promise<void>;
+  session: inspector.Session;
+  sessionType: 'cpu' | 'memory';
+  stopwatch: Stopwatch;
+}
+
+function isInspectorNotConnectedError(error: unknown): boolean {
+  const ERR_INSPECTOR_NOT_CONNECTED = 'ERR_INSPECTOR_NOT_CONNECTED';
+  const isNodeError = (r: unknown): r is NodeJS.ErrnoException =>
+    r instanceof Error && 'code' in r;
+  return isNodeError(error) && error.code === ERR_INSPECTOR_NOT_CONNECTED;
+}
+
+/**
+ * Connects and enables a new `inspector` session, then starts an internal v8 CPU profiling process.
+ * @param samplingInterval - Optional sampling interval in microseconds; the default is 1000 microseconds.
+ * @returns A `ProfilerInstance` whose `stop()` resolves to the CPU profile result object.
+ * The result object can be used to create a `.cpuprofile` file using JSON.stringify.
+ * Use VSCode or Chrome's 'DevTools for Node' (under chrome://inspect) to visualize the `.cpuprofile` file.
+ */
+export function initCpuProfiling(samplingInterval?: number): ProfilerInstance<CpuProfileResult> {
+  const sessionStopwatch = stopwatch();
+  const session = new inspector.Session();
+  session.connect();
+  logger.info(`[CpuProfiler] Connect session took ${sessionStopwatch.getElapsedAndRestart()}ms`);
+  const start = async () => {
+    const sw = stopwatch();
+    logger.info(`[CpuProfiler] Enabling profiling...`);
+    await new Promise<void>((resolve, reject) => {
+      try {
+        session.post('Profiler.enable', error => {
+          if (error) {
+            logger.error(error, '[CpuProfiler] Error enabling profiling');
+            reject(error);
+          } else {
+            logger.info(`[CpuProfiler] Profiling enabled`);
+            resolve();
+          }
+        });
+      } catch (error) {
+        logger.error(error, '[CpuProfiler] Error enabling profiling');
+        reject(error);
+      }
+    });
+    logger.info(`[CpuProfiler] Enable session took ${sw.getElapsedAndRestart()}ms`);
+
+    if (samplingInterval !== undefined) {
+      logger.info(`[CpuProfiler] Setting sampling interval to ${samplingInterval} microseconds`);
+      await new Promise<void>((resolve, reject) => {
+        try {
+          session.post('Profiler.setSamplingInterval', { interval: samplingInterval }, error => {
+            if (error) {
+              logger.error(error, '[CpuProfiler] Error setting sampling interval');
+              reject(error);
+            } else {
+              logger.info(`[CpuProfiler] Set sampling interval`);
+              resolve();
+            }
+          });
+        } catch (error) {
+          logger.error(error, '[CpuProfiler] Error setting sampling interval');
+          reject(error);
+        }
+      });
+      logger.info(`[CpuProfiler] Set sampling interval took ${sw.getElapsedAndRestart()}ms`);
+    }
+
+    logger.info(`[CpuProfiler] Profiling starting...`);
+    await new Promise<void>((resolve, reject) => {
+      try {
+        session.post('Profiler.start', error => {
+          if (error) {
+            logger.error(error, '[CpuProfiler] Error starting profiling');
+            reject(error);
+          } else {
+            sessionStopwatch.restart();
+            logger.info(`[CpuProfiler] Profiling started`);
+            resolve();
+          }
+        });
+      } catch (error) {
+        logger.error(error, '[CpuProfiler] Error starting profiling');
+        reject(error);
+      }
+    });
+    logger.info(`[CpuProfiler] Start profiler took ${sw.getElapsedAndRestart()}ms`);
+  };
+
+  const stop = async () => {
+    const sw = stopwatch();
+    logger.info(`[CpuProfiler] Profiling stopping...`);
+    try {
+      return await new Promise<CpuProfileResult>((resolve, reject) => {
+        try {
+          session.post('Profiler.stop', (error, profileResult) => {
+            if (error) {
+              logger.error(error, '[CpuProfiler] Error stopping profiling');
+              reject(error);
+            } else {
+              logger.info(`[CpuProfiler] Profiling stopped`);
+              resolve(profileResult.profile);
+            }
+          });
+        } catch (error) {
+          reject(error);
+        }
+      });
+    } finally {
+      logger.info(`[CpuProfiler] Stop profiler took ${sw.getElapsedAndRestart()}ms`);
+    }
+  };
+
+  const dispose = async () => {
+    const sw = stopwatch();
+    try {
+      logger.info(`[CpuProfiler] Disabling profiling...`);
+      await new Promise<void>((resolve, reject) => {
+        try {
+          session.post('Profiler.disable', error => {
+            if (error && isInspectorNotConnectedError(error)) {
+              logger.info(`[CpuProfiler] Profiler already disconnected`);
+              resolve();
+            } else if (error) {
+              logger.error(error, '[CpuProfiler] Error disabling profiling');
+              reject(error);
+            } else {
+              logger.info(`[CpuProfiler] Profiling disabled`);
+              resolve();
+            }
+          });
+        } catch (error) {
+          if (isInspectorNotConnectedError(error)) {
+            logger.info(`[CpuProfiler] Profiler already disconnected`);
+            resolve();
+          } else {
+            reject(error);
+          }
+        }
+      });
+    } finally {
+      session.disconnect();
+      logger.info(
+        `[CpuProfiler] Disable and disconnect profiler took ${sw.getElapsedAndRestart()}ms`
+      );
+    }
+  };
+
+  return { start, stop, dispose, session, sessionType: 'cpu', stopwatch: sessionStopwatch };
+}
+
+/**
+ * Connects and enables a new `inspector` session, then creates an internal v8 Heap profiler snapshot.
+ * @param outputStream - An output stream that heap snapshot chunks are written to.
+ * The result stream can be used to create a `.heapsnapshot` file.
+ * Use Chrome's 'DevTools for Node' (under chrome://inspect) to visualize the `.heapsnapshot` file.
+ */
+export function initHeapSnapshot(
+  outputStream: stream.Writable
+): ProfilerInstance<{ totalSnapshotByteSize: number }> {
+  const sw = stopwatch();
+  const session = new inspector.Session();
+  session.connect();
+  let totalSnapshotByteSize = 0;
+  const start = async () => {
+    logger.info(`[HeapProfiler] Enabling profiling...`);
+    await new Promise<void>((resolve, reject) => {
+      try {
+        session.post('HeapProfiler.enable', error => {
+          if (error) {
+            logger.error(error, '[HeapProfiler] Error enabling profiling');
+            reject(error);
+          } else {
+            sw.restart();
+            logger.info(`[HeapProfiler] Profiling enabled`);
+            resolve();
+          }
+        });
+      } catch (error) {
+        logger.error(error, '[HeapProfiler] Error enabling profiling');
+        reject(error);
+      }
+    });
+
+    session.on('HeapProfiler.addHeapSnapshotChunk', message => {
+      // Note: this doesn't handle stream back-pressure, but we don't have control over the
+      // `HeapProfiler.addHeapSnapshotChunk` callback in order to use something like piping.
+      // So in theory on a slow `outputStream` (usually an http connection response) this can cause OOM.
+      logger.info(
+        `[HeapProfiler] Writing heap snapshot chunk of size ${message.params.chunk.length}`
+      );
+      totalSnapshotByteSize += message.params.chunk.length;
+      outputStream.write(message.params.chunk, error => {
+        if (error) {
+          logger.error(
+            error,
+            `[HeapProfiler] Error writing heap profile chunk to output stream: ${error.message}`
+          );
+        }
+      });
+    });
+  };
+
+  const stop = async () => {
+    logger.info(`[HeapProfiler] Taking snapshot...`);
+    await new Promise<void>((resolve, reject) => {
+      try {
+        session.post('HeapProfiler.takeHeapSnapshot', undefined, (error: Error | null) => {
+          if (error) {
+            logger.error(error, '[HeapProfiler] Error taking snapshot');
+            reject(error);
+          } else {
+            logger.info(
+              `[HeapProfiler] Taking snapshot completed, ${totalSnapshotByteSize} bytes...`
+            );
+            resolve();
+          }
+        });
+      } catch (error) {
+        logger.error(error, '[HeapProfiler] Error taking snapshot');
+        reject(error);
+      }
+    });
+    logger.info(`[HeapProfiler] Draining snapshot buffer to stream...`);
+    const writeFinishedPromise = new Promise<void>((resolve, reject) => {
+      outputStream.on('finish', () => resolve());
+      outputStream.on('error', error => reject(error));
+    });
+    outputStream.end();
+    await writeFinishedPromise;
+    logger.info(`[HeapProfiler] Finished draining snapshot buffer to stream`);
+    return { totalSnapshotByteSize };
+  };
+
+  const dispose = async () => {
+    try {
+      logger.info(`[HeapProfiler] Disabling profiling...`);
+      await new Promise<void>((resolve, reject) => {
+        try {
+          session.post('HeapProfiler.disable', error => {
+            if (error && isInspectorNotConnectedError(error)) {
+              logger.info(`[HeapProfiler] Profiler already disconnected`);
+              resolve();
+            } else if (error) {
+              logger.error(error, '[HeapProfiler] Error disabling profiling');
+              reject(error);
+            } else {
+              logger.info(`[HeapProfiler] Profiling disabled`);
+              resolve();
+            }
+          });
+        } catch (error) {
+          if (isInspectorNotConnectedError(error)) {
+            logger.info(`[HeapProfiler] Profiler already disconnected`);
+            resolve();
+          } else {
+            reject(error);
+          }
+        }
+      });
+    } finally {
+      session.disconnect();
+    }
+  };
+
+  return { start, stop, dispose, session, sessionType: 'memory', stopwatch: sw };
+}
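The JSDoc for `initCpuProfiling` above notes that the `stop()` result can be serialized with `JSON.stringify` into a `.cpuprofile` file. A small sketch of that flow outside the HTTP server, assuming the exports above and the repo's `timeout` helper; the import paths, file name, and 5-second duration are illustrative assumptions.

```ts
import * as fs from 'fs/promises';
import { initCpuProfiling } from './src/profiler/inspector-util'; // illustrative path
import { timeout } from './src/helpers'; // illustrative path

async function writeCpuProfile() {
  // Start a profiling session; omitting samplingInterval keeps the default of 1000 microseconds.
  const profiler = initCpuProfiling();
  await profiler.start();

  // Profile for an arbitrary 5 seconds, then collect the result.
  await timeout(5000);
  const profile = await profiler.stop();

  // Serialize the profile so VSCode or Chrome's 'DevTools for Node' can open it.
  await fs.writeFile('example.cpuprofile', JSON.stringify(profile));

  // Disable the profiler and disconnect the inspector session.
  await profiler.dispose();
}

writeCpuProfile().catch(console.error);
```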
diff --git a/src/profiler/server.ts b/src/profiler/server.ts
new file mode 100644
index 0000000..aea8d04
--- /dev/null
+++ b/src/profiler/server.ts
@@ -0,0 +1,207 @@
+import * as inspector from 'inspector';
+import { once } from 'events';
+import { Server } from 'http';
+import * as os from 'os';
+import * as path from 'path';
+import * as fs from 'fs';
+import { timeout } from '../helpers';
+import { pipeline } from 'node:stream/promises';
+import { logger, PINO_LOGGER_CONFIG } from '../logger';
+import Fastify, { FastifyInstance, FastifyPluginCallback, FastifyReply } from 'fastify';
+import { Type, TypeBoxTypeProvider } from '@fastify/type-provider-typebox';
+import { initCpuProfiling, initHeapSnapshot, ProfilerInstance } from './inspector-util';
+
+const DurationSchema = Type.Number({ minimum: 0 });
+const SamplingIntervalSchema = Type.Optional(Type.Number({ minimum: 0 }));
+
+const CpuProfiler: FastifyPluginCallback<Record<never, never>, Server, TypeBoxTypeProvider> = (
+  fastify,
+  options,
+  done
+) => {
+  let existingSession: { instance: ProfilerInstance<unknown>; response: FastifyReply } | undefined;
+
+  fastify.get(
+    '/profile/cpu',
+    {
+      schema: {
+        querystring: Type.Object({
+          duration: DurationSchema,
+          sampling_interval: SamplingIntervalSchema,
+        }),
+      },
+    },
+    async (req, res) => {
+      if (existingSession) {
+        await res.status(409).send({ error: 'Profile session already in progress' });
+        return;
+      }
+      const seconds = req.query.duration;
+      const samplingInterval = req.query.sampling_interval;
+      const cpuProfiler = initCpuProfiling(samplingInterval);
+      existingSession = { instance: cpuProfiler, response: res };
+      try {
+        const filename = `cpu_${Math.round(Date.now() / 1000)}_${seconds}-seconds.cpuprofile`;
+        await cpuProfiler.start();
+        const ac = new AbortController();
+        const timeoutPromise = timeout(seconds * 1000, ac);
+        await Promise.race([timeoutPromise, once(res.raw, 'close')]);
+        if (res.raw.writableEnded || res.raw.destroyed) {
+          // session was cancelled
+          ac.abort();
+          return;
+        }
+        const result = await cpuProfiler.stop();
+        const resultString = JSON.stringify(result);
+        logger.info(
+          `[CpuProfiler] Completed, total profile report JSON string length: ${resultString.length}`
+        );
+        await res
+          .headers({
+            'Cache-Control': 'no-store',
+            'Transfer-Encoding': 'chunked',
+            'Content-Disposition': `attachment; filename="${filename}"`,
+            'Content-Type': 'application/json; charset=utf-8',
+          })
+          .send(resultString);
+      } finally {
+        const session = existingSession;
+        existingSession = undefined;
+        await session?.instance.dispose().catch(() => {});
+      }
+    }
+  );
+
+  fastify.get(
+    '/profile/cpu/start',
+    {
+      schema: {
+        querystring: Type.Object({
+          sampling_interval: SamplingIntervalSchema,
+        }),
+      },
+    },
+    async (req, res) => {
+      if (existingSession) {
+        await res.status(409).send({ error: 'Profile session already in progress' });
+        return;
+      }
+      const samplingInterval = req.query.sampling_interval;
+      const cpuProfiler = initCpuProfiling(samplingInterval);
+      existingSession = { instance: cpuProfiler, response: res };
+      await cpuProfiler.start();
+      const profilerRunningLogger = setInterval(() => {
+        if (existingSession) {
+          logger.error(`CPU profiler has been enabled for a long time`);
+        } else {
+          clearInterval(profilerRunningLogger);
+        }
+      }, 10_000).unref();
+      await res.send('CPU profiler started');
+    }
+  );
+
+  fastify.get('/profile/cpu/stop', async (req, res) => {
+    if (!existingSession) {
+      await res.status(409).send({ error: 'No profile session in progress' });
+      return;
+    }
+    if (existingSession.instance.sessionType !== 'cpu') {
+      await res.status(409).send({ error: 'No CPU profile session in progress' });
+      return;
+    }
+    try {
+      const elapsedSeconds = existingSession.instance.stopwatch.getElapsedSeconds();
+      const timestampSeconds = Math.round(Date.now() / 1000);
+      const filename = `cpu_${timestampSeconds}_${elapsedSeconds}-seconds.cpuprofile`;
+      const result = await (
+        existingSession.instance as ProfilerInstance<inspector.Profiler.Profile>
+      ).stop();
+      const resultString = JSON.stringify(result);
+      logger.info(
+        `[CpuProfiler] Completed, total profile report JSON string length: ${resultString.length}`
+      );
+      await res
+        .headers({
+          'Cache-Control': 'no-store',
+          'Transfer-Encoding': 'chunked',
+          'Content-Disposition': `attachment; filename="${filename}"`,
+          'Content-Type': 'application/json; charset=utf-8',
+        })
+        .send(resultString);
+    } finally {
+      const session = existingSession;
+      existingSession = undefined;
+      await session?.instance.dispose().catch(() => {});
+    }
+  });
+
+  fastify.get('/profile/heap_snapshot', async (req, res) => {
+    if (existingSession) {
+      await res.status(409).send({ error: 'Profile session already in progress' });
+      return;
+    }
+    const filename = `heap_${Math.round(Date.now() / 1000)}.heapsnapshot`;
+    const tmpFile = path.join(os.tmpdir(), filename);
+    const fileWriteStream = fs.createWriteStream(tmpFile);
+    const heapProfiler = initHeapSnapshot(fileWriteStream);
+    existingSession = { instance: heapProfiler, response: res };
+    try {
+      // Taking a heap snapshot (with the current implementation) is a one-shot process run to get
+      // the application's current heap memory usage, rather than something done over time. So start
+      // and stop without waiting.
+      await heapProfiler.start();
+      const result = await heapProfiler.stop();
+      logger.info(
+        `[HeapProfiler] Completed, total snapshot byte size: ${result.totalSnapshotByteSize}`
+      );
+      await res.headers({
+        'Cache-Control': 'no-store',
+        'Transfer-Encoding': 'chunked',
+        'Content-Disposition': `attachment; filename="${filename}"`,
+        'Content-Type': 'application/json; charset=utf-8',
+      });
+      await pipeline(fs.createReadStream(tmpFile), res.raw);
+    } finally {
+      const session = existingSession;
+      existingSession = undefined;
+      await session?.instance.dispose().catch(() => {});
+      try {
+        fileWriteStream.destroy();
+      } catch (_) {}
+      try {
+        logger.info(`[HeapProfiler] Cleaning up tmp file ${tmpFile}`);
+        fs.unlinkSync(tmpFile);
+      } catch (_) {}
+    }
+  });
+
+  fastify.get('/profile/cancel', async (req, res) => {
+    if (!existingSession) {
+      await res.status(409).send({ error: 'No existing profile session exists to cancel' });
+      return;
+    }
+    const session = existingSession;
+    await session.instance.stop().catch(() => {});
+    await session.instance.dispose().catch(() => {});
+    await session.response.status(500).send('cancelled');
+    existingSession = undefined;
+    await Promise.resolve();
+    await res.send({ ok: 'existing profile session stopped' });
+  });
+
+  done();
+};
+
+/**
+ * Creates a Fastify server that exposes the CPU profiler and heap snapshot endpoints.
+ * @returns Fastify instance
+ */
+export async function buildProfilerServer(): Promise<FastifyInstance> {
+  const fastify = Fastify({
+    trustProxy: true,
+    logger: PINO_LOGGER_CONFIG,
+  }).withTypeProvider<TypeBoxTypeProvider>();
+  await fastify.register(CpuProfiler);
+  return fastify;
+}
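The `/profile/heap_snapshot` route above can be exercised the same way the tests exercise the CPU routes, through `fastify.inject()`. A sketch under those assumptions follows; the import path and output file name are arbitrary examples.

```ts
import * as fs from 'fs/promises';
import { buildProfilerServer } from './src/profiler/server'; // illustrative path

async function saveHeapSnapshot() {
  const fastify = await buildProfilerServer();
  try {
    // Request a one-shot heap snapshot; the route streams back `.heapsnapshot` JSON.
    const response = await fastify.inject({ method: 'GET', url: '/profile/heap_snapshot' });
    if (response.statusCode !== 200) {
      throw new Error(`Heap snapshot request failed with status ${response.statusCode}`);
    }
    // Persist the body so Chrome's 'DevTools for Node' can load it.
    await fs.writeFile('example.heapsnapshot', response.rawPayload);
  } finally {
    await fastify.close();
  }
}

saveHeapSnapshot().catch(console.error);
```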