
Commit

Merge branch 'develop' into feature/GH-510-eable-async-data-reads
jtnelson authored Feb 20, 2025
2 parents c3681d2 + 125d33a commit 20a87a3
Showing 4 changed files with 420 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -43,6 +43,7 @@ We made several changes to improve the safety, scalability and operational efficiency
* Reduced the amount of heap space required for essential storage metadata.
* **Efficient Metadata:** Added the `enable_efficient_metadata` configuration option to further reduce the amount of heap space required for essential storage metadata. When this option is set to `true`, metadata will occupy approximately one-third less heap space and likely improve overall system performance due to a decrease in garbage collection pauses (although per-operation performance may be slightly affected by additional overhead).
* **Asynchronous Data Reads:** Added the `enable_async_data_reads` configuration option to allow Concourse Server to *potentially* use multiple threads to read data from disk. When data records are either no longer cached or not eligible to ever be cached (due to space limitations), Concourse Server streams the relevant information from disk on demand. By default, this is a synchronous process whose performance scales linearly with the number of Segment files in the database. With this new configuration option, Concourse Server can stream the data using multiple threads. Even under high contention, read performance should be no worse than the default synchronous performance, though additional overhead may reduce peak performance on a per-operation basis. (A sample configuration that enables this option alongside `enable_efficient_metadata` appears below, after this file's changes.)
* Improved write performance of the `set` method in large transactions by creating normalized views of existing data, which are consulted during the method’s implicit `select` read operation.
* Improved the performance of the `verifyOrSet` method by removing redundant internal verification that occurred while finalizing the write.

##### Bug Fixes
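Both of the new options described in the changelog entries above are boolean server settings. A minimal sketch of enabling them together, assuming they are set in the standard `concourse.prefs` properties file (the exact file name and location depend on the installation):

```properties
# concourse.prefs — illustrative snippet, not part of this commit

# Use the more compact representation of essential storage metadata
# (roughly one-third less heap, with some added per-operation overhead).
enable_efficient_metadata = true

# Allow Concourse Server to stream non-cached data from disk using multiple threads.
enable_async_data_reads = true
```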
@@ -114,8 +114,11 @@ public final class Convert {
OPERATOR_STRINGS.put("~", Operator.CONTAINS);
OPERATOR_STRINGS.put("search", Operator.CONTAINS);
OPERATOR_STRINGS.put("search_match", Operator.CONTAINS);
OPERATOR_STRINGS.put("contains", Operator.CONTAINS);
OPERATOR_STRINGS.put("!~", Operator.NOT_CONTAINS);
OPERATOR_STRINGS.put("search_exclude", Operator.NOT_CONTAINS);
OPERATOR_STRINGS.put("not_contains", Operator.NOT_CONTAINS);
OPERATOR_STRINGS.put("ncontains", Operator.NOT_CONTAINS);
for (Operator operator : Operator.values()) {
OPERATOR_STRINGS.put(operator.name(), operator);
OPERATOR_STRINGS.put(operator.symbol(), operator);
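The new `Convert` entries register additional string aliases (`contains`, `!~`, `not_contains`, `ncontains`, `search_exclude`) for the `CONTAINS` and `NOT_CONTAINS` operators. A small, self-contained sketch of the alias-lookup pattern this map enables; the class and method names below are hypothetical stand-ins, not `Convert`'s actual API:

```java
import java.util.HashMap;
import java.util.Map;

public class OperatorAliasDemo {

    // Hypothetical stand-in for the thrift Operator enum.
    enum Operator {
        CONTAINS, NOT_CONTAINS
    }

    // Mirrors the alias table built in Convert's static initializer.
    private static final Map<String, Operator> ALIASES = new HashMap<>();
    static {
        ALIASES.put("~", Operator.CONTAINS);
        ALIASES.put("search", Operator.CONTAINS);
        ALIASES.put("search_match", Operator.CONTAINS);
        ALIASES.put("contains", Operator.CONTAINS);
        ALIASES.put("!~", Operator.NOT_CONTAINS);
        ALIASES.put("search_exclude", Operator.NOT_CONTAINS);
        ALIASES.put("not_contains", Operator.NOT_CONTAINS);
        ALIASES.put("ncontains", Operator.NOT_CONTAINS);
    }

    // Resolve a user-supplied operator token to its canonical form.
    static Operator parse(String token) {
        Operator operator = ALIASES.get(token);
        if(operator == null) {
            throw new IllegalArgumentException("Unknown operator: " + token);
        }
        return operator;
    }

    public static void main(String[] args) {
        System.out.println(parse("ncontains")); // NOT_CONTAINS
        System.out.println(parse("~"));         // CONTAINS
    }
}
```

The intent is that a criteria string written with any alias resolves to the same canonical operator.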
@@ -18,6 +18,9 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;

import javax.annotation.Nullable;
@@ -26,7 +29,10 @@
import com.cinchapi.concourse.server.storage.Action;
import com.cinchapi.concourse.server.storage.DurableStore;
import com.cinchapi.concourse.server.storage.cache.BloomFilter;
import com.cinchapi.concourse.server.storage.view.Table;
import com.cinchapi.concourse.thrift.TObject;
import com.cinchapi.concourse.thrift.Type;
import com.cinchapi.concourse.time.Time;
import com.cinchapi.concourse.util.EagerProducer;

/**
@@ -68,6 +74,16 @@ public class Queue extends Limbo {
*/
private BloomFilter filter = null;

/**
* A cache of the presently contained data that is used to speed up reads.
* <p>
* Since there is overhead to maintaining this data during write operations,
* it is only utilized if the number of {@link Write Writes} in this store
* exceeds {@link #BLOOM_FILTER_CREATION_THRESHOLD}.
* </p>
*/
private Table table = null;

/**
* A cache of the {@link #getOldestWriteTimstamp() timestamp} for the oldest
* write in the Queue. This value is not expected to change often, so it
@@ -100,6 +116,17 @@ protected Queue(List<Write> writes) {
this.writes = writes;
}

@Override
public Set<String> describe(long record, long timestamp,
Map<String, Set<TObject>> context) {
if(context.isEmpty() && timestamp == Time.NONE && isReadOptimized()) {
return table().select(record).keySet();
}
else {
return super.describe(record, timestamp, context);
}
}

/**
* Return an unmodifiable copy of the writes contained in the Queue.
*
@@ -116,13 +143,8 @@ public boolean insert(Write write, boolean sync) {
filter.putCached(write.getKey(), write.getValue(),
write.getRecord());
}
- else if(writes.size() > BLOOM_FILTER_CREATION_THRESHOLD) {
- filter = BLOOM_FILTER_PRODUCER.consume();
- for (int i = 0; i < writes.size(); ++i) {
- Write stored = writes.get(i);
- filter.put(stored.getKey(), stored.getValue(),
- stored.getRecord());
- }
+ if(table != null) {
+ table.put(write);
}
return true;
}
@@ -132,6 +154,31 @@ public Iterator<Write> iterator() {
return writes.iterator();
}

@Override
public Map<String, Set<TObject>> select(long record, long timestamp,
Map<String, Set<TObject>> context) {
if(context.isEmpty() && timestamp == Time.NONE && isReadOptimized()) {
Map<String, Set<TObject>> data = new TreeMap<>(
(s1, s2) -> s1.compareToIgnoreCase(s2));
data.putAll(table().select(record));
return data;
}
else {
return super.select(record, timestamp, context);
}
}

@Override
public Set<TObject> select(String key, long record, long timestamp,
Set<TObject> context) {
if(context.isEmpty() && timestamp == Time.NONE && isReadOptimized()) {
return table().lookup(record, key);
}
else {
return super.select(key, record, timestamp, context);
}
}

/**
* Return the number of writes in this Queue.
*
@@ -177,9 +224,9 @@ public void transport(DurableStore destination, boolean sync) {

@Override
public boolean verify(Write write, long timestamp) {
- if(filter == null
- || (filter != null && filter.mightContainCached(write.getKey(),
- write.getValue(), write.getRecord()))) {
+ if(!isReadOptimized()
+ || (isReadOptimized() && filter().mightContainCached(
+ write.getKey(), write.getValue(), write.getRecord()))) {
return super.verify(write, timestamp);
}
else {
@@ -190,9 +237,9 @@ public boolean verify(Write write, long timestamp) {
@Override
@Nullable
protected Action getLastWriteAction(Write write, long timestamp) {
- if(filter == null
- || (filter != null && filter.mightContainCached(write.getKey(),
- write.getValue(), write.getRecord()))) {
+ if(!isReadOptimized()
+ || (isReadOptimized() && filter().mightContainCached(
+ write.getKey(), write.getValue(), write.getRecord()))) {
return super.getLastWriteAction(write, timestamp);
}
else {
@@ -229,4 +276,83 @@ protected boolean isPossibleSearchMatch(String key, Write write,
&& value.getType() == Type.STRING;
}

/**
* Returns the {@link #filter} if this {@link Queue} is
* {@link #isReadOptimized() read optimized}, creating it on demand if
* necessary.
* <p>
* This method should <b>only</b> be used for read operations that rely on
the {@link #filter}. It ensures that the filter is generated and populated
* only when needed, preventing unnecessary memory consumption.
* </p>
* <p>
* <b>Important:</b> Do not use this method for write operations. Instead,
* access {@link #filter} directly and check for {@code null} before
* writing.
* </p>
*
* @return the {@link #filter} if this {@link Queue} is read optimized;
* otherwise, returns {@code null}.
*/
@Nullable
private BloomFilter filter() {
if(isReadOptimized()) {
if(filter == null) {
filter = BLOOM_FILTER_PRODUCER.consume();
for (Write write : writes) {
filter.putCached(write.getKey(), write.getValue(),
write.getRecord());
}
}
return filter;
}
else {
return null;
}
}

/**
* Return a boolean that indicates whether this {@link Queue} optimizes
* reads by using internal structures that attempt to reduce the need for
* log replay.
*
* @return {@code true} if this {@link Queue} is read optimized
*/
private boolean isReadOptimized() {
return writes.size() > BLOOM_FILTER_CREATION_THRESHOLD;
}

/**
* Returns the {@link #table} if this {@link Queue} is
* {@link #isReadOptimized() read optimized}, creating it on demand if
* necessary.
* <p>
* This method should <b>only</b> be used for read operations that rely on
* the {@link #table}. It ensures that the table is generated and populated
* only when needed, preventing unnecessary memory consumption.
* </p>
* <p>
* <b>Important:</b> Do not use this method for write operations. Instead,
* access {@link #table} directly and check for {@code null} before writing.
* </p>
*
* @return the {@link #table} if this {@link Queue} is read optimized;
* otherwise, returns {@code null}.
*/
@Nullable
private Table table() {
if(isReadOptimized()) {
if(table == null) {
table = new Table();
for (Write write : writes) {
table.put(write);
}
}
return table;
}
else {
return null;
}
}

}
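The new helpers above (`isReadOptimized()`, `filter()`, `table()`) share one pattern: a read-side structure is only worth having once the Queue holds more than `BLOOM_FILTER_CREATION_THRESHOLD` writes, it is built lazily the first time a read needs it, and the write path merely keeps it current if it already exists. A minimal, self-contained sketch of that pattern, using hypothetical names rather than the actual Concourse classes:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrates the threshold-gated, lazily built read index used by Queue.
// All names are hypothetical stand-ins, not Concourse APIs.
public class WriteLog {

    private static final int INDEX_CREATION_THRESHOLD = 10;

    private final List<String> writes = new ArrayList<>();

    // Built on demand by index(); stays null until a read actually needs it.
    private Set<String> index = null;

    public void insert(String write) {
        writes.add(write);
        if(index != null) {
            // Write path: only maintain the index if it already exists.
            index.add(write);
        }
    }

    public boolean verify(String write) {
        if(!isReadOptimized()) {
            // Small logs: a linear scan is cheap enough.
            return writes.contains(write);
        }
        else {
            // Large logs: consult the lazily built index.
            return index().contains(write);
        }
    }

    // Optimization only kicks in once the maintenance overhead is worthwhile.
    private boolean isReadOptimized() {
        return writes.size() > INDEX_CREATION_THRESHOLD;
    }

    // Read path: create and backfill the index the first time it is needed.
    private Set<String> index() {
        if(index == null) {
            index = new HashSet<>(writes);
        }
        return index;
    }
}
```

The design trades a one-time backfill cost (replaying the existing writes into the index) for avoiding any maintenance overhead on queues that never grow large enough to benefit.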
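Relatedly, the `describe`/`select` overrides only take the `Table` fast path for present-time reads that start with empty context; a historical `timestamp` or pre-seeded context still falls through to `super`, which replays the write log. A rough, self-contained sketch of that dispatch, again with hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Illustrates the fast-path/slow-path split in Queue's read overrides.
// Hypothetical stand-ins only; not the actual Concourse classes.
public class ReadDispatchDemo {

    // Sentinel meaning "read the present state", analogous to Time.NONE.
    static final long PRESENT = Long.MIN_VALUE;

    // Pretend indexed view: record id -> (key -> value).
    private final Map<Long, Map<String, String>> table = new HashMap<>();

    private final boolean readOptimized;

    ReadDispatchDemo(boolean readOptimized) {
        this.readOptimized = readOptimized;
    }

    void put(long record, String key, String value) {
        table.computeIfAbsent(record, r -> new HashMap<>()).put(key, value);
    }

    Set<String> describe(long record, long timestamp, Map<String, String> context) {
        if(context.isEmpty() && timestamp == PRESENT && readOptimized) {
            // Fast path: answer straight from the indexed view.
            return table.getOrDefault(record, Map.of()).keySet();
        }
        else {
            // Slow path: fall back to replaying the write log (simplified here).
            return new TreeMap<>(table.getOrDefault(record, Map.of())).keySet();
        }
    }

    public static void main(String[] args) {
        ReadDispatchDemo demo = new ReadDispatchDemo(true);
        demo.put(1, "name", "concourse");
        demo.put(1, "stars", "lots");
        System.out.println(demo.describe(1, PRESENT, Map.of())); // prints both keys
    }
}
```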