Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update Serverinfo to use struct #71

Merged
merged 3 commits into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,10 +471,9 @@ impl<T> crate::batchmap::Server<T> {
where
T: BatchMapper + Send + Sync + 'static,
{
let mut info = shared::default_info_file();
let mut info = shared::ServerInfo::default();
// update the info json metadata field, and add the map mode
info["metadata"][shared::MAP_MODE_KEY] =
serde_json::Value::String(shared::BATCH_MAP.to_string());
info.set_metadata(shared::MAP_MODE_KEY, shared::BATCH_MAP);
let listener =
shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?;
let handler = self.svc.take().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,10 @@ impl<T> Server<T> {
where
T: Mapper + Send + Sync + 'static,
{
let mut info = shared::default_info_file();
let mut info = shared::ServerInfo::default();
// update the info json metadata field, and add the map mode key value pair
info["metadata"][shared::MAP_MODE_KEY] =
serde_json::Value::String(shared::UNARY_MAP.to_string());
info.set_metadata(shared::MAP_MODE_KEY, shared::UNARY_MAP);

let listener =
shared::create_listener_stream(&self.sock_addr, &self.server_info_file, info)?;
let handler = self.svc.take().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ impl<C> Server<C> {
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::default_info_file(),
shared::ServerInfo::default(),
)?;
let creator = self.creator.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = channel(1);
Expand Down
88 changes: 63 additions & 25 deletions src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{collections::HashMap, io};

use chrono::{DateTime, TimeZone, Timelike, Utc};
use prost_types::Timestamp;
use serde::{Deserialize, Serialize};
use tokio::net::UnixListener;
use tokio::signal;
use tokio::sync::{mpsc, oneshot};
Expand All @@ -16,43 +17,81 @@ pub(crate) const STREAM_MAP: &str = "stream-map";
pub(crate) const BATCH_MAP: &str = "batch-map";
const MINIMUM_NUMAFLOW_VERSION: &str = "1.2.0-rc4";

// default_info_file is a function to get a default server info json
// file content. This is used to write the server info file.
// This function is used in the write_info_file function.
// This function is not exposed to the user.
pub fn default_info_file() -> serde_json::Value {
let metadata: HashMap<String, String> = HashMap::new();
serde_json::json!({
"protocol": "uds",
"language": "rust",
"version": "0.0.1",
"metadata": metadata,
"minimum-numaflow-version": MINIMUM_NUMAFLOW_VERSION,
})
// ServerInfo structure to store server-related information
#[derive(Serialize, Deserialize, Debug)]
pub(crate) struct ServerInfo {
#[serde(default)]
protocol: String,
#[serde(default)]
language: String,
#[serde(default)]
minimum_numaflow_version: String,
#[serde(default)]
version: String,
#[serde(default)]
metadata: Option<HashMap<String, String>>, // Metadata is optional
}
impl ServerInfo {
// default_info_file is a function to get a default server info json
// file content. This is used to write the server info file.
// This function is used in the write_info_file function.
// This function is not exposed to the user.
pub fn default() -> Self {
let metadata: HashMap<String, String> = HashMap::new();
// Return the default server info json content
// Create a ServerInfo object with default values
ServerInfo {
protocol: "uds".to_string(),
language: "rust".to_string(),
minimum_numaflow_version: MINIMUM_NUMAFLOW_VERSION.to_string(),
vigith marked this conversation as resolved.
Show resolved Hide resolved
version: "0.0.1".to_string(),
metadata: Option::from(metadata),
}
}

// Check if the struct is empty
pub fn is_empty(&self) -> bool {
self.protocol.is_empty()
&& self.language.is_empty()
&& self.minimum_numaflow_version.is_empty()
&& self.version.is_empty()
&& self.metadata.is_none()
}

// Set metadata key-value pair
pub fn set_metadata(&mut self, key: &str, value: &str) {
if let Some(metadata) = &mut self.metadata {
metadata.insert(key.to_string(), value.to_string());
} else {
let mut metadata = HashMap::new();
metadata.insert(key.to_string(), value.to_string());
self.metadata = Some(metadata);
}
}
}

// #[tracing::instrument(skip(path), fields(path = ?path.as_ref()))]
#[tracing::instrument(fields(path = ? path.as_ref()))]
fn write_info_file(path: impl AsRef<Path>, mut server_info: serde_json::Value) -> io::Result<()> {
fn write_info_file(path: impl AsRef<Path>, mut server_info: ServerInfo) -> io::Result<()> {
let parent = path.as_ref().parent().unwrap();
fs::create_dir_all(parent)?;

// TODO: make port-number and CPU meta-data configurable, e.g., ("CPU_LIMIT", "1")

// if server_info object is not provided, use the default one
if server_info.is_null() {
server_info = default_info_file();
// If the server_info is empty, set it to the default
if server_info.is_empty() {
server_info = ServerInfo::default();
}

// Convert to a string of JSON and print it out
let content = format!("{}U+005C__END__", server_info);
let serialized = serde_json::to_string(&server_info).unwrap();
let content = format!("{}U+005C__END__", serialized);
info!(content, "Writing to file");
fs::write(path, content)
}

pub(crate) fn create_listener_stream(
socket_file: impl AsRef<Path>,
server_info_file: impl AsRef<Path>,
server_info: serde_json::Value,
server_info: ServerInfo,
) -> Result<UnixListenerStream, Box<dyn std::error::Error + Send + Sync>> {
write_info_file(server_info_file, server_info)
.map_err(|e| format!("writing info file: {e:?}"))?;
Expand Down Expand Up @@ -170,10 +209,9 @@ mod tests {
let temp_file = NamedTempFile::new()?;

// Get a default server info file content
// let server_info = default_info_file();
let mut info = default_info_file();
let mut info = ServerInfo::default();
// update the info json metadata field, and add the map mode key value pair
info["metadata"][MAP_MODE_KEY] = serde_json::Value::String(BATCH_MAP.to_string());
info.set_metadata(MAP_MODE_KEY, BATCH_MAP);

// Call write_info_file with the path of the temporary file
write_info_file(temp_file.path(), info)?;
Expand All @@ -188,7 +226,7 @@ mod tests {
assert!(contents.contains(r#""language":"rust""#));
assert!(contents.contains(r#""version":"0.0.1""#));
assert!(contents.contains(r#""metadata":{"MAP_MODE":"batch-map"}"#));
assert!(contents.contains(r#""minimum-numaflow-version":"1.2.0-rc4""#));
assert!(contents.contains(r#""minimum_numaflow_version":"1.2.0-rc4""#));

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/sideinput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::default_info_file(),
shared::ServerInfo::default(),
)?;
let handler = self.svc.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
Expand Down
2 changes: 1 addition & 1 deletion src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::default_info_file(),
shared::ServerInfo::default(),
)?;
let handler = self.svc.take().unwrap();
let cln_token = CancellationToken::new();
Expand Down
2 changes: 1 addition & 1 deletion src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::default_info_file(),
shared::ServerInfo::default(),
)?;
let handler = self.svc.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
Expand Down
2 changes: 1 addition & 1 deletion src/sourcetransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(
&self.sock_addr,
&self.server_info_file,
shared::default_info_file(),
shared::ServerInfo::default(),
)?;
let handler = self.svc.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
Expand Down
Loading