Skip to content

Commit

Permalink
chore(rust!): move offset_by implementation from polars-plan to polar…
Browse files Browse the repository at this point in the history
…s-time, rename feature from DateOffset to OffsetBy
  • Loading branch information
MarcoGorelli committed Jun 8, 2024
1 parent 38149d6 commit 149e7e0
Show file tree
Hide file tree
Showing 16 changed files with 178 additions and 165 deletions.
4 changes: 2 additions & 2 deletions crates/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ dtype-u16 = ["polars-plan/dtype-u16", "polars-pipe?/dtype-u16", "polars-expr/dty
dtype-u8 = ["polars-plan/dtype-u8", "polars-pipe?/dtype-u8", "polars-expr/dtype-u8"]

object = ["polars-plan/object"]
date_offset = ["polars-plan/date_offset"]
offset_by = ["polars-plan/offset_by"]
trigonometry = ["polars-plan/trigonometry"]
sign = ["polars-plan/sign"]
timezones = ["polars-plan/timezones"]
Expand Down Expand Up @@ -258,7 +258,7 @@ features = [
"cum_agg",
"cumulative_eval",
"cutqcut",
"date_offset",
"offset_by",
"diagonal_concat",
"diff",
"dot_diagram",
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ dtype-array = ["polars-core/dtype-array", "polars-ops/dtype-array"]
dtype-categorical = ["polars-core/dtype-categorical"]
dtype-struct = ["polars-core/dtype-struct"]
object = ["polars-core/object"]
date_offset = ["polars-time", "chrono"]
offset_by = ["polars-time/offset_by", "chrono"]
list_gather = ["polars-ops/list_gather"]
list_count = ["polars-ops/list_count"]
array_count = ["polars-ops/array_count", "dtype-array"]
Expand Down Expand Up @@ -199,7 +199,7 @@ features = [
"is_last_distinct",
"dtype-time",
"array_any_all",
"date_offset",
"offset_by",
"parquet",
"strings",
"row_hash",
Expand Down
14 changes: 9 additions & 5 deletions crates/polars-plan/src/dsl/dt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,14 @@ impl DateLikeNameSpace {
}

/// Roll backward to the first day of the month.
#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
pub fn month_start(self) -> Expr {
self.0
.map_private(FunctionExpr::TemporalExpr(TemporalFunction::MonthStart))
}

/// Roll forward to the last day of the month.
#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
pub fn month_end(self) -> Expr {
self.0
.map_private(FunctionExpr::TemporalExpr(TemporalFunction::MonthEnd))
Expand Down Expand Up @@ -252,10 +252,14 @@ impl DateLikeNameSpace {

/// Offset this `Date/Datetime` by a given offset [`Duration`].
/// This will take leap years/ months into account.
#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
pub fn offset_by(self, by: Expr) -> Expr {
self.0
.map_many_private(FunctionExpr::DateOffset, &[by], false, false)
self.0.map_many_private(
FunctionExpr::TemporalExpr(TemporalFunction::OffsetBy),
&[by],
false,
false,
)
}

#[cfg(feature = "timezones")]
Expand Down
29 changes: 21 additions & 8 deletions crates/polars-plan/src/dsl/function_expr/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use chrono_tz::Tz;
use polars_time::base_utc_offset as base_utc_offset_fn;
#[cfg(feature = "timezones")]
use polars_time::dst_offset as dst_offset_fn;
#[cfg(feature = "offset_by")]
use polars_time::impl_offset_by;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -48,9 +50,11 @@ pub enum TemporalFunction {
ConvertTimeZone(TimeZone),
TimeStamp(TimeUnit),
Truncate,
#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
OffsetBy,
#[cfg(feature = "offset_by")]
MonthStart,
#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
MonthEnd,
#[cfg(feature = "timezones")]
BaseUtcOffset,
Expand Down Expand Up @@ -101,9 +105,11 @@ impl TemporalFunction {
dtype => polars_bail!(ComputeError: "expected Datetime, got {}", dtype),
}),
Truncate => mapper.with_same_dtype(),
#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
OffsetBy => mapper.with_same_dtype(),
#[cfg(feature = "offset_by")]
MonthStart => mapper.with_same_dtype(),
#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
MonthEnd => mapper.with_same_dtype(),
#[cfg(feature = "timezones")]
BaseUtcOffset => mapper.with_dtype(DataType::Duration(TimeUnit::Milliseconds)),
Expand Down Expand Up @@ -169,9 +175,11 @@ impl Display for TemporalFunction {
WithTimeUnit(_) => "with_time_unit",
TimeStamp(tu) => return write!(f, "dt.timestamp({tu})"),
Truncate => "truncate",
#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
OffsetBy => "offset_by",
#[cfg(feature = "offset_by")]
MonthStart => "month_start",
#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
MonthEnd => "month_end",
#[cfg(feature = "timezones")]
BaseUtcOffset => "base_utc_offset",
Expand Down Expand Up @@ -392,7 +400,12 @@ pub(super) fn truncate(s: &[Series]) -> PolarsResult<Series> {
Ok(out)
}

#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
pub(super) fn offset_by(s: &[Series]) -> PolarsResult<Series> {
impl_offset_by(&s[0], &s[1])
}

#[cfg(feature = "offset_by")]
pub(super) fn month_start(s: &Series) -> PolarsResult<Series> {
Ok(match s.dtype() {
DataType::Datetime(_, tz) => match tz {
Expand All @@ -409,7 +422,7 @@ pub(super) fn month_start(s: &Series) -> PolarsResult<Series> {
})
}

#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
pub(super) fn month_end(s: &Series) -> PolarsResult<Series> {
Ok(match s.dtype() {
DataType::Datetime(_, tz) => match tz {
Expand Down
13 changes: 1 addition & 12 deletions crates/polars-plan/src/dsl/function_expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ mod sign;
mod strings;
#[cfg(feature = "dtype-struct")]
mod struct_;
#[cfg(any(feature = "temporal", feature = "date_offset"))]
#[cfg(any(feature = "temporal", feature = "offset_by"))]
mod temporal;
#[cfg(feature = "trigonometry")]
pub mod trigonometry;
Expand Down Expand Up @@ -148,8 +148,6 @@ pub enum FunctionExpr {
SearchSorted(SearchSortedSide),
#[cfg(feature = "range")]
Range(RangeFunction),
#[cfg(feature = "date_offset")]
DateOffset,
#[cfg(feature = "trigonometry")]
Trigonometry(TrigonometricFunction),
#[cfg(feature = "trigonometry")]
Expand Down Expand Up @@ -413,8 +411,6 @@ impl Hash for FunctionExpr {
Abs => {},
Negate => {},
NullCount => {},
#[cfg(feature = "date_offset")]
DateOffset => {},
#[cfg(feature = "arg_where")]
ArgWhere => {},
#[cfg(feature = "trigonometry")]
Expand Down Expand Up @@ -615,8 +611,6 @@ impl Display for FunctionExpr {
SearchSorted(_) => "search_sorted",
#[cfg(feature = "range")]
Range(func) => return write!(f, "{func}"),
#[cfg(feature = "date_offset")]
DateOffset => "dt.offset_by",
#[cfg(feature = "trigonometry")]
Trigonometry(func) => return write!(f, "{func}"),
#[cfg(feature = "trigonometry")]
Expand Down Expand Up @@ -900,11 +894,6 @@ impl From<FunctionExpr> for SpecialEq<Arc<dyn SeriesUdf>> {
#[cfg(feature = "range")]
Range(func) => func.into(),

#[cfg(feature = "date_offset")]
DateOffset => {
map_as_slice!(temporal::date_offset)
},

#[cfg(feature = "trigonometry")]
Trigonometry(trig_function) => {
map!(trigonometry::apply_trigonometric_function, trig_function)
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-plan/src/dsl/function_expr/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ impl FunctionExpr {
SearchSorted(_) => mapper.with_dtype(IDX_DTYPE),
#[cfg(feature = "range")]
Range(func) => func.get_field(mapper),
#[cfg(feature = "date_offset")]
DateOffset { .. } => mapper.with_same_dtype(),
#[cfg(feature = "trigonometry")]
Trigonometry(_) => mapper.map_to_float_dtype(),
#[cfg(feature = "trigonometry")]
Expand Down
134 changes: 6 additions & 128 deletions crates/polars-plan/src/dsl/function_expr/temporal.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
#[cfg(feature = "date_offset")]
use arrow::legacy::time_zone::Tz;
#[cfg(feature = "date_offset")]
use polars_core::chunked_array::ops::arity::broadcast_try_binary_elementwise;
#[cfg(feature = "date_offset")]
use polars_time::prelude::*;

use super::*;
use crate::{map, map_as_slice};

Expand Down Expand Up @@ -49,9 +42,13 @@ impl From<TemporalFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
Truncate => {
map_as_slice!(datetime::truncate)
},
#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
OffsetBy => {
map_as_slice!(datetime::offset_by)
},
#[cfg(feature = "offset_by")]
MonthStart => map!(datetime::month_start),
#[cfg(feature = "date_offset")]
#[cfg(feature = "offset_by")]
MonthEnd => map!(datetime::month_end),
#[cfg(feature = "timezones")]
BaseUtcOffset => map!(datetime::base_utc_offset),
Expand Down Expand Up @@ -185,125 +182,6 @@ pub(super) fn datetime(
Ok(s)
}

#[cfg(feature = "date_offset")]
fn apply_offsets_to_datetime(
datetime: &Logical<DatetimeType, Int64Type>,
offsets: &StringChunked,
time_zone: Option<&Tz>,
) -> PolarsResult<Int64Chunked> {
match offsets.len() {
1 => match offsets.get(0) {
Some(offset) => {
let offset = &Duration::parse(offset);
if offset.is_constant_duration(datetime.time_zone().as_deref()) {
// fastpath!
let mut duration = match datetime.time_unit() {
TimeUnit::Milliseconds => offset.duration_ms(),
TimeUnit::Microseconds => offset.duration_us(),
TimeUnit::Nanoseconds => offset.duration_ns(),
};
if offset.negative() {
duration = -duration;
}
Ok(datetime.0.clone().wrapping_add_scalar(duration))
} else {
let offset_fn = match datetime.time_unit() {
TimeUnit::Milliseconds => Duration::add_ms,
TimeUnit::Microseconds => Duration::add_us,
TimeUnit::Nanoseconds => Duration::add_ns,
};
datetime
.0
.try_apply_nonnull_values_generic(|v| offset_fn(offset, v, time_zone))
}
},
_ => Ok(datetime.0.apply(|_| None)),
},
_ => {
let offset_fn = match datetime.time_unit() {
TimeUnit::Milliseconds => Duration::add_ms,
TimeUnit::Microseconds => Duration::add_us,
TimeUnit::Nanoseconds => Duration::add_ns,
};
broadcast_try_binary_elementwise(datetime, offsets, |timestamp_opt, offset_opt| match (
timestamp_opt,
offset_opt,
) {
(Some(timestamp), Some(offset)) => {
offset_fn(&Duration::parse(offset), timestamp, time_zone).map(Some)
},
_ => Ok(None),
})
},
}
}

#[cfg(feature = "date_offset")]
pub(super) fn date_offset(s: &[Series]) -> PolarsResult<Series> {
let ts = &s[0];
let offsets = &s[1].str()?;

let preserve_sortedness: bool;
let out = match ts.dtype() {
DataType::Date => {
let ts = ts
.cast(&DataType::Datetime(TimeUnit::Milliseconds, None))
.unwrap();
let datetime = ts.datetime().unwrap();
let out = apply_offsets_to_datetime(datetime, offsets, None)?;
// sortedness is only guaranteed to be preserved if a constant offset is being added to every datetime
preserve_sortedness = match offsets.len() {
1 => offsets.get(0).is_some(),
_ => false,
};
out.cast(&DataType::Datetime(TimeUnit::Milliseconds, None))
.unwrap()
.cast(&DataType::Date)
},
DataType::Datetime(tu, tz) => {
let datetime = ts.datetime().unwrap();

let out = match tz {
#[cfg(feature = "timezones")]
Some(ref tz) => {
apply_offsets_to_datetime(datetime, offsets, tz.parse::<Tz>().ok().as_ref())?
},
_ => apply_offsets_to_datetime(datetime, offsets, None)?,
};
// Sortedness may not be preserved when crossing daylight savings time boundaries
// for calendar-aware durations.
// Constant durations (e.g. 2 hours) always preserve sortedness.
preserve_sortedness = match offsets.len() {
1 => match offsets.get(0) {
Some(offset) => {
let offset = Duration::parse(offset);
tz.is_none()
|| tz.as_deref() == Some("UTC")
|| offset.is_constant_duration(tz.as_deref())
},
None => false,
},
_ => false,
};
out.cast(&DataType::Datetime(*tu, tz.clone()))
},
dt => polars_bail!(
ComputeError: "cannot use 'date_offset' on Series of datatype {}", dt,
),
};
if preserve_sortedness {
out.map(|mut out| {
out.set_sorted_flag(ts.is_sorted_flag());
out
})
} else {
out.map(|mut out| {
out.set_sorted_flag(IsSorted::Not);
out
})
}
}

pub(super) fn combine(s: &[Series], tu: TimeUnit) -> PolarsResult<Series> {
let date = &s[0];
let time = &s[1];
Expand Down
1 change: 1 addition & 0 deletions crates/polars-time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dtype-date = ["polars-core/dtype-date", "temporal"]
dtype-datetime = ["polars-core/dtype-datetime", "temporal"]
dtype-time = ["polars-core/dtype-time", "temporal"]
dtype-duration = ["polars-core/dtype-duration", "temporal"]
offset_by = []
rolling_window = ["polars-core/rolling_window"]
rolling_window_by = ["polars-core/rolling_window_by", "dtype-duration"]
fmt = ["polars-core/fmt"]
Expand Down
8 changes: 8 additions & 0 deletions crates/polars-time/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ pub mod chunkedarray;
mod date_range;
mod dst_offset;
mod group_by;
#[cfg(feature = "offset_by")]
mod month_end;
#[cfg(feature = "offset_by")]
mod month_start;
#[cfg(feature = "offset_by")]
mod offset_by;
pub mod prelude;
mod round;
pub mod series;
Expand All @@ -21,8 +25,12 @@ pub use date_range::*;
pub use dst_offset::*;
#[cfg(any(feature = "dtype-date", feature = "dtype-datetime"))]
pub use group_by::dynamic::*;
#[cfg(feature = "offset_by")]
pub use month_end::*;
#[cfg(feature = "offset_by")]
pub use month_start::*;
#[cfg(feature = "offset_by")]
pub use offset_by::*;
pub use round::*;
pub use truncate::*;
pub use upsample::*;
Expand Down
Loading

0 comments on commit 149e7e0

Please sign in to comment.