Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust, python): deprecate 'use_earliest' argument in favour of 'ambiguous', which can take expressions #10719

Merged
merged 3 commits into from
Aug 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod take_agg;
mod time;

#[cfg(feature = "timezones")]
pub use time::replace_time_zone;
pub use time::convert_to_naive_local;

/// Internal state of [SlicesIterator]
#[derive(Debug, PartialEq)]
Expand Down
64 changes: 11 additions & 53 deletions crates/polars-arrow/src/kernels/time.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,25 @@
use arrow::array::PrimitiveArray;
use arrow::compute::arity::try_unary;
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
use arrow::error::{Error as ArrowError, Result};
use arrow::temporal_conversions::{
timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime,
};
use chrono::{LocalResult, NaiveDateTime, TimeZone};
use chrono_tz::Tz;

use crate::error::PolarsResult;

/// Re-localizes a single datetime from `from_tz` to `to_tz`.
///
/// `ndt` is first read as a UTC datetime and turned into the naive wall-clock
/// time of `from_tz`; that wall-clock time is then localized in `to_tz` and the
/// corresponding naive UTC datetime is returned.
///
/// The ambiguity setting is only consulted when `to_tz.from_local_datetime`
/// reports `LocalResult::Ambiguous`; in the `ambiguous: &str` form this means an
/// unrecognised string is rejected only for datetimes that are actually ambiguous.
///
/// # Errors
/// Returns `ArrowError::InvalidArgumentError` when the datetime is ambiguous and
/// no resolution was chosen (`None` / `"raise"`), when `ambiguous` is not one of
/// `"earliest"`, `"latest"`, `"raise"`, or when localization yields
/// `LocalResult::None`.
///
/// NOTE(review): this span is a rendered diff hunk. The `use_earliest: Option<bool>`
/// lines and the `ambiguous: &str` lines are the before/after versions of the same
/// code, and part of the `LocalResult::None` arm is elided by the "Expand All" marker.
fn convert_to_naive_local(
pub fn convert_to_naive_local(
from_tz: &Tz,
to_tz: &Tz,
ndt: NaiveDateTime,
use_earliest: Option<bool>,
ambiguous: &str,
) -> Result<NaiveDateTime> {
// Wall-clock time in the source zone; shadows the UTC-based input.
let ndt = from_tz.from_utc_datetime(&ndt).naive_local();
match to_tz.from_local_datetime(&ndt) {
// Exactly one instant matches the wall-clock time.
LocalResult::Single(dt) => Ok(dt.naive_utc()),
// Two instants match the wall-clock time: pick one or report an error.
LocalResult::Ambiguous(dt_earliest, dt_latest) => match use_earliest {
Some(true) => Ok(dt_earliest.naive_utc()),
Some(false) => Ok(dt_latest.naive_utc()),
None => Err(ArrowError::InvalidArgumentError(
format!("datetime '{}' is ambiguous in time zone '{}'. Please use `use_earliest` to tell how it should be localized.", ndt, to_tz)
))
LocalResult::Ambiguous(dt_earliest, dt_latest) => match ambiguous {
"earliest" => Ok(dt_earliest.naive_utc()),
"latest" => Ok(dt_latest.naive_utc()),
"raise" => Err(ArrowError::InvalidArgumentError(
format!("datetime '{}' is ambiguous in time zone '{}'. Please use `ambiguous` to tell how it should be localized.", ndt, to_tz)
)),
// Any other string is rejected as an invalid argument.
ambiguous => Err(ArrowError::InvalidArgumentError(
format!("Invalid argument {}, expected one of: \"earliest\", \"latest\", \"raise\"", ambiguous)
)),
},
// No instant matches the wall-clock time in `to_tz`.
LocalResult::None => Err(ArrowError::InvalidArgumentError(
format!(
Expand All @@ -35,40 +30,3 @@ fn convert_to_naive_local(
)),
}
}

pub fn replace_time_zone(
arr: &PrimitiveArray<i64>,
tu: TimeUnit,
from_tz: &Tz,
to_tz: &Tz,
use_earliest: Option<bool>,
) -> PolarsResult<PrimitiveArray<i64>> {
let res = match tu {
TimeUnit::Millisecond => try_unary(
arr,
|value| {
let ndt = timestamp_ms_to_datetime(value);
Ok(convert_to_naive_local(from_tz, to_tz, ndt, use_earliest)?.timestamp_millis())
},
ArrowDataType::Int64,
),
TimeUnit::Microsecond => try_unary(
arr,
|value| {
let ndt = timestamp_us_to_datetime(value);
Ok(convert_to_naive_local(from_tz, to_tz, ndt, use_earliest)?.timestamp_micros())
},
ArrowDataType::Int64,
),
TimeUnit::Nanosecond => try_unary(
arr,
|value| {
let ndt = timestamp_ns_to_datetime(value);
Ok(convert_to_naive_local(from_tz, to_tz, ndt, use_earliest)?.timestamp_nanos())
},
ArrowDataType::Int64,
),
_ => unreachable!(),
};
Ok(res?)
}
9 changes: 8 additions & 1 deletion crates/polars-io/src/csv/read_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ pub(crate) fn cast_columns(
(DataType::Utf8, DataType::Datetime(tu, _)) => s
.utf8()
.unwrap()
.as_datetime(None, *tu, false, false, None, None)
.as_datetime(
None,
*tu,
false,
false,
None,
&Utf8Chunked::from_iter(std::iter::once("raise")),
)
.map(|ca| ca.into_series()),
(_, dt) => s.cast(dt),
}?;
Expand Down
59 changes: 40 additions & 19 deletions crates/polars-ops/src/chunked_array/datetime/replace_time_zone.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use arrow::temporal_conversions::{
timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime,
};
use chrono::NaiveDateTime;
use chrono_tz::Tz;
use polars_arrow::kernels::replace_time_zone as replace_time_zone_kernel;
use polars_arrow::kernels::convert_to_naive_local;
use polars_core::chunked_array::ops::arity::try_binary_elementwise_values;
use polars_core::prelude::*;

fn parse_time_zone(s: &str) -> PolarsResult<Tz> {
Expand All @@ -8,26 +13,42 @@ fn parse_time_zone(s: &str) -> PolarsResult<Tz> {
}

/// Replaces the time zone of `datetime` with `time_zone`, re-localizing every timestamp.
///
/// Both the source zone (read from `datetime`) and `time_zone` fall back to "UTC"
/// when absent. `ambiguous` is either a single value applied to every row or a
/// column consumed element-wise alongside `datetime`; each value is forwarded to
/// `convert_to_naive_local`. The result keeps the input's time unit and sorted flag.
///
/// # Errors
/// Fails if either time zone cannot be parsed or if `convert_to_naive_local`
/// rejects a value.
///
/// NOTE(review): this span is a rendered diff hunk, so the removed lines (`ca`,
/// `use_earliest`, `replace_time_zone_kernel`) and the added lines (`datetime`,
/// `ambiguous`) appear interleaved.
pub fn replace_time_zone(
ca: &DatetimeChunked,
datetime: &Logical<DatetimeType, Int64Type>,
time_zone: Option<&str>,
use_earliest: Option<bool>,
ambiguous: &Utf8Chunked,
) -> PolarsResult<DatetimeChunked> {
let out: PolarsResult<_> = {
let from_tz = parse_time_zone(ca.time_zone().as_deref().unwrap_or("UTC"))?;
let to_tz = parse_time_zone(time_zone.unwrap_or("UTC"))?;
let chunks = ca.downcast_iter().map(|arr| {
replace_time_zone_kernel(
arr,
ca.time_unit().to_arrow(),
&from_tz,
&to_tz,
use_earliest,
)
});
let out = ChunkedArray::try_from_chunk_iter(ca.name(), chunks)?;
Ok(out.into_datetime(ca.time_unit(), time_zone.map(|x| x.to_string())))
let from_tz = parse_time_zone(datetime.time_zone().as_deref().unwrap_or("UTC"))?;
let to_tz = parse_time_zone(time_zone.unwrap_or("UTC"))?;
// Decoder from the raw i64 to a `NaiveDateTime`, chosen by the column's time unit.
let timestamp_to_datetime: fn(i64) -> NaiveDateTime = match datetime.time_unit() {
TimeUnit::Milliseconds => timestamp_ms_to_datetime,
TimeUnit::Microseconds => timestamp_us_to_datetime,
TimeUnit::Nanoseconds => timestamp_ns_to_datetime,
};
let mut out = out?;
out.set_sorted_flag(ca.is_sorted_flag());
// Matching encoder back to a raw i64 in the same unit.
let datetime_to_timestamp: fn(NaiveDateTime) -> i64 = match datetime.time_unit() {
TimeUnit::Milliseconds => datetime_to_timestamp_ms,
TimeUnit::Microseconds => datetime_to_timestamp_us,
TimeUnit::Nanoseconds => datetime_to_timestamp_ns,
};
let out = match ambiguous.len() {
// A single `ambiguous` value is applied to every timestamp.
1 => match ambiguous.get(0) {
Some(ambiguous) => datetime.0.try_apply(|timestamp| {
let ndt = timestamp_to_datetime(timestamp);
Ok(datetime_to_timestamp(convert_to_naive_local(
&from_tz, &to_tz, ndt, ambiguous,
)?))
}),
// The single `ambiguous` value is null: every element is mapped to `None`.
_ => Ok(datetime.0.apply(|_| None)),
},
// Otherwise each timestamp is paired with its own `ambiguous` value.
_ => {
try_binary_elementwise_values(datetime, ambiguous, |timestamp: i64, ambiguous: &str| {
let ndt = timestamp_to_datetime(timestamp);
Ok::<i64, PolarsError>(datetime_to_timestamp(convert_to_naive_local(
&from_tz, &to_tz, ndt, ambiguous,
)?))
})
},
};
// Re-attach the logical datetime type with the new zone, then carry over the sorted flag.
let mut out = out?.into_datetime(datetime.time_unit(), time_zone.map(|x| x.to_string()));
out.set_sorted_flag(datetime.is_sorted_flag());
Ok(out)
}
25 changes: 12 additions & 13 deletions crates/polars-plan/src/dsl/dt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,12 @@ impl DateLikeNameSpace {
.map_private(FunctionExpr::TemporalExpr(TemporalFunction::TimeStamp(tu)))
}

/// Builds a `TemporalFunction::Truncate(options)` function expression on this column.
///
/// In the two-argument form, `ambiguous` is passed as an additional input
/// expression through `map_many_private`.
/// NOTE(review): the meaning of the trailing `false` argument is not visible in
/// this hunk — confirm against `map_many_private`.
/// NOTE(review): rendered diff hunk; the one-argument (removed) and two-argument
/// (added) versions appear interleaved.
pub fn truncate(self, options: TruncateOptions) -> Expr {
self.0
.map_private(FunctionExpr::TemporalExpr(TemporalFunction::Truncate(
options,
)))
pub fn truncate(self, options: TruncateOptions, ambiguous: Expr) -> Expr {
self.0.map_many_private(
FunctionExpr::TemporalExpr(TemporalFunction::Truncate(options)),
&[ambiguous],
false,
)
}

// roll backward to the first day of the month
Expand Down Expand Up @@ -267,14 +268,12 @@ impl DateLikeNameSpace {
}

#[cfg(feature = "timezones")]
/// Builds a `TemporalFunction::ReplaceTimeZone` function expression on this column.
///
/// In the `ambiguous: Expr` form, only `time_zone` is stored in the function
/// variant; `ambiguous` is passed as an additional input expression through
/// `map_many_private`.
/// NOTE(review): the meaning of the trailing `false` argument is not visible in
/// this hunk — confirm against `map_many_private`.
/// NOTE(review): rendered diff hunk; the `use_earliest: Option<bool>` (removed) and
/// `ambiguous: Expr` (added) versions appear interleaved.
pub fn replace_time_zone(
self,
time_zone: Option<TimeZone>,
use_earliest: Option<bool>,
) -> Expr {
self.0.map_private(FunctionExpr::TemporalExpr(
TemporalFunction::ReplaceTimeZone(time_zone, use_earliest),
))
pub fn replace_time_zone(self, time_zone: Option<TimeZone>, ambiguous: Expr) -> Expr {
self.0.map_many_private(
FunctionExpr::TemporalExpr(TemporalFunction::ReplaceTimeZone(time_zone)),
&[ambiguous],
false,
)
}

pub fn combine(self, time: Expr, tu: TimeUnit) -> Expr {
Expand Down
55 changes: 37 additions & 18 deletions crates/polars-plan/src/dsl/function_expr/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub enum TemporalFunction {
DSTOffset,
Round(String, String),
#[cfg(feature = "timezones")]
ReplaceTimeZone(Option<TimeZone>, Option<bool>),
ReplaceTimeZone(Option<TimeZone>),
DateRange {
every: Duration,
closed: ClosedWindow,
Expand All @@ -67,7 +67,6 @@ pub enum TemporalFunction {
DatetimeFunction {
time_unit: TimeUnit,
time_zone: Option<TimeZone>,
use_earliest: Option<bool>,
},
}

Expand Down Expand Up @@ -105,7 +104,7 @@ impl Display for TemporalFunction {
DSTOffset => "dst_offset",
Round(..) => "round",
#[cfg(feature = "timezones")]
ReplaceTimeZone(_, _) => "replace_time_zone",
ReplaceTimeZone(_) => "replace_time_zone",
DateRange { .. } => return write!(f, "date_range"),
DateRanges { .. } => return write!(f, "date_ranges"),
TimeRange { .. } => return write!(f, "time_range"),
Expand Down Expand Up @@ -147,10 +146,12 @@ pub(super) fn ordinal_day(s: &Series) -> PolarsResult<Series> {
pub(super) fn time(s: &Series) -> PolarsResult<Series> {
match s.dtype() {
#[cfg(feature = "timezones")]
DataType::Datetime(_, Some(_)) => {
polars_ops::prelude::replace_time_zone(s.datetime().unwrap(), None, None)?
.cast(&DataType::Time)
},
DataType::Datetime(_, Some(_)) => polars_ops::prelude::replace_time_zone(
s.datetime().unwrap(),
None,
&Utf8Chunked::from_iter(std::iter::once("raise")),
)?
.cast(&DataType::Time),
DataType::Datetime(_, _) => s.datetime().unwrap().cast(&DataType::Time),
DataType::Date => s.datetime().unwrap().cast(&DataType::Time),
DataType::Time => Ok(s.clone()),
Expand All @@ -162,8 +163,12 @@ pub(super) fn date(s: &Series) -> PolarsResult<Series> {
#[cfg(feature = "timezones")]
DataType::Datetime(_, Some(tz)) => {
let mut out = {
polars_ops::chunked_array::replace_time_zone(s.datetime().unwrap(), None, None)?
.cast(&DataType::Date)?
polars_ops::chunked_array::replace_time_zone(
s.datetime().unwrap(),
None,
&Utf8Chunked::from_iter(std::iter::once("raise")),
)?
.cast(&DataType::Date)?
};
if tz != "UTC" {
// DST transitions may not preserve sortedness.
Expand All @@ -181,8 +186,12 @@ pub(super) fn datetime(s: &Series) -> PolarsResult<Series> {
#[cfg(feature = "timezones")]
DataType::Datetime(tu, Some(tz)) => {
let mut out = {
polars_ops::chunked_array::replace_time_zone(s.datetime().unwrap(), None, None)?
.cast(&DataType::Datetime(*tu, None))?
polars_ops::chunked_array::replace_time_zone(
s.datetime().unwrap(),
None,
&Utf8Chunked::from_iter(std::iter::once("raise")),
)?
.cast(&DataType::Datetime(*tu, None))?
};
if tz != "UTC" {
// DST transitions may not preserve sortedness.
Expand Down Expand Up @@ -216,21 +225,31 @@ pub(super) fn timestamp(s: &Series, tu: TimeUnit) -> PolarsResult<Series> {
s.timestamp(tu).map(|ca| ca.into_series())
}

/// Truncates a date/datetime column according to `options`.
///
/// In the `&[Series]` form, `s[0]` is the temporal column and `s[1]` supplies the
/// `ambiguous` values, which are forwarded to the underlying `truncate` call.
/// The output inherits the sorted flag of the temporal input.
///
/// # Errors
/// Bails when the temporal column is neither `Date` nor `Datetime`, and
/// propagates errors from the underlying `truncate`.
///
/// # Panics
/// `s[1].utf8().unwrap()` panics if `utf8()` returns an error (presumably when
/// `s[1]` is not a Utf8 column — confirm), and indexing panics if fewer than two
/// series are supplied.
///
/// NOTE(review): rendered diff hunk; the single-`Series` (removed) and slice
/// (added) versions appear interleaved.
pub(super) fn truncate(s: &Series, options: &TruncateOptions) -> PolarsResult<Series> {
let mut out = match s.dtype() {
pub(super) fn truncate(s: &[Series], options: &TruncateOptions) -> PolarsResult<Series> {
let time_series = &s[0];
let ambiguous = &s[1].utf8().unwrap();
let mut out = match time_series.dtype() {
DataType::Datetime(_, tz) => match tz {
#[cfg(feature = "timezones")]
Some(tz) => s
Some(tz) => time_series
.datetime()
.unwrap()
// A zone string that fails to parse is passed on as `None` via `.ok()`.
.truncate(options, tz.parse::<Tz>().ok().as_ref(), ambiguous)?
.into_series(),
_ => time_series
.datetime()
.unwrap()
.truncate(options, tz.parse::<Tz>().ok().as_ref())?
.truncate(options, None, ambiguous)?
.into_series(),
_ => s.datetime().unwrap().truncate(options, None)?.into_series(),
},
DataType::Date => s.date().unwrap().truncate(options, None)?.into_series(),
DataType::Date => time_series
.date()
.unwrap()
.truncate(options, None, ambiguous)?
.into_series(),
// NOTE(review): the bail names the operation `round` although this function is `truncate` — confirm whether intended.
dt => polars_bail!(opq = round, got = dt, expected = "date/datetime"),
};
out.set_sorted_flag(s.is_sorted_flag());
out.set_sorted_flag(time_series.is_sorted_flag());
Ok(out)
}

Expand Down
12 changes: 5 additions & 7 deletions crates/polars-plan/src/dsl/function_expr/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ pub(super) fn set_sorted_flag(s: &Series, sorted: IsSorted) -> PolarsResult<Seri
}

#[cfg(feature = "timezones")]
/// Dispatch wrapper around `polars_ops::prelude::replace_time_zone` that returns a `Series`.
///
/// In the `&[Series]` form, `s[0]` is the datetime column and `s[1]` the
/// `ambiguous` column.
///
/// # Panics
/// Both `.unwrap()` calls panic if the respective accessor (`datetime()` /
/// `utf8()`) returns an error, and indexing panics if fewer than two series are
/// supplied.
///
/// NOTE(review): rendered diff hunk; the `use_earliest` (removed) and slice-based
/// (added) versions appear interleaved.
pub(super) fn replace_time_zone(
s: &Series,
time_zone: Option<&str>,
use_earliest: Option<bool>,
) -> PolarsResult<Series> {
let ca = s.datetime().unwrap();
Ok(polars_ops::prelude::replace_time_zone(ca, time_zone, use_earliest)?.into_series())
pub(super) fn replace_time_zone(s: &[Series], time_zone: Option<&str>) -> PolarsResult<Series> {
let s1 = &s[0];
let ca = s1.datetime().unwrap();
let s2 = &s[1].utf8().unwrap();
Ok(polars_ops::prelude::replace_time_zone(ca, time_zone, s2)?.into_series())
}
16 changes: 5 additions & 11 deletions crates/polars-plan/src/dsl/function_expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ impl From<StringFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
},
#[cfg(feature = "temporal")]
Strptime(dtype, options) => {
map!(strings::strptime, dtype.clone(), &options)
map_as_slice!(strings::strptime, dtype.clone(), &options)
},
#[cfg(feature = "concat_str")]
ConcatVertical(delimiter) => map!(strings::concat, &delimiter),
Expand Down Expand Up @@ -749,7 +749,7 @@ impl From<TemporalFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
Nanosecond => map!(datetime::nanosecond),
TimeStamp(tu) => map!(datetime::timestamp, tu),
Truncate(truncate_options) => {
map!(datetime::truncate, &truncate_options)
map_as_slice!(datetime::truncate, &truncate_options)
},
#[cfg(feature = "date_offset")]
MonthStart => map!(datetime::month_start),
Expand All @@ -761,8 +761,8 @@ impl From<TemporalFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
DSTOffset => map!(datetime::dst_offset),
Round(every, offset) => map!(datetime::round, &every, &offset),
#[cfg(feature = "timezones")]
ReplaceTimeZone(tz, use_earliest) => {
map!(dispatch::replace_time_zone, tz.as_deref(), use_earliest)
ReplaceTimeZone(tz) => {
map_as_slice!(dispatch::replace_time_zone, tz.as_deref())
},
Combine(tu) => map_as_slice!(temporal::combine, tu),
DateRange {
Expand Down Expand Up @@ -818,14 +818,8 @@ impl From<TemporalFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
DatetimeFunction {
time_unit,
time_zone,
use_earliest,
} => {
map_as_slice!(
temporal::datetime,
&time_unit,
time_zone.as_deref(),
use_earliest
)
map_as_slice!(temporal::datetime, &time_unit, time_zone.as_deref())
},
}
}
Expand Down
Loading