Skip to content

Commit

Permalink
Try to determine file-type from web stream, utilizing TransformStream
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Aug 2, 2024
1 parent 257bc99 commit 1bb5685
Showing 1 changed file with 35 additions and 45 deletions.
80 changes: 35 additions & 45 deletions core.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,57 +108,47 @@ export class FileTypeParser {
}
}

async toDetectionStream(webStream, options = {}) {
async toDetectionStream(stream, options) {
const {sampleSize = reasonableDetectionSizeInBytes} = options;

// Initialize a reader from the web stream
const reader = webStream.getReader({mode: 'byob'});
const pass = new TransformStream();
const writer = pass.writable.getWriter();
let detectedFileType;

// Read the first chunk for file type detection
const {value: chunk, done} = await reader.read(new Uint8Array(sampleSize));
if (done || !chunk) {
detectedFileType = undefined;
} else {
try {
detectedFileType = await this.fromBuffer(chunk.slice(0, sampleSize));
} catch (error) {
if (error instanceof strtok3.EndOfStreamError) {
detectedFileType = undefined;
} else {
throw error;
}
}
}
// Create a new ReadableStream to manage locking issues
const transformStream = new TransformStream({
async start(controller) {
const reader = stream.getReader({mode: 'byob'});
try {
// Read the first chunk from the stream
const {value: chunk, done} = await reader.read(new Uint8Array(sampleSize));
if (!done && chunk) {
try {
// Attempt to detect the file type from the chunk
detectedFileType = await this.fromBuffer(chunk.slice(0, sampleSize));
} catch (error) {
if (!(error instanceof strtok3.EndOfStreamError)) {
throw error; // Re-throw non-EndOfStreamError
}

detectedFileType = undefined;
}
}

try {
// Write the initial chunk into the pass-through stream
writer.write(chunk);
} catch (error) {
reader.cancel(error); // Cancel the reader on error
throw new Error(`Stream handling failed: ${error.message}`);
}

// Forward remaining data from the reader to the writer
(async function pump() {
const {value, done} = await reader.read(new Uint8Array(512 * 1024)).catch(readError => {
writer.abort(readError); // Abort writing on error
});
if (done) {
return writer.close();
}
controller.enqueue(chunk); // Enqueue the initial chunk
} catch (error) {
controller.error(error); // Handle errors during start
} finally {
reader.releaseLock(); // Ensure the reader is released
}
},
transform(chunk, controller) {
// Pass through the chunks without modification
controller.enqueue(chunk);
},
});

await writer.write(value).catch(writeError => {
reader.cancel(writeError); // Cancel the reader on error
});
return pump(); // Recursion for continuous reading
})();
const finalStream = stream.pipeThrough(transformStream);
finalStream.fileType = detectedFileType;

// Attach the detected file type to the output stream
pass.readable.fileType = detectedFileType;
return pass.readable;
return finalStream;
}

check(header, options) {
Expand Down

0 comments on commit 1bb5685

Please sign in to comment.