Skip to content

Commit

Permalink
General restructure
Browse files Browse the repository at this point in the history
* Move more specific functions to the only files they're used.
* Move the `serve*` functions outside of `Context`, making them
  free functions which just accept the context and work pool.
* Remove `acceptAndServeConnection`; originally this was required to
  be able to nicely structure the unit test, and used to be more
  integrated, however it no longer makes sense as a concept.
* Inline `handleRequest` into the basic backend.
* Make the `acceptHandled` function, moved into the basic backend,
  guarantee the specified `sync` behavior, and inline `have_accept4`.
* Appropriately re-export the relevant parts of the server API.
* Added top level doc comments.
  • Loading branch information
InKryption committed Feb 6, 2025
1 parent 7629280 commit 3e6c4fb
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 250 deletions.
175 changes: 160 additions & 15 deletions src/rpc/server/basic.zig
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const builtin = @import("builtin");
const std = @import("std");
const sig = @import("../../sig.zig");

Expand All @@ -6,14 +7,17 @@ const requests = server.requests;
const connection = server.connection;

pub const AcceptAndServeConnectionError =
connection.AcceptHandledError ||
connection.SetSocketSyncError ||
std.mem.Allocator.Error ||
AcceptHandledError ||
SetSocketSyncError ||
std.http.Server.ReceiveHeadError ||
requests.HandleRequestError;
std.http.Server.Response.WriteError ||
std.mem.Allocator.Error ||
std.fs.File.GetSeekPosError ||
std.fs.File.OpenError ||
std.fs.File.ReadError;

