Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PE-4908 Add support for range requests #64

Merged
merged 4 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"node-cache": "^5.1.2",
"prom-client": "^14.0.1",
"ramda": "^0.28.0",
"range-parser": "^1.2.1",
"redis": "^4.6.10",
"retry-axios": "^3.0.0",
"rfc4648": "^1.5.2",
Expand Down
80 changes: 70 additions & 10 deletions src/routes/data/handlers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { expect } from 'chai';
import express from 'express';
import { Readable } from 'node:stream';
import sinon from 'sinon';
import { default as request } from 'supertest';

import log from '../../log.js';
import {
BlockListValidator,
ContiguousDataIndex,
ContiguousDataSource,
ManifestPathResolver,
Expand All @@ -33,7 +34,7 @@ describe('Data routes', () => {
let app: express.Express;
let dataIndex: ContiguousDataIndex;
let dataSource: ContiguousDataSource;
let blockListValidator: BlockListValidator;
let blockListValidator: any;
let manifestPathResolver: ManifestPathResolver;

beforeEach(() => {
Expand All @@ -43,7 +44,12 @@ describe('Data routes', () => {
getDataParent: sinon.stub(),
saveDataContentAttributes: sinon.stub(),
};
dataSource = { getData: sinon.stub() };
dataSource = {
getData: sinon.stub().returns({
stream: Readable.from(Buffer.from('testing...')),
size: 10,
}),
};
blockListValidator = {
isIdBlocked: sinon.stub(),
isHashBlocked: sinon.stub(),
Expand All @@ -54,7 +60,28 @@ describe('Data routes', () => {
};
});

it('should handle blocked ID', async () => {
// Happy path: neither the ID nor the hash is blocked, so the handler is
// expected to stream the whole stubbed payload back with a 200.
it('should return 200 status code and data for unblocked data request', async () => {
  blockListValidator.isIdBlocked.resolves(false);
  blockListValidator.isHashBlocked.resolves(false);

  const handler = createDataHandler({
    log,
    dataIndex,
    dataSource,
    blockListValidator,
    manifestPathResolver,
  });
  app.get('/:id', handler);

  const response: any = await request(app).get('/not-a-real-id').expect(200);
  expect(response.body.toString()).to.equal('testing...');
});

it('should return 206 status code and partial data for a range request', async () => {
const blockListValidator = {
isIdBlocked: sinon.stub(),
isHashBlocked: sinon.stub(),
Expand All @@ -69,13 +96,46 @@ describe('Data routes', () => {
manifestPathResolver,
}),
);
blockListValidator.isIdBlocked.resolves(true);
request(app)
.get('/id')
.expect(404)
.end((err: any, _res: any) => {
if (err) throw err;
return request(app)
.get('/not-a-real-id')
.set('Range', 'bytes=2-3')
.expect(206)
.then((res: any) => {
expect(res.body.toString()).to.equal('st');
});
});

// Multiple ranges are not yet supported, so a Range header that lists more
// than one non-overlapping range must be rejected with 416.
it('should return 416 status code for a range request with multiple ranges', async () => {
  const handler = createDataHandler({
    log,
    dataIndex,
    dataSource,
    blockListValidator,
    manifestPathResolver,
  });
  app.get('/:id', handler);

  const multiRangeRequest = request(app)
    .get('/not-a-real-id')
    .set('Range', 'bytes=1-2,4-5');
  return multiRangeRequest.expect(416);
});

// When the block list reports the requested ID as blocked, the handler is
// expected to answer with 404.
it('should return 404 given a blocked ID', async () => {
  blockListValidator.isIdBlocked.resolves(true);

  const handler = createDataHandler({
    log,
    dataIndex,
    dataSource,
    blockListValidator,
    manifestPathResolver,
  });
  app.get('/:id', handler);

  await request(app).get('/not-a-real-id-id').expect(404);
});
});
});
150 changes: 133 additions & 17 deletions src/routes/data/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
import { Request, Response } from 'express';
import { default as asyncHandler } from 'express-async-handler';
import { Transform } from 'node:stream';
import url from 'node:url';
import rangeParser from 'range-parser';
import { Logger } from 'winston';

import { MANIFEST_CONTENT_TYPE } from '../../lib/encoding.js';
Expand Down Expand Up @@ -50,18 +52,103 @@
// TODO add header indicating whether data is verified
// TODO cached header for zero length data (maybe...)

// Allow range requests
res.header('Accept-Ranges', 'bytes');

// Aggressively cache data before max fork depth
if (dataAttributes?.stable) {
res.header('Cache-Control', `public, max-age=${STABLE_MAX_AGE}, immutable`);
} else {
res.header('Cache-Control', `public, max-age=${UNSTABLE_MAX_AGE}`);
}

// Use the content type from the L1 or data item index if available
res.contentType(
dataAttributes?.contentType ??
data.sourceContentType ??
DEFAULT_CONTENT_TYPE,
);
res.header('Content-Length', data.size.toString());
};

/**
 * Responds to a request that carries a `Range` header by streaming the
 * requested byte range of `data` to `res`.
 *
 * The header is parsed with range-parser using `combine: true`, so
 * overlapping or adjacent ranges are merged before the range count is
 * checked. Possible outcomes:
 *
 * - 400 when the header is malformed (range-parser returns -2).
 * - 416 when the range is unsatisfiable (range-parser returns -1), the unit
 *   is not `bytes`, or more than one range remains after combining. Every
 *   416 carries an unsatisfied-range `Content-Range` header with the total
 *   size.
 * - 206 with `Content-Range`, `Content-Length`, `Accept-Ranges` and the
 *   content type set, followed by the bytes `start..end` (inclusive).
 *
 * @param log - Logger used to report rejected range requests.
 * @param rangeHeader - Raw value of the request's `Range` header.
 * @param res - Express response to write the status, headers and body to.
 * @param data - Contiguous data whose `stream` and `size` are served.
 * @param dataAttributes - Optional indexed attributes; its `contentType`
 *   takes precedence over `data.sourceContentType`.
 */
const handleRangeRequest = (
  log: Logger,
  rangeHeader: string,
  res: Response,
  data: ContiguousData,
  dataAttributes: ContiguousDataAttributes | undefined,
) => {
  const totalSize = data.size;
  const ranges = rangeParser(totalSize, rangeHeader, {
    combine: true,
  });

  // Malformed range header
  if (ranges === -2) {
    log.warn(`Malformed 'range' header`);
    res.status(400).type('text').send(`Malformed 'range' header`);
    return;
  }

  // Unsatisfiable range
  if (ranges === -1 || ranges.type !== 'bytes') {
    log.warn('Range not satisfiable');
    res
      .status(416)
      .set('Content-Range', `bytes */${totalSize}`)
      .type('text')
      .send('Range not satisfiable');
    return;
  }

  // Multipart (multiple range) responses are not implemented. Report the
  // total size the same way the unsatisfiable branch does so clients can
  // retry with a single range.
  if (ranges.length !== 1) {
    log.warn('Multiple ranges are not yet supported');
    res
      .status(416)
      .set('Content-Range', `bytes */${totalSize}`)
      .type('text')
      .send('Multiple ranges are not yet supported');
    return;
  }

  const start = ranges[0].start;
  const end = ranges[0].end;

  res.status(206); // Partial Content
  res.setHeader('Content-Range', `bytes ${start}-${end}/${totalSize}`);
  res.setHeader('Accept-Ranges', 'bytes');
  // Both bounds are inclusive, hence the + 1.
  res.setHeader('Content-Length', (end - start + 1).toString());

  res.contentType(
    dataAttributes?.contentType ??
      data.sourceContentType ??
      DEFAULT_CONTENT_TYPE,
  );

  // Filter the full data stream down to the requested range. `position` is
  // the absolute offset of the first byte of the chunk being processed.
  let position = 0;
  const rangeStream = new Transform({
    transform(chunk, _encoding, callback) {
      // Absolute (inclusive) byte offsets covered by this chunk
      const chunkStart = position;
      const chunkEnd = chunkStart + chunk.length - 1;

      // Overlap between the requested range and this chunk
      const intersectionStart = Math.max(start, chunkStart);
      const intersectionEnd = Math.min(end, chunkEnd);

      if (intersectionStart <= intersectionEnd) {
        // Forward only the overlapping bytes; offsets are converted to
        // chunk-relative indexes and the end index is exclusive.
        this.push(
          chunk.subarray(
            intersectionStart - chunkStart,
            intersectionEnd - chunkStart + 1,
          ),
        );
      }

      position += chunk.length;
      callback();
    },
  });

  data.stream.pipe(rangeStream).pipe(res);
};

const setRawDataHeaders = (res: Response) => {
Expand Down Expand Up @@ -150,9 +237,18 @@
let data: ContiguousData | undefined;
try {
data = await dataSource.getData(id, dataAttributes);
setDataHeaders({ res, dataAttributes, data });
setRawDataHeaders(res);
data.stream.pipe(res);
// Check if the request includes a Range header
const rangeHeader = req.headers.range;
if (rangeHeader !== undefined) {
setRawDataHeaders(res);
handleRangeRequest(log, rangeHeader, res, data, dataAttributes);
} else {
// Set headers and stream data
setDataHeaders({ res, dataAttributes, data });
setRawDataHeaders(res);
res.header('Content-Length', data.size.toString());
data.stream.pipe(res);
}

Check warning on line 251 in src/routes/data/handlers.ts

View check run for this annotation

Codecov / codecov/patch

src/routes/data/handlers.ts#L240-L251

Added lines #L240 - L251 were not covered by tests
} catch (error: any) {
log.warn('Unable to retrieve contiguous data:', {
dataId: id,
Expand Down Expand Up @@ -225,12 +321,25 @@

// Set headers and stream data
try {
setDataHeaders({
res,
dataAttributes,
data,
});
data.stream.pipe(res);
// Check if the request includes a Range header
const rangeHeader = req.headers.range;
if (rangeHeader !== undefined) {
setDataHeaders({
res,
dataAttributes,
data,
});
handleRangeRequest(log, rangeHeader, res, data, dataAttributes);
} else {
// Set headers and stream data
setDataHeaders({
res,
dataAttributes,
data,
});
res.header('Content-Length', data.size.toString());
data.stream.pipe(res);
}

Check warning on line 342 in src/routes/data/handlers.ts

View check run for this annotation

Codecov / codecov/patch

src/routes/data/handlers.ts#L324-L342

Added lines #L324 - L342 were not covered by tests
} catch (error: any) {
log.error('Error retrieving data attributes:', {
dataId: resolvedId,
Expand Down Expand Up @@ -387,13 +496,20 @@
return;
}

// Set headers and stream data
setDataHeaders({
res,
dataAttributes,
data,
});
data.stream.pipe(res);
// Check if the request includes a Range header
const rangeHeader = req.headers.range;
if (rangeHeader !== undefined && data !== undefined) {
handleRangeRequest(log, rangeHeader, res, data, dataAttributes);
} else {
// Set headers and stream data
setDataHeaders({
res,
dataAttributes,
data,
});
res.header('Content-Length', data.size.toString());
data.stream.pipe(res);
}
} catch (error: any) {
log.error('Error retrieving data:', {
dataId: id,
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5472,6 +5472,11 @@ randombytes@^2.1.0:
dependencies:
safe-buffer "^5.1.0"

range-parser@^1.2.1:
version "1.2.1"
resolved "https://registry.yarnpkg.com/range-parser/-/range-parser-1.2.1.tgz#3cf37023d199e1c24d1a55b84800c2f3e6468031"
integrity sha512-Hrgsx+orqoygnmhFbKaHE6c296J+HTAQXoxEF6gNupROmmGJRoyzfG3ccAveqCBrwr/2yxQ5BVd/GTl5agOwSg==

range-parser@~1.2.1:
version "1.2.1"
resolved "https://registry.npmjs.org/range-parser/-/range-parser-1.2.1.tgz"
Expand Down