Skip to content

Commit

Permalink
fix: flush WAL on each commit to the store (#191)
Browse files Browse the repository at this point in the history
With sync=normal we *could* lose entries in the WAL if the machine were
to die abruptly. By forcing a sync on each batch write we can ensure
that we won't lose tasks between kafka and sqlite.
  • Loading branch information
markstory authored Feb 14, 2025
1 parent 7a789a0 commit 51509b0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ pub struct Config {
/// The synchronization mode for the sqlite database
pub db_sync_mode: String,

/// The checkpoint mode to use when storing activations
pub db_checkpoint_mode: String,

/// The maximum number of pending records that can be
/// in the InflightTaskStore (sqlite)
pub max_pending_count: usize,
Expand Down Expand Up @@ -119,6 +122,7 @@ impl Default for Config {
kafka_send_timeout_ms: 500,
db_path: "./taskbroker-inflight.sqlite".to_owned(),
db_sync_mode: "normal".to_owned(),
db_checkpoint_mode: "PASSIVE".to_owned(),
max_pending_count: 2048,
max_pending_buffer_count: 128,
max_processing_deadline: 300,
Expand Down
11 changes: 10 additions & 1 deletion src/inflight_activation_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::config::Config;
pub struct InflightActivationStoreConfig {
pub max_processing_attempts: usize,
pub sync_mode: SqliteSynchronous,
pub checkpoint_mode: String,
}

impl InflightActivationStoreConfig {
Expand All @@ -29,6 +30,7 @@ impl InflightActivationStoreConfig {
.db_sync_mode
.parse()
.expect("Unable to parse db_sync_mode"),
checkpoint_mode: config.db_checkpoint_mode.clone(),
}
}
}
Expand Down Expand Up @@ -258,7 +260,14 @@ impl InflightActivationStore {
})
.push(" ON CONFLICT(id) DO NOTHING")
.build();
Ok(query.execute(&self.sqlite_pool).await?.into())
let result = Ok(query.execute(&self.sqlite_pool).await?.into());

// Sync the WAL into the main database so we don't lose data on host failure.
sqlx::query(format!("PRAGMA wal_checkpoint({})", self.config.checkpoint_mode).as_str())
.execute(&self.sqlite_pool)
.await?;

result
}

pub async fn get_pending_activation(
Expand Down

0 comments on commit 51509b0

Please sign in to comment.