feat: support MOR read-optimized query (#259)
Add a read config `hoodie.read.use.read_optimized.mode` that enables read-optimized queries on Merge-On-Read (MOR) tables; a usage sketch follows the file summary below.
xushiyan authored Jan 22, 2025
1 parent b658a98 commit af997b7
Showing 3 changed files with 137 additions and 41 deletions.
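For context (not part of this commit), here is a minimal usage sketch. It assumes a constructor along the lines of `Table::new_with_options` that accepts read options as key-value pairs; the constructor name, the table path, and the tokio runtime setup are assumptions rather than things confirmed by this diff.

// Sketch only: enabling read-optimized snapshot reads on a MOR table.
use hudi::table::Table;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let hudi_table = Table::new_with_options(
        "s3://bucket/mor_table",
        [("hoodie.read.use.read_optimized.mode", "true")],
    )
    .await?;

    // With the flag on, only base files are scanned; updates still sitting in
    // log files (not yet compacted) are skipped.
    let batches = hudi_table.read_snapshot(&[]).await?;
    println!("read {} record batches", batches.len());
    Ok(())
}

The trade-off is the usual read-optimized one: the query avoids merging log files at read time, at the cost of not seeing records that have not yet been compacted into base files.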
72 changes: 64 additions & 8 deletions crates/core/src/config/read.rs
@@ -23,7 +23,7 @@ use std::str::FromStr;

use strum_macros::EnumIter;

use crate::config::error::ConfigError::{NotFound, ParseInt};
use crate::config::error::ConfigError::{NotFound, ParseBool, ParseInt};
use crate::config::Result;
use crate::config::{ConfigParser, HudiConfigValue};

@@ -52,6 +52,10 @@ pub enum HudiReadConfig {

/// Parallelism for listing files on storage.
ListingParallelism,

/// When set to true, only [BaseFile]s will be read for optimized reads.
/// This is only applicable to Merge-On-Read (MOR) tables.
UseReadOptimizedMode,
}

impl AsRef<str> for HudiReadConfig {
@@ -60,6 +64,7 @@ impl AsRef<str> for HudiReadConfig {
Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
Self::InputPartitions => "hoodie.read.input.partitions",
Self::ListingParallelism => "hoodie.read.listing.parallelism",
Self::UseReadOptimizedMode => "hoodie.read.use.read_optimized.mode",
}
}
}
@@ -71,6 +76,7 @@ impl ConfigParser for HudiReadConfig {
match self {
HudiReadConfig::InputPartitions => Some(HudiConfigValue::UInteger(0usize)),
HudiReadConfig::ListingParallelism => Some(HudiConfigValue::UInteger(10usize)),
HudiReadConfig::UseReadOptimizedMode => Some(HudiConfigValue::Boolean(false)),
_ => None,
}
}
@@ -93,32 +99,82 @@ impl ConfigParser for HudiReadConfig {
usize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::UInteger),
Self::UseReadOptimizedMode => get_result
.and_then(|v| {
bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::Boolean),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::config::read::HudiReadConfig::InputPartitions;
use crate::config::read::HudiReadConfig::{
InputPartitions, ListingParallelism, UseReadOptimizedMode,
};

#[test]
fn parse_valid_config_value() {
let options = HashMap::from([(InputPartitions.as_ref().to_string(), "100".to_string())]);
let value = InputPartitions.parse_value(&options).unwrap().to::<usize>();
assert_eq!(value, 100);
let options = HashMap::from([
(InputPartitions.as_ref().to_string(), "100".to_string()),
(ListingParallelism.as_ref().to_string(), "100".to_string()),
(
UseReadOptimizedMode.as_ref().to_string(),
"true".to_string(),
),
]);
assert_eq!(
InputPartitions.parse_value(&options).unwrap().to::<usize>(),
100
);
assert_eq!(
ListingParallelism
.parse_value(&options)
.unwrap()
.to::<usize>(),
100
);
assert!(UseReadOptimizedMode
.parse_value(&options)
.unwrap()
.to::<bool>());
}

#[test]
fn parse_invalid_config_value() {
let options = HashMap::from([(InputPartitions.as_ref().to_string(), "foo".to_string())]);
let value = InputPartitions.parse_value(&options);
assert!(matches!(value.unwrap_err(), ParseInt(_, _, _)));
let options = HashMap::from([
(InputPartitions.as_ref().to_string(), "foo".to_string()),
(ListingParallelism.as_ref().to_string(), "_100".to_string()),
(UseReadOptimizedMode.as_ref().to_string(), "1".to_string()),
]);
assert!(matches!(
InputPartitions.parse_value(&options).unwrap_err(),
ParseInt(_, _, _)
));
assert_eq!(
InputPartitions
.parse_value_or_default(&options)
.to::<usize>(),
0
);
assert!(matches!(
ListingParallelism.parse_value(&options).unwrap_err(),
ParseInt(_, _, _)
));
assert_eq!(
ListingParallelism
.parse_value_or_default(&options)
.to::<usize>(),
10
);
assert!(matches!(
UseReadOptimizedMode.parse_value(&options).unwrap_err(),
ParseBool(_, _, _)
));
assert!(!UseReadOptimizedMode
.parse_value_or_default(&options)
.to::<bool>());
}
}
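As the tests above show, the new key is resolved like the other read options and falls back to its default of `false` when it is absent or fails to parse. A minimal resolution sketch, mirroring the test code above (the public `hudi::config` module path is an assumption based on the core crate layout):

use std::collections::HashMap;
use hudi::config::read::HudiReadConfig::UseReadOptimizedMode;
use hudi::config::ConfigParser;

fn main() {
    let options = HashMap::from([(
        UseReadOptimizedMode.as_ref().to_string(),
        "true".to_string(),
    )]);
    // Falls back to the default `false` when the key is missing or invalid.
    let read_optimized = UseReadOptimizedMode
        .parse_value_or_default(&options)
        .to::<bool>();
    assert!(read_optimized);
}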
93 changes: 71 additions & 22 deletions crates/core/src/table/mod.rs
@@ -90,11 +90,10 @@ mod fs_view;
mod listing;
pub mod partition;

use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::read::HudiReadConfig::{AsOfTimestamp, UseReadOptimizedMode};
use crate::config::table::HudiTableConfig::PartitionFields;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
use crate::error::CoreError;
use crate::expr::filter::{Filter, FilterField};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::reader::FileGroupReader;
@@ -141,11 +140,25 @@ impl Table {
.await
}

pub fn base_url(&self) -> Result<Url> {
#[inline]
pub fn base_url(&self) -> Url {
let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath);
self.hudi_configs
.get(HudiTableConfig::BasePath)?
.get(HudiTableConfig::BasePath)
.expect(&err_msg)
.to_url()
.map_err(CoreError::from)
.expect(&err_msg)
}

#[inline]
pub fn table_type(&self) -> TableTypeValue {
let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableType);
let table_type = self
.hudi_configs
.get(HudiTableConfig::TableType)
.expect(&err_msg)
.to::<String>();
TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
}

#[inline]
@@ -176,16 +189,6 @@ impl Table {
.register_object_store(runtime_env.clone());
}

pub fn get_table_type(&self) -> TableTypeValue {
let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableType);
let table_type = self
.hudi_configs
.get(HudiTableConfig::TableType)
.expect(&err_msg)
.to::<String>();
TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
}

/// Get the latest [Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
@@ -289,11 +292,21 @@ impl Table {
///
/// If the [AsOfTimestamp] configuration is set, the records at the specified timestamp will be returned.
pub async fn read_snapshot(&self, filters: &[Filter]) -> Result<Vec<RecordBatch>> {
let read_optimized_mode = self
.hudi_configs
.get_or_default(UseReadOptimizedMode)
.to::<bool>();

if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
self.read_snapshot_as_of(timestamp.to::<String>().as_str(), filters)
.await
self.read_snapshot_as_of(
timestamp.to::<String>().as_str(),
filters,
read_optimized_mode,
)
.await
} else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
self.read_snapshot_as_of(timestamp, filters).await
self.read_snapshot_as_of(timestamp, filters, read_optimized_mode)
.await
} else {
Ok(Vec::new())
}
@@ -304,10 +317,12 @@
&self,
timestamp: &str,
filters: &[Filter],
read_optimized_mode: bool,
) -> Result<Vec<RecordBatch>> {
let file_slices = self.get_file_slices_as_of(timestamp, filters).await?;
let fg_reader = self.create_file_group_reader();
let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;
let base_file_only =
read_optimized_mode || self.table_type() == TableTypeValue::CopyOnWrite;
let timezone = self.timezone();
let instant_range = InstantRange::up_to(timestamp, &timezone);
let batches = futures::future::try_join_all(
@@ -351,7 +366,9 @@ impl Table {
];
let fg_reader =
self.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())?;
let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;

// Read-optimized mode does not apply to incremental query semantics.
let base_file_only = self.table_type() == TableTypeValue::CopyOnWrite;
let timezone = self.timezone();
let instant_range =
InstantRange::within_open_closed(start_timestamp, end_timestamp, &timezone);
@@ -416,7 +433,7 @@ mod tests {
/// Test helper to get relative file paths from the table with filters.
async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) -> Result<Vec<String>> {
let mut file_paths = Vec::new();
let base_url = table.base_url()?;
let base_url = table.base_url();
for f in table.get_file_slices(filters).await? {
let relative_path = f.base_file_relative_path()?;
let file_url = join_url_segments(&base_url, &[relative_path.as_str()])?;
@@ -946,6 +963,36 @@
Ok(())
}

#[tokio::test]
async fn test_non_partitioned_read_optimized() -> Result<()> {
let base_url = SampleTable::V6Nonpartitioned.url_to_mor();
let hudi_table = Table::new(base_url.path()).await?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
.iter()
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let latest_commit = commit_timestamps.last().unwrap();
let records = hudi_table
.read_snapshot_as_of(latest_commit, &[], true)
.await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;

let sample_data = SampleTable::sample_data_order_by_id(&records);
assert_eq!(
sample_data,
vec![
(1, "Alice", true), // this was updated to false in a log file and not to be read out
(2, "Bob", false),
(3, "Carol", true),
(4, "Diana", true), // this was inserted in a base file and should be read out
]
);
Ok(())
}

#[tokio::test]
async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
Expand Down Expand Up @@ -977,7 +1024,9 @@ mod tests {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let first_commit = commit_timestamps[0];
let records = hudi_table.read_snapshot_as_of(first_commit, &[]).await?;
let records = hudi_table
.read_snapshot_as_of(first_commit, &[], false)
.await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;

13 changes: 2 additions & 11 deletions crates/datafusion/src/lib.rs
@@ -181,11 +181,7 @@ impl TableProvider for HudiDataSource {
.get_file_slices_splits(self.get_input_partitions(), pushdown_filters.as_slice())
.await
.map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
let base_url = self.table.base_url().map_err(|e| {
Execution(format!(
"Failed to get base path config from Hudi table: {e:?}"
))
})?;
let base_url = self.table.base_url();
let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
for file_slice_vec in file_slices {
let mut parquet_file_group_vec = Vec::new();
@@ -204,12 +200,7 @@
parquet_file_groups.push(parquet_file_group_vec)
}

let base_url = self.table.base_url().map_err(|e| {
Execution(format!(
"Failed to get base path config from Hudi table: {}",
e
))
})?;
let base_url = self.table.base_url();
let url = ObjectStoreUrl::parse(get_scheme_authority(&base_url))?;
let fsc = FileScanConfig::new(url, self.schema())
.with_file_groups(parquet_file_groups)
