toTime, final ResultOrder order) {
+ this.key = key;
+ this.fromTime = fromTime;
+ this.toTime = toTime;
+ this.order = order;
+ }
+
+ /**
+ * Creates a query that will retrieve the set of records identified by {@code key} if any exists
+ * (or {@code null} otherwise).
+ *
+ * <p>While the query by default returns all the record versions of the specified {@code key}, setting
+ * the {@code fromTimestamp} (by calling the {@link #fromTime(Instant)} method) and the {@code toTimestamp}
+ * (by calling the {@link #toTime(Instant)} method) makes the query return only the record versions within
+ * the specified time range.
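+ *
+ * <p>Example (a sketch; the store name, key, and value types are placeholders):
+ * <pre>{@code
+ * MultiVersionedKeyQuery<String, Integer> query =
+ *     MultiVersionedKeyQuery.<String, Integer>withKey("key")
+ *         .fromTime(Instant.parse("2023-01-01T10:00:00.00Z"))
+ *         .toTime(Instant.now())
+ *         .withAscendingTimestamps();
+ * StateQueryRequest<VersionedRecordIterator<Integer>> request =
+ *     StateQueryRequest.inStore("versioned-store").withQuery(query);
+ * StateQueryResult<VersionedRecordIterator<Integer>> result = kafkaStreams.query(request);
+ * }</pre>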
+ *
+ * @param key The key to query for
+ * @param <K> The type of the key
+ * @param <V> The type of the value that will be retrieved
+ * @throws NullPointerException if {@code key} is null
+ */
+ public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
+ Objects.requireNonNull(key, "key cannot be null.");
+ return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), ResultOrder.ANY);
+ }
+
+ /**
+ * Specifies the starting time point for the key query.
+ *
+ * The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}.
+ * Records that were inserted before {@code fromTime} but are still valid in the specified time range (the whole time range
+ * or even part of it) are returned as well. In other words, the key query returns all the records that have NOT become tombstones at or before {@code fromTime}.
+ *
+ * @param fromTime The starting time point
+ * If {@code fromTime} is null, it will be considered as negative infinity, i.e., no lower bound
+ */
+ public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) {
+ return new MultiVersionedKeyQuery<>(key, Optional.ofNullable(fromTime), toTime, order);
+ }
+
+ /**
+ * Specifies the ending time point for the key query.
+ * The key query returns all the records that have timestamp {@code <= toTime}.
+ *
+ * @param toTime The ending time point
+ * If {@code toTime} is null, it will be considered as positive infinity, i.e., no upper bound
+ */
+ public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime) {
+ return new MultiVersionedKeyQuery<>(key, fromTime, Optional.ofNullable(toTime), order);
+ }
+
+ /**
+ * Specifies the order of the returned records by the query as descending by timestamp.
+ */
+ public MultiVersionedKeyQuery<K, V> withDescendingTimestamps() {
+ return new MultiVersionedKeyQuery<>(key, fromTime, toTime, ResultOrder.DESCENDING);
+ }
+
+ /**
+ * Specifies the order of the returned records by the query as ascending by timestamp.
+ */
+ public MultiVersionedKeyQuery<K, V> withAscendingTimestamps() {
+ return new MultiVersionedKeyQuery<>(key, fromTime, toTime, ResultOrder.ASCENDING);
+ }
+
+ /**
+ * The key that was specified for this query.
+ * @return The specified {@code key} of the query.
+ */
+ public K key() {
+ return key;
+ }
+
+ /**
+ * The starting time point of the query, if specified
+ * @return The specified {@code fromTime} of the query.
+ */
+ public Optional<Instant> fromTime() {
+ return fromTime;
+ }
+
+ /**
+ * The ending time point of the query, if specified
+ * @return The specified {@code toTime} of the query.
+ */
+ public Optional<Instant> toTime() {
+ return toTime;
+ }
+
+ /**
+ * The order of the returned records by timestamp.
+ * @return the order of returned records based on timestamp (can be unordered, or in ascending, or in descending order of timestamps).
+ */
+ public ResultOrder resultOrder() {
+ return order;
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/ResultOrder.java b/streams/src/main/java/org/apache/kafka/streams/query/ResultOrder.java
new file mode 100644
index 0000000000000..06665f49c61d1
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/query/ResultOrder.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.query;
+
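+/**
+ * The order of the records returned by a query, based on their timestamps:
+ * {@code ANY} (no guaranteed order), {@code ASCENDING}, or {@code DESCENDING}.
+ */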
+public enum ResultOrder {
+ ANY,
+ ASCENDING,
+ DESCENDING
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java
index 3d9ae2b673e56..6df07562853a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecord.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state;
import java.util.Objects;
+import java.util.Optional;
/**
* Combines a value (from a key-value record) with a timestamp, for use as the return type
@@ -27,18 +28,34 @@
public final class VersionedRecord {
private final V value;
private final long timestamp;
+ private final Optional<Long> validTo;
/**
* Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
*
- * @param value the value
- * @param timestamp the timestamp
+ * @param value The value
+ * @param timestamp The timestamp
*/
public VersionedRecord(final V value, final long timestamp) {
+ this.value = Objects.requireNonNull(value, "value cannot be null.");
+ this.timestamp = timestamp;
+ this.validTo = Optional.empty();
+ }
+
+ /**
+ * Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
+ *
+ * @param value The value
+ * @param timestamp The timestamp
+ * @param validTo The exclusive upper bound of the validity interval
+ */
+ public VersionedRecord(final V value, final long timestamp, final long validTo) {
this.value = Objects.requireNonNull(value);
this.timestamp = timestamp;
+ this.validTo = Optional.of(validTo);
}
+
public V value() {
return value;
}
@@ -47,9 +64,13 @@ public long timestamp() {
return timestamp;
}
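+
+ /**
+ * The exclusive upper bound of the validity interval of this record, if defined.
+ *
+ * @return the {@code validTo} timestamp of the record, or {@link Optional#empty()} if it is not defined
+ */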
+ public Optional<Long> validTo() {
+ return validTo;
+ }
+
@Override
public String toString() {
- return "<" + value + "," + timestamp + ">";
+ return "<" + value + "," + timestamp + "," + validTo + ">";
}
@Override
@@ -61,12 +82,12 @@ public boolean equals(final Object o) {
return false;
}
final VersionedRecord> that = (VersionedRecord>) o;
- return timestamp == that.timestamp &&
+ return timestamp == that.timestamp && Objects.equals(validTo, that.validTo) &&
Objects.equals(value, that.value);
}
@Override
public int hashCode() {
- return Objects.hash(value, timestamp);
+ return Objects.hash(value, timestamp, validTo);
}
}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecordIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecordIterator.java
new file mode 100644
index 0000000000000..1dbd7502934cb
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedRecordIterator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import java.io.Closeable;
+import java.util.Iterator;
+
+
+/**
+ * Iterator interface of {@link VersionedRecord VersionedRecord}.
+ *
+ * Users must call its {@code close} method explicitly upon completeness to release resources,
+ * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
+ * Note that {@code remove()} is not supported.
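+ *
+ * <p>For example (a sketch, assuming {@code result} holds the iterator obtained from a query):
+ * <pre>{@code
+ * try (VersionedRecordIterator<Integer> iterator = result) {
+ *     while (iterator.hasNext()) {
+ *         final VersionedRecord<Integer> record = iterator.next();
+ *         // use record.value(), record.timestamp(), and record.validTo()
+ *     }
+ * }
+ * }</pre>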
+ *
+ * @param <V> Type of values
+ */
+public interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>>, Closeable {
+
+ @Override
+ void close();
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
index f0e8cdd72a450..0c49749e8acdc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java
@@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -34,7 +35,9 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
+import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -170,7 +173,30 @@ public boolean isOpen() {
@Override
public synchronized byte[] get(final Bytes key) {
- return physicalStore.get(prefixKeyFormatter.addPrefix(key));
+ return get(key, Optional.empty());
+ }
+
+ public synchronized byte[] get(final Bytes key, final Snapshot snapshot) {
+ return get(key, Optional.of(snapshot));
+ }
+
+ private synchronized byte[] get(final Bytes key, final Optional<Snapshot> snapshot) {
+ if (snapshot.isPresent()) {
+ try (ReadOptions readOptions = new ReadOptions()) {
+ readOptions.setSnapshot(snapshot.get());
+ return physicalStore.get(prefixKeyFormatter.addPrefix(key), readOptions);
+ }
+ } else {
+ return physicalStore.get(prefixKeyFormatter.addPrefix(key));
+ }
+ }
+
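+ /**
+ * Takes a snapshot of the underlying physical RocksDB store. Callers are responsible for
+ * releasing it via {@link #releaseSnapshot(Snapshot)} once they are done with it.
+ */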
+ public Snapshot getSnapshot() {
+ return physicalStore.getSnapshot();
+ }
+
+ public void releaseSnapshot(final Snapshot snapshot) {
+ physicalStore.releaseSnapshot(snapshot);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java
new file mode 100644
index 0000000000000..b7567682694e3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.rocksdb.Snapshot;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
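+/**
+ * Iterates over the record versions of a single key across the segments of a versioned store.
+ * Segments are consumed lazily, one at a time, and a RocksDB {@link Snapshot} is taken on the
+ * first read (and released on close or once all segments have been processed) so that all
+ * segments are read from a consistent view of the store.
+ */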
+public class LogicalSegmentIterator implements VersionedRecordIterator {
+ private final ListIterator<LogicalKeyValueSegment> segmentIterator;
+ private final Bytes key;
+ private final Long fromTime;
+ private final Long toTime;
+ private final ResultOrder order;
+ private ListIterator<VersionedRecord<byte[]>> iterator;
+ private volatile boolean open = true;
+
+ // defined for creating/releasing the snapshot.
+ private LogicalKeyValueSegment snapshotOwner = null;
+ private Snapshot snapshot = null;
+
+
+
+ public LogicalSegmentIterator(final ListIterator<LogicalKeyValueSegment> segmentIterator,
+ final Bytes key,
+ final Long fromTime,
+ final Long toTime,
+ final ResultOrder order) {
+
+ this.segmentIterator = segmentIterator;
+ this.key = key;
+ this.fromTime = fromTime;
+ this.toTime = toTime;
+ this.iterator = Collections.emptyListIterator();
+ this.order = order;
+ }
+
+ @Override
+ public void close() {
+ open = false;
+ // user may refuse consuming all returned records, so release the snapshot when closing the iterator if it is not released yet!
+ releaseSnapshot();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!open) {
+ throw new IllegalStateException("The iterator is out of scope.");
+ }
+ // since data is stored in descending order in the segments, check whether there is any previous record, if the order is Ascending.
+ final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? iterator.hasPrevious() : iterator.hasNext();
+ return hasStillLoad || maybeFillIterator();
+ }
+
+ @Override
+ public Object next() {
+ if (hasNext()) {
+ // since data is stored in descending order in the segments, retrieve previous record, if the order is Ascending.
+ return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : iterator.next();
+ }
+ throw new NoSuchElementException();
+ }
+
+ private boolean maybeFillIterator() {
+
+ final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+ while (segmentIterator.hasNext()) {
+ final LogicalKeyValueSegment segment = segmentIterator.next();
+
+ if (snapshot == null) { // create the snapshot (this will happen only one time).
+ // any (random) segment, the latestValueStore or any of the older ones, can be the snapshotOwner, because in
+ // fact all use the same physical RocksDB under-the-hood.
+ this.snapshotOwner = segment;
+ // take a RocksDB snapshot to return the segments content at the query time (in order to guarantee consistency)
+ this.snapshot = snapshotOwner.getSnapshot();
+ }
+
+ final byte[] rawSegmentValue = segment.get(key, snapshot);
+ if (rawSegmentValue != null) { // this segment contains record(s) with the specified key
+ if (segment.id() == -1) { // this is the latestValueStore
+ final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
+ if (recordTimestamp <= toTime) {
+ // latest value satisfies timestamp bound
+ queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp));
+ }
+ } else {
+ // this segment contains records with the specified key and time range
+ final List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult> searchResults =
+ RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime);
+ for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) {
+ queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), searchResult.validTo()));
+ }
+ }
+ }
+ if (!queryResults.isEmpty()) {
+ break;
+ }
+ }
+ if (!queryResults.isEmpty()) {
+ // since data is stored in descending order in the segments, create the list in reverse order, if the order is Ascending.
+ this.iterator = order.equals(ResultOrder.ASCENDING) ? queryResults.listIterator(queryResults.size()) : queryResults.listIterator();
+ return true;
+ }
+ // if all segments have been processed, release the snapshot
+ releaseSnapshot();
+ return false;
+ }
+
+ private void releaseSnapshot() {
+ if (snapshot != null) {
+ snapshotOwner.releaseSnapshot(snapshot);
+ snapshot = null;
+ }
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
new file mode 100644
index 0000000000000..47f98d933c204
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.function.Function;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
+import org.apache.kafka.streams.state.VersionedRecord;
+
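+/**
+ * A thin wrapper over the raw {@link VersionedRecordIterator} returned by the inner store that
+ * deserializes each raw {@link VersionedRecord} on the fly using the provided function.
+ *
+ * @param <V> The type of the deserialized record values
+ */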
+public class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterator<V> {
+
+ private final VersionedRecordIterator<byte[]> iterator;
+ private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue;
+
+
+ public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator<byte[]> iterator,
+ final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue) {
+ this.iterator = iterator;
+ this.deserializeValue = deserializeValue;
+ }
+
+
+ @Override
+ public void close() {
+ iterator.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public VersionedRecord<V> next() {
+ return deserializeValue.apply(iterator.next());
+ }
+}
+
+
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
index 919601effcbbc..eaf1e20c5fe78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
@@ -23,6 +23,7 @@
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.serialization.Serde;
@@ -35,6 +36,7 @@
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
@@ -43,6 +45,7 @@
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.VersionedKeyQuery;
import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
@@ -50,6 +53,7 @@
import org.apache.kafka.streams.state.VersionedBytesStore;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
/**
@@ -112,6 +116,10 @@ private class MeteredVersionedKeyValueStoreInternal
mkEntry(
VersionedKeyQuery.class,
(query, positionBound, config, store) -> runVersionedKeyQuery(query, positionBound, config)
+ ),
+ mkEntry(
+ MultiVersionedKeyQuery.class,
+ (query, positionBound, config, store) -> runMultiVersionedKeyQuery(query, positionBound, config)
)
);
@@ -235,6 +243,38 @@ private QueryResult runVersionedKeyQuery(final Query query,
return result;
}
+ @SuppressWarnings("unchecked")
+ private QueryResult runMultiVersionedKeyQuery(final Query query, final PositionBound positionBound, final QueryConfig config) {
+ final QueryResult result;
+ final MultiVersionedKeyQuery typedKeyQuery = (MultiVersionedKeyQuery) query;
+
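+ // if no bounds were specified, the query range defaults to effectively unbounded lower/upper time bounds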
+ final Instant fromTime = typedKeyQuery.fromTime().orElse(Instant.ofEpochMilli(Long.MIN_VALUE));
+ final Instant toTime = typedKeyQuery.toTime().orElse(Instant.ofEpochMilli(Long.MAX_VALUE));
+ if (fromTime.compareTo(toTime) > 0) {
+ throw new IllegalArgumentException("The `fromTime` timestamp must be smaller than the `toTime` timestamp.");
+ }
+ MultiVersionedKeyQuery rawKeyQuery = MultiVersionedKeyQuery.withKey(keyBytes(typedKeyQuery.key()));
+ rawKeyQuery = rawKeyQuery.fromTime(fromTime).toTime(toTime);
+ if (typedKeyQuery.resultOrder().equals(ResultOrder.DESCENDING)) {
+ rawKeyQuery = rawKeyQuery.withDescendingTimestamps();
+ } else if (typedKeyQuery.resultOrder().equals(ResultOrder.ASCENDING)) {
+ rawKeyQuery = rawKeyQuery.withAscendingTimestamps();
+ }
+
+ final QueryResult> rawResult = wrapped().query(rawKeyQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final MeteredMultiVersionedKeyQueryIterator typedResult =
+ new MeteredMultiVersionedKeyQueryIterator(rawResult.getResult(), StoreQueryUtils.getDeserializeValue(plainValueSerdes));
+ final QueryResult> typedQueryResult =
+ InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
+ result = (QueryResult) typedQueryResult;
+ } else {
+ // the generic type doesn't matter, since failed queries have no result set.
+ result = (QueryResult) rawResult;
+ }
+ return result;
+ }
+
@SuppressWarnings("unchecked")
@Override
protected Serde> prepareValueSerdeForStore(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 77e0c1e05e396..86b7b1d34b126 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.serialization.Serializer;
@@ -52,9 +53,11 @@
import org.rocksdb.InfoLogLevel;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
import org.rocksdb.Statistics;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteBatch;
@@ -372,6 +375,14 @@ private void validateStoreOpen() {
}
}
+ public Snapshot getSnapshot() {
+ return db.getSnapshot();
+ }
+
+ public void releaseSnapshot(final Snapshot snapshot) {
+ db.releaseSnapshot(snapshot);
+ }
+
@Override
public synchronized void put(final Bytes key,
final byte[] value) {
@@ -455,9 +466,17 @@ , P> KeyValueIterator doPrefixScan(final
@Override
public synchronized byte[] get(final Bytes key) {
+ return get(key, Optional.empty());
+ }
+
+ public synchronized byte[] get(final Bytes key, final ReadOptions readOptions) {
+ return get(key, Optional.of(readOptions));
+ }
+
+ private synchronized byte[] get(final Bytes key, final Optional<ReadOptions> readOptions) {
validateStoreOpen();
try {
- return dbAccessor.get(key.get());
+ return readOptions.isPresent() ? dbAccessor.get(key.get(), readOptions.get()) : dbAccessor.get(key.get());
} catch (final RocksDBException e) {
// String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
throw new ProcessorStateException("Error while getting value for key from store " + name, e);
@@ -704,6 +723,8 @@ void prepareBatch(final List> entries,
byte[] get(final byte[] key) throws RocksDBException;
+ byte[] get(final byte[] key, ReadOptions readOptions) throws RocksDBException;
+
/**
* In contrast to get(), we don't migrate the key to new CF.
*
@@ -777,6 +798,11 @@ public byte[] get(final byte[] key) throws RocksDBException {
return db.get(columnFamily, key);
}
+ @Override
+ public byte[] get(final byte[] key, final ReadOptions readOptions) throws RocksDBException {
+ return db.get(columnFamily, readOptions, key);
+ }
+
@Override
public byte[] getOnly(final byte[] key) throws RocksDBException {
return db.get(columnFamily, key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index 7ef8d1ba06c12..6f54c385f135a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import java.util.Optional;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
@@ -27,6 +28,7 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
@@ -139,12 +141,21 @@ public void prepareBatch(final List> entries,
@Override
public byte[] get(final byte[] key) throws RocksDBException {
- final byte[] valueWithTimestamp = db.get(newColumnFamily, key);
+ return get(key, Optional.empty());
+ }
+
+ @Override
+ public byte[] get(final byte[] key, final ReadOptions readOptions) throws RocksDBException {
+ return get(key, Optional.of(readOptions));
+ }
+
+ private byte[] get(final byte[] key, final Optional<ReadOptions> readOptions) throws RocksDBException {
+ final byte[] valueWithTimestamp = readOptions.isPresent() ? db.get(newColumnFamily, readOptions.get(), key) : db.get(newColumnFamily, key);
if (valueWithTimestamp != null) {
return valueWithTimestamp;
}
- final byte[] plainValue = db.get(oldColumnFamily, key);
+ final byte[] plainValue = readOptions.isPresent() ? db.get(oldColumnFamily, readOptions.get(), key) : db.get(oldColumnFamily, key);
if (plainValue != null) {
final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue);
// this does only work, because the changelog topic contains correct data already
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
index c67d9a2fb5e84..3379d3cebc974 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
@@ -21,9 +21,12 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
+
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
@@ -45,8 +48,10 @@
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -204,8 +209,8 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) {
if (latestTimestamp <= asOfTimestamp) {
// latest value satisfies timestamp bound
return new VersionedRecord<>(
- LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
- latestTimestamp
+ LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+ latestTimestamp
);
}
}
@@ -248,11 +253,11 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) {
// the desired result is contained in this segment
final SegmentSearchResult searchResult =
- RocksDBVersionedStoreSegmentValueFormatter
- .deserialize(rawSegmentValue)
- .find(asOfTimestamp, true);
+ RocksDBVersionedStoreSegmentValueFormatter
+ .deserialize(rawSegmentValue)
+ .find(asOfTimestamp, true);
if (searchResult.value() != null) {
- return new VersionedRecord<>(searchResult.value(), searchResult.validFrom());
+ return new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), searchResult.validTo());
} else {
return null;
}
@@ -263,6 +268,31 @@ public VersionedRecord get(final Bytes key, final long asOfTimestamp) {
return null;
}
+ @SuppressWarnings("unchecked")
+ VersionedRecordIterator<byte[]> get(final Bytes key, final long fromTimestamp, final long toTimestamp, final ResultOrder order) {
+ validateStoreOpen();
+
+ if (toTimestamp < observedStreamTime - historyRetention) {
+ // history retention exceeded. we still check the latest value store in case the
+ // latest record version satisfies the timestamp bound, in which case it should
+ // still be returned (i.e., the latest record version per key never expires).
+ return new LogicalSegmentIterator(Collections.singletonList(latestValueStore).listIterator(), key, fromTimestamp, toTimestamp, order);
+ } else {
+ final List<LogicalKeyValueSegment> segments = new ArrayList<>();
+ // add segment stores
+ // consider the search lower bound as -INF (LONG.MIN_VALUE) to find the record that has been inserted before the {@code fromTimestamp}
+ // but is still valid in query specified time interval.
+ if (order.equals(ResultOrder.ASCENDING)) {
+ segments.addAll(segmentStores.segments(Long.MIN_VALUE, toTimestamp, true));
+ segments.add(latestValueStore);
+ } else {
+ segments.add(latestValueStore);
+ segments.addAll(segmentStores.segments(Long.MIN_VALUE, toTimestamp, false));
+ }
+ return new LogicalSegmentIterator(segments.listIterator(), key, fromTimestamp, toTimestamp, order);
+ }
+ }
+
@Override
public String name() {
return name;
@@ -286,9 +316,9 @@ public void close() {
@Override
public QueryResult query(
- final Query query,
- final PositionBound positionBound,
- final QueryConfig config) {
+ final Query query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
return StoreQueryUtils.handleBasicQueries(
query,
positionBound,
@@ -324,9 +354,9 @@ public void init(final ProcessorContext context, final StateStore root) {
final String taskName = context.taskId().toString();
expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
- threadId,
- taskName,
- metrics
+ threadId,
+ taskName,
+ metrics
);
metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
@@ -339,17 +369,17 @@ public void init(final ProcessorContext context, final StateStore root) {
// register and possibly restore the state from the logs
stateStoreContext.register(
- root,
- (RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
- () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+ root,
+ (RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
+ () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
);
open = true;
consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
- context.appConfigs(),
- IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
- false
+ context.appConfigs(),
+ IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+ false
);
}
@@ -388,18 +418,18 @@ void restoreBatch(final Collection> records) {
observedStreamTime = Math.max(observedStreamTime, record.timestamp());
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
- record,
- consistencyEnabled,
- position
+ record,
+ consistencyEnabled,
+ position
);
// put records to write buffer
doPut(
- restoreClient,
- endOfBatchStreamTime,
- new Bytes(record.key()),
- record.value(),
- record.timestamp()
+ restoreClient,
+ endOfBatchStreamTime,
+ new Bytes(record.key()),
+ record.value(),
+ record.timestamp()
);
}
@@ -462,8 +492,8 @@ interface VersionedStoreClient {
/**
* @return all segments in the store which contain timestamps at least the provided
- * timestamp bound, in reverse order by segment id (and time), i.e., such that
- * the most recent segment is first
+ * timestamp bound, in reverse order by segment id (and time), i.e., such that
+ * the most recent segment is first
*/
List getReverseSegments(long timestampFrom);
@@ -548,11 +578,11 @@ public void writeLatestValues(final WriteBatch batch) throws RocksDBException {
* putting records into the store which will have expired by the end of the restore.
*/
private long doPut(
- final VersionedStoreClient versionedStoreClient,
- final long observedStreamTime,
- final Bytes key,
- final byte[] value,
- final long timestamp
+ final VersionedStoreClient versionedStoreClient,
+ final long observedStreamTime,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp
) {
segmentStores.cleanupExpiredSegments(observedStreamTime);
@@ -563,11 +593,11 @@ private long doPut(
// check latest value store
PutStatus status = maybePutToLatestValueStore(
- versionedStoreClient,
- observedStreamTime,
- key,
- value,
- timestamp
+ versionedStoreClient,
+ observedStreamTime,
+ key,
+ value,
+ timestamp
);
if (status.isComplete) {
return status.foundTs == SENTINEL_TIMESTAMP ? PUT_RETURN_CODE_VALID_TO_UNDEFINED : status.foundTs;
@@ -577,12 +607,12 @@ private long doPut(
// continue search in segments
status = maybePutToSegments(
- versionedStoreClient,
- observedStreamTime,
- key,
- value,
- timestamp,
- foundTs
+ versionedStoreClient,
+ observedStreamTime,
+ key,
+ value,
+ timestamp,
+ foundTs
);
if (status.isComplete) {
return status.foundTs == SENTINEL_TIMESTAMP ? PUT_RETURN_CODE_VALID_TO_UNDEFINED : status.foundTs;
@@ -593,12 +623,12 @@ private long doPut(
// the record did not unconditionally belong in any specific store (latest value store
// or segments store). insert based on foundTs here instead.
foundTs = finishPut(
- versionedStoreClient,
- observedStreamTime,
- key,
- value,
- timestamp,
- foundTs
+ versionedStoreClient,
+ observedStreamTime,
+ key,
+ value,
+ timestamp,
+ foundTs
);
return foundTs == SENTINEL_TIMESTAMP ? PUT_RETURN_CODE_VALID_TO_UNDEFINED : foundTs;
}
@@ -627,11 +657,11 @@ private static class PutStatus {
}
private PutStatus maybePutToLatestValueStore(
- final VersionedStoreClient versionedStoreClient,
- final long observedStreamTime,
- final Bytes key,
- final byte[] value,
- final long timestamp
+ final VersionedStoreClient versionedStoreClient,
+ final long observedStreamTime,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp
) {
// initialize with a starting "sentinel timestamp" which represents
// that the segment should be inserted into the latest value store.
@@ -665,10 +695,10 @@ private PutStatus maybePutToLatestValueStore(
final byte[] rawSegmentValue = segment.get(key);
if (rawSegmentValue == null) {
segment.put(
- key,
- RocksDBVersionedStoreSegmentValueFormatter
- .newSegmentValueWithRecord(rawValueToMove, latestValueStoreTimestamp, timestamp)
- .serialize()
+ key,
+ RocksDBVersionedStoreSegmentValueFormatter
+ .newSegmentValueWithRecord(rawValueToMove, latestValueStoreTimestamp, timestamp)
+ .serialize()
);
} else {
final SegmentValue segmentValue = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue);
@@ -693,12 +723,12 @@ private PutStatus maybePutToLatestValueStore(
}
private PutStatus maybePutToSegments(
- final VersionedStoreClient versionedStoreClient,
- final long observedStreamTime,
- final Bytes key,
- final byte[] value,
- final long timestamp,
- final long prevFoundTs
+ final VersionedStoreClient versionedStoreClient,
+ final long observedStreamTime,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp,
+ final long prevFoundTs
) {
// initialize with current foundTs value
long foundTs = prevFoundTs;
@@ -720,13 +750,13 @@ private PutStatus maybePutToSegments(
// the record being inserted belongs in this segment.
// insert and conclude the procedure.
foundTs = putToSegment(
- versionedStoreClient,
- observedStreamTime,
- segment,
- rawSegmentValue,
- key,
- value,
- timestamp
+ versionedStoreClient,
+ observedStreamTime,
+ segment,
+ rawSegmentValue,
+ key,
+ value,
+ timestamp
);
return new PutStatus(true, foundTs);
}
@@ -752,13 +782,13 @@ private PutStatus maybePutToSegments(
* @return updated {@code foundTs} value, i.e., the validTo timestamp of the record being put
*/
private long putToSegment(
- final VersionedStoreClient versionedStoreClient,
- final long observedStreamTime,
- final T segment,
- final byte[] rawSegmentValue,
- final Bytes key,
- final byte[] value,
- final long timestamp
+ final VersionedStoreClient versionedStoreClient,
+ final long observedStreamTime,
+ final T segment,
+ final byte[] rawSegmentValue,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp
) {
final long segmentIdForTimestamp = versionedStoreClient.segmentIdForTimestamp(timestamp);
// it's possible that putting the current record into a segment will require moving an
@@ -794,7 +824,7 @@ private long putToSegment(
// first means there will not be data loss. (rather, there will be
// duplicated data which is fine as it can/will be reconciled later.)
final T olderSegment = versionedStoreClient
- .getOrCreateSegmentIfLive(segmentIdForTimestamp, context, observedStreamTime);
+ .getOrCreateSegmentIfLive(segmentIdForTimestamp, context, observedStreamTime);
// `olderSegment == null` implies that all data in the older segment is older than the
// history retention of this store, and therefore does not need to tracked.
// as a result, we only need to move the existing record from the newer segment
@@ -808,14 +838,14 @@ private long putToSegment(
final byte[] rawOlderSegmentValue = olderSegment.get(key);
if (rawOlderSegmentValue == null) {
olderSegment.put(
- key,
- RocksDBVersionedStoreSegmentValueFormatter.newSegmentValueWithRecord(
- searchResult.value(), searchResult.validFrom(), timestamp
- ).serialize()
+ key,
+ RocksDBVersionedStoreSegmentValueFormatter.newSegmentValueWithRecord(
+ searchResult.value(), searchResult.validFrom(), timestamp
+ ).serialize()
);
} else {
final SegmentValue olderSegmentValue
- = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawOlderSegmentValue);
+ = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawOlderSegmentValue);
olderSegmentValue.insertAsLatest(searchResult.validFrom(), timestamp, searchResult.value());
olderSegment.put(key, olderSegmentValue.serialize());
}
@@ -837,12 +867,12 @@ private long putToSegment(
* @return updated {@code foundTs} value, i.e., the validTo timestamp of the record being put
*/
private long finishPut(
- final VersionedStoreClient versionedStoreClient,
- final long observedStreamTime,
- final Bytes key,
- final byte[] value,
- final long timestamp,
- final long foundTs
+ final VersionedStoreClient versionedStoreClient,
+ final long observedStreamTime,
+ final Bytes key,
+ final byte[] value,
+ final long timestamp,
+ final long foundTs
) {
if (foundTs == SENTINEL_TIMESTAMP) {
// insert into latest value store
@@ -852,7 +882,7 @@ private long finishPut(
// tombstones are not inserted into the latest value store. insert into segment instead.
// the specific segment to insert to is determined based on the tombstone's timestamp
final T segment = versionedStoreClient.getOrCreateSegmentIfLive(
- versionedStoreClient.segmentIdForTimestamp(timestamp), context, observedStreamTime);
+ versionedStoreClient.segmentIdForTimestamp(timestamp), context, observedStreamTime);
if (segment == null) {
// the record being inserted does not affect version history. discard and return.
// this can happen during restore because individual put calls are executed after
@@ -868,10 +898,10 @@ private long finishPut(
// record versions for this key, create a new "degenerate" segment with the
// tombstone's timestamp as both validFrom and validTo timestamps for the segment
segment.put(
- key,
- RocksDBVersionedStoreSegmentValueFormatter
- .newSegmentValueWithRecord(null, timestamp, timestamp)
- .serialize()
+ key,
+ RocksDBVersionedStoreSegmentValueFormatter
+ .newSegmentValueWithRecord(null, timestamp, timestamp)
+ .serialize()
);
} else {
// insert as latest, since foundTs = sentinel means nothing later exists
@@ -881,11 +911,11 @@ private long finishPut(
return foundTs;
}
final SegmentValue segmentValue
- = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue);
+ = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue);
segmentValue.insertAsLatest(
- RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue),
- timestamp,
- null
+ RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue),
+ timestamp,
+ null
);
segment.put(key, segmentValue.serialize());
}
@@ -899,7 +929,7 @@ private long finishPut(
// minTimestamp <= timestamp < nextTimestamp, and putSegments would've completed the
// put procedure without reaching this fall-through case.)
final T segment = versionedStoreClient.getOrCreateSegmentIfLive(
- versionedStoreClient.segmentIdForTimestamp(foundTs), context, observedStreamTime);
+ versionedStoreClient.segmentIdForTimestamp(foundTs), context, observedStreamTime);
if (segment == null) {
// the record being inserted does not affect version history. discard and return.
// this can happen during restore because individual put calls are executed after
@@ -911,10 +941,10 @@ private long finishPut(
final byte[] rawSegmentValue = segment.get(key);
if (rawSegmentValue == null) {
segment.put(
- key,
- RocksDBVersionedStoreSegmentValueFormatter
- .newSegmentValueWithRecord(value, timestamp, foundTs)
- .serialize()
+ key,
+ RocksDBVersionedStoreSegmentValueFormatter
+ .newSegmentValueWithRecord(value, timestamp, foundTs)
+ .serialize()
);
} else {
final long foundNextTs = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue);
@@ -941,13 +971,13 @@ private long finishPut(
* Bytes layout for the value portion of rows stored in the latest value store. The layout is
* a fixed-size timestamp concatenated with the actual record value.
*/
- private static final class LatestValueFormatter {
+ static final class LatestValueFormatter {
private static final int TIMESTAMP_SIZE = 8;
/**
* @return the timestamp, from the latest value store value bytes (representing value
- * and timestamp)
+ * and timestamp)
*/
static long getTimestamp(final byte[] rawLatestValueAndTimestamp) {
return ByteBuffer.wrap(rawLatestValueAndTimestamp).getLong();
@@ -955,7 +985,7 @@ static long getTimestamp(final byte[] rawLatestValueAndTimestamp) {
/**
* @return the actual record value, from the latest value store value bytes (representing
- * value and timestamp)
+ * value and timestamp)
*/
static byte[] getValue(final byte[] rawLatestValueAndTimestamp) {
final byte[] rawValue = new byte[rawLatestValueAndTimestamp.length - TIMESTAMP_SIZE];
@@ -965,7 +995,7 @@ static byte[] getValue(final byte[] rawLatestValueAndTimestamp) {
/**
* @return the formatted bytes containing the provided {@code rawValue} and
- * {@code timestamp}, ready to be stored into the latest value store
+ * {@code timestamp}, ready to be stored into the latest value store
*/
static byte[] from(final byte[] rawValue, final long timestamp) {
if (rawValue == null) {
@@ -973,9 +1003,9 @@ static byte[] from(final byte[] rawValue, final long timestamp) {
}
return ByteBuffer.allocate(TIMESTAMP_SIZE + rawValue.length)
- .putLong(timestamp)
- .put(rawValue)
- .array();
+ .putLong(timestamp)
+ .put(rawValue)
+ .array();
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
index c4f0eac010e06..fbd2ed74e7f5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
@@ -150,6 +150,8 @@ interface SegmentValue {
*/
SegmentSearchResult find(long timestamp, boolean includeValue);
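+
+ /**
+ * Finds all record versions in this segment row whose validity interval overlaps the provided
+ * time range, i.e., all records with {@code validFrom <= toTime} and {@code validTo > fromTime}.
+ *
+ * @param fromTime the lower time bound of the query
+ * @param toTime the upper time bound of the query
+ * @return the matching results, ordered from newest to oldest
+ */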
+ List<SegmentSearchResult> findAll(long fromTime, long toTime);
+
/**
* Inserts the provided record into the segment as the latest record in the segment row.
*
@@ -337,6 +339,35 @@ public SegmentSearchResult find(final long timestamp, final boolean includeValue
throw new IllegalStateException("Search in segment expected to find result but did not.");
}
+ @Override
+ public List<SegmentSearchResult> findAll(final long fromTime, final long toTime) {
+ long currNextTimestamp = nextTimestamp;
+ final List<SegmentSearchResult> segmentSearchResults = new ArrayList<>();
+ long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked
+ int currValueSize;
+ int currIndex = 0;
+ int cumValueSize = 0;
+ while (currTimestamp != minTimestamp) {
+ final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+ currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
+ currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+ cumValueSize += Math.max(currValueSize, 0);
+ if (currValueSize >= 0) {
+ final byte[] value = new byte[currValueSize];
+ final int valueSegmentIndex = segmentValue.length - cumValueSize;
+ System.arraycopy(segmentValue, valueSegmentIndex, value, 0, currValueSize);
+ if (currTimestamp <= toTime && currNextTimestamp > fromTime) {
+ segmentSearchResults.add(new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value));
+ }
+ }
+
+ // prep for next iteration
+ currNextTimestamp = currTimestamp;
+ currIndex++;
+ }
+ return segmentSearchResults;
+ }
+
@Override
public void insertAsLatest(final long validFrom, final long validTo, final byte[] valueOrNull) {
final ValueAndValueSize value = new ValueAndValueSize(valueOrNull);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 3278245f11bbe..5b51c5ae3857d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -27,6 +27,7 @@
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
@@ -42,6 +43,7 @@
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -97,6 +99,10 @@ QueryResult> apply(
mkEntry(
VersionedKeyQuery.class,
StoreQueryUtils::runVersionedKeyQuery
+ ),
+ mkEntry(
+ MultiVersionedKeyQuery.class,
+ StoreQueryUtils::runMultiVersionedKeyQuery
)
);
@@ -221,6 +227,7 @@ private static QueryResult runKeyQuery(final Query query,
final PositionBound positionBound,
final QueryConfig config,
final StateStore store) {
+
if (store instanceof KeyValueStore) {
final KeyQuery rawKeyQuery = (KeyQuery) query;
final KeyValueStore keyValueStore =
@@ -374,9 +381,33 @@ private static QueryResult runVersionedKeyQuery(final Query query,
}
}
+ @SuppressWarnings("unchecked")
+ private static <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config,
+ final StateStore store) {
+
+ if (store instanceof VersionedKeyValueStore) {
+ final RocksDBVersionedStore rocksDBVersionedStore = (RocksDBVersionedStore) store;
+ final MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery = (MultiVersionedKeyQuery<Bytes, byte[]>) query;
+ try {
+ final VersionedRecordIterator<byte[]> segmentIterator =
+ rocksDBVersionedStore.get(rawKeyQuery.key(),
+ rawKeyQuery.fromTime().get().toEpochMilli(),
+ rawKeyQuery.toTime().get().toEpochMilli(),
+ rawKeyQuery.resultOrder());
+ return (QueryResult<R>) QueryResult.forResult(segmentIterator);
+ } catch (final Exception e) {
+ final String message = parseStoreException(e, store, query);
+ return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
+ }
+ } else {
+ return QueryResult.forUnknownQueryType(query, store);
+ }
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
- public static Function getDeserializeValue(final StateSerdes, V> serdes,
- final StateStore wrapped) {
+ public static <V> Function<byte[], V> getDeserializeValue(final StateSerdes<?, V> serdes, final StateStore wrapped) {
final Serde valueSerde = serdes.valueSerde();
final boolean timestamped = WrappedStateStore.isTimestamped(wrapped);
final Deserializer deserializer;
@@ -390,16 +421,25 @@ public static Function getDeserializeValue(final StateSerdes, V
return byteArray -> deserializer.deserialize(serdes.topic(), byteArray);
}
- public static VersionedRecord deserializeVersionedRecord(final StateSerdes, V> serdes,
- final VersionedRecord rawVersionedRecord) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> getDeserializeValue(final StateSerdes<?, V> serdes) {
+ final Serde<V> valueSerde = serdes.valueSerde();
+ final Deserializer<V> deserializer = valueSerde.deserializer();
+ return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent() ? new VersionedRecord<>(deserializer.deserialize(serdes.topic(), rawVersionedRecord.value()),
+ rawVersionedRecord.timestamp(),
+ rawVersionedRecord.validTo().get())
+ : new VersionedRecord<>(deserializer.deserialize(serdes.topic(), rawVersionedRecord.value()),
+ rawVersionedRecord.timestamp());
+ }
+
+ public static <V> VersionedRecord<V> deserializeVersionedRecord(final StateSerdes<?, V> serdes, final VersionedRecord<byte[]> rawVersionedRecord) {
final Deserializer valueDeserializer = serdes.valueDeserializer();
- final long timestamp = rawVersionedRecord.timestamp();
final V value = valueDeserializer.deserialize(serdes.topic(), rawVersionedRecord.value());
- return new VersionedRecord<>(value, timestamp);
+ return rawVersionedRecord.validTo().isPresent() ? new VersionedRecord<>(value, rawVersionedRecord.timestamp(), rawVersionedRecord.validTo().get())
+ : new VersionedRecord<>(value, rawVersionedRecord.timestamp());
}
- public static void checkpointPosition(final OffsetCheckpoint checkpointFile,
- final Position position) {
+ public static void checkpointPosition(final OffsetCheckpoint checkpointFile, final Position position) {
try {
checkpointFile.write(positionToTopicPartitionMap(position));
} catch (final IOException e) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
index 339a62dd74b54..3c49b864185c5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2VersionedStoreIntegrationTest.java
@@ -19,16 +19,25 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import java.time.Duration;
import java.time.Instant;
+import java.time.temporal.ChronoField;
import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
@@ -38,13 +47,16 @@
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.query.VersionedKeyQuery;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
@@ -60,37 +72,45 @@ public class IQv2VersionedStoreIntegrationTest {
private static final String INPUT_TOPIC_NAME = "input-topic";
private static final String STORE_NAME = "versioned-store";
private static final Duration HISTORY_RETENTION = Duration.ofDays(1);
- private static final Instant BASE_TIMESTAMP = Instant.parse("2023-01-01T10:00:00.00Z");
- private static final Long RECORD_TIMESTAMP_OLD = BASE_TIMESTAMP.toEpochMilli();
- private static final Long RECORD_TIMESTAMP_NEW = RECORD_TIMESTAMP_OLD + 100;
+ private static final Duration SEGMENT_INTERVAL = Duration.ofHours(1);
+
private static final int RECORD_KEY = 2;
- private static final int RECORD_VALUE_OLD = 2;
- private static final int RECORD_VALUE_NEW = 3;
+ private static final int NON_EXISTING_KEY = 3;
+
+ private static final Instant BASE_TIMESTAMP = Instant.parse("2023-01-01T10:00:00.00Z");
+ private static final Long BASE_TIMESTAMP_LONG = BASE_TIMESTAMP.getLong(ChronoField.INSTANT_SECONDS);
+ private static final Integer[] RECORD_VALUES = {2, 20, 200, 2000};
+ private static final Long[] RECORD_TIMESTAMPS = {BASE_TIMESTAMP_LONG, BASE_TIMESTAMP_LONG + 10, BASE_TIMESTAMP_LONG + 20, BASE_TIMESTAMP_LONG + 30};
+ private static final int RECORD_NUMBER = RECORD_VALUES.length;
+ private static final int LAST_INDEX = RECORD_NUMBER - 1;
private static final Position INPUT_POSITION = Position.emptyPosition();
- public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS,
- Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));
+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));
+
private KafkaStreams kafkaStreams;
@BeforeClass
public static void before() throws Exception {
CLUSTER.start();
+
final Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
try (final KafkaProducer producer = new KafkaProducer<>(producerProps)) {
- producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_OLD, RECORD_KEY, RECORD_VALUE_OLD)).get();
- producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMP_NEW, RECORD_KEY, RECORD_VALUE_NEW)).get();
+ producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[0], RECORD_KEY, RECORD_VALUES[0])).get();
+ producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[1], RECORD_KEY, RECORD_VALUES[1])).get();
+ producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[2], RECORD_KEY, RECORD_VALUES[2])).get();
+ producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[3], RECORD_KEY, RECORD_VALUES[3])).get();
}
- INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 1);
+ INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 3);
}
@Before
public void beforeTest() {
final StreamsBuilder builder = new StreamsBuilder();
builder.table(INPUT_TOPIC_NAME,
- Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME, HISTORY_RETENTION)));
+ Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME, HISTORY_RETENTION, SEGMENT_INTERVAL)));
final Properties configs = new Properties();
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -114,34 +134,55 @@ public static void after() {
@Test
public void verifyStore() {
+ /* Test Versioned Key Queries */
// retrieve the latest value
- shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.empty(), RECORD_VALUE_NEW, RECORD_TIMESTAMP_NEW);
- shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.of(Instant.now()), RECORD_VALUE_NEW, RECORD_TIMESTAMP_NEW);
- shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMP_NEW)), RECORD_VALUE_NEW, RECORD_TIMESTAMP_NEW);
+ shouldHandleVersionedKeyQuery(Optional.empty(), RECORD_VALUES[3], RECORD_TIMESTAMPS[3], Optional.empty());
+ shouldHandleVersionedKeyQuery(Optional.of(Instant.now()), RECORD_VALUES[3], RECORD_TIMESTAMPS[3], Optional.empty());
+ shouldHandleVersionedKeyQuery(Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[3])), RECORD_VALUES[3], RECORD_TIMESTAMPS[3], Optional.empty());
// retrieve the old value
- shouldHandleVersionedKeyQuery(RECORD_KEY, Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMP_OLD)), RECORD_VALUE_OLD, RECORD_TIMESTAMP_OLD);
+ shouldHandleVersionedKeyQuery(Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[0])), RECORD_VALUES[0], RECORD_TIMESTAMPS[0], Optional.of(RECORD_TIMESTAMPS[1]));
// there is no record for the provided timestamp
- shouldVerifyGetNull(RECORD_KEY, Instant.ofEpochMilli(RECORD_TIMESTAMP_OLD - 50));
+ shouldVerifyGetNullForVersionedKeyQuery(RECORD_KEY, Instant.ofEpochMilli(RECORD_TIMESTAMPS[0] - 50));
+ // there is no record with this key
+ shouldVerifyGetNullForVersionedKeyQuery(NON_EXISTING_KEY, Instant.now());
+
+ /* Test Multi Versioned Key Queries */
+ // retrieve all existing values
+ shouldHandleMultiVersionedKeyQuery(Optional.empty(), Optional.empty(), ResultOrder.ANY, 0, LAST_INDEX);
+ // retrieve all existing values in ascending order
+ shouldHandleMultiVersionedKeyQuery(Optional.empty(), Optional.empty(), ResultOrder.ASCENDING, 0, LAST_INDEX);
+ // retrieve existing values within the query-defined time range
+ shouldHandleMultiVersionedKeyQuery(Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[1] + 5)), Optional.of(Instant.now()),
+ ResultOrder.ANY, 1, LAST_INDEX);
+ // there is no record in the query-specified time range
+ shouldVerifyGetNullForMultiVersionedKeyQuery(RECORD_KEY,
+ Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[0] - 100)), Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[0] - 50)),
+ ResultOrder.ANY);
+ // there is no record in the query-specified time range, even when retrieving results in ascending order
+ shouldVerifyGetNullForMultiVersionedKeyQuery(RECORD_KEY,
+ Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[0] - 100)), Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[0] - 50)),
+ ResultOrder.ASCENDING);
// there is no record with this key
- shouldVerifyGetNull(3, Instant.now());
+ shouldVerifyGetNullForMultiVersionedKeyQuery(NON_EXISTING_KEY, Optional.empty(), Optional.empty(), ResultOrder.ANY);
+ // there is no record with this key, even when retrieving results in ascending order
+ shouldVerifyGetNullForMultiVersionedKeyQuery(NON_EXISTING_KEY, Optional.empty(), Optional.empty(), ResultOrder.ASCENDING);
+ // test concurrent write while retrieving records
+ shouldHandleRaceCondition();
}
- private void shouldHandleVersionedKeyQuery(final Integer key,
- final Optional<Instant> queryTimestamp,
+ private void shouldHandleVersionedKeyQuery(final Optional<Instant> queryTimestamp,
final Integer expectedValue,
- final Long expectedTimestamp) {
+ final Long expectedTimestamp,
+ final Optional<Long> expectedValidToTime) {
- VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key);
- if (queryTimestamp.isPresent()) {
- query = query.asOf(queryTimestamp.get());
- }
- final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
- final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+ final VersionedKeyQuery<Integer, Integer> query = defineQuery(RECORD_KEY, queryTimestamp);
- if (result.getOnlyPartitionResult() == null) {
+ final QueryResult<VersionedRecord<Integer>> queryResult = sendRequestAndReceiveResults(query, kafkaStreams);
+
+ // verify results
+ if (queryResult == null) {
throw new AssertionError("The query returned null.");
}
- final QueryResult<VersionedRecord<Integer>> queryResult = result.getOnlyPartitionResult();
if (queryResult.isFailure()) {
throw new AssertionError(queryResult.toString());
}
@@ -153,14 +194,187 @@ private void shouldHandleVersionedKeyQuery(final Integer key,
final VersionedRecord<Integer> result1 = queryResult.getResult();
assertThat(result1.value(), is(expectedValue));
assertThat(result1.timestamp(), is(expectedTimestamp));
+ assertThat(result1.validTo(), is(expectedValidToTime));
assertThat(queryResult.getExecutionInfo(), is(empty()));
}
- private void shouldVerifyGetNull(final Integer key, final Instant queryTimestamp) {
+ private void shouldVerifyGetNullForVersionedKeyQuery(final Integer key, final Instant queryTimestamp) {
+ final VersionedKeyQuery<Integer, Integer> query = defineQuery(key, Optional.of(queryTimestamp));
+ assertThat(sendRequestAndReceiveResults(query, kafkaStreams), nullValue());
+ }
+
+ private void shouldHandleMultiVersionedKeyQuery(final Optional<Instant> fromTime, final Optional<Instant> toTime,
+ final ResultOrder order, final int expectedArrayLowerBound, final int expectedArrayUpperBound) {
+
+ final MultiVersionedKeyQuery<Integer, Integer> query = defineQuery(RECORD_KEY, fromTime, toTime, order);
+
+ final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = sendRequestAndReceiveResults(query, kafkaStreams);
+
+ // verify results
+ for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) {
+ verifyPartitionResult(partitionResultsEntry.getValue());
+ try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) {
+ int i = order.equals(ResultOrder.ASCENDING) ? 0 : expectedArrayUpperBound;
+ int iteratorSize = 0;
+ while (iterator.hasNext()) {
+ final VersionedRecord<Integer> record = iterator.next();
+ final Long timestamp = record.timestamp();
+ final Optional<Long> validTo = record.validTo();
+ final Integer value = record.value();
+
+ final Optional<Long> expectedValidTo = i < expectedArrayUpperBound ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();
+ assertThat(value, is(RECORD_VALUES[i]));
+ assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+ assertThat(validTo, is(expectedValidTo));
+ i = order.equals(ResultOrder.ASCENDING) ? i + 1 : i - 1;
+ iteratorSize++;
+ }
+ // the number of records returned by the query equals the expected number of records
+ assertThat(iteratorSize, equalTo(expectedArrayUpperBound - expectedArrayLowerBound + 1));
+ }
+ }
+ }
+
+ private void shouldVerifyGetNullForMultiVersionedKeyQuery(final Integer key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final ResultOrder order) {
+ final MultiVersionedKeyQuery<Integer, Integer> query = defineQuery(key, fromTime, toTime, order);
+
+ final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = sendRequestAndReceiveResults(query, kafkaStreams);
+
+ // verify results
+ for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) {
+ try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) {
+ assertFalse(iterator.hasNext());
+ }
+ }
+ }
+
+ /**
+ * This method updates the value of a record at an existing timestamp while records are being retrieved.
+ * Since IQv2 guarantees snapshot semantics, we expect the old value to be returned.
+ */
+ private void shouldHandleRaceCondition() {
+ final MultiVersionedKeyQuery<Integer, Integer> query = defineQuery(RECORD_KEY, Optional.empty(), Optional.empty(), ResultOrder.ANY);
+
+ final Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults = sendRequestAndReceiveResults(query, kafkaStreams);
+
+ // verify results in two steps
+ for (final Entry<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResultsEntry : partitionResults.entrySet()) {
+ try (final VersionedRecordIterator<Integer> iterator = partitionResultsEntry.getValue().getResult()) {
+ int i = LAST_INDEX;
+ int iteratorSize = 0;
+
+ // step 1:
+ while (iterator.hasNext()) {
+ final VersionedRecord<Integer> record = iterator.next();
+ final Long timestamp = record.timestamp();
+ final Optional<Long> validTo = record.validTo();
+ final Integer value = record.value();
+
+ final Optional<Long> expectedValidTo = i < LAST_INDEX ? Optional.of(RECORD_TIMESTAMPS[i + 1]) : Optional.empty();
+ assertThat(value, is(RECORD_VALUES[i]));
+ assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+ assertThat(validTo, is(expectedValidTo));
+ i--;
+ iteratorSize++;
+ if (i == 2) {
+ break;
+ }
+ }
+
+ // update the value of the oldest record
+ updateRecordValue();
+
+ // step 2: continue reading records through the already opened iterator
+ while (iterator.hasNext()) {
+ final VersionedRecord<Integer> record = iterator.next();
+ final Long timestamp = record.timestamp();
+ final Optional<Long> validTo = record.validTo();
+ final Integer value = record.value();
+
+ final Optional<Long> expectedValidTo = Optional.of(RECORD_TIMESTAMPS[i + 1]);
+ assertThat(value, is(RECORD_VALUES[i]));
+ assertThat(timestamp, is(RECORD_TIMESTAMPS[i]));
+ assertThat(validTo, is(expectedValidTo));
+ i--;
+ iteratorSize++;
+ }
+
+ // the number of records returned by the query equals the expected number of records
+ assertThat(iteratorSize, equalTo(RECORD_NUMBER));
+ }
+ }
+ }
+
+ private static VersionedKeyQuery<Integer, Integer> defineQuery(final Integer key, final Optional<Instant> queryTimestamp) {
VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key);
- query = query.asOf(queryTimestamp);
+ if (queryTimestamp.isPresent()) {
+ query = query.asOf(queryTimestamp.get());
+ }
+ return query;
+ }
+
+ private static MultiVersionedKeyQuery<Integer, Integer> defineQuery(final Integer key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final ResultOrder order) {
+ MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(key);
+ if (fromTime.isPresent()) {
+ query = query.fromTime(fromTime.get());
+ }
+ if (toTime.isPresent()) {
+ query = query.toTime(toTime.get());
+ }
+ if (order.equals(ResultOrder.ASCENDING)) {
+ query = query.withAscendingTimestamps();
+ }
+ return query;
+ }
+
+ private static Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> sendRequestAndReceiveResults(final MultiVersionedKeyQuery<Integer, Integer> query, final KafkaStreams kafkaStreams) {
+ final StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
+ final StateQueryResult<VersionedRecordIterator<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+ return result.getPartitionResults();
+ }
+
+ private static QueryResult<VersionedRecord<Integer>> sendRequestAndReceiveResults(final VersionedKeyQuery<Integer, Integer> query, final KafkaStreams kafkaStreams) {
final StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<VersionedRecord<Integer>> result = IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
- assertThat(result.getOnlyPartitionResult(), nullValue());
+ return result.getOnlyPartitionResult();
+ }
+
+ private static void verifyPartitionResult(final QueryResult<VersionedRecordIterator<Integer>> result) {
+ assertThat(result.getExecutionInfo(), is(empty()));
+ if (result.isFailure()) {
+ throw new AssertionError(result.toString());
+ }
+ assertThat(result.isSuccess(), is(true));
+ assertThrows(IllegalArgumentException.class, result::getFailureReason);
+ assertThrows(IllegalArgumentException.class, result::getFailureMessage);
+ }
+
+ /**
+ * This method inserts a new value (999999) for the key at the oldest timestamp (RECORD_TIMESTAMPS[0]).
+ */
+ private static void updateRecordValue() {
+ // update the record value at RECORD_TIMESTAMPS[0]
+ final Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ try (final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
+ producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[0], RECORD_KEY, 999999));
+ }
+ INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 4);
+ assertThat(INPUT_POSITION, equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4)));
+
+ // make sure that the new value is picked up by the store
+ final Properties consumerProps = new Properties();
+ consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+ consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+ consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "foo");
+ consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ try {
+ IntegrationTestUtils.waitUntilMinRecordsReceived(consumerProps, INPUT_TOPIC_NAME, RECORD_NUMBER + 1);
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
\ No newline at end of file
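Not part of the patch: since the integration test above drives the new query type only through private helpers, here is a minimal, self-contained sketch of how an application could issue a MultiVersionedKeyQuery through IQv2 against a versioned store. The store name, key, and time bounds are illustrative assumptions, and failure handling is omitted.

import java.time.Instant;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator;

public class MultiVersionedKeyQueryExample {
    // Prints every version of the given key that existed in the time range, oldest first.
    static void printVersions(final KafkaStreams streams, final String storeName, final Integer key) {
        final MultiVersionedKeyQuery<Integer, Integer> query =
            MultiVersionedKeyQuery.<Integer, Integer>withKey(key)
                .fromTime(Instant.parse("2023-01-01T10:00:00.00Z"))
                .toTime(Instant.now())
                .withAscendingTimestamps();

        final StateQueryRequest<VersionedRecordIterator<Integer>> request =
            StateQueryRequest.inStore(storeName).withQuery(query);
        final StateQueryResult<VersionedRecordIterator<Integer>> result = streams.query(request);

        for (final QueryResult<VersionedRecordIterator<Integer>> partitionResult : result.getPartitionResults().values()) {
            try (final VersionedRecordIterator<Integer> iterator = partitionResult.getResult()) {
                while (iterator.hasNext()) {
                    final VersionedRecord<Integer> record = iterator.next();
                    // validTo() is empty for the version that is still the latest one
                    System.out.println(record.value() + " validFrom=" + record.timestamp() + " validTo=" + record.validTo());
                }
            }
        }
    }
}

The same request could also carry a PositionBound, as the test helpers above do, to make sure the store has caught up to a known input position before the result is used.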
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/MultiVersionedKeyQueryTest.java b/streams/src/test/java/org/apache/kafka/streams/query/MultiVersionedKeyQueryTest.java
new file mode 100644
index 0000000000000..32db99b00ecc6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/query/MultiVersionedKeyQueryTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.query;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.junit.Test;
+
+public class MultiVersionedKeyQueryTest {
+
+ @Test
+ public void shouldThrowNPEWithNullKey() {
+ final Exception exception = assertThrows(NullPointerException.class, () -> MultiVersionedKeyQuery.withKey(null));
+ assertEquals("key cannot be null.", exception.getMessage());
+ }
+
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 6502f1bc8edec..b9d711abab531 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -24,6 +24,7 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
@@ -31,6 +32,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -53,6 +55,7 @@
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.KeyQuery;
+import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
@@ -296,6 +299,18 @@ public void shouldThrowOnIQv2KeyQuery() {
assertThrows(UnsupportedOperationException.class, () -> store.query(mock(KeyQuery.class), null, null));
}
+ @Test
+ public void shouldThrowOnMultiVersionedKeyQueryInvalidTimeRange() {
+ MultiVersionedKeyQuery<String, String> query = MultiVersionedKeyQuery.withKey("key");
+ final Instant fromTime = Instant.now();
+ final Instant toTime = Instant.ofEpochMilli(fromTime.toEpochMilli() - 100);
+ query = query.fromTime(fromTime).toTime(toTime);
+ final MultiVersionedKeyQuery<String, String> finalQuery = query;
+ final Exception exception = assertThrows(IllegalArgumentException.class, () -> store.query(finalQuery, null, null));
+ assertEquals("The `fromTime` timestamp must be smaller than the `toTime` timestamp.", exception.getMessage());
+ }
+
+
@SuppressWarnings("unchecked")
@Test
public void shouldDelegateAndAddExecutionInfoOnCustomQuery() {
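Not part of the patch: the new negative test above expects the store to reject a query whose fromTime lies after its toTime. As a hedged sketch of the guard being exercised (the actual check lives in the store's query-handling code and may differ in detail), the validation amounts to roughly the following; note that fromTime equal to toTime remains legal, which the integration test covers with fromTimestamp = toTimestamp.

import java.time.Instant;

final class TimeRangeGuardSketch {
    // Illustrative only: rejects an inverted time range with the message the test asserts.
    static void validate(final Instant fromTime, final Instant toTime) {
        // null bounds mean "no bound" and are always acceptable
        if (fromTime != null && toTime != null && fromTime.isAfter(toTime)) {
            throw new IllegalArgumentException(
                "The `fromTime` timestamp must be smaller than the `toTime` timestamp.");
        }
    }
}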
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
index f4c596871ab53..c6cc12c93a753 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
@@ -215,6 +215,36 @@ public void shouldFindByTimestamp() {
assertThrows(IllegalArgumentException.class, () -> segmentValue.find(testCase.minTimestamp - 1, false));
}
+ @Test
+ public void shouldFindAll() {
+ if (testCase.isDegenerate) {
+ // cannot find() on degenerate segment
+ return;
+ }
+
+ final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+
+ // verify results
+ final List<SegmentSearchResult> results =
+ segmentValue.findAll(testCase.records.get(testCase.records.size() - 1).timestamp, testCase.records.get(0).timestamp);
+
+ int i = 0;
+ for (final TestRecord expectedRecord : testCase.records) {
+ final long expectedValidTo = i == 0 ? testCase.nextTimestamp : testCase.records.get(i - 1).timestamp;
+ assertThat(results.get(i).index(), equalTo(i));
+ assertThat(results.get(i).value(), equalTo(expectedRecord.value));
+ assertThat(results.get(i).validFrom(), equalTo(expectedRecord.timestamp));
+ assertThat(results.get(i).validTo(), equalTo(expectedValidTo));
+ i++;
+ }
+
+ // verify exception when timestamp is out of range
+ assertThrows(IllegalArgumentException.class, () -> segmentValue.find(testCase.nextTimestamp, false));
+ assertThrows(IllegalArgumentException.class, () -> segmentValue.find(testCase.nextTimestamp + 1, false));
+ assertThrows(IllegalArgumentException.class, () -> segmentValue.find(testCase.minTimestamp - 1, false));
+ }
+
@Test
public void shouldGetTimestamps() {
final byte[] segmentValue = buildSegmentWithInsertLatest(testCase).serialize();
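Not part of the patch: most of the RocksDBVersionedStoreTest changes below only extend existing assertions with the new validTo dimension. As a quick orientation, here is a tiny illustration (made-up values) of the validFrom/validTo semantics those assertions rely on.

import org.apache.kafka.streams.state.VersionedRecord;

final class ValidToIllustration {
    public static void main(final String[] args) {
        // After put("k", "v1", 10) and put("k", "v2", 20) the store holds two versions of "k":
        // "v1" is valid for [10, 20) and "v2" is valid from 20 with no upper bound yet.
        final VersionedRecord<String> olderVersion = new VersionedRecord<>("v1", 10L, 20L);
        final VersionedRecord<String> latestVersion = new VersionedRecord<>("v2", 20L);
        System.out.println(olderVersion.validTo());   // prints Optional[20]
        System.out.println(latestVersion.validTo());  // prints Optional.empty
    }
}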
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
index 071312a2fdc83..67d047dfcf1ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java
@@ -26,6 +26,8 @@
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -42,6 +44,8 @@
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.ResultOrder;
+import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
@@ -98,9 +102,9 @@ public void shouldPutLatest() {
putToStore("k", "v2", BASE_TIMESTAMP + 1, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 1);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v2", BASE_TIMESTAMP + 1);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 1);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP, BASE_TIMESTAMP + 1);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v2", BASE_TIMESTAMP + 1, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 1, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
}
@Test
@@ -122,10 +126,10 @@ public void shouldPutOlderWithNonNullLatest() {
putToStore("k", "v4", BASE_TIMESTAMP - 4, BASE_TIMESTAMP - 2);
verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 1, "v1", BASE_TIMESTAMP - 1);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 2, "v2", BASE_TIMESTAMP - 2);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 3, "v4", BASE_TIMESTAMP - 4);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 1, "v1", BASE_TIMESTAMP - 1, BASE_TIMESTAMP);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 2, "v2", BASE_TIMESTAMP - 2, BASE_TIMESTAMP - 1);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 3, "v4", BASE_TIMESTAMP - 4, BASE_TIMESTAMP - 2);
}
@Test
@@ -137,9 +141,9 @@ public void shouldPutOlderWithNullLatest() {
verifyGetNullFromStore("k");
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 1, "v1", BASE_TIMESTAMP - 1);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 2, "v2", BASE_TIMESTAMP - 2);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 3, "v4", BASE_TIMESTAMP - 4);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 1, "v1", BASE_TIMESTAMP - 1, BASE_TIMESTAMP);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 2, "v2", BASE_TIMESTAMP - 2, BASE_TIMESTAMP - 1);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 3, "v4", BASE_TIMESTAMP - 4, BASE_TIMESTAMP - 2);
}
@Test
@@ -153,12 +157,12 @@ public void shouldPutOlderNullWithNonNullLatest() {
putToStore("k", null, BASE_TIMESTAMP - 6, BASE_TIMESTAMP - 5);
verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP - 1);
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP - 2);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 3, "v3", BASE_TIMESTAMP - 3);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 3, "v3", BASE_TIMESTAMP - 3, BASE_TIMESTAMP - 2);
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP - 4);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 5, "v5", BASE_TIMESTAMP - 5);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 5, "v5", BASE_TIMESTAMP - 5, BASE_TIMESTAMP - 4);
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP - 6);
}
@@ -176,9 +180,9 @@ public void shouldPutOlderNullWithNullLatest() {
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP);
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP - 1);
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP - 2);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 3, "v3", BASE_TIMESTAMP - 3);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 3, "v3", BASE_TIMESTAMP - 3, BASE_TIMESTAMP - 2);
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP - 4);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 5, "v5", BASE_TIMESTAMP - 5);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP - 5, "v5", BASE_TIMESTAMP - 5, BASE_TIMESTAMP - 4);
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP - 6);
}
@@ -188,7 +192,7 @@ public void shouldPutRepeatTimestampAsLatest() {
putToStore("k", "b", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
verifyGetValueFromStore("k", "b", BASE_TIMESTAMP);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "b", BASE_TIMESTAMP);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "b", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP - 1);
putToStore("k", null, BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
@@ -206,7 +210,7 @@ public void shouldPutRepeatTimestampAsLatest() {
putToStore("k", "b", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
verifyGetValueFromStore("k", "b", BASE_TIMESTAMP);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "b", BASE_TIMESTAMP);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "b", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP - 1);
}
@@ -232,12 +236,12 @@ public void shouldPutRepeatTimestamps() {
verifyGetNullFromStore("k");
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5, SEGMENT_INTERVAL + 10);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6, SEGMENT_INTERVAL - 5);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
}
@@ -252,12 +256,12 @@ public void shouldPutIntoMultipleSegments() {
verifyGetNullFromStore("k");
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10, SEGMENT_INTERVAL + 20);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 5);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 1);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
}
@@ -271,13 +275,13 @@ public void shouldMoveRecordToOlderSegmentDuringPut() {
putToStore("k", "vp1", SEGMENT_INTERVAL + 1, SEGMENT_INTERVAL + 10);
verifyGetValueFromStore("k", "vp20", SEGMENT_INTERVAL + 20);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20", SEGMENT_INTERVAL + 20);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 5, "vp1", SEGMENT_INTERVAL + 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2", SEGMENT_INTERVAL - 2);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20", SEGMENT_INTERVAL + 20, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10, SEGMENT_INTERVAL + 20);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 5, "vp1", SEGMENT_INTERVAL + 1, SEGMENT_INTERVAL + 10);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1", SEGMENT_INTERVAL - 1, SEGMENT_INTERVAL + 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1", SEGMENT_INTERVAL - 1, SEGMENT_INTERVAL + 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2", SEGMENT_INTERVAL - 2, SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 2);
}
@Test
@@ -294,12 +298,12 @@ public void shouldMoveRecordToOlderSegmentWithNullsDuringPut() {
verifyGetNullFromStore("k");
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5, SEGMENT_INTERVAL + 10);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6, SEGMENT_INTERVAL - 5);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
}
@@ -313,13 +317,13 @@ public void shouldFallThroughToExistingOlderSegmentAsLatestDuringPut() {
putToStore("k", "vn2", SEGMENT_INTERVAL - 2, SEGMENT_INTERVAL - 1);
verifyGetValueFromStore("k", "vp20", SEGMENT_INTERVAL + 20);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20", SEGMENT_INTERVAL + 20);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20", SEGMENT_INTERVAL + 20, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 12);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2", SEGMENT_INTERVAL - 2);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2", SEGMENT_INTERVAL - 2, SEGMENT_INTERVAL - 1);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 5);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6, SEGMENT_INTERVAL - 5);
}
@Test
@@ -331,10 +335,10 @@ public void shouldPutNonLatestTombstoneIntoNewSegmentWithValidTo() {
putToStore("k", null, SEGMENT_INTERVAL - 2, SEGMENT_INTERVAL - 1);
verifyGetValueFromStore("k", "vp30", SEGMENT_INTERVAL + 30);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 10, "vn1", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1", SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 10, "vn1", SEGMENT_INTERVAL - 1, SEGMENT_INTERVAL + 30);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1", SEGMENT_INTERVAL - 1, SEGMENT_INTERVAL + 30);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 2);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 2);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 10);
}
@@ -411,7 +415,7 @@ public void shouldGetFromOlderSegments() {
putToStore("k", null, SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 30);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 15, "v", SEGMENT_INTERVAL - 20);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 15, "v", SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 10);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 5);
// insert data to create non-empty (third) segment
@@ -420,7 +424,7 @@ public void shouldGetFromOlderSegments() {
// presence of non-empty later segment does not affect results of getting from earlier segment
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 30);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 15, "v", SEGMENT_INTERVAL - 20);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 15, "v", SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 10);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 5);
}
@@ -430,12 +434,12 @@ public void shouldNotGetExpired() {
putToStore("k", "v", SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
// old record has not yet expired
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 11, "v_old", 0);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 11, "v_old", 0, SEGMENT_INTERVAL - 10);
putToStore("ko", "vo", HISTORY_RETENTION + SEGMENT_INTERVAL - 11, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
// old record still has not yet expired
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 11, "v_old", 0);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 11, "v_old", 0, SEGMENT_INTERVAL - 10);
putToStore("ko", "vo2", HISTORY_RETENTION + SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
@@ -450,7 +454,7 @@ public void shouldGetExpiredIfLatestValue() {
putToStore("ko", "vo_new", HISTORY_RETENTION + 12, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
// expired get on key where latest satisfies timestamp bound still returns data
- verifyTimestampedGetValueFromStore("k", 10, "v", 1);
+ verifyTimestampedGetValueFromStore("k", 10, "v", 1, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
// same expired get on key where latest value does not satisfy timestamp bound does not return data
verifyTimestampedGetNullFromStore("ko", 10);
@@ -471,15 +475,189 @@ public void shouldDistinguishEmptyAndNull() {
verifyGetNullFromStore("k");
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "", SEGMENT_INTERVAL + 5);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "", SEGMENT_INTERVAL + 5, SEGMENT_INTERVAL + 10);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "", SEGMENT_INTERVAL - 5);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "", SEGMENT_INTERVAL - 6);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "", SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "", SEGMENT_INTERVAL - 6, SEGMENT_INTERVAL - 5);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
}
+ @Test
+ public void shouldGetRecordVersionsFromOlderSegments() {
+ // use a different key to create three different segments
+ putToStore("ko", null, SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("ko", null, 2 * SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("ko", null, 3 * SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+ // return null after visiting all segments (the key does not exist.)
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL);
+
+ // insert data to create non-empty (first) segment
+ putToStore("k", "v1", SEGMENT_INTERVAL - 30, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", "v2", SEGMENT_INTERVAL - 25, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", null, SEGMENT_INTERVAL - 20, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", null, SEGMENT_INTERVAL - 15, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", "v3", SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", "v4", SEGMENT_INTERVAL - 5, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+
+ // return null for the query with a time range prior to inserting values
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 40, SEGMENT_INTERVAL - 35);
+
+ // return values for the query with a time range in which values are still valid and there are multiple tombstones
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 30, SEGMENT_INTERVAL - 5, ResultOrder.ANY,
+ Arrays.asList("v4", "v3", "v2", "v1"),
+ Arrays.asList(SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30),
+ Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25));
+
+ // return values for the query with time range (MIN, MAX)
+ verifyTimestampedGetValueFromStore("k", Long.MIN_VALUE, Long.MAX_VALUE, ResultOrder.ANY,
+ Arrays.asList("v4", "v3", "v2", "v1"),
+ Arrays.asList(SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30),
+ Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25));
+
+ // return the latest record (retrieve only from the latestValueStore)
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 4, SEGMENT_INTERVAL, ResultOrder.ANY,
+ Collections.singletonList("v4"),
+ Collections.singletonList(SEGMENT_INTERVAL - 5),
+ Collections.singletonList(PUT_RETURN_CODE_VALID_TO_UNDEFINED));
+
+ // return one value for the query with fromTimestamp = toTimestamp
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 5, ResultOrder.ANY,
+ Collections.singletonList("v4"),
+ Collections.singletonList(SEGMENT_INTERVAL - 5),
+ Collections.singletonList(PUT_RETURN_CODE_VALID_TO_UNDEFINED));
+
+ // return one value for the query with fromTimestamp = toTimestamp falling after the latest record's timestamp
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 4, SEGMENT_INTERVAL - 4, ResultOrder.ANY,
+ Collections.singletonList("v4"),
+ Collections.singletonList(SEGMENT_INTERVAL - 5),
+ Collections.singletonList(PUT_RETURN_CODE_VALID_TO_UNDEFINED));
+
+ // return values for a time range before the insertion of any tombstone
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 31, SEGMENT_INTERVAL - 21, ResultOrder.ANY,
+ Arrays.asList("v2", "v1"),
+ Arrays.asList(SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30),
+ Arrays.asList(SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25));
+
+ // return values for the query with time range that covers both tombstones
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 24, SEGMENT_INTERVAL - 11, ResultOrder.ANY,
+ Collections.singletonList("v2"),
+ Collections.singletonList(SEGMENT_INTERVAL - 25),
+ Collections.singletonList(SEGMENT_INTERVAL - 20));
+
+ // return values for the query with a time range that lies after the insertion of the tombstones
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 11, SEGMENT_INTERVAL - 4, ResultOrder.ANY,
+ Arrays.asList("v4", "v3"),
+ Arrays.asList(SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 10),
+ Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5));
+
+ // return all the records that are valid during the query time range but were inserted beforehand
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 26, SEGMENT_INTERVAL - 5, ResultOrder.ANY,
+ Arrays.asList("v4", "v3", "v2", "v1"),
+ Arrays.asList(SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 30),
+ Arrays.asList(PUT_RETURN_CODE_VALID_TO_UNDEFINED, SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 20, SEGMENT_INTERVAL - 25));
+
+ // return the valid record that was inserted at the end of the query time range (validFrom = query upper time bound)
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 15, SEGMENT_INTERVAL - 10, ResultOrder.ANY,
+ Collections.singletonList("v3"),
+ Collections.singletonList(SEGMENT_INTERVAL - 10),
+ Collections.singletonList(SEGMENT_INTERVAL - 5));
+
+ // return null in the time range where no value is valid
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 19, SEGMENT_INTERVAL - 16);
+
+
+
+ // insert data to create non-empty (third) segment
+ putToStore("k", "v5", 3 * SEGMENT_INTERVAL - 30, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", null, 3 * SEGMENT_INTERVAL - 20, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+ // presence of a non-empty later segment does not affect results of getting from the earlier segment
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 40, SEGMENT_INTERVAL - 35);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 30, SEGMENT_INTERVAL - 26, ResultOrder.ANY,
+ Collections.singletonList("v1"),
+ Collections.singletonList(SEGMENT_INTERVAL - 30),
+ Collections.singletonList(SEGMENT_INTERVAL - 25));
+ verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 19, SEGMENT_INTERVAL - 16);
+ }
+
+ @Test
+ public void shouldGetRecordVersionsInAscendingOrder() {
+
+ // insert data to create non-empty (first) segment
+ putToStore("k", "v1", SEGMENT_INTERVAL - 30, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", "v2", SEGMENT_INTERVAL - 25, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", "v3", SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", "v4", SEGMENT_INTERVAL - 5, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+ // return values in ascending order
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 30, SEGMENT_INTERVAL - 5, ResultOrder.ASCENDING,
+ Arrays.asList("v1", "v2", "v3", "v4"),
+ Arrays.asList(SEGMENT_INTERVAL - 30, SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 5),
+ Arrays.asList(SEGMENT_INTERVAL - 25, SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 5, PUT_RETURN_CODE_VALID_TO_UNDEFINED));
+ }
+
+ @Test
+ public void shouldGetRecordVersionsFromMultipleOldSegmentsInAscendingOrder() {
+
+ // insert data to create two non-empty segments
+ putToStore("k", "v1", SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", "v2", SEGMENT_INTERVAL - 5, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+ putToStore("k", "v3", 2 * SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", "v4", 2 * SEGMENT_INTERVAL - 5, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+ // return values from two old segments + latestValueStore in ascending order
+ verifyTimestampedGetValueFromStore("k", Long.MIN_VALUE, Long.MAX_VALUE, ResultOrder.ASCENDING,
+ Arrays.asList("v1", "v2", "v3", "v4"),
+ Arrays.asList(SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 5, 2 * SEGMENT_INTERVAL - 10, 2 * SEGMENT_INTERVAL - 5),
+ Arrays.asList(SEGMENT_INTERVAL - 5, 2 * SEGMENT_INTERVAL - 10, 2 * SEGMENT_INTERVAL - 5, PUT_RETURN_CODE_VALID_TO_UNDEFINED));
+ }
+
+ @Test
+ public void shouldNotGetExpiredRecordVersions() {
+ putToStore("k", "v_old", 0, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("k", "v", SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+ // old record has not yet expired
+ verifyTimestampedGetValueFromStore("k", 0, SEGMENT_INTERVAL - 11, ResultOrder.ANY,
+ Collections.singletonList("v_old"),
+ Collections.singletonList(0L),
+ Collections.singletonList(SEGMENT_INTERVAL - 10));
+
+ putToStore("ko", "vo", HISTORY_RETENTION + SEGMENT_INTERVAL - 11, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+ // old record still has not yet expired
+ verifyTimestampedGetValueFromStore("k", 0, SEGMENT_INTERVAL - 11, ResultOrder.ANY,
+ Collections.singletonList("v_old"),
+ Collections.singletonList(0L),
+ Collections.singletonList(SEGMENT_INTERVAL - 10));
+
+
+ putToStore("ko", "vo2", HISTORY_RETENTION + SEGMENT_INTERVAL - 10, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+ // old record is expired now
+ verifyTimestampedGetNullFromStore("k", 0, SEGMENT_INTERVAL - 11);
+ }
+
+ @Test
+ public void shouldGetExpiredIfLatestVersionValue() {
+ putToStore("k", "v", 1, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("ko", "vo_old", 1, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ putToStore("ko", "vo_new", HISTORY_RETENTION + 12, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+
+ // expired get on key where latest satisfies timestamp bound still returns data
+ verifyTimestampedGetValueFromStore("k", 0, 10, ResultOrder.ANY,
+ Collections.singletonList("v"),
+ Collections.singletonList(1L),
+ Collections.singletonList(PUT_RETURN_CODE_VALID_TO_UNDEFINED));
+ // same expired get on key where latest value does not satisfy timestamp bound does not return data
+ verifyTimestampedGetNullFromStore("ko", 0, 10);
+ }
+
@Test
public void shouldRestore() {
final List records = new ArrayList<>();
@@ -493,13 +671,13 @@ public void shouldRestore() {
store.restoreBatch(getChangelogRecords(records));
verifyGetValueFromStore("k", "vp20", SEGMENT_INTERVAL + 20);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20", SEGMENT_INTERVAL + 20);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 5, "vp1", SEGMENT_INTERVAL + 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2", SEGMENT_INTERVAL - 2);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20", SEGMENT_INTERVAL + 20, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10, SEGMENT_INTERVAL + 20);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 5, "vp1", SEGMENT_INTERVAL + 1, SEGMENT_INTERVAL + 10);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1", SEGMENT_INTERVAL - 1, SEGMENT_INTERVAL + 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1", SEGMENT_INTERVAL - 1, SEGMENT_INTERVAL + 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2", SEGMENT_INTERVAL - 2, SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 2);
}
@Test
@@ -519,12 +697,12 @@ public void shouldRestoreWithNulls() {
verifyGetNullFromStore("k");
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5, SEGMENT_INTERVAL + 10);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6, SEGMENT_INTERVAL - 5);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
}
@@ -553,12 +731,12 @@ public void shouldRestoreWithNullsAndRepeatTimestamps() {
verifyGetNullFromStore("k");
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5, SEGMENT_INTERVAL + 10);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5, SEGMENT_INTERVAL - 1);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6, SEGMENT_INTERVAL - 5);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
}
@@ -579,12 +757,12 @@ public void shouldRestoreMultipleBatches() {
verifyGetNullFromStore("k");
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10, SEGMENT_INTERVAL + 20);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 5);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
- verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10);
+ verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10, SEGMENT_INTERVAL - 1);
verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
}
@@ -626,18 +804,18 @@ public void shouldAllowZeroHistoryRetention() {
// put and get
putToStore("k", "v", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED); // query in "future" is allowed
// update existing record at same timestamp
putToStore("k", "updated", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
// put new record version
putToStore("k", "v2", BASE_TIMESTAMP + 2, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
- verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2);
+ verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2, PUT_RETURN_CODE_VALID_TO_UNDEFINED);
// query in past (history retention expired) returns null
verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
@@ -679,10 +857,20 @@ private VersionedRecord getFromStore(final String key, final long asOfTi
return deserializedRecord(versionedRecord);
}
+ private List<VersionedRecord<String>> getFromStore(final String key, final long fromTime, final long toTime, final ResultOrder order) {
+ final VersionedRecordIterator<byte[]> resultRecords = store.get(new Bytes(STRING_SERIALIZER.serialize(null, key)), fromTime, toTime, order);
+ final List<VersionedRecord<String>> versionedRecordsList = new ArrayList<>();
+ while (resultRecords.hasNext()) {
+ versionedRecordsList.add(deserializedRecord(resultRecords.next()));
+ }
+ return versionedRecordsList;
+ }
+
private void verifyGetValueFromStore(final String key, final String expectedValue, final long expectedTimestamp) {
final VersionedRecord<String> latest = getFromStore(key);
assertThat(latest.value(), equalTo(expectedValue));
assertThat(latest.timestamp(), equalTo(expectedTimestamp));
+ assertThat(latest.validTo().isPresent(), equalTo(false));
}
private void verifyGetNullFromStore(final String key) {
@@ -690,10 +878,36 @@ private void verifyGetNullFromStore(final String key) {
assertThat(record, nullValue());
}
- private void verifyTimestampedGetValueFromStore(final String key, final long timestamp, final String expectedValue, final long expectedTimestamp) {
+ private void verifyTimestampedGetValueFromStore(final String key, final long timestamp, final String expectedValue, final long expectedTimestamp, final long expectedValidTo) {
final VersionedRecord<String> latest = getFromStore(key, timestamp);
assertThat(latest.value(), equalTo(expectedValue));
assertThat(latest.timestamp(), equalTo(expectedTimestamp));
+ if (expectedValidTo == PUT_RETURN_CODE_VALID_TO_UNDEFINED) {
+ assertThat(latest.validTo().isPresent(), equalTo(false));
+ } else {
+ assertThat(latest.validTo().get(), equalTo(expectedValidTo));
+ }
+ }
+
+ private void verifyTimestampedGetValueFromStore(final String key,
+ final long fromTime,
+ final long toTime,
+ final ResultOrder order,
+ final List<String> expectedValues,
+ final List<Long> expectedTimestamps,
+ final List<Long> expectedValidTos) {
+ final List<VersionedRecord<String>> results = getFromStore(key, fromTime, toTime, order);
+ assertThat(results.size(), equalTo(expectedValues.size()));
+ for (int i = 0; i < results.size(); i++) {
+ final VersionedRecord<String> record = results.get(i);
+ assertThat(record.value(), equalTo(expectedValues.get(i)));
+ assertThat(record.timestamp(), equalTo(expectedTimestamps.get(i)));
+ if (expectedValidTos.get(i) == PUT_RETURN_CODE_VALID_TO_UNDEFINED) {
+ assertThat(record.validTo().isPresent(), equalTo(false));
+ } else {
+ assertThat(record.validTo().get(), equalTo(expectedValidTos.get(i)));
+ }
+ }
}
private void verifyTimestampedGetNullFromStore(final String key, final long timestamp) {
@@ -701,6 +915,11 @@ private void verifyTimestampedGetNullFromStore(final String key, final long time
assertThat(record, nullValue());
}
+ private void verifyTimestampedGetNullFromStore(final String key, final long fromTime, final long toTime) {
+ final List<VersionedRecord<String>> results = getFromStore(key, fromTime, toTime, ResultOrder.ANY);
+ assertThat(results.size(), equalTo(0));
+ }
+
private void verifyExpiredRecordSensor(final int expectedValue) {
final Metric metric = context.metrics().metrics().get(
new MetricName(DROPPED_RECORDS_METRIC, TASK_LEVEL_GROUP, "", expectedMetricsTags)
@@ -711,9 +930,12 @@ private void verifyExpiredRecordSensor(final int expectedValue) {
private static VersionedRecord<String> deserializedRecord(final VersionedRecord<byte[]> versionedRecord) {
return versionedRecord == null
? null
- : new VersionedRecord<>(
- STRING_DESERIALIZER.deserialize(null, versionedRecord.value()),
- versionedRecord.timestamp());
+ : versionedRecord.validTo().isPresent()
+ ? new VersionedRecord<>(STRING_DESERIALIZER.deserialize(null, versionedRecord.value()),
+ versionedRecord.timestamp(),
+ versionedRecord.validTo().get())
+ : new VersionedRecord<>(STRING_DESERIALIZER.deserialize(null, versionedRecord.value()),
+ versionedRecord.timestamp());
}
private static List> getChangelogRecords(final List data) {