Skip to content

Commit

Permalink
feat: add fastify cpu profiler server (#28)
Browse files Browse the repository at this point in the history
* feat: add fastify cpu profiler server

* chore: readme

* ci: docker compose

* fix: pipeline

* fix: remove neon profiling
  • Loading branch information
rafaelcr authored Aug 16, 2024
1 parent 232f9bb commit b224e06
Show file tree
Hide file tree
Showing 7 changed files with 561 additions and 3 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ Please see each tool's source directory for additional documentation
* Node.js signal handlers that provide a way to shut down long-running application components
gracefully on unhandled exceptions or interrupt signals.

### CPU Profiler

* Fastify server that controls a profiler capable of generating:
* `.cpuprofile` files for CPU usage analysis
* `.heapsnapshot` files for memory usage analysis

### Logger

* Standardized logger configuration using [pino](https://github.com/pinojs/pino)
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
"test": "jest --runInBand",
"lint:eslint": "eslint . --ext .js,.jsx,.ts,.tsx -f unix",
"lint:prettier": "prettier --check src/**/*.ts",
"testenv:run": "docker-compose -f docker/docker-compose.dev.postgres.yml up",
"testenv:stop": "docker-compose -f docker/docker-compose.dev.postgres.yml down -v -t 0",
"testenv:logs": "docker-compose -f docker/docker-compose.dev.postgres.yml logs -t -f"
"testenv:run": "docker compose -f docker/docker-compose.dev.postgres.yml up",
"testenv:stop": "docker compose -f docker/docker-compose.dev.postgres.yml down -v -t 0",
"testenv:logs": "docker compose -f docker/docker-compose.dev.postgres.yml logs -t -f"
},
"bin": {
"api-toolkit-git-info": "./bin/api-toolkit-git-info.js"
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export * from './fastify';
export * from './helpers';
export * from './logger';
export * from './postgres';
export * from './profiler';
export * from './server-version';
export * from './shutdown-handler';
67 changes: 67 additions & 0 deletions src/profiler/__tests__/profiler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { FastifyInstance } from 'fastify';
import { buildProfilerServer } from '../server';
import { timeout } from '../../helpers';

describe('CPU profiler', () => {
let fastify: FastifyInstance;

beforeAll(async () => {
fastify = await buildProfilerServer();
});

test('CPU profiler snapshot bad duration', async () => {
const query1 = await fastify.inject({
method: 'GET',
url: `/profile/cpu?duration=-100`,
});
expect(query1.statusCode).toBe(400);
});

test('generate CPU profiler snapshot', async () => {
const duration = 0.25; // 250 milliseconds
const query1 = await fastify.inject({
method: 'GET',
url: `/profile/cpu?duration=${duration}`,
});
expect(query1.statusCode).toBe(200);
expect(query1.headers['content-type']).toBe('application/json; charset=utf-8');
let cpuProfileBody: any;
// Ensure entire profile result was streamed/returned
expect(() => {
cpuProfileBody = query1.json();
}).not.toThrow();
// Cursory check for the expected JSON format of a `.cpuprofile` file
expect(cpuProfileBody).toEqual(
expect.objectContaining({
nodes: expect.any(Array),
samples: expect.any(Array),
timeDeltas: expect.any(Array),
startTime: expect.any(Number),
endTime: expect.any(Number),
})
);
});

test('cancel CPU profiler snapshot', async () => {
const duration = 150; // 150 seconds
// init a cpu profile request, hold on to the promise for reading the request response
const promise = fastify.inject({
method: 'GET',
url: `/profile/cpu?duration=${duration}`,
});
await timeout(200);
// perform a request to cancel the previous profile session
const endQuery = await fastify.inject({
method: 'GET',
url: `/profile/cancel`,
});
expect(endQuery.statusCode).toBe(200);
// ensure the initial request failed
const result = await promise;
expect(result.statusCode).toBe(500);
});

afterAll(async () => {
await fastify.close();
});
});
2 changes: 2 additions & 0 deletions src/profiler/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './inspector-util';
export * from './server';
275 changes: 275 additions & 0 deletions src/profiler/inspector-util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
import * as inspector from 'inspector';
import * as stream from 'stream';
import { stopwatch, Stopwatch } from '../helpers';
import { logger } from '../logger';

export type CpuProfileResult = inspector.Profiler.Profile;

export interface ProfilerInstance<TStopResult = void> {
start: () => Promise<void>;
stop: () => Promise<TStopResult>;
dispose: () => Promise<void>;
session: inspector.Session;
sessionType: 'cpu' | 'memory';
stopwatch: Stopwatch;
}

function isInspectorNotConnectedError(error: unknown): boolean {
const ERR_INSPECTOR_NOT_CONNECTED = 'ERR_INSPECTOR_NOT_CONNECTED';
const isNodeError = (r: unknown): r is NodeJS.ErrnoException => r instanceof Error && 'code' in r;
return isNodeError(error) && error.code === ERR_INSPECTOR_NOT_CONNECTED;
}

/**
* Connects and enables a new `inspector` session, then starts an internal v8 CPU profiling process.
* @returns A function to stop the profiling, and return the CPU profile result object.
* The result object can be used to create a `.cpuprofile` file using JSON.stringify.
* Use VSCode or Chrome's 'DevTools for Node' (under chrome://inspect) to visualize the `.cpuprofile` file.
* @param samplingInterval - Optionally set sampling interval in microseconds, default is 1000 microseconds.
*/
export function initCpuProfiling(samplingInterval?: number): ProfilerInstance<CpuProfileResult> {
const sessionStopwatch = stopwatch();
const session = new inspector.Session();
session.connect();
logger.info(`[CpuProfiler] Connect session took ${sessionStopwatch.getElapsedAndRestart()}ms`);
const start = async () => {
const sw = stopwatch();
logger.info(`[CpuProfiler] Enabling profiling...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('Profiler.enable', error => {
if (error) {
logger.error(error, '[CpuProfiler] Error enabling profiling');
reject(error);
} else {
logger.info(`[CpuProfiler] Profiling enabled`);
resolve();
}
});
} catch (error) {
logger.error(error, '[CpuProfiler] Error enabling profiling');
reject(error);
}
});
logger.info(`[CpuProfiler] Enable session took ${sw.getElapsedAndRestart()}ms`);

if (samplingInterval !== undefined) {
logger.info(`[CpuProfiler] Setting sampling interval to ${samplingInterval} microseconds`);
await new Promise<void>((resolve, reject) => {
try {
session.post('Profiler.setSamplingInterval', { interval: samplingInterval }, error => {
if (error) {
logger.error(error, '[CpuProfiler] Error setting sampling interval');
reject(error);
} else {
logger.info(`[CpuProfiler] Set sampling interval`);
resolve();
}
});
} catch (error) {
logger.error(error, '[CpuProfiler] Error setting sampling interval');
reject(error);
}
});
logger.info(`[CpuProfiler] Set sampling interval took ${sw.getElapsedAndRestart()}ms`);
}

logger.info(`[CpuProfiler] Profiling starting...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('Profiler.start', error => {
if (error) {
logger.error(error, '[CpuProfiler] Error starting profiling');
reject(error);
} else {
sessionStopwatch.restart();
logger.info(`[CpuProfiler] Profiling started`);
resolve();
}
});
} catch (error) {
logger.error(error, '[CpuProfiler] Error starting profiling');
reject(error);
}
});
logger.info(`[CpuProfiler] Start profiler took ${sw.getElapsedAndRestart()}ms`);
};

const stop = async () => {
const sw = stopwatch();
logger.info(`[CpuProfiler] Profiling stopping...`);
try {
return await new Promise<CpuProfileResult>((resolve, reject) => {
try {
session.post('Profiler.stop', (error, profileResult) => {
if (error) {
logger.error(error, '[CpuProfiler] Error stopping profiling');
reject(error);
} else {
logger.info(`[CpuProfiler] Profiling stopped`);
resolve(profileResult.profile);
}
});
} catch (error) {
reject(error);
}
});
} finally {
logger.info(`[CpuProfiler] Stop profiler took ${sw.getElapsedAndRestart()}ms`);
}
};

const dispose = async () => {
const sw = stopwatch();
try {
logger.info(`[CpuProfiler] Disabling profiling...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('Profiler.disable', error => {
if (error && isInspectorNotConnectedError(error)) {
logger.info(`[CpuProfiler] Profiler already disconnected`);
resolve();
} else if (error) {
logger.error(error, '[CpuProfiler] Error disabling profiling');
reject(error);
} else {
logger.info(`[CpuProfiler] Profiling disabled`);
resolve();
}
});
} catch (error) {
if (isInspectorNotConnectedError(error)) {
logger.info(`[CpuProfiler] Profiler already disconnected`);
resolve();
} else {
reject();
}
}
});
} finally {
session.disconnect();
logger.info(
`[CpuProfiler] Disable and disconnect profiler took ${sw.getElapsedAndRestart()}ms`
);
}
};

return { start, stop, dispose, session, sessionType: 'cpu', stopwatch: sessionStopwatch };
}

/**
* Connects and enables a new `inspector` session, then creates an internal v8 Heap profiler snapshot.
* @param outputStream - An output stream that heap snapshot chunks are written to.
* The result stream can be used to create a `.heapsnapshot` file.
* Use Chrome's 'DevTools for Node' (under chrome://inspect) to visualize the `.heapsnapshot` file.
*/
export function initHeapSnapshot(
outputStream: stream.Writable
): ProfilerInstance<{ totalSnapshotByteSize: number }> {
const sw = stopwatch();
const session = new inspector.Session();
session.connect();
let totalSnapshotByteSize = 0;
const start = async () => {
logger.info(`[HeapProfiler] Enabling profiling...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('HeapProfiler.enable', error => {
if (error) {
logger.error(error, '[HeapProfiler] Error enabling profiling');
reject(error);
} else {
sw.restart();
logger.info(`[HeapProfiler] Profiling enabled`);
resolve();
}
});
} catch (error) {
logger.error(error, '[HeapProfiler] Error enabling profiling');
reject(error);
}
});

session.on('HeapProfiler.addHeapSnapshotChunk', message => {
// Note: this doesn't handle stream back-pressure, but we don't have control over the
// `HeapProfiler.addHeapSnapshotChunk` callback in order to use something like piping.
// So in theory on a slow `outputStream` (usually an http connection response) this can cause OOM.
logger.info(
`[HeapProfiler] Writing heap snapshot chunk of size ${message.params.chunk.length}`
);
totalSnapshotByteSize += message.params.chunk.length;
outputStream.write(message.params.chunk, error => {
if (error) {
logger.error(
error,
`[HeapProfiler] Error writing heap profile chunk to output stream: ${error.message}`
);
}
});
});
};

