Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

offload zipping files to worker thread #233

Merged
merged 2 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 18 additions & 37 deletions src/services/AlbumService.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import { constant, Constant, Inject, Service } from "@tsed/di";
import { AlbumRepo } from "../db/repo/AlbumRepo.js";
import { BucketRepo } from "../db/repo/BucketRepo.js";
import { BadRequest, InternalServerError, NotFound } from "@tsed/exceptions";
import { BadRequest, NotFound } from "@tsed/exceptions";
import { AlbumModel } from "../model/db/Album.model.js";
import { Builder } from "builder-pattern";
import crypto from "node:crypto";
import { FileRepo } from "../db/repo/FileRepo.js";
import { FileService } from "./FileService.js";
import { filesDir, FileUtils } from "../utils/Utils.js";
import { FileUtils, WorkerUtils } from "../utils/Utils.js";
import { FileUploadModel } from "../model/db/FileUpload.model.js";
import { ThumbnailCacheReo } from "../db/repo/ThumbnailCacheReo.js";
import fs, { ReadStream } from "node:fs";
import archiver from "archiver";
import GlobalEnv from "../model/constants/GlobalEnv.js";
import { MimeService } from "./MimeService.js";
import { Logger } from "@tsed/logger";
Expand Down Expand Up @@ -198,21 +197,7 @@ export class AlbumService {
},
});

const workerPromise: Promise<void> = new Promise((resolve, reject): void => {
worker.on("message", (message: { success: boolean; error?: string }) => {
if (message.success) {
resolve();
} else {
reject(new Error(message.error));
}
});
worker.on("error", reject);
worker.on("exit", code => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
const workerPromise = WorkerUtils.newWorkerPromise(worker);
workerPromise
.then(() => this.logger.info(`Successfully generated thumbnails for album ${privateAlbumToken}`))
.catch(e => this.logger.error(e));
Expand Down Expand Up @@ -286,8 +271,21 @@ export class AlbumService {
throw new BadRequest("Zip file is too large");
}

const zipLocation = filesDir + `/${album.name}_${crypto.randomUUID()}.zip`;
return Promise.all([this.createZip(filesToZip, zipLocation), album.name, zipLocation]);
const workerData = filesToZip.map(file => {
return {
fullLocationOnDisk: file.fullLocationOnDisk,
parsedFileName: file.parsedFileName,
};
});
const worker = new Worker(new URL("../workers/zipFiles.js", import.meta.url), {
workerData: {
filesToZip: workerData,
},
});

const zipLocation = await WorkerUtils.newWorkerPromise<string>(worker);

return [fs.createReadStream(zipLocation), album.name, zipLocation];
}

public isAlbumTooBigToDownload(album: AlbumModel): boolean {
Expand All @@ -299,23 +297,6 @@ export class AlbumService {
return false;
}

private async createZip(files: FileUploadModel[], zipLocation: string): Promise<ReadStream> {
const output = fs.createWriteStream(zipLocation);
const archive = archiver("zip");

archive.on("error", err => {
throw new InternalServerError(err.message);
});

archive.pipe(output);
for (const file of files) {
archive.file(file.fullLocationOnDisk, { name: file.parsedFileName });
}

await archive.finalize();
return fs.createReadStream(zipLocation);
}

private checkPrivateToken(token: string, album: AlbumModel): void {
if (album.isPublicToken(token)) {
throw new BadRequest("Supplied token is not valid");
Expand Down
22 changes: 22 additions & 0 deletions src/utils/Utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import fs from "node:fs/promises";
import type { PlatformMulterFile } from "@tsed/common";
import { FileUploadModel } from "../model/db/FileUpload.model.js";
import { isFormatSupportedByFfmpeg } from "./ffmpgWrapper.js";
import { WorkerResponse } from "./typeings.js";
import { Worker } from "node:worker_threads";

export class ObjectUtils {
public static getNumber(source: string): number {
Expand Down Expand Up @@ -221,4 +223,24 @@ export class NetworkUtils {
}
}

export class WorkerUtils {
public static newWorkerPromise<T = void>(worker: Worker): Promise<T> {
return new Promise((resolve, reject): void => {
worker.on("message", (message: WorkerResponse<T>) => {
if (message.success) {
resolve(message.data);
} else {
reject(new Error(message.error));
}
});
worker.on("error", reject);
worker.on("exit", code => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
}
}

export const filesDir = `${path.dirname(fileURLToPath(import.meta.url))}/../../files`;
6 changes: 6 additions & 0 deletions src/utils/typeings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,9 @@ export type AdminDataTaleEntryModel = {
recordsFiltered: number;
data: IpBlockedAwareFileEntry[];
};

export type WorkerResponse<T> = {
success: boolean;
error?: string;
data: T;
};
39 changes: 39 additions & 0 deletions src/workers/zipFiles.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { parentPort, workerData } from "node:worker_threads";
import { filesDir } from "../utils/Utils.js";
import crypto from "node:crypto";
import fs from "node:fs";
import archiver from "archiver";

async function createZip(
files: { fullLocationOnDisk: string; parsedFileName: string }[],
output: fs.WriteStream,
): Promise<void> {
const archive = archiver("zip");
archive.on("error", err => {
parentPort?.postMessage({ success: false, error: err.message });
});

archive.pipe(output);
for (const file of files) {
archive.file(file.fullLocationOnDisk, { name: file.parsedFileName });
}

await archive.finalize();
}

let output: fs.WriteStream | undefined;

try {
const { albumName, filesToZip } = workerData;
const zipLocation = filesDir + `/${albumName}_${crypto.randomUUID()}.zip`;
output = fs.createWriteStream(zipLocation);
await createZip(filesToZip, output);
parentPort?.postMessage({ success: true, data: zipLocation });
} catch (err) {
parentPort?.postMessage({ success: false, error: err.message });
} finally {
if (output) {
output.close();
}
parentPort?.close();
}