Skip to content

Commit

Permalink
Add ability to stream request body from application handler
Browse files Browse the repository at this point in the history
  • Loading branch information
karlseguin committed Nov 9, 2024
1 parent 63c317c commit 14ca8a0
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/config.zig
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub const Config = struct {
};

pub const Request = struct {
lazy_read_size: ?usize = null,
max_body_size: ?usize = null,
buffer_size: ?usize = null,
max_header_count: ?usize = null,
Expand Down
69 changes: 68 additions & 1 deletion src/httpz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,13 @@ test "tests:beforeAll" {
const ga = global_test_allocator.allocator();

{
default_server = try Server(void).init(ga, .{ .port = 5992 }, {});
default_server = try Server(void).init(ga, .{
.port = 5992,
.request = .{
.lazy_read_size = 4_096,
.max_body_size = 1_048_576,
}
}, {});

// only need to do this because we're using listenInNewThread instead
// of blocking here. So the array to hold the middleware needs to outlive
Expand All @@ -766,6 +772,7 @@ test "tests:beforeAll" {
router.method("PING", "/test/method", TestDummyHandler.method, .{});
router.get("/test/query", TestDummyHandler.reqQuery, .{});
router.get("/test/stream", TestDummyHandler.eventStream, .{});
router.get("/test/req_stream", TestDummyHandler.reqStream, .{});
router.get("/test/chunked", TestDummyHandler.chunked, .{});
router.get("/test/route_data", TestDummyHandler.routeData, .{ .data = &TestDummyHandler.RouteData{ .power = 12345 } });
router.all("/test/cors", TestDummyHandler.jsonRes, .{ .middlewares = cors });
Expand Down Expand Up @@ -1299,6 +1306,54 @@ test "httpz: custom handle" {
try t.expectString("HTTP/1.1 200 \r\nContent-Length: 9\r\n\r\nhello teg", testReadAll(stream, &buf));
}

test "httpz: request body streaming" {
    // Content-Length of 0: the handler must report a zero-length body.
    {
        const conn = testStream(5992);
        defer conn.close();
        try conn.writeAll("GET /test/req_stream HTTP/1.1\r\nContent-Length: 0\r\n\r\n");

        var parsed = testReadParsed(conn);
        defer parsed.deinit();
        try parsed.expectJson(.{ .length = 0 });
    }

    // A 4-byte body sent in the same write as the headers.
    {
        const conn = testStream(5992);
        defer conn.close();
        try conn.writeAll("GET /test/req_stream HTTP/1.1\r\nContent-Length: 4\r\n\r\n123z");

        var parsed = testReadParsed(conn);
        defer parsed.deinit();
        try parsed.expectJson(.{ .length = 4 });
    }

    // Light fuzzing: send a 20_000 byte body in randomly sized pieces, pausing
    // between writes, and check that the handler still sees every byte.
    const full_request = "GET /test/req_stream HTTP/1.1\r\nContent-Length: 20000\r\n\r\n" ++ ("a" ** 20_000);

    var prng = t.getRandom();
    const rand = prng.random();

    for (0..10) |_| {
        const conn = testStream(5992);
        defer conn.close();

        var pending: []const u8 = full_request;
        while (pending.len > 0) {
            // pick a piece of 1..pending.len bytes
            const piece_len = rand.uintAtMost(usize, pending.len - 1) + 1;
            // a write that would block counts as 0 bytes sent; the same data
            // is retried on the next iteration
            const written = conn.write(pending[0..piece_len]) catch |err| switch (err) {
                error.WouldBlock => 0,
                else => return err,
            };
            std.time.sleep(std.time.ns_per_ms * 2);
            pending = pending[written..];
        }

        var parsed = testReadParsed(conn);
        defer parsed.deinit();
        try parsed.expectJson(.{ .length = 20_000 });
    }
}

test "websocket: invalid request" {
const stream = testStream(5998);
defer stream.close();
Expand Down Expand Up @@ -1494,6 +1549,18 @@ const TestDummyHandler = struct {
try res.startEventStream(StreamContext{ .data = "hello" }, StreamContext.handle);
}

// Test handler: drains the request body through req.streamBody() and replies
// with a JSON object whose `length` is the total number of bytes read.
fn reqStream(req: *Request, res: *Response) !void {
    var body = try req.streamBody();
    defer body.deinit();

    var total: usize = 0;
    var chunk: [1024]u8 = undefined;
    while (true) {
        // read() yields null once there is nothing more to read
        const data = (try body.read(&chunk)) orelse break;
        total += data.len;
    }

    return res.json(.{ .length = total }, .{});
}

const StreamContext = struct {
data: []const u8,

Expand Down
90 changes: 87 additions & 3 deletions src/request.zig
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub const Request = struct {
// The URL of the request
url: Url,

// httpz's wrapper around a stream, the brave can access the underlying .stream
conn: *HTTPConn,

// the address of the client
address: Address,

Expand All @@ -45,6 +48,11 @@ pub const Request = struct {
body_buffer: ?buffer.Buffer = null,
body_len: usize = 0,

// True if we haven't read the [full] body yet. This can only happen when
// lazy_read_size is configured and the request body is at least that large.
// There can still be _part_ of the body in body_buffer.
lazy_body: bool,

// cannot use an optional on qs, because it's pre-allocated so always exists
qs_read: bool = false,

Expand Down Expand Up @@ -81,11 +89,13 @@ pub const Request = struct {
pub fn init(arena: Allocator, conn: *HTTPConn) Request {
const state = &conn.req_state;
return .{
.conn = conn,
.arena = arena,
.qs = &state.qs,
.fd = &state.fd,
.mfd = &state.mfd,
.method = state.method.?,
.lazy_body = state.lazy_body,
.method_string = state.method_string orelse "",
.protocol = state.protocol.?,
.url = Url.parse(state.url.?),
Expand Down Expand Up @@ -165,6 +175,26 @@ pub const Request = struct {
return self.parseMultiFormData();
}

// Creates a Stream for reading the request body incrementally.
//
// Whatever part of the body is already held in body_buffer is handed to the
// Stream so it can be served first; the Stream reads the rest from the
// connection's socket. When the body was left unread (lazy_body), the
// connection is switched to blocking mode before the Stream is returned.
// Stream.deinit switches it back to nonblocking mode.
pub fn streamBody(self: *Request) !Stream {
    const conn = self.conn;

    const buffered: []const u8 = if (self.body_buffer) |bb| blk: {
        // NOTE(review): only a static buffer is expected here — confirm that a
        // fully-read body held in a non-static buffer cannot reach this path.
        std.debug.assert(bb.type == .static);
        break :blk bb.data;
    } else &.{};

    if (self.lazy_body) {
        try conn.blockingMode();
    }

    return .{
        .req = self,
        .buffer = buffered,
        .remaining = self.body_len,
        .socket = conn.stream.handle,
    };
}

// OK, this is a bit complicated.
// We might need to allocate memory to parse the querystring. Specifically, if
// there's a url-escaped component (a key or value), we need memory to store
Expand Down Expand Up @@ -462,6 +492,42 @@ pub const Request = struct {
.filename = filename,
};
}

// Incremental reader for a request body, created by Request.streamBody.
// Bytes already held in `buffer` are served first; after that, data is read
// directly from `socket` until `remaining` reaches 0.
pub const Stream = struct {
    // the request this stream was created from; used to reach the connection
    req: *Request,

    // number of body bytes that have not been returned to the caller yet
    remaining: usize,

    // the part of the body that was already read before streaming started
    // (can be empty)
    buffer: []const u8,

    // socket the rest of the body is read from
    socket: std.posix.socket_t,

    // Puts the connection back into nonblocking mode. This is best-effort:
    // deinit cannot fail, so a failure is logged rather than discarded.
    pub fn deinit(self: *Stream) void {
        self.req.conn.nonblockingMode() catch |err| {
            std.log.warn("httpz: failed to restore nonblocking mode after streaming request body: {s}", .{@errorName(err)});
        };
    }

    // Copies/reads the next piece of the body into `into` and returns the
    // filled prefix of `into`, or null when there is nothing more to read.
    //
    // null is returned both when the whole body has been consumed and when the
    // socket read returns 0 bytes before `remaining` reaches 0.
    // `into` should be non-empty: with an empty `into` and buffered data left,
    // an empty (non-null) slice is returned and no progress is made.
    pub fn read(self: *Stream, into: []u8) !?[]u8 {
        const remaining = self.remaining;
        const buffered = self.buffer;

        if (buffered.len != 0) {
            // serve what was read before streaming started
            const l = @min(buffered.len, into.len);

            const out = into[0..l];
            @memcpy(out, buffered[0..l]);

            self.buffer = buffered[l..];
            self.remaining = remaining - l;

            return out;
        }

        if (remaining == 0) {
            return null;
        }

        // never ask the socket for more than what is left of this body
        const dest = into[0..@min(into.len, remaining)];
        const n = try std.posix.read(self.socket, dest);
        self.remaining = remaining - n;
        return if (n == 0) null else dest[0..n];
    }
};
};

// All the upfront memory allocation that we can do. Each worker keeps a pool
Expand Down Expand Up @@ -490,9 +556,12 @@ pub const State = struct {
// to a route.
params: Params,

// constant config, but it's the only field we need,
// constant config
max_body_size: usize,

// constant config
lazy_read_size: ?usize,

// For reading the body, we might need more than `buf`.
buffer_pool: *buffer.Pool,

Expand Down Expand Up @@ -526,6 +595,10 @@ pub const State = struct {
// know what it is from the content-length header
body_len: usize,

// True if we aren't reading the body. Happens when lazy_read_size is enabled
// and we get a large body. It'll be up to the app to read it!
lazy_body: bool,

middlewares: std.StringHashMap(*anyopaque),

const asUint = @import("url.zig").asUint;
Expand All @@ -541,7 +614,9 @@ pub const State = struct {
.method = null,
.method_string = "",
.protocol = null,
.lazy_body = false,
.buffer_pool = buffer_pool,
.lazy_read_size = config.lazy_read_size,
.max_body_size = config.max_body_size orelse 1_048_576,
.middlewares = std.StringHashMap(*anyopaque).init(arena),
.qs = try StringKeyValue.init(arena, config.max_query_count orelse 32),
Expand All @@ -566,6 +641,7 @@ pub const State = struct {
self.len = 0;
self.url = null;
self.method = null;
self.lazy_body = false;
self.method_string = null;
self.protocol = null;

Expand Down Expand Up @@ -877,12 +953,20 @@ pub const State = struct {
const buf = self.buf;

// how much (if any) of the body we've already read
const read = len - pos;
if (self.lazy_read_size) |lazy_read| {
if (cl >= lazy_read) {
self.pos = len;
self.lazy_body = true;
self.body = .{ .type = .static, .data = buf[pos..len] };
return true;
}
}

const read = len - pos;
if (read == cl) {
// we've read the entire body into buf, point to that.
self.body = .{ .type = .static, .data = buf[pos..len] };
self.pos = len;
self.body = .{ .type = .static, .data = buf[pos..len] };
return true;
}

Expand Down

0 comments on commit 14ca8a0

Please sign in to comment.