Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(gossip) Add methods, data structures, and edits to improve memory safety in gossip #189

Merged
merged 20 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
9dc11fe
Remove redundant parameter from processMessages
yewman Jul 3, 2024
68a8e4c
Implement ThreadSafeContactInfo to avoid moving ContactInfo data stru…
yewman Jul 3, 2024
0436566
Modify getContactInfo to use ThreadSafeContactInfo, arguably this is …
yewman Jul 3, 2024
3070872
Fix unsafe access of gossip values in GossipService.handleBatchPullRe…
yewman Jul 3, 2024
1ae6985
Add comments on memory unsafety in GossipService.buildPushMessages fo…
yewman Jul 3, 2024
a1adf4f
Remove redundant, potentially dangerous argument for PullRequestTask,…
yewman Jul 5, 2024
56296f5
Add saturating subs to timestamp operations
yewman Jul 8, 2024
37d60a8
Address minor PR comments
yewman Jul 8, 2024
713538b
Change to use timer instead of wall clock, not that nice as we need t…
yewman Jul 8, 2024
4e55d79
Implement clone for gossip data to allow for safe construction of mes…
yewman Jul 9, 2024
65b5c8f
Implement fix for memory leaks in gossip: needs testing
yewman Jul 10, 2024
945ab44
Add gossip diagrams to readme.md
yewman Jul 10, 2024
75c2fe6
Implement Timer wrapper for save timing units
yewman Jul 11, 2024
b311478
a few small fixes
0xNineteen Jul 11, 2024
88c4fa2
Implement bincode init method and update gossip serialise test
yewman Jul 15, 2024
d169cef
Address PR comments
yewman Jul 15, 2024
cbde33e
Add comments on post deserialise method
yewman Jul 15, 2024
eeed38e
Rename post deserialise callback
yewman Jul 15, 2024
ea4b3a8
add additional comment
0xNineteen Jul 15, 2024
060dc49
fix lint
0xNineteen Jul 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 22 additions & 28 deletions src/accountsdb/download.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@

const std = @import("std");
const curl = @import("curl");
const Pubkey = @import("../core/pubkey.zig").Pubkey;
const gossip = @import("../gossip/service.zig");
const GossipService = gossip.GossipService;
const ContactInfo = @import("../gossip/data.zig").ContactInfo;
const GossipTable = @import("../gossip/table.zig").GossipTable;
const SlotAndHash = @import("./snapshots.zig").SlotAndHash;
const Logger = @import("../trace/log.zig").Logger;
const Hash = @import("../core/hash.zig").Hash;
const sig = @import("../lib.zig");

const SlotAndHash = sig.accounts_db.snapshots.SlotAndHash;
const Pubkey = sig.core.Pubkey;
const Hash = sig.core.Hash;
const GossipTable = sig.gossip.GossipTable;
const ThreadSafeContactInfo = sig.gossip.data.ThreadSafeContactInfo;
const GossipService = sig.gossip.GossipService;
const Logger = sig.trace.Logger;

const DOWNLOAD_PROGRESS_UPDATES_NS = 30 * std.time.ns_per_s;

