Skip to content

Commit

Permalink
Add detecting Web Stream to default (core) entry point
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Jul 29, 2024
1 parent 499b985 commit 0d11c90
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 11 deletions.
48 changes: 48 additions & 0 deletions core.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ export async function fileTypeFromTokenizer(tokenizer) {
return new FileTypeParser().fromTokenizer(tokenizer);
}

export async function stream(webStream) {
return new FileTypeParser().toDetectionStream(webStream);
}

export class FileTypeParser {
constructor(options) {
this.detectors = options?.customDetectors;
Expand Down Expand Up @@ -104,6 +108,50 @@ export class FileTypeParser {
}
}

async toDetectionStream(webStream, options = {}) {
const {sampleSize = reasonableDetectionSizeInBytes} = options;

// Initialize a reader from the web stream
const reader = webStream.getReader({mode: 'byob'});
const pass = new TransformStream();
const writer = pass.writable.getWriter();
let detectedFileType;

// Read the first chunk for file type detection
const {value: chunk, done} = await reader.read(new Uint8Array(sampleSize));
if (done || !chunk) {
detectedFileType = undefined;
} else {
try {
detectedFileType = await this.fromBuffer(chunk.slice(0, sampleSize));
} catch (error) {
if (error instanceof strtok3.EndOfStreamError) {
detectedFileType = undefined;
} else {
throw error;
}
}
}

// Write the initial chunk into the pass-through stream
writer.write(chunk);

// Forward remaining data from the reader to the writer
(async function pump() {
const {value, done} = await reader.read(new Uint8Array(512 * 1024));
if (done) {
return writer.close();
}

await writer.write(value);
return pump();
})();

// Attach the detected file type to the output stream
pass.readable.fileType = detectedFileType;
return pass.readable;
}

check(header, options) {
return _check(this.buffer, header, options);
}
Expand Down
2 changes: 1 addition & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,6 @@ if (stream2.fileType?.mime === 'image/jpeg') {
}
```
*/
export function fileTypeStream(readableStream: NodeReadableStream, options?: StreamOptions): Promise<ReadableStreamWithFileType>;
export function fileTypeStream(readableStream: NodeReadableStream | AnyWebReadableStream<Uint8Array>, options?: StreamOptions): Promise<ReadableStreamWithFileType>;

export * from './core.js';
12 changes: 8 additions & 4 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Node.js specific entry point.
*/

import {ReadableStream as WebReadableStream} from 'node:stream/web';
import {pipeline, PassThrough} from 'node:stream';
import * as strtok3 from 'strtok3';
import {FileTypeParser, reasonableDetectionSizeInBytes} from './core.js';

Expand All @@ -26,7 +27,10 @@ export class NodeFileTypeParser extends FileTypeParser {
}

async toDetectionStream(readableStream, options = {}) {
const {default: stream} = await import('node:stream');
if (readableStream instanceof WebReadableStream) {
return super.toDetectionStream(readableStream, options);
}

const {sampleSize = reasonableDetectionSizeInBytes} = options;

return new Promise((resolve, reject) => {
Expand All @@ -36,8 +40,8 @@ export class NodeFileTypeParser extends FileTypeParser {
(async () => {
try {
// Set up output stream
const pass = new stream.PassThrough();
const outputStream = stream.pipeline ? stream.pipeline(readableStream, pass, () => {}) : readableStream.pipe(pass);
const pass = new PassThrough();
const outputStream = pipeline ? pipeline(readableStream, pass, () => {}) : readableStream.pipe(pass);

// Read the input stream and detect the filetype
const chunk = readableStream.read(sampleSize) ?? readableStream.read() ?? new Uint8Array(0);
Expand Down Expand Up @@ -70,7 +74,7 @@ export async function fileTypeFromStream(stream, fileTypeOptions) {
}

export async function fileTypeStream(readableStream, options = {}) {
return new NodeFileTypeParser().toDetectionStream(readableStream, options);
return (new NodeFileTypeParser(options)).toDetectionStream(readableStream, options);
}

export {fileTypeFromTokenizer, fileTypeFromBuffer, fileTypeFromBlob, FileTypeParser, supportedMimeTypes, supportedExtensions} from './core.js';
55 changes: 49 additions & 6 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import process from 'node:process';
import path from 'node:path';
import {fileURLToPath} from 'node:url';
import fs from 'node:fs';
import {readFile} from 'node:fs/promises';
import stream from 'node:stream';
import test from 'ava';
import {readableNoopStream} from 'noop-stream';
Expand Down Expand Up @@ -337,7 +338,7 @@ async function testFileNodeFromStream(t, ext, name) {
t.is(typeof fileType.mime, 'string', 'fileType.mime');
}

async function loadEntireFile(readable) {
async function loadEntireFileFromNodeReadable(readable) {
const chunks = [];
let totalLength = 0;

Expand All @@ -357,18 +358,58 @@ async function loadEntireFile(readable) {
return entireFile;
}

async function testStream(t, ext, name) {
async function testStreamWithNodeStream(t, ext, name) {
const fixtureName = `${(name ?? 'fixture')}.${ext}`;
const file = path.join(__dirname, 'fixture', fixtureName);

const readableStream = await fileTypeStream(fs.createReadStream(file));
const fileStream = fs.createReadStream(file);

const [bufferA, bufferB] = await Promise.all([loadEntireFile(readableStream), loadEntireFile(fileStream)]);
const [bufferA, bufferB] = await Promise.all([loadEntireFileFromNodeReadable(readableStream), loadEntireFileFromNodeReadable(fileStream)]);

t.true(areUint8ArraysEqual(bufferA, bufferB));
}

async function loadEntireFileFromWebStream(webStream) {
const reader = webStream.getReader();
const chunks = [];
let totalLength = 0;
let bytesRead = 0;

do {
const {done, value} = await reader.read();
if (done) {
break;
}

chunks.push(value);
bytesRead = value.byteLength;
totalLength += bytesRead;
} while (bytesRead > 0);

// Concatenate all chunks into a single Uint8Array
const entireFile = new Uint8Array(totalLength);
let offset = 0;
for (const chunk of chunks) {
entireFile.set(chunk, offset);
offset += chunk.byteLength;
}

return entireFile;
}

async function testStreamWithWebStream(t, ext, name) {
const fixtureName = `${(name ?? 'fixture')}.${ext}`;
const file = path.join(__dirname, 'fixture', fixtureName);
// Read the file into a buffer
const fileBuffer = await readFile(file);
// Create a Blob from the buffer
const blob = new Blob([fileBuffer]);
const webStream = await fileTypeStream(blob.stream());
const webStreamResult = await loadEntireFileFromWebStream(webStream);
t.true(areUint8ArraysEqual(fileBuffer, webStreamResult));
}

test('Test suite must be able to detect Node.js major version', t => {
t.is(typeof nodeMajorVersion, 'number', 'Detected Node.js major version should be a number');
});
Expand All @@ -388,7 +429,9 @@ for (const type of types) {
}

_test(`${name}.${type} ${i++} .fileTypeFromStream() Node.js method - same fileType`, testFileNodeFromStream, type, name);
test(`${name}.${type} ${i++} .fileTypeStream() - identical streams`, testStream, type, name);
_test(`${name}.${type} ${i++} .fileTypeStream() - identical Node.js Readable streams`, testStreamWithNodeStream, type, name);

test(`${name}.${type} ${i++} .fileTypeStream() - identical Web Streams`, testStreamWithWebStream, type, name);
}
} else {
const fixtureName = `fixture.${type}`;
Expand All @@ -397,7 +440,7 @@ for (const type of types) {
_test(`${type} ${i++} .fileTypeFromFile()`, testFromFile, type);
_test(`${type} ${i++} .fileTypeFromBuffer()`, testFromBuffer, type);
_test(`${type} ${i++} .fileTypeFromStream() Node.js`, testFileNodeFromStream, type);
test(`${type} ${i++} .fileTypeStream() - identical streams`, testStream, type);
test(`${type} ${i++} .fileTypeStream() - identical streams`, testStreamWithNodeStream, type);
}

if (Object.prototype.hasOwnProperty.call(falsePositives, type)) {
Expand Down Expand Up @@ -427,7 +470,7 @@ test('.fileTypeStream() method - short stream', async t => {
t.is(newStream.fileType, undefined);

// Test usability of returned stream
const bufferB = await loadEntireFile(newStream);
const bufferB = await loadEntireFileFromNodeReadable(newStream);
t.deepEqual(bufferA, bufferB);
});

Expand Down

0 comments on commit 0d11c90

Please sign in to comment.