Skip to content

Commit

Permalink
Implement registry for drivers
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelff committed Jul 24, 2023
1 parent 4132d25 commit dc35088
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 32 deletions.
87 changes: 56 additions & 31 deletions query-engine/connectors/sql-query-connector/src/database/js.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,49 @@ use connector_interface::{
error::{ConnectorError, ErrorKind},
Connection, Connector,
};
use once_cell::sync::Lazy;
use quaint::{
connector::IsolationLevel,
prelude::{Queryable as QuaintQueryable, *},
};
use std::sync::Arc;
use std::{
collections::{hash_map::Entry, HashMap},
sync::{Arc, Mutex},
};

/// Registry is the type for the global registry of Js connectors.
type Registry = HashMap<String, JsTransactionCapable>;

// TODO: https://github.com/prisma/team-orm/issues/245
// implement registry for client drivers, rather than a global variable,
// this would require the register_driver and registered_js_driver functions to
// receive an identifier for the specific driver
static QUERYABLE: once_cell::sync::OnceCell<Arc<dyn Queryable>> = once_cell::sync::OnceCell::new();
/// REGISTRY is the global registry of JsConnectors
static REGISTRY: Lazy<Mutex<Registry>> = Lazy::new(|| Mutex::new(HashMap::new()));

pub fn registered_js_connector() -> Option<&'static Arc<dyn Queryable>> {
QUERYABLE.get()
pub fn registered_js_connector(provider: &str) -> connector::Result<JsTransactionCapable> {
let lock = REGISTRY.lock().unwrap();
lock.get(provider)
.ok_or(ConnectorError::from_kind(ErrorKind::UnsupportedConnector(format!(
"A Javascript connector proxy for {} was not registered",
provider
))))
.map(|conn_ref| conn_ref.to_owned())
}

pub fn register_js_connector(driver: Arc<dyn Queryable>) {
if QUERYABLE.set(driver).is_err() {
panic!("Cannot register driver twice");
pub fn register_js_connector(provider: &str, connector: Arc<dyn QuaintQueryable>) -> Result<(), String> {
let mut lock = REGISTRY.lock().unwrap();
let entry = lock.entry(provider.to_string());
match entry {
Entry::Occupied(_) => Err(format!(
"A Javascript connector proxy for {} was already registered. Skipping",
provider
)),
Entry::Vacant(v) => {
v.insert(JsTransactionCapable { queryable: connector });
Ok(())
}
}
}

pub struct Js {
connector: JsConnector,
queryable: JsTransactionCapable,
connection_info: ConnectionInfo,
features: psl::PreviewFeatures,
psl_connector: psl::builtin_connectors::JsConnector,
Expand All @@ -51,30 +70,31 @@ impl FromSource for Js {
url: &str,
features: psl::PreviewFeatures,
) -> connector_interface::Result<Js> {
let psl_connector = source.active_connector.as_js_connector().unwrap_or_else(|| {
panic!(
"Connector for {} is not a JsConnector",
match source.active_connector.as_js_connector() {
Some(psl_connector) => {
let queryable = registered_js_connector(source.active_provider)?;
let connection_info = get_connection_info(url)?;

Ok(Js {
queryable,
connection_info,
features,
psl_connector,
})
}
None => panic!(
"Connector for provider {} is not a JsConnector",
source.active_connector.provider_name()
)
});

let connector = registered_js_connector().unwrap().clone();
let connection_info = get_connection_info(url)?;

return Ok(Js {
connector: JsConnector { queryable: connector },
connection_info,
features: features.to_owned(),
psl_connector,
});
),
}
}
}

#[async_trait]
impl Connector for Js {
async fn get_connection<'a>(&'a self) -> connector::Result<Box<dyn Connection + Send + Sync + 'static>> {
super::catch(self.connection_info.clone(), async move {
let sql_conn = SqlConnection::new(self.connector.clone(), &self.connection_info, self.features);
let sql_conn = SqlConnection::new(self.queryable.clone(), &self.connection_info, self.features);
Ok(Box::new(sql_conn) as Box<dyn Connection + Send + Sync + 'static>)
})
.await
Expand Down Expand Up @@ -107,12 +127,12 @@ impl Connector for Js {
// declaration, so finally I couldn't come up with anything better then wrapping a QuaintQueryable
// in this object, and implementing TransactionCapable (and quaint::Queryable) explicitly for it.
#[derive(Clone)]
struct JsConnector {
pub struct JsTransactionCapable {
queryable: Arc<dyn QuaintQueryable>,
}

#[async_trait]
impl QuaintQueryable for JsConnector {
impl QuaintQueryable for JsTransactionCapable {
async fn query(&self, q: Query<'_>) -> quaint::Result<quaint::prelude::ResultSet> {
self.queryable.query(q).await
}
Expand All @@ -137,6 +157,8 @@ impl QuaintQueryable for JsConnector {
self.queryable.execute_raw_typed(sql, params).await
}

/// Run a command in the database, for queries that can't be run using
/// prepared statements.
async fn raw_cmd(&self, cmd: &str) -> quaint::Result<()> {
self.queryable.raw_cmd(cmd).await
}
Expand All @@ -149,13 +171,16 @@ impl QuaintQueryable for JsConnector {
self.queryable.is_healthy()
}

/// Sets the transaction isolation level to given value.
/// Implementers have to make sure that the passed isolation level is valid for the underlying database.
async fn set_tx_isolation_level(&self, isolation_level: IsolationLevel) -> quaint::Result<()> {
self.queryable.set_tx_isolation_level(isolation_level).await
}

/// Signals if the isolation level SET needs to happen before or after the tx BEGIN.
fn requires_isolation_first(&self) -> bool {
self.queryable.requires_isolation_first()
}
}

impl TransactionCapable for JsConnector {}
impl TransactionCapable for JsTransactionCapable {}
6 changes: 5 additions & 1 deletion query-engine/query-engine-node-api/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,15 @@ impl QueryEngine {
let overrides: Vec<(_, _)> = datasource_overrides.into_iter().collect();
let mut schema = psl::validate(datamodel.into());
let config = &mut schema.configuration;
let provider_name = schema.connector.provider_name();

#[cfg(feature = "js-connectors")]
if let Some(driver) = maybe_driver {
let queryable = js_connectors::JsQueryable::from(driver);
sql_connector::register_js_connector(Arc::new(queryable));
match sql_connector::register_js_connector(provider_name, Arc::new(queryable)) {
Ok(_) => tracing::info!("Registered js connector for {provider_name}"),
Err(err) => tracing::error!("Failed to registered js connector for {provider_name}. {err}"),
}
}

schema
Expand Down

0 comments on commit dc35088

Please sign in to comment.