Skip to content

Commit

Permalink
chore: update Serverinfo to use struct (#71)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Aug 10, 2024
1 parent bebcd2d commit 867ea8c
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 36 deletions.
5 changes: 2 additions & 3 deletions src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,10 +471,9 @@ impl<T> crate::batchmap::Server<T> {
where
T: BatchMapper + Send + Sync + 'static,
{
let mut info = shared::default_info_file();
let mut info = shared::ServerInfo::default();
// update the info json metadata field, and add the map mode
info["metadata"][shared::MAP_MODE_KEY] =
serde_json::Value::String(shared::BATCH_MAP.to_string());
info.set_metadata(shared::MAP_MODE_KEY, shared::BATCH_MAP);
let listener =
shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?;
let handler = self.svc.take().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,10 @@ impl<T> Server<T> {
where
T: Mapper + Send + Sync + 'static,
{
let mut info = shared::default_info_file();
let mut info = shared::ServerInfo::default();
// update the info json metadata field, and add the map mode key value pair
info["metadata"][shared::MAP_MODE_KEY] =
serde_json::Value::String(shared::UNARY_MAP.to_string());
info.set_metadata(shared::MAP_MODE_KEY, shared::UNARY_MAP);

let listener =
shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?;
let handler = self.svc.take().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ impl<C> Server<C> {
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::default_info_file(),
shared::ServerInfo::default(),
)?;
let creator = self.creator.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = channel(1);
Expand Down
88 changes: 63 additions & 25 deletions src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{collections::HashMap, io};

use chrono::{DateTime, TimeZone, Timelike, Utc};
use prost_types::Timestamp;
use serde::{Deserialize, Serialize};
use tokio::net::UnixListener;
use tokio::signal;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -16,43 +17,81 @@ pub(crate) const STREAM_MAP: &str = "stream-map";
pub(crate) const BATCH_MAP: &str = "batch-map";
const MINIMUM_NUMAFLOW_VERSION: &str = "1.2.0-rc4";

// default_info_file is a function to get a default server info json
// file content. This is used to write the server info file.
// This function is used in the write_info_file function.
// This function is not exposed to the user.
pub fn default_info_file() -> serde_json::Value {
let metadata: HashMap<String, String> = HashMap::new();
serde_json::json!({
"protocol": "uds",
"language": "rust",
"version": "0.0.1",
"metadata": metadata,
"minimum-numaflow-version": MINIMUM_NUMAFLOW_VERSION,
})
// ServerInfo structure to store server-related information
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct ServerInfo {
#[serde(default)]
protocol: String,
#[serde(default)]
language: String,
#[serde(default)]
minimum_numaflow_version: String,
#[serde(default)]
version: String,
#[serde(default)]
metadata: Option<HashMap<String, String>>, // Metadata is optional
}
impl ServerInfo {
// default_info_file is a function to get a default server info json
// file content. This is used to write the server info file.
// This function is used in the write_info_file function.
// This function is not exposed to the user.
pub fn default() -> Self {
let metadata: HashMap<String, String> = HashMap::new();
// Return the default server info json content
// Create a ServerInfo object with default values
ServerInfo {
protocol: "uds".to_string(),
language: "rust".to_string(),
minimum_numaflow_version: MINIMUM_NUMAFLOW_VERSION.to_string(),
version: "0.0.1".to_string(),
metadata: Option::from(metadata),
}
}

// Check if the struct is empty
pub fn is_empty(&self) -> bool {
self.protocol.is_empty()
&& self.language.is_empty()
&& self.minimum_numaflow_version.is_empty()
&& self.version.is_empty()
&& self.metadata.is_none()
}

// Set metadata key-value pair
pub fn set_metadata(&mut self, key: &str, value: &str) {
if let Some(metadata) = &mut self.metadata {
metadata.insert(key.to_string(), value.to_string());
} else {
let mut metadata = HashMap::new();
metadata.insert(key.to_string(), value.to_string());
self.metadata = Some(metadata);
}
}
}

// #[tracing::instrument(skip(path), fields(path = ?path.as_ref()))]
#[tracing::instrument(fields(path = ? path.as_ref()))]
fn write_info_file(path: impl AsRef<Path>, mut server_info: serde_json::Value) -> io::Result<()> {
fn write_info_file(path: impl AsRef<Path>, mut server_info: ServerInfo) -> io::Result<()> {
let parent = path.as_ref().parent().unwrap();
fs::create_dir_all(parent)?;

// TODO: make port-number and CPU meta-data configurable, e.g., ("CPU_LIMIT", "1")

// if server_info object is not provided, use the default one
if server_info.is_null() {
server_info = default_info_file();
// If the server_info is empty, set it to the default
if server_info.is_empty() {
server_info = ServerInfo::default();
}

// Convert to a string of JSON and print it out
let content = format!("{}U+005C__END__", server_info);
let serialized = serde_json::to_string(&server_info).unwrap();
let content = format!("{}U+005C__END__", serialized);
info!(content, "Writing to file");
fs::write(path, content)
}

pub(crate) fn create_listener_stream(
socket_file: impl AsRef<Path>,
server_info_file: impl AsRef<Path>,
server_info: serde_json::Value,
server_info: ServerInfo,
) -> Result<UnixListenerStream, Box<dyn std::error::Error + Send + Sync>> {
write_info_file(server_info_file, server_info)
.map_err(|e| format!("writing info file: {e:?}"))?;
Expand Down Expand Up @@ -170,10 +209,9 @@ mod tests {
let temp_file = NamedTempFile::new()?;

// Get a default server info file content
// let server_info = default_info_file();
let mut info = default_info_file();
let mut info = ServerInfo::default();
// update the info json metadata field, and add the map mode key value pair
info["metadata"][MAP_MODE_KEY] = serde_json::Value::String(BATCH_MAP.to_string());
info.set_metadata(MAP_MODE_KEY, BATCH_MAP);

// Call write_info_file with the path of the temporary file
write_info_file(temp_file.path(), info)?;
Expand All @@ -188,7 +226,7 @@ mod tests {
assert!(contents.contains(r#""language":"rust""#));
assert!(contents.contains(r#""version":"0.0.1""#));
assert!(contents.contains(r#""metadata":{"MAP_MODE":"batch-map"}"#));
assert!(contents.contains(r#""minimum-numaflow-version":"1.2.0-rc4""#));
assert!(contents.contains(r#""minimum_numaflow_version":"1.2.0-rc4""#));

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/sideinput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::default_info_file(),
shared::ServerInfo::default(),
)?;
let handler = self.svc.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
Expand Down
2 changes: 1 addition & 1 deletion src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::default_info_file(),
shared::ServerInfo::default(),
)?;
let handler = self.svc.take().unwrap();
let cln_token = CancellationToken::new();
Expand Down
2 changes: 1 addition & 1 deletion src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::default_info_file(),
shared::ServerInfo::default(),
)?;
let handler = self.svc.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
Expand Down
2 changes: 1 addition & 1 deletion src/sourcetransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::default_info_file(),
shared::ServerInfo::default(),
)?;
let handler = self.svc.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
Expand Down

0 comments on commit 867ea8c

Please sign in to comment.