/// Analogous to [PeerSnapshotHash](https://github.com/anza-xyz/agave/blob/f868aa38097094e4fb78a885b6fb27ce0e43f5c7/validator/src/bootstrap.rs#L342)
const PeerSnapshotHash = struct {
contact_info: ContactInfo,
contact_info: ThreadSafeContactInfo,
full_snapshot: SlotAndHash,
inc_snapshot: ?SlotAndHash,
};
Expand All @@ -41,7 +42,7 @@ const PeerSearchResult = struct {
pub fn findPeersToDownloadFromAssumeCapacity(
allocator: std.mem.Allocator,
table: *const GossipTable,
contact_infos: []const ContactInfo,
contact_infos: []const ThreadSafeContactInfo,
yewman marked this conversation as resolved.
Show resolved Hide resolved
my_shred_version: usize,
my_pubkey: Pubkey,
blacklist: []const Pubkey,
Expand Down Expand Up @@ -98,10 +99,10 @@ pub fn findPeersToDownloadFromAssumeCapacity(
result.invalid_shred_version += 1;
continue;
}
_ = peer_contact_info.getSocket(.rpc) orelse {
if (peer_contact_info.rpc_addr == null) {
result.no_rpc_count += 1;
continue;
};
}
const gossip_data = table.get(.{ .SnapshotHashes = peer_contact_info.pubkey }) orelse {
result.no_snapshot_hashes_count += 1;
continue;
Expand Down Expand Up @@ -143,7 +144,6 @@ pub fn findPeersToDownloadFromAssumeCapacity(
}

valid_peers.appendAssumeCapacity(.{
// NOTE: maybe we need to deep clone here due to arraylist sockets?
.contact_info = peer_contact_info.*,
.full_snapshot = snapshot_hashes.full,
.inc_snapshot = max_inc_hash,
Expand All @@ -168,7 +168,7 @@ pub fn downloadSnapshotsFromGossip(
logger.infof("starting snapshot download with min download speed: {d} MB/s", .{min_mb_per_sec});

// TODO: maybe make this bigger? or dynamic?
var contact_info_buf: [1_000]ContactInfo = undefined;
var contact_info_buf: [1_000]ThreadSafeContactInfo = undefined;

const my_contact_info = gossip_service.my_contact_info;

Expand All @@ -187,7 +187,7 @@ pub fn downloadSnapshotsFromGossip(
defer lg.unlock();
const table: *const GossipTable = lg.get();

const contacts = table.getContactInfos(&contact_info_buf, 0);
const contacts = table.getThreadSafeContactInfos(&contact_info_buf, 0);

try available_snapshot_peers.ensureTotalCapacity(contacts.len);
const result = try findPeersToDownloadFromAssumeCapacity(
Expand All @@ -214,7 +214,7 @@ pub fn downloadSnapshotsFromGossip(
});
defer allocator.free(snapshot_filename);

const rpc_socket = peer.contact_info.getSocket(.rpc).?;
const rpc_socket = peer.contact_info.rpc_addr.?;
const rpc_url_bounded = rpc_socket.toStringBounded();
const rpc_url = rpc_url_bounded.constSlice();

Expand Down Expand Up @@ -480,16 +480,13 @@ test "accounts_db.download: test remove untrusted peers" {
const my_shred_version: usize = 19;
const my_pubkey = Pubkey.random(rng);

const contact_infos: []ContactInfo = try allocator.alloc(ContactInfo, 10);
defer {
for (contact_infos) |ci| ci.deinit();
allocator.free(contact_infos);
}
const contact_infos: []ThreadSafeContactInfo = try allocator.alloc(ThreadSafeContactInfo, 10);
defer allocator.free(contact_infos);

for (contact_infos) |*ci| {
var lci = LegacyContactInfo.default(Pubkey.random(rng));
lci.rpc.setPort(19); // no longer unspecified = valid
ci.* = try lci.toContactInfo(allocator);
ci.* = ThreadSafeContactInfo.fromLegacyContactInfo(lci);
ci.shred_version = 19; // matching shred version
}

Expand Down Expand Up @@ -559,16 +556,13 @@ test "accounts_db.download: test finding peers" {
const my_shred_version: usize = 19;
const my_pubkey = Pubkey.random(rng);

const contact_infos: []ContactInfo = try allocator.alloc(ContactInfo, 10);
defer {
for (contact_infos) |ci| ci.deinit();
allocator.free(contact_infos);
}
const contact_infos: []ThreadSafeContactInfo = try allocator.alloc(ThreadSafeContactInfo, 10);
defer allocator.free(contact_infos);

for (contact_infos) |*ci| {
var lci = LegacyContactInfo.default(Pubkey.random(rng));
lci.rpc.setPort(19); // no longer unspecified = valid
ci.* = try lci.toContactInfo(allocator);
ci.* = ThreadSafeContactInfo.fromLegacyContactInfo(lci);
ci.shred_version = 19; // matching shred version
}

Expand Down
11 changes: 11 additions & 0 deletions src/bincode/bincode.zig
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,16 @@ pub fn read(allocator: std.mem.Allocator, comptime U: type, reader: anytype, par
@field(data, field.name) = try bincode.read(allocator, field.type, reader, params);
}
}

// TODO: improve implementation of post deserialise method
const post_deserialize = "!bincode-config:post-deserialize";
if (@hasDecl(T, post_deserialize)) {
const field_config = @field(T, post_deserialize);
if (field_config.post_deserialize_fn) |post_deserialize_fn| {
post_deserialize_fn(&data);
}
}

return data;
},
.Optional => |info| {
Expand Down Expand Up @@ -594,6 +604,7 @@ pub fn FieldConfig(comptime T: type) type {
skip: bool = false,
default_on_eof: bool = false,
default_fn: ?fn (alloc: std.mem.Allocator) T = null,
post_deserialize_fn: ?fn (self: *T) void = null,
};
}

Expand Down
10 changes: 10 additions & 0 deletions src/bloom/bit_set.zig
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ pub fn DynamicArrayBitSet(comptime MaskIntType: type) type {
return self.bit_length;
}

/// Returns a copy of this bit set whose `masks` storage is freshly
/// allocated from `allocator`, so the copy does not alias the original's
/// mask memory. All scalar bookkeeping fields are copied as-is.
///
/// Fails with `error.OutOfMemory` if the mask storage cannot be duplicated.
pub fn clone(self: Self, allocator: Allocator) error{OutOfMemory}!Self {
    // Duplicating the mask storage is the only fallible step, so do it
    // up front and then assemble the result.
    const masks_copy = try allocator.dupe(MaskInt, self.masks);
    return Self{
        .num_masks = self.num_masks,
        .last_pad_bits = self.last_pad_bits,
        .last_item_mask = self.last_item_mask,
        .bit_length = self.bit_length,
        .masks = masks_copy,
    };
}

/// Returns true if the bit at the specified index
/// is present in the set, false otherwise.
pub fn isSet(self: Self, index: usize) bool {
Expand Down
42 changes: 42 additions & 0 deletions src/core/transaction.zig
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ pub const Transaction = struct {
};
}

/// Returns a deep copy of this transaction. Both the `signatures` slice and
/// the `message` are duplicated using `allocator`; release the result with
/// `deinit` using the same allocator.
///
/// Fails with `error.OutOfMemory`. On failure nothing allocated by this
/// call is left behind.
pub fn clone(self: *const Transaction, allocator: std.mem.Allocator) error{OutOfMemory}!Transaction {
    const signatures = try allocator.dupe(Signature, self.signatures);
    // Without this, a failure while cloning the message would leak the
    // signatures copy made above.
    errdefer allocator.free(signatures);

    const message = try self.message.clone(allocator);

    return .{
        .signatures = signatures,
        .message = message,
    };
}

/// Releases the memory held by this transaction: the message's own
/// allocations and the `signatures` slice. `allocator` is used for every
/// free performed here.
pub fn deinit(self: *Transaction, allocator: std.mem.Allocator) void {
    // The two releases are independent of each other.
    self.message.deinit(allocator);
    allocator.free(self.signatures);
}

pub fn sanitize(self: *const Transaction) !void {
const num_required_sigs = self.message.header.num_required_signatures;
const num_signatures = self.signatures.len;
Expand Down Expand Up @@ -57,6 +69,23 @@ pub const Message = struct {
};
}

/// Returns a deep copy of this message. `account_keys` and `instructions`
/// (including each instruction's own buffers) are duplicated using
/// `allocator`; `header` and `recent_blockhash` are copied by value.
/// Release the result with `deinit` using the same allocator.
///
/// Fails with `error.OutOfMemory`. On failure every allocation made by
/// this call is released before returning.
pub fn clone(self: *const Message, allocator: std.mem.Allocator) error{OutOfMemory}!Message {
    const account_keys = try allocator.dupe(Pubkey, self.account_keys);
    errdefer allocator.free(account_keys);

    const instructions = try allocator.alloc(CompiledInstruction, self.instructions.len);
    errdefer allocator.free(instructions);

    // Track how many entries have been fully cloned so that a failure
    // part-way through only deinits the initialised prefix.
    var cloned: usize = 0;
    errdefer {
        for (instructions[0..cloned]) |*ci| ci.deinit(allocator);
    }
    for (instructions, 0..) |*ci, i| {
        ci.* = try self.instructions[i].clone(allocator);
        cloned += 1;
    }

    return .{
        .header = self.header,
        .account_keys = account_keys,
        .recent_blockhash = self.recent_blockhash,
        .instructions = instructions,
    };
}

/// Releases the memory held by this message: each instruction's buffers,
/// the `instructions` slice itself, and the `account_keys` slice.
/// `allocator` is used for every free performed here.
pub fn deinit(self: *Message, allocator: std.mem.Allocator) void {
    // Each instruction must be torn down before the slice that stores it.
    for (self.instructions) |*instruction| {
        instruction.deinit(allocator);
    }
    allocator.free(self.instructions);
    allocator.free(self.account_keys);
}

pub const MessageSanitizeError = error{
NotEnoughAccounts,
MissingWritableFeePayer,
Expand Down Expand Up @@ -118,6 +147,19 @@ pub const CompiledInstruction = struct {

pub const @"!bincode-config:accounts" = ShortVecConfig(u8);
pub const @"!bincode-config:data" = ShortVecConfig(u8);

/// Returns a deep copy of this instruction. The `accounts` and `data`
/// byte slices are duplicated using `allocator`; `program_id_index` is
/// copied by value. Release the result with `deinit` using the same
/// allocator.
///
/// Fails with `error.OutOfMemory`. On failure nothing allocated by this
/// call is left behind.
pub fn clone(self: *const CompiledInstruction, allocator: std.mem.Allocator) error{OutOfMemory}!CompiledInstruction {
    const accounts = try allocator.dupe(u8, self.accounts);
    // Without this, a failure duplicating `data` would leak `accounts`.
    errdefer allocator.free(accounts);

    const data = try allocator.dupe(u8, self.data);

    return .{
        .program_id_index = self.program_id_index,
        .accounts = accounts,
        .data = data,
    };
}

/// Frees the `accounts` and `data` slices of this instruction using
/// `allocator`.
pub fn deinit(self: *CompiledInstruction, allocator: std.mem.Allocator) void {
    // The two buffers are independent; release order does not matter.
    allocator.free(self.data);
    allocator.free(self.accounts);
}
};

test "core.transaction: tmp" {
Expand Down
27 changes: 12 additions & 15 deletions src/gossip/active_set.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ const EndPoint = network.EndPoint;

const Pubkey = sig.core.Pubkey;
const Bloom = sig.bloom.Bloom;
const ContactInfo = sig.gossip.data.ContactInfo;
const SignedGossipData = sig.gossip.data.SignedGossipData;
const LegacyContactInfo = sig.gossip.data.LegacyContactInfo;
const ThreadSafeContactInfo = sig.gossip.data.ThreadSafeContactInfo;
const GossipTable = sig.gossip.table.GossipTable;

const getWallclockMs = sig.gossip.getWallclockMs;
Expand Down Expand Up @@ -52,7 +52,7 @@ pub const ActiveSet = struct {
pub fn rotate(
self: *Self,
rand: std.Random,
peers: []ContactInfo,
peers: []ThreadSafeContactInfo,
) error{OutOfMemory}!void {
// clear the existing
var iter = self.peers.iterator();
Expand All @@ -65,7 +65,7 @@ pub const ActiveSet = struct {
return;
}
const size = @min(peers.len, NUM_ACTIVE_SET_ENTRIES);
shuffleFirstN(rand, ContactInfo, peers, size);
shuffleFirstN(rand, ThreadSafeContactInfo, peers, size);

const bloom_num_items = @max(peers.len, MIN_NUM_BLOOM_ITEMS);
for (0..size) |i| {
Expand Down Expand Up @@ -107,8 +107,8 @@ pub const ActiveSet = struct {
var iter = self.peers.iterator();
while (iter.next()) |entry| {
// lookup peer contact info
const peer_info = table.getContactInfo(entry.key_ptr.*) orelse continue;
const peer_gossip_addr = peer_info.getSocket(.gossip) orelse continue;
const peer_info = table.getThreadSafeContactInfo(entry.key_ptr.*) orelse continue;
const peer_gossip_addr = peer_info.gossip_addr orelse continue;

peer_gossip_addr.sanitize() catch continue;

Expand Down Expand Up @@ -138,15 +138,12 @@ test "gossip.active_set: init/deinit" {

// insert some contacts
var rng = std.rand.DefaultPrng.init(100);
var gossip_peers = try std.ArrayList(ContactInfo).initCapacity(alloc, 10);
defer {
for (gossip_peers.items) |p| p.deinit();
gossip_peers.deinit();
}
var gossip_peers = try std.ArrayList(ThreadSafeContactInfo).initCapacity(alloc, 10);
defer gossip_peers.deinit();

for (0..GOSSIP_PUSH_FANOUT) |_| {
var data = LegacyContactInfo.random(rng.random());
try gossip_peers.append(try data.toContactInfo(alloc));
const data = LegacyContactInfo.random(rng.random());
try gossip_peers.append(ThreadSafeContactInfo.fromLegacyContactInfo(data));

var keypair = try KeyPair.create(null);
const value = try SignedGossipData.initSigned(.{
Expand Down Expand Up @@ -182,16 +179,16 @@ test "gossip.active_set: gracefully rotates with duplicate contact ids" {
const alloc = std.testing.allocator;

var rng = std.rand.DefaultPrng.init(100);
var gossip_peers = try std.ArrayList(ContactInfo).initCapacity(alloc, 10);
var gossip_peers = try std.ArrayList(ThreadSafeContactInfo).initCapacity(alloc, 10);
defer gossip_peers.deinit();

var data = try LegacyContactInfo.random(rng.random()).toContactInfo(alloc);
var dupe = try LegacyContactInfo.random(rng.random()).toContactInfo(alloc);
defer data.deinit();
defer dupe.deinit();
dupe.pubkey = data.pubkey;
try gossip_peers.append(data);
try gossip_peers.append(dupe);
try gossip_peers.append(ThreadSafeContactInfo.fromContactInfo(data));
try gossip_peers.append(ThreadSafeContactInfo.fromContactInfo(dupe));

var active_set = ActiveSet.init(alloc);
defer active_set.deinit();
Expand Down
Loading
Loading