From 3b77c6246b8463dafc1d34f0615784777fa9db26 Mon Sep 17 00:00:00 2001
From: Karl Seguin
Date: Sat, 19 Oct 2024 22:02:09 +0800
Subject: [PATCH] Move away from oneshot (epoll) / dispatch (kqueue)

Results in far fewer system calls and a significant performance boost,
but more complicated thread-safety code.
---
 src/httpz.zig    |   9 +-
 src/request.zig  |   2 +-
 src/response.zig |   1 -
 src/t.zig        |   2 +-
 src/worker.zig   | 377 +++++++++++++++++++++++++----------------------
 test_runner.zig  |  28 ++--
 6 files changed, 223 insertions(+), 196 deletions(-)

diff --git a/src/httpz.zig b/src/httpz.zig
index b77c6e3..1ecf77d 100644
--- a/src/httpz.zig
+++ b/src/httpz.zig
@@ -512,6 +512,8 @@ pub fn Server(comptime H: type) type {
         var req = Request.init(allocator, conn);
         var res = Response.init(allocator, conn);
 
+        defer std.debug.assert(res.written == true);
+
         if (comptime std.meta.hasFn(Handler, "handle")) {
             if (comptime @typeInfo(@TypeOf(Handler.handle)).@"fn".return_type != void) {
                 @compileError(@typeName(Handler) ++ ".handle must return 'void'");
@@ -668,13 +670,6 @@ pub fn upgradeWebsocket(comptime H: type, req: *Request, res: *Response, ctx: an
     return true;
 }
 
-// fn websocketHandler(comptime H: type, server: *websocket.Server, stream: std.net.Stream, context: anytype) void {
-//     errdefer stream.close();
-//     var conn = server.newConn(stream);
-//     var handler = H.init(&conn, context) catch return;
-//     server.handle(H, &handler, &conn);
-// }
-
 // std.heap.StackFallbackAllocator is very specific. It's really _stack_ as it
 // requires a comptime size. Also, it uses non-public calls from the FixedBufferAllocator.
 // There should be a more generic FallbackAllocator that just takes 2 allocators...
diff --git a/src/request.zig b/src/request.zig
index 83e6b91..802e861 100644
--- a/src/request.zig
+++ b/src/request.zig
@@ -1522,7 +1522,7 @@ test "request: fuzz" {
     const number_of_requests = random.uintAtMost(u8, 10) + 1;
 
     for (0..number_of_requests) |_| {
-        defer ctx.conn.keepalive(4096);
+        defer ctx.conn.requestDone(4096);
         const method = randomMethod(random);
         const url = t.randomString(random, aa, 20);
diff --git a/src/response.zig b/src/response.zig
index b0a5214..110778d 100644
--- a/src/response.zig
+++ b/src/response.zig
@@ -181,7 +181,6 @@ pub const Response = struct {
             .{ .len = header_buf.len, .base = header_buf.ptr },
             .{ .len = body.len, .base = body.ptr },
         };
-
         try writeAllIOVec(stream.handle, &vec);
     }
diff --git a/src/t.zig b/src/t.zig
index 60d7644..81d6500 100644
--- a/src/t.zig
+++ b/src/t.zig
@@ -98,7 +98,7 @@ pub const Context = struct {
     const conn = aa.create(Conn) catch unreachable;
     conn.* = .{
-        .state = .request,
+        ._state = .request,
         .handover = .close,
         .stream = server,
         .address = std.net.Address.initIp4([_]u8{ 127, 0, 0, 200 }, 0),
diff --git a/src/worker.zig b/src/worker.zig
index 2ebd21c..6f91ee7 100644
--- a/src/worker.zig
+++ b/src/worker.zig
@@ -212,10 +212,10 @@ pub fn Blocking(comptime S: type, comptime WSH: type) type {
     var is_keepalive = false;
     while (true) {
+        defer conn.requestDone(self.retain_allocated_bytes_keepalive);
         switch (self.handleRequest(conn, is_keepalive, thread_buf) catch .close) {
             .keepalive => {
                 is_keepalive = true;
-                conn.keepalive(self.retain_allocated_bytes_keepalive);
             },
             .close, .unknown => {
                 posix.close(socket);
@@ -236,7 +236,6 @@ pub fn Blocking(comptime S: type, comptime WSH: type) type {
                 self.http_conn_pool.release(conn);
                 return;
             },
-            .need_data => unreachable, // for nonblocking only
         }
     }
 }
@@ -276,8 +275,9 @@ pub fn Blocking(comptime S: type, comptime WSH: type) type {
            return .disown;
        },
        else => {
-            requestParseError(conn, err) catch {};
-            return .close;
+            requestError(conn, err) catch {};
+            posix.close(stream.handle);
+            return .disown;
        },
    };
@@ -362,8 +362,8 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
        websocket: *ws.Worker(WSH),

-        // how many bytes should we retain in our arena allocator between keepalive usage
-        retain_allocated_bytes_keepalive: usize,
+        // how many bytes should we retain in a connection's arena allocator
+        retain_allocated_bytes: usize,

        // The thread pool that'll actually handle any incoming data. This is what
        // will call server.handleRequest which will eventually call the application
@@ -380,7 +380,7 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
        // and which we are now waiting for another request.
-        // This is only ever manipulated directly from the worker thread
-        // so doesn't need thread safety.
-        keepalive_list: List(Conn(WSH)),
+        // The thread pool inserts connections here when a request ends in
+        // keepalive, so it needs to be thread-safe.
+        keepalive_list: ConcurrentList(Conn(WSH)),

        // Requests currently being processed.
        // The worker thread moves the connection here, but the thread pool
@@ -442,9 +442,6 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
        websocket.* = try ws.Worker(WSH).init(allocator, &server._websocket_state);
        errdefer websocket.deinit();

-        const retain_allocated_bytes = config.workers.retain_allocated_bytes orelse 4096;
-        const retain_allocated_bytes_keepalive = @max(retain_allocated_bytes, 8192);
-
        var buffer_pool = try initializeBufferPool(allocator, config);
        errdefer buffer_pool.deinit();
@@ -484,7 +481,7 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
            .max_conn = config.workers.max_conn orelse 8_192,
            .timeout_request = config.timeout.request orelse MAX_TIMEOUT,
            .timeout_keepalive = config.timeout.keepalive orelse MAX_TIMEOUT,
-            .retain_allocated_bytes_keepalive = retain_allocated_bytes_keepalive,
+            .retain_allocated_bytes = config.workers.retain_allocated_bytes orelse 8192,
        };
    }
@@ -497,9 +494,9 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
        self.thread_pool.deinit();

        self.shutdownList(&self.request_list);
-        self.shutdownList(&self.keepalive_list);
-        self.shutdownConcurrentList(&self.handover_list);
        self.shutdownConcurrentList(&self.active_list);
+        self.shutdownConcurrentList(&self.handover_list);
+        self.shutdownConcurrentList(&self.keepalive_list);

        self.buffer_pool.deinit();
        self.conn_mem_pool.deinit();
@@ -516,6 +513,10 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
        pub fn run(self: *Self, listener: posix.fd_t) void {
            var thread_pool = &self.thread_pool;
+            const active_list = &self.active_list;
+            const request_list = &self.request_list;
+            const keepalive_list = &self.keepalive_list;
+            const timeout_request = self.timeout_request;

            self.loop.start() catch |err| {
                log.err("Failed to start event loop: {}", .{err});
@@ -529,7 +530,10 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
            var now = timestamp();

            while (true) {
-                const timeout = self.prepareToWait(now);
+                const had_timeouts, const timeout = self.prepareToWait(now);
+                if (had_timeouts and self.full) {
+                    self.enableListener(listener);
+                }
                var it = self.loop.wait(timeout) catch |err| {
                    log.err("Failed to wait on events: {}", .{err});
                    std.time.sleep(std.time.ns_per_s);
@@ -544,31 +548,70 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
                        log.err("Failed to accept connection: {}", .{err});
                        std.time.sleep(std.time.ns_per_ms * 5);
                    },
-                    .signal => self.processSignal(now, &closed_conn),
-                    .recv => |conn| {
-                        switch (conn.protocol) {
-                            .http => |http_conn| {
-                                // Switch the connection to "active" before we pass is to our thread pool.
-                                // This is super important as it ensures that the worker won't touch the connection while
-                                // the thread-pool has it. This is a huge part of making Conn/HTTPConn largely thread-safe
-                                // with any locking.
-                                switch (http_conn.state) {
-                                    .active, .handover => unreachable,
-                                    .request => self.request_list.remove(conn),
-                                    .keepalive => self.keepalive_list.remove(conn),
-                                }
-                                http_conn.state = .active;
-                                self.active_list.insert(conn);
-                            },
-                            .websocket => {},
-                        }
-                        thread_pool.spawn(.{ self, conn });
-                    },
+                    .signal => self.processSignal(&closed_conn),
+                    .recv => |conn| switch (conn.protocol) {
+                        .http => |http_conn| {
+                            const state = http_conn.getState();
+                            switch (state) {
+                                .request, .keepalive => {},
+                                .active, .handover => {
+                                    // we need to finish whatever we're doing
+                                    // before we can process more data (i.e. if
+                                    // the connection is being upgraded to websocket,
+                                    // this will be a websocket message.)
+                                    continue;
+                                },
+                            }
+
+                            // At this point, the connection is either in
+                            // keepalive or request. Either way, we know no
+                            // other thread is accessing the connection, so we
+                            // can access _state directly.
+
+                            const stream = http_conn.stream;
+                            const done = http_conn.req_state.parse(http_conn.req_arena.allocator(), stream) catch |err| {
+                                requestError(http_conn, err) catch {};
+                                http_conn.requestDone(self.retain_allocated_bytes);
+                                self.closeConn(conn);
+                                continue;
+                            };
+
+                            if (done == false) {
+                                // we need more data to fulfill the request
+                                switch (state) {
+                                    .request => {},
+                                    .keepalive => {
+                                        http_conn.timeout = now + timeout_request;
+                                        keepalive_list.remove(conn);
+                                        http_conn._state = .request;
+                                        request_list.insert(conn);
+                                    },
+                                    .active, .handover => unreachable, // handled above
+                                }
+                                continue;
+                            }
+
+                            switch (state) {
+                                .keepalive => keepalive_list.remove(conn),
+                                .request => request_list.remove(conn),
+                                .active, .handover => unreachable, // handled above
+                            }
+
+                            http_conn._state = .active;
+                            active_list.insert(conn);
+                            thread_pool.spawn(.{ self, now, conn });
+                        },
+                        .websocket => thread_pool.spawn(.{ self, now, conn }),
+                    },
                    .close => |conn| {
                        closed_conn = true;
                        switch (conn.protocol) {
-                            .http => self.closeConn(conn),
-                            .websocket => thread_pool.spawn(.{ self, conn }), // TODO: could probably optimize this
+                            .http => |http_conn| {
+                                http_conn.requestDone(self.retain_allocated_bytes);
+                                self.closeConn(conn);
+                            },
+                            .websocket => thread_pool.spawn(.{ self, now, conn }), // TODO: could probably optimize this
                        }
                    },
                    .shutdown => return,
@@ -581,11 +624,7 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
                }

                if (self.full and closed_conn) {
-                    self.full = false;
-                    self.loop.monitorAccept(listener) catch |err| {
-                        log.err("Failed to enable monitor to listening socket: {}", .{err});
-                        return;
-                    };
+                    self.enableListener(listener);
                }
            }
        }
@@ -611,7 +650,7 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
            var address_len: posix.socklen_t = @sizeOf(net.Address);

            const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.CLOEXEC) catch |err| {
-                // On BSD, REUSEPORT_LB means taht only 1 worker should get notified
+                // On BSD, REUSEPORT_LB means that only 1 worker should get notified
                // of a connection. On Linux, however, we only have REUSEPORT, which will
                // notify all workers. However, we monitor the listener using EPOLLEXCLUSIVE.
                // This makes it so that "one or more" workers receive it.
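
The core of the commit is easiest to see in the EPoll/KQueue helpers further down: reads used to be armed with EPOLL.ONESHOT / EV.DISPATCH and re-armed after every request. A minimal sketch of the before/after registration, modelled on the patch's own monitorRead/rearmRead (illustrative only; `epfd`, `socket`, and `conn` stand in for the worker's actual fields):

    const std = @import("std");
    const posix = std.posix;
    const linux = std.os.linux;

    // Before: ONESHOT disarms the fd after every wakeup, so each request
    // pays an extra epoll_ctl(CTL_MOD) round-trip to re-arm it.
    fn monitorReadOneshot(epfd: posix.fd_t, socket: posix.fd_t, conn: *anyopaque) !void {
        var event = linux.epoll_event{
            .data = .{ .ptr = @intFromPtr(conn) },
            .events = linux.EPOLL.IN | linux.EPOLL.RDHUP | linux.EPOLL.ONESHOT,
        };
        return posix.epoll_ctl(epfd, linux.EPOLL.CTL_MOD, socket, &event);
    }

    // After: register once and leave the fd armed. A keepalive connection
    // serves any number of requests with no further syscalls; the cost is the
    // _state machine (request/keepalive/active/handover) that keeps the worker
    // and the thread pool from touching the same connection concurrently.
    fn monitorReadArmed(epfd: posix.fd_t, socket: posix.fd_t, conn: *anyopaque) !void {
        var event = linux.epoll_event{
            .data = .{ .ptr = @intFromPtr(conn) },
            .events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
        };
        return posix.epoll_ctl(epfd, linux.EPOLL.CTL_ADD, socket, &event);
    }
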
@@ -640,7 +679,7 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { const http_conn = try self.http_conn_pool.acquire(); http_conn.request_count = 1; - http_conn.state = .request; + http_conn._state = .request; http_conn.address = address; http_conn.stream = .{ .handle = socket }; http_conn.timeout = now + self.timeout_request; @@ -654,12 +693,12 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { self.request_list.insert(conn); errdefer self.closeConn(conn); - try self.loop.monitorRead(conn, false); + try self.loop.monitorRead(conn); len += 1; } } - fn processSignal(self: *Self, now: u32, closed_bool: *bool) void { + fn processSignal(self: *Self, closed_bool: *bool) void { const loop = &self.loop; var hl = &self.handover_list; @@ -683,31 +722,12 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { while (c) |conn| { c = conn.next; const http_conn = conn.protocol.http; + std.debug.assert(http_conn.getState() == .handover); switch (http_conn.handover) { - .keepalive => { - std.debug.assert(http_conn.state == .handover); - http_conn.state = .keepalive; - self.keepalive_list.insert(conn); - http_conn.timeout = now + self.timeout_keepalive; - loop.monitorRead(conn, true) catch { - metrics.internalError(); - self.closeConn(conn); - }; - }, .close, .unknown => { closed_bool.* = true; self.closeConn(conn); }, - .need_data => { - std.debug.assert(http_conn.state == .handover); - http_conn.state = .request; - self.request_list.insert(conn); - http_conn.timeout = now + self.timeout_request; - loop.monitorRead(conn, true) catch { - metrics.internalError(); - self.closeConn(conn); - }; - }, .disown => { closed_bool.* = true; if (loop.remove(conn)) { @@ -717,26 +737,26 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { } }, .websocket => |ptr| { - closed_bool.* = true; if (comptime WSH == httpz.DummyWebsocketHandler) { std.debug.print("Your httpz handler must have a `WebsocketHandler` declaration. This must be the same type passed to `httpz.upgradeWebsocket`. Closing the connection.\n", .{}); + closed_bool.* = true; self.closeConn(conn); continue; } - std.debug.assert(http_conn.state == .handover); - self.handover_list.remove(conn); self.http_conn_pool.release(http_conn); const hc: *ws.HandlerConn(WSH) = @ptrCast(@alignCast(ptr)); conn.protocol = .{ .websocket = hc }; - loop.monitorRead(conn, true) catch { + loop.switchToOneShot(conn) catch { metrics.internalError(); + closed_bool.* = true; self.closeConn(conn); continue; }; }, + .keepalive => unreachable, } } } @@ -746,55 +766,33 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { // Currently, for an HTTP connection, this is only called when we have a // full HTTP request ready to process. 
For a WebSocket packet, it's called
        // when we have data available - there may or may not be a full message ready
-        pub fn processData(self: *Self, conn: *Conn(WSH), thread_buf: []u8) void {
+        pub fn processData(self: *Self, now: u32, conn: *Conn(WSH), thread_buf: []u8) void {
            switch (conn.protocol) {
-                .http => |http_conn| self.processHTTPData(conn, thread_buf, http_conn),
+                .http => |http_conn| self.processHTTPData(now, conn, thread_buf, http_conn),
                .websocket => |hc| self.processWebsocketData(conn, thread_buf, hc),
            }
        }

-        pub fn processHTTPData(self: *Self, conn: *Conn(WSH), thread_buf: []u8, http_conn: *HTTPConn) void {
-            const stream = http_conn.stream;
-            defer {
-                std.debug.assert(http_conn.state == .active);
-
-                // active_list is thread-safe
-                self.active_list.remove(conn);
-
-                // Ordering is important! Once we put this in handover_list,
-                // it can be accessed by the worker thread. You might think that
-                // can't happen since we don't call signal() until after, but
-                // any other threadpool thread could call signal at any time.
-                http_conn.state = .handover;
-
-                // handover_list is thread-safe
-                self.handover_list.insert(conn);
-
-                // signal the worker thread to check/process handover_list
-                self.loop.signal() catch |err| log.err("failed to signal worker: {}", .{err});
-            }
-
-            const done = http_conn.req_state.parse(http_conn.req_arena.allocator(), stream) catch |err| {
-                requestParseError(http_conn, err) catch {};
-                http_conn.handover = .close;
-                return;
-            };
-
-            if (done == false) {
-                http_conn.handover = .need_data;
-                return;
-            }
-
+        pub fn processHTTPData(self: *Self, now: u32, conn: *Conn(WSH), thread_buf: []u8, http_conn: *HTTPConn) void {
            metrics.request();
            http_conn.request_count += 1;
            self.server.handleRequest(http_conn, thread_buf);
+            http_conn.requestDone(self.retain_allocated_bytes);
+
+            std.debug.assert(http_conn.getState() == .active);
+            self.active_list.remove(conn);

-            if (http_conn.handover == .keepalive) {
-                // Do what we can in the threadpool to setup this connection for
-                // more work. Anything we _don't_ do here has to be done in the worker
-                // thread, which impacts all connections. But, we can only do stuff
-                // that's thread-safe here (i.e. we can't manipulate the ConnManager)
-                http_conn.keepalive(self.retain_allocated_bytes_keepalive);
+            switch (http_conn.handover) {
+                .keepalive => {
+                    http_conn.timeout = now + self.timeout_keepalive;
+                    http_conn.setState(.keepalive);
+                    self.keepalive_list.insert(conn);
+                },
+                .close, .unknown, .websocket, .disown => {
+                    http_conn.setState(.handover);
+                    self.handover_list.insert(conn);
+                    self.loop.signal() catch |err| log.err("failed to signal worker: {}", .{err});
+                },
            }
        }
@@ -807,7 +805,7 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
            } else if (ws_conn.isClosed()) {
                self.websocket.cleanupConn(hc);
            } else {
-                self.loop.monitorRead(conn, true) catch |err| {
+                self.loop.rearmRead(conn) catch |err| {
                    log.debug("({}) failed to add read event monitor: {}", .{ ws_conn.address, err });
                    ws_conn.close(.{ .code = 4998, .reason = "wsz" }) catch {};
                    self.websocket.cleanupConn(hc);
@@ -822,7 +820,7 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
        fn disown(self: *Self, conn: *Conn(WSH)) void {
            const http_conn = conn.protocol.http;
-            switch (http_conn.state) {
+            switch (http_conn._state) {
                .request => self.request_list.remove(conn),
                .handover => self.handover_list.remove(conn),
                .keepalive => self.keepalive_list.remove(conn),
@@ -834,35 +832,49 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
        // Enforces timeouts, and returns when the next timeout should be checked.
-        fn prepareToWait(self: *Self, now: u32) ?i32 {
+        fn prepareToWait(self: *Self, now: u32) struct { bool, ?i32 } {
            const request_count, const request_timeout = self.enforceTimeout(&self.request_list, now);
-            const keepalive_count, const keepalive_timeout = self.enforceTimeout(&self.keepalive_list, now);
+            // I'm pretty sure this is safe, and pretty neat. The only concurrent
+            // action that can happen on the keepalive_list is appending a node
+            // (at the tail). That means we can snapshot the keepalive list under
+            // a short-lived lock, and then safely iterate the snapshot.
+            const keepalive_snapshot: List(Conn(WSH)) = blk: {
+                const list = &self.keepalive_list;
+                list.mut.lock();
+                defer list.mut.unlock();
+                break :blk .{ .head = list.inner.head, .tail = list.inner.tail };
+            };
+            const keepalive_count, const keepalive_timeout = self.enforceTimeout(&keepalive_snapshot, now);
+
+            var closed = false;
            if (request_count > 0) {
+                closed = true;
                metrics.timeoutRequest(request_count);
            }
            if (keepalive_count > 0) {
+                closed = true;
                metrics.timeoutKeepalive(keepalive_count);
            }

            if (request_timeout == null and keepalive_timeout == null) {
-                return null;
+                return .{ closed, null };
            }
            const next = @min(request_timeout orelse MAX_TIMEOUT, keepalive_timeout orelse MAX_TIMEOUT);
            if (next < now) {
                // can happen if a socket was just about to time out when enforceTimeout
                // was called
-                return 1;
+                return .{ closed, 1 };
            }
-            return @intCast(next - now);
+            return .{ closed, @intCast(next - now) };
        }

        // lists are ordered from soonest to time out to latest; as soon as we find
        // a connection that isn't timed out, we can break.
        // This returns the next timeout.
-        fn enforceTimeout(self: *Self, list: *List(Conn(WSH)), now: u32) struct { usize, ?u32 } {
+        fn enforceTimeout(self: *Self, list: *const List(Conn(WSH)), now: u32) struct { usize, ?u32 } {
            var conn = list.head;
            var count: usize = 0;
            while (conn) |c| {
@@ -893,6 +905,12 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
            defer list.mut.unlock();
            self.shutdownList(&list.inner);
        }
+
+        inline fn enableListener(self: *Self, listener: posix.fd_t) void {
+            self.full = false;
+            self.loop.monitorAccept(listener) catch |err| log.err("Failed to enable monitoring of the listening socket: {}", .{err});
+        }
    };
}
@@ -1019,23 +1037,28 @@ fn KQueue(comptime WSH: type) type {
        try self.change(fd, 0, posix.system.EVFILT.READ, posix.system.EV.DISABLE, 0);
    }

-    fn monitorRead(self: *Self, conn: *Conn(WSH), comptime rearm: bool) !void {
-        if (rearm) {
-            // for websocket connections, this is called in a thread-pool thread
-            // so cannot be queued up - it needs to be immediately picked up
-            // since our worker could be in a wait() call.
-            const event = Kevent{
-                .ident = @intCast(conn.getSocket()),
-                .filter = posix.system.EVFILT.READ,
-                .flags = posix.system.EV.ADD | posix.system.EV.ENABLE | posix.system.EV.DISPATCH,
-                .fflags = 0,
-                .data = 0,
-                .udata = @intFromPtr(conn),
-            };
-            _ = try posix.kevent(self.fd, &.{event}, &.{}, null);
-        } else {
-            try self.change(conn.getSocket(), @intFromPtr(conn), posix.system.EVFILT.READ, posix.system.EV.ADD | posix.system.EV.ENABLE | posix.system.EV.DISPATCH, 0);
-        }
+    fn monitorRead(self: *Self, conn: *Conn(WSH)) !void {
+        try self.change(conn.getSocket(), @intFromPtr(conn), posix.system.EVFILT.READ, posix.system.EV.ADD | posix.system.EV.ENABLE, 0);
+    }
+
+    fn rearmRead(self: *Self, conn: *Conn(WSH)) !void {
+        // called from a thread-pool thread, so this can't go through the
+        // worker-owned change_buffer - it has to be submitted immediately
+        _ = try posix.kevent(self.fd, &.{.{
+            .ident = @intCast(conn.getSocket()),
+            .filter = posix.system.EVFILT.READ,
+            .flags = posix.system.EV.ENABLE,
+            .fflags = 0,
+            .data = 0,
+            .udata = @intFromPtr(conn),
+        }}, &.{}, null);
+    }
+
+    fn switchToOneShot(self: *Self, conn: *Conn(WSH)) !void {
+        // From the kqueue docs, you'd think you could just re-add the socket with
+        // EV.DISPATCH to enable dispatching. But that _does not_ work on macOS.
+        // Removing and re-adding does.
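+        // (EV.DISPATCH delivers a single event and then disables the filter
+        // until it is explicitly re-enabled; for websocket connections,
+        // rearmRead above performs that re-enable with EV.ENABLE once the
+        // thread pool has read the available data.)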
+        const socket = conn.getSocket();
+        try self.change(socket, 0, posix.system.EVFILT.READ, posix.system.EV.DELETE, 0);
+        try self.change(socket, @intFromPtr(conn), posix.system.EVFILT.READ, posix.system.EV.ADD | posix.system.EV.DISPATCH, 0);
    }

    fn remove(self: *Self, conn: *Conn(WSH)) !void {
@@ -1183,13 +1206,24 @@ fn EPoll(comptime WSH: type) type {
        return std.posix.epoll_ctl(self.fd, linux.EPOLL.CTL_DEL, fd, null);
    }

-    fn monitorRead(self: *Self, conn: *Conn(WSH), comptime rearm: bool) !void {
-        const op = if (rearm) linux.EPOLL.CTL_MOD else linux.EPOLL.CTL_ADD;
+    fn monitorRead(self: *Self, conn: *Conn(WSH)) !void {
+        var event = linux.epoll_event{
+            .data = .{ .ptr = @intFromPtr(conn) },
+            .events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
+        };
+        return posix.epoll_ctl(self.fd, linux.EPOLL.CTL_ADD, conn.getSocket(), &event);
+    }
+
+    fn rearmRead(self: *Self, conn: *Conn(WSH)) !void {
        var event = linux.epoll_event{
            .data = .{ .ptr = @intFromPtr(conn) },
            .events = linux.EPOLL.IN | linux.EPOLL.RDHUP | linux.EPOLL.ONESHOT,
        };
-        return posix.epoll_ctl(self.fd, op, conn.getSocket(), &event);
+        return posix.epoll_ctl(self.fd, linux.EPOLL.CTL_MOD, conn.getSocket(), &event);
+    }
+
+    fn switchToOneShot(self: *Self, conn: *Conn(WSH)) !void {
+        return self.rearmRead(conn);
+    }

    fn remove(self: *Self, conn: *Conn(WSH)) !void {
@@ -1375,7 +1409,6 @@ const HTTPConnPool = struct {
            return;
        }

-        conn.reset(self.retain_allocated_bytes);
        conns[available] = conn;
        self.available = available + 1;
        self.unlock();
@@ -1437,10 +1470,10 @@ pub fn Conn(comptime WSH: type) type {
// in nonblocking mode. A pointer to the conn is the userdata passed to epoll/kqueue.
// Should only be created through the worker's HTTPConnPool
pub const HTTPConn = struct {
-    // A connection can be in one of four states:
-    // - active: There's a thread in the thread pool doing something with the connection.
-    //           This is meant to be a "safe" place to be. No otherthread should touch
-    //           the connection while its here, including the main worker thread.
+    // A connection can be in one of four states. This mostly corresponds to which
+    // of the worker's lists it is in.
+    // - active: There's a thread in the thread pool serving a request
+    //
    // - handover: The threadpool is done with the request and wants to transfer
    //             control back to the worker.
    //
@@ -1462,11 +1495,11 @@ pub const HTTPConn = struct {
        close,
        unknown,
        keepalive,
-        need_data,
        websocket: *anyopaque,
    };

-    state: State,
+    // may be accessed concurrently; use getState and setState
+    _state: State,

    handover: Handover,
@@ -1523,21 +1556,29 @@ pub const HTTPConn = struct {
        errdefer req_arena.deinit();

        return .{
+            .timeout = 0,
            .close = false,
-            .state = .request,
+            ._state = .request,
            .handover = .unknown,
            .stream = undefined,
            .address = undefined,
+            .request_count = 0,
+            .ws_worker = ws_worker,
            .req_state = req_state,
            .res_state = res_state,
            .req_arena = req_arena,
            .conn_arena = conn_arena,
-            .timeout = 0,
-            .request_count = 0,
-            .ws_worker = ws_worker,
        };
    }

+    pub fn getState(self: *const HTTPConn) State {
+        return @atomicLoad(State, &self._state, .acquire);
+    }
+
+    pub fn setState(self: *HTTPConn, state: State) void {
+        @atomicStore(State, &self._state, state, .release);
+    }
+
    pub fn deinit(self: *HTTPConn, allocator: Allocator) void {
        self.req_state.deinit();
        self.req_arena.deinit();
@@ -1545,22 +1586,20 @@ pub const HTTPConn = struct {
        allocator.destroy(self.conn_arena);
    }

-    pub fn keepalive(self: *HTTPConn, retain_allocated_bytes: usize) void {
+    pub fn requestDone(self: *HTTPConn, retain_allocated_bytes: usize) void {
        self.req_state.reset();
        self.res_state.reset();
        _ = self.req_arena.reset(.{ .retain_with_limit = retain_allocated_bytes });
    }

    // getting put back into the pool
-    pub fn reset(self: *HTTPConn, retain_allocated_bytes: usize) void {
+    pub fn reset(self: *HTTPConn) void {
        self.close = false;
+        self._state = .request;
        self.handover = .unknown;
        self.stream = undefined;
        self.address = undefined;
        self.request_count = 0;
-        self.req_state.reset();
-        self.res_state.reset();
-        _ = self.req_arena.reset(.{ .retain_with_limit = retain_allocated_bytes });
    }
};
@@ -1595,43 +1634,35 @@ fn initializeBufferPool(allocator: Allocator, config: *const Config) !*BufferPool {
// the response. If it returns false, the worker probably wants to close the connection.
// This function ensures that both Blocking and NonBlocking workers handle these
// errors with the same response
-fn requestParseError(conn: *HTTPConn, err: anyerror) !void {
+fn requestError(conn: *HTTPConn, err: anyerror) !void {
+    const handle = conn.stream.handle;
    switch (err) {
        error.HeaderTooBig => {
            metrics.invalidRequest();
-            return writeError(conn, 431, "Request header is too big");
+            return writeError(handle, 431, "Request header is too big");
        },
        error.UnknownMethod, error.InvalidRequestTarget, error.UnknownProtocol, error.UnsupportedProtocol, error.InvalidHeaderLine, error.InvalidContentLength => {
            metrics.invalidRequest();
-            return writeError(conn, 400, "Invalid Request");
+            return writeError(handle, 400, "Invalid Request");
        },
        error.BrokenPipe, error.ConnectionClosed, error.ConnectionResetByPeer => return,
-        else => return serverError(conn, "unknown read/parse error: {}", err),
+        else => {
+            log.err("server error: {}", .{err});
+            metrics.internalError();
+            return writeError(handle, 500, "Internal Server Error");
+        },
    }
-    log.err("unknown parse error: {}", .{err});
-    return err;
}

-fn serverError(conn: *HTTPConn, comptime log_fmt: []const u8, err: anyerror) !void {
-    log.err("server error: " ++ log_fmt, .{err});
-    metrics.internalError();
-    return writeError(conn, 500, "Internal Server Error");
-}
-
-fn writeError(conn: *HTTPConn, comptime status: u16, comptime msg: []const u8) !void {
-    const socket = conn.stream.handle;
+fn writeError(socket: posix.fd_t, comptime status: u16, comptime msg: []const u8) !void {
    const response = std.fmt.comptimePrint("HTTP/1.1 {d} \r\nConnection: Close\r\nContent-Length: {d}\r\n\r\n{s}", .{ status, msg.len, msg });

+    // Zig's posix bindings don't expose MSG.DONTWAIT on BSD/Darwin, so fall
+    // back to the Darwin value (0x80).
+    const DONTWAIT = if (posix.MSG != void) posix.MSG.DONTWAIT else 0x00080;
    var i: usize = 0;
-
-    const timeout = std.mem.toBytes(std.posix.timeval{ .sec = 5, .usec = 0 });
-    try posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.SNDTIMEO, &timeout);
    while (i < response.len) {
-        const n = posix.write(socket, response[i..]) catch |err| switch (err) {
-            error.WouldBlock => return error.Timeout,
-            else => return err,
-        };
-
+        const n = try posix.sendto(socket, response[i..], DONTWAIT, null, 0);
        if (n == 0) {
            return error.Closed;
        }
diff --git a/test_runner.zig b/test_runner.zig
index 1f34102..4011a60 100644
--- a/test_runner.zig
+++ b/test_runner.zig
@@ -42,6 +42,7 @@ pub fn main() !void {

    for (builtin.test_functions) |t| {
        if (isSetup(t)) {
+            current_test = friendlyName(t.name);
            t.func() catch |err| {
                printer.status(.fail, "\nsetup \"{s}\" failed: {}\n", .{ t.name, err });
                return err;
@@ -64,18 +65,7 @@ pub fn main() !void {
        }
    }

-    const friendly_name = blk: {
-        const name = t.name;
-        var it = std.mem.splitScalar(u8, name, '.');
-        while (it.next()) |value| {
-            if (std.mem.eql(u8, value, "test")) {
-                const rest = it.rest();
-                break :blk if (rest.len > 0) rest else name;
-            }
-        }
-        break :blk name;
-    };
-
+    const friendly_name = friendlyName(t.name);
    current_test = friendly_name;
    std.testing.allocator_instance = .{};
    const result = t.func();
@@ -122,6 +112,7 @@ pub fn main() !void {

    for (builtin.test_functions) |t| {
        if (isTeardown(t)) {
+            current_test = friendlyName(t.name);
            t.func() catch |err| {
                printer.status(.fail, "\nteardown \"{s}\" failed: {}\n", .{ t.name, err });
                return err;
@@ -144,6 +135,17 @@ pub fn main() !void {
    std.posix.exit(if (fail == 0) 0 else 1);
}

+fn friendlyName(name: []const u8) []const u8 {
+    var it = std.mem.splitScalar(u8, name, '.');
+    while (it.next()) |value| 
{ + if (std.mem.eql(u8, value, "test")) { + const rest = it.rest(); + return if (rest.len > 0) rest else name; + } + } + return name; +} + const Printer = struct { out: std.fs.File.Writer, @@ -294,7 +296,7 @@ pub fn panic(msg: []const u8, error_return_trace: ?*std.builtin.StackTrace, ret_ if (current_test) |ct| { std.debug.print("\x1b[31m{s}\npanic running \"{s}\"\n{s}\x1b[0m\n", .{ BORDER, ct, BORDER }); } - std.builtin.Panic.call(msg, error_return_trace, ret_addr); + std.debug.defaultPanic(msg, error_return_trace, ret_addr); } fn isUnnamed(t: std.builtin.TestFn) bool {