From 6c3ce7109b87670af158069da35ece71c2bbeb5a Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Mon, 16 Sep 2024 15:41:19 +0800 Subject: [PATCH] Fix fence table not sorted after fencing non intersecting span --- src/tablet/concurrency/fence.rs | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/src/tablet/concurrency/fence.rs b/src/tablet/concurrency/fence.rs index 4f07a04..495af3c 100644 --- a/src/tablet/concurrency/fence.rs +++ b/src/tablet/concurrency/fence.rs @@ -14,6 +14,8 @@ use std::cmp::Ordering::*; +use tracing::trace; + use crate::protos::{KeySpan, KeySpanRef, SpanOrdering, Timestamp, Uuid}; #[derive(Debug, Clone, Copy)] @@ -91,18 +93,14 @@ impl FenceTable { Barrier::new(Uuid::max(), self.closed_ts) } - /// Fences future writes to given span at or beneath given timestamp. - pub fn fence(&mut self, txn_id: Uuid, mut span: KeySpan, ts: Timestamp) { - if ts <= self.closed_ts { - return; - } + fn fence_internally(&mut self, txn_id: Uuid, mut span: KeySpan, ts: Timestamp) { let mut i = self.fences.partition_point(|fence| fence.span.is_before(&span.key)); self.clear_closed_fences(i); while i < self.fences.len() { let fence = &mut self.fences[i]; match span.as_ref().compare(fence.span.as_ref()) { SpanOrdering::LessDisjoint | SpanOrdering::LessContiguous => { - self.fences.push(Fence::new(span, txn_id, ts)); + self.fences.insert(i, Fence::new(span, txn_id, ts)); return; }, SpanOrdering::GreaterDisjoint | SpanOrdering::GreaterContiguous => unreachable!(), @@ -188,6 +186,26 @@ impl FenceTable { } } + /// Fences future writes to given span at or beneath given timestamp. + pub fn fence(&mut self, txn_id: Uuid, span: KeySpan, ts: Timestamp) { + trace!("fence txn {}, span {:?}, ts {}, closed_ts {}", txn_id, span, ts, self.closed_ts); + if ts <= self.closed_ts { + return; + } + self.fence_internally(txn_id, span, ts); + debug_assert!({ + for i in 0..self.fences.len() - 1 { + let before = &self.fences[i]; + let after = &self.fences[i + 1]; + match before.span.as_ref().compare(after.span.as_ref()) { + SpanOrdering::LessDisjoint | SpanOrdering::LessContiguous => {}, + _ => panic!("fence table at position {i} is not ordered. {self:?}"), + } + } + true + }); + } + pub fn min_write_ts(&mut self, txn_id: Uuid, writes: &[KeySpan], mut ts: Timestamp) -> Timestamp { for write in writes { let barrier = self.max_barrier(write.as_ref()); @@ -367,5 +385,7 @@ mod tests { assert_that!(fences.min_write_ts(txn1, &[KeySpan::new_range(b"k16", b"k21")], ts70)).is_equal_to(ts70.next()); assert_that!(fences.min_write_ts(txn2, &[KeySpan::new_range(b"k16", b"k21")], ts70)).is_equal_to(ts70.next()); assert_that!(fences.min_write_ts(txn3, &[KeySpan::new_range(b"k16", b"k21")], ts70)).is_equal_to(ts70.next()); + + fences.fence(Uuid::nil(), KeySpan::new_range("k19", "k20"), ts70); } }