Skip to content

Commit

Permalink
Merge pull request #1 from njaremko/fix-performance
Browse files Browse the repository at this point in the history
Fix performance
  • Loading branch information
njaremko authored Dec 24, 2024
2 parents 946d9b9 + 93ba46b commit f9e0441
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 79 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
osv (0.3.8)
osv (0.3.9)
rb_sys (~> 0.9.39)

GEM
Expand Down
28 changes: 15 additions & 13 deletions benchmark/comparison_benchmark.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
require "fileutils"

# Generate a larger test file for more meaningful benchmarks
def generate_test_data(rows = 1_000_000)
def generate_test_data(rows = 100_000)
headers = %w[id name age email city country]
StringIO.new.tap do |io|
io.puts headers.join(",")
Expand Down Expand Up @@ -39,11 +39,13 @@ def generate_test_data(rows = 1_000_000)

x.report("OSV - Hash output") { File.open("benchmark/test.csv") { |f| OSV.for_each(f).to_a } }

x.report("CSV - Hash output") { File.open("benchmark/test.csv") { |f| CSV.new(f, headers: true).map(&:to_h) } }
# x.report("CSV - Hash output") { File.open("benchmark/test.csv") { |f| CSV.new(f, headers: true).map(&:to_h) } }

x.report("OSV - Array output") { File.open("benchmark/test.csv") { |f| OSV.for_each(f, result_type: :array).to_a } }
x.report("OSV - Array output") do
File.open("benchmark/test.csv") { |f| OSV.for_each(f, has_headers: false, result_type: :array).to_a }
end

x.report("CSV - Array output") { File.open("benchmark/test.csv") { |f| CSV.new(f).read } }
# x.report("CSV - Array output") { File.open("benchmark/test.csv") { |f| CSV.new(f).read } }

x.report("FastCSV - Array output") do
result = []
Expand All @@ -57,13 +59,13 @@ def generate_test_data(rows = 1_000_000)
io.close
end

x.report("CSV - StringIO") do
io = StringIO.new(test_data)
result = CSV.new(io, headers: true).map(&:to_h)
io.close
# x.report("CSV - StringIO") do
# io = StringIO.new(test_data)
# result = CSV.new(io, headers: true).map(&:to_h)
# io.close

result
end
# result
# end

x.report("FastCSV - StringIO") do
result = []
Expand All @@ -76,9 +78,9 @@ def generate_test_data(rows = 1_000_000)

x.report("OSV - Gzipped") { Zlib::GzipReader.open("benchmark/test.csv.gz") { |gz| OSV.for_each(gz).to_a } }

x.report("CSV - Gzipped") do
Zlib::GzipReader.open("benchmark/test.csv.gz") { |gz| CSV.new(gz, headers: true).map(&:to_h) }
end
# x.report("CSV - Gzipped") do
# Zlib::GzipReader.open("benchmark/test.csv.gz") { |gz| CSV.new(gz, headers: true).map(&:to_h) }
# end

x.compare!
end
Expand Down
70 changes: 49 additions & 21 deletions ext/osv/src/csv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ use flate2::read::GzDecoder;
use magnus::{rb_sys::AsRawValue, value::ReprValue, Error as MagnusError, RString, Ruby, Value};
use std::{
fs::File,
io::{self, Read},
io::{self, BufReader, Read},
marker::PhantomData,
os::fd::FromRawFd,
thread,
};
use thiserror::Error;

const BATCH_SIZE: usize = 16384;
const READ_BUFFER_SIZE: usize = 16384;
const ROW_BUFFER_SIZE: usize = 16384;

#[derive(Error, Debug)]
pub enum ReaderError {
#[error("Failed to get file descriptor: {0}")]
Expand Down Expand Up @@ -68,7 +72,7 @@ impl<'a, T: RecordParser + Send + 'static> RecordReaderBuilder<'a, T> {
delimiter: b',',
quote_char: b'"',
null_string: None,
buffer: 1000,
buffer: BATCH_SIZE,
flexible: false,
flexible_default: None,
_phantom: PhantomData,
Expand Down Expand Up @@ -128,17 +132,20 @@ impl<'a, T: RecordParser + Send + 'static> RecordReaderBuilder<'a, T> {
}

