Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Remove TraceReader trait. #203

Merged
merged 1 commit into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 17 additions & 32 deletions src/operator/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@ use crate::{
},
circuit_cache_key,
time::NestedTimestamp32,
trace::{ord::OrdKeySpine, BatchReader, Builder, Cursor as TraceCursor, Trace, TraceReader},
trace::{ord::OrdKeySpine, BatchReader, Builder, Cursor as TraceCursor, Trace},
NumEntries, Timestamp,
};
use size_of::SizeOf;
use std::{
borrow::Cow,
cmp::{max, Ordering},
collections::BTreeSet,
cmp::Ordering,
collections::{BTreeSet, HashMap},
hash::Hash,
marker::PhantomData,
mem::take,
ops::{Add, Neg},
};

Expand Down Expand Up @@ -299,12 +298,12 @@ where
pub struct DistinctTrace<Z, T>
where
Z: ZSet,
T: TraceReader<Key = Z::Key, Val = (), R = Z::R> + 'static,
T: BatchReader<Key = Z::Key, Val = (), R = Z::R> + 'static,
{
// Keeps track of keys that need to be considered at future times.
// Specifically, `future_updates[i]` accumulates all keys observed during
// the current epoch whose weight can change at time `i`.
future_updates: Vec<BTreeSet<Z::Key>>,
future_updates: HashMap<u32, BTreeSet<Z::Key>>,
// TODO: not needed once timekeeping is handled by the circuit.
time: u32,
empty_input: bool,
Expand All @@ -315,12 +314,12 @@ where
impl<Z, T> DistinctTrace<Z, T>
where
Z: ZSet,
T: TraceReader<Key = Z::Key, Val = (), R = Z::R> + 'static,
T: BatchReader<Key = Z::Key, Val = (), R = Z::R> + 'static,
T::Time: Timestamp,
{
fn new() -> Self {
Self {
future_updates: Vec::new(),
future_updates: HashMap::new(),
time: HasZero::zero(),
empty_input: false,
empty_output: false,
Expand All @@ -334,7 +333,7 @@ where
Z: ZSet,
Z::Key: Clone + Ord + PartialEq,
Z::R: ZRingValue,
T: TraceReader<Key = Z::Key, Val = (), Time = NestedTimestamp32, R = Z::R> + 'static,
T: BatchReader<Key = Z::Key, Val = (), Time = NestedTimestamp32, R = Z::R> + 'static,
{
// Evaluate nested incremental distinct for a single value.
//
Expand Down Expand Up @@ -446,7 +445,10 @@ where
// Record next_ts in `self.future_updates`.
if let Some(next_ts) = next_ts {
let idx: usize = next_ts.inner() as usize;
self.future_updates[idx].insert(value.clone());
self.future_updates
.entry(idx as u32)
.or_insert_with(BTreeSet::new)
.insert(value.clone());
}
} else if weight.ge0() && !weight.is_zero() {
builder.push((Z::item_from(value.clone(), ()), HasOne::one()));
Expand All @@ -458,14 +460,14 @@ impl<Z, T> Operator for DistinctTrace<Z, T>
where
Z: ZSet,
Z::Key: SizeOf + Clone + Ord + PartialEq,
T: TraceReader<Key = Z::Key, Val = (), Time = NestedTimestamp32, R = Z::R> + 'static,
T: BatchReader<Key = Z::Key, Val = (), Time = NestedTimestamp32, R = Z::R> + 'static,
{
fn name(&self) -> Cow<'static, str> {
Cow::Borrowed("DistinctTrace")
}

fn metadata(&self, meta: &mut OperatorMeta) {
let size: usize = self.future_updates.iter().map(BTreeSet::len).sum();
let size: usize = self.future_updates.values().map(BTreeSet::len).sum();
let bytes = self.future_updates.size_of();

meta.extend(metadata! {
Expand Down Expand Up @@ -495,11 +497,7 @@ where
assert_eq!(scope, 0);
self.empty_input
&& self.empty_output
&& self
.future_updates
.iter()
.skip(self.time as usize)
.all(|vals| vals.is_empty())
&& self.future_updates.values().all(|vals| vals.is_empty())
}
}

Expand Down Expand Up @@ -527,7 +525,7 @@ where
// appeared in one of the previous epochs at time `t`.
//
// To efficiently compute keys that satisfy the second condition, we use the
// `future_updates` vector, where for each key observed in the current epoch
// `future_updates` map, where for each key observed in the current epoch
// at time `t1` we lookup the smallest time `t2 > t1` (if any) at which we saw
// the key during any previous epochs and record this key in
`future_updates[t2]`. Then when evaluating an operator at time `t` we
Expand All @@ -541,26 +539,13 @@ where

self.empty_input = delta.is_zero();

// Make sure we have enough room in `future_updates` to
// accommodate the largest timestamp in the trace, so we don't
// need to worry about growing `future_updates` later on.
let mut new_len: u32 = self.time + 1;
trace.map_batches(|batch| {
for ts in batch.upper() {
new_len = max(new_len, ts.inner() + 1);
}
});

self.future_updates
.resize(new_len as usize, BTreeSet::new());

let mut builder = Z::Builder::with_capacity((), delta.len());

let mut trace_cursor = trace.cursor();

// For all keys in delta, for all keys in future_updates[time].
let mut delta_cursor = delta.cursor();
let candidates = take(&mut self.future_updates[self.time as usize]);
let candidates = self.future_updates.remove(&self.time).unwrap_or_default();
let mut cand_iterator = candidates.iter();

let mut candidate = cand_iterator.next();
Expand Down
8 changes: 4 additions & 4 deletions src/operator/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
time::Timestamp,
trace::{
cursor::Cursor as TraceCursor, spine_fueled::Spine, Batch, BatchReader, Batcher, Builder,
Trace, TraceReader,
Trace,
},
OrdIndexedZSet, OrdZSet,
};
Expand Down Expand Up @@ -549,7 +549,7 @@ impl JoinStats {

pub struct JoinTrace<F, I, T, Z, It>
where
T: TraceReader,
T: BatchReader,
Z: IndexedZSet,
{
join_func: F,
Expand All @@ -569,7 +569,7 @@ where

impl<F, I, T, Z, It> JoinTrace<F, I, T, Z, It>
where
T: TraceReader,
T: BatchReader,
Z: IndexedZSet,
{
pub fn new(join_func: F, location: &'static Location<'static>) -> Self {
Expand All @@ -590,7 +590,7 @@ impl<F, I, T, Z, It> Operator for JoinTrace<F, I, T, Z, It>
where
F: 'static,
I: 'static,
T: TraceReader + 'static,
T: BatchReader + 'static,
Z: IndexedZSet,
Z::Batcher: SizeOf,
It: 'static,
Expand Down
22 changes: 11 additions & 11 deletions src/operator/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
Circuit, ExportId, ExportStream, GlobalNodeId, OwnershipPreference, Scope, Stream,
},
circuit_cache_key,
trace::{cursor::Cursor, spine_fueled::Spine, Batch, BatchReader, Builder, Trace, TraceReader},
trace::{cursor::Cursor, spine_fueled::Spine, Batch, BatchReader, Builder, Trace},
NumEntries, Timestamp,
};
use size_of::SizeOf;
Expand Down Expand Up @@ -154,7 +154,7 @@ where
impl<P, T> Stream<Circuit<P>, T>
where
P: Clone + 'static,
T: TraceReader + 'static,
T: Trace + 'static,
{
pub fn delay_trace(&self) -> Stream<Circuit<P>, T> {
self.circuit()
Expand All @@ -168,14 +168,14 @@ where

pub struct UntimedTraceAppend<T>
where
T: TraceReader,
T: Trace,
{
_phantom: PhantomData<T>,
}

impl<T> UntimedTraceAppend<T>
where
T: TraceReader,
T: Trace,
{
pub fn new() -> Self {
Self {
Expand All @@ -186,7 +186,7 @@ where

impl<T> Default for UntimedTraceAppend<T>
where
T: TraceReader,
T: Trace,
{
fn default() -> Self {
Self::new()
Expand All @@ -195,7 +195,7 @@ where

impl<T> Operator for UntimedTraceAppend<T>
where
T: TraceReader + 'static,
T: Trace + 'static,
{
fn name(&self) -> Cow<'static, str> {
Cow::from("UntimedTraceAppend")
Expand Down Expand Up @@ -241,15 +241,15 @@ where

pub struct TraceAppend<T, B>
where
T: TraceReader,
T: Trace,
{
time: T::Time,
_phantom: PhantomData<B>,
}

impl<T, B> TraceAppend<T, B>
where
T: TraceReader,
T: Trace,
{
pub fn new() -> Self {
Self {
Expand All @@ -261,7 +261,7 @@ where

impl<T, B> Default for TraceAppend<T, B>
where
T: TraceReader,
T: Trace,
{
fn default() -> Self {
Self::new()
Expand All @@ -270,7 +270,7 @@ where

impl<T, B> Operator for TraceAppend<T, B>
where
T: TraceReader + 'static,
T: Trace + 'static,
B: 'static,
{
fn name(&self) -> Cow<'static, str> {
Expand Down Expand Up @@ -325,7 +325,7 @@ where
}
}

pub struct Z1Trace<T: TraceReader> {
pub struct Z1Trace<T: Trace> {
time: T::Time,
trace: Option<T>,
// `dirty[scope]` is `true` iff at least one non-empty update was added to the trace
Expand Down
12 changes: 6 additions & 6 deletions src/operator/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::{
},
operator::trace::{DelayedTraceId, TraceAppend, TraceId, Z1Trace},
trace::{
consolidation::consolidate, cursor::Cursor, spine_fueled::Spine, Batch, Builder, Trace,
TraceReader,
consolidation::consolidate, cursor::Cursor, spine_fueled::Spine, Batch, BatchReader,
Builder, Trace,
},
utils::VecExt,
Circuit, Stream, Timestamp,
Expand Down Expand Up @@ -98,15 +98,15 @@ where

pub struct Upsert<T, B>
where
T: TraceReader,
T: BatchReader,
{
time: T::Time,
phantom: PhantomData<B>,
}

impl<T, B> Upsert<T, B>
where
T: TraceReader,
T: BatchReader,
{
pub fn new() -> Self {
Self {
Expand All @@ -118,7 +118,7 @@ where

impl<T, B> Default for Upsert<T, B>
where
T: TraceReader,
T: BatchReader,
{
fn default() -> Self {
Self::new()
Expand All @@ -127,7 +127,7 @@ where

impl<T, B> Operator for Upsert<T, B>
where
T: TraceReader + 'static,
T: BatchReader + 'static,
B: 'static,
{
fn name(&self) -> Cow<'static, str> {
Expand Down
34 changes: 4 additions & 30 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,14 @@ use crate::{
};
use size_of::SizeOf;

/// A trace whose contents may be read.
///
/// This is a restricted interface to the more general `Trace` trait, which
/// extends this trait with further methods to update the contents of the trace.
/// These methods are used to examine the contents, and to update the reader's
/// capabilities (which may release restrictions on the mutations to the
/// underlying trace and cause work to happen).
pub trait TraceReader: BatchReader {
/// The type of an immutable collection of updates.
type Batch: Batch<Key = Self::Key, Val = Self::Val, Time = Self::Time, R = Self::R> + 'static;

// TODO: Do we want a version of `cursor` with an upper bound on time? E.g., it
// could help in `distinct` to avoid iterating into the future (and then
// drop future timestamps anyway).
/*
/// Acquires a cursor to the restriction of the collection's contents to updates at times not greater or
/// equal to an element of `upper`.
///
/// This method is expected to work if called with an `upper` that (i) was an observed bound in batches from
/// the trace, and (ii) the trace has not been advanced beyond `upper`. Practically, the implementation should
/// be expected to look for a "clean cut" using `upper`, and if it finds such a cut can return a cursor. This
/// should allow `upper` such as `&[]` as used by `self.cursor()`, though it is difficult to imagine other uses.
fn cursor_through(&self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, <Self::Cursor as Cursor<Self::Key, Self::Val, Self::Time, Self::R>>::Storage)>;
*/

/// Maps logic across the non-empty sequence of batches in the trace.
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F);
}

/// An append-only collection of `(key, val, time, diff)` tuples.
///
/// The trace must be constructable from, and navigable by the `Key`, `Val`,
/// `Time` types, but does not need to return them.
pub trait Trace: TraceReader {
pub trait Trace: BatchReader {
/// The type of an immutable collection of updates.
type Batch: Batch<Key = Self::Key, Val = Self::Val, Time = Self::Time, R = Self::R> + 'static;

/// Allocates a new empty trace.
fn new(activator: Option<Activator>) -> Self;

Expand Down
Loading