Skip to content

Commit

Permalink
Merge branch 'master' into rsc-tier-cache-dirs
Browse files Browse the repository at this point in the history
  • Loading branch information
V-FEXrt authored Aug 1, 2024
2 parents ee96b36 + ba2afd5 commit c0329b2
Show file tree
Hide file tree
Showing 11 changed files with 270 additions and 131 deletions.
4 changes: 2 additions & 2 deletions rust/entity/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ pub struct Model {
pub cwd: String,
pub stdin: String,
pub is_atty: bool,
#[sea_orm(column_type = "Binary(BlobSize::Blob(None))")]
pub hidden_info: Vec<u8>,
pub hidden_info: String,
pub stdout_blob_id: Uuid,
pub stderr_blob_id: Uuid,
pub status: i32,
Expand All @@ -28,6 +27,7 @@ pub struct Model {
pub memory: i64,
pub i_bytes: i64,
pub o_bytes: i64,
pub size: Option<i64>,
pub created_at: DateTime,
pub label: String,
}
Expand Down
4 changes: 4 additions & 0 deletions rust/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ mod m20231128_000751_normalize_uses_table;
mod m20240509_163905_add_label_to_job;
mod m20240517_195757_add_updated_at_to_blob;
mod m20240522_185420_create_job_history;
mod m20240731_152842_create_job_size_proc;
mod m20240731_201632_create_job_blob_timestamp_index;

pub struct Migrator;

Expand All @@ -27,6 +29,8 @@ impl MigratorTrait for Migrator {
Box::new(m20240509_163905_add_label_to_job::Migration),
Box::new(m20240517_195757_add_updated_at_to_blob::Migration),
Box::new(m20240522_185420_create_job_history::Migration),
Box::new(m20240731_152842_create_job_size_proc::Migration),
Box::new(m20240731_201632_create_job_blob_timestamp_index::Migration),
]
}
}
4 changes: 3 additions & 1 deletion rust/migration/src/m20220101_000002_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Job::Cwd).string().not_null())
.col(ColumnDef::new(Job::Stdin).string().not_null())
.col(ColumnDef::new(Job::IsAtty).boolean().not_null())
.col(ColumnDef::new(Job::HiddenInfo).ezblob())
.col(ColumnDef::new(Job::HiddenInfo).string().not_null())
.col(ColumnDef::new(Job::StdoutBlobId).uuid().not_null())
.col(ColumnDef::new(Job::StderrBlobId).uuid().not_null())
.col(ColumnDef::new(Job::Status).integer().not_null())
Expand All @@ -44,6 +44,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Job::Memory).big_unsigned().not_null())
.col(ColumnDef::new(Job::IBytes).big_unsigned().not_null())
.col(ColumnDef::new(Job::OBytes).big_unsigned().not_null())
.col(ColumnDef::new(Job::Size).big_unsigned())
.foreign_key(
ForeignKeyCreateStatement::new()
.name("fk-stdout_blob_id-blob")
Expand Down Expand Up @@ -240,6 +241,7 @@ pub enum Job {
Memory,
IBytes,
OBytes,
Size,
}

#[derive(DeriveIden)]
Expand Down
75 changes: 75 additions & 0 deletions rust/migration/src/m20240731_152842_create_job_size_proc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use sea_orm_migration::prelude::*;

// Marker type for this migration; DeriveMigrationName derives the migration's
// identifier from the source file name (m20240731_152842_create_job_size_proc).
#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    /// Installs the `calculate_job_size` stored procedure (PostgreSQL plpgsql).
    ///
    /// The procedure back-fills `job.size` for up to `job_lim` jobs whose
    /// `size` is still NULL (oldest first), summing the sizes of each job's
    /// output-file blobs plus its stdout/stderr blobs, and reports the number
    /// of rows updated through the INOUT `updated_count` parameter.
    ///
    /// NOTE(review): the final join uses INNER JOINs on the stdout/stderr
    /// blobs, so a job whose stdout or stderr blob row is missing is silently
    /// skipped and stays eligible forever — confirm this is intended.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .get_connection()
            .execute_unprepared(
                "
                CREATE OR REPLACE PROCEDURE calculate_job_size(
                    job_lim int,
                    INOUT updated_count int
                )
                language plpgsql
                as $$
                BEGIN
                    -- Run the query that find the jobs, calcs their sizes, and then updates the table
                    WITH
                    eligible_jobs as (
                        SELECT id, stdout_blob_id, stderr_blob_id
                        FROM job
                        WHERE size IS NULL
                        ORDER BY created_at
                        ASC
                        LIMIT job_lim
                    ),
                    job_blob_size as (
                        SELECT ej.id, SUM(COALESCE(b.size,0)) as size
                        FROM eligible_jobs ej
                        LEFT JOIN output_file o
                        ON ej.id = o.job_id
                        LEFT JOIN blob b
                        ON o.blob_id = b.id
                        GROUP BY ej.id
                    ),
                    full_size as (
                        SELECT
                            ej.id,
                            CAST(jb.size + stdout.size + stderr.size as BIGINT) as size
                        FROM eligible_jobs ej
                        INNER JOIN job_blob_size jb
                        ON ej.id = jb.id
                        INNER JOIN blob stdout
                        ON ej.stdout_blob_id = stdout.id
                        INNER JOIN blob stderr
                        ON ej.stderr_blob_id = stderr.id
                    )
                    UPDATE job j
                    SET size = f.size
                    FROM full_size f
                    WHERE j.id = f.id;
                    -- Grab the rows affected count
                    GET DIAGNOSTICS updated_count = ROW_COUNT;
                END;
                $$;
                ",
            )
            .await?;
        Ok(())
    }

    /// Drops the procedure. The `(int, int)` argument list must match the
    /// signature created in `up` for PostgreSQL to resolve the overload.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .get_connection()
            .execute_unprepared("DROP PROCEDURE IF EXISTS calculate_job_size(int, int)")
            .await?;
        Ok(())
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use sea_orm_migration::prelude::*;

