Skip to content

Commit

Permalink
#96 Improve takeSnapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
NiedziolkaMichal committed Sep 17, 2024
1 parent ad3c867 commit 602b38c
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 36 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.5.0] - 2024-09-17

### Added

- `restartWorker` option to `takeSnapshot`
- a promise return value to `takeSnapshot` settled to a created file path

### Changed

- `takeSnapshot` will no longer create a heap snapshot when there is not enough memory

## [2.4.0] - 2021-12-03

### Added
Expand Down
19 changes: 17 additions & 2 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,27 @@ Run CPU Profiler and save result on main process directory

<a name="WorkerNodes+takeSnapshot"></a>

### workerNodes.takeSnapshot() ⇒ <code>void</code>
### workerNodes.takeSnapshot() ⇒ <code>Promise</code>
Take Heap Snapshot and save result on main process directory

The operation will fail when there is not enough memory to create a snapshot.

**Kind**: instance method of [<code>WorkerNodes</code>](#WorkerNodes)
<a name="WorkerNodes+getUsedWorkers"></a>

| Param | Type |
|---------| --- |
| options | <code>TakeSnapshotOptions</code> |

### options.restartWorker : <code>Boolean</code>

Orders a worker that was used to create a snapshot,
to be immediately restarted.
It's recommended to use this option,
because V8 might persist the snapshot in memory until exit.

**Default**: <code>false</code>

### workerNodes.getUsedWorkers() ⇒ <code>Array.&lt;Worker&gt;</code>
Return list with used workers in pool

Expand Down Expand Up @@ -296,4 +311,4 @@ Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
8 changes: 8 additions & 0 deletions e2e/fixtures/mock-heap-statistics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
const v8 = require("v8");

module.exports = function mockHeapStatistics () {
v8.getHeapStatistics = () => ({
heap_size_limit: 100,
used_heap_size: 55,
})
};
49 changes: 37 additions & 12 deletions e2e/v8-profilers.base.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,41 @@ module.exports = function describe(workerType) {
await workerNodes.call('hello!');

// when
workerNodes.takeSnapshot();
const getHeapSnapshotFilename = workerType === "thread" ?
() => fs.readdirSync(process.cwd()).find(name => name.includes('.heapsnapshot') && name.includes(`-${process.pid}-`)) :
() => fs.readdirSync(process.cwd()).find(name => name.includes('.heapsnapshot') && !name.includes(`-${process.pid}-`));
await eventually(() => getHeapSnapshotFilename() !== undefined);
const filePath = await workerNodes.takeSnapshot();

const result = getHeapSnapshotFilename();
t.truthy(result);
t.true(result.length > 0)
fs.unlinkSync(result);
t.regex(filePath, /^HeapSnapshot-\d+-\d+\.heapsnapshot$/);
t.true(fs.existsSync(filePath))
fs.unlinkSync(filePath);
});

test.serial(`should restart worker after taking heap snapshot when restartWorker option was set`, async (t) => {
// given
const workerNodes = new WorkerNodes(fixture('echo-function-async'), { lazyStart: true, workerType });
await workerNodes.ready();
await workerNodes.call('hello!');

const workersBefore = workerNodes.getUsedWorkers();

// when
const filePath = await workerNodes.takeSnapshot({ restartWorker: true });
fs.unlinkSync(filePath);

// Waiting for a worker to restart
await new Promise((resolve) => setTimeout(resolve, 500));

const workersAfter = workerNodes.getUsedWorkers();
t.true(workersBefore.length === workersAfter.length);
t.true(workersBefore[0] !== workersAfter[0]);
});

test.serial(`should let takeSnapshot throw an error when there is not enough heap`, async (t) => {
const workerNodes = new WorkerNodes(fixture('mock-heap-statistics'), { lazyStart: true, workerType });
await workerNodes.ready();
await workerNodes.call();

await t.throwsAsync(workerNodes.takeSnapshot(), {
message: 'Not enough memory to perform heap snapshot'
})
});

test(`should generate heap profiler result file`, async (t) => {
Expand All @@ -34,8 +59,8 @@ module.exports = function describe(workerType) {

await workerNodes.call('hello!');

const getCpuProfileFilename = workerType === "thread" ?
() => fs.readdirSync(process.cwd()).find(name => name.includes('.cpuprofile') && name.includes(`-${process.pid}-`)) :
const getCpuProfileFilename = workerType === "thread" ?
() => fs.readdirSync(process.cwd()).find(name => name.includes('.cpuprofile') && name.includes(`-${process.pid}-`)) :
() => fs.readdirSync(process.cwd()).find(name => name.includes('.cpuprofile') && !name.includes(`-${process.pid}-`));

await eventually(() => getCpuProfileFilename() !== undefined);
Expand All @@ -46,4 +71,4 @@ module.exports = function describe(workerType) {
t.true(result.length > 0)
fs.unlinkSync(result);
});
}
}
6 changes: 5 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ interface Options {
workerType?: "thread" | "process";
}

