Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Commit

Permalink
Remove TraceReader trait.
Browse files Browse the repository at this point in the history
`TraceReader` only defined a single method, `map_batches`.  Work on
persistent traces showed that this is not a good design, as it exposes
the internal structure of the trace.  We only used this API in one place
in `distinct.rs`, and that code would have to be re-written anyway as we
generalize `distinct` to work in arbitrary nested scopes.

We eliminate the `map_batches` method from the public API and downgrade
it to a private method of `Spine`.  We also remove the no longer useful
trait `TraceReader`.
  • Loading branch information
ryzhyk committed Oct 10, 2022
1 parent 874b979 commit e50067c
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 92 deletions.
49 changes: 17 additions & 32 deletions src/operator/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@ use crate::{
},
circuit_cache_key,
time::NestedTimestamp32,
trace::{ord::OrdKeySpine, BatchReader, Builder, Cursor as TraceCursor, Trace, TraceReader},
trace::{ord::OrdKeySpine, BatchReader, Builder, Cursor as TraceCursor, Trace},
NumEntries, Timestamp,
};
use size_of::SizeOf;
use std::{
borrow::Cow,
cmp::{max, Ordering},
collections::BTreeSet,
cmp::Ordering,
collections::{BTreeSet, HashMap},
hash::Hash,
marker::PhantomData,
mem::take,
ops::{Add, Neg},
};

