Skip to content

Commit

Permalink
feat: webhook example (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
heilhead authored Jun 26, 2023
1 parent b166f9d commit 77f5fef
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 3 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ anyhow = "1"
structopt = { version = "0.3", default-features = false }
tokio = { version = "1.22", features = ["full"] }
url = "2.3"
warp = { version = "0.3", default-features = false }
serde_json = "1.0"

[[example]]
name = "websocket_client"
Expand All @@ -35,3 +37,7 @@ required-features = ["client","rpc"]
[[example]]
name = "http_client"
required-features = ["client","rpc"]

[[example]]
name = "webhook"
required-features = ["client","rpc"]
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

This is the foundation for the WalletConnect Rust SDK. Currently, there's only the core client and the RPC types required to communicate with the Relay.

See the [basic example](examples/basic_client.rs).
Examples:
- [HTTP client](examples/http_client.rs)
- [WebSocket client](examples/websocket_client.rs)
- [Webhook dispatch](examples/webhook.rs)

## `relay_client`

Expand Down
230 changes: 230 additions & 0 deletions examples/webhook.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
use {
relay_client::{
http::{Client, WatchRegisterRequest},
ConnectionOptions,
},
relay_rpc::{
auth::{ed25519_dalek::Keypair, rand, AuthToken},
domain::{DecodedClientId, Topic},
jwt::VerifyableClaims,
rpc,
},
std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
sync::Arc,
time::Duration,
},
structopt::StructOpt,
tokio::{sync::mpsc, task::JoinHandle},
url::Url,
warp::Filter,
};

#[derive(StructOpt)]
struct Args {
/// Specify HTTP address.
#[structopt(short, long, default_value = "https://relay.walletconnect.com/rpc")]
address: String,

/// Specify WalletConnect project ID.
#[structopt(short, long, default_value = "3cbaa32f8fbf3cdcc87d27ca1fa68069")]
project_id: String,

/// Webhook server port.
#[structopt(short, long, default_value = "10100")]
webhook_server_port: u16,
}

fn create_conn_opts(key: &Keypair, address: &str, project_id: &str) -> ConnectionOptions {
let aud = Url::parse(address)
.unwrap()
.origin()
.unicode_serialization();

let auth = AuthToken::new("http://example.com")
.aud(aud)
.ttl(Duration::from_secs(60 * 60))
.as_jwt(key)
.unwrap();

ConnectionOptions::new(project_id, auth).with_address(address)
}

#[derive(Debug)]
pub struct WebhookData {
pub url: String,
pub payload: rpc::WatchWebhookPayload,
}

pub struct WebhookServer {
addr: SocketAddr,
handle: JoinHandle<()>,
payload_rx: mpsc::UnboundedReceiver<WebhookData>,
}

impl WebhookServer {
pub fn new(port: u16) -> Self {
let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into();
let (payload_tx, payload_rx) = mpsc::unbounded_channel();
let handle = tokio::spawn(mock_webhook_server(addr, payload_tx));

Self {
addr,
handle,
payload_rx,
}
}

pub fn url(&self) -> String {
format!("http://{}", self.addr)
}

pub async fn recv(&mut self) -> WebhookData {
self.payload_rx.recv().await.unwrap()
}
}

impl Drop for WebhookServer {
fn drop(&mut self) {
self.handle.abort();
}
}

async fn mock_webhook_server(addr: SocketAddr, payload_tx: mpsc::UnboundedSender<WebhookData>) {
let routes = warp::post()
.and(warp::path::tail())
.and(warp::body::json())
.and(warp::any().map(move || payload_tx.clone()))
.then(
move |path: warp::path::Tail,
payload: rpc::WatchWebhookPayload,
payload_tx: mpsc::UnboundedSender<WebhookData>| async move {
let url = format!("http://{addr}/{}", path.as_str());
payload_tx.send(WebhookData { url, payload }).unwrap();
warp::reply()
},
);

warp::serve(routes).run(addr).await;
}

/// Note: This example will only work with a locally running relay, since it
/// requires access to the local HTTP server.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
const PUB_WH_PATH: &str = "/publisher_webhook";
const SUB_WH_PATH: &str = "/subscriber_webhook";

let args = Args::from_args();
let mut server = WebhookServer::new(args.webhook_server_port);
let server_url = server.url();

// Give time for the server to start up.
tokio::time::sleep(Duration::from_millis(500)).await;

