diff --git a/Cargo.lock b/Cargo.lock
index 23dd8bcd..f9d403f7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -340,9 +340,9 @@ dependencies = [
 [[package]]
 name = "async-trait"
-version = "0.1.73"
+version = "0.1.74"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
+checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1184,6 +1184,7 @@ dependencies = [
  "arrow2",
  "async-compat",
  "async-stream",
+ "async-trait",
  "bytes",
  "console_error_panic_hook",
  "futures",
@@ -1743,9 +1744,9 @@
 checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
 
 [[package]]
 name = "wasm-bindgen"
-version = "0.2.87"
+version = "0.2.88"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342"
+checksum = "7daec296f25a1bae309c0cd5c29c4b260e510e6d813c286b19eaadf409d40fce"
 dependencies = [
  "cfg-if",
  "serde",
@@ -1755,9 +1756,9 @@ dependencies = [
 [[package]]
 name = "wasm-bindgen-backend"
-version = "0.2.87"
+version = "0.2.88"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd"
+checksum = "e397f4664c0e4e428e8313a469aaa58310d302159845980fd23b0f22a847f217"
 dependencies = [
  "bumpalo",
  "log",
@@ -1782,9 +1783,9 @@ dependencies = [
 [[package]]
 name = "wasm-bindgen-macro"
-version = "0.2.87"
+version = "0.2.88"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d"
+checksum = "5961017b3b08ad5f3fe39f1e79877f8ee7c23c5e5fd5eb80de95abc41f1f16b2"
 dependencies = [
  "quote",
  "wasm-bindgen-macro-support",
@@ -1792,9 +1793,9 @@ dependencies = [
 [[package]]
 name = "wasm-bindgen-macro-support"
-version = "0.2.87"
+version = "0.2.88"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
+checksum = "c5353b8dab669f5e10f5bd76df26a9360c748f054f862ff5f3f8aae0c7fb3907"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -1805,9 +1806,9 @@ dependencies = [
 [[package]]
 name = "wasm-bindgen-shared"
-version = "0.2.87"
+version = "0.2.88"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1"
+checksum = "0d046c5d029ba91a1ed14da14dca44b68bf2f124cfbaf741c54151fdb3e0750b"
 
 [[package]]
 name = "wasm-bindgen-test"
diff --git a/Cargo.toml b/Cargo.toml
index 2c96f5b3..18e69621 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -58,7 +58,7 @@ full = [
 ]
 
 [dependencies]
-wasm-bindgen = { version = "0.2.83", features = ["serde-serialize"] }
+wasm-bindgen = { version = "0.2.88", features = ["serde-serialize"] }
 # The `console_error_panic_hook` crate provides better debugging of panics by
 # logging them with `console.error`. This is great for development, but requires
@@ -110,6 +110,7 @@ zstd = { version = "*", features = [
 async-compat = { version = "0.2.2", optional = true }
 async-stream = { version = "0.3.5", optional = true }
 wasm-streams = { version = "0.3.0", optional = true }
+async-trait = "0.1.74"
 
 [dependencies.web-sys]
 version = "0.3.4"
@@ -123,6 +124,7 @@ features = [
  'Window',
  "Document",
  "Element",
+ "File"
 ]
 
 [dev-dependencies]
diff --git a/src/arrow1/error.rs b/src/arrow1/error.rs
index 20c4bc39..6be6f4b3 100644
--- a/src/arrow1/error.rs
+++ b/src/arrow1/error.rs
@@ -10,7 +10,8 @@ pub enum ParquetWasmError {
     #[error(transparent)]
     ParquetError(Box<parquet::errors::ParquetError>),
-
+    #[error("Column {0} not found in table")]
+    UnknownColumn(String),
     #[cfg(feature = "async")]
     #[error("HTTP error: `{0}`")]
     HTTPError(Box<reqwest::Error>),
diff --git a/src/arrow1/metadata.rs b/src/arrow1/metadata.rs
new file mode 100644
index 00000000..28372495
--- /dev/null
+++ b/src/arrow1/metadata.rs
@@ -0,0 +1,230 @@
+use wasm_bindgen::prelude::*;
+
+/// Global Parquet metadata.
+#[derive(Debug, Clone)]
+#[wasm_bindgen]
+pub struct ParquetMetaData(parquet::file::metadata::ParquetMetaData);
+
+#[wasm_bindgen]
+impl ParquetMetaData {
+    /// Returns file metadata as reference.
+    #[wasm_bindgen(js_name = fileMetadata)]
+    pub fn file_metadata(&self) -> FileMetaData {
+        self.0.file_metadata().clone().into()
+    }
+
+    /// Returns number of row groups in this file.
+    #[wasm_bindgen(js_name = numRowGroups)]
+    pub fn num_row_groups(&self) -> usize {
+        self.0.num_row_groups()
+    }
+
+    /// Returns row group metadata for `i`th position.
+    /// Position should be less than number of row groups `num_row_groups`.
+    #[wasm_bindgen(js_name = rowGroup)]
+    pub fn row_group(&self, i: usize) -> RowGroupMetaData {
+        self.0.row_group(i).clone().into()
+    }
+
+    // /// Returns the column index for this file if loaded
+    // pub fn column_index(&self) -> Option<&ParquetColumnIndex> {
+    //     self.0.column_index()
+    // }
+}
+
+impl From<parquet::file::metadata::ParquetMetaData> for ParquetMetaData {
+    fn from(value: parquet::file::metadata::ParquetMetaData) -> Self {
+        Self(value)
+    }
+}
+
+impl From<ParquetMetaData> for parquet::file::metadata::ParquetMetaData {
+    fn from(value: ParquetMetaData) -> Self {
+        value.0
+    }
+}
+
+/// Metadata for a Parquet file.
+#[derive(Debug, Clone)]
+#[wasm_bindgen]
+pub struct FileMetaData(parquet::file::metadata::FileMetaData);
+
+#[wasm_bindgen]
+impl FileMetaData {
+    /// Returns version of this file.
+    #[wasm_bindgen]
+    pub fn version(&self) -> i32 {
+        self.0.version()
+    }
+
+    /// Returns number of rows in the file.
+    #[wasm_bindgen(js_name = numRows)]
+    pub fn num_rows(&self) -> i64 {
+        self.0.num_rows()
+    }
+
+    /// String message for application that wrote this file.
+    ///
+    /// This should have the following format:
+    /// `<application> version <application version> (build <application build hash>)`.
+    ///
+    /// ```shell
+    /// parquet-mr version 1.8.0 (build 0fda28af84b9746396014ad6a415b90592a98b3b)
+    /// ```
+    #[wasm_bindgen(js_name = createdBy)]
+    pub fn created_by(&self) -> Option<String> {
+        let s = self.0.created_by()?;
+        Some(s.to_string())
+    }
+
+    /// Returns key_value_metadata of this file.
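+    ///
+    /// Illustrative JS-side sketch (assumed usage; the `js_sys::Map` surfaces as a JS `Map`):
+    /// ```js
+    /// const kv = parquetMetadata.fileMetadata().keyValueMetadata();
+    /// for (const [key, value] of kv.entries()) console.log(key, value);
+    /// ```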
+    #[wasm_bindgen(js_name = keyValueMetadata)]
+    pub fn key_value_metadata(&self) -> Result<js_sys::Map, JsValue> {
+        let map = js_sys::Map::new();
+        if let Some(metadata) = self.0.key_value_metadata() {
+            for meta in metadata {
+                if let Some(value) = &meta.value {
+                    map.set(&JsValue::from_str(&meta.key), &JsValue::from_str(value));
+                }
+            }
+        }
+        Ok(map)
+    }
+}
+
+impl From<parquet::file::metadata::FileMetaData> for FileMetaData {
+    fn from(value: parquet::file::metadata::FileMetaData) -> Self {
+        Self(value)
+    }
+}
+
+impl From<FileMetaData> for parquet::file::metadata::FileMetaData {
+    fn from(value: FileMetaData) -> Self {
+        value.0
+    }
+}
+
+/// Metadata for a Parquet row group.
+#[derive(Debug, Clone)]
+#[wasm_bindgen]
+pub struct RowGroupMetaData(parquet::file::metadata::RowGroupMetaData);
+
+#[wasm_bindgen]
+impl RowGroupMetaData {
+    /// Number of columns in this row group.
+    #[wasm_bindgen(js_name = numColumns)]
+    pub fn num_columns(&self) -> usize {
+        self.0.num_columns()
+    }
+
+    /// Returns column chunk metadata for `i`th column.
+    #[wasm_bindgen]
+    pub fn column(&self, i: usize) -> ColumnChunkMetaData {
+        self.0.column(i).clone().into()
+    }
+
+    /// Number of rows in this row group.
+    #[wasm_bindgen(js_name = numRows)]
+    pub fn num_rows(&self) -> i64 {
+        self.0.num_rows()
+    }
+
+    /// Total byte size of all uncompressed column data in this row group.
+    #[wasm_bindgen(js_name = totalByteSize)]
+    pub fn total_byte_size(&self) -> i64 {
+        self.0.total_byte_size()
+    }
+
+    /// Total size of all compressed column data in this row group.
+    #[wasm_bindgen(js_name = compressedSize)]
+    pub fn compressed_size(&self) -> i64 {
+        self.0.compressed_size()
+    }
+}
+
+impl From<parquet::file::metadata::RowGroupMetaData> for RowGroupMetaData {
+    fn from(value: parquet::file::metadata::RowGroupMetaData) -> Self {
+        Self(value)
+    }
+}
+
+impl From<RowGroupMetaData> for parquet::file::metadata::RowGroupMetaData {
+    fn from(value: RowGroupMetaData) -> Self {
+        value.0
+    }
+}
+
+/// Metadata for a Parquet column chunk.
+#[derive(Debug, Clone)]
+#[wasm_bindgen]
+pub struct ColumnChunkMetaData(parquet::file::metadata::ColumnChunkMetaData);
+
+#[wasm_bindgen]
+impl ColumnChunkMetaData {
+    /// File where the column chunk is stored.
+    ///
+    /// If not set, assumed to belong to the same file as the metadata.
+    /// This path is relative to the current file.
+    #[wasm_bindgen(js_name = filePath)]
+    pub fn file_path(&self) -> Option<String> {
+        self.0.file_path().map(|s| s.to_string())
+    }
+
+    /// Byte offset in `file_path()`.
+    #[wasm_bindgen(js_name = fileOffset)]
+    pub fn file_offset(&self) -> i64 {
+        self.0.file_offset()
+    }
+
+    // /// Type of this column. Must be primitive.
+    // pub fn column_type(&self) -> Type {
+    //     self.column_descr.physical_type()
+    // }
+
+    /// Path (or identifier) of this column.
+    #[wasm_bindgen(js_name = columnPath)]
+    pub fn column_path(&self) -> Vec<String> {
+        let path = self.0.column_path();
+        path.parts().to_vec()
+    }
+
+    // /// All encodings used for this column.
+    // pub fn encodings(&self) -> &Vec<Encoding> {
+    //     &self.encodings
+    // }
+
+    /// Total number of values in this column chunk.
+    #[wasm_bindgen(js_name = numValues)]
+    pub fn num_values(&self) -> i64 {
+        self.0.num_values()
+    }
+
+    // /// Compression for this column.
+    // pub fn compression(&self) -> Compression {
+    //     self.compression
+    // }
+
+    /// Returns the total compressed data size of this column chunk.
+    #[wasm_bindgen(js_name = compressedSize)]
+    pub fn compressed_size(&self) -> i64 {
+        self.0.compressed_size()
+    }
+
+    /// Returns the total uncompressed data size of this column chunk.
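+    ///
+    /// Dividing this by `compressedSize` gives this chunk's compression ratio
+    /// (illustrative numbers: 2048 uncompressed / 1024 compressed = 2.0).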
+    #[wasm_bindgen(js_name = uncompressedSize)]
+    pub fn uncompressed_size(&self) -> i64 {
+        self.0.uncompressed_size()
+    }
+}
+
+impl From<parquet::file::metadata::ColumnChunkMetaData> for ColumnChunkMetaData {
+    fn from(value: parquet::file::metadata::ColumnChunkMetaData) -> Self {
+        Self(value)
+    }
+}
+
+impl From<ColumnChunkMetaData> for parquet::file::metadata::ColumnChunkMetaData {
+    fn from(value: ColumnChunkMetaData) -> Self {
+        value.0
+    }
+}
diff --git a/src/arrow1/mod.rs b/src/arrow1/mod.rs
index e9edc85c..34682629 100644
--- a/src/arrow1/mod.rs
+++ b/src/arrow1/mod.rs
@@ -1,15 +1,11 @@
+pub mod error;
+pub mod metadata;
 #[cfg(feature = "reader")]
 pub mod reader;
-
+#[cfg(all(feature = "reader", feature = "async"))]
+pub mod reader_async;
 pub mod wasm;
-
 #[cfg(feature = "writer")]
 pub mod writer;
-
 #[cfg(feature = "writer")]
 pub mod writer_properties;
-
-pub mod error;
-
-#[cfg(all(feature = "reader", feature = "async"))]
-pub mod reader_async;
diff --git a/src/arrow1/reader_async.rs b/src/arrow1/reader_async.rs
index bef91365..cfdf6cec 100644
--- a/src/arrow1/reader_async.rs
+++ b/src/arrow1/reader_async.rs
@@ -1,15 +1,621 @@
+//! An asynchronous Parquet reader that is able to read and inspect remote files without
+//! downloading them in entirety.
+
+use futures::channel::oneshot;
+use futures::future::BoxFuture;
+use parquet::arrow::ProjectionMask;
+use parquet::schema::types::SchemaDescriptor;
+use std::ops::Range;
 use std::sync::Arc;
+use wasm_bindgen::prelude::*;
+use wasm_bindgen_futures::spawn_local;
 
-use crate::arrow1::error::Result;
-use crate::common::fetch::{create_reader, get_content_length};
+use crate::arrow1::error::{ParquetWasmError, Result, WasmResult};
+use crate::common::fetch::{
+    create_reader, get_content_length, range_from_end, range_from_start_and_length,
+};
 use arrow::ipc::writer::StreamWriter;
-use futures::StreamExt;
-use parquet::arrow::async_reader::{ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder};
+use arrow_wasm::arrow1::{RecordBatch, Table};
+use bytes::Bytes;
+use futures::TryStreamExt;
+use futures::{stream, FutureExt, StreamExt};
+use parquet::arrow::arrow_reader::ArrowReaderMetadata;
+use parquet::arrow::async_reader::{
+    AsyncFileReader, ParquetRecordBatchStream, ParquetRecordBatchStreamBuilder,
+};
 use async_compat::{Compat, CompatExt};
-use parquet::file::metadata::FileMetaData;
+use parquet::file::footer::{decode_footer, decode_metadata};
+use parquet::file::metadata::{FileMetaData, ParquetMetaData};
 use range_reader::RangedAsyncReader;
+use reqwest::Client;
+
+use async_trait::async_trait;
+
+#[async_trait(?Send)]
+trait SharedIO<T: AsyncFileReader + Unpin + Send + Clone + 'static> {
+    fn generate_builder(
+        reader: &T,
+        meta: &ArrowReaderMetadata,
+        batch_size: &usize,
+        projection_mask: &Option<ProjectionMask>,
+    ) -> ParquetRecordBatchStreamBuilder<T> {
+        let builder =
+            ParquetRecordBatchStreamBuilder::new_with_metadata(reader.clone(), meta.clone())
+                .with_batch_size(*batch_size)
+                .with_projection(
+                    projection_mask
+                        .as_ref()
+                        .unwrap_or(&ProjectionMask::all())
+                        .clone(),
+                );
+        builder
+    }
+
+    async fn inner_read_row_group(
+        &self,
+        reader: &T,
+        meta: &ArrowReaderMetadata,
+        batch_size: &usize,
+        projection_mask: &Option<ProjectionMask>,
+        i: usize,
+    ) -> Result<Table> {
+        let builder = Self::generate_builder(reader, meta, batch_size, projection_mask);
+        let stream = builder.with_row_groups(vec![i]).build()?;
+        let results = stream.try_collect::<Vec<_>>().await.unwrap();
+
+        // NOTE: This is not only one batch by default due to arrow-rs's default rechunking.
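+        // A row group therefore decodes into multiple RecordBatches whenever its row
+        // count exceeds the configured batch_size.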
+        // assert_eq!(results.len(), 1, "Expected one record batch");
+        // Ok(RecordBatch::new(results.pop().unwrap()))
+        Ok(Table::new(results))
+    }
+
+    async fn inner_stream(
+        &self,
+        concurrency: Option<usize>,
+        meta: &ArrowReaderMetadata,
+        reader: &T,
+        batch_size: &usize,
+        projection_mask: &Option<ProjectionMask>,
+    ) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
+        use futures::StreamExt;
+        let concurrency = concurrency.unwrap_or(1);
+        let meta = meta.clone();
+        let reader = reader.clone();
+        let batch_size = *batch_size;
+        let num_row_groups = meta.metadata().num_row_groups();
+        let projection_mask = projection_mask.clone();
+        let buffered_stream = stream::iter((0..num_row_groups).map(move |i| {
+            let builder = Self::generate_builder(&reader, &meta, &batch_size, &projection_mask)
+                .with_row_groups(vec![i]);
+            builder.build().unwrap().try_collect::<Vec<_>>()
+        }))
+        .buffered(concurrency);
+        let out_stream = buffered_stream.flat_map(|maybe_record_batches| {
+            stream::iter(maybe_record_batches.unwrap())
+                .map(|record_batch| Ok(RecordBatch::new(record_batch).into()))
+        });
+        Ok(wasm_streams::ReadableStream::from_stream(out_stream).into_raw())
+    }
+}
+
+#[wasm_bindgen]
+pub struct AsyncParquetFile {
+    reader: HTTPFileReader,
+    meta: ArrowReaderMetadata,
+    batch_size: usize,
+    projection_mask: Option<ProjectionMask>,
+}
+
+impl SharedIO<HTTPFileReader> for AsyncParquetFile {}
+
+#[wasm_bindgen]
+impl AsyncParquetFile {
+    #[wasm_bindgen(constructor)]
+    pub async fn new(url: String) -> WasmResult<AsyncParquetFile> {
+        let client = Client::new();
+        let mut reader = HTTPFileReader::new(url.clone(), client.clone(), 1024);
+        let meta = ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?;
+        Ok(Self {
+            reader,
+            meta,
+            projection_mask: None,
+            batch_size: 1024,
+        })
+    }
+
+    #[wasm_bindgen(js_name = withBatchSize)]
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        Self { batch_size, ..self }
+    }
+
+    #[wasm_bindgen(js_name = selectColumns)]
+    pub fn select_columns(self, columns: Vec<String>) -> WasmResult<AsyncParquetFile> {
+        let pq_schema = self.meta.parquet_schema();
+        let projection_mask = Some(generate_projection_mask(columns, pq_schema)?);
+        Ok(Self {
+            projection_mask,
+            ..self
+        })
+    }
+
+    #[wasm_bindgen]
+    pub fn metadata(&self) -> WasmResult<crate::arrow1::metadata::ParquetMetaData> {
+        Ok(self.meta.metadata().as_ref().to_owned().into())
+    }
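+
+    // Illustrative JS-side usage of this class (sketch; the chaining below is assumed):
+    //   const file = await new AsyncParquetFile(url);
+    //   const subset = file.selectColumns(["a", "b"]);
+    //   const table = await subset.readRowGroup(0);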
+
+    #[wasm_bindgen(js_name = readRowGroup)]
+    pub async fn read_row_group(&self, i: usize) -> WasmResult<Table> {
+        let inner = self
+            .inner_read_row_group(
+                &self.reader,
+                &self.meta,
+                &self.batch_size,
+                &self.projection_mask,
+                i,
+            )
+            .await
+            .unwrap();
+        Ok(inner)
+    }
+
+    #[wasm_bindgen]
+    pub async fn stream(
+        &self,
+        concurrency: Option<usize>,
+    ) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
+        self.inner_stream(
+            concurrency,
+            &self.meta,
+            &self.reader,
+            &self.batch_size,
+            &self.projection_mask,
+        )
+        .await
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct HTTPFileReader {
+    url: String,
+    client: Client,
+    coalesce_byte_size: usize,
+}
+
+impl HTTPFileReader {
+    pub fn new(url: String, client: Client, coalesce_byte_size: usize) -> Self {
+        Self {
+            url,
+            client,
+            coalesce_byte_size,
+        }
+    }
+}
+
+impl AsyncFileReader for HTTPFileReader {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        async move {
+            let range_str =
+                range_from_start_and_length(range.start as u64, (range.end - range.start) as u64);
+
+            // Map reqwest error to parquet error
+            // let map_err = |err| parquet::errors::ParquetError::External(Box::new(err));
+
+            let bytes = make_range_request_with_client(
+                self.url.to_string(),
+                self.client.clone(),
+                range_str,
+            )
+            .await
+            .unwrap();
+
+            Ok(bytes)
+        }
+        .boxed()
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        let fetch_ranges = merge_ranges(&ranges, self.coalesce_byte_size);
+
+        // NOTE: This still does _sequential_ requests, but it should be _fewer_ requests if they
+        // can be merged.
+        async move {
+            let mut fetched = Vec::with_capacity(ranges.len());
+
+            for range in fetch_ranges.iter() {
+                let data = self.get_bytes(range.clone()).await?;
+                fetched.push(data);
+            }
+
+            Ok(ranges
+                .iter()
+                .map(|range| {
+                    let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
+                    let fetch_range = &fetch_ranges[idx];
+                    let fetch_bytes = &fetched[idx];
+
+                    let start = range.start - fetch_range.start;
+                    let end = range.end - fetch_range.start;
+                    fetch_bytes.slice(start..end)
+                })
+                .collect())
+        }
+        .boxed()
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        async move {
+            let meta = fetch_parquet_metadata(self.url.as_str(), &self.client, None).await?;
+            Ok(Arc::new(meta))
+        }
+        .boxed()
+    }
+}
+
+/// Safety: Do not use this in a multi-threaded environment,
+/// (transitively depends on !Send web_sys::File)
+#[wasm_bindgen]
+pub struct AsyncParquetLocalFile {
+    reader: JsFileReader,
+    meta: ArrowReaderMetadata,
+    batch_size: usize,
+    projection_mask: Option<ProjectionMask>,
+}
+
+impl SharedIO<JsFileReader> for AsyncParquetLocalFile {}
+
+#[wasm_bindgen]
+impl AsyncParquetLocalFile {
+    #[wasm_bindgen(constructor)]
+    pub async fn new(handle: web_sys::File) -> WasmResult<AsyncParquetLocalFile> {
+        let mut reader = JsFileReader::new(handle, 1024);
+        let meta = ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?;
+        Ok(Self {
+            reader,
+            meta,
+            batch_size: 1024,
+            projection_mask: None,
+        })
+    }
+
+    #[wasm_bindgen(js_name = withBatchSize)]
+    pub fn with_batch_size(self, batch_size: usize) -> Self {
+        Self { batch_size, ..self }
+    }
+
+    #[wasm_bindgen(js_name = selectColumns)]
+    pub fn select_columns(self, columns: Vec<String>) -> WasmResult<AsyncParquetLocalFile> {
+        let pq_schema = self.meta.parquet_schema();
+        let projection_mask = Some(generate_projection_mask(columns, pq_schema)?);
+        Ok(Self {
+            projection_mask,
+            ..self
+        })
+    }
+
+    #[wasm_bindgen]
+    pub fn metadata(&self) -> WasmResult<crate::arrow1::metadata::ParquetMetaData> {
+        Ok(self.meta.metadata().as_ref().to_owned().into())
+    }
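+
+    // Illustrative JS-side usage (sketch; assumes a browser File handle and the
+    // async-iterator polyfill from www/async_testing.js below):
+    //   const local = await new AsyncParquetLocalFile(fileInput.files[0]);
+    //   for await (const batch of await local.stream()) { /* ... */ }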
+
+    #[wasm_bindgen(js_name = readRowGroup)]
+    pub async fn read_row_group(&self, i: usize) -> WasmResult<Table> {
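+        // NOTE: an out-of-range row group index `i` currently surfaces as a panic
+        // via the `unwrap` below rather than as a typed JS error.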
+        let inner = self
+            .inner_read_row_group(
+                &self.reader,
+                &self.meta,
+                &self.batch_size,
+                &self.projection_mask,
+                i,
+            )
+            .await
+            .unwrap();
+        Ok(inner)
+    }
+
+    #[wasm_bindgen]
+    pub async fn stream(
+        &self,
+        concurrency: Option<usize>,
+    ) -> WasmResult<wasm_streams::readable::sys::ReadableStream> {
+        self.inner_stream(
+            concurrency,
+            &self.meta,
+            &self.reader,
+            &self.batch_size,
+            &self.projection_mask,
+        )
+        .await
+    }
+}
+
+#[derive(Debug, Clone)]
+struct WrappedFile {
+    inner: web_sys::File,
+    pub size: f64,
+}
+/// Safety: This is not in fact thread-safe. Do not attempt to use this in work-stealing
+/// async runtimes / multi-threaded environments
+///
+/// web_sys::File objects, like all JSValues, are !Send (even in JS, there's
+/// maybe ~5 Transferable types), and eventually boil down to PhantomData<*mut u8>.
+/// Any struct that holds one is inherently !Send, which disqualifies it from being used
+/// with the AsyncFileReader trait.
+unsafe impl Send for WrappedFile {}
+
+impl WrappedFile {
+    pub fn new(inner: web_sys::File) -> Self {
+        let size = inner.size();
+        Self { inner, size }
+    }
+    pub async fn get_bytes(&mut self, range: Range<usize>) -> Vec<u8> {
+        use js_sys::Uint8Array;
+        use wasm_bindgen_futures::JsFuture;
+        let (sender, receiver) = oneshot::channel();
+        let file = self.inner.clone();
+        spawn_local(async move {
+            let subset_blob = file
+                .slice_with_i32_and_i32(
+                    range.start.try_into().unwrap(),
+                    range.end.try_into().unwrap(),
+                )
+                .unwrap();
+            let buf = JsFuture::from(subset_blob.array_buffer()).await.unwrap();
+            let out_vec = Uint8Array::new_with_byte_offset(&buf, 0).to_vec();
+            sender.send(out_vec).unwrap();
+        });
+
+        receiver.await.unwrap()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct JsFileReader {
+    file: WrappedFile,
+    coalesce_byte_size: usize,
+}
+
+impl JsFileReader {
+    pub fn new(file: web_sys::File, coalesce_byte_size: usize) -> Self {
+        Self {
+            file: WrappedFile::new(file),
+            coalesce_byte_size,
+        }
+    }
+}
+
+impl AsyncFileReader for JsFileReader {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        async move {
+            let (sender, receiver) = oneshot::channel();
+            let mut file = self.file.clone();
+            spawn_local(async move {
+                let result: Bytes = file.get_bytes(range).await.into();
+                sender.send(result).unwrap()
+            });
+            let data = receiver.await.unwrap();
+            Ok(data)
+        }
+        .boxed()
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        let fetch_ranges = merge_ranges(&ranges, self.coalesce_byte_size);
+
+        // NOTE: This still does _sequential_ requests, but it should be _fewer_ requests if they
+        // can be merged.
+        // Assuming that we have a file on the local file system, these fetches should be
+        // _relatively_ fast
+        async move {
+            let mut fetched = Vec::with_capacity(ranges.len());
+
+            for range in fetch_ranges.iter() {
+                let data = self.get_bytes(range.clone()).await?;
+                fetched.push(data);
+            }
+
+            Ok(ranges
+                .iter()
+                .map(|range| {
+                    // a given range CAN span two coalesced row group sets.
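+                    // partition_point counts the coalesced ranges whose start is
+                    // <= this range's start, so subtracting 1 yields the index of
+                    // the coalesced fetch that covers this range's start.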
+                    // log!("Range: {:?} Actual length: {:?}", range.end - range.start, res.len());
+                    let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
+                    let fetch_range = &fetch_ranges[idx];
+                    let fetch_bytes = &fetched[idx];
+
+                    let start = range.start - fetch_range.start;
+                    let end = range.end - fetch_range.start;
+                    fetch_bytes.slice(start..end)
+                })
+                .collect())
+        }
+        .boxed()
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        async move {
+            // we only *really* need the last 8 bytes to determine the location of the metadata bytes
+            let file_size: usize = (self.file.size as i64).try_into().unwrap();
+            // we already know the size of the file!
+            let suffix_range: Range<usize> = (file_size - 8)..file_size;
+            let suffix = self.get_bytes(suffix_range).await.unwrap();
+            let suffix_len = suffix.len();
+
+            let mut footer = [0; 8];
+            footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+            let metadata_byte_length = decode_footer(&footer)?;
+            // Did not fetch the entire file metadata in the initial read, need to make a second request
+            let meta = if metadata_byte_length > suffix_len - 8 {
+                // might want to figure out how to get get_bytes to accept a one-sided range
+                let meta_range = (file_size - metadata_byte_length - 8)..file_size;
+
+                let meta_bytes = self.get_bytes(meta_range).await.unwrap();
+
+                decode_metadata(&meta_bytes[0..meta_bytes.len() - 8])?
+            } else {
+                let metadata_start = suffix_len - metadata_byte_length - 8;
+
+                let slice = &suffix[metadata_start..suffix_len - 8];
+                decode_metadata(slice)?
+            };
+            Ok(Arc::new(meta))
+        }
+        .boxed()
+    }
+}
+
+pub async fn make_range_request_with_client(
+    url: String,
+    client: Client,
+    range_str: String,
+) -> std::result::Result<Bytes, JsValue> {
+    let (sender, receiver) = oneshot::channel();
+    spawn_local(async move {
+        let resp = client
+            .get(url)
+            .header("Range", range_str)
+            .send()
+            .await
+            .unwrap()
+            .error_for_status()
+            .unwrap();
+        let bytes = resp.bytes().await.unwrap();
+        sender.send(bytes).unwrap();
+    });
+    let data = receiver.await.unwrap();
+    Ok(data)
+}
+
+/// Returns a sorted list of ranges that cover `ranges`
+///
+/// Copied from object-store
+/// https://github.com/apache/arrow-rs/blob/61da64a0557c80af5bb43b5f15c6d8bb6a314cb2/object_store/src/util.rs#L132C1-L169C1
+fn merge_ranges(ranges: &[Range<usize>], coalesce: usize) -> Vec<Range<usize>> {
+    if ranges.is_empty() {
+        return vec![];
+    }
+
+    let mut ranges = ranges.to_vec();
+    ranges.sort_unstable_by_key(|range| range.start);
+
+    let mut ret = Vec::with_capacity(ranges.len());
+    let mut start_idx = 0;
+    let mut end_idx = 1;
+
+    while start_idx != ranges.len() {
+        let mut range_end = ranges[start_idx].end;
+
+        while end_idx != ranges.len()
+            && ranges[end_idx]
+                .start
+                .checked_sub(range_end)
+                .map(|delta| delta <= coalesce)
+                .unwrap_or(true)
+        {
+            range_end = range_end.max(ranges[end_idx].end);
+            end_idx += 1;
+        }
+
+        let start = ranges[start_idx].start;
+        let end = range_end;
+        ret.push(start..end);
+
+        start_idx = end_idx;
+        end_idx += 1;
+    }
+
+    ret
+}
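+
+// Illustrative example (values assumed): with coalesce = 4,
+// merge_ranges(&[0..10, 12..20, 100..110], 4) returns [0..20, 100..110]:
+// the 2-byte gap between 10 and 12 is within the coalesce threshold, so the
+// first two ranges merge, while 100..110 stays separate.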
+
+// Derived from:
+// https://github.com/apache/arrow-rs/blob/61da64a0557c80af5bb43b5f15c6d8bb6a314cb2/parquet/src/arrow/async_reader/metadata.rs#L54-L57
+pub async fn fetch_parquet_metadata(
+    url: &str,
+    client: &Client,
+    prefetch: Option<usize>,
+) -> parquet::errors::Result<ParquetMetaData> {
+    let suffix_length = prefetch.unwrap_or(8);
+    let range_str = range_from_end(suffix_length as u64);
+
+    // Map reqwest error to parquet error
+    // let map_err = |err| parquet::errors::ParquetError::External(Box::new(err));
+
+    let suffix = make_range_request_with_client(url.to_string(), client.clone(), range_str)
+        .await
+        .unwrap();
+    let suffix_len = suffix.len();
+
+    let mut footer = [0; 8];
+    footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+
+    let metadata_byte_length = decode_footer(&footer)?;
+
+    // Did not fetch the entire file metadata in the initial read, need to make a second request
+    let metadata = if metadata_byte_length > suffix_len - 8 {
+        let metadata_range_str = range_from_end((metadata_byte_length + 8) as u64);
+
+        let meta_bytes =
+            make_range_request_with_client(url.to_string(), client.clone(), metadata_range_str)
+                .await
+                .unwrap();
+
+        decode_metadata(&meta_bytes[0..meta_bytes.len() - 8])?
+    } else {
+        let metadata_start = suffix_len - metadata_byte_length - 8;
+
+        let slice = &suffix[metadata_start..suffix_len - 8];
+        decode_metadata(slice)?
+    };
+
+    Ok(metadata)
+}
+
+fn generate_projection_mask(
+    columns: Vec<String>,
+    pq_schema: &SchemaDescriptor,
+) -> Result<ProjectionMask> {
+    let col_paths = pq_schema
+        .columns()
+        .iter()
+        .map(|col| col.path().string())
+        .collect::<Vec<String>>();
+    let indices: Vec<usize> = columns
+        .iter()
+        .map(|col| {
+            let field_indices: Vec<usize> = col_paths
+                .iter()
+                .enumerate()
+                .filter(|(_idx, path)| {
+                    // identical OR the path starts with the column AND the substring is immediately followed by the
+                    // path separator
+                    path.to_string() == col.clone()
+                        || path.starts_with(col) && {
+                            let left_index = path.find(col).unwrap();
+                            path.chars().nth(left_index + col.len()).unwrap() == '.'
+                        }
+                })
+                .map(|(idx, _)| idx)
+                .collect();
+            if field_indices.is_empty() {
+                Err(ParquetWasmError::UnknownColumn(col.clone()))
+            } else {
+                Ok(field_indices)
+            }
+        })
+        .collect::<Result<Vec<Vec<usize>>>>()?
+        .into_iter()
+        .flatten()
+        .collect();
+    let projection_mask = ProjectionMask::leaves(pq_schema, indices);
+    Ok(projection_mask)
+}
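+
+// Illustrative behavior (column names assumed): for leaf paths
+// ["a.b", "a.c", "d"], selecting ["a"] matches both "a.b" and "a.c" (the path
+// continues with the '.' separator), while selecting ["e"] yields
+// Err(ParquetWasmError::UnknownColumn("e")).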
 
 pub async fn read_metadata_async(
     url: String,
diff --git a/src/common/fetch.rs b/src/common/fetch.rs
index 36ced20d..0fc86f6c 100644
--- a/src/common/fetch.rs
+++ b/src/common/fetch.rs
@@ -25,8 +25,10 @@ pub async fn get_content_length(url: String) -> Result<usize> {
 
 /// Construct range header from start and length
 pub fn range_from_start_and_length(start: u64, length: u64) -> String {
-    // TODO: should this be start + length - 1?
-    format!("bytes={}-{}", start, start + length)
+    // Subtract 1 from length because end is inclusive
+    // > bytes units ... are offsets (zero-indexed & inclusive)
+    // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range
+    format!("bytes={}-{}", start, start + length - 1)
 }
 
 pub fn range_from_start(start: u64) -> String {
diff --git a/www/async_testing.js b/www/async_testing.js
index f99e7b91..b99141ca 100644
--- a/www/async_testing.js
+++ b/www/async_testing.js
@@ -1,13 +1,36 @@
 import * as arrow from "@apache-arrow/es2015-cjs/Arrow.dom";
 import * as wasm from "parquet-wasm/bundler/arrow2";
-
+import * as wasmArrow1 from "parquet-wasm/bundler/arrow1";
 wasm.setPanicHook();
 window.wasm = wasm;
 
 // const url = 'https://raw.githubusercontent.com/opengeospatial/geoparquet/main/examples/example.parquet';
 const url = "https://raw.githubusercontent.com/kylebarron/parquet-wasm/main/tests/data/2-partition-brotli.parquet";
 window.url = url;
-
+ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
+  const reader = this.getReader()
+  try {
+    while (true) {
+      const { done, value } = await reader.read()
+      if (done) return
+      yield value
+    }
+  }
+  finally {
+    reader.releaseLock()
+  }
+}
+async function readParquetFile() {
+  const blobResult = await (await fetch(url)).blob();
+  // a bit pointless, but definitely a file.
+  const simulatedFile = new File([blobResult], '2-partition-brotli.parquet', {type: 'application/vnd.apache.parquet'})
+  const instance = await new wasmArrow1.AsyncParquetLocalFile(simulatedFile);
+  for await (const chunk of await instance.stream()) {
+    console.log(chunk);
+    console.log(chunk.length);
+    chunk.free();
+  }
+}
 async function main() {
   console.log("hello world");
   const resp = await fetch(url, { method: "HEAD" });
@@ -34,7 +57,9 @@ async function main() {
   const table = new arrow.Table(tables);
   window.table = table;
   console.log("table", table);
-
+  console.log('file IO');
+  await readParquetFile();
+  console.log('file IO');
   console.log("end of js");
 }
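+
+// Illustrative sketch (assumed API, mirroring readParquetFile above): the
+// HTTP-backed reader exposes the same surface without a File handle:
+//   const remote = await new wasmArrow1.AsyncParquetFile(url);
+//   console.log(remote.metadata().numRowGroups());
+//   const table = await remote.readRowGroup(0);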