Skip to content

Commit

Permalink
feat: add APIs for MOR snapshot reads (#247)
Browse files Browse the repository at this point in the history
- Make `FileGroup` and `FileSlice` support adding and reading log files
- Perform snapshot read using `FileGroupReader`, which merges log files with base files when required
- Make `BaseFile` and `LogFile` APIs more ergonomic
  • Loading branch information
xushiyan authored Jan 18, 2025
1 parent 6c9f499 commit 3dc5fe2
Show file tree
Hide file tree
Showing 42 changed files with 1,079 additions and 367 deletions.
3 changes: 3 additions & 0 deletions crates/core/src/config/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ impl ConfigParser for HudiTableConfig {

fn default_value(&self) -> Option<Self::Output> {
match self {
Self::BaseFileFormat => Some(HudiConfigValue::String(
BaseFileFormatValue::Parquet.as_ref().to_string(),
)),
Self::DatabaseName => Some(HudiConfigValue::String("default".to_string())),
Self::DropsPartitionFields => Some(HudiConfigValue::Boolean(false)),
Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
Expand Down
88 changes: 61 additions & 27 deletions crates/core/src/file_group/base_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,52 +19,85 @@
use crate::error::CoreError;
use crate::storage::file_metadata::FileMetadata;
use crate::Result;
use std::str::FromStr;

/// Hudi Base file, part of a [FileSlice].
#[derive(Clone, Debug)]
pub struct BaseFile {
/// The file name of the base file.
pub file_name: String,

/// The id of the enclosing file group.
/// The id of the enclosing [FileGroup].
pub file_id: String,

/// The associated instant time of the base file.
pub instant_time: String,
/// Monotonically increasing token for every attempt to write the [BaseFile].
pub write_token: String,

/// The timestamp of the commit instant in the Timeline that created the [BaseFile].
pub commit_timestamp: String,

/// File extension that matches [crate::config::table::HudiTableConfig::BaseFileFormat].
///
/// See also [crate::config::table::BaseFileFormatValue].
pub extension: String,

/// The metadata about the file.
pub file_metadata: Option<FileMetadata>,
}

impl BaseFile {
/// Parse file name and extract `file_id` and `instant_time`.
fn parse_file_name(file_name: &str) -> Result<(String, String)> {
/// Parse a base file's name into parts.
///
/// File name format:
///
/// ```text
/// [File Id]_[File Write Token]_[Commit timestamp].[File Extension]
/// ```
fn parse_file_name(file_name: &str) -> Result<(String, String, String, String)> {
let err_msg = format!("Failed to parse file name '{file_name}' for base file.");
let (name, _) = file_name
let (stem, extension) = file_name
.rsplit_once('.')
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
let parts: Vec<&str> = name.split('_').collect();
let parts: Vec<&str> = stem.split('_').collect();
let file_id = parts
.first()
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
.to_string();
let instant_time = parts
let write_token = parts
.get(1)
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
.to_string();
let commit_timestamp = parts
.get(2)
.ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
.to_string();
Ok((file_id, instant_time))
Ok((
file_id,
write_token,
commit_timestamp,
extension.to_string(),
))
}

#[inline]
pub fn file_name(&self) -> String {
format!(
"{file_id}_{write_token}_{commit_timestamp}.{extension}",
file_id = self.file_id,
write_token = self.write_token,
commit_timestamp = self.commit_timestamp,
extension = self.extension,
)
}
}

impl TryFrom<&str> for BaseFile {
type Error = CoreError;
impl FromStr for BaseFile {
type Err = CoreError;

fn try_from(file_name: &str) -> Result<Self> {
let (file_id, instant_time) = Self::parse_file_name(file_name)?;
fn from_str(file_name: &str) -> Result<Self, Self::Err> {
let (file_id, write_token, commit_timestamp, extension) = Self::parse_file_name(file_name)?;
Ok(Self {
file_name: file_name.to_string(),
file_id,
instant_time,
write_token,
commit_timestamp,
extension,
file_metadata: None,
})
}
Expand All @@ -74,12 +107,13 @@ impl TryFrom<FileMetadata> for BaseFile {
type Error = CoreError;

fn try_from(metadata: FileMetadata) -> Result<Self> {
let file_name = metadata.name.clone();
let (file_id, instant_time) = Self::parse_file_name(&file_name)?;
let file_name = metadata.name.as_str();
let (file_id, write_token, commit_timestamp, extension) = Self::parse_file_name(file_name)?;
Ok(Self {
file_name,
file_id,
instant_time,
write_token,
commit_timestamp,
extension,
file_metadata: Some(metadata),
})
}
Expand All @@ -93,9 +127,9 @@ mod tests {
#[test]
fn test_create_base_file_from_file_name() {
let file_name = "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
let base_file = BaseFile::try_from(file_name).unwrap();
let base_file = BaseFile::from_str(file_name).unwrap();
assert_eq!(base_file.file_id, "5a226868-2934-4f84-a16f-55124630c68d-0");
assert_eq!(base_file.instant_time, "20240402144910683");
assert_eq!(base_file.commit_timestamp, "20240402144910683");
assert!(base_file.file_metadata.is_none());
}

Expand All @@ -107,18 +141,18 @@ mod tests {
);
let base_file = BaseFile::try_from(metadata).unwrap();
assert_eq!(base_file.file_id, "5a226868-2934-4f84-a16f-55124630c68d-0");
assert_eq!(base_file.instant_time, "20240402144910683");
assert_eq!(base_file.commit_timestamp, "20240402144910683");
let file_metadata = base_file.file_metadata.unwrap();
assert_eq!(file_metadata.size, 1024);
assert_not!(file_metadata.fully_populated);
}

#[test]
fn create_a_base_file_returns_error() {
let result = BaseFile::try_from("no_file_extension");
let result = BaseFile::from_str("no_file_extension");
assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));

let result = BaseFile::try_from(".parquet");
let result = BaseFile::from_str(".parquet");
assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));

let metadata = FileMetadata::new("no-valid-delimiter.parquet", 1024);
Expand Down
97 changes: 97 additions & 0 deletions crates/core/src/file_group/file_slice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
use crate::error::CoreError;
use crate::file_group::base_file::BaseFile;
use crate::file_group::log_file::LogFile;
use crate::storage::Storage;
use crate::Result;
use std::collections::BTreeSet;
use std::path::PathBuf;

/// Within a [crate::file_group::FileGroup],
/// a [FileSlice] is a logical group of [BaseFile] and [LogFile]s.
///
/// A slice is created from a base file via [FileSlice::new] and starts with an empty
/// set of log files.
#[derive(Clone, Debug)]
pub struct FileSlice {
/// The base file of this slice; it supplies the slice's file id and creation instant time.
pub base_file: BaseFile,
/// The log files belonging to this slice, kept in an ordered, de-duplicated set.
pub log_files: BTreeSet<LogFile>,
/// The partition path of the slice; [None] is treated as an empty path by
/// [FileSlice::partition_path].
pub partition_path: Option<String>,
}

impl FileSlice {
    /// Creates a [FileSlice] around the given [BaseFile], with no [LogFile]s attached yet.
    pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
        let log_files = BTreeSet::new();
        Self {
            base_file,
            log_files,
            partition_path,
        }
    }

    /// Joins the slice's partition path with `file_name`.
    ///
    /// Fails with [CoreError::FileGroup] when the joined path cannot be
    /// represented as a UTF-8 string.
    fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
        let joined = PathBuf::from(self.partition_path()).join(file_name);
        match joined.to_str() {
            Some(relative) => Ok(relative.to_string()),
            None => Err(CoreError::FileGroup(format!(
                "Failed to get relative path for file: {file_name}",
            ))),
        }
    }

    /// Returns the path of the [BaseFile], relative to the partition path of the [FileSlice].
    pub fn base_file_relative_path(&self) -> Result<String> {
        self.relative_path_for_file(&self.base_file.file_name())
    }

    /// Returns the path of the given [LogFile], relative to the partition path of the
    /// [FileSlice].
    pub fn log_file_relative_path(&self, log_file: &LogFile) -> Result<String> {
        self.relative_path_for_file(&log_file.file_name())
    }

    /// Returns the file id of the [BaseFile], which is the id of the enclosing file group.
    #[inline]
    pub fn file_id(&self) -> &str {
        self.base_file.file_id.as_str()
    }

    /// Returns the partition path of the [FileSlice], or an empty string when none is set.
    #[inline]
    pub fn partition_path(&self) -> &str {
        match self.partition_path.as_deref() {
            Some(path) => path,
            None => "",
        }
    }

    /// Returns the instant time that marks the [FileSlice] creation,
    /// i.e. the commit timestamp of its [BaseFile].
    #[inline]
    pub fn creation_instant_time(&self) -> &str {
        self.base_file.commit_timestamp.as_str()
    }

    /// Fetches the [BaseFile]'s file metadata from `storage` unless metadata is already
    /// present and marked as fully populated.
    ///
    /// On success the fetched metadata replaces whatever was stored before; any error from
    /// building the relative path or from the storage call is propagated.
    pub async fn load_metadata_if_needed(&mut self, storage: &Storage) -> Result<()> {
        let already_loaded = matches!(
            &self.base_file.file_metadata,
            Some(existing) if existing.fully_populated
        );
        if already_loaded {
            return Ok(());
        }

        let path = self.base_file_relative_path()?;
        self.base_file.file_metadata = Some(storage.get_file_metadata(&path).await?);
        Ok(())
    }
}
1 change: 1 addition & 0 deletions crates/core/src/file_group/log_file/log_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ impl TryFrom<[u8; 4]> for BlockMetadataKey {
}
}

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct LogBlock {
pub format_version: LogFormatVersion,
Expand Down
Loading

0 comments on commit 3dc5fe2

Please sign in to comment.