Add consecutive filters
Co-authored-by: Tony Giorgio <[email protected]>
TonyGiorgio authored and benthecarman committed Jul 12, 2023
1 parent 9b6c079 commit 2e118ae
Showing 3 changed files with 106 additions and 28 deletions.
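
For orientation: the diff replaces the per-filter spawn_local loop with a single shared poller. Each valid NWC filter from a REQ is merged into one "master" Filter behind a mutex, and an AtomicBool guard ensures only one background loop is ever spawned, so consecutive filters are picked up by the already-running loop. Below is a minimal, self-contained sketch of that pattern; the Filter struct, combine, poll_once, and handle_req names are placeholders standing in for nostr::Filter, combine_filters, handle_filter, and the REQ handling in the real worker code.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Placeholder for nostr::Filter; just a list of kinds for illustration.
#[derive(Clone, Default, Debug)]
struct Filter {
    kinds: Vec<u64>,
}

// Simplified stand-in for combine_filters(): fold a new filter into the master.
fn combine(master: &mut Filter, new: &Filter) {
    master.kinds.extend(new.kinds.iter().copied());
}

// Hypothetical stand-in for handle_filter(): report what would be polled.
fn poll_once(filter: &Filter) {
    println!("polling storage for kinds {:?}", filter.kinds);
}

fn handle_req(requested: &Arc<Mutex<Filter>>, running: &Arc<AtomicBool>, new_filter: Filter) {
    // Merge the new filter into the shared master filter, then drop the lock.
    combine(&mut requested.lock().unwrap(), &new_filter);

    // Spin up the poller only if one is not already running; later filters
    // are picked up through the shared master filter.
    if !running.swap(true, Ordering::Relaxed) {
        let requested = Arc::clone(requested);
        thread::spawn(move || loop {
            let snapshot = requested.lock().unwrap().clone();
            poll_once(&snapshot);
            thread::sleep(Duration::from_secs(10));
        });
    }
}

fn main() {
    let requested = Arc::new(Mutex::new(Filter::default()));
    let running = Arc::new(AtomicBool::new(false));

    // Two consecutive REQ filters arrive; only one polling loop is started.
    handle_req(&requested, &running, Filter { kinds: vec![23194] });
    handle_req(&requested, &running, Filter { kinds: vec![23195] });

    thread::sleep(Duration::from_millis(50));
}
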
1 change: 1 addition & 0 deletions .gitignore
@@ -20,3 +20,4 @@ build/
/target
/dist
.dev.vars
.wrangler
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -11,8 +11,9 @@ default = ["console_error_panic_hook"]

[dependencies]
cfg-if = "0.1.2"
worker = { version = "0.0.13", features = ["queue"] }
worker = { version = "0.0.13", features = ["queue"] }
futures = "0.3.26"
futures-util = { version = "0.3", default-features = false }
nostr = { version = "0.22.0", default-features = false, features = ["nip11"] }
serde = { version = "^1.0", features = ["derive"] }

130 changes: 103 additions & 27 deletions src/lib.rs
@@ -11,7 +11,10 @@ use ::nostr::{
ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId, Tag, TagKind,
};
use futures::StreamExt;
use futures_util::lock::Mutex;
use serde::{Deserialize, Serialize};
use std::ops::DerefMut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use worker::*;

@@ -181,6 +184,8 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
server.accept()?;
console_log!("accepted websocket, about to spawn event stream");
wasm_bindgen_futures::spawn_local(async move {
let running_thread = Arc::new(AtomicBool::new(false));
let requested_filters = Arc::new(Mutex::new(Filter::new()));
let mut event_stream = server.events().expect("stream error");
console_log!("spawned event stream, waiting for first message..");
while let Some(event) = event_stream.next().await {
@@ -307,6 +312,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
// by reading storage and sending any new events
// one caveat is that this will send events multiple
// times if they are in multiple filters
let mut valid = false;
for filter in filters {
let valid_nwc = {
// has correct kinds
@@ -324,36 +330,55 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
};

if valid_nwc {
let ctx_clone = ctx.clone();
let sub_id = subscription_id.clone();
let server_clone = server.clone();
wasm_bindgen_futures::spawn_local(async move {
console_log!("Got NWC filter!");
let mut sent_events = vec![];
loop {
match handle_filter(
&sent_events,
sub_id.clone(),
filter.clone(),
&server_clone,
&ctx_clone,
)
.await
{
Ok(new_event_ids) => {
// add new events to sent events
sent_events.extend(new_event_ids);
}
Err(e) => console_log!(
let mut master_guard = requested_filters.lock().await;
let master_filter = master_guard.deref_mut();
// now add the new filters to the main filter
// object. This is a bit of a hack but we only
// check certain sub filters for NWC.
combine_filters(master_filter, &filter);
drop(master_guard);
valid = true;
}
}

// only spin up a new one if there's not a
// spawn_local already going with filters
// when other filters are added in, it should
// be picked up in the master filter
if !running_thread.load(Ordering::Relaxed) && valid {
// set running thread to true
running_thread.swap(true, Ordering::Relaxed);

let ctx_clone = ctx.clone();
let sub_id = subscription_id.clone();
let server_clone = server.clone();
let master_clone = requested_filters.clone();
wasm_bindgen_futures::spawn_local(async move {
console_log!("Got NWC filter!");
let mut sent_events = vec![];
loop {
let master = master_clone.lock().await;
console_log!("Looping through filter handling...");
match handle_filter(
&sent_events,
sub_id.clone(),
master.clone(),
&server_clone,
&ctx_clone,
).await
{
Ok(new_event_ids) => {
// add new events to sent events
sent_events.extend(new_event_ids);
}
Err(e) => console_log!(
"error handling filter: {e}"
),
}

utils::delay(10_000).await;
}
});
}
}
utils::delay(10_000).await;
}
});
} else {
// if not a nwc filter, we just send EOSE
let relay_msg = RelayMessage::new_eose(subscription_id);
server
@@ -637,6 +662,57 @@ pub async fn delete_nwc_response(event: &Event, ctx: &RouteContext<()>) -> Resul
Ok(())
}

fn combine_filters(master_filter: &mut Filter, new_filter: &Filter) {
if let Some(vec) = &new_filter.ids {
master_filter
.ids
.get_or_insert_with(Vec::new)
.extend(vec.clone());
}
if let Some(vec) = &new_filter.authors {
master_filter
.authors
.get_or_insert_with(Vec::new)
.extend(vec.clone());
}
if let Some(vec) = &new_filter.kinds {
master_filter
.kinds
.get_or_insert_with(Vec::new)
.extend(vec.clone());
}
if let Some(vec) = &new_filter.events {
master_filter
.events
.get_or_insert_with(Vec::new)
.extend(vec.clone());
}
if let Some(vec) = &new_filter.pubkeys {
master_filter
.pubkeys
.get_or_insert_with(Vec::new)
.extend(vec.clone());
}
if let Some(vec) = &new_filter.hashtags {
master_filter
.hashtags
.get_or_insert_with(Vec::new)
.extend(vec.clone());
}
if let Some(vec) = &new_filter.references {
master_filter
.references
.get_or_insert_with(Vec::new)
.extend(vec.clone());
}
if let Some(vec) = &new_filter.identifiers {
master_filter
.identifiers
.get_or_insert_with(Vec::new)
.extend(vec.clone());
}
}

fn relay_response(msg: RelayMessage) -> worker::Result<Response> {
Response::from_json(&msg)?.with_cors(&cors())
}
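
A hypothetical usage sketch of the combine_filters helper added above, assuming (as the helper itself does) that the nostr 0.22 Filter fields such as kinds are public Option<Vec<_>> values, and that the Kind::WalletConnectRequest / Kind::WalletConnectResponse variants exist in that version; combine_filters here is the function from the diff.

use nostr::{Filter, Kind};

fn main() {
    let mut master = Filter::new();

    let mut nwc_requests = Filter::new();
    nwc_requests.kinds = Some(vec![Kind::WalletConnectRequest]);

    let mut nwc_responses = Filter::new();
    nwc_responses.kinds = Some(vec![Kind::WalletConnectResponse]);

    // Consecutive REQ filters get folded into the single master filter
    // that the background polling loop re-reads on every iteration.
    combine_filters(&mut master, &nwc_requests);
    combine_filters(&mut master, &nwc_responses);

    assert_eq!(
        master.kinds,
        Some(vec![Kind::WalletConnectRequest, Kind::WalletConnectResponse])
    );
}
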