Persistent Trace with RocksDB #124
Conversation
Regarding serializing and comparing data, my vote is on
Very cool, thanks heaps!
I don't think we need to achieve optimal performance to merge this. Let's start with something that works and iterate. Besides, I've no idea what performance level we should strive for -- something we need to figure out.
Alternatively, we can require that one should call
Assuming we switch to one RocksDB instance per trace, I'm not sure we need the merge operation on persistent traces at all (other than the merging performed by RocksDB in the background).
We only have single-threaded accesses now (since we shard data across multiple threads). We recently realized that non-equi-join operators may require sharing the same batch or trace across all workers, so this may change.
Fair enough, so we measure pure RocksDB + OS overheads. I wonder how much slower it will run on top of an SSD.
I don't expect we will construct persistent Z-sets using builders. Most likely, batches will be constructed in memory as our normal "light" OrdZSets and will get converted to a persistent representation when added to a persistent trace.
Likewise, these overheads probably don't matter too much, as iterating over an entire persistent Z-set is not a common operation.
Seek is much more important than iteration, so it's great that this overhead is lower.
OrdZSet uses binary search, which I guess can be expensive compared to hash-based indexing for random seeks. But in practice we seek for monotonically increasing keys, which should be less expensive using binary search (less distance to search over).
Ideally, our persistent trace design will avoid the overheads of serialization and persistence unless we are actually running out of RAM. Small traces may end up living in memory forever (checkpointing aside).
Thanks for the comments! Some other things that came up (aside from turning this code into a trace instead of an OrdZSet):
Codecov Report
Additional details and impacted files:

@@            Coverage Diff             @@
##             main     #124      +/-   ##
==========================================
- Coverage   85.62%   85.39%    -0.24%
==========================================
  Files         129      129
  Lines       23702    24005     +303
==========================================
+ Hits        20295    20499     +204
- Misses       3407     3506      +99
Force-pushed from b689142 to a78cc7f.
I opened PRs at and which solve some issues with serialization that we need for supporting the nexmark benchmarks.
Force-pushed from cb8c28d to 1ac37bc.
We now pass most tests -- a few are ignored with the persistent feature due to missing implementation of
This is super cool, thanks! I'll start reviewing this as soon as I'm finished with my current PR (should be today). Re the merging process: I haven't seen all the code yet, but I was thinking I'll start by creating a PR that moves all standard type bounds to a trait. This will simplify trait bounds everywhere and create a single location where we need to specify encode/decode bounds. Does this make sense?
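For illustration, here's a minimal sketch of that idea (the trait name and the exact set of bounds are hypothetical, not the actual plan): one trait bundles the standard bounds with a blanket impl, and the encode/decode bounds for persistence would be appended in this single place.

```rust
use std::fmt::Debug;

// Hypothetical bundle of the standard type bounds used throughout the
// code base; persistence bounds (e.g. bincode's Encode/Decode) would be
// added here once instead of on every impl.
pub trait DBData: Clone + Ord + Debug + Send + 'static {}

// Blanket impl: any type satisfying the bounds is automatically DBData.
impl<T> DBData for T where T: Clone + Ord + Debug + Send + 'static {}

// Trait bounds elsewhere then collapse to a single bound:
fn insert_batch<K: DBData, V: DBData>(_keys: &[K], _vals: &[V]) { /* ... */ }
```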
Why'd you use bincode instead of rkyv? rkyv is significantly faster and can even allow us to do zero-copy data manipulation, instead of having to serialize and deserialize at every boundary point between rocksdb and everything else.
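For context, here's a minimal sketch of the zero-copy access rkyv enables, written against the rkyv 0.7 API (the struct is just an example, not a dbsp type):

```rust
use rkyv::{Archive, Deserialize, Serialize};

#[derive(Archive, Serialize, Deserialize)]
struct Tuple {
    key: u64,
    weight: i64,
}

fn main() {
    let value = Tuple { key: 42, weight: -1 };
    // Serialize into an aligned byte buffer (what would be handed to
    // RocksDB as a key or value).
    let bytes = rkyv::to_bytes::<_, 256>(&value).unwrap();

    // Zero-copy: view the archived struct directly inside the byte
    // buffer, with no deserialization pass.
    let archived = unsafe { rkyv::archived_root::<Tuple>(&bytes[..]) };
    assert_eq!(archived.key, 42);
    assert_eq!(archived.weight, -1);
}
```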
Cargo.toml
rocksdb = { version = "0.18", default-features = false, features = ["multi-threaded-cf"] }
bincode = { version = "2.0.0-rc.2" }
uuid = { version = "1.1.2", features = ["v4"] }
Shouldn't these be optional and activated with the persistence feature?
Right now the persistence feature flag only "swaps" the Spine struct with the PersistentTrace struct so it gets used as part of all the other dbsp code:
https://github.com/vmware/database-stream-processor/pull/124/files#diff-f8789d5760a694bd6491c0ed6d25099ee9e3dcaed42f69d2f492e779b10c44b4R21
Without the persistence feature, the persistent code will still compile and run the persistent tests that check for spine equivalence (which I figured might be nice during development to catch problems early). But if we want to change that so that no persistent code is compiled in without the feature flag, we can make them optional (e.g., something like the sketch below).
Maybe at least I should rename the feature to better describe what it does.
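For reference, making them optional would look roughly like this in Cargo.toml (a sketch, assuming the feature keeps its current name):

```toml
[dependencies]
rocksdb = { version = "0.18", default-features = false, features = ["multi-threaded-cf"], optional = true }
bincode = { version = "2.0.0-rc.2", optional = true }
uuid = { version = "1.1.2", features = ["v4"], optional = true }

[features]
# Enabling the feature pulls in the persistence-only dependencies.
persistence = ["rocksdb", "bincode", "uuid"]
```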
I tried to switch to rkyv once while writing this (for like 20 min, but gave up because I didn't understand the docs quickly enough ;))
My initial pass is that everything looks pretty good; however, I don't really like how infectious the persistent trace is. There are definitely cases where we want in-memory traces and the ability to store more native Rust types (e.g. &'static str) within traces, and the fact that persistence precludes that regardless of whether or not it's enabled isn't great. I definitely think that a dedicated trace type, plus the ability to specify what kind of trace an operator will use, is the way to go in terms of flexibility and modularity.
We certainly don't want to rely on a compile-time switch to enable/disable persistence. The long-term plan is to tune the performance of persistent traces, so that the cost of persistence only needs to be paid when needed (i.e., stuff doesn't fit in memory or needs to be checkpointed). Then the persistent trace will become the default one, but the programmer will still be able to use
Support for (at least)
I meant types that aren't compatible with the database in general, not just
My comments so far. Haven't read cursor.rs yet.
@@ -132,6 +132,24 @@ pub trait CursorDebug<'s, K: Clone, V: Clone, T: Clone, R: Clone>: Cursor<'s, K,
        }
        out
    }

    fn val_to_vec(&mut self) -> Vec<(V, Vec<(T, R)>)> {
All these allocations make me nervous, and there is no easy way to at least reduce them using with_capacity. We try to minimize allocations throughout the code base, and malloc still shows up a lot in profiling, so this is going to introduce significant overhead -- unless of course it's dominated by other rocksdb costs :)
I don't have the big picture yet, and maybe this is ok for getting something working initially, but I still want to bookmark this issue.
Yes, I think we can optimize this by reusing the allocations/vectors from the batch itself, which we get on insert (see the sketch below).
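Illustrative sketch of the reuse pattern (made-up code, not from this PR): keep one scratch buffer alive across iterations so its capacity is reused instead of reallocated each time.

```rust
// Hypothetical per-value processing over (value, (time, weight)) pairs.
fn process(vals: &[(u64, Vec<(u64, i64)>)]) {
    // One scratch vector, allocated once and reused for every value.
    let mut scratch: Vec<(u64, i64)> = Vec::new();
    for (_v, tws) in vals {
        scratch.clear(); // keeps the capacity; no new allocation
        scratch.extend_from_slice(tws);
        scratch.sort_unstable();
        // ... consume `scratch` ...
    }
}
```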
let mut found_v = false;
let mut found_t = false;
for (existing_v, ref mut existing_tw) in vals.iter_mut() {
    if existing_v == &v {
Can you use trace::layers::advance() to look up the value?
for (existing_v, ref mut existing_tw) in vals.iter_mut() {
    if existing_v == &v {
        for (t, w) in &tws {
            for (existing_t, ref mut existing_w) in existing_tw.iter_mut() {
Likewise, this could use exponential search.
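For reference, a self-contained sketch of exponential (galloping) search over a sorted slice -- the kind of lookup suggested here instead of the linear scan (the function is illustrative, not the actual trace::layers::advance()):

```rust
/// Index of the first element >= `target` in a sorted slice: gallop
/// forward in doubling steps, then binary-search the final bracket.
/// O(log d), where d is the distance to the answer -- cheap for the
/// mostly-monotonic seeks we do.
fn exponential_search<T: Ord>(slice: &[T], target: &T) -> usize {
    if slice.is_empty() || &slice[0] >= target {
        return 0;
    }
    // Double the probe distance until we overshoot `target` (or the end).
    let mut bound = 1;
    while bound < slice.len() && &slice[bound] < target {
        bound *= 2;
    }
    // The answer lies in (bound/2, min(bound, len)]; binary search it.
    let lo = bound / 2;
    let hi = bound.min(slice.len());
    lo + slice[lo..hi].partition_point(|x| x < target)
}

fn main() {
    let tws = [(1, 1), (3, 2), (7, 1), (9, -1)];
    let idx = exponential_search(&tws, &(7, i32::MIN));
    assert_eq!(idx, 2); // (7, 1) is the first entry with time >= 7
}
```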
///
/// # TODO
/// Probably lots of efficiency improvements to be had here: We're sorting
/// several times when we probably can be smarter etc. -- not clear it matters
It looks like you could just replace Values with Spine, which already minimizes excessive sorting. Most of the logic in this function would then be replaced with Spine::insert and Spine::recede_to.
Finished reading the code. I'll work on the following TODOs to merge this:
Did I miss anything? The main high-level issue (which we already discussed offline) is that storing all values for a key as a blob is really expensive and non-scalable. This is in contrast to
That seems to be it yes.
Yes, I think it's good to separate performance from design/correctness -- e.g., let's make the first PR about having a design/impl that works (but isn't optimized) and make sure it's tested as well as we can. Then we can improve performance in upcoming PRs?
Definitely.
Here are some numbers from running nexmark :) (note that q0-2, q14 and q22 don't use Spine/PersistentTrace, so they run the same code):
Force-pushed from d06944d to 4cd35a9.
Just for reference, I was looking into a memory leak when we use the rocksdb APIs. A few ideas of why this might be happening:
Force-pushed from 7d4ba41 to b15d1d5.
Opened a PR for now so I can dump information somewhere, but this isn't ready to merge. If we are fine with the current overheads I measured with the OrdZSet experiment (see below), then my next step will be to make a persistent trace by porting this code over from the persistent OrdZSet into something that implements the Trace functionality instead.
Notes on using RocksDB
- RocksDB seek calls always start from the beginning, whereas our Cursor seek does not
  - probably doesn't matter (it keeps an index to the right block); see the sketch below
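To make the seek difference concrete, here's a hedged sketch using the rocksdb crate's raw iterator (paths and data made up, not from this PR):

```rust
use rocksdb::{Options, DB};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/seek-demo")?;
    for k in [1u64, 3, 5, 7] {
        // Big-endian keys so byte order matches integer order.
        db.put(k.to_be_bytes(), b"w")?;
    }

    let mut iter = db.raw_iterator();
    iter.seek(4u64.to_be_bytes()); // lands on 5, the first key >= 4
    assert!(iter.valid());
    assert_eq!(iter.key(), Some(&5u64.to_be_bytes()[..]));
    // RocksDB happily seeks *backwards* by restarting from its index,
    // unlike our forward-only Cursor seek.
    iter.seek(2u64.to_be_bytes()); // lands on 3
    assert_eq!(iter.key(), Some(&3u64.to_be_bytes()[..]));
    Ok(())
}
```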
- Comparison of complex keys
  - significant; we can avoid it in certain well-specified but common(?) cases, e.g., if the key is a usize then the serialized bytes (using big-endian encoding) preserve lexicographic ordering with bincode (see the sketch below)
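A quick self-contained check of the big-endian claim (plain Rust with to_be_bytes; a big-endian fixed-int bincode config should produce the same bytes for integers):

```rust
// Encoding integer keys as big-endian bytes makes RocksDB's default
// byte-wise comparator agree with the integers' natural order, so no
// custom (deserializing) comparator is needed for such keys.
fn key_bytes(k: usize) -> [u8; 8] {
    (k as u64).to_be_bytes()
}

fn main() {
    let mut keys = [300usize, 7, 70_000, 255, 256];
    keys.sort_unstable();
    let encoded: Vec<[u8; 8]> = keys.iter().map(|&k| key_bytes(k)).collect();
    // Lexicographic (byte-wise) order matches integer order:
    assert!(encoded.windows(2).all(|w| w[0] <= w[1]));
}
```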
- Gets
  - get_pinned is preferable over get as it avoids a Vec allocation/deallocation on every lookup (see the sketch below)
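A hedged sketch of the difference with the rocksdb crate (path made up): get copies the value into a fresh Vec<u8>, while get_pinned returns a DBPinnableSlice that borrows RocksDB's own buffer.

```rust
use rocksdb::{Options, DB};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/pinned-demo")?;
    db.put(b"key", b"value")?;

    // Allocates a Vec<u8> holding a copy of the value:
    let owned: Option<Vec<u8>> = db.get(b"key")?;

    // No allocation: the returned slice pins RocksDB's internal buffer
    // for as long as it lives.
    if let Some(pinned) = db.get_pinned(b"key")? {
        assert_eq!(&*pinned, owned.as_deref().unwrap());
    }
    Ok(())
}
```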
- Merging:
  - ingest_external_file for creating a new "batch", then use db.merge() to merge in the other (smaller) half? (see the sketch below)
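A hedged sketch of the ingest_external_file half of this idea with the rocksdb crate (paths made up; the db.merge() half would additionally need a merge operator configured on the Options):

```rust
use rocksdb::{Options, SstFileWriter, DB};

fn main() -> Result<(), rocksdb::Error> {
    // Write a sorted batch into a standalone SST file...
    let opts = Options::default();
    let mut writer = SstFileWriter::create(&opts);
    writer.open("/tmp/batch.sst")?;
    for k in [1u64, 2, 3] {
        // Keys must be added in the comparator's order.
        writer.put(k.to_be_bytes(), b"weight")?;
    }
    writer.finish()?;

    // ...then ingest the whole file at once instead of put()-ing
    // key by key.
    let mut db_opts = Options::default();
    db_opts.create_if_missing(true);
    let db = DB::open(&db_opts, "/tmp/ingest-demo")?;
    db.ingest_external_file(vec!["/tmp/batch.sst"])?;
    assert!(db.get(2u64.to_be_bytes())?.is_some());
    Ok(())
}
```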
- One database per DS/Trace or one DB with a ColumnFamily per DS/Trace?
  - through a singleton
  - of CFs?
  - approach) for perf or mem or disk consumption
  - databases
- ThreadingMode: there are two ways; we currently use MultiThreaded, but...
  - MultiThreaded takes a &self and uses the DB's internal RWLock
  - MultiThreaded: multi-threaded access to a single ColumnFamily
  - going from MultiThreaded to SingleThreaded
  - the LevelDB rwlock: replacing it with a better variant helped to scale reads...
Notes on RocksDB Performance
- size 4K and 16M keys
- cargo criterion (need to do cargo install cargo-criterion); results end up in target/criterion
- got No space left on device during benchmarking (on tmpfs), because tmpfs was small; increased it to 12 GiB with sudo mount -o remount,size=12G /tmp
- global_opts.set_max_open_files(9000); and ulimit -n 9000 in bash, which resolved the problem
- https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
Not sure why; maybe the OrdZSet seek() is O(n) and RocksDB will use its index? (In fact, OrdZSet uses exponential search, O(log n), whereas RocksDB will use an index to find the right block, so O(1).)