Skip to content

Commit

Permalink
Pool ActiveQuerys in the query stack
Browse files Browse the repository at this point in the history
  • Loading branch information
Veykril committed Jan 4, 2025
1 parent 67b033e commit f3f6cc4
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 24 deletions.
79 changes: 74 additions & 5 deletions src/active_query.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{mem, ops};

use rustc_hash::FxHashMap;

use super::zalsa_local::{EdgeKind, QueryEdges, QueryOrigin, QueryRevisions};
Expand Down Expand Up @@ -56,7 +58,7 @@ pub(crate) struct ActiveQuery {
}

impl ActiveQuery {
pub(super) fn new(database_key_index: DatabaseKeyIndex) -> Self {
pub(crate) fn new(database_key_index: DatabaseKeyIndex) -> Self {
ActiveQuery {
database_key_index,
durability: Durability::MAX,
Expand All @@ -70,6 +72,30 @@ impl ActiveQuery {
}
}

fn reset(&mut self, new_database_key_index: DatabaseKeyIndex) {
let Self {
database_key_index,
durability,
changed_at,
input_outputs,
untracked_read,
cycle,
disambiguator_map,
tracked_struct_ids,
accumulated,
} = self;
*database_key_index = new_database_key_index;
*durability = Durability::MAX;
*changed_at = Revision::start();
input_outputs.clear();
*untracked_read = false;
*cycle = None;
disambiguator_map.clear();
// These two are cleared via `mem::take`` when popped off as revisions.
debug_assert!(tracked_struct_ids.is_empty());
_ = accumulated;
}

pub(super) fn add_read(
&mut self,
input: DependencyIndex,
Expand Down Expand Up @@ -105,11 +131,11 @@ impl ActiveQuery {
self.input_outputs.contains(&(EdgeKind::Output, key))
}

pub(crate) fn into_revisions(self) -> QueryRevisions {
fn take_revisions(&mut self) -> QueryRevisions {
let input_outputs = if self.input_outputs.is_empty() {
Box::default()
} else {
self.input_outputs.into_iter().collect()
self.input_outputs.iter().copied().collect()
};

let edges = QueryEdges::new(input_outputs);
Expand All @@ -124,8 +150,8 @@ impl ActiveQuery {
changed_at: self.changed_at,
origin,
durability: self.durability,
tracked_struct_ids: self.tracked_struct_ids,
accumulated: self.accumulated,
tracked_struct_ids: mem::take(&mut self.tracked_struct_ids),
accumulated: mem::take(&mut self.accumulated),
}
}

Expand Down Expand Up @@ -166,3 +192,46 @@ impl ActiveQuery {
result
}
}

#[derive(Debug, Default)]
pub(crate) struct QueryStack {
stack: Vec<ActiveQuery>,
len: usize,
}

impl ops::Deref for QueryStack {
type Target = [ActiveQuery];

fn deref(&self) -> &Self::Target {
&self.stack[..self.len]
}
}

impl ops::DerefMut for QueryStack {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.stack[..self.len]
}
}

impl QueryStack {
pub(crate) fn push_new_query(&mut self, database_key_index: DatabaseKeyIndex) {
if self.len < self.stack.len() {
self.stack[self.len].reset(database_key_index);
} else {
self.stack.push(ActiveQuery::new(database_key_index));
}
self.len += 1;
}

pub(crate) fn len(&self) -> usize {
self.len
}

pub(crate) fn pop_into_revisions(&mut self) -> Option<QueryRevisions> {
if self.len == 0 {
return None;
}
self.len -= 1;
Some(self.stack[self.len].take_revisions())
}
}
1 change: 0 additions & 1 deletion src/function/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ where

// Query was not previously executed, or value is potentially
// stale, or value is absent. Let's execute!
let database_key_index = active_query.database_key_index;
let id = database_key_index.key_index;
let value = match Cycle::catch(|| C::execute(db, C::id_to_input(db, id))) {
Ok(v) => v,
Expand Down
30 changes: 12 additions & 18 deletions src/zalsa_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use rustc_hash::FxHashMap;
use tracing::debug;

use crate::accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues};
use crate::active_query::ActiveQuery;
use crate::active_query::QueryStack;
use crate::durability::Durability;
use crate::key::DatabaseKeyIndex;
use crate::key::DependencyIndex;
Expand Down Expand Up @@ -36,7 +36,7 @@ pub struct ZalsaLocal {
///
/// Unwinding note: pushes onto this vector must be popped -- even
/// during unwinding.
query_stack: RefCell<Vec<ActiveQuery>>,
query_stack: RefCell<QueryStack>,

/// Stores the most recent page for a given ingredient.
/// This is thread-local to avoid contention.
Expand All @@ -46,7 +46,7 @@ pub struct ZalsaLocal {
impl ZalsaLocal {
pub(crate) fn new() -> Self {
ZalsaLocal {
query_stack: RefCell::new(vec![]),
query_stack: RefCell::new(QueryStack::default()),
most_recent_pages: RefCell::new(FxHashMap::default()),
}
}
Expand Down Expand Up @@ -87,7 +87,7 @@ impl ZalsaLocal {
#[inline]
pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> {
let mut query_stack = self.query_stack.borrow_mut();
query_stack.push(ActiveQuery::new(database_key_index));
query_stack.push_new_query(database_key_index);
ActiveQueryGuard {
local_state: self,
database_key_index,
Expand All @@ -96,8 +96,8 @@ impl ZalsaLocal {
}

/// Executes a closure within the context of the current active query stacks.
pub(crate) fn with_query_stack<R>(&self, c: impl FnOnce(&mut Vec<ActiveQuery>) -> R) -> R {
c(self.query_stack.borrow_mut().as_mut())
pub(crate) fn with_query_stack<R>(&self, c: impl FnOnce(&mut QueryStack) -> R) -> R {
c(&mut self.query_stack.borrow_mut())
}

fn query_in_progress(&self) -> bool {
Expand Down Expand Up @@ -496,15 +496,15 @@ pub(crate) struct ActiveQueryGuard<'me> {
}

impl ActiveQueryGuard<'_> {
fn pop_helper(&self) -> ActiveQuery {
fn pop_impl(&self) -> QueryRevisions {
self.local_state.with_query_stack(|stack| {
// Sanity check: pushes and pops should be balanced.
assert_eq!(stack.len(), self.push_len);
debug_assert_eq!(
stack.last().unwrap().database_key_index,
self.database_key_index
);
stack.pop().unwrap()
stack.pop_into_revisions().unwrap()
})
}

Expand All @@ -519,8 +519,8 @@ impl ActiveQueryGuard<'_> {
}

/// Invoked when the query has successfully completed execution.
pub(crate) fn complete(self) -> ActiveQuery {
let query = self.pop_helper();
fn complete(self) -> QueryRevisions {
let query = self.pop_impl();
std::mem::forget(self);
query
}
Expand All @@ -530,13 +530,7 @@ impl ActiveQueryGuard<'_> {
/// query's execution.
#[inline]
pub(crate) fn pop(self) -> QueryRevisions {
// Extract accumulated inputs.
let popped_query = self.complete();

// If this frame were a cycle participant, it would have unwound.
assert!(popped_query.cycle.is_none());

popped_query.into_revisions()
self.complete()
}

/// If the active query is registered as a cycle participant, remove and
Expand All @@ -549,6 +543,6 @@ impl ActiveQueryGuard<'_> {

impl Drop for ActiveQueryGuard<'_> {
fn drop(&mut self) {
self.pop_helper();
self.pop_impl();
}
}

0 comments on commit f3f6cc4

Please sign in to comment.