// Marker type for this migration; DeriveMigrationName derives the migration's
// identifier from the source file name (m20240731_201632_create_job_blob_timestamp_index).
#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    /// Adds the timestamp indexes that the background maintenance loops scan
    /// on: `blob(updated_at)` and `job(created_at)`.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let conn = manager.get_connection();
        for stmt in [
            "
            CREATE INDEX IF NOT EXISTS blob_updated_at_idx
            ON blob(updated_at)
            ",
            "
            CREATE INDEX IF NOT EXISTS job_created_at_idx
            ON job(created_at)
            ",
        ] {
            conn.execute_unprepared(stmt).await?;
        }

        Ok(())
    }

    /// Drops both indexes, in the reverse order of their creation.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let conn = manager.get_connection();
        for stmt in [
            "
            DROP INDEX IF EXISTS job_created_at_idx
            ",
            "
            DROP INDEX IF EXISTS blob_updated_at_idx
            ",
        ] {
            conn.execute_unprepared(stmt).await?;
        }

        Ok(())
    }
}
4 changes: 4 additions & 0 deletions rust/rsc/.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@
"ttl": 86400,
"chunk_size": 16000
}
},
"job_size_calculate": {
"tick_rate": 60,
"chunk_size": 100
}
}
1 change: 1 addition & 0 deletions rust/rsc/src/bin/rsc/add_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub async fn add_job(
i_bytes: Set(payload.ibytes as i64),
o_bytes: Set(payload.obytes as i64),
label: Set(payload.label.unwrap_or("".to_string())),
size: NotSet,
};

// Now perform the insert as a single transaction
Expand Down
10 changes: 10 additions & 0 deletions rust/rsc/src/bin/rsc/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
use config::{Config, ConfigError, Environment, File};
use serde::{Deserialize, Serialize};

