Skip to content

Commit

Permalink
fix thread issue moving conn from one list to another
Browse files Browse the repository at this point in the history
  • Loading branch information
karlseguin committed Oct 22, 2024
1 parent 302943e commit 776657e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 42 deletions.
1 change: 1 addition & 0 deletions src/t.zig
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub const Context = struct {

const conn = aa.create(Conn) catch unreachable;
conn.* = .{
._mut = .{},
._state = .request,
.handover = .close,
.stream = server,
Expand Down
92 changes: 50 additions & 42 deletions src/worker.zig
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,6 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {

pub fn run(self: *Self, listener: posix.fd_t) void {
var thread_pool = &self.thread_pool;
const active_list = &self.active_list;
const request_list = &self.request_list;
const keepalive_list = &self.keepalive_list;
const timeout_request = self.timeout_request;

self.loop.start() catch |err| {
log.err("Failed to start event loop: {}", .{err});
Expand All @@ -529,10 +525,16 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
};

var now = timestamp();
var last_timeout = now;
while (true) {
const had_timeouts, const timeout = self.prepareToWait(now);
if (had_timeouts and self.full) {
self.enableListener(listener);
var timeout: ?i32 = 1;
if (now - last_timeout > 1) {
// we don't all prepareToWait more than once per second
const had_timeouts, timeout = self.prepareToWait(now);
if (had_timeouts and self.full) {
self.enableListener(listener);
}
last_timeout = now;
}
var it = self.loop.wait(timeout) catch |err| {
log.err("Failed to wait on events: {}", .{err});
Expand All @@ -551,8 +553,7 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
.signal => self.processSignal(&closed_conn),
.recv => |conn| switch (conn.protocol) {
.http => |http_conn| {
const state = http_conn.getState();
switch (state) {
switch (http_conn.getState()) {
.request, .keepalive => {},
.active, .handover => {
// we need to finish whatever we're doing
Expand All @@ -577,28 +578,11 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
};

if (done == false) {
// we need more data to fulfill the request
switch (state) {
.request => {},
.keepalive => {
http_conn.timeout = now + timeout_request;
keepalive_list.remove(conn);
http_conn._state = .request;
request_list.insert(conn);
},
.active, .handover => unreachable, // handled above
}
self.swapList(conn, .request);
continue;
}

switch (state) {
.keepalive => keepalive_list.remove(conn),
.request => request_list.remove(conn),
.active, .handover => unreachable, // handled above
}

http_conn._state = .active;
active_list.insert(conn);
self.swapList(conn, .active);
thread_pool.spawn(.{ self, now, conn });

},
Expand All @@ -619,6 +603,28 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
}
}

fn swapList(self: *Self, conn: *Conn(WSH), new_state: HTTPConn.State) void {
const http_conn = conn.protocol.http;
http_conn._mut.lock();
defer http_conn._mut.unlock();

switch (http_conn._state) {
.active => self.active_list.remove(conn),
.keepalive => self.keepalive_list.remove(conn),
.request => self.request_list.remove(conn),
.handover => self.handover_list.remove(conn),
}

http_conn._state = new_state;

switch (new_state) {
.active => self.active_list.insert(conn),
.keepalive => self.keepalive_list.insert(conn),
.request => self.request_list.insert(conn),
.handover => self.handover_list.insert(conn),
}
}

fn accept(self: *Self, listener: posix.fd_t, now: u32) !void {
var len = self.len;
const max_conn = self.max_conn;
Expand Down Expand Up @@ -706,7 +712,6 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
while (c) |conn| {
c = conn.next;
const http_conn = conn.protocol.http;
std.debug.assert(http_conn.getState() == .handover);
switch (http_conn.handover) {
.close, .unknown => {
closed_bool.* = true;
Expand Down Expand Up @@ -763,18 +768,13 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
self.server.handleRequest(http_conn, thread_buf);
http_conn.requestDone(self.retain_allocated_bytes);

std.debug.assert(http_conn.getState() == .active);
self.active_list.remove(conn);

switch (http_conn.handover) {
.keepalive => {
http_conn.timeout = now + self.timeout_keepalive;
http_conn.setState(.keepalive);
self.keepalive_list.insert(conn);
self.swapList(conn, .keepalive);
},
.close, .unknown, .websocket, .disown => {
http_conn.setState(.handover);
self.handover_list.insert(conn);
self.swapList(conn, .handover);
self.loop.signal() catch |err| log.err("failed to signal worker: {}", .{err});
}
}
Expand Down Expand Up @@ -844,7 +844,7 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {

const next = @min(request_timeout orelse MAX_TIMEOUT, keepalive_timeout orelse MAX_TIMEOUT);
if (next < now) {
// can happen if a socket was just about to timeout when enforceTimeout
// can happen if a socket was just about to timeout when prepareToWait
// was called
return .{closed, 1};
}
Expand All @@ -863,13 +863,15 @@ pub fn NonBlocking(comptime S: type, comptime WSH: type) type {
while (conn) |c| {
const timeout = c.protocol.http.timeout;
if (timeout > now) {
list.head = c;
return .{ timed_out, count, timeout };
}
count += 1;
conn = c.next;
list.remove(c);
timed_out.insert(c);
}
list.head = null;
list.tail = null;
return .{ timed_out, count, null };
}

Expand Down Expand Up @@ -938,6 +940,7 @@ pub fn List(comptime T: type) type {
} else {
self.tail = node.prev;
}

node.prev = null;
node.next = null;
}
Expand Down Expand Up @@ -1480,6 +1483,8 @@ pub const HTTPConn = struct {
// can be concurrently accessed, use getState and setState
_state: State,

_mut: Thread.Mutex,

handover: Handover,

// unix timestamp (seconds) where this connection should timeout
Expand Down Expand Up @@ -1538,6 +1543,7 @@ pub const HTTPConn = struct {
return .{
.timeout = 0,
.close = false,
._mut = .{},
._state = .request,
.handover = .unknown,
.stream = undefined,
Expand All @@ -1556,10 +1562,6 @@ pub const HTTPConn = struct {
return @atomicLoad(State, &self._state, .acquire);
}

pub fn setState(self: *HTTPConn, state: State) void {
@atomicStore(State, &self._state, state, .release);
}

pub fn deinit(self: *HTTPConn, allocator: Allocator) void {
self.req_state.deinit();
self.req_arena.deinit();
Expand Down Expand Up @@ -1790,6 +1792,12 @@ const TestNode = struct {
id: i32,
next: ?*TestNode = null,
prev: ?*TestNode = null,

fn alloc(id: i32) *TestNode {
const tn = t.allocator.create(TestNode) catch unreachable;
tn.* = .{.id = id};
return tn;
}
};

fn expectList(expected: []const i32, list: List(TestNode)) !void {
Expand Down

0 comments on commit 776657e

Please sign in to comment.