let publisher_key = Keypair::generate(&mut rand::thread_rng());
let publisher = Client::new(&create_conn_opts(
&publisher_key,
&args.address,
&args.project_id,
))?;
println!(
"[publisher] client id: {}",
DecodedClientId::from(publisher_key.public_key()).to_did_key()
);

let subscriber_key = Keypair::generate(&mut rand::thread_rng());
let subscriber = Client::new(&create_conn_opts(
&subscriber_key,
&args.address,
&args.project_id,
))?;
println!(
"[subscriber] client id: {}",
DecodedClientId::from(subscriber_key.public_key()).to_did_key()
);

let topic = Topic::generate();
let message: Arc<str> = Arc::from("Hello WalletConnect!");

let sub_relay_id: DecodedClientId = subscriber
.watch_register(
WatchRegisterRequest {
service_url: server_url.clone(),
webhook_url: format!("{}{}", server_url, SUB_WH_PATH),
watch_type: rpc::WatchType::Subscriber,
tags: vec![1100],
statuses: vec![rpc::WatchStatus::Queued],
ttl: Duration::from_secs(600),
},
&subscriber_key,
)
.await
.unwrap()
.relay_id
.into();
subscriber.subscribe(topic.clone()).await.unwrap();
println!(
"[subscriber] watch registered: relay_id={}",
sub_relay_id.to_did_key()
);

let pub_relay_id: DecodedClientId = publisher
.watch_register(
WatchRegisterRequest {
service_url: server_url.clone(),
webhook_url: format!("{}{}", server_url, PUB_WH_PATH),
watch_type: rpc::WatchType::Publisher,
tags: vec![1100],
statuses: vec![rpc::WatchStatus::Accepted],
ttl: Duration::from_secs(600),
},
&publisher_key,
)
.await
.unwrap()
.relay_id
.into();
println!(
"[publisher] watch registered: relay_id={}",
pub_relay_id.to_did_key()
);

publisher
.publish(
topic.clone(),
message.clone(),
1100,
Duration::from_secs(30),
false,
)
.await
.unwrap();
println!("[publisher] message published: topic={topic} message={message}");

tokio::time::sleep(Duration::from_secs(1)).await;

let messages = subscriber.fetch(topic).await?.messages;
let message = messages
.get(0)
.ok_or(anyhow::anyhow!("fetch did not return any messages"))?;
println!("[subscriber] received message: {}", message.message);

let pub_data = server.recv().await;
let decoded = rpc::WatchEventClaims::try_from_str(&pub_data.payload.event_auth).unwrap();
let decoded_json = serde_json::to_string_pretty(&decoded).unwrap();
println!(
"[webhook] publisher: url={} data={}",
pub_data.url, decoded_json
);

let sub_data = server.recv().await;
let decoded = rpc::WatchEventClaims::try_from_str(&sub_data.payload.event_auth).unwrap();
let decoded_json = serde_json::to_string_pretty(&decoded).unwrap();
println!(
"[webhook] subscriber: url={} data={}",
sub_data.url, decoded_json
);

Ok(())
}
1 change: 1 addition & 0 deletions relay_rpc/src/jwt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub struct JwtBasicClaims {
/// Issued at, timestamp.
pub iat: i64,
/// Expiration, timestamp.
#[serde(skip_serializing_if = "Option::is_none")]
pub exp: Option<i64>,
}

Expand Down
10 changes: 8 additions & 2 deletions relay_rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,13 @@ impl From<WatchError> for GenericError {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WatchRegisterResponse {
/// The Relay's public key (did:key).
pub relay_id: DidKey,
}

/// Data structure representing watch registration request params.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand All @@ -691,8 +698,7 @@ pub struct WatchRegister {

impl RequestPayload for WatchRegister {
type Error = WatchError;
/// The Relay's public key.
type Response = DidKey;
type Response = WatchRegisterResponse;

fn validate(&self) -> Result<(), ValidationError> {
Ok(())
Expand Down
7 changes: 7 additions & 0 deletions relay_rpc/src/rpc/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ impl VerifyableClaims for WatchEventClaims {
}
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WatchWebhookPayload {
/// JWT with [`WatchEventClaims`] payload.
pub event_auth: String,
}

#[cfg(test)]
mod test {
use {
Expand Down

0 comments on commit 77f5fef

Please sign in to comment.