feat: add support for MoR read optimized query #128

Closed
wants to merge 1 commit into from
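In short, this change removes the copy-on-write-only validation from `crates/core/src/table/mod.rs` and moves table-type awareness into `Timeline`, which now filters completed instants by table type so that merge-on-read (MoR) tables can serve read-optimized queries. The sketch below is a simplified, self-contained restatement of that filtering rule, not the PR's actual code; `TableType` and `is_relevant_commit_action` are names made up for illustration (the real logic lives in `Timeline::load_completed_commits` in the diff below).

```rust
// Illustrative sketch of the instant-filtering rule introduced by this PR.
// `TableType` and `is_relevant_commit_action` are hypothetical names; the real
// change is in Timeline::load_completed_commits (see the diff below).
#[derive(PartialEq)]
enum TableType {
    CopyOnWrite,
    MergeOnRead,
}

/// Decide whether a completed instant, identified by the extension of its
/// timeline file (e.g. "commit", "deltacommit"), belongs in the timeline
/// for the given table type.
fn is_relevant_commit_action(table_type: &TableType, action: &str) -> bool {
    match action {
        // Both table types produce commit / replacecommit instants.
        "commit" | "replacecommit" => true,
        // Merge-on-read tables additionally produce delta commits and
        // (log) compactions, which a read-optimized query needs in order
        // to find the latest compacted base files.
        "deltacommit" | "compaction" | "logcompaction" => {
            *table_type == TableType::MergeOnRead
        }
        _ => false,
    }
}

fn main() {
    assert!(is_relevant_commit_action(&TableType::CopyOnWrite, "commit"));
    assert!(!is_relevant_commit_action(&TableType::CopyOnWrite, "deltacommit"));
    assert!(is_relevant_commit_action(&TableType::MergeOnRead, "deltacommit"));
    assert!(is_relevant_commit_action(&TableType::MergeOnRead, "logcompaction"));
}
```

With the table type resolved inside the timeline, the blanket "Only support copy-on-write table." check in `mod.rs` is no longer needed, which is why the `mod.rs` portion of the diff is mostly deletions.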
12 changes: 2 additions & 10 deletions crates/core/src/table/mod.rs
@@ -20,7 +20,6 @@
use std::collections::HashMap;
use std::env;
use std::io::{BufRead, BufReader};
use std::str::FromStr;
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
@@ -30,13 +29,12 @@ use strum::IntoEnumIterator;
use url::Url;

use HudiInternalConfig::SkipConfigValidation;
use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
use TableTypeValue::CopyOnWrite;
use HudiTableConfig::{DropsPartitionFields, TableVersion};

use crate::config::internal::HudiInternalConfig;
use crate::config::read::HudiReadConfig;
use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
use crate::file_group::FileSlice;
use crate::storage::utils::{empty_options, parse_uri};
@@ -174,12 +172,6 @@ impl Table {
hudi_configs.validate(conf)?
}

// additional validation
let table_type = hudi_configs.get(TableType)?.to::<String>();
if TableTypeValue::from_str(&table_type)? != CopyOnWrite {
return Err(anyhow!("Only support copy-on-write table."));
}

let table_version = hudi_configs.get(TableVersion)?.to::<isize>();
if !(5..=6).contains(&table_version) {
return Err(anyhow!("Only support table version 5 and 6."));
108 changes: 78 additions & 30 deletions crates/core/src/table/timeline.rs
@@ -21,6 +21,7 @@ use std::cmp::{Ordering, PartialOrd};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
@@ -29,6 +30,10 @@ use parquet::arrow::parquet_to_arrow_schema;
use serde_json::{Map, Value};
use url::Url;

use crate::config::table::{
HudiTableConfig::TableType,
TableTypeValue::{self, CopyOnWrite, MergeOnRead},
};
use crate::config::HudiConfigs;
use crate::file_group::FileGroup;
use crate::storage::utils::split_filename;
@@ -91,6 +96,7 @@ impl Instant {
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct Timeline {
table_type: TableTypeValue,
configs: Arc<HudiConfigs>,
pub(crate) storage: Arc<Storage>,
pub instants: Vec<Instant>,
@@ -103,24 +109,42 @@ impl Timeline {
configs: Arc<HudiConfigs>,
) -> Result<Self> {
let storage = Storage::new(base_url, &storage_options)?;
let instants = Self::load_completed_commits(&storage).await?;
let table_type_value = configs.get(TableType)?.to::<String>();
let table_type = TableTypeValue::from_str(&table_type_value)?;
let instants = Self::load_completed_commits(&storage, &table_type).await?;
Ok(Self {
table_type,
storage,
configs,
instants,
})
}

async fn load_completed_commits(storage: &Storage) -> Result<Vec<Instant>> {
async fn load_completed_commits(
storage: &Storage,
table_type: &TableTypeValue,
) -> Result<Vec<Instant>> {
let mut completed_commits = Vec::new();
for file_info in storage.list_files(Some(".hoodie")).await? {
let (file_stem, file_ext) = split_filename(file_info.name.as_str())?;
if matches!(file_ext.as_str(), "commit" | "replacecommit") {
completed_commits.push(Instant {
state: State::Completed,
timestamp: file_stem,
action: file_ext.to_owned(),
})
match file_ext.as_str() {
"commit" | "replacecommit" if *table_type == CopyOnWrite => {
completed_commits.push(Instant {
state: State::Completed,
timestamp: file_stem,
action: file_ext.to_owned(),
});
}
"commit" | "deltacommit" | "compaction" | "logcompaction" | "replacecommit"
if *table_type == MergeOnRead =>
{
completed_commits.push(Instant {
state: State::Completed,
timestamp: file_stem,
action: file_ext.to_owned(),
});
}
_ => {}
}
}
completed_commits.sort();
@@ -163,7 +187,13 @@ impl Timeline {
.and_then(|obj| obj.values().next())
.and_then(|value| value.as_array())
.and_then(|arr| arr.first())
.and_then(|first_value| first_value["path"].as_str());
.and_then(|first_value| {
if self.table_type == CopyOnWrite {
first_value["path"].as_str()
} else {
first_value["baseFile"].as_str()
}
});

if let Some(path) = parquet_path {
let parquet_meta = self
@@ -218,37 +248,37 @@ mod tests {
use std::path::Path;
use std::sync::Arc;

use anyhow::Result;
use url::Url;

use hudi_tests::TestTable;

use crate::config::HudiConfigs;
use crate::table::timeline::{Instant, State, Timeline};

#[tokio::test]
async fn timeline_read_latest_schema() {
let base_url = TestTable::V6Nonpartitioned.url();
let timeline = Timeline::new(
async fn get_basic_timeline(base_url: Url, table_type: &str) -> Result<Timeline> {
let mut config = HashMap::new();
config.insert("hoodie.table.type".to_string(), table_type.to_string());
Ok(Timeline::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
Arc::new(HudiConfigs::empty()),
Arc::new(HudiConfigs::new(config)),
)
.await
.unwrap();
.await?)
}

#[tokio::test]
async fn timeline_read_latest_schema() {
let base_url = TestTable::V6Nonpartitioned.url();
let timeline = get_basic_timeline(base_url, "COPY_ON_WRITE").await.unwrap();
let table_schema = timeline.get_latest_schema().await.unwrap();
assert_eq!(table_schema.fields.len(), 21)
}

#[tokio::test]
async fn timeline_read_latest_schema_from_empty_table() {
let base_url = TestTable::V6Empty.url();
let timeline = Timeline::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
Arc::new(HudiConfigs::empty()),
)
.await
.unwrap();
let timeline = get_basic_timeline(base_url, "COPY_ON_WRITE").await.unwrap();
let table_schema = timeline.get_latest_schema().await;
assert!(table_schema.is_err());
assert_eq!(
@@ -263,13 +293,7 @@ mod tests {
canonicalize(Path::new("tests/data/timeline/commits_stub")).unwrap(),
)
.unwrap();
let timeline = Timeline::new(
Arc::new(base_url),
Arc::new(HashMap::new()),
Arc::new(HudiConfigs::empty()),
)
.await
.unwrap();
let timeline = get_basic_timeline(base_url, "COPY_ON_WRITE").await.unwrap();
assert_eq!(
timeline.instants,
vec![
@@ -286,4 +310,28 @@
]
)
}

#[tokio::test]
async fn init_commits_timeline_mor() {
let base_url = Url::from_file_path(
canonicalize(Path::new("tests/data/timeline/mor")).unwrap(),
)
.unwrap();
let timeline = get_basic_timeline(base_url, "MERGE_ON_READ").await.unwrap();
assert_eq!(
timeline.instants,
vec![
Instant {
state: State::Completed,
action: "deltacommit".to_owned(),
timestamp: "20240902114809775".to_owned(),
},
Instant {
state: State::Completed,
action: "deltacommit".to_owned(),
timestamp: "20240902123341435".to_owned(),
},
]
)
}
}
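A note on the latest-schema change in `timeline.rs`: for a MoR table the write stats in the newest commit metadata may describe log files, so the lookup reads the associated base parquet file from `baseFile` rather than `path`. The sketch below mirrors that selection with `serde_json`; the overall JSON shape (a `partitionToWriteStats` map of partition to write-stat arrays) and the helper name are assumptions made for this example, only the `path`/`baseFile` keys come from the diff itself.

```rust
// Illustrative sketch of choosing which file to read the latest schema from,
// mirroring the selection logic in this PR's get_latest_schema change.
// The JSON shape and the helper name are assumptions made for this example.
use serde_json::{json, Value};

fn schema_file_from_commit_metadata(metadata: &Value, is_merge_on_read: bool) -> Option<String> {
    metadata["partitionToWriteStats"]
        .as_object()
        .and_then(|partitions| partitions.values().next())
        .and_then(|stats| stats.as_array())
        .and_then(|stats| stats.first())
        .and_then(|stat| {
            // Copy-on-write commits record the written parquet file under "path".
            // Merge-on-read delta commits may point "path" at a log file, so the
            // base parquet file is taken from "baseFile" instead.
            let key = if is_merge_on_read { "baseFile" } else { "path" };
            stat[key].as_str()
        })
        .map(str::to_owned)
}

fn main() {
    let metadata = json!({
        "partitionToWriteStats": {
            "byteField=10": [
                { "path": "byteField=10/file1.log", "baseFile": "byteField=10/file1.parquet" }
            ]
        }
    });
    assert_eq!(
        schema_file_from_commit_metadata(&metadata, true).as_deref(),
        Some("byteField=10/file1.parquet")
    );
    assert_eq!(
        schema_file_from_commit_metadata(&metadata, false).as_deref(),
        Some("byteField=10/file1.log")
    );
}
```

This matches the read-optimized model: only base parquet files are consulted, so log files never need to be merged just to discover the table schema.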