Skip to content

Commit

Permalink
Create a better error handling while we analyze the event line
Browse files Browse the repository at this point in the history
  • Loading branch information
tipogi committed Jan 22, 2025
1 parent e26935f commit 02c9d60
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 40 deletions.
4 changes: 2 additions & 2 deletions src/events/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ pub enum EventProcessorError {
#[error("SkipIndexing: The PUT event appears to be unindexed")]
SkipIndexing,
// The event could not be parsed from a line
#[error("InvalidEvent: {message}")]
InvalidEvent { message: String },
#[error("InvalidEventLine: {message}")]
InvalidEventLine { message: String },
// The Pubky client could not resolve the pubky
#[error("PubkyClientError: {message}")]
PubkyClientError { message: String },
Expand Down
51 changes: 38 additions & 13 deletions src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ impl Event {
debug!("New event: {}", line);
let parts: Vec<&str> = line.split(' ').collect();
if parts.len() != 2 {
return Err(EventProcessorError::InvalidEvent {
message: format!("Malformed event line: {}", line),
return Err(EventProcessorError::InvalidEventLine {
message: format!("Malformed event line, {}", line),
}
.into());
}
Expand All @@ -86,15 +86,16 @@ impl Event {
"PUT" => EventType::Put,
"DEL" => EventType::Del,
_ => {
return Err(EventProcessorError::InvalidEvent {
message: format!("Unknown event type: {}", parts[0]),
return Err(EventProcessorError::InvalidEventLine {
message: format!("Unknown event type, {}", line),
}
.into())
}
};

let uri = parts[1].to_string();
let parsed_uri = ParsedUri::try_from(uri.as_str()).unwrap_or_default();
let parsed_uri = ParsedUri::try_from(uri.as_str())?;
println!("ParsedURI: {:?}", parsed_uri);

//TODO: This conversion to a match statement that only uses IF conditions is silly.
// We could be patter matching the split test for "posts", "follows", etc maybe?
Expand All @@ -104,34 +105,58 @@ impl Event {
},
_ if uri.contains("/posts/") => ResourceType::Post {
author_id: parsed_uri.user_id,
post_id: parsed_uri.post_id.ok_or("Missing post_id")?,
post_id: parsed_uri
.post_id
.ok_or(EventProcessorError::InvalidEventLine {
message: format!("Missing post_id, {} {}", parts[0], uri),
})?,
},
_ if uri.contains("/follows/") => ResourceType::Follow {
follower_id: parsed_uri.user_id,
followee_id: parsed_uri.follow_id.ok_or("Missing followee_id")?,
followee_id: parsed_uri
.follow_id
.ok_or(EventProcessorError::InvalidEventLine {
message: format!("Missing followee_id, {} {}", parts[0], uri),
})?,
},
_ if uri.contains("/mutes/") => ResourceType::Mute {
user_id: parsed_uri.user_id,
muted_id: parsed_uri.muted_id.ok_or("Missing muted_id")?,
muted_id: parsed_uri
.muted_id
.ok_or(EventProcessorError::InvalidEventLine {
message: format!("Missing muted_id, {} {}", parts[0], uri),
})?,
},
_ if uri.contains("/bookmarks/") => ResourceType::Bookmark {
user_id: parsed_uri.user_id,
bookmark_id: parsed_uri.bookmark_id.ok_or("Missing bookmark_id")?,
bookmark_id: parsed_uri.bookmark_id.ok_or(
EventProcessorError::InvalidEventLine {
message: format!("Missing bookmark_id, {} {}", parts[0], uri),
},
)?,
},
_ if uri.contains("/tags/") => ResourceType::Tag {
user_id: parsed_uri.user_id,
tag_id: parsed_uri.tag_id.ok_or("Missing tag_id")?,
tag_id: parsed_uri
.tag_id
.ok_or(EventProcessorError::InvalidEventLine {
message: format!("Missing tag_id, {} {}", parts[0], uri),
})?,
},
_ if uri.contains("/files/") => ResourceType::File {
user_id: parsed_uri.user_id,
file_id: parsed_uri.file_id.ok_or("Missing file_id")?,
file_id: parsed_uri
.file_id
.ok_or(EventProcessorError::InvalidEventLine {
message: format!("Missing file_id, {} {}", parts[0], uri),
})?,
},
_ if uri.contains("/blobs") => return Ok(None),
_ if uri.contains("/last_read") => return Ok(None),
_ if uri.contains("/settings") => return Ok(None),
_ => {
return Err(EventProcessorError::InvalidEvent {
message: format!("Unrecognized resource in URI: {}", uri),
return Err(EventProcessorError::InvalidEventLine {
message: format!("Unrecognized resource in URI, {} {}", parts[0], uri),
}
.into())
}
Expand Down
7 changes: 5 additions & 2 deletions src/events/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,14 @@ impl EventProcessor {
Ok(_) => Ok(()),
Err(e) => {
let retry_event = match e.downcast_ref::<EventProcessorError>() {
Some(EventProcessorError::InvalidEventLine { message }) => {
error!("{}", message);
return Ok(());
}
Some(event_processor_error) => RetryEvent::new(event_processor_error.clone()),
// Others errors must be logged at least for now
None => {
error!("Unhandled error type for URI: {}. {:?}", event.uri, e);
error!("Unhandled error type for URI: {}, {:?}", event.uri, e);
return Ok(());
}
};
Expand All @@ -160,7 +164,6 @@ impl EventProcessor {
.await
{
Ok(_) => {
info!("Message send to the RetryManager succesfully from the channel");
// TODO: Investigate non-blocking alternatives
// The current use of `tokio::time::sleep` (in the watcher tests) is intended to handle a situation where tasks in other threads
// are not being tracked. This could potentially lead to issues with writing `RetryEvents` in certain cases.
Expand Down
2 changes: 1 addition & 1 deletion src/events/retry/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl RetryManager {
index_key: String,
retry_event: RetryEvent,
) -> Result<(), DynError> {
error!("{}: {}", retry_event.error_type, index_key);
error!("{}, {}", retry_event.error_type, index_key);
retry_event.put_to_index(index_key).await?;
Ok(())
}
Expand Down
78 changes: 56 additions & 22 deletions src/events/uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::types::DynError;
use crate::types::PubkyId;
use std::convert::TryFrom;

use super::error::EventProcessorError;

#[derive(Default, Debug)]
pub struct ParsedUri {
pub user_id: PubkyId,
Expand All @@ -21,54 +23,72 @@ impl TryFrom<&str> for ParsedUri {

// Ensure the URI starts with the correct prefix
if !uri.starts_with("pubky://") {
return Err("Invalid URI, must start with pubky://".into());
return Err(EventProcessorError::InvalidEventLine {
message: format!("Invalid URI, must start with pubky://, {}", uri),
}
.into());
}

// Extract the user_id from the initial part of the URI
if let Some(user_id) = extract_segment(uri, "pubky://", "/pub/") {
parsed_uri.user_id = PubkyId::try_from(user_id)?;
} else {
return Err("Uri Pubky ID is invalid".into());
return Err(EventProcessorError::InvalidEventLine {
message: format!("Uri Pubky ID is invalid, {}", uri),
}
.into());
}

// Ensure that the URI belongs to pubky.app
if let Some(app_segment) = extract_segment(uri, "/pub/", "/") {
if app_segment != "pubky.app" {
return Err("The Event URI does not belong to pubky.app".into());
return Err(EventProcessorError::InvalidEventLine {
message: format!("The Event URI does not belong to pubky.app, {}", uri),
}
.into());
}
} else {
return Err("The Event URI is malformed".into());
return Err(EventProcessorError::InvalidEventLine {
message: format!("The Event URI is malformed, {}", uri),
}
.into());
}

// Extract post_id if present
if let Some(post_id) = extract_segment(uri, "/posts/", "/") {
parsed_uri.post_id = Some(post_id.to_string());
}
parsed_uri.post_id = extract_segment(uri, "/posts/", "/")
.filter(|id| !id.is_empty())
.map(String::from);

// Extract follow_id if present
if let Some(follow_id) = extract_segment(uri, "/follows/", "/") {
parsed_uri.follow_id = Some(PubkyId::try_from(follow_id)?);
}
parsed_uri.follow_id = extract_segment(uri, "/follows/", "/")
.map(PubkyId::try_from)
.transpose()
.map_err(|e| EventProcessorError::InvalidEventLine {
message: format!("{}, {}", e, uri),
})?;

// Extract muted_id if present
if let Some(muted_id) = extract_segment(uri, "/mutes/", "/") {
parsed_uri.muted_id = Some(PubkyId::try_from(muted_id)?);
}
parsed_uri.muted_id = extract_segment(uri, "/mutes/", "/")
.map(PubkyId::try_from)
.transpose()
.map_err(|e| EventProcessorError::InvalidEventLine {
message: format!("{}, {}", e, uri),
})?;

// Extract bookmark_id if present
if let Some(bookmark_id) = extract_segment(uri, "/bookmarks/", "/") {
parsed_uri.bookmark_id = Some(bookmark_id.to_string());
}
parsed_uri.bookmark_id = extract_segment(uri, "/bookmarks/", "/")
.filter(|id| !id.is_empty())
.map(String::from);

// Extract tag_id if present
if let Some(tag_id) = extract_segment(uri, "/tags/", "/") {
parsed_uri.tag_id = Some(tag_id.to_string());
}
parsed_uri.tag_id = extract_segment(uri, "/tags/", "/")
.filter(|id| !id.is_empty())
.map(String::from);

// Extract file_id if present
if let Some(file_id) = extract_segment(uri, "/files/", "/") {
parsed_uri.file_id = Some(file_id.to_string());
}
parsed_uri.file_id = extract_segment(uri, "/files/", "/")
.filter(|id| !id.is_empty())
.map(String::from);

Ok(parsed_uri)
}
Expand All @@ -83,3 +103,17 @@ fn extract_segment<'a>(uri: &'a str, start_pattern: &str, end_pattern: &str) ->

Some(&uri[start_idx..end_idx])
}

#[cfg(test)]
mod tests {

use super::*;

#[test]
fn test_bookmark_uri() {
let uri =
"pubky://phbhg3qgcttn95guepmbud1nzcxhg3xc5j5k4h7i8a4b6wb3nw1o/pub/pubky.app/bookmarks/";
let parsed_uri = ParsedUri::try_from(uri).unwrap_or_default();
println!("ParsedUri: {:?}", parsed_uri);
}
}

0 comments on commit 02c9d60

Please sign in to comment.