Skip to content

Commit

Permalink
Implement bincode init method and update gossip serialise test
Browse files Browse the repository at this point in the history
  • Loading branch information
yewman committed Jul 15, 2024
1 parent b311478 commit 88c4fa2
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 22 deletions.
10 changes: 10 additions & 0 deletions src/bincode/bincode.zig
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ pub fn read(allocator: std.mem.Allocator, comptime U: type, reader: anytype, par
@field(data, field.name) = try bincode.read(allocator, field.type, reader, params);
}
}

const init_field = "!bincode-config:init";
if (@hasDecl(T, init_field)) {
const field_config = @field(T, init_field);
if (field_config.init_fn) |init| {
init(&data);
}
}

return data;
},
.Optional => |info| {
Expand Down Expand Up @@ -594,6 +603,7 @@ pub fn FieldConfig(comptime T: type) type {
skip: bool = false,
default_on_eof: bool = false,
default_fn: ?fn (alloc: std.mem.Allocator) T = null,
init_fn: ?fn (self: *T) void = null,
};
}

Expand Down
139 changes: 120 additions & 19 deletions src/gossip/data.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,7 @@ pub const ContactInfo = struct {
extensions: ArrayList(Extension),
cache: [SOCKET_CACHE_SIZE]SocketAddr = socket_addrs_unspecified(),

pub const @"!bincode-config:init" = bincode.FieldConfig(ContactInfo){ .init_fn = ContactInfo.buildCache };
pub const @"!bincode-config:cache" = bincode.FieldConfig([SOCKET_CACHE_SIZE]SocketAddr){ .skip = true };
pub const @"!bincode-config:addrs" = ShortVecArrayListConfig(IpAddr);
pub const @"!bincode-config:sockets" = ShortVecArrayListConfig(SocketEntry);
Expand All @@ -1226,6 +1227,17 @@ pub const ContactInfo = struct {

const Self = @This();

pub fn buildCache(self: *Self) void {
var port: u16 = 0;
for (self.sockets.items) |socket_entry| {
port += socket_entry.offset;
const addr = self.addrs.items[socket_entry.index];
const socket = SocketAddr.initIpv4(addr.asV4(), port);
socket.sanitize() catch continue;
self.cache[@intFromEnum(socket_entry.key)] = socket;
}
}

pub fn toNodeInstance(self: *Self, rand: std.Random) NodeInstance {
return NodeInstance.init(rand, self.pubkey, @intCast(std.time.milliTimestamp()));
}
Expand Down Expand Up @@ -1605,33 +1617,122 @@ test "gossip.data: set & get socket on contact info" {
}

test "gossip.data: contact info bincode serialize matches rust bincode" {
var rust_contact_info_serialized_bytes = [_]u8{
57, 54, 18, 6, 106, 202, 13, 245, 224, 235, 33, 252, 254, 251, 161, 17, 248, 108, 25, 214, 169,
154, 91, 101, 17, 121, 235, 82, 175, 197, 144, 145, 100, 200, 0, 0, 0, 0, 0, 0, 0, 44,
1, 1, 2, 3, 4, 0, 0, 0, 5, 0, 0, 0, 6, 4, 0, 0, 0, 0, 127, 0, 0,
1, 0, 0, 0, 0, 127, 0, 0, 1, 0, 0, 0, 0, 127, 0, 0, 1, 0, 0, 0, 0,
127, 0, 0, 1, 6, 10, 20, 30, 10, 20, 30, 10, 20, 30, 10, 20, 30, 10, 20, 30, 10,
20, 30, 0,
// ContactInfo generated using rust ConfigInfo::new_rand(..., ...); and printed in debug format
// ContactInfo serialized using rust bincode
//
// ContactInfo {
// pubkey: 4NftWecdfGcYZMJahnAAX5Cw1PLGLZhYFB19wL6AkXqW,
// wallclock: 1721060646885,
// outset: 1721060141617172,
// shred_version: 0,
// version: 2.1.0 (src:00000000; feat:12366211, client:Agave),
// addrs: [127.0.0.1],
// sockets: [
// SocketEntry { key: 10, index: 0, offset: 8001 },
// SocketEntry { key: 11, index: 0, offset: 1 },
// SocketEntry { key: 5, index: 0, offset: 1 },
// SocketEntry { key: 6, index: 0, offset: 1 },
// SocketEntry { key: 9, index: 0, offset: 1 },
// SocketEntry { key: 1, index: 0, offset: 1 },
// SocketEntry { key: 4, index: 0, offset: 2 },
// SocketEntry { key: 8, index: 0, offset: 1 },
// SocketEntry { key: 7, index: 0, offset: 1 },
// SocketEntry { key: 2, index: 0, offset: 889 },
// SocketEntry { key: 3, index: 0, offset: 1 },
// SocketEntry { key: 0, index: 0, offset: 11780 }
// ],
// extensions: [],
// cache: [
// 127.0.0.1:20680,
// 127.0.0.1:8006,
// 127.0.0.1:8899,
// 127.0.0.1:8900,
// 127.0.0.1:8008,
// 127.0.0.1:8003,
// 127.0.0.1:8004,
// 127.0.0.1:8010,
// 127.0.0.1:8009,
// 127.0.0.1:8005,
// 127.0.0.1:8001,
// 127.0.0.1:8002
// ]
// }

const rust_contact_info_serialized_bytes = [_]u8{
50, 32, 58, 140, 212, 209, 174, 133, 183, 143, 242, 155,
13, 127, 185, 10, 117, 50, 199, 209, 255, 166, 74, 36,
67, 97, 239, 155, 203, 202, 153, 93, 229, 191, 213, 185,
139, 50, 20, 208, 96, 138, 75, 29, 6, 0, 0, 0,
2, 1, 0, 0, 0, 0, 0, 131, 177, 188, 0, 3,
1, 0, 0, 0, 0, 127, 0, 0, 1, 12, 10, 0,
193, 62, 11, 0, 1, 5, 0, 1, 6, 0, 1, 9,
0, 1, 1, 0, 1, 4, 0, 2, 8, 0, 1, 7,
0, 1, 2, 0, 249, 6, 3, 0, 1, 0, 0, 132,
92, 0,
};

const pubkey = Pubkey.fromString("4rL4RCWHz3iNCdCaveD8KcHfV9YWGsqSHFPo7X2zBNwa") catch unreachable;
var ci = ContactInfo.initDummyForTest(testing.allocator, pubkey, 100, 200, 300);
defer ci.deinit();
const rust_contact_info_cache = [_]SocketAddr{
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 20680),
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8006),
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899),
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8900),
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8008),
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8003),
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8004),
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8010),
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8009),
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8005),
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8001),
SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8002),
};

