diff --git a/src/bincode/bincode.zig b/src/bincode/bincode.zig index fae87b9e3..f3d9fc25a 100644 --- a/src/bincode/bincode.zig +++ b/src/bincode/bincode.zig @@ -149,6 +149,15 @@ pub fn read(allocator: std.mem.Allocator, comptime U: type, reader: anytype, par @field(data, field.name) = try bincode.read(allocator, field.type, reader, params); } } + + const init_field = "!bincode-config:init"; + if (@hasDecl(T, init_field)) { + const field_config = @field(T, init_field); + if (field_config.init_fn) |init| { + init(&data); + } + } + return data; }, .Optional => |info| { @@ -594,6 +603,7 @@ pub fn FieldConfig(comptime T: type) type { skip: bool = false, default_on_eof: bool = false, default_fn: ?fn (alloc: std.mem.Allocator) T = null, + init_fn: ?fn (self: *T) void = null, }; } diff --git a/src/gossip/data.zig b/src/gossip/data.zig index 559dd7363..a5d471355 100644 --- a/src/gossip/data.zig +++ b/src/gossip/data.zig @@ -1218,6 +1218,7 @@ pub const ContactInfo = struct { extensions: ArrayList(Extension), cache: [SOCKET_CACHE_SIZE]SocketAddr = socket_addrs_unspecified(), + pub const @"!bincode-config:init" = bincode.FieldConfig(ContactInfo){ .init_fn = ContactInfo.buildCache }; pub const @"!bincode-config:cache" = bincode.FieldConfig([SOCKET_CACHE_SIZE]SocketAddr){ .skip = true }; pub const @"!bincode-config:addrs" = ShortVecArrayListConfig(IpAddr); pub const @"!bincode-config:sockets" = ShortVecArrayListConfig(SocketEntry); @@ -1226,6 +1227,17 @@ pub const ContactInfo = struct { const Self = @This(); + pub fn buildCache(self: *Self) void { + var port: u16 = 0; + for (self.sockets.items) |socket_entry| { + port += socket_entry.offset; + const addr = self.addrs.items[socket_entry.index]; + const socket = SocketAddr.initIpv4(addr.asV4(), port); + socket.sanitize() catch continue; + self.cache[@intFromEnum(socket_entry.key)] = socket; + } + } + pub fn toNodeInstance(self: *Self, rand: std.Random) NodeInstance { return NodeInstance.init(rand, self.pubkey, @intCast(std.time.milliTimestamp())); } @@ -1605,33 +1617,122 @@ test "gossip.data: set & get socket on contact info" { } test "gossip.data: contact info bincode serialize matches rust bincode" { - var rust_contact_info_serialized_bytes = [_]u8{ - 57, 54, 18, 6, 106, 202, 13, 245, 224, 235, 33, 252, 254, 251, 161, 17, 248, 108, 25, 214, 169, - 154, 91, 101, 17, 121, 235, 82, 175, 197, 144, 145, 100, 200, 0, 0, 0, 0, 0, 0, 0, 44, - 1, 1, 2, 3, 4, 0, 0, 0, 5, 0, 0, 0, 6, 4, 0, 0, 0, 0, 127, 0, 0, - 1, 0, 0, 0, 0, 127, 0, 0, 1, 0, 0, 0, 0, 127, 0, 0, 1, 0, 0, 0, 0, - 127, 0, 0, 1, 6, 10, 20, 30, 10, 20, 30, 10, 20, 30, 10, 20, 30, 10, 20, 30, 10, - 20, 30, 0, + // ContactInfo generated using rust ConfigInfo::new_rand(..., ...); and printed in debug format + // ContactInfo serialized using rust bincode + // + // ContactInfo { + // pubkey: 4NftWecdfGcYZMJahnAAX5Cw1PLGLZhYFB19wL6AkXqW, + // wallclock: 1721060646885, + // outset: 1721060141617172, + // shred_version: 0, + // version: 2.1.0 (src:00000000; feat:12366211, client:Agave), + // addrs: [127.0.0.1], + // sockets: [ + // SocketEntry { key: 10, index: 0, offset: 8001 }, + // SocketEntry { key: 11, index: 0, offset: 1 }, + // SocketEntry { key: 5, index: 0, offset: 1 }, + // SocketEntry { key: 6, index: 0, offset: 1 }, + // SocketEntry { key: 9, index: 0, offset: 1 }, + // SocketEntry { key: 1, index: 0, offset: 1 }, + // SocketEntry { key: 4, index: 0, offset: 2 }, + // SocketEntry { key: 8, index: 0, offset: 1 }, + // SocketEntry { key: 7, index: 0, offset: 1 }, + // SocketEntry { key: 2, index: 0, offset: 889 }, + // SocketEntry { key: 3, index: 0, offset: 1 }, + // SocketEntry { key: 0, index: 0, offset: 11780 } + // ], + // extensions: [], + // cache: [ + // 127.0.0.1:20680, + // 127.0.0.1:8006, + // 127.0.0.1:8899, + // 127.0.0.1:8900, + // 127.0.0.1:8008, + // 127.0.0.1:8003, + // 127.0.0.1:8004, + // 127.0.0.1:8010, + // 127.0.0.1:8009, + // 127.0.0.1:8005, + // 127.0.0.1:8001, + // 127.0.0.1:8002 + // ] + // } + + const rust_contact_info_serialized_bytes = [_]u8{ + 50, 32, 58, 140, 212, 209, 174, 133, 183, 143, 242, 155, + 13, 127, 185, 10, 117, 50, 199, 209, 255, 166, 74, 36, + 67, 97, 239, 155, 203, 202, 153, 93, 229, 191, 213, 185, + 139, 50, 20, 208, 96, 138, 75, 29, 6, 0, 0, 0, + 2, 1, 0, 0, 0, 0, 0, 131, 177, 188, 0, 3, + 1, 0, 0, 0, 0, 127, 0, 0, 1, 12, 10, 0, + 193, 62, 11, 0, 1, 5, 0, 1, 6, 0, 1, 9, + 0, 1, 1, 0, 1, 4, 0, 2, 8, 0, 1, 7, + 0, 1, 2, 0, 249, 6, 3, 0, 1, 0, 0, 132, + 92, 0, }; - const pubkey = Pubkey.fromString("4rL4RCWHz3iNCdCaveD8KcHfV9YWGsqSHFPo7X2zBNwa") catch unreachable; - var ci = ContactInfo.initDummyForTest(testing.allocator, pubkey, 100, 200, 300); - defer ci.deinit(); + const rust_contact_info_cache = [_]SocketAddr{ + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 20680), + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8006), + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8899), + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8900), + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8008), + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8003), + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8004), + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8010), + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8009), + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8005), + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8001), + SocketAddr.initIpv4(.{ 127, 0, 0, 1 }, 8002), + }; + // Build identical Sig contact info + var sig_contact_info = ContactInfo{ + .pubkey = Pubkey.fromString("4NftWecdfGcYZMJahnAAX5Cw1PLGLZhYFB19wL6AkXqW") catch unreachable, + .wallclock = 1721060646885, + .outset = 1721060141617172, + .shred_version = 0, + .version = ClientVersion.new(2, 1, 0, 0, 12366211, 3), + .addrs = ArrayList(IpAddr).init(testing.allocator), + .sockets = ArrayList(SocketEntry).init(testing.allocator), + .extensions = ArrayList(Extension).init(testing.allocator), + }; + defer sig_contact_info.deinit(); + sig_contact_info.addrs.append(IpAddr.newIpv4(127, 0, 0, 1)) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .turbine_recv, .index = 0, .offset = 8001 }) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .turbine_recv_quic, .index = 0, .offset = 1 }) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .tpu, .index = 0, .offset = 1 }) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .tpu_forwards, .index = 0, .offset = 1 }) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .tpu_vote, .index = 0, .offset = 1 }) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .repair, .index = 0, .offset = 1 }) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .serve_repair, .index = 0, .offset = 2 }) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .tpu_quic, .index = 0, .offset = 1 }) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .tpu_forwards_quic, .index = 0, .offset = 1 }) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .rpc, .index = 0, .offset = 889 }) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .rpc_pubsub, .index = 0, .offset = 1 }) catch unreachable; + sig_contact_info.sockets.append(.{ .key = .gossip, .index = 0, .offset = 11780 }) catch unreachable; + sig_contact_info.buildCache(); + + // Check that the cache is built correctly + for (0.., sig_contact_info.cache) |i, socket| { + try testing.expect(socket.eql(&rust_contact_info_cache[i])); + break; + } + + // Check that the serialized bytes match the rust serialized bytes var buf = std.ArrayList(u8).init(testing.allocator); - bincode.write(buf.writer(), ci, bincode.Params.standard) catch unreachable; + bincode.write(buf.writer(), sig_contact_info, bincode.Params.standard) catch unreachable; defer buf.deinit(); - try testing.expect(std.mem.eql(u8, &rust_contact_info_serialized_bytes, buf.items)); + // Check that the deserialized contact info matches the original var stream = std.io.fixedBufferStream(buf.items); - var ci2 = try bincode.read(testing.allocator, ContactInfo, stream.reader(), bincode.Params.standard); - defer ci2.deinit(); - - try testing.expect(ci2.addrs.items.len == 4); - try testing.expect(ci2.sockets.items.len == 6); - try testing.expect(ci2.pubkey.equals(&ci.pubkey)); - try testing.expect(ci2.outset == ci.outset); + var sig_contact_info_deserialised = try bincode.read(testing.allocator, ContactInfo, stream.reader(), bincode.Params.standard); + defer sig_contact_info_deserialised.deinit(); + try testing.expect(sig_contact_info_deserialised.addrs.items.len == 1); + try testing.expect(sig_contact_info_deserialised.sockets.items.len == 12); + try testing.expect(sig_contact_info_deserialised.pubkey.equals(&sig_contact_info.pubkey)); + try testing.expect(sig_contact_info_deserialised.outset == sig_contact_info.outset); } test "gossip.data: ContactInfo bincode roundtrip maintains data integrity" { diff --git a/src/gossip/service.zig b/src/gossip/service.zig index dc3b180bf..c05dc22d0 100644 --- a/src/gossip/service.zig +++ b/src/gossip/service.zig @@ -763,6 +763,8 @@ pub const GossipService = struct { var shred_version_assigned = false; while (!self.exit.load(.unordered)) { + defer loop_timer.reset(); + if (pull_req_timer.read().as_millis() > GOSSIP_PULL_RATE_MS) pull_blk: { defer pull_req_timer.reset(); // this also includes sending ping messages to other peers @@ -833,7 +835,6 @@ pub const GossipService = struct { // sleep if (loop_timer.read().as_millis() < GOSSIP_SLEEP_MILLIS) { - defer loop_timer.reset(); const time_left_ms = GOSSIP_SLEEP_MILLIS -| loop_timer.read().as_millis(); std.time.sleep(time_left_ms * std.time.ns_per_ms); } @@ -3104,11 +3105,11 @@ pub const BenchmarkGossipServicePullRequests = struct { bench_args.n_pull_requests, ); for (0..bench_args.n_pull_requests) |_| { - const packet = try fuzz.randomPullRequest( + const packet = try fuzz.randomPullRequestWithContactInfo( allocator, rng, - &recv_keypair, address.toEndpoint(), + signed_contact_info_recv, ); packet_batch.appendAssumeCapacity(packet); }