Skip to content

Commit

Permalink
IGNITE-17856 RW reads and scans should use timestamp based MVPartitionStorage read and scan operations (apache#1181)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanpwc authored Oct 11, 2022
1 parent b1f99db commit 081b7c2
Show file tree
Hide file tree
Showing 15 changed files with 206 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public final class HybridTimestamp implements Comparable<HybridTimestamp>, Seria
/** Timestamp size in bytes. */
public static final int HYBRID_TIMESTAMP_SIZE = Long.BYTES + Integer.BYTES;

/**
* A constant holding the maximum value a {@code HybridTimestamp} can have. It is constructed from
* {@link Long#MAX_VALUE} and {@link Integer#MAX_VALUE}, i.e. the largest values both constructor arguments can take.
*/
public static final HybridTimestamp MAX_VALUE = new HybridTimestamp(Long.MAX_VALUE, Integer.MAX_VALUE);

/** Physical clock. */
private final long physical;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,6 @@ interface WriteClosure<V> {
*/
long persistedIndex();

/**
* Reads either the committed value from the storage or the uncommitted value belonging to the given transaction.
*
* @param rowId Row id.
* @param txId Transaction id.
* @return Binary row that corresponds to the row id, or {@code null} if the value is not found.
* @throws TxIdMismatchException If there's another pending update associated with a different transaction id.
* @throws StorageException If failed to read data from the storage.
*/
@Nullable
BinaryRow read(RowId rowId, UUID txId) throws TxIdMismatchException, StorageException;

/**
* Reads the value from the storage as it was at the given timestamp.
* If there is a row with specified row id and timestamp - return it.
Expand Down Expand Up @@ -161,17 +149,6 @@ interface WriteClosure<V> {
*/
Cursor<BinaryRow> scanVersions(RowId rowId) throws StorageException;

/**
* Scans the partition and returns a cursor of values. All filtered values must either be uncommitted in the current transaction
* or already committed in a different transaction.
*
* @param keyFilter Key filter. Binary rows passed to the filter may or may not have a value, filter should only check keys.
* @param txId Transaction id.
* @return Cursor.
* @throws TxIdMismatchException If a scanned row has a pending update associated with a different transaction id.
*      NOTE(review): the storage tests observe this exception while iterating the returned cursor rather than from
*      this call itself - confirm for each implementation.
* @throws StorageException If failed to read data from the storage.
*/
Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException;

/**
* Scans the partition and returns a cursor of values at the given timestamp.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest
protected final BinaryRow binaryRow2 = binaryRow(key, value2);
private final BinaryRow binaryRow3 = binaryRow(key, new TestValue(22, "bar3"));

/**
 * Fetches the row stored under {@code rowId} on behalf of transaction {@code txId}; the storage read is executed through
 * {@code storage.runConsistently}.
 */
protected BinaryRow read(RowId rowId, UUID txId) {
    BinaryRow foundRow = storage.runConsistently(() -> storage.read(rowId, txId));

    return foundRow;
}

/**
* Reads a row inside of consistency closure.
*/
Expand All @@ -95,13 +88,6 @@ protected PartitionTimestampCursor scan(Predicate<BinaryRow> filter, HybridTimes
return storage.runConsistently(() -> storage.scan(filter, timestamp));
}

/**
 * Opens a transaction-based scan cursor over the partition; the storage scan is started through
 * {@code storage.runConsistently}.
 */
protected Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, UUID txId) {
    Cursor<BinaryRow> openedCursor = storage.runConsistently(() -> storage.scan(filter, txId));

    return openedCursor;
}

/**
* Inserts a row inside of consistency closure.
*/
Expand Down Expand Up @@ -154,14 +140,12 @@ public void testReadsFromEmpty() {

assertEquals(PARTITION_ID, rowId.partitionId());

assertNull(read(rowId, newTransactionId()));
assertNull(read(rowId, clock.now()));
}

@Test
public void testScanOverEmpty() throws Exception {
assertEquals(List.of(), convert(scan(row -> true, newTransactionId())));
assertEquals(List.of(), convertTsCursor(scan(row -> true, clock.now())));
assertEquals(List.of(), convert(scan(row -> true, clock.now())));
}

/**
Expand All @@ -177,12 +161,6 @@ public void testAddWrite() {
// Write from the same transaction.
addWrite(rowId, binaryRow, txId);

// Read without timestamp returns uncommitted row.
assertRowMatches(read(rowId, txId), binaryRow);

// Read with wrong transaction id should throw exception.
assertThrows(TxIdMismatchException.class, () -> read(rowId, newTransactionId()));

// Read with timestamp returns write-intent.
assertRowMatches(read(rowId, clock.now()), binaryRow);
}
Expand All @@ -197,7 +175,7 @@ public void testAbortWrite() {
abortWrite(rowId);

// Aborted row can't be read.
assertNull(read(rowId, txId));
assertNull(read(rowId, HybridTimestamp.MAX_VALUE));
}

/**
Expand Down Expand Up @@ -231,7 +209,7 @@ public void testCommitWrite() {
// Same checks, but now there are two different versions.
assertNull(read(rowId, tsBefore));

assertRowMatches(read(rowId, newTxId), newRow);
assertRowMatches(read(rowId, HybridTimestamp.MAX_VALUE), newRow);

assertRowMatches(read(rowId, tsExact), binaryRow);
assertRowMatches(read(rowId, tsAfter), newRow);
Expand All @@ -241,7 +219,7 @@ public void testCommitWrite() {
HybridTimestamp newRowCommitTs = clock.now();
commitWrite(rowId, newRowCommitTs);

assertRowMatches(read(rowId, newTxId), newRow);
assertRowMatches(read(rowId, HybridTimestamp.MAX_VALUE), newRow);

assertRowMatches(read(rowId, tsExact), binaryRow);
assertRowMatches(read(rowId, tsAfter), binaryRow);
Expand All @@ -255,7 +233,7 @@ public void testCommitWrite() {

assertNull(read(rowId, tsBefore));

assertNull(read(rowId, removeTxId));
assertNull(read(rowId, HybridTimestamp.MAX_VALUE));

assertRowMatches(read(rowId, tsExact), binaryRow);
assertRowMatches(read(rowId, tsAfter), binaryRow);
Expand All @@ -269,7 +247,7 @@ public void testCommitWrite() {

assertNull(read(rowId, tsBefore));

assertNull(read(rowId, removeTxId));
assertNull(read(rowId, HybridTimestamp.MAX_VALUE));
assertNull(read(rowId, removeTs));
assertNull(read(rowId, clock.now()));

Expand All @@ -288,18 +266,9 @@ public void testScan() throws Exception {
TestKey key2 = new TestKey(2, "2");
TestValue value2 = new TestValue(20, "yyy");

UUID txId1 = newTransactionId();
RowId rowId1 = insert(binaryRow(key1, value1), txId1);

UUID txId2 = newTransactionId();
RowId rowId2 = insert(binaryRow(key2, value2), txId2);

// Scan with and without filters.
assertThrows(TxIdMismatchException.class, () -> convert(scan(row -> true, txId1)));
assertThrows(TxIdMismatchException.class, () -> convert(scan(row -> true, txId2)));

assertEquals(List.of(value1), convert(storage.scan(row -> key(row).intKey == 1, txId1)));
assertEquals(List.of(value2), convert(storage.scan(row -> key(row).intKey == 2, txId2)));
UUID txId = newTransactionId();
RowId rowId1 = insert(binaryRow(key1, value1), txId);
RowId rowId2 = insert(binaryRow(key2, value2), txId);

HybridTimestamp ts1 = clock.now();

Expand All @@ -314,13 +283,13 @@ public void testScan() throws Exception {
HybridTimestamp ts5 = clock.now();

// Full scan with various timestamp values.
assertEquals(List.of(), convertTsCursor(scan(row -> true, ts1)));
assertEquals(List.of(), convert(scan(row -> true, ts1)));

assertEquals(List.of(value1), convertTsCursor(scan(row -> true, ts2)));
assertEquals(List.of(value1), convertTsCursor(scan(row -> true, ts3)));
assertEquals(List.of(value1), convert(scan(row -> true, ts2)));
assertEquals(List.of(value1), convert(scan(row -> true, ts3)));

assertEquals(List.of(value1, value2), convertTsCursor(scan(row -> true, ts4)));
assertEquals(List.of(value1, value2), convertTsCursor(scan(row -> true, ts5)));
assertEquals(List.of(value1, value2), convert(scan(row -> true, ts4)));
assertEquals(List.of(value1, value2), convert(scan(row -> true, ts5)));
}

@Test
Expand All @@ -335,18 +304,18 @@ public void testTransactionScanCursorInvariants() throws Exception {
RowId rowId2 = insert(binaryRow(new TestKey(2, "2"), value2), txId);
commitWrite(rowId2, clock.now());

try (Cursor<BinaryRow> cursor = scan(row -> true, txId)) {
try (PartitionTimestampCursor cursor = scan(row -> true, HybridTimestamp.MAX_VALUE)) {
assertTrue(cursor.hasNext());
assertTrue(cursor.hasNext());

List<TestValue> res = new ArrayList<>();

res.add(value(cursor.next()));
res.add(value(cursor.next().binaryRow()));

assertTrue(cursor.hasNext());
assertTrue(cursor.hasNext());

res.add(value(cursor.next()));
res.add(value(cursor.next().binaryRow()));

assertFalse(cursor.hasNext());
assertFalse(cursor.hasNext());
Expand Down Expand Up @@ -432,16 +401,7 @@ public void testTimestampScanCursorInvariants() throws Exception {
}
}

/**
 * Drains the cursor into a list of test values sorted in natural order with {@code null}s first, closing the cursor afterwards.
 */
private List<TestValue> convert(Cursor<BinaryRow> cursor) throws Exception {
    try (cursor) {
        // Null values are ordered before all non-null values.
        Comparator<TestValue> nullsFirstOrder = Comparator.nullsFirst(Comparator.naturalOrder());

        return cursor.stream()
                .map(row -> BaseMvStoragesTest.value(row))
                .sorted(nullsFirstOrder)
                .collect(Collectors.toList());
    }
}

private List<TestValue> convertTsCursor(PartitionTimestampCursor cursor) throws Exception {
private List<TestValue> convert(PartitionTimestampCursor cursor) throws Exception {
try (cursor) {
return cursor.stream()
.map((ReadResult rs) -> BaseMvStoragesTest.value(rs.binaryRow()))
Expand All @@ -454,7 +414,7 @@ private List<TestValue> convertTsCursor(PartitionTimestampCursor cursor) throws
void readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() {
RowId rowId = insert(binaryRow, txId);

BinaryRow foundRow = read(rowId, txId);
BinaryRow foundRow = read(rowId, HybridTimestamp.MAX_VALUE);

assertRowMatches(foundRow, binaryRow);
}
Expand All @@ -464,19 +424,12 @@ protected final void assertRowMatches(BinaryRow rowUnderQuestion, BinaryRow expe
assertThat(rowUnderQuestion.bytes(), is(equalTo(expectedRow.bytes())));
}

@Test
void readOfUncommittedRowWithDifferentTransactionIdThrows() {
    // Leave a write intent that belongs to txId.
    RowId rowId = insert(binaryRow, txId);

    // Reading the same row under a freshly generated transaction id must be rejected.
    assertThrows(TxIdMismatchException.class, () -> {
        read(rowId, newTransactionId());
    });
}

@Test
void readOfCommittedRowWithAnyTransactionIdReturnsTheRow() {
RowId rowId = insert(binaryRow, txId);
commitWrite(rowId, clock.now());

BinaryRow foundRow = read(rowId, newTransactionId());
BinaryRow foundRow = read(rowId, HybridTimestamp.MAX_VALUE);

assertRowMatches(foundRow, binaryRow);
}
Expand All @@ -488,7 +441,7 @@ void readsUncommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {

RowId rowId2 = insert(binaryRow2, txId);

BinaryRow foundRow = read(rowId2, txId);
BinaryRow foundRow = read(rowId2, HybridTimestamp.MAX_VALUE);

assertRowMatches(foundRow, binaryRow2);
}
Expand All @@ -501,7 +454,7 @@ void readsCommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
RowId rowId2 = insert(binaryRow2, txId);
commitWrite(rowId2, clock.now());

BinaryRow foundRow = read(rowId2, txId);
BinaryRow foundRow = read(rowId2, HybridTimestamp.MAX_VALUE);

assertRowMatches(foundRow, binaryRow2);
}
Expand Down Expand Up @@ -625,7 +578,7 @@ void secondUncommittedWriteWithSameTxIdReplacesExistingUncommittedWrite() {

addWrite(rowId, binaryRow2, txId);

BinaryRow foundRow = read(rowId, txId);
BinaryRow foundRow = read(rowId, HybridTimestamp.MAX_VALUE);

assertRowMatches(foundRow, binaryRow2);
}
Expand Down Expand Up @@ -656,7 +609,7 @@ void afterRemovalReadWithTxIdFindsNothing() {

addWrite(rowId, null, txId);

BinaryRow foundRow = read(rowId, txId);
BinaryRow foundRow = read(rowId, HybridTimestamp.MAX_VALUE);

assertThat(foundRow, is(nullValue()));
}
Expand Down Expand Up @@ -725,11 +678,11 @@ void commitAndAbortWriteNoOpIfNoUncommittedVersionExists() {

abortWrite(rowId);

assertRowMatches(read(rowId, newTransactionId()), binaryRow);
assertRowMatches(read(rowId, HybridTimestamp.MAX_VALUE), binaryRow);

commitWrite(rowId, clock.now());

assertRowMatches(read(rowId, newTransactionId()), binaryRow);
assertRowMatches(read(rowId, HybridTimestamp.MAX_VALUE), binaryRow);
}

@Test
Expand All @@ -741,7 +694,7 @@ void abortWriteRemovesUncommittedVersion() {

abortWrite(rowId);

BinaryRow foundRow = read(rowId, txId);
BinaryRow foundRow = read(rowId, HybridTimestamp.MAX_VALUE);

assertRowMatches(foundRow, binaryRow);
}
Expand All @@ -761,7 +714,7 @@ void abortOfInsertMakesRowNonExistentForReadByTimestamp() {
void abortOfInsertMakesRowNonExistentForReadWithTxId() {
RowId rowId = new RowId(PARTITION_ID);

BinaryRow foundRow = read(rowId, txId);
BinaryRow foundRow = read(rowId, HybridTimestamp.MAX_VALUE);

assertThat(foundRow, is(nullValue()));
}
Expand All @@ -775,15 +728,6 @@ void abortWriteReturnsTheRemovedVersion() {
assertRowMatches(returnedRow, binaryRow);
}

/**
 * Checks that a transaction-based scan fails with {@link TxIdMismatchException} when it reaches a row that has an uncommitted
 * write made by another transaction.
 *
 * @throws Exception If closing the cursor fails.
 */
@Test
void scanWithTxIdThrowsWhenOtherTransactionHasUncommittedChanges() throws Exception {
    // Leave a write intent that belongs to txId.
    insert(binaryRow, txId);

    // Scan under a different transaction id; the cursor is closed even when the assertion fails,
    // in line with the other cursor-based tests that use try-with-resources.
    try (Cursor<BinaryRow> cursor = scan(row -> true, newTransactionId())) {
        assertThrows(TxIdMismatchException.class, cursor::next);
    }
}

@Test
void readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
RowId rowId = commitAbortAndAddUncommitted();
Expand Down Expand Up @@ -974,7 +918,6 @@ void testWrongPartition() {
var row = new RowId(rowId.partitionId() + 1, rowId.mostSignificantBits(), rowId.leastSignificantBits());

assertThrows(IllegalArgumentException.class, () -> read(row, clock.now()));
assertThrows(IllegalArgumentException.class, () -> read(row, UUID.randomUUID()));
}

@Test
Expand All @@ -999,7 +942,7 @@ void testReadingNothingByTxIdWithLowerRowId() {
return null;
});

assertNull(read(lowerRowId, txId));
assertNull(read(lowerRowId, HybridTimestamp.MAX_VALUE));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.configuration.schemas.table.TableIndexView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.hlc.HybridTimestamp;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.schema.BinaryTuple;
Expand Down Expand Up @@ -124,20 +125,20 @@ void testPartitionIndependence() throws Exception {

partitionStorage0.runConsistently(() -> partitionStorage0.addWrite(rowId0, testData0, txId, UUID.randomUUID(), 0));

assertThat(unwrap(partitionStorage0.read(rowId0, txId)), is(equalTo(unwrap(testData0))));
assertThrows(IllegalArgumentException.class, () -> partitionStorage1.read(rowId0, txId));
assertThat(unwrap(partitionStorage0.read(rowId0, HybridTimestamp.MAX_VALUE)), is(equalTo(unwrap(testData0))));
assertThrows(IllegalArgumentException.class, () -> partitionStorage1.read(rowId0, HybridTimestamp.MAX_VALUE));

var testData1 = binaryRow(new TestKey(2, "2"), new TestValue(20, "20"));

RowId rowId1 = new RowId(PARTITION_ID_1);

partitionStorage1.runConsistently(() -> partitionStorage1.addWrite(rowId1, testData1, txId, UUID.randomUUID(), 0));

assertThrows(IllegalArgumentException.class, () -> partitionStorage0.read(rowId1, txId));
assertThat(unwrap(partitionStorage1.read(rowId1, txId)), is(equalTo(unwrap(testData1))));
assertThrows(IllegalArgumentException.class, () -> partitionStorage0.read(rowId1, HybridTimestamp.MAX_VALUE));
assertThat(unwrap(partitionStorage1.read(rowId1, HybridTimestamp.MAX_VALUE)), is(equalTo(unwrap(testData1))));

assertThat(toList(partitionStorage0.scan(row -> true, txId)), contains(unwrap(testData0)));
assertThat(toList(partitionStorage1.scan(row -> true, txId)), contains(unwrap(testData1)));
assertThat(toList(partitionStorage0.scan(row -> true, HybridTimestamp.MAX_VALUE)), contains(unwrap(testData0)));
assertThat(toList(partitionStorage1.scan(row -> true, HybridTimestamp.MAX_VALUE)), contains(unwrap(testData1)));
}

/**
Expand Down
Loading

0 comments on commit 081b7c2

Please sign in to comment.