Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add paths module to influxdb3_write #24579

Merged
merged 8 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions influxdb3_write/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! to be persisted. A new open segment will be created and new writes will be written to that segment.

pub mod catalog;
pub mod paths;
pub mod persister;
pub mod wal;
pub mod write_buffer;
Expand Down
128 changes: 128 additions & 0 deletions influxdb3_write/src/paths.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use crate::SegmentId;
use std::convert::AsRef;
use std::ops::Deref;
use std::path::Path;
use std::path::PathBuf;

/// File extension for catalog files
const CATALOG_FILE_EXTENSION: &str = "json";

/// File extension for parquet files
const PARQUET_FILE_EXTENSION: &str = "parquet";

/// File extension for segment files
const SEGMENT_FILE_EXTENSION: &str = "wal";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this should be SEGMENT_WAL_FILE_EXTENSION to differentiate. Then we'd have SEGMENT_INFO_FILE_EXTENSION, which would be json.


#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CatalogFilePath(PathBuf);

impl CatalogFilePath {
pub fn new(prefix: impl Into<PathBuf>, sequence_number: u64) -> Self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sequence_number should be a u32. No need for a u64 here and limiting the size ensures that 10 digit padding on the string conversion won't break for any value that is valid.

let mut path = prefix.into();
path.push("catalogs");
path.push(format!("{sequence_number:010}"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't going to work as a naming convention for the Catalog or SegmentInfo file names. We need file names such that when ordered lexicographically, the highest numbered ones will be returned first. Changing the file to be named format!("{(u32::MAX - sequence_number):010}") would achieve this. Same applies for SegmentInfo files.

For wal files we don't need this naming convention, but we could follow it to keep things consistent. It would also become handy if we ever want to copy the wal segments into object storage for access by other systems.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted out of this for wal files since they don't really need to be human readable, but for the other types I did the u32::MAX trick so that they show up in order.

path.set_extension(CATALOG_FILE_EXTENSION);
Self(path)
}
}

impl Deref for CatalogFilePath {
type Target = Path;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl AsRef<Path> for CatalogFilePath {
fn as_ref(&self) -> &Path {
&self.0
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParquetFilePath(PathBuf);

impl ParquetFilePath {
pub fn new(
prefix: impl Into<PathBuf>,
db_name: &str,
table_name: &str,
year: u16,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to have this be a time for the parquet file's min_time and then convert that to a string, rather than taking three separate arguments.

month: u8,
day: u8,
file_number: usize,
) -> Self {
let mut path = prefix.into();
path.push("dbs");
path.push(db_name);
path.push(table_name);
path.push(format!("{year}-{month:02}-{day:02}"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use min_time as an argument, probably better to use a strtime equivalent format string here. Think we'll need to use Chrono for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These days it or time are both maintained now, but I think to keep it consistent I'll use chrono

path.push(format!("{file_number:010}"));
path.set_extension(PARQUET_FILE_EXTENSION);
Self(path)
}
}

impl Deref for ParquetFilePath {
type Target = Path;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl AsRef<Path> for ParquetFilePath {
fn as_ref(&self) -> &Path {
&self.0
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentFilePath(PathBuf);

impl SegmentFilePath {
pub fn new(prefix: impl Into<PathBuf>, segment_id: SegmentId) -> Self {
let mut path = prefix.into();
path.push("segments");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wal files don't need to have this directory since the only thing in the wal directory will be segment files.

path.push(format!("{:010}", segment_id.0));
path.set_extension(SEGMENT_FILE_EXTENSION);
Self(path)
}
}

impl Deref for SegmentFilePath {
type Target = Path;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl AsRef<Path> for SegmentFilePath {
fn as_ref(&self) -> &Path {
&self.0
}
}

#[test]
fn catalog_file_path_new() {
assert_eq!(
*CatalogFilePath::new("prefix/dir", 0),
PathBuf::from("prefix/dir/catalogs/0000000000.json").as_ref()
);
}
#[test]
fn parquet_file_path_new() {
assert_eq!(
*ParquetFilePath::new("prefix/dir", "my_db", "my_table", 2038, 1, 19, 0),
PathBuf::from("prefix/dir/dbs/my_db/my_table/2038-01-19/0000000000.parquet").as_ref()
);
}
#[test]
fn segment_file_path_new() {
assert_eq!(
*SegmentFilePath::new("prefix/dir", SegmentId::new(0)),
PathBuf::from("prefix/dir/segments/0000000000.wal").as_ref()
);
}
34 changes: 17 additions & 17 deletions influxdb3_write/src/wal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This is the implementation of the `Wal` that the buffer uses to make buffered data durable
//! on disk.

use crate::paths::SegmentFilePath;
use crate::{
SegmentFile, SegmentId, SegmentIdBytes, SequenceNumber, Wal, WalOp, WalOpBatch,
WalSegmentReader, WalSegmentWriter,
Expand All @@ -24,9 +25,6 @@ use thiserror::Error;
type FileTypeIdentifier = [u8; 8];
const FILE_TYPE_IDENTIFIER: &[u8] = b"idb3.001";

/// File extension for segment files
const SEGMENT_FILE_EXTENSION: &str = "wal";

#[derive(Debug, Error)]
pub enum Error {
#[error("io error: {source}")]
Expand All @@ -49,7 +47,7 @@ pub enum Error {
#[error("invalid segment file {segment_id:?} at {path:?}: {reason}")]
InvalidSegmentFile {
segment_id: SegmentId,
path: PathBuf,
path: SegmentFilePath,
reason: String,
},

Expand Down Expand Up @@ -94,7 +92,7 @@ impl WalImpl {
pub fn new(path: impl Into<PathBuf>) -> Result<Self> {
let root = path.into();
info!(wal_dir=?root, "Ensuring WAL directory exists");
std::fs::create_dir_all(&root)?;
std::fs::create_dir_all(&root.join("segments"))?;

// ensure the directory creation is actually fsync'd so that when we create files there
// we don't lose them (see: https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf)
Expand All @@ -117,7 +115,7 @@ impl WalImpl {
}

fn segment_files(&self) -> Result<Vec<SegmentFile>> {
let dir = std::fs::read_dir(&self.root)?;
let dir = std::fs::read_dir(&self.root.join("segments"))?;

let mut segment_files = Vec::new();

Expand Down Expand Up @@ -161,7 +159,7 @@ impl WalImpl {
}

fn delete_wal_segment(&self, segment_id: SegmentId) -> Result<()> {
let path = build_segment_path(self.root.clone(), segment_id);
let path = SegmentFilePath::new(self.root.clone(), segment_id);
std::fs::remove_file(path)?;
Ok(())
}
Expand Down Expand Up @@ -197,7 +195,7 @@ pub struct WalSegmentWriterImpl {

impl WalSegmentWriterImpl {
pub fn new_or_open(root: PathBuf, segment_id: SegmentId) -> Result<Self> {
let path = build_segment_path(root, segment_id);
let path = SegmentFilePath::new(root, segment_id);

// if there's already a file there, validate its header and pull the sequence number from the last entry
if path.exists() {
Expand Down Expand Up @@ -225,6 +223,15 @@ impl WalSegmentWriterImpl {
}
}

let parent = path
.parent()
.expect("A SegmentFilePath should have a parent directory");

// Make sure that the segments directory with the prefix exists
if !parent.exists() {
std::fs::create_dir_all(parent)?;
}

// it's a new file, initialize it with the header and get ready to start writing
let mut f = OpenOptions::new().write(true).create(true).open(&path)?;

Expand Down Expand Up @@ -325,7 +332,7 @@ pub struct WalSegmentReaderImpl {

impl WalSegmentReaderImpl {
pub fn new(root: impl Into<PathBuf>, segment_id: SegmentId) -> Result<Self> {
let path = build_segment_path(root, segment_id);
let path = SegmentFilePath::new(root, segment_id);
let f = BufReader::new(File::open(path.clone())?);

let mut reader = Self { f, segment_id };
Expand Down Expand Up @@ -359,7 +366,7 @@ impl WalSegmentReaderImpl {
}

fn read_segment_file_info_if_exists(
path: PathBuf,
path: SegmentFilePath,
segment_id: SegmentId,
) -> Result<Option<ExistingSegmentFileInfo>> {
let f = match File::open(path.clone()) {
Expand Down Expand Up @@ -549,13 +556,6 @@ where
}
}

fn build_segment_path(dir: impl Into<PathBuf>, id: SegmentId) -> PathBuf {
let mut path = dir.into();
path.push(format!("{:010}", id.0));
path.set_extension(SEGMENT_FILE_EXTENSION);
path
}

fn segment_id_from_file_name(name: &str) -> Result<SegmentId> {
let id = name
.parse::<u32>()
Expand Down