Commit

Merge branch 'main' into issue-10943
edmondop authored Jul 17, 2024
2 parents 280af67 + b0925c8 commit 6e6a8dd
Showing 130 changed files with 12,327 additions and 1,306 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -123,7 +123,7 @@ rand = "0.8"
 regex = "1.8"
 rstest = "0.21.0"
 serde_json = "1"
-sqlparser = { version = "0.47", features = ["visitor"] }
+sqlparser = { version = "0.48", features = ["visitor"] }
 tempfile = "3"
 thiserror = "1.0.44"
 tokio = { version = "1.36", features = ["macros", "rt", "sync"] }

4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

11 changes: 9 additions & 2 deletions datafusion-cli/src/catalog.rs
@@ -29,6 +29,7 @@ use datafusion::datasource::listing::{
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
 use datafusion::execution::context::SessionState;
+use datafusion::execution::session_state::SessionStateBuilder;
 
 use async_trait::async_trait;
 use dirs::home_dir;
@@ -162,6 +163,7 @@ impl SchemaProvider for DynamicFileSchemaProvider {
             .ok_or_else(|| plan_datafusion_err!("locking error"))?
             .read()
             .clone();
+        let mut builder = SessionStateBuilder::from(state.clone());
         let optimized_name = substitute_tilde(name.to_owned());
         let table_url = ListingTableUrl::parse(optimized_name.as_str())?;
         let scheme = table_url.scheme();
@@ -178,13 +180,18 @@ impl SchemaProvider for DynamicFileSchemaProvider {
         // to any command options so the only choice is to use an empty collection
         match scheme {
             "s3" | "oss" | "cos" => {
-                state = state.add_table_options_extension(AwsOptions::default());
+                if let Some(table_options) = builder.table_options() {
+                    table_options.extensions.insert(AwsOptions::default())
+                }
             }
             "gs" | "gcs" => {
-                state = state.add_table_options_extension(GcpOptions::default())
+                if let Some(table_options) = builder.table_options() {
+                    table_options.extensions.insert(GcpOptions::default())
+                }
             }
             _ => {}
         };
+        state = builder.build();
         let store = get_object_store(
             &state,
             table_url.scheme(),

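The hunk above replaces direct SessionState mutation with a SessionStateBuilder round-trip: convert the state into a builder, tweak the table-options extensions, and rebuild. A minimal sketch of that pattern, generic over the extension type so it stands alone — the concrete types in catalog.rs are the CLI's AwsOptions and GcpOptions, and the builder method shapes are assumed from the hunk itself:

```rust
use datafusion::config::ConfigExtension;
use datafusion::execution::context::SessionState;
use datafusion::execution::session_state::SessionStateBuilder;

// Rebuild a SessionState with one extra table-options extension, mirroring
// the catalog.rs hunk above. `table_options()` is assumed to return a
// mutable handle only when the builder already carries TableOptions.
fn add_table_extension<T>(state: SessionState) -> SessionState
where
    T: ConfigExtension + Default,
{
    let mut builder = SessionStateBuilder::from(state);
    if let Some(table_options) = builder.table_options() {
        table_options.extensions.insert(T::default());
    }
    builder.build()
}
```
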
80 changes: 40 additions & 40 deletions datafusion-examples/examples/composed_extension_codec.rs
@@ -30,18 +30,19 @@
 //! DeltaScan
 //! ```
 
+use std::any::Any;
+use std::fmt::Debug;
+use std::ops::Deref;
+use std::sync::Arc;
+
 use datafusion::common::Result;
 use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
 use datafusion::prelude::SessionContext;
-use datafusion_common::internal_err;
+use datafusion_common::{internal_err, DataFusionError};
 use datafusion_expr::registry::FunctionRegistry;
-use datafusion_expr::ScalarUDF;
+use datafusion_expr::{AggregateUDF, ScalarUDF};
 use datafusion_proto::physical_plan::{AsExecutionPlan, PhysicalExtensionCodec};
 use datafusion_proto::protobuf;
-use std::any::Any;
-use std::fmt::Debug;
-use std::ops::Deref;
-use std::sync::Arc;
 
 #[tokio::main]
 async fn main() {
@@ -239,53 +240,52 @@ struct ComposedPhysicalExtensionCodec {
     codecs: Vec<Arc<dyn PhysicalExtensionCodec>>,
 }
 
+impl ComposedPhysicalExtensionCodec {
+    fn try_any<T>(
+        &self,
+        mut f: impl FnMut(&dyn PhysicalExtensionCodec) -> Result<T>,
+    ) -> Result<T> {
+        let mut last_err = None;
+        for codec in &self.codecs {
+            match f(codec.as_ref()) {
+                Ok(node) => return Ok(node),
+                Err(err) => last_err = Some(err),
+            }
+        }
+
+        Err(last_err.unwrap_or_else(|| {
+            DataFusionError::NotImplemented("Empty list of composed codecs".to_owned())
+        }))
+    }
+}
+
 impl PhysicalExtensionCodec for ComposedPhysicalExtensionCodec {
     fn try_decode(
         &self,
         buf: &[u8],
         inputs: &[Arc<dyn ExecutionPlan>],
         registry: &dyn FunctionRegistry,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut last_err = None;
-        for codec in &self.codecs {
-            match codec.try_decode(buf, inputs, registry) {
-                Ok(plan) => return Ok(plan),
-                Err(e) => last_err = Some(e),
-            }
-        }
-        Err(last_err.unwrap())
+        self.try_any(|codec| codec.try_decode(buf, inputs, registry))
     }
 
     fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
-        let mut last_err = None;
-        for codec in &self.codecs {
-            match codec.try_encode(node.clone(), buf) {
-                Ok(_) => return Ok(()),
-                Err(e) => last_err = Some(e),
-            }
-        }
-        Err(last_err.unwrap())
+        self.try_any(|codec| codec.try_encode(node.clone(), buf))
     }
 
-    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
-        let mut last_err = None;
-        for codec in &self.codecs {
-            match codec.try_decode_udf(name, _buf) {
-                Ok(plan) => return Ok(plan),
-                Err(e) => last_err = Some(e),
-            }
-        }
-        Err(last_err.unwrap())
+    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
+        self.try_any(|codec| codec.try_decode_udf(name, buf))
     }
 
-    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
-        let mut last_err = None;
-        for codec in &self.codecs {
-            match codec.try_encode_udf(_node, _buf) {
-                Ok(_) => return Ok(()),
-                Err(e) => last_err = Some(e),
-            }
-        }
-        Err(last_err.unwrap())
+    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
+        self.try_any(|codec| codec.try_encode_udf(node, buf))
     }
+
+    fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
+        self.try_any(|codec| codec.try_decode_udaf(name, buf))
+    }
+
+    fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
+        self.try_any(|codec| codec.try_encode_udaf(node, buf))
+    }
 }
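The refactor above collapses four copies of the same try-each-codec loop into a single try_any helper that returns the first success or the last error; it also fixes a latent panic, since the old loops called last_err.unwrap(), which blew up when codecs was empty, while try_any reports a NotImplemented error instead. The fallback pattern generalizes; a self-contained sketch (names here are illustrative, not DataFusion APIs):

```rust
// First-success-or-last-error fallback over a list of candidates, the same
// shape as ComposedPhysicalExtensionCodec::try_any above.
fn try_each<C, T, E>(
    candidates: &[C],
    mut f: impl FnMut(&C) -> Result<T, E>,
    empty_err: impl FnOnce() -> E,
) -> Result<T, E> {
    let mut last_err = None;
    for c in candidates {
        match f(c) {
            Ok(v) => return Ok(v),
            // Remember the failure and fall through to the next candidate.
            Err(e) => last_err = Some(e),
        }
    }
    // Empty candidate list: report a dedicated error instead of panicking.
    Err(last_err.unwrap_or_else(empty_err))
}

fn main() {
    // Parsers are tried in order; the first that succeeds wins.
    let parsers: Vec<fn(&str) -> Result<i64, String>> = vec![
        |s| s.parse::<i64>().map_err(|e| e.to_string()),
        |s| i64::from_str_radix(s.trim_start_matches("0x"), 16).map_err(|e| e.to_string()),
    ];
    let v = try_each(&parsers, |p| p("0xff"), || "no parsers".to_string());
    assert_eq!(v, Ok(255));
}
```
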
9 changes: 4 additions & 5 deletions datafusion-examples/examples/custom_file_format.rs
@@ -22,6 +22,7 @@ use arrow::{
     datatypes::UInt64Type,
 };
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::{
     datasource::{
         file_format::{
@@ -32,9 +33,9 @@ use datafusion::{
         MemTable,
     },
     error::Result,
-    execution::{context::SessionState, runtime_env::RuntimeEnv},
+    execution::context::SessionState,
     physical_plan::ExecutionPlan,
-    prelude::{SessionConfig, SessionContext},
+    prelude::SessionContext,
 };
 use datafusion_common::{GetExt, Statistics};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
@@ -176,9 +177,7 @@ impl GetExt for TSVFileFactory {
 #[tokio::main]
 async fn main() -> Result<()> {
     // Create a new context with the default configuration
-    let config = SessionConfig::new();
-    let runtime = RuntimeEnv::default();
-    let mut state = SessionState::new_with_config_rt(config, Arc::new(runtime));
+    let mut state = SessionStateBuilder::new().with_default_features().build();
 
     // Register the custom file format
     let file_format = Arc::new(TSVFileFactory::new());

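As above, SessionState::new_with_config_rt gives way to SessionStateBuilder. A minimal sketch of the new construction path, using only paths and method names that appear in this commit's hunks (the comment on with_default_features is an assumption about what the defaults cover):

```rust
use std::sync::Arc;

use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    let state = SessionStateBuilder::new()
        .with_config(SessionConfig::new())
        .with_runtime_env(Arc::new(RuntimeEnv::default()))
        .with_default_features() // assumed: registers default catalogs, functions, etc.
        .build();
    let ctx = SessionContext::new_with_state(state);
    let _ = ctx; // register file formats / run queries as in the example above
}
```
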
6 changes: 4 additions & 2 deletions datafusion-examples/examples/plan_to_sql.rs
@@ -19,7 +19,7 @@ use datafusion::error::Result;
 
 use datafusion::prelude::*;
 use datafusion::sql::unparser::expr_to_sql;
-use datafusion_sql::unparser::dialect::CustomDialect;
+use datafusion_sql::unparser::dialect::CustomDialectBuilder;
 use datafusion_sql::unparser::{plan_to_sql, Unparser};
 
 /// This example demonstrates the programmatic construction of SQL strings using
@@ -80,7 +80,9 @@ fn simple_expr_to_pretty_sql_demo() -> Result<()> {
 /// using a custom dialect and an explicit unparser
 fn simple_expr_to_sql_demo_escape_mysql_style() -> Result<()> {
     let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
-    let dialect = CustomDialect::new(Some('`'));
+    let dialect = CustomDialectBuilder::new()
+        .with_identifier_quote_style('`')
+        .build();
     let unparser = Unparser::new(&dialect);
     let sql = unparser.expr_to_sql(&expr)?.to_string();
     assert_eq!(sql, r#"((`a` < 5) OR (`a` = 8))"#);

13 changes: 6 additions & 7 deletions datafusion/common/src/scalar/mod.rs
@@ -1682,8 +1682,10 @@ impl ScalarValue {
             DataType::UInt16 => build_array_primitive!(UInt16Array, UInt16),
             DataType::UInt32 => build_array_primitive!(UInt32Array, UInt32),
             DataType::UInt64 => build_array_primitive!(UInt64Array, UInt64),
+            DataType::Utf8View => build_array_string!(StringViewArray, Utf8View),
             DataType::Utf8 => build_array_string!(StringArray, Utf8),
             DataType::LargeUtf8 => build_array_string!(LargeStringArray, LargeUtf8),
+            DataType::BinaryView => build_array_string!(BinaryViewArray, BinaryView),
             DataType::Binary => build_array_string!(BinaryArray, Binary),
             DataType::LargeBinary => build_array_string!(LargeBinaryArray, LargeBinary),
             DataType::Date32 => build_array_primitive!(Date32Array, Date32),
@@ -1841,8 +1843,6 @@ impl ScalarValue {
             | DataType::Time64(TimeUnit::Millisecond)
             | DataType::Map(_, _)
             | DataType::RunEndEncoded(_, _)
-            | DataType::Utf8View
-            | DataType::BinaryView
             | DataType::ListView(_)
             | DataType::LargeListView(_) => {
                 return _internal_err!(
@@ -2678,7 +2678,10 @@ impl ScalarValue {
             DataType::Duration(TimeUnit::Nanosecond) => {
                 typed_cast!(array, index, DurationNanosecondArray, DurationNanosecond)?
             }
-
+            DataType::Map(_, _) => {
+                let a = array.slice(index, 1);
+                Self::Map(Arc::new(a.as_map().to_owned()))
+            }
             other => {
                 return _not_impl_err!(
                     "Can't create a scalar from array of type \"{other:?}\""
@@ -5692,16 +5695,12 @@ mod tests {
             DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
         );
 
-        // needs https://github.com/apache/arrow-rs/issues/5893
-        /*
         check_scalar_cast(ScalarValue::Utf8(None), DataType::Utf8View);
         check_scalar_cast(ScalarValue::from("foo"), DataType::Utf8View);
         check_scalar_cast(
            ScalarValue::from("larger than 12 bytes string"),
            DataType::Utf8View,
        );
-        */
     }
 
     // mimics how casting work on scalar values by `casting` `scalar` to `desired_type`

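The new Utf8View and BinaryView arms mean ScalarValue::iter_to_array can now build Arrow view arrays, and the previously commented-out casts to Utf8View are re-enabled. A hedged sketch of what that enables, assuming the ScalarValue::Utf8View(Option<String>) variant that the build_array_string! arm above implies:

```rust
use arrow::array::Array;
use arrow_schema::DataType;
use datafusion_common::{Result, ScalarValue};

fn utf8_view_roundtrip() -> Result<()> {
    let scalars = vec![
        ScalarValue::Utf8View(Some("short".to_string())),
        ScalarValue::Utf8View(Some("a string longer than 12 bytes".to_string())),
        ScalarValue::Utf8View(None), // nulls are preserved
    ];
    // With this change, iter_to_array builds a StringViewArray instead of
    // returning the old "unsupported type" internal error.
    let array = ScalarValue::iter_to_array(scalars)?;
    assert_eq!(array.data_type(), &DataType::Utf8View);
    assert_eq!(array.len(), 3);
    Ok(())
}
```
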
7 changes: 6 additions & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
@@ -632,6 +632,7 @@ mod tests {
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_expr::{col, lit};
 
+    use crate::execution::session_state::SessionStateBuilder;
     use chrono::DateTime;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
@@ -814,7 +815,11 @@
         let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new()).unwrap());
         let mut cfg = SessionConfig::new();
         cfg.options_mut().catalog.has_header = true;
-        let session_state = SessionState::new_with_config_rt(cfg, runtime);
+        let session_state = SessionStateBuilder::new()
+            .with_config(cfg)
+            .with_runtime_env(runtime)
+            .with_default_features()
+            .build();
         let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
         let path = Path::from("csv/aggregate_test_100.csv");
         let csv = CsvFormat::default().with_has_header(true);

datafusion/core/src/datasource/physical_plan/parquet/mod.rs

@@ -53,7 +53,7 @@ mod opener;
 mod page_filter;
 mod reader;
 mod row_filter;
-mod row_groups;
+mod row_group_filter;
 mod statistics;
 mod writer;
 

datafusion/core/src/datasource/physical_plan/parquet/opener.rs

@@ -18,7 +18,7 @@
 //! [`ParquetOpener`] for opening Parquet files
 
 use crate::datasource::physical_plan::parquet::page_filter::PagePruningPredicate;
-use crate::datasource::physical_plan::parquet::row_groups::RowGroupAccessPlanFilter;
+use crate::datasource::physical_plan::parquet::row_group_filter::RowGroupAccessPlanFilter;
 use crate::datasource::physical_plan::parquet::{
     row_filter, should_enable_page_index, ParquetAccessPlan,
 };

25 changes: 19 additions & 6 deletions datafusion/core/src/execution/context/mod.rs
@@ -73,6 +73,7 @@ use object_store::ObjectStore;
 use parking_lot::RwLock;
 use url::Url;
 
+use crate::execution::session_state::SessionStateBuilder;
 pub use datafusion_execution::config::SessionConfig;
 pub use datafusion_execution::TaskContext;
 pub use datafusion_expr::execution_props::ExecutionProps;
@@ -294,7 +295,11 @@ impl SessionContext {
     /// all `SessionContext`'s should be configured with the
     /// same `RuntimeEnv`.
     pub fn new_with_config_rt(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> Self {
-        let state = SessionState::new_with_config_rt(config, runtime);
+        let state = SessionStateBuilder::new()
+            .with_config(config)
+            .with_runtime_env(runtime)
+            .with_default_features()
+            .build();
         Self::new_with_state(state)
     }
 
@@ -315,7 +320,7 @@ impl SessionContext {
     }
 
     /// Creates a new `SessionContext` using the provided [`SessionState`]
-    #[deprecated(since = "32.0.0", note = "Use SessionState::new_with_state")]
+    #[deprecated(since = "32.0.0", note = "Use SessionContext::new_with_state")]
     pub fn with_state(state: SessionState) -> Self {
         Self::new_with_state(state)
     }
@@ -1574,6 +1579,7 @@ mod tests {
     use datafusion_common_runtime::SpawnedTask;
 
     use crate::catalog::schema::SchemaProvider;
+    use crate::execution::session_state::SessionStateBuilder;
     use crate::physical_planner::PhysicalPlanner;
     use async_trait::async_trait;
     use tempfile::TempDir;
@@ -1707,7 +1713,11 @@ mod tests {
             .set_str("datafusion.catalog.location", url.as_str())
             .set_str("datafusion.catalog.format", "CSV")
             .set_str("datafusion.catalog.has_header", "true");
-        let session_state = SessionState::new_with_config_rt(cfg, runtime);
+        let session_state = SessionStateBuilder::new()
+            .with_config(cfg)
+            .with_runtime_env(runtime)
+            .with_default_features()
+            .build();
         let ctx = SessionContext::new_with_state(session_state);
         ctx.refresh_catalogs().await?;
 
@@ -1733,9 +1743,12 @@ mod tests {
     #[tokio::test]
     async fn custom_query_planner() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
-        let session_state =
-            SessionState::new_with_config_rt(SessionConfig::new(), runtime)
-                .with_query_planner(Arc::new(MyQueryPlanner {}));
+        let session_state = SessionStateBuilder::new()
+            .with_config(SessionConfig::new())
+            .with_runtime_env(runtime)
+            .with_default_features()
+            .with_query_planner(Arc::new(MyQueryPlanner {}))
+            .build();
         let ctx = SessionContext::new_with_state(session_state);
 
         let df = ctx.sql("SELECT 1").await?;

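For context, the MyQueryPlanner registered above implements the QueryPlanner trait. A hedged sketch of such a planner that simply delegates to the default physical planner — the import paths and the Debug derive are assumptions about this commit's module layout, not confirmed by the diff:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::context::{QueryPlanner, SessionState};
use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};

#[derive(Debug)]
struct MyQueryPlanner {}

#[async_trait]
impl QueryPlanner for MyQueryPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // A real planner could inspect or rewrite the plan here; this one
        // simply delegates to the default planner.
        DefaultPhysicalPlanner::default()
            .create_physical_plan(logical_plan, session_state)
            .await
    }
}
```
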