Skip to content

Commit

Permalink
feat: Cloud support for NDJSON (#17717)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Jul 19, 2024
1 parent f70b7f9 commit 81846f0
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 78 deletions.
56 changes: 18 additions & 38 deletions crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};

use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::RowIndex;
use polars_plan::plans::{DslPlan, FileScan};
use polars_plan::prelude::{FileScanOptions, NDJsonReadOptions};
Expand All @@ -22,6 +23,7 @@ pub struct LazyJsonLineReader {
pub(crate) n_rows: Option<usize>,
pub(crate) ignore_errors: bool,
pub(crate) include_file_paths: Option<Arc<str>>,
pub(crate) cloud_options: Option<CloudOptions>,
}

impl LazyJsonLineReader {
Expand All @@ -41,6 +43,7 @@ impl LazyJsonLineReader {
ignore_errors: false,
n_rows: None,
include_file_paths: None,
cloud_options: None,
}
}
/// Add a row index column.
Expand Down Expand Up @@ -92,6 +95,11 @@ impl LazyJsonLineReader {
self
}

/// Store the given [`CloudOptions`] on this reader, replacing any previous value.
///
/// Builder-style: consumes the reader and hands it back with the field updated.
/// Passing `None` clears previously configured options.
pub fn with_cloud_options(self, cloud_options: Option<CloudOptions>) -> Self {
    let mut reader = self;
    reader.cloud_options = cloud_options;
    reader
}

