Skip to content

Commit

Permalink
Extend service DB instead of defining new one
Browse files Browse the repository at this point in the history
  • Loading branch information
tarkah committed Nov 26, 2024
1 parent bf96268 commit 043ceb2
Show file tree
Hide file tree
Showing 14 changed files with 129 additions and 167 deletions.
2 changes: 1 addition & 1 deletion crates/avalanche/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn build(request: api::Request<api::v1::avalanche::Build>, context: Contex
.sub
.parse::<endpoint::Id>()
.map_err(Error::InvalidEndpoint)?;
let endpoint = Endpoint::get(context.state.db.acquire().await?.as_mut(), endpoint_id)
let endpoint = Endpoint::get(context.state.service_db.acquire().await?.as_mut(), endpoint_id)
.await
.map_err(Error::LoadEndpoint)?;

Expand Down
3 changes: 2 additions & 1 deletion crates/avalanche/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use crate::Config;
pub async fn build(request: PackageBuild, endpoint: Endpoint, state: State, config: Config) {
info!("Starting build");

let client = service::Client::new(endpoint.host_address.clone()).with_endpoint_auth(endpoint.id, state.db.clone());
let client =
service::Client::new(endpoint.host_address.clone()).with_endpoint_auth(endpoint.id, state.service_db.clone());

let task_id = request.build_id;

Expand Down
8 changes: 4 additions & 4 deletions crates/service/src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl Account {
}

/// Create / update this account to the provided [`Database`]
pub async fn save<'a>(&self, tx: &mut database::Transaction<'a>) -> Result<(), Error> {
pub async fn save(&self, tx: &mut database::Transaction) -> Result<(), Error> {
sqlx::query(
"
INSERT INTO account
Expand Down Expand Up @@ -203,8 +203,8 @@ pub struct Token {

impl Token {
/// Set the account's bearer token & expiration
pub async fn set<'a>(
tx: &mut database::Transaction<'a>,
pub async fn set(
tx: &mut database::Transaction,
id: Id,
encoded: impl ToString,
expiration: DateTime<Utc>,
Expand Down Expand Up @@ -339,6 +339,6 @@ pub enum Error {

impl From<sqlx::Error> for Error {
fn from(error: sqlx::Error) -> Self {
Error::Database(database::Error::Execute(error))
Error::Database(error.into())
}
}
2 changes: 1 addition & 1 deletion crates/service/src/api/v1/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub(crate) fn services(role: Role, config: &Config, state: &crate::State) -> api
.register::<RefreshIssueToken, Error, _>(refresh_issue_token)
.with_state(State {
issuer: config.issuer(role, state.key_pair.clone()),
db: state.db.clone(),
db: state.service_db.clone(),
pending_sent: state.pending_sent.clone(),
upstream: config.upstream,
})
Expand Down
72 changes: 31 additions & 41 deletions crates/service/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
//! Shared service database
//! Service database
use std::path::Path;

use sqlx::{pool::PoolConnection, sqlite::SqliteConnectOptions, Pool, Sqlite, SqliteConnection};
use sqlx::{pool::PoolConnection, Pool, Sqlite, SqliteConnection};
use thiserror::Error;

pub use sqlx::migrate::Migrator;

/// Service database
#[derive(Debug, Clone)]
pub struct Database {
Expand All @@ -15,48 +17,48 @@ pub struct Database {
impl Database {
/// Opens a connection to the provided database path
pub async fn new(path: impl AsRef<Path>) -> Result<Self, Error> {
let options = sqlx::sqlite::SqliteConnectOptions::new()
.filename(path)
.create_if_missing(true)
.read_only(false)
.foreign_keys(true);

Self::connect(options).await
}
let pool = sqlx::SqlitePool::connect_with(
sqlx::sqlite::SqliteConnectOptions::new()
.filename(path)
.create_if_missing(true)
.read_only(false)
.foreign_keys(true),
)
.await?;

async fn connect(options: SqliteConnectOptions) -> Result<Self, Error> {
let pool = sqlx::SqlitePool::connect_with(options).await.map_err(Error::Connect)?;

sqlx::migrate!("src/database/migrations")
.run(&pool)
.await
.map_err(Error::Migrate)?;
sqlx::migrate!("./migrations").run(&pool).await?;

Ok(Self { pool })
}

/// Runs the provided migrations on the database
pub async fn with_migrations(self, mut migrator: Migrator) -> Result<Self, Error> {
migrator.set_ignore_missing(true).run(&self.pool).await?;
Ok(self)
}

/// Acquire a database connection
pub async fn acquire(&self) -> Result<PoolConnection<Sqlite>, Error> {
self.pool.acquire().await.map_err(Error::Acquire)
Ok(self.pool.acquire().await?)
}

/// Begin a database transaction
pub async fn begin(&self) -> Result<Transaction, Error> {
Ok(Transaction(self.pool.begin().await.map_err(Error::Commit)?))
Ok(Transaction(self.pool.begin().await?))
}
}

/// A database transaction
pub struct Transaction<'a>(sqlx::Transaction<'a, Sqlite>);
pub struct Transaction(sqlx::Transaction<'static, Sqlite>);

impl<'a> Transaction<'a> {
impl Transaction {
/// Commit the transaction
pub async fn commit(self) -> Result<(), Error> {
self.0.commit().await.map_err(Error::Commit)
Ok(self.0.commit().await?)
}
}

impl<'a> AsMut<SqliteConnection> for Transaction<'a> {
impl AsMut<SqliteConnection> for Transaction {
fn as_mut(&mut self) -> &mut SqliteConnection {
self.0.as_mut()
}
Expand All @@ -70,22 +72,10 @@ impl<'a, T> Executor<'a> for &'a mut T where &'a mut T: sqlx::Executor<'a, Datab
/// A database error
#[derive(Debug, Error)]
pub enum Error {
/// Failed to connect
#[error("failed to connect")]
Connect(#[source] sqlx::Error),
/// Migrations failed
#[error("migrations failed")]
Migrate(#[source] sqlx::migrate::MigrateError),
/// Acquire connection
#[error("acquire connection")]
Acquire(#[source] sqlx::Error),
/// Begin transaction
#[error("begin transaction")]
Begin(#[source] sqlx::Error),
/// Commit transaction
#[error("commit transaction")]
Commit(#[source] sqlx::Error),
/// Execute query
#[error("execute query")]
Execute(#[source] sqlx::Error),
/// Sqlx error
#[error("sqlx")]
Sqlx(#[from] sqlx::Error),
/// Migration error
#[error("sqlx migrate")]
Migrate(#[from] sqlx::migrate::MigrateError),
}
24 changes: 9 additions & 15 deletions crates/service/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,13 @@ impl Endpoint {
)
.bind(id.0)
.fetch_one(conn)
.await
.map_err(database::Error::Execute)?;
.await?;

Ok(endpoint)
}

/// Create or update this endpoint to the provided [`Database`]
pub async fn save<'a>(&self, tx: &mut database::Transaction<'a>) -> Result<(), database::Error> {
pub async fn save(&self, tx: &mut database::Transaction) -> Result<(), database::Error> {
sqlx::query(
"
INSERT INTO endpoint
Expand Down Expand Up @@ -143,8 +142,7 @@ impl Endpoint {
.bind(self.kind.role().to_string())
.bind(self.kind.work_status().map(ToString::to_string))
.execute(tx.as_mut())
.await
.map_err(database::Error::Execute)?;
.await?;

Ok(())
}
Expand All @@ -168,14 +166,13 @@ impl Endpoint {
",
)
.fetch_all(conn)
.await
.map_err(database::Error::Execute)?;
.await?;

Ok(endpoints)
}

/// Delete this endpoint from the provided [`Database`]
pub async fn delete<'a>(&self, tx: &mut database::Transaction<'a>) -> Result<(), database::Error> {
pub async fn delete(&self, tx: &mut database::Transaction) -> Result<(), database::Error> {
sqlx::query(
"
DELETE FROM endpoint
Expand All @@ -184,8 +181,7 @@ impl Endpoint {
)
.bind(self.id.0)
.execute(tx.as_mut())
.await
.map_err(database::Error::Execute)?;
.await?;

Ok(())
}
Expand All @@ -211,7 +207,7 @@ pub struct Tokens {

impl Tokens {
/// Save the tokens related to [`Id`] to the provided [`Database`]
pub async fn save<'a>(&self, tx: &mut database::Transaction<'a>, id: Id) -> Result<(), database::Error> {
pub async fn save(&self, tx: &mut database::Transaction, id: Id) -> Result<(), database::Error> {
sqlx::query(
"
UPDATE endpoint
Expand All @@ -225,8 +221,7 @@ impl Tokens {
.bind(&self.access_token)
.bind(id.0)
.execute(tx.as_mut())
.await
.map_err(database::Error::Execute)?;
.await?;

Ok(())
}
Expand All @@ -247,8 +242,7 @@ impl Tokens {
)
.bind(id.0)
.fetch_one(conn)
.await
.map_err(database::Error::Execute)?;
.await?;

Ok(tokens)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/service/src/endpoint/enrollment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub struct Target {

/// Send auto-enrollment to the list of targets if the endpoint isn't already configured
pub(crate) async fn auto_enrollment(targets: &[Target], ourself: Issuer, state: &State) -> Result<(), Error> {
let mut conn = state.db.acquire().await?;
let mut conn = state.service_db.acquire().await?;

let endpoints = Endpoint::list(conn.as_mut()).await.map_err(Error::ListEndpoints)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/service/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl<'a> Server<'a> {
///
/// [`Database`]: crate::Database
pub async fn start(self, addr: impl ToSocketAddrs) -> Result<(), Error> {
account::sync_admin(&self.state.db, self.config.admin.clone()).await?;
account::sync_admin(&self.state.service_db, self.config.admin.clone()).await?;

if self.role == Role::Hub {
if let Err(e) = enrollment::auto_enrollment(
Expand Down
21 changes: 13 additions & 8 deletions crates/service/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ pub struct State {
/// Database directory
pub db_dir: PathBuf,
/// Service database
pub db: Database,
pub service_db: Database,
/// Key pair used by the service
pub key_pair: KeyPair,
/// Pending enrollment requests that are awaiting confirmation
///
/// Only applicable for hub service
pub pending_sent: SharedMap<endpoint::Id, enrollment::Sent>,
pub(crate) pending_sent: SharedMap<endpoint::Id, enrollment::Sent>,
}

impl State {
Expand All @@ -45,12 +45,11 @@ impl State {
fs::create_dir_all(&db_dir).await.map_err(Error::CreateDbDir)?;
}

let db_path = db_dir.join("service.db");
let key_path = state_dir.join(".privkey");

let db = Database::new(&db_path).await?;
debug!(path = ?db_path, "Database opened");
let service_db_path = db_dir.join("service");
let service_db = Database::new(&service_db_path).await?;
debug!(path = ?service_db_path, "Database opened");

let key_path = state_dir.join(".privkey");
let key_pair = if !key_path.exists() {
let key_pair = KeyPair::generate();
debug!(key_pair = %key_pair.public_key(), "Keypair generated");
Expand All @@ -73,11 +72,17 @@ impl State {
root,
state_dir,
db_dir,
db,
service_db,
key_pair,
pending_sent: Default::default(),
})
}

/// Run the provided migrations against the service database
pub async fn with_migrations(mut self, migrator: database::Migrator) -> Result<Self, Error> {
self.service_db = self.service_db.with_migrations(migrator).await?;
Ok(self)
}
}

/// A state error
Expand Down
5 changes: 0 additions & 5 deletions crates/service/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,4 @@ where
pub async fn remove(&self, key: &K) -> Option<V> {
self.0.lock().await.remove(key)
}

/// Clones out and returns the inner map
pub async fn all(&self) -> HashMap<K, V> {
self.0.lock().await.clone()
}
}
Loading

0 comments on commit 043ceb2

Please sign in to comment.