Expand Down Expand Up @@ -299,12 +298,12 @@ where
pub struct DistinctTrace<Z, T>
where
Z: ZSet,
T: TraceReader<Key = Z::Key, Val = (), R = Z::R> + 'static,
T: BatchReader<Key = Z::Key, Val = (), R = Z::R> + 'static,
{
// Keeps track of keys that need to be considered at future times.
// Specifically, `future_updates[i]` accumulates all keys observed during
// the current epoch whose weight can change at time `i`.
future_updates: Vec<BTreeSet<Z::Key>>,
future_updates: HashMap<u32, BTreeSet<Z::Key>>,
// TODO: not needed once timekeeping is handled by the circuit.
time: u32,
empty_input: bool,
Expand All @@ -315,12 +314,12 @@ where
impl<Z, T> DistinctTrace<Z, T>
where
Z: ZSet,
T: TraceReader<Key = Z::Key, Val = (), R = Z::R> + 'static,
T: BatchReader<Key = Z::Key, Val = (), R = Z::R> + 'static,
T::Time: Timestamp,
{
fn new() -> Self {
Self {
future_updates: Vec::new(),
future_updates: HashMap::new(),
time: HasZero::zero(),
empty_input: false,
empty_output: false,
Expand All @@ -334,7 +333,7 @@ where
Z: ZSet,
Z::Key: Clone + Ord + PartialEq,
Z::R: ZRingValue,
T: TraceReader<Key = Z::Key, Val = (), Time = NestedTimestamp32, R = Z::R> + 'static,
T: BatchReader<Key = Z::Key, Val = (), Time = NestedTimestamp32, R = Z::R> + 'static,
{
// Evaluate nested incremental distinct for a single value.
//
Expand Down Expand Up @@ -446,7 +445,10 @@ where
// Record next_ts in `self.future_updates`.
if let Some(next_ts) = next_ts {
let idx: usize = next_ts.inner() as usize;
self.future_updates[idx].insert(value.clone());
self.future_updates
.entry(idx as u32)
.or_insert_with(BTreeSet::new)
.insert(value.clone());
}
} else if weight.ge0() && !weight.is_zero() {
builder.push((Z::item_from(value.clone(), ()), HasOne::one()));
Expand All @@ -458,14 +460,14 @@ impl<Z, T> Operator for DistinctTrace<Z, T>
where
Z: ZSet,
Z::Key: SizeOf + Clone + Ord + PartialEq,
T: TraceReader<Key = Z::Key, Val = (), Time = NestedTimestamp32, R = Z::R> + 'static,
T: BatchReader<Key = Z::Key, Val = (), Time = NestedTimestamp32, R = Z::R> + 'static,
{
fn name(&self) -> Cow<'static, str> {
Cow::Borrowed("DistinctTrace")
}

fn metadata(&self, meta: &mut OperatorMeta) {
let size: usize = self.future_updates.iter().map(BTreeSet::len).sum();
let size: usize = self.future_updates.values().map(BTreeSet::len).sum();
let bytes = self.future_updates.size_of();

meta.extend(metadata! {
Expand Down Expand Up @@ -495,11 +497,7 @@ where
assert_eq!(scope, 0);
self.empty_input
&& self.empty_output
&& self
.future_updates
.iter()
.skip(self.time as usize)
.all(|vals| vals.is_empty())
&& self.future_updates.values().all(|vals| vals.is_empty())
}
}

Expand Down Expand Up @@ -527,7 +525,7 @@ where
// appeared in one of the previous epochs at time `t`.
//
// To efficiently compute keys that satisfy the second condition, we use the
// `future_updates` vector, where for each key observed in the current epoch
// `future_updates` map, where for each key observed in the current epoch
// at time `t1` we lookup the smallest time `t2 > t1` (if any) at which we saw
// the key during any previous epochs and record this key in
// `future_updates[t2]`. Then when evaluating an operatpr at time `t` we
Expand All @@ -541,26 +539,13 @@ where

self.empty_input = delta.is_zero();

// Make sure we have enough room in `future_updates` to
// accommodate the largest timestamp in the trace, so we don't
// need to worry about growing `future_updates` later on.
let mut new_len: u32 = self.time + 1;
trace.map_batches(|batch| {
for ts in batch.upper() {
new_len = max(new_len, ts.inner() + 1);
}
});

self.future_updates
.resize(new_len as usize, BTreeSet::new());

let mut builder = Z::Builder::with_capacity((), delta.len());

let mut trace_cursor = trace.cursor();

// For all keys in delta, for all keys in future_updates[time].
let mut delta_cursor = delta.cursor();
let candidates = take(&mut self.future_updates[self.time as usize]);
let candidates = self.future_updates.remove(&self.time).unwrap_or_default();
let mut cand_iterator = candidates.iter();

let mut candidate = cand_iterator.next();
Expand Down
8 changes: 4 additions & 4 deletions src/operator/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
time::Timestamp,
trace::{
cursor::Cursor as TraceCursor, spine_fueled::Spine, Batch, BatchReader, Batcher, Builder,
Trace, TraceReader,
Trace,
},
OrdIndexedZSet, OrdZSet,
};
Expand Down Expand Up @@ -549,7 +549,7 @@ impl JoinStats {

pub struct JoinTrace<F, I, T, Z, It>
where
T: TraceReader,
T: BatchReader,
Z: IndexedZSet,
{
join_func: F,
Expand All @@ -569,7 +569,7 @@ where

impl<F, I, T, Z, It> JoinTrace<F, I, T, Z, It>
where
T: TraceReader,
T: BatchReader,
Z: IndexedZSet,
{
pub fn new(join_func: F, location: &'static Location<'static>) -> Self {
Expand All @@ -590,7 +590,7 @@ impl<F, I, T, Z, It> Operator for JoinTrace<F, I, T, Z, It>
where
F: 'static,
I: 'static,
T: TraceReader + 'static,
T: BatchReader + 'static,
Z: IndexedZSet,
Z::Batcher: SizeOf,
It: 'static,
Expand Down
22 changes: 11 additions & 11 deletions src/operator/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
Circuit, ExportId, ExportStream, GlobalNodeId, OwnershipPreference, Scope, Stream,
},
circuit_cache_key,
trace::{cursor::Cursor, spine_fueled::Spine, Batch, BatchReader, Builder, Trace, TraceReader},
trace::{cursor::Cursor, spine_fueled::Spine, Batch, BatchReader, Builder, Trace},
NumEntries, Timestamp,
};
use size_of::SizeOf;
Expand Down Expand Up @@ -154,7 +154,7 @@ where
impl<P, T> Stream<Circuit<P>, T>
where
P: Clone + 'static,
T: TraceReader + 'static,
T: Trace + 'static,
{
pub fn delay_trace(&self) -> Stream<Circuit<P>, T> {
self.circuit()
Expand All @@ -168,14 +168,14 @@ where

pub struct UntimedTraceAppend<T>
where
T: TraceReader,
T: Trace,
{
_phantom: PhantomData<T>,
}

impl<T> UntimedTraceAppend<T>
where
T: TraceReader,
T: Trace,
{
pub fn new() -> Self {
Self {
Expand All @@ -186,7 +186,7 @@ where

impl<T> Default for UntimedTraceAppend<T>
where
T: TraceReader,
T: Trace,
{
fn default() -> Self {
Self::new()
Expand All @@ -195,7 +195,7 @@ where

impl<T> Operator for UntimedTraceAppend<T>
where
T: TraceReader + 'static,
T: Trace + 'static,
{
fn name(&self) -> Cow<'static, str> {
Cow::from("UntimedTraceAppend")
Expand Down Expand Up @@ -241,15 +241,15 @@ where

pub struct TraceAppend<T, B>
where
T: TraceReader,
T: Trace,
{
time: T::Time,
_phantom: PhantomData<B>,
}

impl<T, B> TraceAppend<T, B>
where
T: TraceReader,
T: Trace,
{
pub fn new() -> Self {
Self {
Expand All @@ -261,7 +261,7 @@ where

impl<T, B> Default for TraceAppend<T, B>
where
T: TraceReader,
T: Trace,
{
fn default() -> Self {
Self::new()
Expand All @@ -270,7 +270,7 @@ where

impl<T, B> Operator for TraceAppend<T, B>
where
T: TraceReader + 'static,
T: Trace + 'static,
B: 'static,
{
fn name(&self) -> Cow<'static, str> {
Expand Down Expand Up @@ -325,7 +325,7 @@ where
}
}

pub struct Z1Trace<T: TraceReader> {
pub struct Z1Trace<T: Trace> {
time: T::Time,
trace: Option<T>,
// `dirty[scope]` is `true` iff at least one non-empty update was added to the trace
Expand Down
12 changes: 6 additions & 6 deletions src/operator/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::{
},
operator::trace::{DelayedTraceId, TraceAppend, TraceId, Z1Trace},
trace::{
consolidation::consolidate, cursor::Cursor, spine_fueled::Spine, Batch, Builder, Trace,
TraceReader,
consolidation::consolidate, cursor::Cursor, spine_fueled::Spine, Batch, BatchReader,
Builder, Trace,
},
utils::VecExt,
Circuit, Stream, Timestamp,
Expand Down Expand Up @@ -98,15 +98,15 @@ where

pub struct Upsert<T, B>
where
T: TraceReader,
T: BatchReader,
{
time: T::Time,
phantom: PhantomData<B>,
}

impl<T, B> Upsert<T, B>
where
T: TraceReader,
T: BatchReader,
{
pub fn new() -> Self {
Self {
Expand All @@ -118,7 +118,7 @@ where

impl<T, B> Default for Upsert<T, B>
where
T: TraceReader,
T: BatchReader,
{
fn default() -> Self {
Self::new()
Expand All @@ -127,7 +127,7 @@ where

impl<T, B> Operator for Upsert<T, B>
where
T: TraceReader + 'static,
T: BatchReader + 'static,
B: 'static,
{
fn name(&self) -> Cow<'static, str> {
Expand Down
34 changes: 4 additions & 30 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,14 @@ use crate::{
};
use size_of::SizeOf;

/// A trace whose contents may be read.
///
/// This is a restricted interface to the more general `Trace` trait, which
/// extends this trait with further methods to update the contents of the trace.
/// These methods are used to examine the contents, and to update the reader's
/// capabilities (which may release restrictions on the mutations to the
/// underlying trace and cause work to happen).
pub trait TraceReader: BatchReader {
/// The type of an immutable collection of updates.
type Batch: Batch<Key = Self::Key, Val = Self::Val, Time = Self::Time, R = Self::R> + 'static;

// TODO: Do we want a version of `cursor` with an upper bound on time? E.g., it
// could help in `distinct` to avoid iterating into the future (and then
// drop future timestamps anyway).
/*
/// Acquires a cursor to the restriction of the collection's contents to updates at times not greater or
/// equal to an element of `upper`.
///
/// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from
/// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should
/// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This
/// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses.
fn cursor_through(&self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, <Self::Cursor as Cursor<Self::Key, Self::Val, Self::Time, Self::R>>::Storage)>;
*/

/// Maps logic across the non-empty sequence of batches in the trace.
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F);
}

/// An append-only collection of `(key, val, time, diff)` tuples.
///
/// The trace must be constructable from, and navigable by the `Key`, `Val`,
/// `Time` types, but does not need to return them.
pub trait Trace: TraceReader {
pub trait Trace: BatchReader {
/// The type of an immutable collection of updates.
type Batch: Batch<Key = Self::Key, Val = Self::Val, Time = Self::Time, R = Self::R> + 'static;

/// Allocates a new empty trace.
fn new(activator: Option<Activator>) -> Self;

Expand Down
Loading

0 comments on commit e50067c

Please sign in to comment.