Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve "from_list/2" decoding of terms for binary series #929

Merged
merged 4 commits into from
Jun 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 73 additions & 55 deletions native/explorer/src/series/from_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub fn s_from_list_date(name: &str, val: Term) -> Result<ExSeries, ExplorerError

let values: Vec<Option<i32>> = iterator
.map(|item| match item.get_type() {
TermType::Integer => item.decode::<i32>().map(Some).map_err(|err| {
TermType::Integer => item.decode::<Option<i32>>().map_err(|err| {
ExplorerError::Other(format!("int number is too big for an i32: {err:?}"))
}),
TermType::Map => item
Expand Down Expand Up @@ -55,7 +55,7 @@ pub fn s_from_list_naive_datetime(

let values: Vec<Option<i64>> = iterator
.map(|item| match item.get_type() {
TermType::Integer => item.decode::<i64>().map(Some).map_err(|err| {
TermType::Integer => item.decode::<Option<i64>>().map_err(|err| {
ExplorerError::Other(format!("int number is too big for an i64: {err:?}"))
}),
TermType::Map => item
Expand Down Expand Up @@ -98,7 +98,7 @@ pub fn s_from_list_datetime(

let values: Vec<Option<i64>> = iterator
.map(|item| match item.get_type() {
TermType::Integer => item.decode::<i64>().map(Some).map_err(|err| {
TermType::Integer => item.decode::<Option<i64>>().map_err(|err| {
ExplorerError::Other(format!("int number is too big for an i64: {err:?}"))
}),
TermType::Map => item
Expand Down Expand Up @@ -139,7 +139,7 @@ pub fn s_from_list_duration(

let values: Vec<Option<i64>> = iterator
.map(|item| match item.get_type() {
TermType::Integer => item.decode::<i64>().map(Some).map_err(|err| {
TermType::Integer => item.decode::<Option<i64>>().map_err(|err| {
ExplorerError::Other(format!("int number is too big for an i64: {err:?}"))
}),
TermType::Map => item
Expand Down Expand Up @@ -175,7 +175,7 @@ pub fn s_from_list_time(name: &str, val: Term) -> Result<ExSeries, ExplorerError

let values: Vec<Option<i64>> = iterator
.map(|item| match item.get_type() {
TermType::Integer => item.decode::<i64>().map(Some).map_err(|err| {
TermType::Integer => item.decode::<Option<i64>>().map_err(|err| {
ExplorerError::Other(format!("int number is too big for an i64: {err:?}"))
}),
TermType::Map => item
Expand Down Expand Up @@ -212,8 +212,9 @@ pub fn s_from_list_null(name: &str, length: usize) -> ExSeries {
macro_rules! from_list {
($name:ident, $type:ty) => {
#[rustler::nif(schedule = "DirtyCpu")]
pub fn $name(name: &str, val: Vec<Option<$type>>) -> ExSeries {
ExSeries::new(Series::new(name, val.as_slice()))
pub fn $name(name: &str, val: Term) -> NifResult<ExSeries> {
val.decode::<Vec<Option<$type>>>()
.map(|values| ExSeries::new(Series::new(name, values.as_slice())))
}
};
}
Expand All @@ -238,38 +239,40 @@ macro_rules! from_list_float {
let nan = atoms::nan();
let infinity = atoms::infinity();
let neg_infinity = atoms::neg_infinity();
let nil = rustler::types::atom::nil();

let values: NifResult<Vec<Option<$type>>> = val
.decode::<ListIterator>()?
val.decode::<ListIterator>()?
.map(|item| match item.get_type() {
TermType::Float => item.decode::<Option<$type>>(),
TermType::Integer => {
let int_value = item.decode::<i64>().unwrap();
Ok(Some(int_value as $type))
}
TermType::Atom => Ok(if nan.eq(&item) {
Some($module::NAN)
} else if infinity.eq(&item) {
Some($module::INFINITY)
} else if neg_infinity.eq(&item) {
Some($module::NEG_INFINITY)
} else {
None
}),
TermType::Atom => {
if nan.eq(&item) {
Ok(Some($module::NAN))
} else if infinity.eq(&item) {
Ok(Some($module::INFINITY))
} else if neg_infinity.eq(&item) {
Ok(Some($module::NEG_INFINITY))
} else if nil.eq(&item) {
Ok(None)
} else {
let message = format!(
"from_list/2 cannot read the atom `{item:?}` for a float series"
);
Err(Error::RaiseTerm(Box::new(message)))
}
}
term_type => {
let message = format!("from_list/2 not implemented for {term_type:?}");
Err(Error::RaiseTerm(Box::new(message)))
}
})
.collect::<NifResult<Vec<Option<$type>>>>();

match (values) {
Ok(x) => {
let s = Series::new(name, x);
Ok(ExSeries::new(s))
}
Err(x) => Err(x),
}
.collect::<NifResult<Vec<Option<$type>>>>()
.map(|values| {
ExSeries::new(Series::new(name, values))
})
}
};
}
Expand All @@ -278,51 +281,66 @@ from_list_float!(s_from_list_f32, f32, f32);
from_list_float!(s_from_list_f64, f64, f64);

#[rustler::nif(schedule = "DirtyCpu")]
pub fn s_from_list_binary(name: &str, val: Vec<Option<Binary>>) -> ExSeries {
ExSeries::new(Series::new(
name,
val.iter()
.map(|bin| bin.map(|bin| bin.as_slice()))
.collect::<Vec<Option<&[u8]>>>(),
))
pub fn s_from_list_binary(name: &str, val: Term) -> NifResult<ExSeries> {
val.decode::<ListIterator>()?
.map(|term| {
term.decode::<Option<Binary>>()
.map(|maybe_bin| maybe_bin.map(|bin| bin.as_slice()))
})
.collect::<NifResult<Vec<Option<&[u8]>>>>()
.map(|values| ExSeries::new(Series::new(name, values)))
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn s_from_list_categories(name: &str, val: Vec<Option<String>>) -> ExSeries {
ExSeries::new(
Series::new(name, val.as_slice())
pub fn s_from_list_categories(name: &str, val: Term) -> NifResult<ExSeries> {
let decoded = val.decode::<Vec<Option<String>>>()?;
Ok(ExSeries::new(
Series::new(name, decoded.as_slice())
.cast(&DataType::Categorical(None, CategoricalOrdering::default()))
.unwrap(),
)
.map_err(|err| {
let message = format!(
"from_list/2 cannot cast a string series to categories series: {err:?}"
);
Error::RaiseTerm(Box::new(message))
})?,
))
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn s_from_list_of_series(name: &str, series_vec: Vec<Option<ExSeries>>) -> ExSeries {
let lists: Vec<Option<Series>> = series_vec
.iter()
.map(|maybe_series| {
maybe_series
.as_ref()
.map(|ex_series| ex_series.clone_inner())
})
.collect();
pub fn s_from_list_of_series(name: &str, series_term: Term) -> NifResult<ExSeries> {
series_term
.decode::<Vec<Option<ExSeries>>>()
.map(|series_vec| {
let lists: Vec<Option<Series>> = series_vec
.iter()
.map(|maybe_series| {
maybe_series
.as_ref()
.map(|ex_series| ex_series.clone_inner())
})
.collect();

ExSeries::new(Series::new(name, lists))
ExSeries::new(Series::new(name, lists))
})
}

#[rustler::nif(schedule = "DirtyCpu")]
pub fn s_from_list_of_series_as_structs(name: &str, series_vec: Vec<ExSeries>) -> ExSeries {
let struct_chunked = StructChunked::new(
pub fn s_from_list_of_series_as_structs(name: &str, series_term: Term) -> NifResult<ExSeries> {
let series_vec = series_term.decode::<Vec<ExSeries>>()?;
match StructChunked::new(
name,
series_vec
.into_iter()
.map(|s| s.clone_inner())
.collect::<Vec<_>>()
.as_slice(),
)
.unwrap();

ExSeries::new(struct_chunked.into_series())
) {
Ok(struct_chunked) => Ok(ExSeries::new(struct_chunked.into_series())),
Err(err) => {
let message = format!("from_list/2 cannot create series of structs: {err:?}");
Err(Error::RaiseTerm(Box::new(message)))
}
}
}

macro_rules! from_binary {
Expand Down
Loading