pub fn with_include_file_paths(mut self, include_file_paths: Option<Arc<str>>) -> Self {
self.include_file_paths = include_file_paths;
self
Expand All @@ -100,10 +108,6 @@ impl LazyJsonLineReader {

impl LazyFileListReader for LazyJsonLineReader {
fn finish(self) -> PolarsResult<LazyFrame> {
if !self.glob() {
return self.finish_no_glob();
}

let paths = Arc::new(Mutex::new((self.paths, false)));

let file_options = FileScanOptions {
Expand All @@ -127,7 +131,10 @@ impl LazyFileListReader for LazyJsonLineReader {
schema: self.schema,
};

let scan_type = FileScan::NDJson { options };
let scan_type = FileScan::NDJson {
options,
cloud_options: self.cloud_options,
};

Ok(LazyFrame::from(DslPlan::Scan {
paths,
Expand All @@ -140,39 +147,7 @@ impl LazyFileListReader for LazyJsonLineReader {
}

/// Non-glob variant of `finish`.
///
/// NOTE(review): this span is a diff rendering without +/- markers. Going by the hunk header
/// (`-140,39 +147,7`), everything from `let paths` through the `Ok(LazyFrame::from(..))`
/// expression is the *removed* body, and the lone `unreachable!();` is the *added* one.
/// The same commit also removes the `if !self.glob() { return self.finish_no_glob(); }`
/// dispatch from `finish`, which is presumably why this method is now treated as
/// unreachable — confirm no other caller exists.
fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
// (removed by this commit) Built the scan plan directly from the reader's own fields.
let paths = Arc::new(Mutex::new((self.paths, false)));

// (removed) Scan options with globbing hard-coded off and no file-path column.
let file_options = FileScanOptions {
n_rows: self.n_rows,
with_columns: None,
cache: false,
row_index: self.row_index.clone(),
rechunk: self.rechunk,
file_counter: 0,
hive_options: Default::default(),
glob: false,
include_file_paths: None,
};

// (removed) NDJSON reader options; chunk size fixed at 1 << 18.
let options = NDJsonReadOptions {
n_threads: None,
infer_schema_length: self.infer_schema_length,
chunk_size: NonZeroUsize::new(1 << 18).unwrap(),
low_memory: self.low_memory,
ignore_errors: self.ignore_errors,
schema: self.schema,
};

// (removed) Old variant shape: `FileScan::NDJson` carried only `options`.
let scan_type = FileScan::NDJson { options };

Ok(LazyFrame::from(DslPlan::Scan {
paths,
file_info: Arc::new(RwLock::new(None)),
hive_parts: None,
predicate: None,
file_options,
scan_type,
}))
// (added by this commit) The entire new body: panics if this method is ever called.
unreachable!();
}

fn paths(&self) -> &[PathBuf] {
Expand Down Expand Up @@ -215,4 +190,9 @@ impl LazyFileListReader for LazyJsonLineReader {
/// Borrow the row-index settings configured on this reader, or `None` if none were set.
fn row_index(&self) -> Option<&RowIndex> {
    let configured = &self.row_index;
    configured.as_ref()
}

/// Borrow the [`CloudOptions`] used to list files, or `None` if none were configured.
fn cloud_options(&self) -> Option<&CloudOptions> {
    let configured = &self.cloud_options;
    configured.as_ref()
}
}
36 changes: 33 additions & 3 deletions crates/polars-mem-engine/src/executors/scan/ndjson.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::path::PathBuf;

use polars_core::config;
use polars_core::utils::accumulate_dataframes_vertical;

use super::*;
Expand Down Expand Up @@ -38,6 +39,14 @@ impl JsonExec {
.as_ref()
.unwrap_right();

let verbose = config::verbose();
let force_async = config::force_async();
let run_async = force_async || is_cloud_url(self.paths.first().unwrap());

if force_async && verbose {
eprintln!("ASYNC READING FORCED");
}

let mut n_rows = self.file_scan_options.n_rows;

// Avoid panicking
Expand All @@ -60,9 +69,30 @@ impl JsonExec {
return None;
}

let reader = match JsonLineReader::from_path(p) {
Ok(r) => r,
Err(e) => return Some(Err(e)),
let reader = if run_async {
JsonLineReader::new({
#[cfg(feature = "cloud")]
{
match polars_io::file_cache::FILE_CACHE
.get_entry(p.to_str().unwrap())
// Safety: This was initialized by schema inference.
.unwrap()
.try_open_assume_latest()
{
Ok(v) => v,
Err(e) => return Some(Err(e)),
}
}
#[cfg(not(feature = "cloud"))]
{
panic!("required feature `cloud` is not enabled")
}
})
} else {
match JsonLineReader::from_path(p) {
Ok(r) => r,
Err(e) => return Some(Err(e)),
}
};

let row_index = self.file_scan_options.row_index.as_mut();
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ fn create_physical_plan_impl(
metadata,
))),
#[cfg(feature = "json")]
FileScan::NDJson { options } => Ok(Box::new(executors::JsonExec::new(
FileScan::NDJson { options, .. } => Ok(Box::new(executors::JsonExec::new(
paths,
options,
file_options,
Expand Down
38 changes: 23 additions & 15 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,16 @@ pub fn to_alp_impl(
.map_err(|e| e.context(failed_here!(csv scan)))?
},
#[cfg(feature = "json")]
FileScan::NDJson { options } => {
scans::ndjson_file_info(&paths, &file_options, options)
.map_err(|e| e.context(failed_here!(ndjson scan)))?
},
FileScan::NDJson {
options,
cloud_options,
} => scans::ndjson_file_info(
&paths,
&file_options,
options,
cloud_options.as_ref(),
)
.map_err(|e| e.context(failed_here!(ndjson scan)))?,
// FileInfo should be set.
FileScan::Anonymous { .. } => unreachable!(),
}
Expand Down Expand Up @@ -749,21 +755,23 @@ fn expand_scan_paths(
}

{
let paths_expanded = match scan_type {
let paths_expanded = match &scan_type {
#[cfg(feature = "parquet")]
FileScan::Parquet {
ref cloud_options, ..
} => expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)?,
FileScan::Parquet { cloud_options, .. } => {
expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)?
},
#[cfg(feature = "ipc")]
FileScan::Ipc {
ref cloud_options, ..
} => expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)?,
FileScan::Ipc { cloud_options, .. } => {
expand_scan_paths_with_hive_update(&lock.0, file_options, cloud_options)?
},
#[cfg(feature = "csv")]
FileScan::Csv {
ref cloud_options, ..
} => expand_paths(&lock.0, file_options.glob, cloud_options.as_ref())?,
FileScan::Csv { cloud_options, .. } => {
expand_paths(&lock.0, file_options.glob, cloud_options.as_ref())?
},
#[cfg(feature = "json")]
FileScan::NDJson { .. } => expand_paths(&lock.0, file_options.glob, None)?,
FileScan::NDJson { cloud_options, .. } => {
expand_paths(&lock.0, file_options.glob, cloud_options.as_ref())?
},
FileScan::Anonymous { .. } => unreachable!(), // Invariant: Anonymous scans are already expanded.
};

Expand Down
54 changes: 47 additions & 7 deletions crates/polars-plan/src/plans/conversion/scans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use polars_io::RowIndex;

use super::*;

fn get_path(paths: &[PathBuf]) -> PolarsResult<&PathBuf> {
fn get_first_path(paths: &[PathBuf]) -> PolarsResult<&PathBuf> {
// Use first path to get schema.
paths
.first()
Expand Down Expand Up @@ -42,7 +42,7 @@ pub(super) fn parquet_file_info(
file_options: &FileScanOptions,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<(FileInfo, Option<FileMetaDataRef>)> {
let path = get_path(paths)?;
let path = get_first_path(paths)?;

let (schema, reader_schema, num_rows, metadata) = if is_cloud_url(path) {
#[cfg(not(feature = "cloud"))]
Expand Down Expand Up @@ -92,7 +92,7 @@ pub(super) fn ipc_file_info(
file_options: &FileScanOptions,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<(FileInfo, arrow::io::ipc::read::FileMetadata)> {
let path = get_path(paths)?;
let path = get_first_path(paths)?;

let metadata = if is_cloud_url(path) {
#[cfg(not(feature = "cloud"))]
Expand Down Expand Up @@ -145,7 +145,7 @@ pub(super) fn csv_file_info(
// * See if we can do this without downloading the entire file

// prints the error message if paths is empty.
let first_path = get_path(paths)?;
let first_path = get_first_path(paths)?;
let run_async = is_cloud_url(first_path) || config::force_async();

let cache_entries = {
Expand Down Expand Up @@ -177,7 +177,7 @@ pub(super) fn csv_file_info(
#[cfg(feature = "cloud")]
{
let entry: &Arc<polars_io::file_cache::FileCacheEntry> =
cache_entries.as_ref().unwrap().get(i).unwrap();
&cache_entries.as_ref().unwrap()[i];
entry.try_open_check_latest()?
}
#[cfg(not(feature = "cloud"))]
Expand Down Expand Up @@ -279,10 +279,50 @@ pub(super) fn ndjson_file_info(
paths: &[PathBuf],
file_options: &FileScanOptions,
ndjson_options: &mut NDJsonReadOptions,
cloud_options: Option<&polars_io::cloud::CloudOptions>,
) -> PolarsResult<FileInfo> {
let path = get_path(paths)?;
use polars_core::config;

let run_async = !paths.is_empty() && is_cloud_url(&paths[0]) || config::force_async();

let f = polars_utils::open_file(path)?;
let cache_entries = {
#[cfg(feature = "cloud")]
{
if run_async {
Some(polars_io::file_cache::init_entries_from_uri_list(
paths
.iter()
.map(|path| Arc::from(path.to_str().unwrap()))
.collect::<Vec<_>>()
.as_slice(),
cloud_options,
)?)
} else {
None
}
}
#[cfg(not(feature = "cloud"))]
{
if run_async {
panic!("required feature `cloud` is not enabled")
}
}
};

let first_path = get_first_path(paths)?;

let f = if run_async {
#[cfg(feature = "cloud")]
{
cache_entries.unwrap()[0].try_open_check_latest()?
}
#[cfg(not(feature = "cloud"))]
{
panic!("required feature `cloud` is not enabled")
}
} else {
polars_utils::open_file(first_path)?
};
let mut reader = std::io::BufReader::new(f);

let (reader_schema, schema) = if let Some(schema) = ndjson_options.schema.take() {
Expand Down
24 changes: 21 additions & 3 deletions crates/polars-plan/src/plans/file_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ pub enum FileScan {
metadata: Option<arrow::io::ipc::read::FileMetadata>,
},
#[cfg(feature = "json")]
NDJson { options: NDJsonReadOptions },
NDJson {
options: NDJsonReadOptions,
cloud_options: Option<polars_io::cloud::CloudOptions>,
},
#[cfg_attr(feature = "serde", serde(skip))]
Anonymous {
options: Arc<AnonymousScanOptions>,
Expand Down Expand Up @@ -83,7 +86,16 @@ impl PartialEq for FileScan {
},
) => l == r && c_l == c_r,
#[cfg(feature = "json")]
(FileScan::NDJson { options: l }, FileScan::NDJson { options: r }) => l == r,
(
FileScan::NDJson {
options: l,
cloud_options: c_l,
},
FileScan::NDJson {
options: r,
cloud_options: c_r,
},
) => l == r && c_l == c_r,
_ => false,
}
}
Expand Down Expand Up @@ -122,7 +134,13 @@ impl Hash for FileScan {
cloud_options.hash(state);
},
#[cfg(feature = "json")]
FileScan::NDJson { options } => options.hash(state),
FileScan::NDJson {
options,
cloud_options,
} => {
options.hash(state);
cloud_options.hash(state)
},
FileScan::Anonymous { options, .. } => options.hash(state),
}
}
Expand Down
Loading

0 comments on commit 81846f0

Please sign in to comment.