From 92f5632fb7d42bcaa409c6704f01f8fffdddb14e Mon Sep 17 00:00:00 2001 From: suurkivi <136639517+suurkivi@users.noreply.github.com> Date: Wed, 6 Nov 2024 10:53:47 -0800 Subject: [PATCH] grpc server scaffolding --- Cargo.toml | 6 +- build.rs | 10 +- src/main.rs | 60 ++++++++-- src/network/gossip.rs | 3 +- src/network/mod.rs | 1 + src/network/server.rs | 34 ++++++ src/proto/message.proto | 209 +++++++++++++++++++++++++++++++++ src/proto/rpc.proto | 8 ++ src/proto/username_proof.proto | 17 +++ 9 files changed, 334 insertions(+), 14 deletions(-) create mode 100644 src/network/server.rs create mode 100644 src/proto/message.proto create mode 100644 src/proto/rpc.proto create mode 100644 src/proto/username_proof.proto diff --git a/Cargo.toml b/Cargo.toml index 84357d91..f94f76e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,14 +12,14 @@ tokio = { version = "1.40.0", features = ["full"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.10.6" -tonic = "0.9.2" -prost = "0.11.9" +tonic = "0.12.3" +prost = "0.13.3" futures = "0.3.28" parking_lot = "0.12.1" clap = { version = "4.3.0", features = ["derive"] } libp2p = { version = "0.54.1", features = ["tokio", "gossipsub", "mdns", "noise", "macros", "tcp", "yamux", "quic"] } async-trait = "0.1.68" -tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt", "json"] } +tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt", "json"] } hex = "0.4.3" ractor = "0.11.2" malachite-consensus = { path = "../malachite/code/crates/consensus" } diff --git a/build.rs b/build.rs index d394d6dd..0f4a0aa3 100644 --- a/build.rs +++ b/build.rs @@ -1,10 +1,16 @@ fn main() -> Result<(), Box> { - // tonic_build::compile_protos("src/proto/blocks.proto")?; let mut builder = tonic_build::configure(); // Custom type attributes required for malachite builder = builder.type_attribute("snapchain.ShardHash", "#[derive(Eq, PartialOrd, Ord)]"); - builder.compile(&["src/proto/blocks.proto"], &["src/proto"])?; + // TODO: auto-discover proto files + builder.compile(&[ + "src/proto/blocks.proto", + "src/proto/rpc.proto", + "src/proto/message.proto", + "src/proto/username_proof.proto", + ], &["src/proto"])?; + Ok(()) } diff --git a/src/main.rs b/src/main.rs index 8856b3f0..1a9138f6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,11 @@ pub mod network; pub mod connectors; mod cfg; +use std::error::Error; +use std::io; +use std::net::SocketAddr; +use clap::Parser; +use futures::stream::StreamExt; use libp2p::identity::ed25519::Keypair; use malachite_config::TimeoutConfig; use malachite_metrics::{Metrics, SharedRegistry}; @@ -11,14 +16,19 @@ use std::time::Duration; use tokio::signal::ctrl_c; use tokio::sync::mpsc; use tokio::{select, time}; +use tokio::time::sleep; +use tonic::transport::Server; +use tracing::{error, info}; use tracing_subscriber::EnvFilter; use connectors::fname::Fetcher; use crate::consensus::consensus::{Consensus, ConsensusMsg, ConsensusParams}; use crate::core::types::{proto, Address, Height, ShardId, SnapchainShard, SnapchainValidator, SnapchainValidatorContext, SnapchainValidatorSet}; -use crate::network::gossip::{GossipEvent}; +use crate::network::gossip::GossipEvent; use network::gossip::SnapchainGossip; +use network::server::MySnapchainService; +use network::server::rpc::snapchain_service_server::SnapchainServiceServer; pub enum SystemMessage { Consensus(ConsensusMsg), @@ -26,7 +36,7 @@ pub enum SystemMessage { #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> Result<(), Box> { let args: Vec = std::env::args().collect(); let app_config = cfg::load_and_merge_config(args)?; @@ -39,7 +49,17 @@ async fn main() -> Result<(), Box> { let port = base_port + app_config.id; let addr = format!("/ip4/0.0.0.0/udp/{}/quic-v1", port); - println!("SnapchainService (ID: {}) listening on {}", app_config.id, addr); + let base_grpc_port = 50060; + let grpc_port = base_grpc_port + app_config.id; + let grpc_addr = format!("0.0.0.0:{}", grpc_port); + let grpc_socket_addr: SocketAddr = grpc_addr.parse()?; + + info!( + id = app_config.id, + addr = addr, + grpc_addr = grpc_addr, + "SnapchainService listening", + ); let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); match app_config.log_format.as_str() { @@ -60,7 +80,7 @@ async fn main() -> Result<(), Box> { let gossip_result = SnapchainGossip::create(keypair.clone(), addr, system_tx.clone()); if let Err(e) = gossip_result { - println!("Failed to create SnapchainGossip: {:?}", e); + error!(error = ?e, "Failed to create SnapchainGossip"); return Ok(()); } @@ -68,9 +88,9 @@ async fn main() -> Result<(), Box> { let gossip_tx = gossip.tx.clone(); tokio::spawn(async move { - println!("Starting gossip"); + info!("Starting gossip"); gossip.start().await; - println!("Gossip Stopped"); + info!("Gossip Stopped"); }); if !app_config.fnames.disable { @@ -81,6 +101,25 @@ async fn main() -> Result<(), Box> { }); } + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); + + tokio::spawn(async move { + let service = MySnapchainService::default(); + + let resp = Server::builder() + .add_service(SnapchainServiceServer::new(service)) + .serve(grpc_socket_addr) + .await; + + let msg = "grpc server stopped"; + match resp { + Ok(()) => error!(msg), + Err(e) => error!(error = ?e, "{}", msg), + } + + shutdown_tx.send(()).await.ok(); + }); + let registry = SharedRegistry::global(); let metrics = Metrics::register(registry); @@ -119,7 +158,12 @@ async fn main() -> Result<(), Box> { loop { select! { _ = ctrl_c() => { - println!("Received Ctrl-C, shutting down"); + info!("Received Ctrl-C, shutting down"); + consensus_actor.stop(None); + return Ok(()); + } + _ = shutdown_rx.recv() => { + error!("Received shutdown signal, shutting down"); consensus_actor.stop(None); return Ok(()); } @@ -134,7 +178,7 @@ async fn main() -> Result<(), Box> { }), nonce: tick_count as u64, // Need the nonce to avoid the gossip duplicate message check }; - println!("Registering validator with nonce: {}", register_validator.nonce); + info!("Registering validator with nonce: {}", register_validator.nonce); gossip_tx.send(GossipEvent::RegisterValidator(register_validator)).await?; } } diff --git a/src/network/gossip.rs b/src/network/gossip.rs index 8b47d299..4e8f06bc 100644 --- a/src/network/gossip.rs +++ b/src/network/gossip.rs @@ -14,6 +14,7 @@ use std::time::Duration; use tokio::io; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; +use tracing::info; pub enum GossipEvent { BroadcastSignedVote(SignedVote), @@ -124,7 +125,7 @@ impl SnapchainGossip { SwarmEvent::Behaviour(SnapchainBehaviorEvent::Gossipsub(gossipsub::Event::Unsubscribed { peer_id, topic })) => println!("Peer: {peer_id} unsubscribed to topic: {topic}"), SwarmEvent::NewListenAddr { address, .. } => { - println!("Local node is listening on {address}"); + info!(address = address.to_string(), "Local node is listening"); }, SwarmEvent::Behaviour(SnapchainBehaviorEvent::Gossipsub(gossipsub::Event::Message { propagation_source: peer_id, diff --git a/src/network/mod.rs b/src/network/mod.rs index e6491174..964676ec 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -1 +1,2 @@ pub mod gossip; +pub mod server; diff --git a/src/network/server.rs b/src/network/server.rs new file mode 100644 index 00000000..83647094 --- /dev/null +++ b/src/network/server.rs @@ -0,0 +1,34 @@ +use std::error::Error; +use std::net::SocketAddr; +use tonic::{transport::Server, Request, Response, Status}; +use tonic::Code::Unimplemented; +use tracing::{info}; +use hex::ToHex; + + +pub mod rpc { + tonic::include_proto!("rpc"); +} + +pub mod message { + tonic::include_proto!("message"); +} + +pub mod username_proof { + tonic::include_proto!("username_proof"); +} + +use rpc::snapchain_service_server::{SnapchainService, SnapchainServiceServer}; +use message::{Message}; + +#[derive(Default)] +pub struct MySnapchainService; + +#[tonic::async_trait] +impl SnapchainService for MySnapchainService { + async fn submit_message(&self, request: Request) -> Result, Status> { + let hash = request.get_ref().hash.encode_hex::(); + info!(hash, "Received a message"); + Err(Status::new(Unimplemented, "not implemented")) + } +} diff --git a/src/proto/message.proto b/src/proto/message.proto new file mode 100644 index 00000000..ff2cb9c2 --- /dev/null +++ b/src/proto/message.proto @@ -0,0 +1,209 @@ +syntax = "proto3"; +package message; + +import "username_proof.proto"; + + +/** + * A Message is a delta operation on the Farcaster network. The message protobuf is an envelope + * that wraps a MessageData object and contains a hash and signature which can verify its authenticity. + */ +message Message { + MessageData data = 1; // Contents of the message + bytes hash = 2; // Hash digest of data + HashScheme hash_scheme = 3; // Hash scheme that produced the hash digest + bytes signature = 4; // Signature of the hash digest + SignatureScheme signature_scheme = 5; // Signature scheme that produced the signature + bytes signer = 6; // Public key or address of the key pair that produced the signature + optional bytes data_bytes = 7; // MessageData serialized to bytes if using protobuf serialization other than ts-proto +} + +/** +* A MessageData object contains properties common to all messages and wraps a body object which +* contains properties specific to the MessageType. +*/ +message MessageData { + MessageType type = 1; // Type of message contained in the body + uint64 fid = 2; // Farcaster ID of the user producing the message + uint32 timestamp = 3; // Farcaster epoch timestamp in seconds + FarcasterNetwork network = 4; // Farcaster network the message is intended for + oneof body { + CastAddBody cast_add_body = 5; + CastRemoveBody cast_remove_body = 6; + ReactionBody reaction_body = 7; + VerificationAddAddressBody verification_add_address_body = 9; + VerificationRemoveBody verification_remove_body = 10; + // SignerAddBody signer_add_body = 11; // Deprecated + UserDataBody user_data_body = 12; + // SignerRemoveBody signer_remove_body = 13; // Deprecated + LinkBody link_body = 14; + username_proof.UserNameProof username_proof_body = 15; + FrameActionBody frame_action_body = 16; + + // Compaction messages + LinkCompactStateBody link_compact_state_body = 17; + } // Properties specific to the MessageType +} + +/** Type of hashing scheme used to produce a digest of MessageData */ +enum HashScheme { + HASH_SCHEME_NONE = 0; + HASH_SCHEME_BLAKE3 = 1; // Default scheme for hashing MessageData +} + +/** Type of signature scheme used to sign the Message hash */ +enum SignatureScheme { + SIGNATURE_SCHEME_NONE = 0; + SIGNATURE_SCHEME_ED25519 = 1; // Ed25519 signature (default) + SIGNATURE_SCHEME_EIP712 = 2; // ECDSA signature using EIP-712 scheme +} + +/** Type of the MessageBody */ +enum MessageType { + MESSAGE_TYPE_NONE = 0; + MESSAGE_TYPE_CAST_ADD = 1; // Add a new Cast + MESSAGE_TYPE_CAST_REMOVE = 2; // Remove an existing Cast + MESSAGE_TYPE_REACTION_ADD = 3; // Add a Reaction to a Cast + MESSAGE_TYPE_REACTION_REMOVE = 4; // Remove a Reaction from a Cast + MESSAGE_TYPE_LINK_ADD = 5; // Add a new Link + MESSAGE_TYPE_LINK_REMOVE = 6; // Remove an existing Link + MESSAGE_TYPE_VERIFICATION_ADD_ETH_ADDRESS = 7; // Add a Verification of an Ethereum Address + MESSAGE_TYPE_VERIFICATION_REMOVE = 8; // Remove a Verification + // Deprecated + // MESSAGE_TYPE_SIGNER_ADD = 9; // Add a new Ed25519 key pair that signs messages for a user + // MESSAGE_TYPE_SIGNER_REMOVE = 10; // Remove an Ed25519 key pair that signs messages for a user + MESSAGE_TYPE_USER_DATA_ADD = 11; // Add metadata about a user + MESSAGE_TYPE_USERNAME_PROOF = 12; // Add or replace a username proof + MESSAGE_TYPE_FRAME_ACTION = 13; // A Farcaster Frame action + MESSAGE_TYPE_LINK_COMPACT_STATE = 14; // Link Compaction State Message +} + +/** Farcaster network the message is intended for */ +enum FarcasterNetwork { + FARCASTER_NETWORK_NONE = 0; + FARCASTER_NETWORK_MAINNET = 1; // Public primary network + FARCASTER_NETWORK_TESTNET = 2; // Public test network + FARCASTER_NETWORK_DEVNET = 3; // Private test network +} + +/** Adds metadata about a user */ +message UserDataBody { + UserDataType type = 1; // Type of metadata + string value = 2; // Value of the metadata +} + +/** Type of UserData */ +enum UserDataType { + USER_DATA_TYPE_NONE = 0; + USER_DATA_TYPE_PFP = 1; // Profile Picture for the user + USER_DATA_TYPE_DISPLAY = 2; // Display Name for the user + USER_DATA_TYPE_BIO = 3; // Bio for the user + USER_DATA_TYPE_URL = 5; // URL of the user + USER_DATA_TYPE_USERNAME = 6; // Preferred Name for the user + USER_DATA_TYPE_LOCATION = 7; // Current location for the user + USER_DATA_TYPE_TWITTER = 8; // Username of user on twitter + USER_DATA_TYPE_GITHUB = 9; // Username of user on github +} + +message Embed { + oneof embed { + string url = 1; + CastId cast_id = 2; + } +} + +/** Type of cast */ +enum CastType { + CAST = 0; + LONG_CAST = 1; +} + + +/** Adds a new Cast */ +message CastAddBody { + repeated string embeds_deprecated = 1; // URLs to be embedded in the cast + repeated uint64 mentions = 2; // Fids mentioned in the cast + oneof parent { + CastId parent_cast_id = 3; // Parent cast of the cast + string parent_url = 7; // Parent URL + }; + string text = 4; // Text of the cast + repeated uint32 mentions_positions = 5; // Positions of the mentions in the text + repeated Embed embeds = 6; // URLs or cast ids to be embedded in the cast + CastType type = 8; // Type of cast +} + +/** Removes an existing Cast */ +message CastRemoveBody { + bytes target_hash = 1; // Hash of the cast to remove +} + +/** Identifier used to look up a Cast */ +message CastId { + uint64 fid = 1; // Fid of the user who created the cast + bytes hash = 2; // Hash of the cast +} + +/** Adds or removes a Reaction from a Cast */ +message ReactionBody { + ReactionType type = 1; // Type of reaction + oneof target { + CastId target_cast_id = 2; // CastId of the Cast to react to + string target_url = 3; // URL to react to + } +} + +/** Type of Reaction */ +enum ReactionType { + REACTION_TYPE_NONE = 0; + REACTION_TYPE_LIKE = 1; // Like the target cast + REACTION_TYPE_RECAST = 2; // Share target cast to the user's audience +} + +/** Type of Protocol to disambiguate verification addresses */ +enum Protocol { + PROTOCOL_ETHEREUM = 0; + PROTOCOL_SOLANA = 1; +} + +/** Adds a Verification of ownership of an Address based on Protocol */ +message VerificationAddAddressBody { + bytes address = 1; // Address being verified for a given Protocol + bytes claim_signature = 2; // Signature produced by the user's address for a given Protocol + bytes block_hash = 3; // Hash of the latest Ethereum block when the signature was produced + uint32 verification_type = 4; // Type of verification. 0 = EOA, 1 = contract + uint32 chain_id = 5; // 0 for EOA verifications, 1 or 10 for contract verifications + Protocol protocol = 7; // Protocol of the Verification +} + +/** Removes a Verification of a given protocol */ +message VerificationRemoveBody { + bytes address = 1; // Address of the Verification to remove + Protocol protocol = 2; // Protocol of the Verification to remove +} + +/** Adds or removes a Link */ +message LinkBody { + string type = 1; // Type of link, <= 8 characters + optional uint32 displayTimestamp = 2; // User-defined timestamp that preserves original timestamp when message.data.timestamp needs to be updated for compaction + oneof target { + uint64 target_fid = 3; // The fid the link relates to + } +} + +/** A Compaction message for the Link Store */ +message LinkCompactStateBody { + string type = 1; // Type of link, <= 8 characters + repeated uint64 target_fids = 2; +} + +/** A Farcaster Frame action */ +message FrameActionBody { + bytes url = 1; // URL of the Frame triggering the action + uint32 button_index = 2; // The index of the button pressed (1-4) + CastId cast_id = 3; // The cast which contained the frame url + bytes input_text = 4; // Text input from the user, if present + bytes state = 5; // Serialized frame state value + bytes transaction_id = 6; // Chain-specific transaction ID for tx actions + bytes address = 7; // Chain-specific address for tx actions +} \ No newline at end of file diff --git a/src/proto/rpc.proto b/src/proto/rpc.proto new file mode 100644 index 00000000..ead622fc --- /dev/null +++ b/src/proto/rpc.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; +package rpc; + +import "message.proto"; + +service SnapchainService { + rpc SubmitMessage(message.Message) returns (message.Message); +} diff --git a/src/proto/username_proof.proto b/src/proto/username_proof.proto new file mode 100644 index 00000000..39edb00d --- /dev/null +++ b/src/proto/username_proof.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; +package username_proof; + +enum UserNameType { + USERNAME_TYPE_NONE = 0; + USERNAME_TYPE_FNAME = 1; + USERNAME_TYPE_ENS_L1 = 2; +} + +message UserNameProof { + uint64 timestamp = 1; + bytes name = 2; + bytes owner = 3; + bytes signature = 4; + uint64 fid = 5; + UserNameType type = 6; +} \ No newline at end of file