Skip to content

Commit

Permalink
Merge pull request #269 from amv-dev/pool-optimizations
Browse files Browse the repository at this point in the history
Pool struct optimizations
  • Loading branch information
blackbeam authored Dec 25, 2021
2 parents 24b3584 + 6f5e46c commit f2db414
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions src/conn/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ impl InnerPool {
}
}

struct ArcedPool {
inner: (Mutex<InnerPool>, Condvar),
min: usize,
max: usize,
count: AtomicUsize,
}

/// `Pool` serves to provide you with a [`PooledConn`](struct.PooledConn.html)'s.
/// However you can prepare statements directly on `Pool` without
/// invoking [`Pool::get_conn`](struct.Pool.html#method.get_conn).
Expand Down Expand Up @@ -92,10 +99,7 @@ impl InnerPool {
/// [`PooledConn`](struct.PooledConn.html) documentation.
#[derive(Clone)]
pub struct Pool {
inner: Arc<(Mutex<InnerPool>, Condvar)>,
min: Arc<AtomicUsize>,
max: Arc<AtomicUsize>,
count: Arc<AtomicUsize>,
arced_pool: Arc<ArcedPool>,
check_health: bool,
use_cache: bool,
}
Expand All @@ -118,7 +122,7 @@ impl Pool {
None
};

let &(ref inner_pool, ref condvar) = &*self.inner;
let &(ref inner_pool, ref condvar) = &self.arced_pool.inner;

let conn = if self.use_cache {
if let Some(query) = stmt {
Expand Down Expand Up @@ -146,9 +150,9 @@ impl Pool {
if let Some(conn) = pool.pool.pop_front() {
drop(pool);
break conn;
} else if self.count.load(Ordering::Relaxed) < self.max.load(Ordering::Relaxed) {
} else if self.arced_pool.count.load(Ordering::Relaxed) < self.arced_pool.max {
pool.new_conn()?;
self.count.fetch_add(1, Ordering::SeqCst);
self.arced_pool.count.fetch_add(1, Ordering::SeqCst);
} else {
pool = if let Some((start, timeout)) = times {
if start.elapsed() > timeout {
Expand All @@ -164,7 +168,7 @@ impl Pool {

if call_ping && self.check_health && !conn.ping() {
if let Err(err) = conn.reset() {
self.count.fetch_sub(1, Ordering::SeqCst);
self.arced_pool.count.fetch_sub(1, Ordering::SeqCst);
return Err(err);
}
}
Expand All @@ -184,10 +188,12 @@ impl Pool {
pub fn new_manual<T: Into<Opts>>(min: usize, max: usize, opts: T) -> Result<Pool> {
let pool = InnerPool::new(min, max, opts.into())?;
Ok(Pool {
inner: Arc::new((Mutex::new(pool), Condvar::new())),
min: Arc::new(AtomicUsize::new(min)),
max: Arc::new(AtomicUsize::new(max)),
count: Arc::new(AtomicUsize::new(min)),
arced_pool: Arc::new(ArcedPool {
inner: (Mutex::new(pool), Condvar::new()),
min,
max,
count: AtomicUsize::new(min),
}),
use_cache: true,
check_health: true,
})
Expand Down Expand Up @@ -243,9 +249,9 @@ impl fmt::Debug for Pool {
write!(
f,
"Pool {{ min: {}, max: {}, count: {} }}",
self.min.load(Ordering::Relaxed),
self.max.load(Ordering::Relaxed),
self.count.load(Ordering::Relaxed)
self.arced_pool.min,
self.arced_pool.max,
self.arced_pool.count.load(Ordering::Relaxed)
)
}
}
Expand Down Expand Up @@ -298,16 +304,16 @@ impl Deref for PooledConn {

impl Drop for PooledConn {
fn drop(&mut self) {
if self.pool.count.load(Ordering::Relaxed) > self.pool.max.load(Ordering::Relaxed)
if self.pool.arced_pool.count.load(Ordering::Relaxed) > self.pool.arced_pool.max
|| self.conn.is_none()
{
self.pool.count.fetch_sub(1, Ordering::SeqCst);
self.pool.arced_pool.count.fetch_sub(1, Ordering::SeqCst);
} else {
self.conn.as_mut().unwrap().set_local_infile_handler(None);
let mut pool = (self.pool.inner).0.lock().unwrap();
let mut pool = (self.pool.arced_pool.inner).0.lock().unwrap();
pool.pool.push_back(self.conn.take().unwrap());
drop(pool);
(self.pool.inner).1.notify_one();
(self.pool.arced_pool.inner).1.notify_one();
}
}
}
Expand Down

0 comments on commit f2db414

Please sign in to comment.