diff --git a/src/t.zig b/src/t.zig index 7b7bb9f..03eec35 100644 --- a/src/t.zig +++ b/src/t.zig @@ -98,6 +98,7 @@ pub const Context = struct { const conn = aa.create(Conn) catch unreachable; conn.* = .{ + ._mut = .{}, ._state = .request, .handover = .close, .stream = server, diff --git a/src/worker.zig b/src/worker.zig index 82be2ae..1a32dbf 100644 --- a/src/worker.zig +++ b/src/worker.zig @@ -513,10 +513,6 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { pub fn run(self: *Self, listener: posix.fd_t) void { var thread_pool = &self.thread_pool; - const active_list = &self.active_list; - const request_list = &self.request_list; - const keepalive_list = &self.keepalive_list; - const timeout_request = self.timeout_request; self.loop.start() catch |err| { log.err("Failed to start event loop: {}", .{err}); @@ -529,10 +525,16 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { }; var now = timestamp(); + var last_timeout = now; while (true) { - const had_timeouts, const timeout = self.prepareToWait(now); - if (had_timeouts and self.full) { - self.enableListener(listener); + var timeout: ?i32 = 1; + if (now - last_timeout > 1) { + // we don't all prepareToWait more than once per second + const had_timeouts, timeout = self.prepareToWait(now); + if (had_timeouts and self.full) { + self.enableListener(listener); + } + last_timeout = now; } var it = self.loop.wait(timeout) catch |err| { log.err("Failed to wait on events: {}", .{err}); @@ -551,8 +553,7 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { .signal => self.processSignal(&closed_conn), .recv => |conn| switch (conn.protocol) { .http => |http_conn| { - const state = http_conn.getState(); - switch (state) { + switch (http_conn.getState()) { .request, .keepalive => {}, .active, .handover => { // we need to finish whatever we're doing @@ -577,28 +578,11 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { }; if (done == false) { - // we need more data to fulfill the request - switch (state) { - .request => {}, - .keepalive => { - http_conn.timeout = now + timeout_request; - keepalive_list.remove(conn); - http_conn._state = .request; - request_list.insert(conn); - }, - .active, .handover => unreachable, // handled above - } + self.swapList(conn, .request); continue; } - switch (state) { - .keepalive => keepalive_list.remove(conn), - .request => request_list.remove(conn), - .active, .handover => unreachable, // handled above - } - - http_conn._state = .active; - active_list.insert(conn); + self.swapList(conn, .active); thread_pool.spawn(.{ self, now, conn }); }, @@ -619,6 +603,28 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { } } + fn swapList(self: *Self, conn: *Conn(WSH), new_state: HTTPConn.State) void { + const http_conn = conn.protocol.http; + http_conn._mut.lock(); + defer http_conn._mut.unlock(); + + switch (http_conn._state) { + .active => self.active_list.remove(conn), + .keepalive => self.keepalive_list.remove(conn), + .request => self.request_list.remove(conn), + .handover => self.handover_list.remove(conn), + } + + http_conn._state = new_state; + + switch (new_state) { + .active => self.active_list.insert(conn), + .keepalive => self.keepalive_list.insert(conn), + .request => self.request_list.insert(conn), + .handover => self.handover_list.insert(conn), + } + } + fn accept(self: *Self, listener: posix.fd_t, now: u32) !void { var len = self.len; const max_conn = self.max_conn; @@ -706,7 +712,6 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { while (c) |conn| { c = conn.next; const http_conn = conn.protocol.http; - std.debug.assert(http_conn.getState() == .handover); switch (http_conn.handover) { .close, .unknown => { closed_bool.* = true; @@ -763,18 +768,13 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { self.server.handleRequest(http_conn, thread_buf); http_conn.requestDone(self.retain_allocated_bytes); - std.debug.assert(http_conn.getState() == .active); - self.active_list.remove(conn); - switch (http_conn.handover) { .keepalive => { http_conn.timeout = now + self.timeout_keepalive; - http_conn.setState(.keepalive); - self.keepalive_list.insert(conn); + self.swapList(conn, .keepalive); }, .close, .unknown, .websocket, .disown => { - http_conn.setState(.handover); - self.handover_list.insert(conn); + self.swapList(conn, .handover); self.loop.signal() catch |err| log.err("failed to signal worker: {}", .{err}); } } @@ -844,7 +844,7 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { const next = @min(request_timeout orelse MAX_TIMEOUT, keepalive_timeout orelse MAX_TIMEOUT); if (next < now) { - // can happen if a socket was just about to timeout when enforceTimeout + // can happen if a socket was just about to timeout when prepareToWait // was called return .{closed, 1}; } @@ -863,13 +863,15 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type { while (conn) |c| { const timeout = c.protocol.http.timeout; if (timeout > now) { + list.head = c; return .{ timed_out, count, timeout }; } count += 1; conn = c.next; - list.remove(c); timed_out.insert(c); } + list.head = null; + list.tail = null; return .{ timed_out, count, null }; } @@ -938,6 +940,7 @@ pub fn List(comptime T: type) type { } else { self.tail = node.prev; } + node.prev = null; node.next = null; } @@ -1480,6 +1483,8 @@ pub const HTTPConn = struct { // can be concurrently accessed, use getState and setState _state: State, + _mut: Thread.Mutex, + handover: Handover, // unix timestamp (seconds) where this connection should timeout @@ -1538,6 +1543,7 @@ pub const HTTPConn = struct { return .{ .timeout = 0, .close = false, + ._mut = .{}, ._state = .request, .handover = .unknown, .stream = undefined, @@ -1556,10 +1562,6 @@ pub const HTTPConn = struct { return @atomicLoad(State, &self._state, .acquire); } - pub fn setState(self: *HTTPConn, state: State) void { - @atomicStore(State, &self._state, state, .release); - } - pub fn deinit(self: *HTTPConn, allocator: Allocator) void { self.req_state.deinit(); self.req_arena.deinit(); @@ -1790,6 +1792,12 @@ const TestNode = struct { id: i32, next: ?*TestNode = null, prev: ?*TestNode = null, + + fn alloc(id: i32) *TestNode { + const tn = t.allocator.create(TestNode) catch unreachable; + tn.* = .{.id = id}; + return tn; + } }; fn expectList(expected: []const i32, list: List(TestNode)) !void {