interface TakeSnapshotOptions {
restartWorker?: boolean;
}

interface WorkerNodesInstance {
call: CallProperty;
ready: () => Promise<WorkerNodesInstance>;
terminate: () => Promise<WorkerNodesInstance>;
profiler: (duration?: number) => void;
takeSnapshot: () => void;
takeSnapshot: (options?: TakeSnapshotOptions) => void;
getUsedWorkers: () => Array<Worker>;
}

Expand Down
14 changes: 8 additions & 6 deletions lib/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,17 +355,19 @@ class WorkerNodes extends EventEmitter {

/**
* Take Heap Snapshot and save result on main process directory
*
* @returns {void}
* @param {TakeSnapshotOptions} options
* @returns {Promise<string>}
*/
takeSnapshot() {
takeSnapshot(options = {}) {
const worker = this.pickWorker();

if (worker) {
worker.takeSnapshot();
return worker.takeSnapshot(options);
} else {
// There might not be availble worker, let it start.
setTimeout(() => this.takeSnapshot(), 500);
return new Promise((resolve) => {
// There might not be available worker, let it start.
setTimeout(() => resolve(this.takeSnapshot()), 500);
})
}
}

Expand Down
25 changes: 15 additions & 10 deletions lib/util/get-heap-snapshot.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
const v8 = require('v8');
const fs = require('fs');

const getHeapSnapshot = (callback) => {
const stream = v8.getHeapSnapshot();
const file = fs.createWriteStream(`HeapSnapshot-${process.pid}-${Date.now()}.heapsnapshot`);

stream.on('data', (chunk) => file.write(chunk));
const hasEnoughMemory = () => {
const heapStats = v8.getHeapStatistics();
return heapStats.heap_size_limit >= heapStats.used_heap_size * 2;
}

stream.on('end', () => {
if (callback) { callback('heap snapshot done'); }
});
const getHeapSnapshot = (callback) => {
if (hasEnoughMemory()) {
const filePath = v8.writeHeapSnapshot(`HeapSnapshot-${process.pid}-${Date.now()}.heapsnapshot`);
callback(filePath);
} else {
callback(undefined, {
type: 'Error',
message: 'Not enough memory to perform heap snapshot'
});
}
}

module.exports = getHeapSnapshot;
module.exports = getHeapSnapshot;
10 changes: 10 additions & 0 deletions lib/util/promise-with-resolvers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
const promiseWithResolvers = () => {
let resolve, reject
const promise = new Promise((res, rej) => {
resolve = res
reject = rej
})
return { promise, resolve, reject }
}

module.exports = promiseWithResolvers;
18 changes: 14 additions & 4 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const EventEmitter = require('events');
const WorkerProcess = require('./worker/process');
const Sequence = require('./util/sequence');
const messages = require('./worker/message');
const promiseWithResolvers = require("./util/promise-with-resolvers");

const ProcessRequest = messages.Request;
const ProcessResponse = messages.Response;
Expand Down Expand Up @@ -133,17 +134,26 @@ class Worker extends EventEmitter {
}

/**
*
* @param {TakeSnapshotOptions} options
* @returns {Promise<string>}
*/
takeSnapshot() {
takeSnapshot(options) {
const { promise, resolve, reject } = promiseWithResolvers();

const cmd = 'takeSnapshot';
this.calls.set(cmd, {
timer: null,
reject: () => {},
resolve: () => {},
reject,
resolve
});

this.process.handle({ cmd });

if (options.restartWorker) {
promise.finally(() => this.stop());
}

return promise;
}
}

Expand Down
3 changes: 2 additions & 1 deletion lib/worker/child-loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,10 @@ function handleHeapSnapshot(requestData) {
const request = new Request(requestData);
const response = Response.from(request);

getHeapSnapshot((result) => {
getHeapSnapshot((result, error) => {
response.callId = 'takeSnapshot';
response.setResult(result);
response.error = error;
sendMessageToParent(response);
});
}
Expand Down

0 comments on commit 602b38c

Please sign in to comment.