Skip to content

Commit

Permalink
Support blind writes for Consensus Commit
Browse files Browse the repository at this point in the history
  • Loading branch information
brfrn169 committed Oct 23, 2023
1 parent b7f3659 commit 077335e
Show file tree
Hide file tree
Showing 37 changed files with 1,547 additions and 628 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,8 @@ public void abort_forOngoingTransaction_ShouldAbortCorrectly() {}
@Disabled("JDBC transaction doesn't support rollback()")
@Override
public void rollback_forOngoingTransaction_ShouldRollbackCorrectly() {}

@Disabled
@Override
public void putAndCommit_BlindPutGivenForExisting_ShouldThrowCommitConflictException() {}
}
17 changes: 17 additions & 0 deletions core/src/main/java/com/scalar/db/api/OperationBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,23 @@ interface ClearValues<T> {
T clearValue(String columnName);
}

interface Blind<T> {
/**
* Sets this put operation as a blind write.
*
* @return the operation builder
*/
T blind();

/**
* Sets whether this put operation is a blind write or not.
*
* @param blind {@code true} if this put operation is a blind write, {@code false} otherwise
* @return the operation builder
*/
T blind(boolean blind);
}

interface Limit<T> {
/**
* Sets the specified number of results to be returned
Expand Down
28 changes: 26 additions & 2 deletions core/src/main/java/com/scalar/db/api/Put.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class Put extends Mutation {

private final Map<String, Column<?>> columns;

private boolean blind;

/**
* Constructs a {@code Put} with the specified partition {@link Key}.
*
Expand Down Expand Up @@ -80,6 +82,7 @@ public Put(Key partitionKey, Key clusteringKey) {
public Put(Put put) {
super(put);
columns = new LinkedHashMap<>(put.columns);
blind = put.blind;
}

/**
Expand Down Expand Up @@ -705,6 +708,7 @@ private void checkIfExists(String name) {
throw new IllegalArgumentException(name + " doesn't exist");
}
}

/**
* @deprecated As of release 3.6.0. Will be removed in release 5.0.0. Use the setter method of the
* Put builder instead; to create a Put builder, use {@link Put#newBuilder()}
Expand Down Expand Up @@ -750,6 +754,25 @@ public Put withCondition(MutationCondition condition) {
return (Put) super.withCondition(condition);
}

/**
* Returns whether this Put is a blind write.
*
* @return whether this Put is a blind write
*/
public boolean isBlind() {
return blind;
}

/**
* Sets whether this Put is a blind write.
*
* @param blind whether this Put is a blind write
*/
Put setBlind(boolean blind) {
this.blind = blind;
return this;
}

/**
* Indicates whether some other object is "equal to" this object. The other object is considered
* equal if:
Expand All @@ -775,12 +798,12 @@ public boolean equals(Object o) {
return false;
}
Put other = (Put) o;
return columns.equals(other.columns);
return columns.equals(other.columns) && blind == other.blind;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), columns);
return Objects.hash(super.hashCode(), columns, blind);
}

@Override
Expand All @@ -793,6 +816,7 @@ public String toString() {
.add("columns", getColumns())
.add("consistency", getConsistency())
.add("condition", getCondition())
.add("blind", isBlind())
.toString();
}
}
31 changes: 30 additions & 1 deletion core/src/main/java/com/scalar/db/api/PutBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.scalar.db.api.OperationBuilder.Blind;
import com.scalar.db.api.OperationBuilder.ClearClusteringKey;
import com.scalar.db.api.OperationBuilder.ClearCondition;
import com.scalar.db.api.OperationBuilder.ClearNamespace;
Expand Down Expand Up @@ -76,11 +77,13 @@ public static class Buildable extends OperationBuilder.Buildable<Put>
implements ClusteringKey<Buildable>,
Consistency<Buildable>,
Condition<Buildable>,
Values<Buildable> {
Values<Buildable>,
Blind<Buildable> {
final Map<String, Column<?>> columns = new LinkedHashMap<>();
@Nullable Key clusteringKey;
@Nullable com.scalar.db.api.Consistency consistency;
@Nullable MutationCondition condition;
boolean blind;

private Buildable(@Nullable String namespace, String table, Key partitionKey) {
super(namespace, table, partitionKey);
Expand Down Expand Up @@ -199,6 +202,18 @@ public Buildable value(Column<?> column) {
return this;
}

@Override
public Buildable blind() {
blind = true;
return this;
}

@Override
public Buildable blind(boolean blind) {
this.blind = blind;
return this;
}

@Override
public Put build() {
Put put = new Put(partitionKey, clusteringKey);
Expand All @@ -210,6 +225,7 @@ public Put build() {
if (condition != null) {
put.withCondition(condition);
}
put.setBlind(blind);

return put;
}
Expand Down Expand Up @@ -237,6 +253,7 @@ public static class BuildableFromExisting extends Buildable
this.columns.putAll(put.getColumns());
this.consistency = put.getConsistency();
this.condition = put.getCondition().orElse(null);
this.blind = put.isBlind();
}

@Override
Expand Down Expand Up @@ -391,5 +408,17 @@ public BuildableFromExisting clearNamespace() {
this.namespaceName = null;
return this;
}

@Override
public BuildableFromExisting blind(boolean blind) {
super.blind(blind);
return this;
}

@Override
public BuildableFromExisting blind() {
super.blind();
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import com.scalar.db.api.Selection;
import com.scalar.db.common.AbstractDistributedTransaction;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.exception.transaction.CommitConflictException;
import com.scalar.db.exception.transaction.CommitException;
import com.scalar.db.exception.transaction.CrudConflictException;
import com.scalar.db.exception.transaction.CrudException;
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -94,7 +96,7 @@ public void put(Put put) throws CrudException {

@Override
public void put(List<Put> puts) throws CrudException {
checkArgument(puts.size() != 0);
checkArgument(!puts.isEmpty());
for (Put p : puts) {
put(p);
}
Expand All @@ -109,15 +111,15 @@ public void delete(Delete delete) throws CrudException {

@Override
public void delete(List<Delete> deletes) throws CrudException {
checkArgument(deletes.size() != 0);
checkArgument(!deletes.isEmpty());
for (Delete d : deletes) {
delete(d);
}
}

@Override
public void mutate(List<? extends Mutation> mutations) throws CrudException {
checkArgument(mutations.size() != 0);
checkArgument(!mutations.isEmpty());
for (Mutation m : mutations) {
if (m instanceof Put) {
put((Put) m);
Expand All @@ -129,6 +131,19 @@ public void mutate(List<? extends Mutation> mutations) throws CrudException {

@Override
public void commit() throws CommitException, UnknownTransactionStatusException {
// Fill the read set with records from the write and delete sets if they are unread
try {
crud.fillReadSetForRecordsFromWriteAndDeleteSetsIfUnread();
} catch (CrudConflictException e) {
throw new CommitConflictException(
"Conflict occurred while reading unread records in the write and delete sets",
e,
getId());
} catch (CrudException e) {
throw new CommitException(
"Failed to read unread records in the write and delete sets", e, getId());
}

commit.commit(crud.getSnapshot());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class ConsensusCommitConfig {
public static final String ASYNC_COMMIT_ENABLED = PREFIX + "async_commit.enabled";
public static final String ASYNC_ROLLBACK_ENABLED = PREFIX + "async_rollback.enabled";

public static final String PARALLEL_FILL_READ_SET_ENABLED =
PREFIX + "parallel_fill_read_set.enabled";

public static final int DEFAULT_PARALLEL_EXECUTOR_COUNT = 128;

public static final String INCLUDE_METADATA_ENABLED = PREFIX + "include_metadata.enabled";
Expand All @@ -48,6 +51,8 @@ public class ConsensusCommitConfig {

private final boolean isIncludeMetadataEnabled;

private final boolean parallelFillReadSetEnabled;

public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
String transactionManager = databaseConfig.getTransactionManager();
if (!"consensus-commit".equals(transactionManager)) {
Expand Down Expand Up @@ -102,10 +107,16 @@ public ConsensusCommitConfig(DatabaseConfig databaseConfig) {
databaseConfig.getProperties(), PARALLEL_ROLLBACK_ENABLED, parallelCommitEnabled);

asyncCommitEnabled = getBoolean(databaseConfig.getProperties(), ASYNC_COMMIT_ENABLED, false);

// Use the value of async commit for async rollback as default value
asyncRollbackEnabled =
getBoolean(databaseConfig.getProperties(), ASYNC_ROLLBACK_ENABLED, asyncCommitEnabled);

isIncludeMetadataEnabled =
getBoolean(databaseConfig.getProperties(), INCLUDE_METADATA_ENABLED, false);

parallelFillReadSetEnabled =
getBoolean(databaseConfig.getProperties(), PARALLEL_FILL_READ_SET_ENABLED, true);
}

public Isolation getIsolation() {
Expand Down Expand Up @@ -151,4 +162,8 @@ public boolean isAsyncRollbackEnabled() {
public boolean isIncludeMetadataEnabled() {
return isIncludeMetadataEnabled;
}

public boolean isParallelFillReadSetEnabled() {
return parallelFillReadSetEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ DistributedTransaction begin(String txId, Isolation isolation, SerializableStrat
Snapshot snapshot =
new Snapshot(txId, isolation, strategy, tableMetadataManager, parallelExecutor);
CrudHandler crud =
new CrudHandler(storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled);
new CrudHandler(
storage, snapshot, tableMetadataManager, isIncludeMetadataEnabled, parallelExecutor);
ConsensusCommit consensus =
new ConsensusCommit(crud, commit, recovery, mutationOperationChecker);
getNamespace().ifPresent(consensus::withNamespace);
Expand Down
Loading

0 comments on commit 077335e

Please sign in to comment.