feat: Add segmenting to write buffer and persistence to wal #24610

Closed · wants to merge 2 commits
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

11 changes: 10 additions & 1 deletion influxdb3/src/commands/serve.rs
@@ -11,6 +11,7 @@ use influxdb3_server::{query_executor::QueryExecutorImpl, serve, CommonServerSta
 use influxdb3_write::persister::PersisterImpl;
 use influxdb3_write::wal::WalImpl;
 use influxdb3_write::write_buffer::WriteBufferImpl;
+use influxdb3_write::SegmentId;
 use iox_query::exec::{Executor, ExecutorConfig};
 use ioxd_common::reexport::trace_http::ctx::TraceHeaderParser;
 use object_store::DynObjectStore;
@@ -48,6 +49,9 @@ pub enum Error {
 
     #[error("Wal error: {0}")]
     Wal(#[from] influxdb3_write::wal::Error),
+
+    #[error("Write buffer error: {0}")]
+    WriteBuffer(#[from] influxdb3_write::write_buffer::Error),
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -237,7 +241,12 @@ pub async fn command(config: Config) -> Result<()> {
         .wal_directory
         .map(|dir| WalImpl::new(dir).map(Arc::new))
         .transpose()?;
-    let write_buffer = Arc::new(WriteBufferImpl::new(Arc::clone(&catalog), wal));
+    // TODO: the next segment ID should be loaded from the persister
+    let write_buffer = Arc::new(WriteBufferImpl::new(
+        Arc::clone(&catalog),
+        wal,
+        SegmentId::new(0),
+    )?);
     let query_executor = QueryExecutorImpl::new(
         catalog,
         Arc::clone(&write_buffer),
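The `SegmentId::new(0)` passed here is a placeholder, per the TODO above it. A minimal sketch of what resuming from the persister at startup might look like, assuming a hypothetical `load_segments` method and a `SegmentId::next()` helper (neither is confirmed by this diff):

```rust
// Hypothetical startup logic, not part of this PR: derive the next
// segment ID from whatever the persister last wrote, falling back to
// zero on a fresh data directory. `load_segments`, `segment_id`, and
// `SegmentId::next` are assumed names used only for illustration.
let next_segment_id = persister
    .load_segments()
    .await?
    .last()
    .map(|segment| segment.segment_id.next())
    .unwrap_or_else(|| SegmentId::new(0));

let write_buffer = Arc::new(WriteBufferImpl::new(
    Arc::clone(&catalog),
    wal,
    next_segment_id,
)?);
```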
13 changes: 9 additions & 4 deletions influxdb3_server/src/lib.rs
@@ -162,6 +162,7 @@ mod tests {
     use datafusion::parquet::data_type::AsBytes;
     use hyper::{body, Body, Client, Request, Response};
     use influxdb3_write::persister::PersisterImpl;
+    use influxdb3_write::SegmentId;
     use iox_query::exec::{Executor, ExecutorConfig};
     use object_store::DynObjectStore;
     use parquet_file::storage::{ParquetStorage, StorageId};
@@ -197,10 +198,14 @@ mod tests {
             mem_pool_size: usize::MAX,
         }));
 
-        let write_buffer = Arc::new(influxdb3_write::write_buffer::WriteBufferImpl::new(
-            Arc::clone(&catalog),
-            None::<Arc<influxdb3_write::wal::WalImpl>>,
-        ));
+        let write_buffer = Arc::new(
+            influxdb3_write::write_buffer::WriteBufferImpl::new(
+                Arc::clone(&catalog),
+                None::<Arc<influxdb3_write::wal::WalImpl>>,
+                SegmentId::new(0),
+            )
+            .unwrap(),
+        );
         let query_executor = crate::query_executor::QueryExecutorImpl::new(
             catalog,
             Arc::clone(&write_buffer),
2 changes: 1 addition & 1 deletion influxdb3_write/Cargo.toml
@@ -8,7 +8,6 @@ license.workspace = true
 [dependencies]
 data_types = { path = "../data_types" }
 influxdb-line-protocol = { path = "../influxdb_line_protocol" }
-iox_catalog = { path = "../iox_catalog" }
 iox_query = { path = "../iox_query" }
 object_store.workspace = true
 observability_deps = { path = "../observability_deps" }
@@ -33,5 +32,6 @@ bytes = "1.5.0"
 futures-util = "0.3.30"
 
 [dev-dependencies]
+arrow_util = { path = "../arrow_util" }
 test_helpers = { path = "../test_helpers" }

2 changes: 2 additions & 0 deletions influxdb3_write/src/catalog.rs
@@ -19,6 +19,8 @@ pub enum Error {
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
+pub const TIME_COLUMN_NAME: &str = "time";
+
 #[derive(Debug)]
 pub struct Catalog {
     inner: RwLock<InnerCatalog>,
11 changes: 7 additions & 4 deletions influxdb3_write/src/lib.rs
@@ -201,10 +201,10 @@ pub trait Persister: Debug + Send + Sync + 'static {
 
 pub trait Wal: Debug + Send + Sync + 'static {
     /// Opens a writer to a segment, either creating a new file or appending to an existing file.
-    fn open_segment_writer(&self, segment_id: SegmentId) -> wal::Result<impl WalSegmentWriter>;
+    fn open_segment_writer(&self, segment_id: SegmentId) -> wal::Result<Box<dyn WalSegmentWriter>>;
 
     /// Opens a reader to a segment file.
-    fn open_segment_reader(&self, segment_id: SegmentId) -> wal::Result<impl WalSegmentReader>;
+    fn open_segment_reader(&self, segment_id: SegmentId) -> wal::Result<Box<dyn WalSegmentReader>>;
 
     /// Checks the WAL directory for any segment files and returns them.
     fn segment_files(&self) -> wal::Result<Vec<SegmentFile>>;
@@ -225,6 +225,8 @@ pub trait WalSegmentWriter: Debug + Send + Sync + 'static {
     fn id(&self) -> SegmentId;
 
     fn write_batch(&mut self, ops: Vec<WalOp>) -> wal::Result<SequenceNumber>;
+
+    fn last_sequence_number(&self) -> SequenceNumber;
 }
 
 pub trait WalSegmentReader: Debug + Send + Sync + 'static {
@@ -234,7 +236,7 @@ }
 }
 
 /// Individual WalOps get batched into the WAL asynchronously. The batch is then written to the segment file.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
 pub struct WalOpBatch {
     pub sequence_number: SequenceNumber,
     pub ops: Vec<WalOp>,
@@ -255,7 +257,7 @@ pub enum WalOp {
 pub struct LpWriteOp {
     pub db_name: String,
     pub lp: String,
-    pub default_time: u64,
+    pub default_time: i64,
 }
 
 /// A single write request can have many lines in it. A writer can request to accept all lines that are valid, while
@@ -278,6 +280,7 @@ pub struct BufferedWriteRequest {
     pub tag_count: usize,
     pub total_buffer_memory_used: usize,
     pub segment_id: SegmentId,
+    pub sequence_number: SequenceNumber,
 }
 
 /// A persisted Catalog that contains the database, table, and column schemas.
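Two notes on the trait changes above. `default_time` moving from `u64` to `i64` matches line-protocol timestamps, which are signed nanoseconds since the Unix epoch and can be negative. And returning `Box<dyn WalSegmentWriter>` instead of `impl WalSegmentWriter` keeps the `Wal` trait usable as a trait object and lets one call site hand back different concrete writers, such as the file-backed writer or the no-op writer added in wal.rs below. A self-contained sketch of that pattern with simplified traits (not this PR's full signatures):

```rust
// Simplified illustration of the Box<dyn Trait> return pattern; the real
// traits carry more methods, wal::Result error types, and SegmentId state.
use std::fmt::Debug;

pub trait SegmentWriter: Debug + Send + Sync + 'static {
    fn write(&mut self, data: Vec<u8>);
}

#[derive(Debug)]
struct FileWriter;

#[derive(Debug)]
struct NoopWriter;

impl SegmentWriter for FileWriter {
    fn write(&mut self, data: Vec<u8>) {
        println!("would write {} bytes to a segment file", data.len());
    }
}

impl SegmentWriter for NoopWriter {
    // Discards writes, mirroring the WalSegmentWriterNoopImpl added below.
    fn write(&mut self, _data: Vec<u8>) {}
}

// Box<dyn SegmentWriter> lets one function return either implementation;
// `-> impl SegmentWriter` would fix a single concrete type at compile time.
fn open_writer(wal_enabled: bool) -> Box<dyn SegmentWriter> {
    if wal_enabled {
        Box::new(FileWriter)
    } else {
        Box::new(NoopWriter)
    }
}

fn main() {
    let mut writer = open_writer(false);
    writer.write(b"cpu,host=a usage=0.5 1".to_vec());
}
```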
67 changes: 55 additions & 12 deletions influxdb3_write/src/wal.rs
@@ -104,14 +104,14 @@ impl WalImpl {
         Ok(Self { root })
     }
 
-    fn open_segment_writer(&self, segment_id: SegmentId) -> Result<impl WalSegmentWriter> {
+    fn open_segment_writer(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentWriter>> {
         let writer = WalSegmentWriterImpl::new_or_open(self.root.clone(), segment_id)?;
-        Ok(writer)
+        Ok(Box::new(writer))
     }
 
-    fn open_segment_reader(&self, segment_id: SegmentId) -> Result<impl WalSegmentReader> {
+    fn open_segment_reader(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentReader>> {
         let reader = WalSegmentReaderImpl::new(self.root.clone(), segment_id)?;
-        Ok(reader)
+        Ok(Box::new(reader))
     }
 
     fn segment_files(&self) -> Result<Vec<SegmentFile>> {
@@ -166,11 +166,11 @@ }
 }
 
 impl Wal for WalImpl {
-    fn open_segment_writer(&self, segment_id: SegmentId) -> Result<impl WalSegmentWriter> {
+    fn open_segment_writer(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentWriter>> {
         self.open_segment_writer(segment_id)
     }
 
-    fn open_segment_reader(&self, segment_id: SegmentId) -> Result<impl WalSegmentReader> {
+    fn open_segment_reader(&self, segment_id: SegmentId) -> Result<Box<dyn WalSegmentReader>> {
         self.open_segment_reader(segment_id)
     }
 
@@ -247,18 +247,28 @@ impl WalSegmentWriterImpl {
     }
 
     fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
+        println!("write batch in impl");
         // Ensure the write buffer is always empty before using it.
        self.buffer.clear();
 
-        self.sequence_number = self.sequence_number.next();
+        let sequence_number = self.sequence_number.next();
 
         let batch = WalOpBatch {
-            sequence_number: self.sequence_number,
+            sequence_number,
             ops,
         };
 
         let data = serde_json::to_vec(&batch)?;
 
+        let bytes_written = self.write_bytes(data)?;
+
+        self.bytes_written += bytes_written;
+        self.sequence_number = sequence_number;
+
+        Ok(self.sequence_number)
+    }
+
+    fn write_bytes(&mut self, data: Vec<u8>) -> Result<usize> {
         // Only designed to support chunks up to `u32::max` bytes long.
         let uncompressed_len = data.len();
         u32::try_from(uncompressed_len)?;
@@ -270,7 +280,7 @@ impl WalSegmentWriterImpl {
             .expect("cannot fail to write to buffer");
 
         // Compress the payload into the reused buffer, recording the crc hash
-        // as it is wrote.
+        // as it is written.
         let mut encoder = snap::write::FrameEncoder::new(HasherWrapper::new(&mut self.buffer));
         encoder.write_all(&data)?;
         let (checksum, buf) = encoder
@@ -298,9 +308,7 @@ impl WalSegmentWriterImpl {
         // fsync the fd
         self.f.sync_all().expect("fsync failure");
 
-        self.bytes_written += bytes_written;
-
-        Ok(self.sequence_number)
+        Ok(bytes_written)
     }
 }
 
@@ -313,6 +321,41 @@ impl WalSegmentWriter for WalSegmentWriterImpl {
     fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
         self.write_batch(ops)
     }
+
+    fn last_sequence_number(&self) -> SequenceNumber {
+        self.sequence_number
+    }
 }
+
+#[derive(Debug)]
+pub struct WalSegmentWriterNoopImpl {
+    segment_id: SegmentId,
+    sequence_number: SequenceNumber,
+}
+
+impl WalSegmentWriterNoopImpl {
+    pub fn new(segment_id: SegmentId) -> Self {
+        Self {
+            segment_id,
+            sequence_number: SequenceNumber::new(0),
+        }
+    }
+}
+
+impl WalSegmentWriter for WalSegmentWriterNoopImpl {
+    fn id(&self) -> SegmentId {
+        self.segment_id
+    }
+
+    fn write_batch(&mut self, _ops: Vec<WalOp>) -> Result<SequenceNumber> {
+        let sequence_number = self.sequence_number.next();
+        self.sequence_number = sequence_number;
+        Ok(sequence_number)
+    }
+
+    fn last_sequence_number(&self) -> SequenceNumber {
+        self.sequence_number
+    }
+}
 
 #[derive(Debug)]
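The write path above splits batching from framing: `write_batch` serializes the `WalOpBatch` to JSON and only advances `self.sequence_number` after `write_bytes` has compressed, checksummed, and fsynced the entry. A sketch of that kind of checksummed snappy framing is below; the exact on-disk field order of this PR's segment entries is an assumption here, not confirmed by the visible hunks (the real writer also records the CRC while compressing, via `HasherWrapper`):

```rust
// A minimal sketch of checksummed, snappy-compressed entry framing,
// assuming a crc + lengths + payload layout; the PR's actual segment
// file format may order or encode these fields differently.
use std::io::Write;

fn frame_entry(data: &[u8]) -> std::io::Result<Vec<u8>> {
    // Entries only support payloads up to u32::MAX uncompressed bytes.
    let uncompressed_len = u32::try_from(data.len())
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;

    // Compress the payload with snappy framing.
    let mut compressed = Vec::new();
    let mut encoder = snap::write::FrameEncoder::new(&mut compressed);
    encoder.write_all(data)?;
    encoder.flush()?;
    drop(encoder);

    // Checksum the compressed bytes so a reader can detect corruption.
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&compressed);
    let checksum = hasher.finalize();

    // checksum + lengths + payload; the uncompressed length lets a reader
    // pre-size its decompression buffer before verifying the crc.
    let mut out = Vec::with_capacity(12 + compressed.len());
    out.extend_from_slice(&checksum.to_be_bytes());
    out.extend_from_slice(&uncompressed_len.to_be_bytes());
    out.extend_from_slice(&(compressed.len() as u32).to_be_bytes());
    out.extend_from_slice(&compressed);
    Ok(out)
}
```

The new `WalSegmentWriterNoopImpl` skips all of this while still handing out increasing sequence numbers, which lets the write buffer run with the same interface when no WAL directory is configured (serve.rs passes the WAL as an `Option`).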