Skip to content

Commit

Permalink
Merge pull request #1 from jupyterkat/buffered
Browse files Browse the repository at this point in the history
moves write operations to a different thread and buffer it
  • Loading branch information
alexkar598 authored Jul 26, 2023
2 parents f04a619 + fdcb19f commit 09c2147
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ file = []
git = ["git2", "chrono"]
http = ["reqwest", "serde", "serde_json", "once_cell", "jobs"]
json = ["serde", "serde_json"]
log = ["chrono"]
log = ["chrono", "flume"]
sql = ["mysql", "serde", "serde_json", "once_cell", "dashmap", "jobs"]
time = []
toml = ["serde", "serde_json", "toml-dep"]
Expand Down
107 changes: 75 additions & 32 deletions src/log.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,42 @@
use crate::error::Result;
use chrono::Utc;
use std::{
cell::RefCell,
cell::{Cell, RefCell},
collections::hash_map::{Entry, HashMap},
ffi::OsString,
fs,
fs::{File, OpenOptions},
io::Write,
path::Path,
sync::Once,
thread::JoinHandle,
};

static ONCE: Once = Once::new();

thread_local! {
static FILE_MAP: RefCell<HashMap<OsString, File>> = RefCell::new(HashMap::new());
static FILE_MAP: RefCell<HashMap<OsString, File>> = RefCell::new(HashMap::new()); //on worker thread
static SENDER: RefCell<Option<flume::Sender<Box<(String, String, bool)>>>> = RefCell::new(None); //on main thread
static HANDLE: Cell<Option<JoinHandle<()>>> = Cell::new(None); //on main thread
}

byond_fn!(fn log_write(path, data, ...rest) {
FILE_MAP.with(|cell| -> Result<()> {
// open file
let mut map = cell.borrow_mut();
let path = Path::new(path as &str);
let file = match map.entry(path.into()) {
Entry::Occupied(elem) => elem.into_mut(),
Entry::Vacant(elem) => elem.insert(open(path)?),
};

if rest.first().map(|x| &**x) == Some("false") {
// Write the data to the file with no accoutrements.
write!(file, "{}", data)?;
} else {
// write first line, timestamped
let mut iter = data.split('\n');
if let Some(line) = iter.next() {
write!(file, "[{}] {}\n", Utc::now().format("%F %T%.3f"), line)?;
}

// write remaining lines
for line in iter {
write!(file, " - {}\n", line)?;
}
}

Ok(())
}).err()
init_worker();
SENDER.with(|sender| {
_ = sender.borrow().as_ref().unwrap().send(Box::new(
(path.to_string(), data.to_string(), rest.first().map(|x| &**x) == Some("false"))
))
});
Some("")
});

byond_fn!(
fn log_close_all() {
FILE_MAP.with(|cell| {
let mut map = cell.borrow_mut();
map.clear();
SENDER.with(|cell| cell.replace(None));
HANDLE.with(|cell| {
if let Some(handle) = cell.replace(None) {
let _ = handle.join();
};
});
Some("")
}
Expand All @@ -61,3 +49,58 @@ fn open(path: &Path) -> Result<File> {

Ok(OpenOptions::new().append(true).create(true).open(path)?)
}
fn init_worker() {
ONCE.call_once(|| {
let (sender, receiver) = flume::unbounded();
SENDER.with(|cell| *cell.borrow_mut() = Some(sender));
HANDLE.with(|cell| {
let handle = std::thread::spawn(move || {
loop {
let packet = receiver.recv();

if let Ok(packet) = packet {
let (path, data, rest) = *packet;
_ = FILE_MAP.with(|cell| -> Result<()> {
// open file
let mut map = cell.borrow_mut();
let path = Path::new(&path);
let file = match map.entry(path.into()) {
Entry::Occupied(elem) => elem.into_mut(),
Entry::Vacant(elem) => elem.insert(open(path)?),
};

let mut buffer = std::io::BufWriter::new(file);

if rest {
// Write the data to the file with no accoutrements.
write!(buffer, "{}", data)?;
} else {
// write first line, timestamped
let mut iter = data.split('\n');
if let Some(line) = iter.next() {
write!(
buffer,
"[{}] {}\n",
Utc::now().format("%F %T%.3f"),
line
)?;
}

// write remaining lines
for line in iter {
write!(buffer, " - {}\n", line)?;
}
}

Ok(())
});
} else {
FILE_MAP.with(|cell| cell.borrow_mut().clear());
return;
}
}
});
cell.set(Some(handle));
});
});
}

0 comments on commit 09c2147

Please sign in to comment.