diff --git a/src/config.zig b/src/config.zig index 831953a..31ede76 100644 --- a/src/config.zig +++ b/src/config.zig @@ -29,6 +29,7 @@ pub const Config = struct { }; pub const Request = struct { + lazy_read_size: ?usize = null, max_body_size: ?usize = null, buffer_size: ?usize = null, max_header_count: ?usize = null, diff --git a/src/httpz.zig b/src/httpz.zig index 53c467d..f74f46a 100644 --- a/src/httpz.zig +++ b/src/httpz.zig @@ -739,7 +739,13 @@ test "tests:beforeAll" { const ga = global_test_allocator.allocator(); { - default_server = try Server(void).init(ga, .{ .port = 5992 }, {}); + default_server = try Server(void).init(ga, .{ + .port = 5992, + .request = .{ + .lazy_read_size = 4_096, + .max_body_size = 1_048_576, + } + }, {}); // only need to do this because we're using listenInNewThread instead // of blocking here. So the array to hold the middleware needs to outlive @@ -766,6 +772,7 @@ test "tests:beforeAll" { router.method("PING", "/test/method", TestDummyHandler.method, .{}); router.get("/test/query", TestDummyHandler.reqQuery, .{}); router.get("/test/stream", TestDummyHandler.eventStream, .{}); + router.get("/test/req_stream", TestDummyHandler.reqStream, .{}); router.get("/test/chunked", TestDummyHandler.chunked, .{}); router.get("/test/route_data", TestDummyHandler.routeData, .{ .data = &TestDummyHandler.RouteData{ .power = 12345 } }); router.all("/test/cors", TestDummyHandler.jsonRes, .{ .middlewares = cors }); @@ -1299,6 +1306,54 @@ test "httpz: custom handle" { try t.expectString("HTTP/1.1 200 \r\nContent-Length: 9\r\n\r\nhello teg", testReadAll(stream, &buf)); } +test "httpz: request body streaming" { + { + // no body + const stream = testStream(5992); + defer stream.close(); + try stream.writeAll("GET /test/req_stream HTTP/1.1\r\nContent-Length: 0\r\n\r\n"); + + var res = testReadParsed(stream); + defer res.deinit(); + try res.expectJson(.{ .length = 0 }); + } + + { + // small body + const stream = testStream(5992); + defer stream.close(); + try stream.writeAll("GET /test/req_stream HTTP/1.1\r\nContent-Length: 4\r\n\r\n123z"); + + var res = testReadParsed(stream); + defer res.deinit(); + try res.expectJson(.{ .length = 4 }); + } + + var r = t.getRandom(); + const random = r.random(); + + // a bit of fuzzing + for (0..10) |_| { + const stream = testStream(5992); + defer stream.close(); + var req: []const u8 = "GET /test/req_stream HTTP/1.1\r\nContent-Length: 20000\r\n\r\n" ++ ("a" ** 20_000); + while (req.len > 0) { + const len = random.uintAtMost(usize, req.len - 1) + 1; + const n = stream.write(req[0..len]) catch |err| switch (err) { + error.WouldBlock => 0, + else => return err, + }; + std.time.sleep(std.time.ns_per_ms * 2); + req = req[n..]; + } + + var res = testReadParsed(stream); + defer res.deinit(); + try res.expectJson(.{ .length = 20_000 }); + } + +} + test "websocket: invalid request" { const stream = testStream(5998); defer stream.close(); @@ -1494,6 +1549,18 @@ const TestDummyHandler = struct { try res.startEventStream(StreamContext{ .data = "hello" }, StreamContext.handle); } + fn reqStream(req: *Request, res: *Response) !void { + var stream = try req.streamBody(); + defer stream.deinit(); + + var l: usize = 0; + var buf: [1024]u8 = undefined; + while (try stream.read(&buf)) |data| { + l += data.len; + } + return res.json(.{.length = l}, .{}); + } + const StreamContext = struct { data: []const u8, diff --git a/src/request.zig b/src/request.zig index 802e861..ab685ff 100644 --- a/src/request.zig +++ b/src/request.zig @@ -22,6 +22,9 @@ pub const Request = struct { // The URL of the request url: Url, + // httpz's wrapper around a stream, the brave can access the underlying .stream + conn: *HTTPConn, + // the address of the client address: Address, @@ -45,6 +48,11 @@ pub const Request = struct { body_buffer: ?buffer.Buffer = null, body_len: usize = 0, + // True if we haven't read the [full] body yet. This can only happen when + // lazy_read_size is configured and the request is larger that this value. + // There can still be _part_ of the body in body_buffer. + lazy_body: bool, + // cannot use an optional on qs, because it's pre-allocated so always exists qs_read: bool = false, @@ -81,11 +89,13 @@ pub const Request = struct { pub fn init(arena: Allocator, conn: *HTTPConn) Request { const state = &conn.req_state; return .{ + .conn = conn, .arena = arena, .qs = &state.qs, .fd = &state.fd, .mfd = &state.mfd, .method = state.method.?, + .lazy_body = state.lazy_body, .method_string = state.method_string orelse "", .protocol = state.protocol.?, .url = Url.parse(state.url.?), @@ -165,6 +175,26 @@ pub const Request = struct { return self.parseMultiFormData(); } + pub fn streamBody(self: *Request) !Stream { + var buf: []const u8 = &.{}; + if (self.body_buffer) |bb| { + std.debug.assert(bb.type == .static); + buf = bb.data; + } + + const conn = self.conn; + if (self.lazy_body == true) { + try conn.blockingMode(); + } + + return .{ + .req = self, + .buffer = buf, + .remaining = self.body_len, + .socket = conn.stream.handle, + }; + } + // OK, this is a bit complicated. // We might need to allocate memory to parse the querystring. Specifically, if // there's a url-escaped component (a key or value), we need memory to store @@ -462,6 +492,42 @@ pub const Request = struct { .filename = filename, }; } + + pub const Stream = struct { + req: *Request, + remaining: usize, + buffer: []const u8, + socket: std.posix.socket_t, + + pub fn deinit(self: *Stream) void { + self.req.conn.nonblockingMode() catch {}; + } + + pub fn read(self: *Stream, into: []u8) !?[]u8 { + const b = self.buffer; + const remaining = self.remaining; + if (b.len != 0) { + const l = @min(b.len, into.len); + + const buf = into[0..l]; + @memcpy(buf, b[0..l]); + + self.buffer = b[l..]; + self.remaining = remaining - l; + + return buf; + } + + if (remaining == 0) { + return null; + } + + var buf = if (into.len > remaining) into[0..remaining] else into; + const n = try std.posix.read(self.socket, buf); + self.remaining = remaining - n; + return if (n == 0) null else buf[0..n]; + } + }; }; // All the upfront memory allocation that we can do. Each worker keeps a pool @@ -490,9 +556,12 @@ pub const State = struct { // to a route. params: Params, - // constant config, but it's the only field we need, + // constant config max_body_size: usize, + // constant config + lazy_read_size: ?usize, + // For reading the body, we might need more than `buf`. buffer_pool: *buffer.Pool, @@ -526,6 +595,10 @@ pub const State = struct { // know what it is from the content-length header body_len: usize, + // True if we aren't reading the body. Happens when lazy_read_size is enabled + // and we get a large body. It'll be up to the app to read it! + lazy_body: bool, + middlewares: std.StringHashMap(*anyopaque), const asUint = @import("url.zig").asUint; @@ -541,7 +614,9 @@ pub const State = struct { .method = null, .method_string = "", .protocol = null, + .lazy_body = false, .buffer_pool = buffer_pool, + .lazy_read_size = config.lazy_read_size, .max_body_size = config.max_body_size orelse 1_048_576, .middlewares = std.StringHashMap(*anyopaque).init(arena), .qs = try StringKeyValue.init(arena, config.max_query_count orelse 32), @@ -566,6 +641,7 @@ pub const State = struct { self.len = 0; self.url = null; self.method = null; + self.lazy_body = false; self.method_string = null; self.protocol = null; @@ -877,12 +953,20 @@ pub const State = struct { const buf = self.buf; // how much (if any) of the body we've already read - const read = len - pos; + if (self.lazy_read_size) |lazy_read| { + if (cl >= lazy_read) { + self.pos = len; + self.lazy_body = true; + self.body = .{ .type = .static, .data = buf[pos..len] }; + return true; + } + } + const read = len - pos; if (read == cl) { // we've read the entire body into buf, point to that. - self.body = .{ .type = .static, .data = buf[pos..len] }; self.pos = len; + self.body = .{ .type = .static, .data = buf[pos..len] }; return true; }