feat: support column stats provider for ParquetTable (parquet_rs) (databendlabs#12809)

* parquet_rs: read parquet metadata when creating the parquet table and collect stats.

* Store parquet metadata in memory after creating the table.

* Lazily read parquet metas.

* Skip creating the stats provider if the statement is a COPY.

* Fix deadlock.

* Fix comment.

* Fix leaf fields.

* Address review comments.

* Fix clippy.

* Fix unit tests for leaf fields.

---------

Co-authored-by: sundyli <[email protected]>
RinChanNOWWW and sundy-li authored Sep 13, 2023
1 parent 69be928 commit 5261704
Showing 22 changed files with 627 additions and 215 deletions.
60 changes: 0 additions & 60 deletions src/common/storage/src/parquet_rs.rs
@@ -16,8 +16,6 @@ use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::Schema as ArrowSchema;
use common_base::runtime::execute_futures_in_parallel;
use common_base::runtime::GLOBAL_MEM_STAT;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::FieldIndex;
@@ -79,64 +77,6 @@ pub fn infer_schema_with_extension(meta: &ParquetMetaData) -> Result<ArrowSchema
Ok(arrow_schema)
}

#[allow(dead_code)]
async fn read_parquet_metas_batch(
file_infos: Vec<(String, u64)>,
op: Operator,
max_memory_usage: u64,
) -> Result<Vec<ParquetMetaData>> {
let mut metas = vec![];
for (path, size) in file_infos {
metas.push(read_metadata_async(&path, &op, Some(size)).await?)
}
let used = GLOBAL_MEM_STAT.get_memory_usage();
if max_memory_usage as i64 - used < 100 * 1024 * 1024 {
Err(ErrorCode::Internal(format!(
"not enough memory to load parquet file metas, max_memory_usage = {}, used = {}.",
max_memory_usage, used
)))
} else {
Ok(metas)
}
}

#[async_backtrace::framed]
pub async fn read_parquet_metas_in_parallel(
op: Operator,
file_infos: Vec<(String, u64)>,
thread_nums: usize,
permit_nums: usize,
max_memory_usage: u64,
) -> Result<Vec<ParquetMetaData>> {
let batch_size = 100;
if file_infos.len() <= batch_size {
read_parquet_metas_batch(file_infos, op.clone(), max_memory_usage).await
} else {
let mut chunks = file_infos.chunks(batch_size);

let tasks = std::iter::from_fn(move || {
chunks.next().map(|location| {
read_parquet_metas_batch(location.to_vec(), op.clone(), max_memory_usage)
})
});

let result = execute_futures_in_parallel(
tasks,
thread_nums,
permit_nums,
"read-parquet-metas-worker".to_owned(),
)
.await?
.into_iter()
.collect::<Result<Vec<Vec<_>>>>()?
.into_iter()
.flatten()
.collect();

Ok(result)
}
}

/// Layout of Parquet file
/// +---------------------------+-----+---+
/// | Rest of file | B | A |
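The eager batch/parallel metadata readers above are removed. Per the commit messages ("Lazily read parquet metas", "Store parquet metadata in memory after creating the table"), metadata is now read on demand and cached on the table info instead. Below is a minimal sketch of that idea, not the actual ParquetRSTable implementation (which is outside this excerpt); it assumes the `read_metadata_async` helper used by the removed functions and the `FullParquetMeta`/`parquet_metas` types introduced later in this diff, and its import paths and locking strategy are assumptions.

use std::sync::Arc;

use common_base::base::tokio::sync::Mutex;
use common_exception::Result;
use opendal::Operator;

// Assumed to be in scope: `FullParquetMeta` (defined later in this diff) and the
// `read_metadata_async` helper used by the removed functions above.
async fn cached_parquet_metas(
    cache: &Mutex<Vec<Arc<FullParquetMeta>>>, // e.g. ParquetTableInfo::parquet_metas
    op: &Operator,
    files: &[(String, u64)], // (location, file size)
) -> Result<Vec<Arc<FullParquetMeta>>> {
    let mut metas = cache.lock().await;
    if metas.is_empty() {
        // First access: read and cache the metadata of every file.
        for (path, size) in files {
            let meta = read_metadata_async(path, op, Some(*size)).await?;
            metas.push(Arc::new(FullParquetMeta {
                location: path.clone(),
                meta: Arc::new(meta),
                // Row-group statistics are filled in only when a stats provider is needed.
                row_group_level_stats: None,
            }));
        }
    }
    Ok(metas.clone())
}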
@@ -20,6 +20,7 @@ mod result_scan;
mod stage;

pub use data_source_info::DataSourceInfo;
pub use parquet::FullParquetMeta;
pub use parquet::ParquetTableInfo;
pub use parquet::ParquetTableInfo as ParquetTableInfoV2;
pub use parquet2::Parquet2TableInfo;
39 changes: 39 additions & 0 deletions src/query/catalog/src/plan/datasource/datasource_info/parquet.rs
@@ -12,20 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::io::Cursor;
use std::sync::Arc;

use arrow_schema::Schema as ArrowSchema;
use common_base::base::tokio::sync::Mutex;
use common_expression::ColumnId;
use common_expression::TableField;
use common_expression::TableSchema;
use common_meta_app::principal::StageInfo;
use common_meta_app::schema::TableInfo;
use common_storage::StageFileInfo;
use common_storage::StageFilesInfo;
use parquet_rs::file::metadata::ParquetMetaData;
use parquet_rs::format::SchemaElement;
use parquet_rs::schema::types;
use parquet_rs::schema::types::SchemaDescPtr;
use parquet_rs::schema::types::SchemaDescriptor;
use serde::Deserialize;
use storages_common_table_meta::meta::ColumnStatistics;
use thrift::protocol::TCompactInputProtocol;
use thrift::protocol::TCompactOutputProtocol;
use thrift::protocol::TInputProtocol;
@@ -36,6 +42,20 @@ use thrift::protocol::TType;

use crate::plan::datasource::datasource_info::parquet_read_options::ParquetReadOptions;

#[derive(Clone, Debug)]
pub struct FullParquetMeta {
pub location: String,

pub meta: Arc<ParquetMetaData>,
/// Row group level statistics.
///
/// We collect the statistics here to avoid recomputing them from the same parquet metadata.
///
/// The container is organized as:
/// - `row_group_level_stats[i][col_id]` is the statistics of the column with id `col_id` in the i-th row group of the current file.
pub row_group_level_stats: Option<Vec<HashMap<ColumnId, ColumnStatistics>>>,
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
pub struct ParquetTableInfo {
pub read_options: ParquetReadOptions,
@@ -50,6 +70,19 @@ pub struct ParquetTableInfo {
pub files_to_read: Option<Vec<StageFileInfo>>,
pub schema_from: String,
pub compression_ratio: f64,

// These fields are only used on the coordinator node of the cluster,
// so we don't need to serialize them.
#[serde(skip)]
pub leaf_fields: Arc<Vec<TableField>>,
#[serde(skip)]
pub parquet_metas: Arc<Mutex<Vec<Arc<FullParquetMeta>>>>,
#[serde(skip)]
pub need_stats_provider: bool,
#[serde(skip)]
pub max_threads: usize,
#[serde(skip)]
pub max_memory_usage: u64,
}

impl ParquetTableInfo {
@@ -103,6 +136,7 @@ mod tests {
use std::sync::Arc;

use arrow_schema::Schema as ArrowSchema;
use common_base::base::tokio::sync::Mutex;
use common_storage::StageFilesInfo;
use parquet_rs::basic::ConvertedType;
use parquet_rs::basic::Repetition;
@@ -172,13 +206,18 @@ mod tests {
pattern: None,
},
table_info: Default::default(),
leaf_fields: Arc::new(vec![]),
arrow_schema: ArrowSchema {
fields: Default::default(),
metadata: Default::default(),
},
files_to_read: None,
schema_from: "".to_string(),
compression_ratio: 0.0,
parquet_metas: Arc::new(Mutex::new(vec![])),
need_stats_provider: false,
max_threads: 1,
max_memory_usage: 10000,
};
let s = serde_json::to_string(&info).unwrap();
let info = serde_json::from_str::<ParquetTableInfo>(&s).unwrap();
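The `row_group_level_stats` container groups statistics by row group and then by leaf column id. As a rough illustration of that layout, the sketch below keeps the raw parquet `Statistics` per column chunk rather than converting to Databend's `ColumnStatistics` (the real conversion lives in the parquet_rs storage crate and is not shown in this excerpt); `leaf_ids`, mapping leaf position to `ColumnId`, is an assumption.

use std::collections::HashMap;

use parquet_rs::file::metadata::ParquetMetaData;
use parquet_rs::file::statistics::Statistics;

// Sketch only: build the [row group][column id] layout described above.
fn collect_row_group_stats(
    meta: &ParquetMetaData,
    leaf_ids: &[u32], // ColumnId of each leaf column, in schema order (assumption)
) -> Vec<HashMap<u32, Statistics>> {
    meta.row_groups()
        .iter()
        .map(|rg| {
            rg.columns()
                .iter()
                .zip(leaf_ids)
                // Keep only the column chunks that actually carry statistics.
                .filter_map(|(chunk, id)| chunk.statistics().map(|s| (*id, s.clone())))
                .collect()
        })
        .collect()
}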
35 changes: 31 additions & 4 deletions src/query/catalog/src/table.rs
@@ -425,7 +425,7 @@ pub enum AppendMode {
pub trait ColumnStatisticsProvider {
// returns the statistics of the given column, if any.
// column_id is just the index of the column in table's schema
fn column_statistics(&self, column_id: ColumnId) -> Option<BasicColumnStatistics>;
fn column_statistics(&self, column_id: ColumnId) -> Option<&BasicColumnStatistics>;
}

pub mod column_stats_provider_impls {
@@ -434,7 +434,7 @@ pub mod column_stats_provider_impls {
pub struct DummyColumnStatisticsProvider;

impl ColumnStatisticsProvider for DummyColumnStatisticsProvider {
fn column_statistics(&self, _column_id: ColumnId) -> Option<BasicColumnStatistics> {
fn column_statistics(&self, _column_id: ColumnId) -> Option<&BasicColumnStatistics> {
None
}
}
@@ -479,7 +479,34 @@ impl Parquet2TableColumnStatisticsProvider {
}

impl ColumnStatisticsProvider for Parquet2TableColumnStatisticsProvider {
fn column_statistics(&self, column_id: ColumnId) -> Option<BasicColumnStatistics> {
self.column_stats.get(&column_id).cloned().flatten()
fn column_statistics(&self, column_id: ColumnId) -> Option<&BasicColumnStatistics> {
self.column_stats.get(&column_id).and_then(|s| s.as_ref())
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default)]
pub struct ParquetTableColumnStatisticsProvider {
column_stats: HashMap<ColumnId, Option<BasicColumnStatistics>>,
num_rows: u64,
}

impl ParquetTableColumnStatisticsProvider {
pub fn new(
column_stats: HashMap<ColumnId, Option<BasicColumnStatistics>>,
num_rows: u64,
) -> Self {
Self {
column_stats,
num_rows,
}
}
pub fn num_rows(&self) -> u64 {
self.num_rows
}
}

impl ColumnStatisticsProvider for ParquetTableColumnStatisticsProvider {
fn column_statistics(&self, column_id: ColumnId) -> Option<&BasicColumnStatistics> {
self.column_stats.get(&column_id).and_then(|s| s.as_ref())
}
}
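A brief usage sketch of the new provider follows (crate paths are assumptions based on the file locations in this diff). Note that `column_statistics` now returns a borrowed `&BasicColumnStatistics`, so callers such as the binder clone only when they need ownership; see the `col_stat.cloned()` change further down.

use std::collections::HashMap;

// Assumed paths: the trait and provider live in the catalog crate edited above,
// ColumnId in common_expression, BasicColumnStatistics in common_storage.
use common_catalog::table::{ColumnStatisticsProvider, ParquetTableColumnStatisticsProvider};
use common_expression::ColumnId;
use common_storage::BasicColumnStatistics;

fn stats_lookup(stats: HashMap<ColumnId, Option<BasicColumnStatistics>>, num_rows: u64) {
    let provider = ParquetTableColumnStatisticsProvider::new(stats, num_rows);
    assert_eq!(provider.num_rows(), num_rows);
    // Borrow the stats of column 0; `None` means no usable statistics were collected.
    if let Some(col_stat) = provider.column_statistics(0) {
        let _ = col_stat;
    }
}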
16 changes: 13 additions & 3 deletions src/query/expression/src/schema.rs
@@ -779,9 +779,12 @@ impl TableSchema {
fn collect_in_field(
field: &TableField,
fields: &mut Vec<TableField>,
is_nullable: bool,
next_column_id: &mut ColumnId,
) {
match field.data_type() {
let ty = field.data_type();
let is_nullable = ty.is_nullable() || is_nullable;
match ty.remove_nullable() {
TableDataType::Tuple {
fields_type,
fields_name,
@@ -794,6 +797,7 @@
*next_column_id,
),
fields,
is_nullable,
next_column_id,
);
}
@@ -806,6 +810,7 @@
*next_column_id,
),
fields,
is_nullable,
next_column_id,
);
}
@@ -817,12 +822,17 @@
*next_column_id,
),
fields,
is_nullable,
next_column_id,
);
}
_ => {
*next_column_id += 1;
fields.push(field.clone())
let mut field = field.clone();
if is_nullable {
field.data_type = field.data_type.wrap_nullable();
}
fields.push(field)
}
}
}
@@ -834,7 +844,7 @@
continue;
}
let mut next_column_id = field.column_id;
collect_in_field(field, &mut fields, &mut next_column_id);
collect_in_field(field, &mut fields, false, &mut next_column_id);
}
fields
}
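With the nullability propagation added to `collect_in_field`, leaf fields now inherit nullability from any nullable ancestor. A small illustration follows, assuming the public `TableSchema`/`TableField` constructors and mirroring the updated unit test in the next file.

use common_expression::types::NumberDataType;
use common_expression::{TableDataType, TableField, TableSchema};

// Sketch only: a nullable array column now yields a nullable leaf field.
fn leaf_fields_example() {
    let schema = TableSchema::new(vec![TableField::new(
        "nullarray",
        TableDataType::Nullable(Box::new(TableDataType::Array(Box::new(
            TableDataType::Number(NumberDataType::UInt64),
        )))),
    )]);
    let leaves = schema.leaf_fields();
    // The single leaf is named "nullarray:0" and, after this change, its type is
    // wrapped as Nullable(UInt64) because its parent array is nullable.
    assert_eq!(leaves[0].name(), "nullarray:0");
}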
8 changes: 3 additions & 5 deletions src/query/expression/tests/it/schema.rs
@@ -278,10 +278,8 @@ fn test_schema_from_struct() -> Result<()> {
TableDataType::Number(NumberDataType::UInt64),
),
(
"nullarray",
TableDataType::Nullable(Box::new(TableDataType::Array(Box::new(
TableDataType::Number(NumberDataType::UInt64),
)))),
"nullarray:0",
TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt64))),
),
(
"maparray:key",
@@ -364,7 +362,7 @@ fn test_schema_from_struct() -> Result<()> {
(3, "tuplearray:1:0"),
(4, "arraytuple:0:0"),
(5, "arraytuple:0:1"),
(6, "nullarray"),
(6, "nullarray:0"),
(7, "maparray:key"),
(8, "maparray:value"),
(9, "nullu64"),
3 changes: 2 additions & 1 deletion src/query/sql/src/planner/binder/table.rs
@@ -617,6 +617,7 @@ impl Binder {
.await?
} else {
ParquetRSTable::create(
table_ctx.clone(),
stage_info.clone(),
files_info,
read_options,
@@ -958,7 +959,7 @@ impl Binder {
if let Some(col_id) = *leaf_index {
let col_stat =
statistics_provider.column_statistics(col_id as ColumnId);
col_stats.insert(*column_index, col_stat);
col_stats.insert(*column_index, col_stat.cloned());
}
}
}
6 changes: 6 additions & 0 deletions src/query/sql/src/planner/planner.rs
@@ -92,6 +92,12 @@ impl Planner {
let res = async {
// Step 2: Parse the SQL.
let (mut stmt, format) = parse_sql(&tokens, sql_dialect)?;

if matches!(stmt, Statement::Copy(_)) {
// Indicate to the binder that there is no need to collect column statistics for the bound table.
self.ctx.attach_query_str("Copy".to_string(), String::new());
}

self.replace_stmt(&mut stmt, sql_dialect);

// Step 3: Bind AST with catalog, and generate a pure logical SExpr
4 changes: 2 additions & 2 deletions src/query/storages/fuse/src/fuse_table.rs
@@ -784,7 +784,7 @@ impl FuseTableColumnStatisticsProvider {
}

impl ColumnStatisticsProvider for FuseTableColumnStatisticsProvider {
fn column_statistics(&self, column_id: ColumnId) -> Option<BasicColumnStatistics> {
self.column_stats.get(&column_id).cloned().flatten()
fn column_statistics(&self, column_id: ColumnId) -> Option<&BasicColumnStatistics> {
self.column_stats.get(&column_id).and_then(|s| s.as_ref())
}
}
2 changes: 2 additions & 0 deletions src/query/storages/iceberg/src/table.rs
@@ -162,11 +162,13 @@ impl IcebergTable {
.map(|f| f.into())
.collect::<Vec<arrow_schema::Field>>();
let arrow_schema = arrow_schema::Schema::new(arrow_fields);
let leaf_fields = Arc::new(table_schema.leaf_fields());

let praquet_reader = Arc::new(ParquetRSReader::create(
ctx.clone(),
self.op.clone(),
table_schema,
leaf_fields,
&arrow_schema,
plan,
ParquetReadOptions::default(),