Skip to content

Commit

Permalink
Add possibility to write HDR histogram data to a file in real time
Browse files Browse the repository at this point in the history
Previously, HDR histogram data could be obtained only after
a stress command had completed, i.e. when the report was generated.
To use latte in performance testing, we need this data
in real time.

So, add such a possibility.
It is enabled when the following new config option is specified:

  --hdrfile /path/to/hdrfile

If the target file is absent then it will be created automatically.
  • Loading branch information
vponomaryov committed Jan 28, 2025
1 parent 174dbd4 commit bc10cbd
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 26 deletions.
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,14 @@ pub struct RunCommand {
#[clap(long("generate-report"), required = false)]
pub generate_report: bool,

/// Path to a file for streaming HDR histogram data in real-time.
#[clap(
long("hdrfile"),
aliases = &["hdr-file", "hdr-histogram", "hdr-histogram-file"],
required = false
)]
pub hdrfile: Option<PathBuf>,

/// Path to an output file or directory where the JSON report should be written to.
#[clap(short('o'), long)]
#[serde(skip)]
Expand Down
4 changes: 3 additions & 1 deletion src/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tokio::time::MissedTickBehavior;
use tokio_stream::wrappers::IntervalStream;

use crate::error::{LatteError, Result};
use crate::stats::histogram::HistogramWriter;
use crate::{
BenchmarkStats, BoundedCycleCounter, Interval, Progress, Recorder, Workload, WorkloadStats,
};
Expand Down Expand Up @@ -304,6 +305,7 @@ pub async fn par_execute(
workload: Workload,
show_progress: bool,
keep_log: bool,
hdrh_writer: &mut Option<Box<dyn HistogramWriter>>,
) -> Result<BenchmarkStats> {
if exec_options.cycle_range.1 <= exec_options.cycle_range.0 {
return Err(LatteError::Configuration(format!(
Expand All @@ -329,7 +331,7 @@ pub async fn par_execute(
let progress = Arc::new(StatusLine::with_options(progress, progress_opts));
let deadline = BoundedCycleCounter::new(exec_options.duration, exec_options.cycle_range);
let mut streams = Vec::with_capacity(thread_count);
let mut stats = Recorder::start(rate, concurrency, keep_log);
let mut stats = Recorder::start(rate, concurrency, keep_log, hdrh_writer);

for _ in 0..thread_count {
let s = spawn_stream(
Expand Down
67 changes: 53 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ use std::fs::File;
use std::io::{stdout, Write};
use std::path::{Path, PathBuf};
use std::process::exit;
use std::time::Duration;
use std::time::{Duration, SystemTime};
use std::{env, fs};

use clap::Parser;
use config::RunCommand;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use hdrhistogram::serialization::interval_log::Tag;
use hdrhistogram::serialization::{interval_log, V2DeflateSerializer};
use itertools::Itertools;
use rune::Source;
Expand All @@ -32,6 +31,7 @@ use crate::exec::{par_execute, ExecutionOptions};
use crate::report::{PathAndSummary, Report, RunConfigCmp};
use crate::scripting::connect::ClusterInfo;
use crate::scripting::context::Context;
use crate::stats::histogram::HistogramWriter;
use crate::stats::{BenchmarkCmp, BenchmarkStats, Recorder};
use exec::cycle::BoundedCycleCounter;
use exec::progress::Progress;
Expand Down Expand Up @@ -199,6 +199,7 @@ async fn load(conf: LoadCommand) -> Result<()> {
loader,
!conf.quiet,
false,
&mut None,
)
.await?;

Expand Down Expand Up @@ -264,6 +265,7 @@ async fn run(conf: RunCommand) -> Result<()> {
runner.clone()?,
!conf.quiet,
false,
&mut None,
)
.await?;
}
Expand All @@ -289,16 +291,53 @@ async fn run(conf: RunCommand) -> Result<()> {
};

report::print_log_header();
let stats = match par_execute(
"Running...",
&exec_options,
conf.sampling_interval,
runner,
!conf.quiet,
conf.generate_report,
)
.await
{
let stats = match conf.hdrfile {
Some(ref hdrfile) => {
let path = Path::new(&hdrfile);
if let Some(parent_dir) = path.parent() {
fs::create_dir_all(parent_dir)
.map_err(|e| LatteError::LogFileCreate(hdrfile.clone(), e))?;
}
let hdrfile = File::create(hdrfile)
.map_err(|e| LatteError::LogFileCreate(hdrfile.to_path_buf(), e))?;
let (non_blocking_writer, _hdrh_guard) = tracing_appender::non_blocking(hdrfile);
let non_blocking_writer = Box::new(non_blocking_writer);
let serializer = Box::new(V2DeflateSerializer::new());
let system_time_now = SystemTime::now();
let hdrh_writer = interval_log::IntervalLogWriterBuilder::new()
.add_comment(format!("[Logged with Latte {VERSION}]").as_str())
.with_start_time(system_time_now)
.with_base_time(system_time_now)
.with_max_value_divisor(1000000.0) // ms
.begin_log_with(Box::leak(non_blocking_writer), Box::leak(serializer))
.unwrap();
let boxed_hdrh_writer: Box<dyn HistogramWriter + Send + Sync> = Box::new(hdrh_writer);

par_execute(
"Running...",
&exec_options,
conf.sampling_interval,
runner,
!conf.quiet,
conf.generate_report,
&mut Some(boxed_hdrh_writer),
)
.await
}
None => {
par_execute(
"Running...",
&exec_options,
conf.sampling_interval,
runner,
!conf.quiet,
conf.generate_report,
&mut None,
)
.await
}
};
let stats = match stats {
Ok(stats) => stats,
Err(e) => {
return Err(e);
Expand Down Expand Up @@ -470,13 +509,13 @@ async fn export_hdr_log(conf: HdrCommand) -> Result<()> {
&sample.cycle_latency.histogram.0,
interval_start_time,
interval_duration,
Tag::new(format!("{tag_prefix}cycles").as_str()),
interval_log::Tag::new(format!("{tag_prefix}cycles").as_str()),
)?;
log_writer.write_histogram(
&sample.request_latency.histogram.0,
interval_start_time,
interval_duration,
Tag::new(format!("{tag_prefix}requests").as_str()),
interval_log::Tag::new(format!("{tag_prefix}requests").as_str()),
)?;
}
Ok(())
Expand Down
38 changes: 36 additions & 2 deletions src/stats/histogram.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use base64::{engine::general_purpose as base64_engine, Engine as _};
use std::fmt;
use std::io::Cursor;
use std::io;
use std::time::Duration;

use hdrhistogram::serialization::interval_log::{IntervalLogWriter, Tag};
use hdrhistogram::serialization::{Serializer, V2DeflateSerializer};
use hdrhistogram::Histogram;
use serde::de::{Error, Visitor};
Expand Down Expand Up @@ -42,7 +44,7 @@ impl Visitor<'_> for HistogramVisitor {
let decoded = base64_engine::STANDARD
.decode(v)
.map_err(|e| E::custom(format!("Not a valid base64 value. {e}")))?;
let mut cursor = Cursor::new(&decoded);
let mut cursor = io::Cursor::new(&decoded);
let mut deserializer = hdrhistogram::serialization::Deserializer::new();
Ok(SerializableHistogram(
deserializer
Expand All @@ -60,3 +62,35 @@ impl<'de> Deserialize<'de> for SerializableHistogram {
deserializer.deserialize_str(HistogramVisitor)
}
}

/// Sink for per-interval HDR histograms.
///
/// The trait hides the concrete writer and serializer types of the underlying
/// interval-log writer, so a writer can be passed around as a trait object
/// (`Box<dyn HistogramWriter>`).
pub trait HistogramWriter {
/// Writes one histogram covering a single interval.
///
/// * `histogram` - the latency histogram to write.
/// * `interval_start_time` - start of the interval
///   (NOTE(review): presumably relative to the start of the log; confirm against the caller).
/// * `interval_duration` - length of the interval.
/// * `tag` - label attached to the written entry.
///
/// Returns an `io::Error` if the histogram could not be written.
fn write_histogram(
&mut self,
histogram: &Histogram<u64>,
interval_start_time: Duration,
interval_duration: Duration,
tag: Tag,
) -> io::Result<()>;
}

/// Adapts hdrhistogram's [`IntervalLogWriter`] to the [`HistogramWriter`] trait.
///
/// Every histogram is written with its tag set, and any failure reported by
/// the underlying writer is converted into an [`io::Error`].
impl<W, S> HistogramWriter for IntervalLogWriter<'_, '_, W, S>
where
    W: io::Write + Send + Sync + 'static,
    S: hdrhistogram::serialization::Serializer + 'static,
{
    /// Forwards the histogram to the wrapped interval-log writer.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` of kind `Other` whose message carries the debug
    /// representation of the underlying writer error.
    fn write_histogram(
        &mut self,
        histogram: &Histogram<u64>,
        interval_start_time: Duration,
        interval_duration: Duration,
        tag: Tag,
    ) -> io::Result<()> {
        // Method-call syntax prefers the inherent `IntervalLogWriter::write_histogram`
        // (the one taking `Option<Tag>`) over this trait method, so this is
        // a delegation and not a recursive call.
        let outcome =
            self.write_histogram(histogram, interval_start_time, interval_duration, Some(tag));
        match outcome {
            Ok(()) => Ok(()),
            Err(cause) => Err(io::Error::new(
                io::ErrorKind::Other,
                format!("Serialization error: {cause:?}"),
            )),
        }
    }
}
55 changes: 46 additions & 9 deletions src/stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ use chrono::{DateTime, Local};
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::ops::Mul;
use std::time::{Instant, SystemTime};
use std::time::{Duration, Instant, SystemTime};

use crate::exec::workload::WorkloadStats;
use crate::stats::latency::{LatencyDistribution, LatencyDistributionRecorder};
use cpu_time::ProcessTime;
use hdrhistogram::serialization::interval_log;
use percentiles::Percentile;
use serde::{Deserialize, Serialize};
use statrs::distribution::{ContinuousCDF, StudentsT};
use throughput::ThroughputMeter;
use timeseries::TimeSeriesStats;

use crate::stats::histogram::HistogramWriter;

pub mod histogram;
pub mod latency;
pub mod percentiles;
Expand Down Expand Up @@ -283,7 +286,7 @@ impl BenchmarkCmp<'_> {
/// throughput and response time distributions. Computes confidence intervals.
/// Can be also used to split the time-series into smaller sub-samples and to
/// compute statistics for each sub-sample separately.
pub struct Recorder {
pub struct Recorder<'a> {
pub start_time: SystemTime,
pub end_time: SystemTime,
pub start_instant: Instant,
Expand All @@ -306,17 +309,19 @@ pub struct Recorder {
rate_limit: Option<f64>,
concurrency_limit: NonZeroUsize,
keep_log: bool,
hdrh_writer: &'a mut Option<Box<dyn HistogramWriter>>,
}

impl Recorder {
impl Recorder<'_> {
/// Creates a new recorder.
/// The `rate_limit` and `concurrency_limit` parameters are used only as the
/// reference levels for relative throughput and relative parallelism.
pub fn start(
rate_limit: Option<f64>,
concurrency_limit: NonZeroUsize,
keep_log: bool,
) -> Recorder {
hdrh_writer: &mut Option<Box<dyn HistogramWriter>>,
) -> Recorder<'_> {
let start_time = SystemTime::now();
let start_instant = Instant::now();
Recorder {
Expand All @@ -342,25 +347,38 @@ impl Recorder {
throughput_meter: ThroughputMeter::default(),
concurrency_meter: TimeSeriesStats::default(),
keep_log,
hdrh_writer,
}
}

/// Adds the statistics of the completed request to the already collected statistics.
/// Called on completion of each sample.
pub fn record(&mut self, samples: &[WorkloadStats]) -> &Sample {
assert!(!samples.is_empty());
for s in samples.iter() {
pub fn record(&mut self, workload_stats: &[WorkloadStats]) -> &Sample {
assert!(!workload_stats.is_empty());
let mut current_sample_latency_recorder: Option<
HashMap<String, LatencyDistributionRecorder>,
> = if self.hdrh_writer.is_some() {
Some(HashMap::new())
} else {
None
};
for s in workload_stats.iter() {
self.request_latency.add(&s.session_stats.resp_times_ns);

for fs in &s.function_stats {
self.cycle_latency.add(&fs.call_latency);
self.cycle_latency_by_fn
.entry(fs.function.name.clone())
.or_default()
.add(&fs.call_latency);
if let Some(ref mut recorder) = current_sample_latency_recorder {
recorder
.entry(fs.function.name.clone())
.or_default()
.add(&fs.call_latency);
}
}
}
let sample = Sample::new(self.start_instant, samples);
let sample = Sample::new(self.start_instant, workload_stats);
self.cycle_count += sample.cycle_count;
self.cycle_error_count += sample.cycle_error_count;
self.request_count += sample.request_count;
Expand All @@ -376,6 +394,25 @@ impl Recorder {
if !self.keep_log {
self.log.clear();
}

// Write HDR histogram data
if let Some(hdrh_writer) = self.hdrh_writer {
if let Some(ref recorder) = current_sample_latency_recorder {
let interval_start_time = Duration::from_millis((sample.time_s * 1000.0) as u64);
let interval_duration = Duration::from_millis((sample.duration_s * 1000.0) as u64);
for (fn_name, fn_latencies) in recorder.iter() {
hdrh_writer
.write_histogram(
&fn_latencies.distribution().histogram.0,
interval_start_time,
interval_duration,
interval_log::Tag::new(&format!("fn--{fn_name}")).expect("REASON"),
)
.unwrap();
}
}
}

self.log.push(sample);
self.log.last().unwrap()
}
Expand Down

0 comments on commit bc10cbd

Please sign in to comment.