Skip to content

Commit

Permalink
Fence write by read timestamp to prevent write beneath read (#29)
Browse files Browse the repository at this point in the history
Resolves #27.
  • Loading branch information
kezhuw authored Jul 20, 2024
1 parent cbae242 commit a4e304c
Show file tree
Hide file tree
Showing 10 changed files with 1,109 additions and 86 deletions.
8 changes: 8 additions & 0 deletions src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ impl Timestamp {
pub fn into_physical(self) -> Self {
Self { logical: 0, ..self }
}

pub fn next(self) -> Self {
if self.logical < u32::MAX {
return Self { logical: self.logical + 1, ..self };
}
let duration = Duration::new(self.seconds, self.nanoseconds) + Duration::new(0, 1);
Self { seconds: duration.as_secs(), nanoseconds: duration.subsec_nanos(), logical: 0 }
}
}

impl Add<Duration> for Timestamp {
Expand Down
211 changes: 196 additions & 15 deletions src/protos/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::cmp::Ordering::{self, *};

use super::*;
Expand Down Expand Up @@ -39,10 +40,26 @@ impl From<KeyRange> for KeySpan {
}

impl KeySpan {
pub fn new_key(key: impl Into<Vec<u8>>) -> Self {
Self { key: key.into(), end: vec![] }
}

pub fn new_range(key: impl Into<Vec<u8>>, end: impl Into<Vec<u8>>) -> Self {
Self { key: key.into(), end: end.into() }
}

pub fn end_key(&self) -> &[u8] {
self.as_ref().end_key()
}

pub fn is_before(&self, key: &[u8]) -> bool {
if self.end.is_empty() {
key > self.key.as_slice()
} else {
key >= self.end.as_slice()
}
}

pub fn extend_start(&mut self, start: &[u8]) -> bool {
if start < self.key.as_slice() {
self.key = start.to_vec();
Expand All @@ -55,6 +72,27 @@ impl KeySpan {
pub fn as_ref(&self) -> KeySpanRef<'_> {
KeySpanRef { key: self.key.as_slice(), end: self.end.as_slice() }
}

pub fn end(&self) -> Cow<'_, [u8]> {
if self.end.is_empty() {
let mut end = Vec::with_capacity(self.key.len() + 1);
end.extend(&self.key);
end.push(0);
Cow::Owned(end)
} else {
Cow::Borrowed(&self.end)
}
}

pub fn into_end(mut self) -> Vec<u8> {
match self.end.is_empty() {
true => {
self.key.push(0);
self.key
},
false => self.end,
}
}
}

#[derive(Clone, Copy)]
Expand All @@ -63,28 +101,95 @@ pub struct KeySpanRef<'a> {
pub end: &'a [u8],
}

#[derive(Debug, PartialEq)]
pub enum SpanOrdering {
LessDisjoint,
LessContiguous,

GreaterDisjoint,
GreaterContiguous,

Equal,

IntersectLeft,
IntersectRight,

ContainRight,
ContainLeft,
ContainAll,

SubsetAll,
SubsetRight,
SubsetLeft,
}

impl SpanOrdering {
fn from(ordering: Ordering) -> Self {
match ordering {
Ordering::Less => Self::LessDisjoint,
Ordering::Equal => Self::Equal,
Ordering::Greater => Self::GreaterDisjoint,
}
}

pub fn reverse(self) -> Self {
match self {
SpanOrdering::LessDisjoint => SpanOrdering::GreaterDisjoint,
SpanOrdering::LessContiguous => SpanOrdering::GreaterContiguous,
SpanOrdering::GreaterDisjoint => SpanOrdering::LessDisjoint,
SpanOrdering::GreaterContiguous => SpanOrdering::LessContiguous,
SpanOrdering::Equal => SpanOrdering::Equal,
SpanOrdering::IntersectLeft => SpanOrdering::IntersectRight,
SpanOrdering::IntersectRight => SpanOrdering::IntersectLeft,
SpanOrdering::ContainRight => SpanOrdering::SubsetRight,
SpanOrdering::ContainLeft => SpanOrdering::SubsetLeft,
SpanOrdering::ContainAll => SpanOrdering::SubsetAll,
SpanOrdering::SubsetRight => SpanOrdering::ContainRight,
SpanOrdering::SubsetLeft => SpanOrdering::ContainLeft,
SpanOrdering::SubsetAll => SpanOrdering::ContainAll,
}
}
}

impl<'a> KeySpanRef<'a> {
pub fn new(key: &'a [u8], end: &'a [u8]) -> Self {
Self { key, end }
}

pub fn compare(&self, other: KeySpanRef<'_>) -> Ordering {
match (self.end.is_empty(), other.end.is_empty()) {
(true, true) => self.key.cmp(other.key),
(false, true) => match other.key.cmp(self.key) {
Less => Less,
Equal => Equal,
Greater => match other.key.cmp(self.end) {
Less => Equal,
Equal | Greater => Greater,
pub fn is_single(&self) -> bool {
self.end.is_empty()
}

pub fn compare(&self, other: KeySpanRef<'_>) -> SpanOrdering {
match (self.is_single(), other.is_single()) {
(true, true) => SpanOrdering::from(self.key.cmp(other.key)),
(true, false) => match self.key.cmp(other.key) {
Less => SpanOrdering::LessDisjoint,
Equal => SpanOrdering::SubsetLeft,
Greater => match self.key.cmp(other.end) {
Less => SpanOrdering::ContainAll,
Equal => SpanOrdering::GreaterContiguous,
Greater => SpanOrdering::GreaterDisjoint,
},
},
(true, false) => other.compare(*self).reverse(),
(false, false) => match self.compare(KeySpanRef::new(other.key, Default::default())) {
Greater => Greater,
_ => match self.compare(KeySpanRef::new(other.end, Default::default())) {
Less => Less,
_ => Equal,
(false, true) => other.compare(*self).reverse(),
(false, false) => match self.end.cmp(other.key) {
Less => SpanOrdering::LessDisjoint,
Equal => SpanOrdering::LessContiguous,
Greater => match self.key.cmp(other.end) {
Equal => SpanOrdering::GreaterContiguous,
Greater => SpanOrdering::GreaterDisjoint,
Less => match (self.key.cmp(other.key), self.end.cmp(other.end)) {
(Less, Less) => SpanOrdering::IntersectRight,
(Less, Equal) => SpanOrdering::ContainRight,
(Less, Greater) => SpanOrdering::ContainAll,
(Equal, Equal) => SpanOrdering::Equal,
(Equal, Less) => SpanOrdering::SubsetLeft,
(Equal, Greater) => SpanOrdering::ContainLeft,
(Greater, Greater) => SpanOrdering::IntersectLeft,
(Greater, Equal) => SpanOrdering::SubsetRight,
(Greater, Less) => SpanOrdering::SubsetAll,
},
},
},
}
Expand All @@ -101,4 +206,80 @@ impl<'a> KeySpanRef<'a> {
self.end
}
}

pub fn end(&self) -> Cow<'a, [u8]> {
if self.end.is_empty() {
let mut end = Vec::with_capacity(self.key.len() + 1);
end.extend(self.key);
end.push(0);
Cow::Owned(end)
} else {
Cow::Borrowed(self.end)
}
}
}

#[cfg(test)]
mod tests {
use assertor::*;

use super::{KeySpanRef, SpanOrdering};

#[test]
fn compare() {
assert_that!(KeySpanRef::new(b"k1", b"").compare(KeySpanRef::new(b"k1", b""))).is_equal_to(SpanOrdering::Equal);
assert_that!(KeySpanRef::new(b"k1", b"").compare(KeySpanRef::new(b"k2", b"")))
.is_equal_to(SpanOrdering::LessDisjoint);
assert_that!(KeySpanRef::new(b"k1", b"").compare(KeySpanRef::new(b"k0", b"")))
.is_equal_to(SpanOrdering::GreaterDisjoint);

assert_that!(KeySpanRef::new(b"k1", b"").compare(KeySpanRef::new(b"k2", b"k20")))
.is_equal_to(SpanOrdering::LessDisjoint);
assert_that!(KeySpanRef::new(b"k1", b"").compare(KeySpanRef::new(b"k1", b"k10")))
.is_equal_to(SpanOrdering::SubsetLeft);
assert_that!(KeySpanRef::new(b"k1", b"").compare(KeySpanRef::new(b"k0", b"k10")))
.is_equal_to(SpanOrdering::ContainAll);
assert_that!(KeySpanRef::new(b"k1", b"").compare(KeySpanRef::new(b"k0", b"k1")))
.is_equal_to(SpanOrdering::GreaterContiguous);
assert_that!(KeySpanRef::new(b"k1", b"").compare(KeySpanRef::new(b"k0", b"k01")))
.is_equal_to(SpanOrdering::GreaterDisjoint);

assert_that!(KeySpanRef::new(b"k2", b"k20").compare(KeySpanRef::new(b"k1", b"")))
.is_equal_to(SpanOrdering::GreaterDisjoint);
assert_that!(KeySpanRef::new(b"k1", b"k10").compare(KeySpanRef::new(b"k1", b"")))
.is_equal_to(SpanOrdering::ContainLeft);
assert_that!(KeySpanRef::new(b"k0", b"k10").compare(KeySpanRef::new(b"k1", b"")))
.is_equal_to(SpanOrdering::SubsetAll);
assert_that!(KeySpanRef::new(b"k0", b"k1").compare(KeySpanRef::new(b"k1", b"")))
.is_equal_to(SpanOrdering::LessContiguous);
assert_that!(KeySpanRef::new(b"k0", b"k01").compare(KeySpanRef::new(b"k1", b"")))
.is_equal_to(SpanOrdering::LessDisjoint);

assert_that!(KeySpanRef::new(b"k1", b"k10").compare(KeySpanRef::new(b"k2", b"k20")))
.is_equal_to(SpanOrdering::LessDisjoint);
assert_that!(KeySpanRef::new(b"k1", b"k10").compare(KeySpanRef::new(b"k10", b"k20")))
.is_equal_to(SpanOrdering::LessContiguous);
assert_that!(KeySpanRef::new(b"k1", b"k10").compare(KeySpanRef::new(b"k0", b"k1")))
.is_equal_to(SpanOrdering::GreaterContiguous);
assert_that!(KeySpanRef::new(b"k1", b"k10").compare(KeySpanRef::new(b"k0", b"k00")))
.is_equal_to(SpanOrdering::GreaterDisjoint);
assert_that!(KeySpanRef::new(b"k1", b"k11").compare(KeySpanRef::new(b"k10", b"k12")))
.is_equal_to(SpanOrdering::IntersectRight);
assert_that!(KeySpanRef::new(b"k1", b"k11").compare(KeySpanRef::new(b"k10", b"k11")))
.is_equal_to(SpanOrdering::ContainRight);
assert_that!(KeySpanRef::new(b"k1", b"k12").compare(KeySpanRef::new(b"k10", b"k11")))
.is_equal_to(SpanOrdering::ContainAll);
assert_that!(KeySpanRef::new(b"k1", b"k12").compare(KeySpanRef::new(b"k1", b"k12")))
.is_equal_to(SpanOrdering::Equal);
assert_that!(KeySpanRef::new(b"k1", b"k12").compare(KeySpanRef::new(b"k1", b"k13")))
.is_equal_to(SpanOrdering::SubsetLeft);
assert_that!(KeySpanRef::new(b"k1", b"k12").compare(KeySpanRef::new(b"k1", b"k11")))
.is_equal_to(SpanOrdering::ContainLeft);
assert_that!(KeySpanRef::new(b"k10", b"k12").compare(KeySpanRef::new(b"k1", b"k11")))
.is_equal_to(SpanOrdering::IntersectLeft);
assert_that!(KeySpanRef::new(b"k10", b"k11").compare(KeySpanRef::new(b"k1", b"k11")))
.is_equal_to(SpanOrdering::SubsetRight);
assert_that!(KeySpanRef::new(b"k10", b"k11").compare(KeySpanRef::new(b"k1", b"k12")))
.is_equal_to(SpanOrdering::SubsetAll);
}
}
10 changes: 8 additions & 2 deletions src/protos/temporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ use std::time::Duration;
use super::*;

impl Temporal {
pub fn txn_id(&self) -> Uuid {
match self {
Self::Transaction(txn) => txn.id(),
Self::Timestamp(_) => Uuid::nil(),
}
}

pub fn timestamp(&self) -> Timestamp {
match self {
Temporal::Timestamp(ts) => *ts,
Expand Down Expand Up @@ -81,18 +88,17 @@ impl Transaction {
assert_eq!(self.id(), other.id());
assert_eq!(self.key(), other.key());
assert_eq!(self.start_ts(), other.start_ts());
self.commit_ts.forward(other.commit_ts);
self.heartbeat_ts.forward(other.heartbeat_ts);
match self.epoch().cmp(&other.epoch()) {
Less => {
self.status = other.status;
self.meta.epoch = other.epoch();
self.rollbacked_sequences.clear();
self.commit_ts = other.commit_ts;
self.commit_set.clear();
self.commit_set.extend(other.commit_set.iter().cloned());
},
Equal => {
self.commit_ts.forward(other.commit_ts);
let n = self.rollbacked_sequences.len();
if n < other.rollbacked_sequences.len() {
self.rollbacked_sequences.extend(other.rollbacked_sequences[n..].iter().copied());
Expand Down
28 changes: 28 additions & 0 deletions src/protos/uuid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,38 @@ use std::fmt::{self, Debug, Display, Formatter};
use super::Uuid;

impl Uuid {
pub fn nil() -> Self {
Self { lsb: 0, msb: 0 }
}

pub fn max() -> Self {
Self { lsb: u64::MAX, msb: u64::MAX }
}

pub fn is_nil(self) -> bool {
self.lsb == 0 && self.msb == 0
}

pub fn new_random() -> Self {
let id = uuid::Uuid::new_v4();
id.into()
}

pub fn xor(self, other: Self) -> Self {
if self == other {
self
} else {
Self::max()
}
}

pub fn normalize(self) -> Self {
if self == Self::max() {
Self::nil()
} else {
self
}
}
}

impl From<uuid::Uuid> for Uuid {
Expand Down
Loading

0 comments on commit a4e304c

Please sign in to comment.