diff --git a/cli/package-lock.json b/cli/package-lock.json
index af339110d0457..23f2e50f28bb0 100644
--- a/cli/package-lock.json
+++ b/cli/package-lock.json
@@ -9,9 +9,11 @@
       "version": "2.2.37",
       "license": "GNU Affero General Public License version 3",
       "dependencies": {
+        "chokidar": "^4.0.3",
         "fast-glob": "^3.3.2",
         "fastq": "^1.17.1",
-        "lodash-es": "^4.17.21"
+        "lodash-es": "^4.17.21",
+        "micromatch": "^4.0.8"
       },
       "bin": {
         "immich": "dist/index.js"
@@ -23,6 +25,7 @@
         "@types/byte-size": "^8.1.0",
         "@types/cli-progress": "^3.11.0",
         "@types/lodash-es": "^4.17.12",
+        "@types/micromatch": "^4.0.9",
         "@types/mock-fs": "^4.13.1",
         "@types/node": "^22.10.2",
         "@typescript-eslint/eslint-plugin": "^8.15.0",
@@ -1343,6 +1346,13 @@
         "win32"
       ]
     },
+    "node_modules/@types/braces": {
+      "version": "3.0.4",
+      "resolved": "https://registry.npmjs.org/@types/braces/-/braces-3.0.4.tgz",
+      "integrity": "sha512-0WR3b8eaISjEW7RpZnclONaLFDf7buaowRHdqLp4vLj54AsSAYWfh3DRbfiYJY9XDxMgx1B4sE1Afw2PGpuHOA==",
+      "dev": true,
+      "license": "MIT"
+    },
     "node_modules/@types/byte-size": {
       "version": "8.1.2",
       "resolved": "https://registry.npmjs.org/@types/byte-size/-/byte-size-8.1.2.tgz",
@@ -1387,6 +1397,16 @@
         "@types/lodash": "*"
       }
     },
+    "node_modules/@types/micromatch": {
+      "version": "4.0.9",
+      "resolved": "https://registry.npmjs.org/@types/micromatch/-/micromatch-4.0.9.tgz",
+      "integrity": "sha512-7V+8ncr22h4UoYRLnLXSpTxjQrNUXtWHGeMPRJt1nULXI57G9bIcpyrHlmrQ7QK24EyyuXvYcSSWAM8GA9nqCg==",
+      "dev": true,
+      "license": "MIT",
+      "dependencies": {
+        "@types/braces": "*"
+      }
+    },
     "node_modules/@types/mock-fs": {
       "version": "4.13.4",
       "resolved": "https://registry.npmjs.org/@types/mock-fs/-/mock-fs-4.13.4.tgz",
@@ -1865,11 +1885,12 @@
       }
     },
     "node_modules/braces": {
-      "version": "3.0.2",
-      "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.2.tgz",
-      "integrity": "sha512-b8um+L1RzM3WDSzvhm6gIz1yfTbBt6YTlcEKAvsmqCZZFw46z626lVj9j1yEPW33H5H+lBQpZMP1k8l+78Ha0A==",
+      "version": "3.0.3",
+      "resolved": "https://registry.npmjs.org/braces/-/braces-3.0.3.tgz",
+      "integrity": "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==",
+      "license": "MIT",
       "dependencies": {
-        "fill-range": "^7.0.1"
+        "fill-range": "^7.1.1"
       },
       "engines": {
         "node": ">=8"
@@ -2013,6 +2034,21 @@
         "node": ">= 16"
       }
     },
+    "node_modules/chokidar": {
+      "version": "4.0.3",
+      "resolved": "https://registry.npmjs.org/chokidar/-/chokidar-4.0.3.tgz",
+      "integrity": "sha512-Qgzu8kfBvo+cA4962jnP1KkS6Dop5NS6g7R5LFYJr4b8Ub94PPQXUksCw9PvXoeXPRRddRNC5C1JQUR2SMGtnA==",
+      "license": "MIT",
+      "dependencies": {
+        "readdirp": "^4.0.1"
+      },
+      "engines": {
+        "node": ">= 14.16.0"
+      },
+      "funding": {
+        "url": "https://paulmillr.com/funding/"
+      }
+    },
     "node_modules/ci-info": {
       "version": "4.0.0",
       "resolved": "https://registry.npmjs.org/ci-info/-/ci-info-4.0.0.tgz",
@@ -2644,9 +2680,10 @@
       }
     },
     "node_modules/fill-range": {
-      "version": "7.0.1",
-      "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.0.1.tgz",
-      "integrity": "sha512-qOo9F+dMUmC2Lcb4BbVvnKJxTPjCm+RRpe4gDuGrzkL7mEVl/djYSu2OdQ2Pa302N4oqkSg9ir6jaLWJ2USVpQ==",
+      "version": "7.1.1",
+      "resolved": "https://registry.npmjs.org/fill-range/-/fill-range-7.1.1.tgz",
+      "integrity": "sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==",
+      "license": "MIT",
       "dependencies": {
         "to-regex-range": "^5.0.1"
       },
@@ -2928,6 +2965,7 @@
       "version": "7.0.0",
       "resolved": "https://registry.npmjs.org/is-number/-/is-number-7.0.0.tgz",
       "integrity": "sha512-41Cifkg6e8TylSpdtTpeLVMqvSBEVzTttHvERD741+pnZ8ANv0004MRL43QKPDlK9cGvNp6NZWZUBlbGXYxxng==",
"sha512-41Cifkg6e8TylSpdtTpeLVMqvSBEVzTttHvERD741+pnZ8ANv0004MRL43QKPDlK9cGvNp6NZWZUBlbGXYxxng==", + "license": "MIT", "engines": { "node": ">=0.12.0" } @@ -3172,11 +3210,12 @@ } }, "node_modules/micromatch": { - "version": "4.0.5", - "resolved": "https://registry.npmjs.org/micromatch/-/micromatch-4.0.5.tgz", - "integrity": "sha512-DMy+ERcEW2q8Z2Po+WNXuw3c5YaUSFjAO5GsJqfEl7UjvtIuFKO6ZrKvcItdy98dwFI2N1tg3zNIdKaQT+aNdA==", + "version": "4.0.8", + "resolved": "https://registry.npmjs.org/micromatch/-/micromatch-4.0.8.tgz", + "integrity": "sha512-PXwfBhYu0hBCPw8Dn0E+WDYb7af3dSLVWKi3HGv84IdF4TyFoC0ysxFd0Goxw7nSv4T/PzEJQxsYsEiFCKo2BA==", + "license": "MIT", "dependencies": { - "braces": "^3.0.2", + "braces": "^3.0.3", "picomatch": "^2.3.1" }, "engines": { @@ -3675,6 +3714,19 @@ "node": ">=8" } }, + "node_modules/readdirp": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/readdirp/-/readdirp-4.0.2.tgz", + "integrity": "sha512-yDMz9g+VaZkqBYS/ozoBJwaBhTbZo3UNYQHNRw1D3UFQB8oHB4uS/tAODO+ZLjGWmUbKnIlOWO+aaIiAxrUWHA==", + "license": "MIT", + "engines": { + "node": ">= 14.16.0" + }, + "funding": { + "type": "individual", + "url": "https://paulmillr.com/funding/" + } + }, "node_modules/regexp-tree": { "version": "0.1.27", "resolved": "https://registry.npmjs.org/regexp-tree/-/regexp-tree-0.1.27.tgz", @@ -4092,6 +4144,7 @@ "version": "5.0.1", "resolved": "https://registry.npmjs.org/to-regex-range/-/to-regex-range-5.0.1.tgz", "integrity": "sha512-65P7iz6X5yEr1cwcgvQxbbIw7Uk3gOy5dIdtZ4rDveLqhrdJP+Li/Hx6tyK0NEb+2GCyneCMJiGqrADCSNk8sQ==", + "license": "MIT", "dependencies": { "is-number": "^7.0.0" }, diff --git a/cli/package.json b/cli/package.json index bdfaa0f528a3e..8d67a603276d9 100644 --- a/cli/package.json +++ b/cli/package.json @@ -19,6 +19,7 @@ "@types/byte-size": "^8.1.0", "@types/cli-progress": "^3.11.0", "@types/lodash-es": "^4.17.12", + "@types/micromatch": "^4.0.9", "@types/mock-fs": "^4.13.1", "@types/node": "^22.10.2", "@typescript-eslint/eslint-plugin": "^8.15.0", @@ -62,11 +63,13 @@ "node": ">=20.0.0" }, "dependencies": { + "chokidar": "^4.0.3", "fast-glob": "^3.3.2", "fastq": "^1.17.1", - "lodash-es": "^4.17.21" + "lodash-es": "^4.17.21", + "micromatch": "^4.0.8" }, "volta": { "node": "22.12.0" } -} +} \ No newline at end of file diff --git a/cli/src/commands/asset.spec.ts b/cli/src/commands/asset.spec.ts index 4bac1d00abf97..21137a3296549 100644 --- a/cli/src/commands/asset.spec.ts +++ b/cli/src/commands/asset.spec.ts @@ -1,12 +1,13 @@ import * as fs from 'node:fs'; import * as os from 'node:os'; import * as path from 'node:path'; -import { describe, expect, it, vi } from 'vitest'; +import { setTimeout as sleep } from 'node:timers/promises'; +import { describe, expect, it, MockedFunction, vi } from 'vitest'; -import { Action, checkBulkUpload, defaults, Reason } from '@immich/sdk'; +import { Action, checkBulkUpload, defaults, getSupportedMediaTypes, Reason } from '@immich/sdk'; import createFetchMock from 'vitest-fetch-mock'; -import { checkForDuplicates, getAlbumName, uploadFiles, UploadOptionsDto } from './asset'; +import { checkForDuplicates, getAlbumName, startWatch, uploadFiles, UploadOptionsDto } from 'src/commands/asset'; vi.mock('@immich/sdk'); @@ -199,3 +200,112 @@ describe('checkForDuplicates', () => { }); }); }); + +describe('startWatch', () => { + let testFolder: string; + let checkBulkUploadMocked: MockedFunction; + + beforeEach(async () => { + vi.restoreAllMocks(); + + vi.mocked(getSupportedMediaTypes).mockResolvedValue({ + image: ['.jpg'], + sidecar: ['.xmp'], 
+ video: ['.mp4'], + }); + + testFolder = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'test-startWatch-')); + checkBulkUploadMocked = vi.mocked(checkBulkUpload); + checkBulkUploadMocked.mockResolvedValue({ + results: [], + }); + }); + + it('should start watching a directory and upload new files', async () => { + const testFilePath = path.join(testFolder, 'test.jpg'); + + await startWatch([testFolder], { concurrency: 1 }, { batchSize: 1, debounceTimeMs: 10 }); + await sleep(100); // to debounce the watcher from considering the test file as a existing file + await fs.promises.writeFile(testFilePath, 'testjpg'); + + await vi.waitUntil(() => checkBulkUploadMocked.mock.calls.length > 0, 3000); + expect(checkBulkUpload).toHaveBeenCalledWith({ + assetBulkUploadCheckDto: { + assets: [ + expect.objectContaining({ + id: testFilePath, + }), + ], + }, + }); + }); + + it('should filter out unsupported files', async () => { + const testFilePath = path.join(testFolder, 'test.jpg'); + const unsupportedFilePath = path.join(testFolder, 'test.txt'); + + await startWatch([testFolder], { concurrency: 1 }, { batchSize: 1, debounceTimeMs: 10 }); + await sleep(100); // to debounce the watcher from considering the test file as a existing file + await fs.promises.writeFile(testFilePath, 'testjpg'); + await fs.promises.writeFile(unsupportedFilePath, 'testtxt'); + + await vi.waitUntil(() => checkBulkUploadMocked.mock.calls.length > 0, 3000); + expect(checkBulkUpload).toHaveBeenCalledWith({ + assetBulkUploadCheckDto: { + assets: expect.arrayContaining([ + expect.objectContaining({ + id: testFilePath, + }), + ]), + }, + }); + + expect(checkBulkUpload).not.toHaveBeenCalledWith({ + assetBulkUploadCheckDto: { + assets: expect.arrayContaining([ + expect.objectContaining({ + id: unsupportedFilePath, + }), + ]), + }, + }); + }); + + it('should filger out ignored patterns', async () => { + const testFilePath = path.join(testFolder, 'test.jpg'); + const ignoredPattern = 'ignored'; + const ignoredFolder = path.join(testFolder, ignoredPattern); + await fs.promises.mkdir(ignoredFolder, { recursive: true }); + const ignoredFilePath = path.join(ignoredFolder, 'ignored.jpg'); + + await startWatch([testFolder], { concurrency: 1, ignore: ignoredPattern }, { batchSize: 1, debounceTimeMs: 10 }); + await sleep(100); // to debounce the watcher from considering the test file as a existing file + await fs.promises.writeFile(testFilePath, 'testjpg'); + await fs.promises.writeFile(ignoredFilePath, 'ignoredjpg'); + + await vi.waitUntil(() => checkBulkUploadMocked.mock.calls.length > 0, 3000); + expect(checkBulkUpload).toHaveBeenCalledWith({ + assetBulkUploadCheckDto: { + assets: expect.arrayContaining([ + expect.objectContaining({ + id: testFilePath, + }), + ]), + }, + }); + + expect(checkBulkUpload).not.toHaveBeenCalledWith({ + assetBulkUploadCheckDto: { + assets: expect.arrayContaining([ + expect.objectContaining({ + id: ignoredFilePath, + }), + ]), + }, + }); + }); + + afterEach(async () => { + await fs.promises.rm(testFolder, { recursive: true, force: true }); + }); +}); diff --git a/cli/src/commands/asset.ts b/cli/src/commands/asset.ts index 4cf6742f24669..376b41e40608c 100644 --- a/cli/src/commands/asset.ts +++ b/cli/src/commands/asset.ts @@ -12,13 +12,18 @@ import { getSupportedMediaTypes, } from '@immich/sdk'; import byteSize from 'byte-size'; +import { Matcher, watch as watchFs } from 'chokidar'; import { MultiBar, Presets, SingleBar } from 'cli-progress'; import { chunk } from 'lodash-es'; +import micromatch from 'micromatch'; 
 import { Stats, createReadStream } from 'node:fs';
 import { stat, unlink } from 'node:fs/promises';
 import path, { basename } from 'node:path';
 import { Queue } from 'src/queue';
-import { BaseOptions, authenticate, crawl, sha1 } from 'src/utils';
+import { BaseOptions, Batcher, authenticate, crawl, sha1 } from 'src/utils';
+
+const UPLOAD_WATCH_BATCH_SIZE = 100;
+const UPLOAD_WATCH_DEBOUNCE_TIME_MS = 10_000;
 
 const s = (count: number) => (count === 1 ? '' : 's');
 
@@ -36,6 +41,8 @@ export interface UploadOptionsDto {
   albumName?: string;
   includeHidden?: boolean;
   concurrency: number;
+  progress?: boolean;
+  watch?: boolean;
 }
 
 class UploadFile extends File {
@@ -55,19 +62,90 @@ class UploadFile extends File {
   }
 }
 
+const uploadBatch = async (files: string[], options: UploadOptionsDto) => {
+  const { newFiles, duplicates } = await checkForDuplicates(files, options);
+  const newAssets = await uploadFiles(newFiles, options);
+  await updateAlbums([...newAssets, ...duplicates], options);
+  await deleteFiles(newFiles, options);
+};
+
+export const startWatch = async (
+  paths: string[],
+  options: UploadOptionsDto,
+  {
+    batchSize = UPLOAD_WATCH_BATCH_SIZE,
+    debounceTimeMs = UPLOAD_WATCH_DEBOUNCE_TIME_MS,
+  }: { batchSize?: number; debounceTimeMs?: number } = {},
+) => {
+  const watcherIgnored: Matcher[] = [];
+  const { image, video } = await getSupportedMediaTypes();
+  const extensions = new Set([...image, ...video]);
+
+  if (options.ignore) {
+    watcherIgnored.push((path) => micromatch.contains(path, `**/${options.ignore}`));
+  }
+
+  const pathsBatcher = new Batcher({
+    batchSize,
+    debounceTimeMs,
+    onBatch: async (paths: string[]) => {
+      const uniquePaths = [...new Set(paths)];
+      await uploadBatch(uniquePaths, options);
+    },
+  });
+
+  const onFile = async (path: string, stats?: Stats) => {
+    if (stats?.isDirectory()) {
+      return;
+    }
+    const ext = '.' + path.split('.').pop()?.toLowerCase();
+    if (!extensions.has(ext ?? '')) {
+      return;
+    }
+    console.log(`Change detected: ${path}`);
+    pathsBatcher.add(path);
+  };
+  const fsWatcher = watchFs(paths, {
+    ignoreInitial: true,
+    ignored: watcherIgnored,
+    alwaysStat: true,
+    awaitWriteFinish: true,
+    depth: options.recursive ? undefined : 1,
+    persistent: true,
+  })
+    .on('add', onFile)
+    .on('change', onFile)
+    .on('error', (error) => console.error(`Watcher error: ${error}`));
+
+  process.on('SIGINT', async () => {
+    console.log('Exiting...');
+    await fsWatcher.close();
+    process.exit();
+  });
+};
+
 export const upload = async (paths: string[], baseOptions: BaseOptions, options: UploadOptionsDto) => {
   await authenticate(baseOptions);
 
   const scanFiles = await scan(paths, options);
+
   if (scanFiles.length === 0) {
-    console.log('No files found, exiting');
-    return;
+    if (options.watch) {
+      console.log('No files found initially.');
+    } else {
+      console.log('No files found, exiting');
+      return;
+    }
   }
 
-  const { newFiles, duplicates } = await checkForDuplicates(scanFiles, options);
-  const newAssets = await uploadFiles(newFiles, options);
-  await updateAlbums([...newAssets, ...duplicates], options);
-  await deleteFiles(newFiles, options);
+  if (options.watch) {
+    console.log('Watching for changes...');
+    await startWatch(paths, options);
+    // the watcher does not handle the initial scan,
+    // as scan() is a more efficient quick start with batched results
+  }
+
+  await uploadBatch(scanFiles, options);
 }
 
 const scan = async (pathsToCrawl: string[], options: UploadOptionsDto) => {
@@ -85,19 +163,25 @@
   return files;
 };
 
-export const checkForDuplicates = async (files: string[], { concurrency, skipHash }: UploadOptionsDto) => {
+export const checkForDuplicates = async (files: string[], { concurrency, skipHash, progress }: UploadOptionsDto) => {
   if (skipHash) {
     console.log('Skipping hash check, assuming all files are new');
     return { newFiles: files, duplicates: [] };
   }
 
-  const multiBar = new MultiBar(
-    { format: '{message} | {bar} | {percentage}% | ETA: {eta}s | {value}/{total} assets' },
-    Presets.shades_classic,
-  );
+  let multiBar: MultiBar | undefined;
+
+  if (progress) {
+    multiBar = new MultiBar(
+      { format: '{message} | {bar} | {percentage}% | ETA: {eta}s | {value}/{total} assets' },
+      Presets.shades_classic,
+    );
+  } else {
+    console.log(`Received ${files.length} files, hashing...`);
+  }
 
-  const hashProgressBar = multiBar.create(files.length, 0, { message: 'Hashing files ' });
-  const checkProgressBar = multiBar.create(files.length, 0, { message: 'Checking for duplicates' });
+  const hashProgressBar = multiBar?.create(files.length, 0, { message: 'Hashing files ' });
+  const checkProgressBar = multiBar?.create(files.length, 0, { message: 'Checking for duplicates' });
 
   const newFiles: string[] = [];
   const duplicates: Asset[] = [];
@@ -117,7 +201,7 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
       }
     }
 
-      checkProgressBar.increment(assets.length);
+      checkProgressBar?.increment(assets.length);
     },
     { concurrency, retry: 3 },
   );
@@ -137,7 +221,7 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
       void checkBulkUploadQueue.push(batch);
     }
 
-      hashProgressBar.increment();
+      hashProgressBar?.increment();
       return results;
     },
     { concurrency, retry: 3 },
   );
@@ -155,7 +239,7 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
 
   await checkBulkUploadQueue.drained();
 
-  multiBar.stop();
+  multiBar?.stop();
 
   console.log(`Found ${newFiles.length} new files and ${duplicates.length} duplicate${s(duplicates.length)}`);
@@ -171,7 +255,10 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
   return { newFiles, duplicates };
 };
-export const uploadFiles = async (files: string[], { dryRun, concurrency }: UploadOptionsDto): Promise<Asset[]> => {
+export const uploadFiles = async (
+  files: string[],
+  { dryRun, concurrency, progress }: UploadOptionsDto,
+): Promise<Asset[]> => {
   if (files.length === 0) {
     console.log('All assets were already uploaded, nothing to do.');
     return [];
@@ -191,12 +278,20 @@ export const uploadFiles = async (files: string[], { dryRun, concurrency }: Uplo
     return files.map((filepath) => ({ id: '', filepath }));
   }
 
-  const uploadProgress = new SingleBar(
-    { format: 'Uploading assets | {bar} | {percentage}% | ETA: {eta_formatted} | {value_formatted}/{total_formatted}' },
-    Presets.shades_classic,
-  );
-  uploadProgress.start(totalSize, 0);
-  uploadProgress.update({ value_formatted: 0, total_formatted: byteSize(totalSize) });
+  let uploadProgress: SingleBar | undefined;
+
+  if (progress) {
+    uploadProgress = new SingleBar(
+      {
+        format: 'Uploading assets | {bar} | {percentage}% | ETA: {eta_formatted} | {value_formatted}/{total_formatted}',
+      },
+      Presets.shades_classic,
+    );
+  } else {
+    console.log(`Uploading ${files.length} asset${s(files.length)} (${byteSize(totalSize)})`);
+  }
+  uploadProgress?.start(totalSize, 0);
+  uploadProgress?.update({ value_formatted: 0, total_formatted: byteSize(totalSize) });
 
   let duplicateCount = 0;
   let duplicateSize = 0;
@@ -222,7 +317,7 @@ export const uploadFiles = async (files: string[], { dryRun, concurrency }: Uplo
       successSize += stats.size ?? 0;
     }
 
-      uploadProgress.update(successSize, { value_formatted: byteSize(successSize + duplicateSize) });
+      uploadProgress?.update(successSize, { value_formatted: byteSize(successSize + duplicateSize) });
 
       return response;
     },
@@ -235,7 +330,7 @@ export const uploadFiles = async (files: string[], { dryRun, concurrency }: Uplo
 
   await queue.drained();
 
-  uploadProgress.stop();
+  uploadProgress?.stop();
 
   console.log(`Successfully uploaded ${successCount} new asset${s(successCount)} (${byteSize(successSize)})`);
   if (duplicateCount > 0) {
diff --git a/cli/src/index.ts b/cli/src/index.ts
index 341a70bef024a..5da4b50722b3c 100644
--- a/cli/src/index.ts
+++ b/cli/src/index.ts
@@ -69,6 +69,13 @@
       .default(4),
   )
   .addOption(new Option('--delete', 'Delete local assets after upload').env('IMMICH_DELETE_ASSETS'))
+  .addOption(new Option('--no-progress', 'Hide progress bars').env('IMMICH_PROGRESS_BAR').default(true))
+  .addOption(
+    new Option('--watch', 'Watch for changes and upload automatically')
+      .env('IMMICH_WATCH_CHANGES')
+      .default(false)
+      .implies({ progress: false }),
+  )
   .argument('[paths...]', 'One or more paths to assets to be uploaded')
   .action((paths, options) => upload(paths, program.opts(), options));
 
diff --git a/cli/src/utils.spec.ts b/cli/src/utils.spec.ts
index 3e7e55fcb69e1..28bda18d3ffc6 100644
--- a/cli/src/utils.spec.ts
+++ b/cli/src/utils.spec.ts
@@ -1,6 +1,7 @@
 import mockfs from 'mock-fs';
 import { readFileSync } from 'node:fs';
-import { CrawlOptions, crawl } from 'src/utils';
+import { Batcher, CrawlOptions, crawl } from 'src/utils';
+import { Mock } from 'vitest';
 
 interface Test {
   test: string;
@@ -286,3 +287,38 @@ describe('crawl', () => {
     }
   });
 });
+
+describe('Batcher', () => {
+  let batcher: Batcher<string>;
+  let onBatch: Mock;
+  beforeEach(() => {
+    onBatch = vi.fn();
+    batcher = new Batcher({ batchSize: 2, onBatch });
+  });
+
+  it('should trigger onBatch() when the batch limit is reached', async () => {
+    batcher.add('a');
+    batcher.add('b');
+    batcher.add('c');
+    expect(onBatch).toHaveBeenCalledOnce();
+    expect(onBatch).toHaveBeenCalledWith(['a', 'b']);
+  });
+
+  it('should trigger onBatch() when flush() is called', async () => {
+    batcher.add('a');
+    batcher.flush();
+    expect(onBatch).toHaveBeenCalledOnce();
+    expect(onBatch).toHaveBeenCalledWith(['a']);
+  });
+
+  it('should trigger onBatch() when the debounce time is reached', async () => {
+    vi.useFakeTimers();
+    batcher = new Batcher({ batchSize: 2, debounceTimeMs: 100, onBatch });
+    batcher.add('a');
+    expect(onBatch).not.toHaveBeenCalled();
+    vi.advanceTimersByTime(200);
+    expect(onBatch).toHaveBeenCalledOnce();
+    expect(onBatch).toHaveBeenCalledWith(['a']);
+    vi.useRealTimers();
+  });
+});
diff --git a/cli/src/utils.ts b/cli/src/utils.ts
index 7bbbb5615b640..d3c7ff27876b8 100644
--- a/cli/src/utils.ts
+++ b/cli/src/utils.ts
@@ -172,3 +172,64 @@
     rs.on('end', () => resolve(hash.digest('hex')));
   });
 };
+
+/**
+ * Batches items and calls onBatch to process them
+ * when the batch size is reached or the debounce time has passed.
+ */
+export class Batcher<T = any> {
+  private readonly items: T[] = [];
+  private readonly batchSize: number;
+  private readonly debounceTimeMs?: number;
+  private readonly onBatch: (items: T[]) => void;
+  private debounceTimer?: NodeJS.Timeout;
+
+  constructor({
+    batchSize,
+    debounceTimeMs,
+    onBatch,
+  }: {
+    batchSize: number;
+    debounceTimeMs?: number;
+    onBatch: (items: T[]) => Promise<void>;
+  }) {
+    this.batchSize = batchSize;
+    this.debounceTimeMs = debounceTimeMs;
+    this.onBatch = onBatch;
+  }
+
+  private setDebounceTimer() {
+    if (this.debounceTimer) {
+      clearTimeout(this.debounceTimer);
+    }
+    if (this.debounceTimeMs) {
+      this.debounceTimer = setTimeout(() => this.flush(), this.debounceTimeMs);
+    }
+  }
+
+  private clearDebounceTimer() {
+    if (this.debounceTimer) {
+      clearTimeout(this.debounceTimer);
+      this.debounceTimer = undefined;
+    }
+  }
+
+  add(item: T) {
+    this.items.push(item);
+    this.setDebounceTimer();
+    if (this.items.length >= this.batchSize) {
+      this.flush();
+    }
+  }
+
+  flush() {
+    this.clearDebounceTimer();
+    if (this.items.length === 0) {
+      return;
+    }
+
+    this.onBatch([...this.items]);
+
+    this.items.length = 0;
+  }
+}
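Reviewer note: a minimal standalone sketch (not part of the patch) of how the new `Batcher` from `cli/src/utils.ts` is meant to be driven by the watcher. The import path and the example file paths are hypothetical; only the `Batcher` API added above (`add`, `flush`, `batchSize`, `debounceTimeMs`, `onBatch`) is assumed.

```ts
// Sketch: watcher events feed add(); a full batch or an idle debounce
// window triggers the onBatch callback, mirroring how startWatch wires
// Batcher into uploadBatch().
import { Batcher } from 'src/utils'; // hypothetical import path

const batcher = new Batcher<string>({
  batchSize: 100, // matches UPLOAD_WATCH_BATCH_SIZE
  debounceTimeMs: 10_000, // matches UPLOAD_WATCH_DEBOUNCE_TIME_MS
  onBatch: async (paths) => {
    // startWatch de-duplicates paths like this before uploading,
    // since a file can emit both 'add' and 'change' in one window.
    const uniquePaths = [...new Set(paths)];
    console.log(`Would upload ${uniquePaths.length} file(s):`, uniquePaths);
  },
});

// Each watcher event adds a path; the 100th add() flushes immediately,
// otherwise the 10s debounce timer fires after the last event.
batcher.add('/photos/a.jpg');
batcher.add('/photos/a.jpg'); // duplicate, collapsed by the Set in onBatch
batcher.add('/photos/b.jpg');

// On shutdown, any buffered paths can be pushed through manually.
batcher.flush();
```

Because add() resets the debounce timer, a steady burst of file events produces at most one upload per idle window, which is why the tests above can use a tiny `debounceTimeMs` and still observe a single `checkBulkUpload` call.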