From 0de147b3c4884f90b9bfafaa7f1ac22881b1f5f0 Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Mon, 24 Feb 2025 12:32:09 -0500 Subject: [PATCH 1/2] HBASE-29148: BufferedMutator should be able to flush after buffering a certain number of mutations --- .../hbase/client/AsyncBufferedMutator.java | 5 ++++ .../client/AsyncBufferedMutatorBuilder.java | 7 ++++++ .../AsyncBufferedMutatorBuilderImpl.java | 12 +++++++++- .../client/AsyncBufferedMutatorImpl.java | 18 +++++++++++++- .../client/AsyncConnectionConfiguration.java | 10 ++++++++ .../hadoop/hbase/client/BufferedMutator.java | 8 +++++++ .../hbase/client/BufferedMutatorParams.java | 19 +++++++++++++++ .../hbase/client/ConnectionConfiguration.java | 8 +++++++ .../client/TestBufferedMutatorParams.java | 6 +++-- .../hbase/client/TestAsyncBufferMutator.java | 24 ++++++++++++++++--- 10 files changed, 110 insertions(+), 7 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java index 6cc2b5adf9d4..479446f8ea13 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java @@ -88,6 +88,11 @@ default CompletableFuture mutate(Mutation mutation) { */ long getWriteBufferSize(); + /** + * The maximum number of mutations that this buffered mutator will buffer before flushing them + */ + int getMaxMutations(); + /** * Returns the periodical flush interval, 0 means disabled. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java index d38aa625fb2b..57c609ebb038 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java @@ -114,6 +114,13 @@ default AsyncBufferedMutatorBuilder setMaxRetries(int maxRetries) { */ AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize); + /** + * Set the maximum number of mutations that this buffered mutator will buffer before flushing + * them. If you are talking to a cluster that uses hbase.rpc.rows.size.threshold.reject to reject + * large Multi requests, you may need this setting to avoid rejections. Default is no limit. + */ + AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations); + /** * Create the {@link AsyncBufferedMutator} instance. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java index 6905ff3065cb..7fa860dc3d4e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java @@ -40,12 +40,15 @@ class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder { private int maxKeyValueSize; + private int maxMutations; + public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf, AsyncTableBuilder tableBuilder, HashedWheelTimer periodicalFlushTimer) { this.tableBuilder = tableBuilder; this.writeBufferSize = connConf.getWriteBufferSize(); this.periodicFlushTimeoutNs = connConf.getWriteBufferPeriodicFlushTimeoutNs(); this.maxKeyValueSize = connConf.getMaxKeyValueSize(); + this.maxMutations = connConf.getBufferedMutatorMaxMutations(); this.periodicalFlushTimer = periodicalFlushTimer; } @@ -115,9 +118,16 @@ public AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize) { return this; } + @Override + public AsyncBufferedMutatorBuilder setMaxMutations(int maxMutations) { + Preconditions.checkArgument(maxMutations > 0, "maxMutations %d must be > 0", maxMutations); + this.maxMutations = maxMutations; + return this; + } + @Override public AsyncBufferedMutator build() { return new AsyncBufferedMutatorImpl(periodicalFlushTimer, tableBuilder.build(), writeBufferSize, - periodicFlushTimeoutNs, maxKeyValueSize); + periodicFlushTimeoutNs, maxKeyValueSize, maxMutations); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java index 3acd8bebdada..0e7e1b91e443 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java @@ -32,6 +32,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; @@ -42,6 +44,8 @@ @InterfaceAudience.Private class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { + private static final Logger LOG = LoggerFactory.getLogger(AsyncBufferedMutatorImpl.class); + private final HashedWheelTimer periodicalFlushTimer; private final AsyncTable table; @@ -52,6 +56,8 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { private final int maxKeyValueSize; + private final int maxMutations; + private List mutations = new ArrayList<>(); private List> futures = new ArrayList<>(); @@ -63,12 +69,13 @@ class AsyncBufferedMutatorImpl implements AsyncBufferedMutator { Timeout periodicFlushTask; AsyncBufferedMutatorImpl(HashedWheelTimer periodicalFlushTimer, AsyncTable table, - long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) { + long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutations) { this.periodicalFlushTimer = periodicalFlushTimer; this.table = table; this.writeBufferSize = writeBufferSize; this.periodicFlushTimeoutNs = periodicFlushTimeoutNs; this.maxKeyValueSize = maxKeyValueSize; + this.maxMutations = maxMutations; } @Override @@ -145,6 +152,10 @@ Stream.> generate(CompletableFuture::new).limit(mutation this.futures.addAll(futures); bufferedSize += heapSize; if (bufferedSize >= writeBufferSize) { + LOG.trace("Flushing because write buffer size {} reached", writeBufferSize); + internalFlush(); + } else if (maxMutations > 0 && this.mutations.size() >= maxMutations) { + LOG.trace("Flushing because max mutations {} reached", maxMutations); internalFlush(); } } @@ -172,6 +183,11 @@ public long getPeriodicalFlushTimeout(TimeUnit unit) { return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS); } + @Override + public int getMaxMutations() { + return maxMutations; + } + @Override public Map getRequestAttributes() { return table.getRequestAttributes(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index 5dc9f6d3b41a..05fe89ae237e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -38,6 +38,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.BUFFERED_MUTATOR_MAX_MUTATIONS_KEY; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.MAX_KEYVALUE_SIZE_DEFAULT; @@ -148,6 +150,8 @@ class AsyncConnectionConfiguration { private final int maxKeyValueSize; + private final int bufferedMutatorMaxMutations; + AsyncConnectionConfiguration(Configuration conf) { long operationTimeoutMs = conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); @@ -200,6 +204,8 @@ class AsyncConnectionConfiguration { TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT)); this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); + this.bufferedMutatorMaxMutations = + conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY, BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT); } long getMetaOperationTimeoutNs() { @@ -285,4 +291,8 @@ long getPrimaryMetaScanTimeoutNs() { int getMaxKeyValueSize() { return maxKeyValueSize; } + + int getBufferedMutatorMaxMutations() { + return bufferedMutatorMaxMutations; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java index 24563367bbbc..63d31cee0887 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -213,6 +213,14 @@ default Map getRequestAttributes() { return Collections.emptyMap(); } + /** + * The maximum number of mutations that this buffered mutator will buffer before flushing them + */ + default int getMaxMutations() { + throw new UnsupportedOperationException( + "The BufferedMutator::getMaxMutations has not been implemented"); + } + /** * Listens for asynchronous exceptions on a {@link BufferedMutator}. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java index 44bc5e2be7cc..88cc062b5bc6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java @@ -41,6 +41,7 @@ public class BufferedMutatorParams implements Cloneable { private String implementationClassName = null; private int rpcTimeout = UNSET; private int operationTimeout = UNSET; + private int maxMutations = UNSET; protected Map requestAttributes = Collections.emptyMap(); private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { @Override @@ -89,6 +90,23 @@ public int getOperationTimeout() { return operationTimeout; } + /** + * Set the maximum number of mutations that this buffered mutator will buffer before flushing + * them. If you are talking to a cluster that uses hbase.rpc.rows.size.threshold.reject to reject + * large Multi requests, you may need this setting to avoid rejections. Default is no limit. + */ + public BufferedMutatorParams setMaxMutations(int maxMutations) { + this.maxMutations = maxMutations; + return this; + } + + /** + * The maximum number of mutations that this buffered mutator will buffer before flushing them + */ + public int getMaxMutations() { + return maxMutations; + } + public BufferedMutatorParams setRequestAttribute(String key, byte[] value) { if (requestAttributes.isEmpty()) { requestAttributes = new HashMap<>(); @@ -222,6 +240,7 @@ public BufferedMutatorParams clone() { clone.writeBufferPeriodicFlushTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs; clone.writeBufferPeriodicFlushTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs; clone.maxKeyValueSize = this.maxKeyValueSize; + clone.maxMutations = this.maxMutations; clone.pool = this.pool; clone.listener = this.listener; clone.implementationClassName = this.implementationClassName; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 56f3a65c9575..685b00ec9df7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -41,6 +41,9 @@ public class ConnectionConfiguration { public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760; + public static final String BUFFERED_MUTATOR_MAX_MUTATIONS_KEY = + "hbase.client.write.buffer.maxmutations"; + public static final int BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT = -1; public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND = "hbase.client.primaryCallTimeout.get"; public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms @@ -66,6 +69,7 @@ public class ConnectionConfiguration { private final int metaReplicaCallTimeoutMicroSecondScan; private final int retries; private final int maxKeyValueSize; + private final int bufferdMutatorMaxMutations; private final int rpcTimeout; private final int readRpcTimeout; private final int metaReadRpcTimeout; @@ -117,6 +121,9 @@ public class ConnectionConfiguration { this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); + this.bufferdMutatorMaxMutations = + conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY, BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT); + this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); @@ -148,6 +155,7 @@ protected ConnectionConfiguration() { this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; + this.bufferdMutatorMaxMutations = BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT; this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java index ba23d1053938..fdc7c305500f 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutatorParams.java @@ -140,8 +140,8 @@ public void testClone() { BufferedMutator.ExceptionListener listener = new MockExceptionListener(); bmp.writeBufferSize(17).setWriteBufferPeriodicFlushTimeoutMs(123) - .setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).pool(pool) - .listener(listener); + .setWriteBufferPeriodicFlushTimerTickMs(456).maxKeyValueSize(13).setMaxMutations(3737) + .pool(pool).listener(listener); bmp.implementationClassName("someClassName"); BufferedMutatorParams clone = bmp.clone(); @@ -151,6 +151,7 @@ public void testClone() { assertEquals(123, clone.getWriteBufferPeriodicFlushTimeoutMs()); assertEquals(456, clone.getWriteBufferPeriodicFlushTimerTickMs()); assertEquals(13, clone.getMaxKeyValueSize()); + assertEquals(3737, clone.getMaxMutations()); assertEquals("someClassName", clone.getImplementationClassName()); cloneTest(bmp, clone); @@ -178,6 +179,7 @@ private void cloneTest(BufferedMutatorParams some, BufferedMutatorParams clone) assertEquals(some.getWriteBufferPeriodicFlushTimerTickMs(), clone.getWriteBufferPeriodicFlushTimerTickMs()); assertEquals(some.getMaxKeyValueSize(), clone.getMaxKeyValueSize()); + assertTrue(some.getMaxMutations() == clone.getMaxMutations()); assertTrue(some.getListener() == clone.getListener()); assertTrue(some.getPool() == clone.getPool()); assertEquals(some.getImplementationClassName(), clone.getImplementationClassName()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java index acfa25fecac3..08e68def2c45 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java @@ -175,6 +175,23 @@ public void testPeriodicFlush() throws InterruptedException, ExecutionException assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); } + @Test + public void testMaxMutationsFlush() throws InterruptedException, ExecutionException { + AsyncBufferedMutator mutator = + CONN.getBufferedMutatorBuilder(TABLE_NAME).setMaxMutations(3).build(); + CompletableFuture future1 = + mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE)); + CompletableFuture future2 = + mutator.mutate(new Put(Bytes.toBytes(1)).addColumn(CF, CQ, VALUE)); + CompletableFuture future3 = + mutator.mutate(new Put(Bytes.toBytes(2)).addColumn(CF, CQ, VALUE)); + CompletableFuture.allOf(future1, future2, future3).join(); + AsyncTable table = CONN.getTable(TABLE_NAME); + assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ)); + assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(1))).get().getValue(CF, CQ)); + assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(2))).get().getValue(CF, CQ)); + } + // a bit deep into the implementation @Test public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException { @@ -244,8 +261,9 @@ private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutato private int flushCount; AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable table, - long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) { - super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize); + long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize, int maxMutation) { + super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize, + maxMutation); } @Override @@ -261,7 +279,7 @@ public void testRaceBetweenNormalFlushAndPeriodicFlush() Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE); try (AsyncBufferMutatorForTest mutator = new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME), - 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024)) { + 10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024, 100)) { CompletableFuture future = mutator.mutate(put); Timeout task = mutator.periodicFlushTask; // we should have scheduled a periodic flush task From bd119602e9931ef95fcc32a2a2ba43c203d25243 Mon Sep 17 00:00:00 2001 From: Charles Connell Date: Tue, 25 Feb 2025 08:33:12 -0500 Subject: [PATCH 2/2] More wiring --- .../hadoop/hbase/client/ConnectionConfiguration.java | 10 +++++++--- .../hbase/client/ConnectionOverAsyncConnection.java | 3 +++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 685b00ec9df7..068f0e459a2a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -69,7 +69,7 @@ public class ConnectionConfiguration { private final int metaReplicaCallTimeoutMicroSecondScan; private final int retries; private final int maxKeyValueSize; - private final int bufferdMutatorMaxMutations; + private final int bufferedMutatorMaxMutations; private final int rpcTimeout; private final int readRpcTimeout; private final int metaReadRpcTimeout; @@ -121,7 +121,7 @@ public class ConnectionConfiguration { this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); - this.bufferdMutatorMaxMutations = + this.bufferedMutatorMaxMutations = conf.getInt(BUFFERED_MUTATOR_MAX_MUTATIONS_KEY, BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT); this.rpcTimeout = @@ -155,7 +155,7 @@ protected ConnectionConfiguration() { this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; - this.bufferdMutatorMaxMutations = BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT; + this.bufferedMutatorMaxMutations = BUFFERED_MUTATOR_MAX_MUTATIONS_DEFAULT; this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; @@ -218,6 +218,10 @@ public int getMaxKeyValueSize() { return maxKeyValueSize; } + public int getBufferedMutatorMaxMutations() { + return bufferedMutatorMaxMutations; + } + public long getScannerMaxResultSize() { return scannerMaxResultSize; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index d299e453266e..471cfa874458 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -107,6 +107,9 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) { builder.setMaxKeyValueSize(params.getMaxKeyValueSize()); } + if (params.getMaxMutations() != BufferedMutatorParams.UNSET) { + builder.setMaxMutations(params.getMaxMutations()); + } if (!params.getRequestAttributes().isEmpty()) { builder.setRequestAttributes(params.getRequestAttributes());