rsc: Optimize blob eviction deletion (#1615)
* rsc: Optimize blob eviction deletion

* cleanup
V-FEXrt authored Aug 1, 2024
1 parent 958299c commit ff0aeb3
Showing 4 changed files with 99 additions and 84 deletions.
2 changes: 2 additions & 0 deletions rust/migration/src/lib.rs
@@ -11,6 +11,7 @@ mod m20240509_163905_add_label_to_job;
mod m20240517_195757_add_updated_at_to_blob;
mod m20240522_185420_create_job_history;
mod m20240731_152842_create_job_size_proc;
mod m20240731_201632_create_job_blob_timestamp_index;

pub struct Migrator;

@@ -29,6 +30,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240517_195757_add_updated_at_to_blob::Migration),
Box::new(m20240522_185420_create_job_history::Migration),
Box::new(m20240731_152842_create_job_size_proc::Migration),
Box::new(m20240731_201632_create_job_blob_timestamp_index::Migration),
]
}
}
53 changes: 53 additions & 0 deletions rust/migration/src/m20240731_201632_create_job_blob_timestamp_index.rs
@@ -0,0 +1,53 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared(
"
CREATE INDEX IF NOT EXISTS blob_updated_at_idx
ON blob(updated_at)
",
)
.await?;

manager
.get_connection()
.execute_unprepared(
"
CREATE INDEX IF NOT EXISTS job_created_at_idx
ON job(created_at)
",
)
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared(
"
DROP INDEX IF EXISTS job_created_at_idx
",
)
.await?;

manager
.get_connection()
.execute_unprepared(
"
DROP INDEX IF EXISTS blob_updated_at_idx
",
)
.await?;

Ok(())
}
}
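
These indexes cover `blob.updated_at`, which the eviction query below filters on, and `job.created_at`, presumably for the analogous job-eviction path. A minimal sketch of applying the migration with sea-orm-migration (the connection URL and `tokio` wiring are illustrative, not part of this commit):

```rust
use migration::{Migrator, MigratorTrait};
use sea_orm::Database;

#[tokio::main]
async fn main() -> Result<(), sea_orm::DbErr> {
    // Placeholder URL; rsc reads its real connection settings from its config.
    let db = Database::connect("postgres://localhost/rsc").await?;
    // Runs all pending migrations, including blob_updated_at_idx and job_created_at_idx.
    Migrator::up(&db, None).await?;
    Ok(())
}
```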
24 changes: 7 additions & 17 deletions rust/rsc/src/bin/rsc/main.rs
@@ -246,7 +246,7 @@ fn launch_blob_eviction(
// This gives clients time to reference a blob before it gets evicted.
let ttl = (Utc::now() - Duration::from_secs(config.blob_eviction.ttl)).naive_utc();

let blobs = match database::read_unreferenced_blobs(
let blobs = match database::delete_unreferenced_blobs(
conn.as_ref(),
ttl,
config.blob_eviction.chunk_size,
@@ -255,35 +255,25 @@
{
Ok(b) => b,
Err(err) => {
tracing::error!(%err, "Failed to fetch blobs for eviction");
tracing::error!(%err, "Failed to delete blobs for eviction");
should_sleep = true;
continue; // Try again on the next tick
}
};

let blob_ids: Vec<Uuid> = blobs.iter().map(|blob| blob.id).collect();
let eligible = blob_ids.len();
should_sleep = eligible == 0;
let deleted = blobs.len();

tracing::info!(%eligible, "At least N blobs eligible for eviction");
should_sleep = deleted == 0;

// Delete blobs from database
match database::delete_blobs_by_ids(conn.as_ref(), blob_ids).await {
Ok(deleted) => tracing::info!(%deleted, "Deleted blobs from database"),
Err(err) => {
tracing::error!(%err, "Failed to delete blobs from db for eviction");
should_sleep = true;
continue; // Try again on the next tick
}
};
tracing::info!(%deleted, "N blobs deleted for eviction");

// Delete blobs from blob store
for blob in blobs {
let store = match blob_stores.get(&blob.store_id) {
Some(s) => s.clone(),
None => {
let blob = blob.clone();
tracing::info!(%blob.id, %blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%blob.store_id, "Blob's store id missing from activated stores");
continue;
}
@@ -292,7 +282,7 @@ fn launch_blob_eviction(
tokio::spawn(async move {
store.delete_key(blob.key.clone()).await.unwrap_or_else(|err| {
let blob = blob.clone();
tracing::info!(%blob.id, %blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%err, "Failed to delete blob from store for eviction. See above for blob info");
});
});
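
Taken together, the eviction tick in `launch_blob_eviction` now reduces to roughly the following sketch (names taken from the diff above; error handling, logging, and the sleep/backoff logic are elided):

```rust
// One round-trip deletes the eligible rows and returns the (store_id, key)
// pairs needed to clean up the backing stores.
let ttl = (Utc::now() - Duration::from_secs(config.blob_eviction.ttl)).naive_utc();
let blobs =
    database::delete_unreferenced_blobs(conn.as_ref(), ttl, config.blob_eviction.chunk_size)
        .await?;
for blob in blobs {
    if let Some(store) = blob_stores.get(&blob.store_id).cloned() {
        // Store deletions run concurrently; a failed delete only orphans that one key.
        tokio::spawn(async move {
            let _ = store.delete_key(blob.key.clone()).await;
        });
    }
}
```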
104 changes: 37 additions & 67 deletions rust/rsc/src/database.rs
@@ -577,44 +577,6 @@ pub async fn upsert_blob<T: ConnectionTrait>(

// ---------- Read ----------

// Reads blobs from the database that are unreferenced and have surpassed the allocated grace
// period to be referenced.
//
// For new blobs this allows the client to create several blobs and then reference them all at
// once. Existing blobs whose job was just evicted will likely be well past the grace period and
// thus quickly evicted themselves.
pub async fn read_unreferenced_blobs<T: ConnectionTrait>(
db: &T,
ttl: NaiveDateTime,
chunk: u32,
) -> Result<Vec<blob::Model>, DbErr> {
// Limit = 16k as the query is also subject to parameter max.
// Blob has 4 params so (2^16)/4 = 16384. Also generally best to chunk blob eviction
// to avoid large eviction stalls.
Blob::find()
.from_raw_sql(Statement::from_sql_and_values(
DbBackend::Postgres,
r#"
SELECT * FROM blob
WHERE updated_at <= $1
AND id IN
(
SELECT id FROM blob
EXCEPT
(
SELECT blob_id FROM output_file
UNION SELECT stdout_blob_id FROM job
UNION SELECT stderr_blob_id FROM job
)
)
LIMIT $2
"#,
[ttl.into(), chunk.into()],
))
.all(db)
.await
}

pub async fn count_blobs<T: ConnectionTrait>(db: &T) -> Result<u64, DbErr> {
Blob::find().count(db).await
}
@@ -639,37 +601,45 @@ pub async fn total_blob_size<T: ConnectionTrait>(db: &T) -> Result<Option<TotalB
// ---------- Update ----------

// ---------- Delete ----------
pub async fn delete_blobs_by_ids<T: ConnectionTrait>(db: &T, ids: Vec<Uuid>) -> Result<u64, DbErr> {
if ids.len() == 0 {
return Ok(0);
}

let mut affected = 0;

let chunked: Vec<Vec<Uuid>> = ids
.into_iter()
.chunks((MAX_SQLX_PARAMS / 1).into())
.into_iter()
.map(|chunk| chunk.collect())
.collect();
#[derive(Clone, Debug, FromQueryResult)]
pub struct DeletedBlob {
pub store_id: Uuid,
pub key: String,
}

for chunk in chunked {
let result = Blob::delete_many()
.filter(
entity::blob::Column::Id.in_subquery(
migration::Query::select()
.column(migration::Asterisk)
.from_values(chunk, migration::Alias::new("foo"))
.take(),
),
// Deletes blobs from the database that are unreferenced and have surpassed the allocated grace
// period to be referenced.
//
// For new blobs this allows the client to create several blobs and then reference them all at
// once. Existing blobs whose job was just evicted will likely be well past the grace period and
// thus quickly evicted themselves.
pub async fn delete_unreferenced_blobs<T: ConnectionTrait>(
db: &T,
ttl: NaiveDateTime,
chunk: u32,
) -> Result<Vec<DeletedBlob>, DbErr> {
DeletedBlob::find_by_statement(Statement::from_sql_and_values(
DbBackend::Postgres,
r#"
WITH
eligible_blob_ids as (
SELECT id FROM blob
WHERE updated_at <= $1
EXCEPT (
SELECT blob_id FROM output_file
UNION SELECT stdout_blob_id FROM job
UNION SELECT stderr_blob_id FROM job
)
LIMIT $2
)
.exec(db)
.await?;

affected += result.rows_affected;
}

Ok(affected)
DELETE from blob b
WHERE b.id IN (SELECT id FROM eligible_blob_ids)
RETURNING b.store_id, b.key
"#,
[ttl.into(), chunk.into()],
))
.all(db)
.await
}

// --------------------------------------------------
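
Unlike the removed `delete_blobs_by_ids`, which chunked IDs to stay under the SQL parameter limit, this statement binds just two parameters (`$1` for the TTL cutoff, `$2` for the chunk size) no matter how many rows it removes. A minimal usage sketch, assuming `conn` is an open connection to a migrated rsc database:

```rust
// Delete up to 100 blobs that have been unreferenced for at least an hour.
let ttl = (chrono::Utc::now() - std::time::Duration::from_secs(3600)).naive_utc();
let evicted = database::delete_unreferenced_blobs(conn.as_ref(), ttl, 100).await?;
for blob in &evicted {
    // store_id + key is everything needed to remove the underlying object.
    tracing::info!(%blob.store_id, %blob.key, "blob row deleted; purge from store next");
}
```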