pub fn acceptAndServeConnection(server_ctx: *server.Context) !void {
const conn = connection.acceptHandled(
const conn = acceptHandled(
server_ctx.tcp,
.blocking,
) catch |err| switch (err) {
Expand All @@ -22,11 +26,6 @@ pub fn acceptAndServeConnection(server_ctx: *server.Context) !void {
};
defer conn.stream.close();

if (!connection.HAVE_ACCEPT4) {
// make sure the accepted socket is in blocking mode
try connection.setSocketSync(conn.stream.handle, .blocking);
}

server_ctx.wait_group.start();
defer server_ctx.wait_group.finish();

Expand All @@ -36,10 +35,156 @@ pub fn acceptAndServeConnection(server_ctx: *server.Context) !void {
var http_server = std.http.Server.init(conn, buffer);
var request = try http_server.receiveHead();

try requests.handleRequest(
server_ctx.logger,
&request,
server_ctx.snapshot_dir,
server_ctx.latest_snapshot_gen_info,
const conn_address = request.server.connection.address;
server_ctx.logger.info().logf("Responding to request from {}: {} {s}", .{
conn_address, requests.methodFmt(request.head.method), request.head.target,
});

switch (request.head.method) {
.HEAD, .GET => switch (requests.getRequestTargetResolve(
server_ctx.logger,
request.head.target,
server_ctx.latest_snapshot_gen_info,
)) {
inline .full_snapshot, .inc_snapshot => |pair| {
const snap_info, var full_info_lg = pair;
defer full_info_lg.unlock();

const archive_name_bounded = snap_info.snapshotArchiveName();
const archive_name = archive_name_bounded.constSlice();

const archive_file = try server_ctx.snapshot_dir.openFile(archive_name, .{});
defer archive_file.close();

const archive_len = try archive_file.getEndPos();

var send_buffer: [4096]u8 = undefined;
var response = request.respondStreaming(.{
.send_buffer = &send_buffer,
.content_length = archive_len,
.respond_options = .{},
});

if (!response.elide_body) {
// use a length which is still a multiple of 2, greater than the send_buffer length,
// in order to almost always force the http server method to flush, instead of
// pointlessly copying data into the send buffer.
const read_buffer_len = comptime std.mem.alignForward(
usize,
send_buffer.len + 1,
2,
);
var read_buffer: [read_buffer_len]u8 = undefined;

while (true) {
const file_data_len = try archive_file.read(&read_buffer);
if (file_data_len == 0) break;
const file_data = read_buffer[0..file_data_len];
try response.writeAll(file_data);
}
} else {
std.debug.assert(response.transfer_encoding.content_length == archive_len);
// NOTE: in order to avoid needing to actually spend time writing the response body,
// just trick the API into thinking we already wrote the entire thing by setting this
// to 0.
response.transfer_encoding.content_length = 0;
}

try response.end();
return;
},
.unrecognized => {},
},
.POST => {
server_ctx.logger.err().logf("{} tried to invoke our RPC", .{conn_address});
return try request.respond("RPCs are not yet implemented", .{
.status = .service_unavailable,
.keep_alive = false,
});
},
else => {},
}

server_ctx.logger.err().logf(
"{} made an unrecognized request '{} {s}'",
.{ conn_address, requests.methodFmt(request.head.method), request.head.target },
);
try request.respond("", .{
.status = .not_found,
.keep_alive = false,
});
}

const SyncKind = enum { blocking, nonblocking };

const AcceptHandledError =
error{
ConnectionAborted,
ProtocolFailure,
WouldBlock,
} || connection.HandleAcceptError ||
SetSocketSyncError;

fn acceptHandled(
tcp_server: std.net.Server,
sync: SyncKind,
) AcceptHandledError!std.net.Server.Connection {
var accept_flags: u32 = std.posix.SOCK.CLOEXEC;
accept_flags |= switch (sync) {
.blocking => 0,
.nonblocking => std.posix.SOCK.NONBLOCK,
};

// When this is false, it means we can't apply flags to
// the accepted socket, and we'll have to ensure that the
// relevant flags are enabled/disabled after acceptance.
const have_accept4 = comptime !builtin.target.isDarwin();

const conn: std.net.Server.Connection = while (true) {
var addr: std.net.Address = .{ .any = undefined };
var addr_len: std.posix.socklen_t = @sizeOf(@TypeOf(addr.any));
const rc = if (have_accept4)
std.posix.system.accept4(tcp_server.stream.handle, &addr.any, &addr_len, accept_flags)
else
std.posix.system.accept(tcp_server.stream.handle, &addr.any, &addr_len);

break switch (try connection.handleAcceptResult(std.posix.errno(rc))) {
.intr => continue,
.conn_aborted => return error.ConnectionAborted,
.proto_fail => return error.ProtocolFailure,
.again => return error.WouldBlock,
.success => .{
.stream = .{ .handle = rc },
.address = addr,
},
};
};

if (!have_accept4) {
try setSocketSync(conn.stream.handle, sync);
}

return conn;
}

const SetSocketSyncError = std.posix.FcntlError;

/// Ensure the socket is set to be blocking or nonblocking.
/// Useful in tandem with the situation described by `HAVE_ACCEPT4`.
fn setSocketSync(
socket: std.posix.socket_t,
sync: SyncKind,
) SetSocketSyncError!void {
const FlagsInt = @typeInfo(std.posix.O).Struct.backing_integer.?;
var flags_int: FlagsInt = @intCast(try std.posix.fcntl(socket, std.posix.F.GETFL, 0));
const flags = std.mem.bytesAsValue(std.posix.O, std.mem.asBytes(&flags_int));

const nonblock_wanted = switch (sync) {
.blocking => false,
.nonblocking => true,
};
if (flags.NONBLOCK != nonblock_wanted) {
flags.NONBLOCK = nonblock_wanted;
_ = try std.posix.fcntl(socket, std.posix.F.SETFL, flags_int);
}
}
85 changes: 0 additions & 85 deletions src/rpc/server/connection.zig
Original file line number Diff line number Diff line change
@@ -1,91 +1,6 @@
const builtin = @import("builtin");
const std = @import("std");

/// When this is false, it means `accept[Handled]` can't apply
/// flags to the accepted socket, and the caller will have to
/// to ensure relevant flags are enabled/disabled after acceptance.
pub const HAVE_ACCEPT4 = !builtin.target.isDarwin();

pub const GetSockNameError = std.posix.GetSockNameError;

pub fn getSockName(
socket_handle: std.posix.socket_t,
) GetSockNameError!std.net.Address {
var addr: std.net.Address = .{ .any = undefined };
var addr_len: std.posix.socklen_t = @sizeOf(@TypeOf(addr.any));
try std.posix.getsockname(socket_handle, &addr.any, &addr_len);
return addr;
}

pub const AcceptHandledError = HandleAcceptError || error{
ConnectionAborted,
ProtocolFailure,
WouldBlock,
};

pub fn acceptHandled(
tcp_server: std.net.Server,
/// NOTE: this is *only* a hint, and may not apply on all platforms.
/// See `have_accept4`.
sync_hint: enum { blocking, nonblocking },
) AcceptHandledError!std.net.Server.Connection {
var accept_flags: u32 = std.posix.SOCK.CLOEXEC;
accept_flags |= switch (sync_hint) {
.blocking => 0,
.nonblocking => std.posix.SOCK.NONBLOCK,
};

while (true) {
var addr: std.net.Address = .{ .any = undefined };
var addr_len: std.posix.socklen_t = @sizeOf(@TypeOf(addr.any));
const rc = if (HAVE_ACCEPT4)
std.posix.system.accept4(
tcp_server.stream.handle,
&addr.any,
&addr_len,
accept_flags,
)
else
std.posix.system.accept(
tcp_server.stream.handle,
&addr.any,
&addr_len,
);

return switch (try handleAcceptResult(std.posix.errno(rc))) {
.intr => continue,
.conn_aborted => error.ConnectionAborted,
.proto_fail => error.ProtocolFailure,
.again => error.WouldBlock,
.success => .{
.stream = .{ .handle = rc },
.address = addr,
},
};
}
}

pub const SetSocketSyncError = std.posix.FcntlError;

/// Ensure the socket is set to be blocking or nonblocking.
pub fn setSocketSync(
socket: std.posix.socket_t,
sync: enum { blocking, nonblocking },
) SetSocketSyncError!void {
const FlagsInt = @typeInfo(std.posix.O).Struct.backing_integer.?;
var flags_int: FlagsInt = @intCast(try std.posix.fcntl(socket, std.posix.F.GETFL, 0));
const flags = std.mem.bytesAsValue(std.posix.O, std.mem.asBytes(&flags_int));

const nonblock_wanted = switch (sync) {
.blocking => false,
.nonblocking => true,
};
if (flags.NONBLOCK != nonblock_wanted) {
flags.NONBLOCK = nonblock_wanted;
_ = try std.posix.fcntl(socket, std.posix.F.SETFL, flags_int);
}
}

pub const HandleAcceptError = error{
ProcessFdQuotaExceeded,
SystemFdQuotaExceeded,
Expand Down
16 changes: 16 additions & 0 deletions src/rpc/server/lib.zig
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
//! RPC Server API.
//!
//! The server can be run by calling `serveSpawn`, or `serve`; in
//! order to do this, the caller must first initialize a `Context`
//! to provide the basic state and dependencies required to operate
//! the server, and must also provide a `WorkPool`, initialized to
//! a given backend.

const server = @import("server.zig");

comptime {
_ = server;
}

pub const MIN_READ_BUFFER_SIZE = server.MIN_READ_BUFFER_SIZE;

pub const serveSpawn = server.serveSpawn;
pub const serve = server.serve;

pub const Context = server.Context;
pub const WorkPool = server.WorkPool;

// backends
pub const basic = server.basic;
pub const LinuxIoUring = server.LinuxIoUring;
13 changes: 11 additions & 2 deletions src/rpc/server/linux_io_uring.zig
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const builtin = @import("builtin");
const std = @import("std");
const sig = @import("../../sig.zig");

const server = @import("server.zig");

const requests = server.requests;
const connection = server.connection;

Expand Down Expand Up @@ -149,7 +149,7 @@ fn consumeOurCqe(
const addr_err_logger = server_ctx.logger.err().field(
"address",
// if we fail to getSockName, just print the error in place of the address;
connection.getSockName(entry_data.stream.handle),
getSocketName(entry_data.stream.handle),
);
errdefer addr_err_logger.log("Dropping connection");

Expand Down Expand Up @@ -783,6 +783,15 @@ const EntryState = union(enum) {
};
};

fn getSocketName(
socket_handle: std.posix.socket_t,
) std.posix.GetSockNameError!std.net.Address {
var addr: std.net.Address = .{ .any = undefined };
var addr_len: std.posix.socklen_t = @sizeOf(@TypeOf(addr.any));
try std.posix.getsockname(socket_handle, &addr.any, &addr_len);
return addr;
}

const GetSqeRetryError = IouEnterError;

/// Try to `get_sqe`; if the submission queue is too full for that, call `submit()`,
Expand Down
Loading

0 comments on commit 3e6c4fb

Please sign in to comment.