Skip to content

Commit

Permalink
WIP range request refactoring
Browse files Browse the repository at this point in the history
Uses rangeParser now, but doesn't yet support multiple ranges.
  • Loading branch information
djwhitt committed Nov 21, 2023
1 parent 4d01133 commit 8c706f9
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 54 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"node-cache": "^5.1.2",
"prom-client": "^14.0.1",
"ramda": "^0.28.0",
"range-parser": "^1.2.1",
"redis": "^4.6.10",
"retry-axios": "^3.0.0",
"rfc4648": "^1.5.2",
Expand Down
124 changes: 70 additions & 54 deletions src/routes/data/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
*/
import { Request, Response } from 'express';
import { default as asyncHandler } from 'express-async-handler';
import { Transform } from 'node:stream';
import url from 'node:url';
import rangeParser from 'range-parser';
import { Logger } from 'winston';
import { Transform } from 'stream';

import { MANIFEST_CONTENT_TYPE } from '../../lib/encoding.js';
import {
Expand Down Expand Up @@ -62,70 +63,86 @@ const setDataHeaders = ({
data.sourceContentType ??
DEFAULT_CONTENT_TYPE,
);
res.header('Content-Length', data.size.toString());
};

const handlePartialDataResponse = (log: Logger, rangeHeader: string, res: Response, data: ContiguousData, dataAttributes: ContiguousDataAttributes | undefined ) => {
const totalSize = data.size;
const parts = rangeHeader.replace(/bytes=/, "").split("-");
const start = parseInt(parts[0], 10);
const end = parts[1] ? parseInt(parts[1], 10) : totalSize - 1;
const chunkSize = end - start + 1;
const handleRangeRequest = (
log: Logger,
rangeHeader: string,
res: Response,
data: ContiguousData,
dataAttributes: ContiguousDataAttributes | undefined,
) => {
const ranges = rangeParser(data.size, rangeHeader, {
combine: true,
});

// Check if the range is valid
if (start >= 0 && end < totalSize && start <= end) {
res.status(206); // Partial Content
res.setHeader("Content-Range", `bytes ${start}-${end}/${totalSize}`);
res.setHeader("Accept-Ranges", "bytes");
// Malformed range header
if (ranges === -2) {
log.error(`Malformed 'range' header`);
res.status(400).type('text').send(`Malformed 'range' header`);
return;
}

if (dataAttributes?.stable) {
res.setHeader('Cache-Control', `public, max-age=${STABLE_MAX_AGE}, immutable`);
} else {
res.setHeader('Cache-Control', `public, max-age=${UNSTABLE_MAX_AGE}`);
}
// Unsatisfiable range
if (ranges === -1 || ranges.type !== 'bytes') {
log.error('Range not satisfiable');
res
.status(416)
.set('Content-Range', `bytes */${data.size}`)
.type('text')
.send('Range not satisfiable');
return;
}

const isSingleRange = ranges.length === 1;
if (isSingleRange) {
const totalSize = data.size;
const start = ranges[0].start;
const end = ranges[0].end;

res.status(206); // Partial Content
res.setHeader('Content-Range', `bytes ${start}-${end}/${totalSize}`);
res.setHeader('Accept-Ranges', 'bytes');

res.contentType(
dataAttributes?.contentType ?? data.sourceContentType ?? DEFAULT_CONTENT_TYPE
dataAttributes?.contentType ??
data.sourceContentType ??
DEFAULT_CONTENT_TYPE,
);

// Create a custom Transform stream to filter the range
let position = 0;
const rangeStream = new Transform({
transform(chunk, _, callback) {
// Calculate the byte range for this chunk relative to the global start position
const chunkStart = (this as any).position;
// Calculate the byte range for this chunk relative to the global start
// position
const chunkStart = position;
const chunkEnd = chunkStart + chunk.length - 1;

// Determine the intersection between the global range and the chunk's range
// Determine the intersection between the global range and the chunk's
// range
const intersectionStart = Math.max(start, chunkStart);
const intersectionEnd = Math.min(end, chunkEnd);

if (intersectionStart <= intersectionEnd) {
// There is an intersection, so slice and push the relevant part of the chunk
const slicedData = chunk.slice(intersectionStart - chunkStart, intersectionEnd - chunkStart + 1);
// There is an intersection, so slice and push the relevant part of
// the chunk
const slicedData = chunk.slice(
intersectionStart - chunkStart,
intersectionEnd - chunkStart + 1,
);
this.push(slicedData);
}

(this as any).position += chunk.length;
position += chunk.length;
callback();
},
});
(rangeStream as any).position = 0;

rangeStream.on("close", () => {
// Handle any cleanup if needed
});

data.stream.pipe(rangeStream).pipe(res);
} else {
// If the range is invalid, send a 416 response (Requested Range Not Satisfiable)
log.warn('Attributes', dataAttributes);
log.warn("Couldn't run range query", {
start: start,
end: end,
chunkSize: chunkSize,
totalSize: totalSize,
});
res.status(416).end();
// Multiple ranges are not yet supported
data.stream.pipe(res);
}
};

Expand Down Expand Up @@ -218,17 +235,15 @@ export const createRawDataHandler = ({
data = await dataSource.getData(id, dataAttributes);
// Check if the request includes a Range header
const rangeHeader = req.headers.range;
if (rangeHeader && data) {
handlePartialDataResponse(log, rangeHeader, res, data, dataAttributes);
if (rangeHeader !== undefined) {
setRawDataHeaders(res);
handleRangeRequest(log, rangeHeader, res, data, dataAttributes);
} else {
// Set headers and stream data
setDataHeaders({ res, dataAttributes, data });
setRawDataHeaders(res);
data.stream.pipe(res);
}


}
} catch (error: any) {
log.warn('Unable to retrieve contiguous data:', {
dataId: id,
Expand Down Expand Up @@ -303,19 +318,23 @@ const sendManifestResponse = async ({
try {
// Check if the request includes a Range header
const rangeHeader = req.headers.range;
if (rangeHeader && data) {
handlePartialDataResponse(log, rangeHeader, res, data, dataAttributes);
if (rangeHeader !== undefined) {
setDataHeaders({
res,
dataAttributes,
data,
});
handleRangeRequest(log, rangeHeader, res, data, dataAttributes);
} else {
// Set headers and stream data
setDataHeaders({
res,
dataAttributes,
data,
});

res.header('Content-Length', data.size.toString());
data.stream.pipe(res);
}

} catch (error: any) {
log.error('Error retrieving data attributes:', {
dataId: resolvedId,
Expand Down Expand Up @@ -429,7 +448,6 @@ export const createDataHandler = ({
// Attempt to retrieve data
try {
data = await dataSource.getData(id, dataAttributes);

} catch (error: any) {
log.warn('Unable to retrieve contiguous data:', {
dataId: id,
Expand Down Expand Up @@ -475,20 +493,18 @@ export const createDataHandler = ({

// Check if the request includes a Range header
const rangeHeader = req.headers.range;
if (rangeHeader && data) {
handlePartialDataResponse(log, rangeHeader, res, data, dataAttributes);
if (rangeHeader !== undefined && data !== undefined) {
handleRangeRequest(log, rangeHeader, res, data, dataAttributes);
} else {
// Set headers and stream data
setDataHeaders({
res,
dataAttributes,
data,
});

res.header('Content-Length', data.size.toString());
data.stream.pipe(res);
}


} catch (error: any) {
log.error('Error retrieving data:', {
dataId: id,
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5472,6 +5472,11 @@ randombytes@^2.1.0:
dependencies:
safe-buffer "^5.1.0"

range-parser@^1.2.1:
version "1.2.1"
resolved "https://registry.yarnpkg.com/range-parser/-/range-parser-1.2.1.tgz#3cf37023d199e1c24d1a55b84800c2f3e6468031"
integrity sha512-Hrgsx+orqoygnmhFbKaHE6c296J+HTAQXoxEF6gNupROmmGJRoyzfG3ccAveqCBrwr/2yxQ5BVd/GTl5agOwSg==

range-parser@~1.2.1:
version "1.2.1"
resolved "https://registry.npmjs.org/range-parser/-/range-parser-1.2.1.tgz"
Expand Down

0 comments on commit 8c706f9

Please sign in to comment.