HBASE-29148: BufferedMutator should be able to flush after buffering a certain number of mutations #6718

Open · wants to merge 2 commits into base: master
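
This change lets a BufferedMutator flush automatically once a configurable number of mutations has been buffered, in addition to the existing size-based and periodic flush triggers. The limit can be set via the new hbase.client.write.buffer.maxmutations configuration key (default -1, i.e. no limit), via BufferedMutatorParams.setMaxMutations for the synchronous client, or via AsyncBufferedMutatorBuilder.setMaxMutations for the async client. It is mainly useful against clusters that use hbase.rpc.rows.size.threshold.reject to reject large Multi requests.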
@@ -88,6 +88,11 @@ default CompletableFuture<Void> mutate(Mutation mutation) {
*/
long getWriteBufferSize();

/**
* The maximum number of mutations that this buffered mutator will buffer before flushing them. A
* non-positive value means no limit.
*/
int getMaxMutations();

/**
* Returns the periodical flush interval, 0 means disabled.
*/
@@ -114,6 +114,13 @@ default AsyncBufferedMutatorBuilder setMaxRetries(int maxRetries) {
*/
AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);

/**
* Set the maximum number of mutations that this buffered mutator will buffer before flushing
* them. If you are talking to a cluster that uses hbase.rpc.rows.size.threshold.reject to reject
* large Multi requests, you may need this setting to avoid rejections. Default is no limit.
*/
AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations);

/**
* Create the {@link AsyncBufferedMutator} instance.
*/
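
A minimal usage sketch of the new builder option (the connection setup, table name, and the 500-mutation cap are illustrative, not part of this change): the mutator flushes as soon as 500 mutations are buffered, even if the byte-based write buffer threshold has not yet been reached.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

class AsyncMaxMutationsExample {
  // Assumes an already-created AsyncConnection; table name and values are illustrative.
  static void writeRows(AsyncConnection conn) throws IOException {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    try (AsyncBufferedMutator mutator = conn.getBufferedMutatorBuilder(TableName.valueOf("test_table"))
      .setMaxMutations(500) // flush after every 500 buffered mutations
      .build()) {
      for (int i = 0; i < 1000; i++) {
        futures.add(mutator.mutate(new Put(Bytes.toBytes("row-" + i))
          .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"))));
      }
    } // close() flushes anything still buffered
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
  }
}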
@@ -40,12 +40,15 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {

private int maxKeyValueSize;

private int maxMutations;

public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
AsyncTableBuilder<?> tableBuilder, HashedWheelTimer periodicalFlushTimer) {
this.tableBuilder = tableBuilder;
this.writeBufferSize = connConf.getWriteBufferSize();
this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs();
this.maxKeyValueSize = connConf.getMaxKeyValueSize();
this.maxMutations = connConf.getBufferedMutatorMaxMutations();
this.periodicalFlushTimer = periodicalFlushTimer;
}

@@ -115,9 +118,16 @@ public AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize) {
return this;
}

@Override
public AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations) {
Preconditions.checkArgument(maxMutations > 0, "maxMutations %s must be > 0", maxMutations);
this.maxMutations = maxMutations;
return this;
}

@Override
public AsyncBufferedMutator build() {
return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize,
-      periodicFlushTimeoutNs, maxKeyValueSize);
+      periodicFlushTimeoutNs, maxKeyValueSize, maxMutations);
}
}
@@ -32,6 +32,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@@ -42,6 +44,8 @@
@InterfaceAudience.Private
class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {

private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class);

private final HashedWheelTimer periodicalFlushTimer;

private final AsyncTable<?> table;
@@ -52,6 +56,8 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {

private final int maxKeyValueSize;

private final int maxMutations;

private List<Mutation> mutations = new ArrayList<>();

private List<CompletableFuture<Void>> futures = new ArrayList<>();
@@ -63,12 +69,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
Timeout periodicFlushTask;

AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
-      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
+      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutations) {
this.periodicalFlushTimer = periodicalFlushTimer;
this.table = table;
this.writeBufferSize = writeBufferSize;
this.periodicFlushTimeoutNs = periodicFlushTimeoutNs;
this.maxKeyValueSize = maxKeyValueSize;
this.maxMutations = maxMutations;
}

@Override
@@ -145,6 +152,10 @@ Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutation
this.futures.addAll(futures);
bufferedSize += heapSize;
if (bufferedSize >= writeBufferSize) {
LOG.trace("Flushing because write buffer size {} reached", writeBufferSize);
internalFlush();
} else if (maxMutations > 0 && this.mutations.size() >= maxMutations) {
LOG.trace("Flushing because max mutations {} reached", maxMutations);
internalFlush();
}
}
@@ -172,6 +183,11 @@ public long getPeriodicalFlushTimeout(TimeUnit unit) {
return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
}

@Override
public int getMaxMutations() {
return maxMutations;
}

@Override
public Map<String, byte[]> getRequestAttributes() {
return table.getRequestAttributes();
@@ -38,6 +38,8 @@
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT;
@@ -148,6 +150,8 @@ class AsyncConnectionConfiguration {

private final int maxKeyValueSize;

private final int bufferedMutatorMaxMutations;

AsyncConnectionConfiguration(Configuration conf) {
long operationTimeoutMs =
conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
@@ -200,6 +204,8 @@ class AsyncConnectionConfiguration {
TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
this.bufferedMutatorMaxMutations =
conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY, BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT);
}

long getMetaOperationTimeoutNs() {
@@ -285,4 +291,8 @@ long getPrimaryMetaScanTimeoutNs() {
int getMaxKeyValueSize() {
return maxKeyValueSize;
}

int getBufferedMutatorMaxMutations() {
return bufferedMutatorMaxMutations;
}
}
@@ -213,6 +213,14 @@ default Map<String, byte[]> getRequestAttributes() {
return Collections.emptyMap();
}

/**
* The maximum number of mutations that this buffered mutator will buffer before flushing them. A
* non-positive value means no limit.
*/
default int getMaxMutations() {
throw new UnsupportedOperationException(
"The BufferedMutator::getMaxMutations has not been implemented");
}

/**
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
*/
@@ -41,6 +41,7 @@ public class BufferedMutatorParams implements Cloneable {
private String implementationClassName = null;
private int rpcTimeout = UNSET;
private int operationTimeout = UNSET;
private int maxMutations = UNSET;
protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
@@ -89,6 +90,23 @@ public int getOperationTimeout() {
return operationTimeout;
}

/**
* Set the maximum number of mutations that this buffered mutator will buffer before flushing
* them. If you are talking to a cluster that uses hbase.rpc.rows.size.threshold.reject to reject
* large Multi requests, you may need this setting to avoid rejections. Default is no limit.
*/
public BufferedMutatorParams setMaxMutations(int maxMutations) {
this.maxMutations = maxMutations;
return this;
}

/**
* The maximum number of mutations that this buffered mutator will buffer before flushing them. A
* non-positive value means no limit.
*/
public int getMaxMutations() {
return maxMutations;
}

public BufferedMutatorParams setRequestAttribute(String key, byte[] value) {
if (requestAttributes.isEmpty()) {
requestAttributes = new HashMap<>();
@@ -222,6 +240,7 @@ public BufferedMutatorParams clone() {
clone.writeBufferPeriodicFlushTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs;
clone.writeBufferPeriodicFlushTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
clone.maxKeyValueSize = this.maxKeyValueSize;
clone.maxMutations = this.maxMutations;
clone.pool = this.pool;
clone.listener = this.listener;
clone.implementationClassName = this.implementationClassName;
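
For the synchronous client, a sketch of how the new parameter could be wired through BufferedMutatorParams (table name and limit are illustrative, not part of this change):

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

class SyncMaxMutationsExample {
  // Assumes an already-created Connection; table name and values are illustrative.
  static void writeRows(Connection conn) throws IOException {
    BufferedMutatorParams params =
      new BufferedMutatorParams(TableName.valueOf("test_table")).setMaxMutations(500);
    try (BufferedMutator mutator = conn.getBufferedMutator(params)) {
      for (int i = 0; i < 1000; i++) {
        mutator.mutate(new Put(Bytes.toBytes("row-" + i))
          .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
      }
    } // close() flushes anything still buffered
  }
}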
@@ -41,6 +41,9 @@ public class ConnectionConfiguration {
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize";
public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
public static final String BUFFERED_MUTATOR_MAX_MUTATIONS_KEY =
"hbase.client.write.buffer.maxmutations";
public static final int BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT = -1;
public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
"hbase.client.primaryCallTimeout.get";
public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms
@@ -66,6 +69,7 @@ public class ConnectionConfiguration {
private final int metaReplicaCallTimeoutMicroSecondScan;
private final int retries;
private final int maxKeyValueSize;
private final int bufferedMutatorMaxMutations;
private final int rpcTimeout;
private final int readRpcTimeout;
private final int metaReadRpcTimeout;
@@ -117,6 +121,9 @@ public class ConnectionConfiguration {

this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);

this.bufferedMutatorMaxMutations =
conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY, BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT);

this.rpcTimeout =
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);

@@ -148,6 +155,7 @@ protected ConnectionConfiguration() {
this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
this.bufferedMutatorMaxMutations = BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT;
this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
@@ -210,6 +218,10 @@ public int getMaxKeyValueSize() {
return maxKeyValueSize;
}

public int getBufferedMutatorMaxMutations() {
return bufferedMutatorMaxMutations;
}

public long getScannerMaxResultSize() {
return scannerMaxResultSize;
}
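
The limit can also be supplied through configuration instead of code; a sketch using the new key (the value 500 is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

class MaxMutationsConfigExample {
  static Configuration createConf() {
    Configuration conf = HBaseConfiguration.create();
    // Default is -1 (no limit); any positive value caps the number of
    // mutations buffered before an automatic flush.
    conf.setInt("hbase.client.write.buffer.maxmutations", 500);
    return conf;
  }
}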
@@ -107,6 +107,9 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I
if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
}
if (params.getMaxMutations() != BufferedMutatorParams.UNSET) {
builder.setMaxMutations(params.getMaxMutations());
}
if (!params.getRequestAttributes().isEmpty()) {

builder.setRequestAttributes(params.getRequestAttributes());
@@ -140,8 +140,8 @@ public void testClone() {

BufferedMutator.ExceptionListener listener = new MockExceptionListener();
bmp.writeBufferSize(17).setWriteBufferPeriodicFlushTimeoutMs(123)
-      .setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).pool(pool)
-      .listener(listener);
+      .setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).setMaxMutations(3737)
+      .pool(pool).listener(listener);
bmp.implementationClassName("someClassName");
BufferedMutatorParams clone = bmp.clone();

@@ -151,6 +151,7 @@
assertEquals(123, clone.getWriteBufferPeriodicFlushTimeoutMs());
assertEquals(456, clone.getWriteBufferPeriodicFlushTimerTickMs());
assertEquals(13, clone.getMaxKeyValueSize());
assertEquals(3737, clone.getMaxMutations());
assertEquals("someClassName", clone.getImplementationClassName());

cloneTest(bmp, clone);
@@ -178,6 +179,7 @@ private void cloneTest(BufferedMutatorParams some, BufferedMutatorParams clone)
assertEquals(some.getWriteBufferPeriodicFlushTimerTickMs(),
clone.getWriteBufferPeriodicFlushTimerTickMs());
assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize());
assertEquals(some.getMaxMutations(), clone.getMaxMutations());
assertTrue(some.getListener() == clone.getListener());
assertTrue(some.getPool() == clone.getPool());
assertEquals(some.getImplementationClassName(), clone.getImplementationClassName());
@@ -175,6 +175,23 @@ public void testPeriodicFlush() throws InterruptedException, ExecutionException
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
}

@Test
public void testMaxMutationsFlush() throws InterruptedException, ExecutionException {
AsyncBufferedMutator mutator =
CONN.getBufferedMutatorBuilder(TABLE_NAME).setMaxMutations(3).build();
CompletableFuture<?> future1 =
mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
CompletableFuture<?> future2 =
mutator.mutate(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, VALUE));
CompletableFuture<?> future3 =
mutator.mutate(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, VALUE));
CompletableFuture.allOf(future1, future2, future3).join();
AsyncTable<?> table = CONN.getTable(TABLE_NAME);
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(1))).get().getValue(CF, CQ));
assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(2))).get().getValue(CF, CQ));
}

// a bit deep into the implementation
@Test
public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
@@ -244,8 +261,9 @@ private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutato
private int flushCount;

AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
-      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
-      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize);
+      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutation) {
+      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize,
+        maxMutation);
}

@Override
@@ -261,7 +279,7 @@ public void testRaceBetweenNormalFlushAndPeriodicFlush()
Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
try (AsyncBufferMutatorForTest mutator =
new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
-        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024)) {
+        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024, 100)) {
CompletableFuture<?> future = mutator.mutate(put);
Timeout task = mutator.periodicFlushTask;
// we should have scheduled a periodic flush task