// Generic configuration for a periodic background (cron-style) loop.
#[derive(Debug, Deserialize, Serialize)]
pub struct RSCCronLoopConfig {
    // How often to run the loop in seconds
    pub tick_rate: u64,
    // Maximum number of objects to process per tick. Must be 1 <= x <= 16000
    pub chunk_size: i32,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct RSCTTLConfig {
// How often to run the eviction check in seconds
Expand Down Expand Up @@ -47,6 +55,8 @@ pub struct RSCConfig {
pub blob_eviction: RSCTTLConfig,
// The config to control job eviction
pub job_eviction: RSCJobEvictionConfig,
// The config to control job size calculation
pub job_size_calculate: RSCCronLoopConfig,
}

impl RSCConfig {
Expand Down
74 changes: 51 additions & 23 deletions rust/rsc/src/bin/rsc/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,6 @@ fn launch_job_eviction(conn: Arc<DatabaseConnection>, tick_interval: u64, ttl: u
let mut interval = tokio::time::interval(Duration::from_secs(tick_interval));
loop {
interval.tick().await;
tracing::info!("Job TTL eviction tick");

let ttl = (Utc::now() - Duration::from_secs(ttl)).naive_utc();

match database::evict_jobs_ttl(conn.clone(), ttl).await {
Expand All @@ -240,7 +238,6 @@ fn launch_blob_eviction(
tokio::time::interval(Duration::from_secs(config.blob_eviction.tick_rate));
let mut should_sleep = false;
loop {
tracing::info!("Blob TTL eviction tick");
if should_sleep {
interval.tick().await;
}
Expand All @@ -249,7 +246,7 @@ fn launch_blob_eviction(
// This gives clients time to reference a blob before it gets evicted.
let ttl = (Utc::now() - Duration::from_secs(config.blob_eviction.ttl)).naive_utc();

let blobs = match database::read_unreferenced_blobs(
let blobs = match database::delete_unreferenced_blobs(
conn.as_ref(),
ttl,
config.blob_eviction.chunk_size,
Expand All @@ -258,37 +255,25 @@ fn launch_blob_eviction(
{
Ok(b) => b,
Err(err) => {
tracing::error!(%err, "Failed to fetch blobs for eviction");
tracing::error!(%err, "Failed to delete blobs for eviction");
should_sleep = true;
continue; // Try again on the next tick
}
};

let blob_ids: Vec<Uuid> = blobs.iter().map(|blob| blob.id).collect();
let eligible = blob_ids.len();
should_sleep = eligible == 0;
let deleted = blobs.len();

tracing::info!(%eligible, "At least N blobs eligible for eviction");
should_sleep = deleted == 0;

// Delete blobs from database
match database::delete_blobs_by_ids(conn.as_ref(), blob_ids).await {
Ok(deleted) => tracing::info!(%deleted, "Deleted blobs from database"),
Err(err) => {
tracing::error!(%err, "Failed to delete blobs from db for eviction");
should_sleep = true;
continue; // Try again on the next tick
}
};

tracing::info!("Spawning blob deletion from stores");
tracing::info!(%deleted, "N blobs deleted for eviction");

// Delete blobs from blob store
for blob in blobs {
let store = match blob_stores.get(&blob.store_id) {
Some(s) => s.clone(),
None => {
let blob = blob.clone();
tracing::info!(%blob.id, %blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%blob.store_id, "Blob's store id missing from activated stores");
continue;
}
Expand All @@ -297,7 +282,7 @@ fn launch_blob_eviction(
tokio::spawn(async move {
store.delete_key(blob.key.clone()).await.unwrap_or_else(|err| {
let blob = blob.clone();
tracing::info!(%blob.id, %blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%err, "Failed to delete blob from store for eviction. See above for blob info");
});
});
Expand All @@ -306,6 +291,42 @@ fn launch_blob_eviction(
});
}

/// Spawns a detached background task that periodically invokes the
/// database-side job size calculation, processing up to
/// `config.job_size_calculate.chunk_size` jobs per pass.
///
/// Passes run back-to-back while there is work remaining; the task only
/// waits for the next tick once a pass updates zero rows or fails.
fn launch_job_size_calculate(conn: Arc<DatabaseConnection>, config: Arc<config::RSCConfig>) {
    tokio::spawn(async move {
        let mut interval =
            tokio::time::interval(Duration::from_secs(config.job_size_calculate.tick_rate));
        let mut idle = false;
        loop {
            // Only wait when the previous pass found nothing to do (or failed).
            if idle {
                interval.tick().await;
            }

            let outcome =
                database::calculate_job_size(conn.as_ref(), config.job_size_calculate.chunk_size)
                    .await;

            let count = match outcome {
                Ok(Some(c)) => c.updated_count,
                Ok(None) => {
                    tracing::error!("Failed to extract result from calculating job size");
                    idle = true;
                    continue; // Try again on the next tick
                }
                Err(err) => {
                    tracing::error!(%err, "Failed to calculate and update job size");
                    idle = true;
                    continue; // Try again on the next tick
                }
            };

            // Zero updates means the backlog is drained; go back to ticking.
            idle = count == 0;

            tracing::info!(%count, "Calculated and updated size for jobs");
        }
    });
}

fn request_max_fileno_limit() {
let Ok((current, max)) = Resource::NOFILE.get() else {
tracing::warn!("Unable to discover fileno limits. Using default");
Expand Down Expand Up @@ -360,7 +381,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Activate blob stores
let stores = activate_stores(connection.clone()).await;

// Launch evictions threads
// Launch long running concurrent threads
match &config.job_eviction {
config::RSCJobEvictionConfig::TTL(ttl) => {
launch_job_eviction(connection.clone(), ttl.tick_rate, ttl.ttl);
Expand All @@ -369,6 +390,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

launch_blob_eviction(connection.clone(), config.clone(), stores.clone());
launch_job_size_calculate(connection.clone(), config.clone());

// Launch the server
let router = create_router(connection.clone(), config.clone(), &stores);
Expand Down Expand Up @@ -432,6 +454,10 @@ mod tests {
ttl: 100,
chunk_size: 100,
}),
job_size_calculate: config::RSCCronLoopConfig {
tick_rate: 10,
chunk_size: 100,
},
}
}

Expand Down Expand Up @@ -788,6 +814,7 @@ mod tests {
i_bytes: Set(100000),
o_bytes: Set(1000),
label: Set("".to_string()),
size: NotSet,
};

insert_job.save(conn.clone().as_ref()).await.unwrap();
Expand All @@ -812,6 +839,7 @@ mod tests {
i_bytes: Set(100000),
o_bytes: Set(1000),
label: Set("".to_string()),
size: NotSet,
};

insert_job.save(conn.clone().as_ref()).await.unwrap();
Expand Down
Loading

0 comments on commit c0329b2

Please sign in to comment.