Skip to content

Commit

Permalink
chore(cubesql): Allow to do custom representations for sessions
Browse files Browse the repository at this point in the history
chore(cubesql): Allow to do custom representations for sessions
  • Loading branch information
ovr committed Aug 22, 2024
1 parent 954df5a commit 78548dd
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl TableProvider for InfoSchemaProcesslistProvider {
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let mut builder = InformationSchemaProcesslistBuilder::new();

for process_list in self.sessions.process_list().await {
for process_list in self.sessions.map_sessions::<SessionProcessList>().await {
builder.add_row(process_list);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ impl PgStatActivityBuilder {
self.oid.append_value(session.oid).unwrap();
self.datname.append_option(session.datname).unwrap();
self.pid.append_value(session.pid).unwrap();
self.leader_pid.append_null().unwrap();
self.usesysid.append_null().unwrap();
self.leader_pid.append_option(session.leader_pid).unwrap();
self.usesysid.append_option(session.usesysid).unwrap();
self.usename.append_option(session.usename).unwrap();
self.application_name
.append_option(session.application_name)
Expand Down Expand Up @@ -205,7 +205,7 @@ impl TableProvider for PgCatalogStatActivityProvider {
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
let sessions = self.sessions.stat_activity().await;
let sessions = self.sessions.map_sessions::<SessionStatActivity>().await;
let mut builder = PgStatActivityBuilder::new(sessions.len());

for session in sessions {
Expand Down
3 changes: 2 additions & 1 deletion rust/cubesql/cubesql/src/compile/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@ async fn get_test_session_with_config_and_transport(
let session_manager = Arc::new(SessionManager::new(server.clone()));
let session = session_manager
.create_session(protocol, "127.0.0.1".to_string(), 1234, None)
.await.unwrap();
.await

Check warning on line 569 in rust/cubesql/cubesql/src/compile/test/mod.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/test/mod.rs#L569

Added line #L569 was not covered by tests
.unwrap();

// Populate like shims
session.state.set_database(Some(db_name.to_string()));
Expand Down
9 changes: 6 additions & 3 deletions rust/cubesql/cubesql/src/sql/postgres/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;

use super::shim::AsyncPostgresShim;
use crate::{
compile::DatabaseProtocol,
config::processing_loop::{ProcessingLoop, ShutdownMode},
sql::SessionManager,
telemetry::{ContextLogger, SessionLogger},
CubeError,
};
use crate::sql::Session;
use super::shim::AsyncPostgresShim;

pub struct PostgresServer {
// options
Expand Down Expand Up @@ -98,7 +97,11 @@ impl ProcessingLoop for PostgresServer {
}
};

let session = match self.session_manager.create_session(DatabaseProtocol::PostgreSQL, client_addr, client_port, None).await {
let session = match self
.session_manager
.create_session(DatabaseProtocol::PostgreSQL, client_addr, client_port, None)
.await

Check warning on line 103 in rust/cubesql/cubesql/src/sql/postgres/service.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/sql/postgres/service.rs#L103

Added line #L103 was not covered by tests
{
Ok(r) => r,
Err(err) => {
error!("Session creation error: {}", err);
Expand Down
84 changes: 43 additions & 41 deletions rust/cubesql/cubesql/src/sql/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,46 +403,7 @@ pub struct Session {
pub state: Arc<SessionState>,
}

impl Session {
// For PostgreSQL
pub fn to_stat_activity(self: &Arc<Self>) -> SessionStatActivity {
let query = self.state.current_query();

let application_name = if let Some(v) = self.state.get_variable("application_name") {
match v.value {
ScalarValue::Utf8(r) => r,
_ => None,
}
} else {
None
};

SessionStatActivity {
oid: self.state.connection_id,
datname: self.state.database(),
pid: self.state.connection_id,
leader_pid: None,
usesysid: 0,
usename: self.state.user(),
application_name,
client_addr: self.state.client_ip.clone(),
client_hostname: None,
client_port: self.state.client_port.clone(),
query,
}
}

// For MySQL
pub fn to_process_list(self: &Arc<Self>) -> SessionProcessList {
SessionProcessList {
id: self.state.connection_id,
host: self.state.client_ip.clone(),
user: self.state.user(),
database: self.state.database(),
}
}
}

/// Specific representation of session for MySQL
#[derive(Debug)]
pub struct SessionProcessList {
pub id: u32,
Expand All @@ -451,17 +412,58 @@ pub struct SessionProcessList {
pub database: Option<String>,
}

impl From<&Arc<Session>> for SessionProcessList {
fn from(session: &Arc<Session>) -> Self {
Self {
id: session.state.connection_id,
host: session.state.client_ip.clone(),
user: session.state.user(),
database: session.state.database(),
}
}
}

/// Specific representation of session for PostgreSQL
#[derive(Debug)]
pub struct SessionStatActivity {
pub oid: u32,
pub datname: Option<String>,
pub pid: u32,
pub leader_pid: Option<u32>,
pub usesysid: u32,
pub usesysid: Option<u32>,
pub usename: Option<String>,
pub application_name: Option<String>,
pub client_addr: String,
pub client_hostname: Option<String>,
pub client_port: u16,
pub query: Option<String>,
}

impl From<&Arc<Session>> for SessionStatActivity {
fn from(session: &Arc<Session>) -> Self {
let query = session.state.current_query();

let application_name = if let Some(v) = session.state.get_variable("application_name") {
match v.value {
ScalarValue::Utf8(r) => r,
_ => None,

Check warning on line 449 in rust/cubesql/cubesql/src/sql/session.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/sql/session.rs#L449

Added line #L449 was not covered by tests
}
} else {
None

Check warning on line 452 in rust/cubesql/cubesql/src/sql/session.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/sql/session.rs#L452

Added line #L452 was not covered by tests
};

Self {
oid: session.state.connection_id,
datname: session.state.database(),
pid: session.state.connection_id,
leader_pid: None,
usesysid: None,
usename: session.state.user(),
application_name,
client_addr: session.state.client_ip.clone(),
client_hostname: None,
client_port: session.state.client_port.clone(),
query,
}
}
}
24 changes: 9 additions & 15 deletions rust/cubesql/cubesql/src/sql/session_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use std::{

use super::{
server_manager::ServerManager,
session::{Session, SessionProcessList, SessionStatActivity, SessionState},
session::{Session, SessionState},
};
use crate::compile::DatabaseProtocol;

#[derive(Debug)]
struct SessionManagerInner {
sessions: HashMap<u32, Arc<Session>>,
uid_to_session: HashMap<String, Arc<Session>>
uid_to_session: HashMap<String, Arc<Session>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -76,7 +76,7 @@ impl SessionManager {
return Err(CubeError::user(format!(
"Session cannot be created, because extra_id: {} already exists",
extra_id
)))
)));
}

Check warning on line 80 in rust/cubesql/cubesql/src/sql/session_manager.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/sql/session_manager.rs#L75-L80

Added lines #L75 - L80 were not covered by tests

guard.uid_to_session.insert(extra_id, session_ref.clone());

Check warning on line 82 in rust/cubesql/cubesql/src/sql/session_manager.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/sql/session_manager.rs#L82

Added line #L82 was not covered by tests
Expand All @@ -87,20 +87,14 @@ impl SessionManager {
Ok(session_ref)
}

pub async fn stat_activity(self: &Arc<Self>) -> Vec<SessionStatActivity> {
pub async fn map_sessions<T: for<'a> From<&'a Arc<Session>>>(self: &Arc<Self>) -> Vec<T> {
let guard = self.sessions.read().await;

guard.sessions.values()
.map(Session::to_stat_activity)
.collect::<Vec<SessionStatActivity>>()
}

pub async fn process_list(self: &Arc<Self>) -> Vec<SessionProcessList> {
let guard = self.sessions.read().await;

guard.uid_to_session.values()
.map(Session::to_process_list)
.collect::<Vec<SessionProcessList>>()
guard
.sessions
.values()
.map(|session| T::from(session))
.collect::<Vec<T>>()
}

pub async fn get_session(&self, connection_id: u32) -> Option<Arc<Session>> {
Expand Down

0 comments on commit 78548dd

Please sign in to comment.