diff --git a/hbase-examples/pom.xml b/hbase-examples/pom.xml
index 816e08ad32a6..44f6fdff39ea 100644
--- a/hbase-examples/pom.xml
+++ b/hbase-examples/pom.xml
@@ -37,10 +37,6 @@
4.28.2
-
- org.apache.hbase.thirdparty
- hbase-shaded-miscellaneous
-
org.apache.hbase.thirdparty
hbase-shaded-netty
@@ -49,6 +45,27 @@
org.apache.hbase
hbase-protocol-shaded
+
+ org.apache.hbase.thirdparty
+ hbase-shaded-miscellaneous
+
+
+ com.lmax
+ disruptor
+
+
+ org.apache.logging.log4j
+ log4j-1.2-api
+ test
+
+
+ org.slf4j
+ slf4j-api
+
+
+ org.apache.commons
+ commons-lang3
+
org.apache.hbase
hbase-logging
@@ -106,10 +123,6 @@
commons-io
commons-io
-
- org.slf4j
- slf4j-api
-
org.apache.zookeeper
zookeeper
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatistics.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatistics.java
new file mode 100644
index 000000000000..18e08ab7b33d
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatistics.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import java.util.Map;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface RowStatistics {
+ String getTable();
+
+ String getRegion();
+
+ String getColumnFamily();
+
+ boolean isMajor();
+
+ long getLargestRowNumBytes();
+
+ int getLargestRowCellsCount();
+
+ long getLargestCellNumBytes();
+
+ int getCellsLargerThanOneBlockCount();
+
+ int getRowsLargerThanOneBlockCount();
+
+ int getCellsLargerThanMaxCacheSizeCount();
+
+ int getTotalDeletesCount();
+
+ int getTotalCellsCount();
+
+ int getTotalRowsCount();
+
+ long getTotalBytes();
+
+ String getLargestRowAsString();
+
+ String getLargestCellAsString();
+
+ Map getRowSizeBuckets();
+
+ Map getValueSizeBuckets();
+
+ String getJsonString();
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsCompactionObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsCompactionObserver.java
new file mode 100644
index 000000000000..78e92aa18ae2
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsCompactionObserver.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.CF;
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.NAMESPACE;
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.NAMESPACED_TABLE_NAME;
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.TABLE_RECORDER_KEY;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.RawCellBuilder;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.MasterObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsRecorder;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsTableRecorder;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsUtil;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.Shipper;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class RowStatisticsCompactionObserver
+ implements RegionCoprocessor, RegionObserver, MasterCoprocessor, MasterObserver {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsCompactionObserver.class);
+
+ // From private field BucketAllocator.DEFAULT_BUCKET_SIZES
+ private static final long DEFAULT_MAX_BUCKET_SIZE = 512 * 1024 + 1024;
+ private static final ConcurrentMap TABLE_COUNTERS = new ConcurrentHashMap();
+ private static final String ROW_STATISTICS_DROPPED = "rowStatisticsDropped";
+ private static final String ROW_STATISTICS_PUT_FAILED = "rowStatisticsPutFailures";
+ private Counter rowStatisticsDropped;
+ private Counter rowStatisticsPutFailed;
+ private long maxCacheSize;
+ private final RowStatisticsRecorder recorder;
+
+ @InterfaceAudience.Private
+ public RowStatisticsCompactionObserver(RowStatisticsRecorder recorder) {
+ this.recorder = recorder;
+ }
+
+ public RowStatisticsCompactionObserver() {
+ this(null);
+ }
+
+ @Override
+ public Optional getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public Optional getMasterObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public void start(CoprocessorEnvironment e) throws IOException {
+ if (!(e instanceof RegionCoprocessorEnvironment)) {
+ return;
+ }
+ RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e;
+ if (regionEnv.getRegionInfo().getTable().isSystemTable()) {
+ return;
+ }
+ String[] configuredBuckets =
+ regionEnv.getConfiguration().getStrings(BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY);
+ maxCacheSize = DEFAULT_MAX_BUCKET_SIZE;
+ if (configuredBuckets != null && configuredBuckets.length > 0) {
+ String lastBucket = configuredBuckets[configuredBuckets.length - 1];
+ try {
+ maxCacheSize = Integer.parseInt(lastBucket.trim());
+ } catch (NumberFormatException ex) {
+ LOG.warn("Failed to parse {} value {} as int", BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY,
+ lastBucket, ex);
+ }
+ }
+ rowStatisticsDropped =
+ regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_DROPPED);
+ rowStatisticsPutFailed =
+ regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_PUT_FAILED);
+ TableName tableName = regionEnv.getRegionInfo().getTable();
+ TABLE_COUNTERS.merge(tableName, 1L, Long::sum);
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment e) throws IOException {
+ if (!(e instanceof RegionCoprocessorEnvironment)) {
+ return;
+ }
+ RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e;
+ if (regionEnv.getRegionInfo().getTable().isSystemTable()) {
+ return;
+ }
+ TableName tableName = regionEnv.getRegionInfo().getTable();
+ long tableCount = TABLE_COUNTERS.merge(tableName, -1L, Long::sum);
+ if (tableCount == 0) {
+ long regionCount = 0;
+ for (long count : TABLE_COUNTERS.values()) {
+ regionCount += count;
+ }
+ if (regionCount == 0) {
+ regionEnv.getMetricRegistryForRegionServer().remove(ROW_STATISTICS_DROPPED,
+ rowStatisticsDropped);
+ regionEnv.getMetricRegistryForRegionServer().remove(ROW_STATISTICS_PUT_FAILED,
+ rowStatisticsPutFailed);
+ RowStatisticsTableRecorder tableRecorder =
+ (RowStatisticsTableRecorder) regionEnv.getSharedData().get(TABLE_RECORDER_KEY);
+ if (tableRecorder != null) {
+ regionEnv.getSharedData().remove(TABLE_RECORDER_KEY, tableRecorder);
+ tableRecorder.close();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void postStartMaster(ObserverContext ctx)
+ throws IOException {
+ try (Admin admin = ctx.getEnvironment().getConnection().getAdmin()) {
+ if (admin.tableExists(NAMESPACED_TABLE_NAME)) {
+ LOG.info("Table {} already exists. Skipping table creation process.",
+ NAMESPACED_TABLE_NAME);
+ } else {
+ boolean shouldCreateNamespace =
+ Arrays.stream(admin.listNamespaces()).filter(namespace -> namespace.equals(NAMESPACE))
+ .collect(Collectors.toUnmodifiableSet()).isEmpty();
+ if (shouldCreateNamespace) {
+ NamespaceDescriptor nd = NamespaceDescriptor.create(NAMESPACE).build();
+ try {
+ admin.createNamespace(nd);
+ } catch (IOException e) {
+ LOG.error("Failed to create namespace {}", NAMESPACE, e);
+ }
+ }
+ ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(CF).setMaxVersions(25)
+ .setTimeToLive((int) Duration.ofDays(7).toSeconds()).build();
+ TableDescriptor td =
+ TableDescriptorBuilder.newBuilder(NAMESPACED_TABLE_NAME).setColumnFamily(cfd).build();
+ LOG.info("Creating table {}", NAMESPACED_TABLE_NAME);
+ try {
+ admin.createTable(td);
+ } catch (IOException e) {
+ LOG.error("Failed to create table {}", NAMESPACED_TABLE_NAME, e);
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Failed to get Connection or Admin. Cannot determine if table {} exists.",
+ NAMESPACED_TABLE_NAME, e);
+ }
+ }
+
+ @Override
+ public InternalScanner preCompact(ObserverContext extends RegionCoprocessorEnvironment> context,
+ Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+ CompactionRequest request) {
+ if (store.getTableName().isSystemTable()) {
+ return scanner;
+ }
+ int blocksize = store.getColumnFamilyDescriptor().getBlocksize();
+ boolean isMajor = request.isMajor();
+ RowStatisticsImpl stats = new RowStatisticsImpl(store.getTableName().getNameAsString(),
+ store.getRegionInfo().getEncodedName(), store.getColumnFamilyName(), blocksize, maxCacheSize,
+ isMajor);
+ return new RowStatisticsScanner(scanner, stats, context.getEnvironment(), recorder);
+ }
+
+ private static class RowStatisticsScanner implements InternalScanner, Shipper {
+
+ private final InternalScanner scanner;
+ private final Shipper shipper;
+ private final RowStatisticsImpl rowStatistics;
+ private final RegionCoprocessorEnvironment regionEnv;
+ private final Counter rowStatisticsDropped;
+ private final Counter rowStatisticsPutFailed;
+ private final RowStatisticsRecorder customRecorder;
+ private RawCellBuilder cellBuilder;
+ private Cell lastCell;
+
+ public RowStatisticsScanner(InternalScanner scanner, RowStatisticsImpl rowStatistics,
+ RegionCoprocessorEnvironment regionEnv, RowStatisticsRecorder customRecorder) {
+ this.scanner = scanner;
+ if (scanner instanceof Shipper) {
+ this.shipper = (Shipper) scanner;
+ } else {
+ this.shipper = null;
+ }
+ this.rowStatistics = rowStatistics;
+ this.regionEnv = regionEnv;
+ this.cellBuilder = regionEnv.getCellBuilder();
+ this.rowStatisticsDropped =
+ regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_DROPPED);
+ this.rowStatisticsPutFailed =
+ regionEnv.getMetricRegistryForRegionServer().counter(ROW_STATISTICS_PUT_FAILED);
+ this.customRecorder = customRecorder;
+ }
+
+ @Override
+ public boolean next(List super ExtendedCell> result, ScannerContext scannerContext)
+ throws IOException {
+ boolean ret = scanner.next(result, scannerContext);
+ consumeCells(result);
+ return ret;
+ }
+
+ @Override
+ public boolean next(List super ExtendedCell> result) throws IOException {
+ boolean ret = scanner.next(result);
+ consumeCells(result);
+ return ret;
+ }
+
+ @Override
+ public void close() throws IOException {
+ rowStatistics.handleRowChanged(lastCell);
+ rowStatistics.shipped(cellBuilder);
+ record();
+ scanner.close();
+ }
+
+ @Override
+ public void shipped() throws IOException {
+ if (shipper != null) {
+ lastCell = RowStatisticsUtil.cloneWithoutValue(cellBuilder, lastCell);
+ rowStatistics.shipped(cellBuilder);
+ shipper.shipped();
+ }
+ }
+
+ private void consumeCells(List super ExtendedCell> result) {
+ if (result.isEmpty()) {
+ return;
+ }
+ // each next() call returns at most 1 row (maybe less for large rows)
+ // so we just need to check if the first cell has changed rows
+ ExtendedCell first = (ExtendedCell) result.get(0);
+ if (rowChanged(first)) {
+ rowStatistics.handleRowChanged(lastCell);
+ }
+ for (int i = 0; i < result.size(); i++) {
+ ExtendedCell cell = (ExtendedCell) result.get(i);
+ rowStatistics.consumeCell(cell);
+ lastCell = cell;
+ }
+ }
+
+ private boolean rowChanged(Cell cell) {
+ if (lastCell == null) {
+ return false;
+ }
+ return !CellUtil.matchingRows(lastCell, cell);
+ }
+
+ private void record() {
+ RowStatisticsTableRecorder tableRecorder =
+ (RowStatisticsTableRecorder) regionEnv.getSharedData().computeIfAbsent(TABLE_RECORDER_KEY,
+ k -> RowStatisticsTableRecorder.forClusterConnection(regionEnv.getConnection(),
+ rowStatisticsDropped, rowStatisticsPutFailed));
+ if (tableRecorder != null) {
+ tableRecorder.record(this.rowStatistics,
+ Optional.of(regionEnv.getRegion().getRegionInfo().getRegionName()));
+ } else {
+ LOG.error(
+ "Failed to initialize a TableRecorder. Will not record row statistics for region={}",
+ rowStatistics.getRegion());
+ rowStatisticsDropped.increment();
+ }
+ if (customRecorder != null) {
+ customRecorder.record(this.rowStatistics, Optional.empty());
+ }
+ }
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsImpl.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsImpl.java
new file mode 100644
index 000000000000..365b7eb5ef62
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/RowStatisticsImpl.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import java.util.Map;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.RawCellBuilder;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsUtil;
+import org.apache.hadoop.hbase.regionserver.Shipper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.GsonUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.gson.Gson;
+import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
+
+/**
+ * Holder for accumulating row statistics in {@link RowStatisticsCompactionObserver} Creates various
+ * cell, row, and total stats.
+ */
+@InterfaceAudience.Private
+public class RowStatisticsImpl implements RowStatistics {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsImpl.class);
+ private static final Gson GSON = GsonUtil.createGson().create();
+
+ //
+ // Transient fields which are not included in gson serialization
+ //
+ private final transient long blockSize;
+ private final transient long maxCacheSize;
+ private transient int rowCells;
+ private transient long rowBytes;
+ private transient byte[] largestRow;
+ private transient Cell largestCell;
+ private final transient boolean isMajor;
+ private final transient SizeBucketTracker rowSizeBuckets;
+ private final transient SizeBucketTracker valueSizeBuckets;
+
+ // We don't need to clone anything until shipped() is called on scanner.
+ // To avoid allocations, we keep a reference until that point
+ private transient Cell largestRowRef;
+ private transient Cell largestCellRef;
+ //
+ // Non-transient fields which are included in gson
+ //
+ private final String table;
+ private final String region;
+ private final String columnFamily;
+ private long largestRowNumBytes;
+ private int largestRowCellsCount;
+ private long largestCellNumBytes;
+ private int cellsLargerThanOneBlockCount;
+ private int rowsLargerThanOneBlockCount;
+ private int cellsLargerThanMaxCacheSizeCount;
+ private int totalDeletesCount;
+ private int totalCellsCount;
+ private int totalRowsCount;
+ private long totalBytesCount;
+
+ RowStatisticsImpl(String table, String encodedRegion, String columnFamily, long blockSize,
+ long maxCacheSize, boolean isMajor) {
+ this.table = table;
+ this.region = encodedRegion;
+ this.columnFamily = columnFamily;
+ this.blockSize = blockSize;
+ this.maxCacheSize = maxCacheSize;
+ this.isMajor = isMajor;
+ this.rowSizeBuckets = new SizeBucketTracker();
+ this.valueSizeBuckets = new SizeBucketTracker();
+ }
+
+ public void handleRowChanged(Cell lastCell) {
+ if (rowBytes > largestRowNumBytes) {
+ largestRowRef = lastCell;
+ largestRowNumBytes = rowBytes;
+ largestRowCellsCount = rowCells;
+ }
+ if (rowBytes > blockSize) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RowTooLarge: rowBytes={}, blockSize={}, table={}, rowKey={}", rowBytes,
+ blockSize, table, Bytes.toStringBinary(lastCell.getRowArray(), lastCell.getRowOffset(),
+ lastCell.getRowLength()));
+ }
+ rowsLargerThanOneBlockCount++;
+ }
+ rowSizeBuckets.add(rowBytes);
+ rowBytes = 0;
+ rowCells = 0;
+ totalRowsCount++;
+ }
+
+ public void consumeCell(Cell cell) {
+ int cellSize = cell.getSerializedSize();
+
+ rowBytes += cellSize;
+ rowCells++;
+
+ boolean tooLarge = false;
+ if (cellSize > maxCacheSize) {
+ cellsLargerThanMaxCacheSizeCount++;
+ tooLarge = true;
+ }
+ if (cellSize > blockSize) {
+ cellsLargerThanOneBlockCount++;
+ tooLarge = true;
+ }
+
+ if (tooLarge && LOG.isDebugEnabled()) {
+ LOG.debug("CellTooLarge: size={}, blockSize={}, maxCacheSize={}, table={}, cell={}", cellSize,
+ blockSize, maxCacheSize, table, CellUtil.toString(cell, false));
+ }
+
+ if (cellSize > largestCellNumBytes) {
+ largestCellRef = cell;
+ largestCellNumBytes = cellSize;
+ }
+ valueSizeBuckets.add(cell.getValueLength());
+
+ totalCellsCount++;
+ if (CellUtil.isDelete(cell)) {
+ totalDeletesCount++;
+ }
+ totalBytesCount += cellSize;
+ }
+
+ /**
+ * Clone the cell refs so they can be cleaned up by {@link Shipper#shipped()}. Doing this lazily
+ * here, rather than eagerly in the above two methods can save us on some allocations. We might
+ * change the largestCell/largestRow multiple times between shipped() calls.
+ */
+ public void shipped(RawCellBuilder cellBuilder) {
+ if (largestRowRef != null) {
+ largestRow = CellUtil.cloneRow(largestRowRef);
+ largestRowRef = null;
+ }
+ if (largestCellRef != null) {
+ largestCell = RowStatisticsUtil.cloneWithoutValue(cellBuilder, largestCellRef);
+ largestCellRef = null;
+ }
+ }
+
+ @Override
+ public String getTable() {
+ return table;
+ }
+
+ @Override
+ public String getRegion() {
+ return region;
+ }
+
+ @Override
+ public String getColumnFamily() {
+ return columnFamily;
+ }
+
+ @Override
+ public boolean isMajor() {
+ return isMajor;
+ }
+
+ public byte[] getLargestRow() {
+ return largestRow;
+ }
+
+ @Override
+ public String getLargestRowAsString() {
+ return Bytes.toStringBinary(getLargestRow());
+ }
+
+ @Override
+ public long getLargestRowNumBytes() {
+ return largestRowNumBytes;
+ }
+
+ @Override
+ public int getLargestRowCellsCount() {
+ return largestRowCellsCount;
+ }
+
+ public Cell getLargestCell() {
+ return largestCell;
+ }
+
+ @Override
+ public String getLargestCellAsString() {
+ return CellUtil.toString(getLargestCell(), false);
+ }
+
+ @Override
+ public long getLargestCellNumBytes() {
+ return largestCellNumBytes;
+ }
+
+ @Override
+ public int getCellsLargerThanOneBlockCount() {
+ return cellsLargerThanOneBlockCount;
+ }
+
+ @Override
+ public int getRowsLargerThanOneBlockCount() {
+ return rowsLargerThanOneBlockCount;
+ }
+
+ @Override
+ public int getCellsLargerThanMaxCacheSizeCount() {
+ return cellsLargerThanMaxCacheSizeCount;
+ }
+
+ @Override
+ public int getTotalDeletesCount() {
+ return totalDeletesCount;
+ }
+
+ @Override
+ public int getTotalCellsCount() {
+ return totalCellsCount;
+ }
+
+ @Override
+ public int getTotalRowsCount() {
+ return totalRowsCount;
+ }
+
+ @Override
+ public long getTotalBytes() {
+ return totalBytesCount;
+ }
+
+ @Override
+ public Map getRowSizeBuckets() {
+ return rowSizeBuckets.toMap();
+ }
+
+ @Override
+ public Map getValueSizeBuckets() {
+ return valueSizeBuckets.toMap();
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append("largestRowAsString", Bytes.toStringBinary(largestRow))
+ .append("largestCellAsString", largestCell).append("largestRowNumBytes", largestRowNumBytes)
+ .append("largestRowCellsCount", largestRowCellsCount)
+ .append("largestCellNumBytes", largestCellNumBytes)
+ .append("cellsLargerThanOneBlockCount", cellsLargerThanOneBlockCount)
+ .append("rowsLargerThanOneBlockCount", rowsLargerThanOneBlockCount)
+ .append("cellsLargerThanMaxCacheSizeCount", cellsLargerThanMaxCacheSizeCount)
+ .append("totalDeletesCount", totalDeletesCount).append("totalCellsCount", totalCellsCount)
+ .append("totalRowsCount", totalRowsCount).append("totalBytesCount", totalBytesCount)
+ .append("rowSizeBuckets", getRowSizeBuckets())
+ .append("valueSizeBuckets", getValueSizeBuckets()).append("isMajor", isMajor).toString();
+ }
+
+ @Override
+ public String getJsonString() {
+ JsonObject json = (JsonObject) GSON.toJsonTree(this);
+ json.add("largestCellParts", buildLargestCellPartsJson());
+ json.addProperty("largestRowAsString", getLargestRowAsString());
+ json.add("rowSizeBuckets", rowSizeBuckets.toJsonObject());
+ json.add("valueSizeBuckets", valueSizeBuckets.toJsonObject());
+ return json.toString();
+ }
+
+ private JsonObject buildLargestCellPartsJson() {
+ JsonObject cellJson = new JsonObject();
+ Cell cell = getLargestCell();
+ cellJson.addProperty("rowKey",
+ Bytes.toStringBinary(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
+ cellJson.addProperty("family",
+ Bytes.toStringBinary(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
+ cellJson.addProperty("qualifier", Bytes.toStringBinary(cell.getQualifierArray(),
+ cell.getQualifierOffset(), cell.getQualifierLength()));
+ cellJson.addProperty("timestamp", cell.getTimestamp());
+ cellJson.addProperty("type", cell.getType().toString());
+ return cellJson;
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucket.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucket.java
new file mode 100644
index 000000000000..da49eb86351f
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucket.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public enum SizeBucket {
+ KILOBYTES_1(0, 1 * 1024, "[0, 1)"),
+ KILOBYTES_2(1 * 1024, 2 * 1024, "[1, 2)"),
+ KILOBYTES_4(2 * 1024, 4 * 1024, "[2, 4)"),
+ KILOBYTES_8(4 * 1024, 8 * 1024, "[4, 8)"),
+ KILOBYTES_16(8 * 1024, 16 * 1024, "[8, 16)"),
+ KILOBYTES_32(16 * 1024, 32 * 1024, "[16, 32)"),
+ KILOBYTES_64(32 * 1024, 64 * 1024, "[32, 64)"),
+ KILOBYTES_128(64 * 1024, 128 * 1024, "[64, 128)"),
+ KILOBYTES_256(128 * 1024, 256 * 1024, "[128, 256)"),
+ KILOBYTES_512(256 * 1024, 512 * 1024, "[256, 512)"),
+ KILOBYTES_MAX(512 * 1024, Long.MAX_VALUE, "[512, inf)");
+
+ private final long minBytes;
+ private final long maxBytes;
+ private final String bucket;
+
+ SizeBucket(long minBytes, long maxBytes, String bucket) {
+ this.minBytes = minBytes;
+ this.maxBytes = maxBytes;
+ this.bucket = bucket;
+ }
+
+ public long minBytes() {
+ return minBytes;
+ }
+
+ public long maxBytes() {
+ return maxBytes;
+ }
+
+ public String bucket() {
+ return bucket;
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucketTracker.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucketTracker.java
new file mode 100644
index 000000000000..702eb32d8d8d
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/SizeBucketTracker.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
+
+@InterfaceAudience.Private
+public class SizeBucketTracker {
+
+ private static final SizeBucket[] SIZE_BUCKET_ARRAY = SizeBucket.values();
+ private final Map bucketToCount;
+
+ public SizeBucketTracker() {
+ SizeBucket[] sizeBucketsArray = SizeBucket.values();
+
+ bucketToCount = new HashMap<>(sizeBucketsArray.length);
+ for (SizeBucket sizeBucket : sizeBucketsArray) {
+ bucketToCount.put(sizeBucket, 0L);
+ }
+ }
+
+ public void add(long rowBytes) {
+ if (rowBytes < 0) {
+ return;
+ }
+ SizeBucket sizeBucket = search(rowBytes);
+ if (sizeBucket == null) {
+ return;
+ }
+ long val = bucketToCount.get(sizeBucket);
+ bucketToCount.put(sizeBucket, getSafeIncrementedValue(val));
+ }
+
+ public Map toMap() {
+ Map copy = new HashMap<>(SIZE_BUCKET_ARRAY.length);
+ for (SizeBucket sizeBucket : SIZE_BUCKET_ARRAY) {
+ long val = bucketToCount.get(sizeBucket);
+ copy.put(sizeBucket.bucket(), val);
+ }
+ return copy;
+ }
+
+ public JsonObject toJsonObject() {
+ JsonObject bucketJson = new JsonObject();
+ for (SizeBucket sizeBucket : SIZE_BUCKET_ARRAY) {
+ long val = bucketToCount.get(sizeBucket);
+ bucketJson.addProperty(sizeBucket.bucket(), val);
+ }
+ return bucketJson;
+ }
+
+ private SizeBucket search(long val) {
+ for (SizeBucket sizeBucket : SIZE_BUCKET_ARRAY) {
+ if (val < sizeBucket.maxBytes()) {
+ return sizeBucket;
+ }
+ }
+ return val == Long.MAX_VALUE ? SizeBucket.KILOBYTES_MAX : null;
+ }
+
+ private static long getSafeIncrementedValue(long val) {
+ return val == Long.MAX_VALUE ? val : val + 1;
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsCombinedRecorder.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsCombinedRecorder.java
new file mode 100644
index 000000000000..6ceae50d9c07
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsCombinedRecorder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatisticsImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class RowStatisticsCombinedRecorder implements RowStatisticsRecorder {
+
+ private final RowStatisticsRecorder one;
+ private final RowStatisticsRecorder two;
+
+ public RowStatisticsCombinedRecorder(RowStatisticsRecorder one, RowStatisticsRecorder two) {
+ this.one = one;
+ this.two = two;
+ }
+
+ @Override
+ public void record(RowStatisticsImpl stats, Optional fullRegionName) {
+ one.record(stats, fullRegionName);
+ two.record(stats, fullRegionName);
+ }
+
+ public RowStatisticsRecorder getOne() {
+ return one;
+ }
+
+ public RowStatisticsRecorder getTwo() {
+ return two;
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsRecorder.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsRecorder.java
new file mode 100644
index 000000000000..78f5eb376472
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsRecorder.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder;
+
+import java.util.Optional;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatisticsImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface RowStatisticsRecorder {
+ void record(RowStatisticsImpl stats, Optional fullRegionName);
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsTableRecorder.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsTableRecorder.java
new file mode 100644
index 000000000000..23fe4c344e83
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/recorder/RowStatisticsTableRecorder.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder;
+
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsConfigurationUtil.getInt;
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsConfigurationUtil.getLong;
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.NAMESPACED_TABLE_NAME;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.TimeoutException;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionConfiguration;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatisticsImpl;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsDisruptorExceptionHandler;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsEventHandler;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsRingBufferEnvelope;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsRingBufferPayload;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+@InterfaceAudience.Private
+public final class RowStatisticsTableRecorder implements RowStatisticsRecorder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsTableRecorder.class);
+ // Must be multiple of 2. Should be greater than num regions/RS
+ private static final int DEFAULT_EVENT_COUNT = 1024;
+ private static final long DISRUPTOR_SHUTDOWN_TIMEOUT_MS = 60_0000L;
+ private final BufferedMutator bufferedMutator;
+ private final Counter rowStatisticsDropped;
+ private final Disruptor disruptor;
+ private final RingBuffer ringBuffer;
+ private final AtomicBoolean closed;
+
+ /*
+ * This constructor is ONLY for testing. Use TableRecorder#forClusterConnection if you want to
+ * instantiate a TableRecorder object.
+ */
+ private RowStatisticsTableRecorder(BufferedMutator bufferedMutator,
+ Disruptor disruptor, Counter rowStatisticsDropped) {
+ this.bufferedMutator = bufferedMutator;
+ this.disruptor = disruptor;
+ this.ringBuffer = disruptor.getRingBuffer();
+ this.rowStatisticsDropped = rowStatisticsDropped;
+ this.closed = new AtomicBoolean(false);
+ }
+
+ public static RowStatisticsTableRecorder forClusterConnection(Connection clusterConnection,
+ Counter rowStatisticsDropped, Counter rowStatisticsPutFailed) {
+ BufferedMutator bufferedMutator =
+ initializeBufferedMutator(clusterConnection, rowStatisticsPutFailed);
+ if (bufferedMutator == null) {
+ return null;
+ }
+
+ Disruptor disruptor =
+ initializeDisruptor(bufferedMutator, rowStatisticsPutFailed);
+ disruptor.start();
+
+ return new RowStatisticsTableRecorder(bufferedMutator, disruptor, rowStatisticsDropped);
+ }
+
+ @Override
+ public void record(RowStatisticsImpl rowStatistics, Optional fullRegionName) {
+ if (!closed.get()) {
+ if (
+ !ringBuffer.tryPublishEvent((envelope, seqId) -> envelope
+ .load(new RowStatisticsRingBufferPayload(rowStatistics, fullRegionName.get())))
+ ) {
+ rowStatisticsDropped.increment();
+ LOG.error("Failed to load row statistics for region={} into the ring buffer",
+ rowStatistics.getRegion());
+ }
+ } else {
+ rowStatisticsDropped.increment();
+ LOG.error("TableRecorder is closed. Will not record row statistics for region={}",
+ rowStatistics.getRegion());
+ }
+ }
+
+ public void close() throws IOException {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+ try {
+ disruptor.shutdown(DISRUPTOR_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ LOG.warn(
+ "Disruptor shutdown timed out after {} ms. Forcing halt. Some row statistics may be lost",
+ DISRUPTOR_SHUTDOWN_TIMEOUT_MS);
+ disruptor.halt();
+ disruptor.shutdown();
+ }
+ bufferedMutator.close();
+ }
+
+ private static BufferedMutator initializeBufferedMutator(Connection conn,
+ Counter rowStatisticsPutFailed) {
+ Configuration conf = conn.getConfiguration();
+ TableRecorderExceptionListener exceptionListener =
+ new TableRecorderExceptionListener(rowStatisticsPutFailed);
+ BufferedMutatorParams params = new BufferedMutatorParams(NAMESPACED_TABLE_NAME)
+ .rpcTimeout(getInt(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 15_000))
+ .operationTimeout(getInt(conf, HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30_000))
+ .setWriteBufferPeriodicFlushTimeoutMs(
+ getLong(conf, ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, 60_000L))
+ .writeBufferSize(getLong(conf, ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY,
+ ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT))
+ .listener(exceptionListener);
+ BufferedMutator bufferedMutator = null;
+ try {
+ bufferedMutator = conn.getBufferedMutator(params);
+ } catch (IOException e) {
+ LOG.error("This should NEVER print!", e);
+ }
+ return bufferedMutator;
+ }
+
+ private static Disruptor
+ initializeDisruptor(BufferedMutator bufferedMutator, Counter rowStatisticsPutFailures) {
+ Disruptor disruptor =
+ new Disruptor<>(RowStatisticsRingBufferEnvelope::new, DEFAULT_EVENT_COUNT,
+ new ThreadFactoryBuilder().setNameFormat("rowstats.append-pool-%d").setDaemon(true)
+ .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
+ ProducerType.MULTI, new BlockingWaitStrategy());
+ disruptor.setDefaultExceptionHandler(new RowStatisticsDisruptorExceptionHandler());
+ RowStatisticsEventHandler rowStatisticsEventHandler =
+ new RowStatisticsEventHandler(bufferedMutator, rowStatisticsPutFailures);
+ disruptor.handleEventsWith(new RowStatisticsEventHandler[] { rowStatisticsEventHandler });
+ return disruptor;
+ }
+
+ protected static class TableRecorderExceptionListener
+ implements BufferedMutator.ExceptionListener {
+
+ private final Counter rowStatisticsPutFailures;
+
+ TableRecorderExceptionListener(Counter counter) {
+ this.rowStatisticsPutFailures = counter;
+ }
+
+ public void onException(RetriesExhaustedWithDetailsException exception,
+ BufferedMutator mutator) {
+ long failedPuts = mutator.getWriteBufferSize();
+ rowStatisticsPutFailures.increment(failedPuts);
+ LOG.error(
+ "Periodic flush of buffered mutator failed. Cannot persist {} row stats stored in buffer",
+ failedPuts, exception);
+ }
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsDisruptorExceptionHandler.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsDisruptorExceptionHandler.java
new file mode 100644
index 000000000000..07f26977d671
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsDisruptorExceptionHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer;
+
+import com.lmax.disruptor.ExceptionHandler;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class RowStatisticsDisruptorExceptionHandler
+ implements ExceptionHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RowStatisticsDisruptorExceptionHandler.class);
+
+ @Override
+ public void handleEventException(Throwable e, long sequence,
+ RowStatisticsRingBufferEnvelope event) {
+ if (event != null) {
+ LOG.error("Unable to persist event={} with sequence={}", event.getPayload(), sequence, e);
+ } else {
+ LOG.error("Event with sequence={} was null", sequence, e);
+ }
+ }
+
+ @Override
+ public void handleOnStartException(Throwable e) {
+ LOG.error("Disruptor onStartException", e);
+ }
+
+ @Override
+ public void handleOnShutdownException(Throwable e) {
+ LOG.error("Disruptor onShutdownException", e);
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsEventHandler.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsEventHandler.java
new file mode 100644
index 000000000000..8b9ca1ff4cbc
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsEventHandler.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer;
+
+import static org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil.buildPutForRegion;
+
+import com.lmax.disruptor.EventHandler;
+import java.io.IOException;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatistics;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class RowStatisticsEventHandler implements EventHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsEventHandler.class);
+ private final BufferedMutator bufferedMutator;
+ private final Counter rowStatisticsPutFailures;
+
+ public RowStatisticsEventHandler(BufferedMutator bufferedMutator,
+ Counter rowStatisticsPutFailures) {
+ this.bufferedMutator = bufferedMutator;
+ this.rowStatisticsPutFailures = rowStatisticsPutFailures;
+ }
+
+ @Override
+ public void onEvent(RowStatisticsRingBufferEnvelope event, long sequence, boolean endOfBatch)
+ throws Exception {
+ final RowStatisticsRingBufferPayload payload = event.getPayload();
+ if (payload != null) {
+ final RowStatistics rowStatistics = payload.getRowStatistics();
+ final byte[] fullRegionName = payload.getFullRegionName();
+ Put put = buildPutForRegion(fullRegionName, rowStatistics, rowStatistics.isMajor());
+ try {
+ bufferedMutator.mutate(put);
+ } catch (IOException e) {
+ rowStatisticsPutFailures.increment();
+ LOG.error("Mutate operation failed. Cannot persist row statistics for region {}",
+ rowStatistics.getRegion(), e);
+ }
+ }
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferEnvelope.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferEnvelope.java
new file mode 100644
index 000000000000..75601ae9eaf9
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferEnvelope.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class RowStatisticsRingBufferEnvelope {
+
+ private RowStatisticsRingBufferPayload payload;
+
+ public RowStatisticsRingBufferEnvelope() {
+ }
+
+ public void load(RowStatisticsRingBufferPayload payload) {
+ this.payload = payload;
+ }
+
+ public RowStatisticsRingBufferPayload getPayload() {
+ final RowStatisticsRingBufferPayload payload = this.payload;
+ this.payload = null;
+ return payload;
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferPayload.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferPayload.java
new file mode 100644
index 000000000000..498f7a660d20
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/ringbuffer/RowStatisticsRingBufferPayload.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer;
+
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatistics;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class RowStatisticsRingBufferPayload {
+
+ private final RowStatistics rowStatistics;
+ private final byte[] fullRegionName;
+
+ public RowStatisticsRingBufferPayload(RowStatistics rowStatistics, byte[] fullRegionName) {
+ this.rowStatistics = rowStatistics;
+ this.fullRegionName = fullRegionName;
+ }
+
+ public RowStatistics getRowStatistics() {
+ return rowStatistics;
+ }
+
+ public byte[] getFullRegionName() {
+ return fullRegionName;
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsConfigurationUtil.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsConfigurationUtil.java
new file mode 100644
index 000000000000..f7f3a373c3f6
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsConfigurationUtil.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class RowStatisticsConfigurationUtil {
+
+ private RowStatisticsConfigurationUtil() {
+ }
+
+ private static final String ROW_STATISTICS_PREFIX =
+ "org.apache.hbase.coprocessor.row.statistics.";
+
+ public static int getInt(Configuration conf, String name, int defaultValue) {
+ return conf.getInt(ROW_STATISTICS_PREFIX + name, defaultValue);
+ }
+
+ public static long getLong(Configuration conf, String name, long defaultValue) {
+ return conf.getLong(ROW_STATISTICS_PREFIX + name, defaultValue);
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsTableUtil.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsTableUtil.java
new file mode 100644
index 000000000000..cd06ce7dbe4e
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsTableUtil.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.utils;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.RowStatistics;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public final class RowStatisticsTableUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RowStatisticsTableUtil.class);
+ public static final String NAMESPACE = "stats";
+ public static final String TABLE_STRING = "row-statistics";
+ public static final TableName NAMESPACED_TABLE_NAME = TableName.valueOf(NAMESPACE, TABLE_STRING);
+ public static final byte[] CF = Bytes.toBytes("0");
+ public static final String TABLE_RECORDER_KEY = "tableRecorder";
+
+ private RowStatisticsTableUtil() {
+ }
+
+ public static Put buildPutForRegion(byte[] regionRowKey, RowStatistics rowStatistics,
+ boolean isMajor) {
+ Put put = new Put(regionRowKey);
+ String cq = rowStatistics.getColumnFamily() + (isMajor ? "1" : "0");
+ String jsonString = rowStatistics.getJsonString();
+ put.addColumn(CF, Bytes.toBytes(cq), Bytes.toBytes(jsonString));
+ LOG.debug(jsonString);
+ return put;
+ }
+}
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsUtil.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsUtil.java
new file mode 100644
index 000000000000..4189da8db24a
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/utils/RowStatisticsUtil.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.utils;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.RawCellBuilder;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class RowStatisticsUtil {
+
+ private RowStatisticsUtil() {
+ }
+
+ public static Cell cloneWithoutValue(RawCellBuilder cellBuilder, Cell cell) {
+ return cellBuilder.clear().setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
+ .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
+ .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())
+ .setTimestamp(cell.getTimestamp()).setType(cell.getType()).build();
+ }
+}
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsCompactionObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsCompactionObserver.java
new file mode 100644
index 000000000000..d0ee8d3bc578
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsCompactionObserver.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import static org.apache.hadoop.hbase.util.TestRegionSplitCalculator.TEST_UTIL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsRecorder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestRowStatisticsCompactionObserver {
+
+ public static final TestableRowStatisticsRecorder RECORDER = new TestableRowStatisticsRecorder();
+ private static final TableName TABLE_NAME = TableName.valueOf("test-table");
+ private static final byte[] FAMILY = Bytes.toBytes("0");
+ private static SingleProcessHBaseCluster cluster;
+ private static Connection connection;
+ private static Table table;
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ cluster = TEST_UTIL.startMiniCluster(1);
+ connection = ConnectionFactory.createConnection(cluster.getConf());
+ table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1,
+ HConstants.DEFAULT_BLOCKSIZE, TestableRowStatisticsCompactionObserver.class.getName());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ RECORDER.clear();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ cluster.close();
+ TEST_UTIL.shutdownMiniCluster();
+ table.close();
+ connection.close();
+ }
+
+ @Test
+ public void itRecordsStats() throws IOException, InterruptedException {
+ int numRows = 10;
+ int largestRowNum = -1;
+ int largestRowSize = 0;
+
+ int largestCellRowNum = -1;
+ int largestCellColNum = -1;
+ long largestCellSize = 0;
+
+ for (int i = 0; i < numRows; i++) {
+ int cells = ThreadLocalRandom.current().nextInt(1000) + 10;
+
+ Put p = new Put(Bytes.toBytes(i));
+ for (int j = 0; j < cells; j++) {
+ byte[] val = new byte[ThreadLocalRandom.current().nextInt(100) + 1];
+ p.addColumn(FAMILY, Bytes.toBytes(j), val);
+ }
+
+ int rowSize = 0;
+ CellScanner cellScanner = p.cellScanner();
+ int j = 0;
+ while (cellScanner.advance()) {
+ Cell current = cellScanner.current();
+ int serializedSize = current.getSerializedSize();
+ if (serializedSize > largestCellSize) {
+ largestCellSize = serializedSize;
+ largestCellRowNum = i;
+ largestCellColNum = j;
+ }
+ rowSize += serializedSize;
+ j++;
+ }
+
+ if (rowSize > largestRowSize) {
+ largestRowNum = i;
+ largestRowSize = rowSize;
+ }
+
+ table.put(p);
+ connection.getAdmin().flush(table.getName());
+ }
+
+ for (int i = 0; i < numRows; i++) {
+ Delete d = new Delete(Bytes.toBytes(i));
+ d.addColumn(FAMILY, Bytes.toBytes(0));
+ table.delete(d);
+ }
+
+ System.out.println("Final flush");
+ connection.getAdmin().flush(table.getName());
+ Thread.sleep(5000);
+ System.out.println("Compacting");
+
+ RowStatisticsImpl lastStats = RECORDER.getLastStats(); // Just initialize
+ Boolean lastIsMajor = RECORDER.getLastIsMajor();
+ connection.getAdmin().compact(table.getName());
+ while (lastStats == null) {
+ Thread.sleep(1000);
+
+ System.out.println("Checking stats");
+ lastStats = RECORDER.getLastStats();
+ lastIsMajor = RECORDER.getLastIsMajor();
+ }
+ assertFalse(lastIsMajor);
+ assertEquals(lastStats.getTotalDeletesCount(), 10);
+ assertEquals(lastStats.getTotalRowsCount(), 10);
+
+ RECORDER.clear();
+ lastStats = RECORDER.getLastStats();
+ lastIsMajor = RECORDER.getLastIsMajor();
+ connection.getAdmin().majorCompact(table.getName());
+
+ // Must wait for async majorCompaction to complete
+ while (lastStats == null) {
+ Thread.sleep(1000);
+
+ System.out.println("Checking stats");
+ lastStats = RECORDER.getLastStats();
+ lastIsMajor = RECORDER.getLastIsMajor();
+ }
+ assertTrue(lastIsMajor);
+ // no deletes after major compact
+ assertEquals(lastStats.getTotalDeletesCount(), 0);
+ assertEquals(lastStats.getTotalRowsCount(), 10);
+ // can only check largest values after major compact, since the above minor compact might not
+ // contain all storefiles
+ assertEquals(Bytes.toInt(lastStats.getLargestRow()), largestRowNum);
+ assertEquals(
+ Bytes.toInt(lastStats.getLargestCell().getRowArray(),
+ lastStats.getLargestCell().getRowOffset(), lastStats.getLargestCell().getRowLength()),
+ largestCellRowNum);
+ assertEquals(Bytes.toInt(lastStats.getLargestCell().getQualifierArray(),
+ lastStats.getLargestCell().getQualifierOffset(),
+ lastStats.getLargestCell().getQualifierLength()), largestCellColNum);
+ }
+
+ public static class TestableRowStatisticsCompactionObserver
+ extends RowStatisticsCompactionObserver {
+
+ public TestableRowStatisticsCompactionObserver() {
+ super(TestRowStatisticsCompactionObserver.RECORDER);
+ }
+ }
+
+ public static class TestableRowStatisticsRecorder implements RowStatisticsRecorder {
+
+ private volatile RowStatisticsImpl lastStats = null;
+ private volatile Boolean lastIsMajor = null;
+
+ @Override
+ public void record(RowStatisticsImpl stats, Optional fullRegionName) {
+ System.out.println("Record called with isMajor=" + stats.isMajor() + ", stats=" + stats
+ + ", fullRegionName=" + fullRegionName);
+ lastStats = stats;
+ }
+
+ public void clear() {
+ lastStats = null;
+ lastIsMajor = null;
+ }
+
+ public RowStatisticsImpl getLastStats() {
+ return lastStats;
+ }
+
+ public Boolean getLastIsMajor() {
+ return lastIsMajor;
+ }
+ }
+}
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsEventHandler.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsEventHandler.java
new file mode 100644
index 000000000000..e009ceaed5e0
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsEventHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsEventHandler;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsRingBufferEnvelope;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.ringbuffer.RowStatisticsRingBufferPayload;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.hadoop.hbase.metrics.impl.CounterImpl;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRowStatisticsEventHandler {
+
+ private static final String REGION_STRING = "REGION_STRING";
+ private static final byte[] FULL_REGION = Bytes.toBytes("FULL_REGION_STRING");
+ private static final String JSON_STRING = "JSON_STRING";
+ private static final RowStatisticsRingBufferEnvelope EVENT =
+ new RowStatisticsRingBufferEnvelope();
+ private static final RowStatistics ROW_STATISTICS = mock(RowStatistics.class);
+ private BufferedMutator bufferedMutator;
+ private Counter failureCounter;
+ private RowStatisticsEventHandler eventHandler;
+
+ @Before
+ public void setup() {
+ bufferedMutator = mock(BufferedMutator.class);
+ failureCounter = new CounterImpl();
+ eventHandler = new RowStatisticsEventHandler(bufferedMutator, failureCounter);
+ when(ROW_STATISTICS.getRegion()).thenReturn(REGION_STRING);
+ when(ROW_STATISTICS.getJsonString()).thenReturn(JSON_STRING);
+ }
+
+ @Test
+ public void itPersistsRowStatistics() throws Exception {
+ EVENT.load(new RowStatisticsRingBufferPayload(ROW_STATISTICS, FULL_REGION));
+ doNothing().when(bufferedMutator).mutate(any(Put.class));
+ eventHandler.onEvent(EVENT, 0L, true);
+ verify(bufferedMutator, times(1)).mutate(any(Put.class));
+ assertEquals(failureCounter.getCount(), 0);
+ }
+
+ @Test
+ public void itDoesNotPublishNullRowStatistics() throws Exception {
+ EVENT.load(null);
+ eventHandler.onEvent(EVENT, 0L, true);
+ verify(bufferedMutator, times(0)).mutate(any(Put.class));
+ assertEquals(failureCounter.getCount(), 0);
+ }
+
+ @Test
+ public void itCountsFailedPersists() throws Exception {
+ EVENT.load(new RowStatisticsRingBufferPayload(ROW_STATISTICS, FULL_REGION));
+ doThrow(new IOException()).when(bufferedMutator).mutate(any(Put.class));
+ eventHandler.onEvent(EVENT, 0L, true);
+ verify(bufferedMutator, times(1)).mutate(any(Put.class));
+ assertEquals(failureCounter.getCount(), 1);
+ }
+}
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsTableRecorder.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsTableRecorder.java
new file mode 100644
index 000000000000..39a7e36c2022
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/TestRowStatisticsTableRecorder.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats;
+
+import static org.apache.hadoop.hbase.util.TestRegionSplitCalculator.TEST_UTIL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.recorder.RowStatisticsTableRecorder;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.utils.RowStatisticsTableUtil;
+import org.apache.hadoop.hbase.metrics.Counter;
+import org.apache.hadoop.hbase.metrics.impl.CounterImpl;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestRowStatisticsTableRecorder {
+
+ private static final NamespaceDescriptor NAMESPACE_DESCRIPTOR =
+ NamespaceDescriptor.create(RowStatisticsTableUtil.NAMESPACE).build();
+ private static final byte[] FULL_REGION_NAME = Bytes.toBytes("fullRegionName");
+ private static SingleProcessHBaseCluster cluster;
+ private static Connection connection;
+ private RowStatisticsImpl rowStatistics;
+ private Counter counter;
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+ cluster = TEST_UTIL.startMiniCluster(1);
+ connection = ConnectionFactory.createConnection(cluster.getConf());
+ connection.getAdmin().createNamespace(NAMESPACE_DESCRIPTOR);
+ // need this table to write to
+ TEST_UTIL.createTable(RowStatisticsTableUtil.NAMESPACED_TABLE_NAME, RowStatisticsTableUtil.CF);
+ }
+
+ @Before
+ public void setup() {
+ rowStatistics = mock(RowStatisticsImpl.class);
+ counter = new CounterImpl();
+ }
+
+ @Test
+ public void itReturnsNullRecorderOnFailedBufferedMutator() throws IOException {
+ Connection badConnection = mock(Connection.class);
+ Configuration conf = mock(Configuration.class);
+ when(badConnection.getConfiguration()).thenReturn(conf);
+ when(badConnection.getBufferedMutator(any(BufferedMutatorParams.class)))
+ .thenThrow(IOException.class);
+ RowStatisticsTableRecorder recorder =
+ RowStatisticsTableRecorder.forClusterConnection(badConnection, counter, counter);
+ assertNull(recorder);
+ }
+
+ @Test
+ public void itDoesNotIncrementCounterWhenRecordSucceeds() throws IOException {
+ RowStatisticsTableRecorder recorder =
+ RowStatisticsTableRecorder.forClusterConnection(connection, counter, counter);
+ assertNotNull(recorder);
+ recorder.record(rowStatistics, Optional.of(FULL_REGION_NAME));
+ assertEquals(counter.getCount(), 0);
+ }
+
+ @Test
+ public void itIncrementsCounterWhenRecordFails() throws IOException {
+ RowStatisticsTableRecorder recorder =
+ RowStatisticsTableRecorder.forClusterConnection(connection, counter, counter);
+ assertNotNull(recorder);
+ recorder.close();
+ recorder.record(rowStatistics, Optional.of(FULL_REGION_NAME));
+ assertEquals(counter.getCount(), 1);
+ }
+}
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/sizebucket/TestSizeBucketTracker.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/sizebucket/TestSizeBucketTracker.java
new file mode 100644
index 000000000000..de5bc583416a
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/row/stats/sizebucket/TestSizeBucketTracker.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example.row.stats.sizebucket;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Map;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.SizeBucket;
+import org.apache.hadoop.hbase.coprocessor.example.row.stats.SizeBucketTracker;
+import org.junit.Test;
+
+import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
+
+public class TestSizeBucketTracker {
+
+ @Test
+ public void itUpdatesSizeBuckets() {
+ SizeBucketTracker sizeBucketTracker = new SizeBucketTracker();
+ SizeBucket[] sizeBuckets = SizeBucket.values();
+
+ // Initialize
+ Map bucketToCount = sizeBucketTracker.toMap();
+ for (SizeBucket sizeBucket : SizeBucket.values()) {
+ assertEquals((long) bucketToCount.get(sizeBucket.bucket()), 0L);
+ }
+
+ // minBytes
+ for (SizeBucket sizeBucket : sizeBuckets) {
+ sizeBucketTracker.add(sizeBucket.minBytes());
+ }
+ bucketToCount = sizeBucketTracker.toMap();
+ for (SizeBucket sizeBucket : sizeBuckets) {
+ assertEquals((long) bucketToCount.get(sizeBucket.bucket()), 1L);
+ }
+
+ // maxBytes - 1
+ for (SizeBucket sizeBucket : sizeBuckets) {
+ sizeBucketTracker.add(sizeBucket.maxBytes() - 1);
+ }
+ bucketToCount = sizeBucketTracker.toMap();
+ for (SizeBucket sizeBucket : sizeBuckets) {
+ assertEquals((long) bucketToCount.get(sizeBucket.bucket()), 2L);
+ }
+
+ // maxBytes
+ for (SizeBucket sizeBucket : sizeBuckets) {
+ sizeBucketTracker.add(sizeBucket.maxBytes());
+ }
+ bucketToCount = sizeBucketTracker.toMap();
+ for (int i = 0; i < sizeBuckets.length - 1; i++) {
+ SizeBucket currBucket = sizeBuckets[i];
+ if (currBucket == SizeBucket.KILOBYTES_1) {
+ assertEquals((long) bucketToCount.get(currBucket.bucket()), 2L);
+ } else {
+ SizeBucket nextBucket = sizeBuckets[i + 1];
+ if (nextBucket == SizeBucket.KILOBYTES_MAX) {
+ assertEquals((long) bucketToCount.get(nextBucket.bucket()), 4L);
+ } else {
+ assertEquals((long) bucketToCount.get(nextBucket.bucket()), 3L);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void itCreatesJson() {
+ SizeBucketTracker sizeBucketTracker = new SizeBucketTracker();
+ SizeBucket[] sizeBuckets = SizeBucket.values();
+ for (SizeBucket sizeBucket : sizeBuckets) {
+ sizeBucketTracker.add(sizeBucket.minBytes());
+ }
+ JsonObject mapJson = sizeBucketTracker.toJsonObject();
+ for (SizeBucket sizeBucket : sizeBuckets) {
+ Number count = mapJson.get(sizeBucket.bucket()).getAsNumber();
+ assertEquals(count.longValue(), 1L);
+ }
+ }
+}
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapReduceExamples.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapReduceExamples.java
index afd4a544cfa1..70d0a09367b3 100644
--- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapReduceExamples.java
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMapReduceExamples.java
@@ -163,7 +163,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
@Test
public void testMainIndexBuilder() throws Exception {
PrintStream oldPrintStream = System.err;
- SecurityManager SECURITY_MANAGER = System.getSecurityManager();
LauncherSecurityManager newSecurityManager = new LauncherSecurityManager();
System.setSecurityManager(newSecurityManager);
ByteArrayOutputStream data = new ByteArrayOutputStream();
@@ -182,7 +181,6 @@ public void testMainIndexBuilder() throws Exception {
}
} finally {
System.setErr(oldPrintStream);
- System.setSecurityManager(SECURITY_MANAGER);
}
}
}