Skip to content

Commit

Permalink
Make RuntimeEnvBuilder rather than RuntimeConfig (apache#12157)
Browse files Browse the repository at this point in the history
* feat/12156: Make RuntimeEnvBuilder rather than RuntimeConfig

Signed-off-by: Devan <[email protected]>

* feat/12156: Make RuntimeEnvBuilder rather than RuntimeConfig

Signed-off-by: Devan <[email protected]>

* doc link

Signed-off-by: Devan <[email protected]>

* update to use builder for rt env

Signed-off-by: Devan <[email protected]>

* update to use builder

Signed-off-by: Devan <[email protected]>

* clippy

Signed-off-by: Devan <[email protected]>

* touch

Signed-off-by: Devan <[email protected]>

* fmt

Signed-off-by: Devan <[email protected]>

* revert some formatting that occurred

Signed-off-by: Devan <[email protected]>

* revert some formatting that occurred

Signed-off-by: Devan <[email protected]>

* use builder

Signed-off-by: Devan <[email protected]>

* fmt

Signed-off-by: Devan <[email protected]>

* Update datafusion/execution/src/runtime_env.rs

Co-authored-by: Andrew Lamb <[email protected]>

---------

Signed-off-by: Devan <[email protected]>
Co-authored-by: Devan <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
3 people authored Aug 28, 2024
1 parent 7d8bb0b commit 5163e15
Show file tree
Hide file tree
Showing 15 changed files with 129 additions and 83 deletions.
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ mod tests {
use datafusion_common::cast::as_string_array;
use datafusion_common::internal_err;
use datafusion_common::stats::Precision;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::{col, lit};

use crate::execution::session_state::SessionStateBuilder;
Expand Down Expand Up @@ -863,7 +863,7 @@ mod tests {
async fn query_compress_data(
file_compression_type: FileCompressionType,
) -> Result<()> {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new()).unwrap());
let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
let mut cfg = SessionConfig::new();
cfg.options_mut().catalog.has_header = true;
let session_state = SessionStateBuilder::new()
Expand Down
13 changes: 6 additions & 7 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,15 @@ where
/// # use std::sync::Arc;
/// # use datafusion::prelude::*;
/// # use datafusion::execution::SessionStateBuilder;
/// # use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
/// # use datafusion_execution::runtime_env::RuntimeEnvBuilder;
/// // Configure a 4k batch size
/// let config = SessionConfig::new() .with_batch_size(4 * 1024);
///
/// // configure a memory limit of 1GB with 20% slop
/// let runtime_env = RuntimeEnv::new(
/// RuntimeConfig::new()
/// let runtime_env = RuntimeEnvBuilder::new()
/// .with_memory_limit(1024 * 1024 * 1024, 0.80)
/// ).unwrap();
/// .build()
/// .unwrap();
///
/// // Create a SessionState using the config and runtime_env
/// let state = SessionStateBuilder::new()
Expand Down Expand Up @@ -1623,7 +1623,7 @@ mod tests {
use super::{super::options::CsvReadOptions, *};
use crate::assert_batches_eq;
use crate::execution::memory_pool::MemoryConsumer;
use crate::execution::runtime_env::RuntimeConfig;
use crate::execution::runtime_env::RuntimeEnvBuilder;
use crate::test;
use crate::test_util::{plan_and_collect, populate_csv_partitions};

Expand Down Expand Up @@ -1758,8 +1758,7 @@ mod tests {
let path = path.join("tests/tpch-csv");
let url = format!("file://{}", path.display());

let rt_cfg = RuntimeConfig::new();
let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap());
let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
let cfg = SessionConfig::new()
.set_str("datafusion.catalog.location", url.as_str())
.set_str("datafusion.catalog.format", "CSV")
Expand Down
12 changes: 7 additions & 5 deletions datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::{
compute::SortOptions,
record_batch::RecordBatch,
};
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
Expand Down Expand Up @@ -136,10 +136,12 @@ impl SortTest {
.sort_spill_reservation_bytes,
);

let runtime_env = RuntimeConfig::new()
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
.build();
let runtime = Arc::new(runtime_env.unwrap());
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
.build()
.unwrap(),
);
SessionContext::new_with_config_rt(session_config, runtime)
} else {
SessionContext::new_with_config(session_config)
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use tokio::fs::File;
use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_optimizer::join_selection::JoinSelection;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
Expand Down Expand Up @@ -509,17 +509,17 @@ impl TestCase {

let table = scenario.table();

let mut rt_config = RuntimeConfig::new()
let rt_config = RuntimeEnvBuilder::new()
// disk manager setting controls the spilling
.with_disk_manager(disk_manager_config)
.with_memory_limit(memory_limit, MEMORY_FRACTION);

if let Some(pool) = memory_pool {
rt_config = rt_config.with_memory_pool(pool);
let runtime = if let Some(pool) = memory_pool {
rt_config.with_memory_pool(pool).build().unwrap()
} else {
rt_config.build().unwrap()
};

let runtime = RuntimeEnv::new(rt_config).unwrap();

// Configure execution
let builder = SessionStateBuilder::new()
.with_config(config)
Expand Down
7 changes: 5 additions & 2 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion_execution::cache::cache_unit::{
DefaultFileStatisticsCache, DefaultListFilesCache,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;

use datafusion::execution::session_state::SessionStateBuilder;
use tempfile::tempdir;
Expand Down Expand Up @@ -198,7 +198,10 @@ fn get_cache_runtime_state() -> (
.with_list_files_cache(Some(list_file_cache.clone()));

let rt = Arc::new(
RuntimeEnv::new(RuntimeConfig::new().with_cache_manager(cache_config)).unwrap(),
RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build()
.expect("could not build runtime environment"),
);
let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state();

Expand Down
27 changes: 20 additions & 7 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use url::Url;
/// Execution runtime environment that manages system resources such
/// as memory, disk, cache and storage.
///
/// A [`RuntimeEnv`] is created from a [`RuntimeConfig`] and has the
/// A [`RuntimeEnv`] is created from a [`RuntimeEnvBuilder`] and has the
/// following resource management functionality:
///
/// * [`MemoryPool`]: Manage memory
Expand Down Expand Up @@ -147,13 +147,17 @@ impl RuntimeEnv {

impl Default for RuntimeEnv {
fn default() -> Self {
RuntimeEnv::new(RuntimeConfig::new()).unwrap()
RuntimeEnvBuilder::new().build().unwrap()
}
}

/// Please see: <https://github.com/apache/datafusion/issues/12156>
/// This a type alias for backwards compatibility.
pub type RuntimeConfig = RuntimeEnvBuilder;

#[derive(Clone)]
/// Execution runtime configuration
pub struct RuntimeConfig {
pub struct RuntimeEnvBuilder {
/// DiskManager to manage temporary disk file usage
pub disk_manager: DiskManagerConfig,
/// [`MemoryPool`] from which to allocate memory
Expand All @@ -166,13 +170,13 @@ pub struct RuntimeConfig {
pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
}

impl Default for RuntimeConfig {
impl Default for RuntimeEnvBuilder {
fn default() -> Self {
Self::new()
}
}

impl RuntimeConfig {
impl RuntimeEnvBuilder {
/// New with default values
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -229,8 +233,17 @@ impl RuntimeConfig {
self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()]))
}

/// Build a `RuntimeEnv` object from the configuration
/// Build a RuntimeEnv
pub fn build(self) -> Result<RuntimeEnv> {
RuntimeEnv::new(self)
let memory_pool = self
.memory_pool
.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));

Ok(RuntimeEnv {
memory_pool,
disk_manager: DiskManager::try_new(self.disk_manager)?,
cache_manager: CacheManager::try_new(&self.cache_manager)?,
object_store_registry: self.object_store_registry,
})
}
}
5 changes: 3 additions & 2 deletions datafusion/execution/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
config::SessionConfig,
memory_pool::MemoryPool,
registry::FunctionRegistry,
runtime_env::{RuntimeConfig, RuntimeEnv},
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
};
use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
use datafusion_expr::planner::ExprPlanner;
Expand Down Expand Up @@ -57,7 +57,8 @@ pub struct TaskContext {

impl Default for TaskContext {
fn default() -> Self {
let runtime = RuntimeEnv::new(RuntimeConfig::new())
let runtime = RuntimeEnvBuilder::new()
.build()
.expect("default runtime created successfully");

// Create a default task context, mostly useful for testing
Expand Down
15 changes: 8 additions & 7 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,7 @@ mod tests {
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::memory_pool::FairSpillPool;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_functions_aggregate::array_agg::array_agg_udaf;
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_functions_aggregate::count::count_udaf;
Expand Down Expand Up @@ -1324,11 +1324,10 @@ mod tests {
fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
let session_config = SessionConfig::new().with_batch_size(batch_size);
let runtime = Arc::new(
RuntimeEnv::new(
RuntimeConfig::default()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory))),
)
.unwrap(),
RuntimeEnvBuilder::default()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
.build()
.unwrap(),
);
let task_ctx = TaskContext::default()
.with_session_config(session_config)
Expand Down Expand Up @@ -1809,7 +1808,9 @@ mod tests {
let input_schema = input.schema();

let runtime = Arc::new(
RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(),
RuntimeEnvBuilder::default()
.with_memory_limit(1, 1.0)
.build()?,
);
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);
Expand Down
9 changes: 6 additions & 3 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ mod tests {
use crate::test::build_table_scan_i32;

use datafusion_common::{assert_batches_sorted_eq, assert_contains};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;

async fn join_collect(
left: Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -673,8 +673,11 @@ mod tests {

#[tokio::test]
async fn test_overallocation() -> Result<()> {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down
16 changes: 11 additions & 5 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1572,7 +1572,7 @@ mod tests {
ScalarValue,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};

Expand Down Expand Up @@ -3798,8 +3798,11 @@ mod tests {
];

for join_type in join_types {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down Expand Up @@ -3871,8 +3874,11 @@ mod tests {
];

for join_type in join_types {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let session_config = SessionConfig::default().with_batch_size(50);
let task_ctx = TaskContext::default()
.with_session_config(session_config)
Expand Down
9 changes: 6 additions & 3 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ mod tests {

use arrow::datatypes::{DataType, Field};
use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{Partitioning, PhysicalExpr};
Expand Down Expand Up @@ -1019,8 +1019,11 @@ mod tests {
];

for join_type in join_types {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down
Loading

0 comments on commit 5163e15

Please sign in to comment.