let file = unsafe { File::from_raw_fd(fd) };
Ok(Box::new(file))
Ok(Box::new(BufReader::with_capacity(READ_BUFFER_SIZE, file)))
}

fn handle_file_path(&self) -> Result<Box<dyn Read + Send + 'static>, ReaderError> {
let path = self.to_read.to_r_string()?.to_string()?;
let file = File::open(&path)?;

Ok(if path.ends_with(".gz") {
Box::new(GzDecoder::new(file))
Box::new(GzDecoder::new(BufReader::with_capacity(
READ_BUFFER_SIZE,
file,
)))
} else {
Box::new(file)
Box::new(BufReader::with_capacity(READ_BUFFER_SIZE, file))
})
}

Expand Down Expand Up @@ -188,11 +195,13 @@ impl<'a, T: RecordParser + Send + 'static> RecordReaderBuilder<'a, T> {
readable: Box<dyn Read + Send + 'static>,
) -> Result<RecordReader<T>, ReaderError> {
let flexible = self.flexible || self.flexible_default.is_some();

let mut reader = csv::ReaderBuilder::new()
.has_headers(self.has_headers)
.delimiter(self.delimiter)
.quote(self.quote_char)
.flexible(flexible)
.buffer_capacity(READ_BUFFER_SIZE)
.from_reader(readable);

let headers = RecordReader::<T>::get_headers(self.ruby, &mut reader, self.has_headers)?;
Expand All @@ -201,24 +210,42 @@ impl<'a, T: RecordParser + Send + 'static> RecordReaderBuilder<'a, T> {

let (sender, receiver) = kanal::bounded(self.buffer);
let null_string = self.null_string.clone();

let flexible_default = self.flexible_default.clone();
let handle = thread::spawn(move || {
let mut record = csv::StringRecord::new();
while let Ok(true) = reader.read_record(&mut record) {
let row = T::parse(
&static_headers,
&record,
null_string.as_deref(),
flexible_default.as_deref(),
);
if sender.send(row).is_err() {
break;

let handle = thread::Builder::new()
.name("csv_parser".to_string())
.stack_size(2 * 1024 * 1024)
.spawn(move || {
let mut batch = Vec::with_capacity(BATCH_SIZE);
let mut record = csv::StringRecord::with_capacity(ROW_BUFFER_SIZE, headers.len());

while let Ok(true) = reader.read_record(&mut record) {
let row = T::parse(
&static_headers,
&record,
null_string.as_deref(),
flexible_default.as_deref(),
);

batch.push(row);

if batch.len() >= BATCH_SIZE {
for row in batch.drain(..) {
if sender.send(row).is_err() {
return;
}
}
}
}
}
let file_to_forget = reader.into_inner();
std::mem::forget(file_to_forget);
});

for row in batch.drain(..) {
let _ = sender.send(row);
}

let file_to_forget = reader.into_inner();
std::mem::forget(file_to_forget);
})
.map_err(|e| ReaderError::Ruby(e.to_string()))?;

Ok(RecordReader {
reader: ReadImpl::MultiThreaded {
Expand Down Expand Up @@ -250,6 +277,7 @@ impl<'a, T: RecordParser + Send + 'static> RecordReaderBuilder<'a, T> {
headers: static_headers,
null_string: self.null_string,
flexible_default: self.flexible_default,
record_buffer: csv::StringRecord::new(),
},
})
}
Expand Down
49 changes: 27 additions & 22 deletions ext/osv/src/csv/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,40 @@ pub trait RecordParser {
impl RecordParser for HashMap<&'static str, Option<String>> {
type Output = Self;

#[inline]
#[inline(always)]
fn parse(
headers: &[&'static str],
record: &csv::StringRecord,
null_string: Option<&str>,
flexible_default: Option<&str>,
) -> Self::Output {
let mut map = HashMap::with_capacity(headers.len());
headers.iter().enumerate().for_each(|(i, header)| {
let value = record.get(i).map_or_else(
|| flexible_default.map(|s| s.to_string()),
|field| {
if null_string == Some(field) {
let capacity = headers.len();
let mut map = HashMap::with_capacity(capacity);
let default = flexible_default.map(String::from);

while let Some(&header) = headers.iter().next() {
let value = match record.iter().next() {
Some(field) => {
if Some(field) == null_string {
None
} else if field.is_empty() {
Some(String::new())
} else {
Some(field.to_string())
Some(field.into())
}
},
);
map.insert(*header, value);
});
}
None => default.clone(),
};
map.insert(header, value);
}
map
}
}

impl RecordParser for Vec<Option<String>> {
type Output = Self;

#[inline]
#[inline(always)]
fn parse(
headers: &[&'static str],
record: &csv::StringRecord,
Expand All @@ -53,22 +56,24 @@ impl RecordParser for Vec<Option<String>> {
) -> Self::Output {
let target_len = headers.len();
let mut vec = Vec::with_capacity(target_len);
vec.extend(record.iter().map(|field| {
if null_string == Some(field) {

for field in record.iter() {
let value = if Some(field) == null_string {
None
} else if field.is_empty() {
Some(String::new())
} else {
Some(field.to_string())
}
}));
Some(field.into())
};
vec.push(value);
}

// Fill remaining slots with flexible_default if needed
if let Some(default) = flexible_default {
while vec.len() < target_len {
vec.push(Some(default.to_string()));
if vec.len() < target_len {
if let Some(default) = flexible_default {
vec.resize_with(target_len, || Some(default.to_string()));
}
}

vec
}
}
23 changes: 11 additions & 12 deletions ext/osv/src/csv/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub enum ReadImpl<T: RecordParser> {
headers: Vec<&'static str>,
null_string: Option<String>,
flexible_default: Option<String>,
record_buffer: csv::StringRecord,
},
MultiThreaded {
headers: Vec<&'static str>,
Expand Down Expand Up @@ -35,18 +36,16 @@ impl<T: RecordParser> ReadImpl<T> {
headers,
null_string,
flexible_default,
} => {
let mut record = csv::StringRecord::new();
match reader.read_record(&mut record) {
Ok(true) => Some(T::parse(
headers,
&record,
null_string.as_deref(),
flexible_default.as_deref(),
)),
_ => None,
}
}
record_buffer,
} => match reader.read_record(record_buffer) {
Ok(true) => Some(T::parse(
headers,
record_buffer,
null_string.as_deref(),
flexible_default.as_deref(),
)),
_ => None,
},
}
}

Expand Down
2 changes: 0 additions & 2 deletions ext/osv/src/csv/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ impl<T: RecordParser> RecordReader<T> {
})?;

Ok(if has_headers {
// Pre-allocate the vector with exact capacity
let mut headers = Vec::with_capacity(first_row.len());
headers.extend(first_row.iter().map(String::from));
headers
} else {
// Pre-allocate the vector with exact capacity
let mut headers = Vec::with_capacity(first_row.len());
headers.extend((0..first_row.len()).map(|i| format!("c{i}")));
headers
Expand Down
23 changes: 16 additions & 7 deletions ext/osv/src/csv/record.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use magnus::{IntoValue, Ruby, Value};
use magnus::{IntoValue, RArray, Ruby, Value};
use std::collections::HashMap;

#[derive(Debug)]
Expand All @@ -8,16 +8,25 @@ pub enum CsvRecord {
}

impl IntoValue for CsvRecord {
#[inline]
#[inline(always)]
fn into_value_with(self, handle: &Ruby) -> Value {
match self {
CsvRecord::Vec(vec) => vec.into_value_with(handle),
CsvRecord::Vec(vec) => {
let ary = RArray::with_capacity(vec.len());

for opt_str in vec {
let _ = ary.push(opt_str);
}

ary.into_value_with(handle)
}
CsvRecord::Map(map) => {
// Pre-allocate the hash with the known size
let hash = handle.hash_new_capa(map.len());
map.into_iter()
.try_for_each(|(k, v)| hash.aset(k, v))
.unwrap();

for (k, v) in map {
let _ = hash.aset(k, v);
}

hash.into_value_with(handle)
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/osv/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module OSV
VERSION = "0.3.8"
VERSION = "0.3.9"
end

0 comments on commit f9e0441

Please sign in to comment.