// Build identical Sig contact info
var sig_contact_info = ContactInfo{
.pubkey = Pubkey.fromString("4NftWecdfGcYZMJahnAAX5Cw1PLGLZhYFB19wL6AkXqW") catch unreachable,
.wallclock = 1721060646885,
.outset = 1721060141617172,
.shred_version = 0,
.version = ClientVersion.new(2, 1, 0, 0, 12366211, 3),
.addrs = ArrayList(IpAddr).init(testing.allocator),
.sockets = ArrayList(SocketEntry).init(testing.allocator),
.extensions = ArrayList(Extension).init(testing.allocator),
};
defer sig_contact_info.deinit();
sig_contact_info.addrs.append(IpAddr.newIpv4(127, 0, 0, 1)) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .turbine_recv, .index = 0, .offset = 8001 }) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .turbine_recv_quic, .index = 0, .offset = 1 }) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .tpu, .index = 0, .offset = 1 }) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .tpu_forwards, .index = 0, .offset = 1 }) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .tpu_vote, .index = 0, .offset = 1 }) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .repair, .index = 0, .offset = 1 }) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .serve_repair, .index = 0, .offset = 2 }) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .tpu_quic, .index = 0, .offset = 1 }) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .tpu_forwards_quic, .index = 0, .offset = 1 }) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .rpc, .index = 0, .offset = 889 }) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .rpc_pubsub, .index = 0, .offset = 1 }) catch unreachable;
sig_contact_info.sockets.append(.{ .key = .gossip, .index = 0, .offset = 11780 }) catch unreachable;
sig_contact_info.buildCache();

// Check that the cache is built correctly
for (0.., sig_contact_info.cache) |i, socket| {
try testing.expect(socket.eql(&rust_contact_info_cache[i]));
break;
}

// Check that the serialized bytes match the rust serialized bytes
var buf = std.ArrayList(u8).init(testing.allocator);
bincode.write(buf.writer(), ci, bincode.Params.standard) catch unreachable;
bincode.write(buf.writer(), sig_contact_info, bincode.Params.standard) catch unreachable;
defer buf.deinit();

try testing.expect(std.mem.eql(u8, &rust_contact_info_serialized_bytes, buf.items));

// Check that the deserialized contact info matches the original
var stream = std.io.fixedBufferStream(buf.items);
var ci2 = try bincode.read(testing.allocator, ContactInfo, stream.reader(), bincode.Params.standard);
defer ci2.deinit();

try testing.expect(ci2.addrs.items.len == 4);
try testing.expect(ci2.sockets.items.len == 6);
try testing.expect(ci2.pubkey.equals(&ci.pubkey));
try testing.expect(ci2.outset == ci.outset);
var sig_contact_info_deserialised = try bincode.read(testing.allocator, ContactInfo, stream.reader(), bincode.Params.standard);
defer sig_contact_info_deserialised.deinit();
try testing.expect(sig_contact_info_deserialised.addrs.items.len == 1);
try testing.expect(sig_contact_info_deserialised.sockets.items.len == 12);
try testing.expect(sig_contact_info_deserialised.pubkey.equals(&sig_contact_info.pubkey));
try testing.expect(sig_contact_info_deserialised.outset == sig_contact_info.outset);
}

test "gossip.data: ContactInfo bincode roundtrip maintains data integrity" {
Expand Down
7 changes: 4 additions & 3 deletions src/gossip/service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,8 @@ pub const GossipService = struct {
var shred_version_assigned = false;

while (!self.exit.load(.unordered)) {
defer loop_timer.reset();

if (pull_req_timer.read().as_millis() > GOSSIP_PULL_RATE_MS) pull_blk: {
defer pull_req_timer.reset();
// this also includes sending ping messages to other peers
Expand Down Expand Up @@ -833,7 +835,6 @@ pub const GossipService = struct {

// sleep
if (loop_timer.read().as_millis() < GOSSIP_SLEEP_MILLIS) {
defer loop_timer.reset();
const time_left_ms = GOSSIP_SLEEP_MILLIS -| loop_timer.read().as_millis();
std.time.sleep(time_left_ms * std.time.ns_per_ms);
}
Expand Down Expand Up @@ -3104,11 +3105,11 @@ pub const BenchmarkGossipServicePullRequests = struct {
bench_args.n_pull_requests,
);
for (0..bench_args.n_pull_requests) |_| {
const packet = try fuzz.randomPullRequest(
const packet = try fuzz.randomPullRequestWithContactInfo(
allocator,
rng,
&recv_keypair,
address.toEndpoint(),
signed_contact_info_recv,
);
packet_batch.appendAssumeCapacity(packet);
}
Expand Down

0 comments on commit 88c4fa2

Please sign in to comment.