Skip to content

Commit

Permalink
KAFKA-15347: add support for 'single key multi timestamp' IQs with ve…
Browse files Browse the repository at this point in the history
…rsioned state stores (KIP-968) (apache#14626)

Implements KIP-968.

Add new query type MultiVersionedKeyQuery for IQv2 supported by versioned state stores.
  • Loading branch information
aliehsaeedii authored Dec 6, 2023
1 parent 71b4cba commit 9658942
Show file tree
Hide file tree
Showing 19 changed files with 1,354 additions and 230 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@

<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest).java"/>
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore).java"/>

<suppress checks="MethodLength"
files="KTableImpl.java"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.query;

import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
import org.apache.kafka.streams.state.VersionedRecordIterator;

/**
* Interactive query for retrieving a set of records with the same specified key and different timestamps within the specified time range.
* No ordering is guaranteed for the results, but the results can be sorted by timestamp (in ascending or descending order) by calling the corresponding defined methods.
*
* @param <K> The type of the key.
* @param <V> The type of the result returned by this query.
*/
@Evolving
public final class MultiVersionedKeyQuery<K, V> implements Query<VersionedRecordIterator<V>> {

private final K key;
private final Optional<Instant> fromTime;
private final Optional<Instant> toTime;
private final ResultOrder order;

private MultiVersionedKeyQuery(final K key, final Optional<Instant> fromTime, final Optional<Instant> toTime, final ResultOrder order) {
this.key = key;
this.fromTime = fromTime;
this.toTime = toTime;
this.order = order;
}

/**
* Creates a query that will retrieve the set of records identified by {@code key} if any exists
* (or {@code null} otherwise).
*
* <p>
* While the query by default returns the all the record versions of the specified {@code key}, setting
* the {@code fromTimestamp} (by calling the {@link #fromTime(Instant)} method), and the {@code toTimestamp}
* (by calling the {@link #toTime(Instant)} method) makes the query to return the record versions associated
* to the specified time range.
*
* @param key The specified key by the query
* @param <K> The type of the key
* @param <V> The type of the value that will be retrieved
* @throws NullPointerException if {@code key} is null
*/
public static <K, V> MultiVersionedKeyQuery<K, V> withKey(final K key) {
Objects.requireNonNull(key, "key cannot be null.");
return new MultiVersionedKeyQuery<>(key, Optional.empty(), Optional.empty(), ResultOrder.ANY);
}

/**
* Specifies the starting time point for the key query.
* <p>
* The key query returns all the records that are still existing in the time range starting from the timestamp {@code fromTime}. There can
* be records which have been inserted before the {@code fromTime} and are still valid in the query specified time range (the whole time range
* or even partially). The key query in fact returns all the records that have NOT become tombstone at or after {@code fromTime}.
*
* @param fromTime The starting time point
* If {@code fromTime} is null, it will be considered as negative infinity, ie, no lower bound
*/
public MultiVersionedKeyQuery<K, V> fromTime(final Instant fromTime) {
return new MultiVersionedKeyQuery<>(key, Optional.ofNullable(fromTime), toTime, order);
}

/**
* Specifies the ending time point for the key query.
* The key query returns all the records that have timestamp &lt;= toTime.
*
* @param toTime The ending time point
* If @param toTime is null, will be considered as positive infinity, ie, no upper bound
*/
public MultiVersionedKeyQuery<K, V> toTime(final Instant toTime) {
return new MultiVersionedKeyQuery<>(key, fromTime, Optional.ofNullable(toTime), order);
}

/**
* Specifies the order of the returned records by the query as descending by timestamp.
*/
public MultiVersionedKeyQuery<K, V> withDescendingTimestamps() {
return new MultiVersionedKeyQuery<>(key, fromTime, toTime, ResultOrder.DESCENDING);
}

/**
* Specifies the order of the returned records by the query as ascending by timestamp.
*/
public MultiVersionedKeyQuery<K, V> withAscendingTimestamps() {
return new MultiVersionedKeyQuery<>(key, fromTime, toTime, ResultOrder.ASCENDING);
}

/**
* The key that was specified for this query.
* @return The specified {@code key} of the query.
*/
public K key() {
return key;
}

/**
* The starting time point of the query, if specified
* @return The specified {@code fromTime} of the query.
*/
public Optional<Instant> fromTime() {
return fromTime;
}

/**
* The ending time point of the query, if specified
* @return The specified {@code toTime} of the query.
*/
public Optional<Instant> toTime() {
return toTime;
}

/**
* The order of the returned records by timestamp.
* @return the order of returned records based on timestamp (can be unordered, or in ascending, or in descending order of timestamps).
*/
public ResultOrder resultOrder() {
return order;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.streams.query;

public enum ResultOrder {
ANY,
ASCENDING,
DESCENDING
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.streams.state;

import java.util.Objects;
import java.util.Optional;

/**
* Combines a value (from a key-value record) with a timestamp, for use as the return type
Expand All @@ -27,18 +28,34 @@
public final class VersionedRecord<V> {
private final V value;
private final long timestamp;
private final Optional<Long> validTo;

/**
* Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
*
* @param value the value
* @param timestamp the timestamp
* @param value The value
* @param timestamp The type of the result returned by this query.
*/
public VersionedRecord(final V value, final long timestamp) {
this.value = Objects.requireNonNull(value, "value cannot be null.");
this.timestamp = timestamp;
this.validTo = Optional.empty();
}

/**
* Create a new {@link VersionedRecord} instance. {@code value} cannot be {@code null}.
*
* @param value The value
* @param timestamp The timestamp
* @param validTo The exclusive upper bound of the validity interval
*/
public VersionedRecord(final V value, final long timestamp, final long validTo) {
this.value = Objects.requireNonNull(value);
this.timestamp = timestamp;
this.validTo = Optional.of(validTo);
}


public V value() {
return value;
}
Expand All @@ -47,9 +64,13 @@ public long timestamp() {
return timestamp;
}

public Optional<Long> validTo() {
return validTo;
}

@Override
public String toString() {
return "<" + value + "," + timestamp + ">";
return "<" + value + "," + timestamp + "," + validTo + ">";
}

@Override
Expand All @@ -61,12 +82,12 @@ public boolean equals(final Object o) {
return false;
}
final VersionedRecord<?> that = (VersionedRecord<?>) o;
return timestamp == that.timestamp &&
return timestamp == that.timestamp && validTo == that.validTo &&
Objects.equals(value, that.value);
}

@Override
public int hashCode() {
return Objects.hash(value, timestamp);
return Objects.hash(value, timestamp, validTo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state;

import java.io.Closeable;
import java.util.Iterator;


/**
* Iterator interface of {@link VersionedRecord VersionedRecord<V>}.
* <p>
* Users must call its {@code close} method explicitly upon completeness to release resources,
* or use try-with-resources statement (available since JDK7) for this {@link Closeable} class.
* Note that {@code remove()} is not supported.
*
* @param <V> Type of values
*/
public interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>>, Closeable {

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -34,7 +35,9 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -170,7 +173,30 @@ public boolean isOpen() {

@Override
public synchronized byte[] get(final Bytes key) {
return physicalStore.get(prefixKeyFormatter.addPrefix(key));
return get(key, Optional.empty());
}

public synchronized byte[] get(final Bytes key, final Snapshot snapshot) {
return get(key, Optional.of(snapshot));
}

private synchronized byte[] get(final Bytes key, final Optional<Snapshot> snapshot) {
if (snapshot.isPresent()) {
try (ReadOptions readOptions = new ReadOptions()) {
readOptions.setSnapshot(snapshot.get());
return physicalStore.get(prefixKeyFormatter.addPrefix(key), readOptions);
}
} else {
return physicalStore.get(prefixKeyFormatter.addPrefix(key));
}
}

public Snapshot getSnapshot() {
return physicalStore.getSnapshot();
}

public void releaseSnapshot(final Snapshot snapshot) {
physicalStore.releaseSnapshot(snapshot);
}

@Override
Expand Down
Loading

0 comments on commit 9658942

Please sign in to comment.