const stop = async () => {
logger.info(`[HeapProfiler] Taking snapshot...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('HeapProfiler.takeHeapSnapshot', undefined, (error: Error | null) => {
if (error) {
logger.error(error, '[HeapProfiler] Error taking snapshot');
reject(error);
} else {
logger.info(
`[HeapProfiler] Taking snapshot completed, ${totalSnapshotByteSize} bytes...`
);
resolve();
}
});
} catch (error) {
logger.error(error, '[HeapProfiler] Error taking snapshot');
reject(error);
}
});
logger.info(`[HeapProfiler] Draining snapshot buffer to stream...`);
const writeFinishedPromise = new Promise<void>((resolve, reject) => {
outputStream.on('finish', () => resolve());
outputStream.on('error', error => reject(error));
});
outputStream.end();
await writeFinishedPromise;
logger.info(`[HeapProfiler] Finished draining snapshot buffer to stream`);
return { totalSnapshotByteSize };
};

const dispose = async () => {
try {
logger.info(`[HeapProfiler] Disabling profiling...`);
await new Promise<void>((resolve, reject) => {
try {
session.post('HeapProfiler.disable', error => {
if (error && isInspectorNotConnectedError(error)) {
logger.info(`[HeapProfiler] Profiler already disconnected`);
resolve();
} else if (error) {
logger.error(error, '[HeapProfiler] Error disabling profiling');
reject(error);
} else {
logger.info(`[HeapProfiler] Profiling disabled`);
resolve();
}
});
} catch (error) {
if (isInspectorNotConnectedError(error)) {
logger.info(`[HeapProfiler] Profiler already disconnected`);
resolve();
} else {
reject();
}
}
});
} finally {
session.disconnect();
}
};

return { start, stop, dispose, session, sessionType: 'memory', stopwatch: sw };
}
Loading

0 comments on commit b224e06

Please sign in to comment.