From 509ae573f3c87b0136f012964795a7a7f30a9587 Mon Sep 17 00:00:00 2001 From: Ralph Goers Date: Mon, 27 Mar 2023 11:21:19 -0700 Subject: [PATCH] Remove hbase sink. Fix checkstyle issues --- .../flume/sink/hive/TestHiveWriter.java | 11 +- .../org/apache/flume/sink/hive/TestUtil.java | 11 +- flume-ng-sinks/flume-ng-hbase-sink/pom.xml | 209 ----- .../flume/sink/hbase/AsyncHBaseSink.java | 736 ----------------- .../sink/hbase/AsyncHbaseEventSerializer.java | 77 -- .../apache/flume/sink/hbase/BatchAware.java | 28 - .../apache/flume/sink/hbase/HBaseSink.java | 569 -------------- .../HBaseSinkConfigurationConstants.java | 83 -- .../flume/sink/hbase/HBaseVersionCheck.java | 43 - .../sink/hbase/HbaseEventSerializer.java | 61 -- .../sink/hbase/RegexHbaseEventSerializer.java | 215 ----- .../SimpleAsyncHbaseEventSerializer.java | 148 ---- .../hbase/SimpleHbaseEventSerializer.java | 146 ---- .../sink/hbase/SimpleRowKeyGenerator.java | 46 -- .../hbase/IncrementAsyncHBaseSerializer.java | 78 -- .../sink/hbase/IncrementHBaseSerializer.java | 83 -- .../hbase/MockSimpleHbaseEventSerializer.java | 38 - .../flume/sink/hbase/TestAsyncHBaseSink.java | 618 --------------- .../TestAsyncHBaseSinkConfiguration.java | 107 --- .../flume/sink/hbase/TestHBaseSink.java | 744 ------------------ .../sink/hbase/TestHBaseSinkCreation.java | 50 -- .../hbase/TestRegexHbaseEventSerializer.java | 232 ------ flume-ng-sinks/pom.xml | 1 - flume-parent/pom.xml | 2 +- 24 files changed, 11 insertions(+), 4325 deletions(-) delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/pom.xml delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java delete mode 100644 
flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSinkCreation.java delete mode 100644 flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java index 42b1fb3612..1e8ce1fec7 100644 --- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java +++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java @@ -19,6 +19,11 @@ package org.apache.flume.sink.hive; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + import com.google.common.util.concurrent.ThreadFactoryBuilder; import junit.framework.Assert; import org.apache.flume.Context; @@ -26,7 +31,6 @@ import org.apache.flume.instrumentation.SinkCounter; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; @@ -36,11 +40,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - public class TestHiveWriter { static final String dbName = "testing"; static final String tblName = "alerts"; diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java index a3a8a2042d..35fe708ab4 100644 --- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java +++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java @@ -17,7 +17,6 @@ * under the License. 
*/ - package org.apache.flume.sink.hive; import org.apache.hadoop.fs.FileStatus; @@ -57,14 +56,14 @@ public static void setConfValues(HiveConf conf) { conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, txnMgr); conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); conf.set("fs.raw.impl", RawFileSystem.class.getName()); - try{ + try { conf.setBoolVar(HiveConf.ConfVars.METASTORE_SCHEMA_VERIFICATION, false ); conf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:derby:;databaseName=metastore_db;create=true"); conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver"); conf.setBoolVar(HiveConf.ConfVars.METASTORE_AUTO_CREATE_ALL, true); conf.setIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT, 0); conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, System.getProperty("java.io.tmpdir")); - }catch (Throwable t){ + } catch (Throwable t) { t.printStackTrace(); } @@ -227,9 +226,9 @@ public FileStatus getFileStatus(Path path) throws IOException { private static boolean runDDL(Driver driver, String sql) throws QueryFailedException { int retryCount = 1; // # of times to retry if first attempt fails for (int attempt = 0; attempt <= retryCount; ++attempt) { - driver.run(sql); - continue; - } // for + driver.run(sql); + continue; + } return false; } diff --git a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml deleted file mode 100644 index 24049d30eb..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml +++ /dev/null @@ -1,209 +0,0 @@ - - - - 4.0.0 - - flume-ng-sinks - org.apache.flume - 1.11.1-SNAPSHOT - - org.apache.flume.flume-ng-sinks - flume-ng-hbase-sink - Flume NG HBase Sink - - - - 24 - 16 - org.apache.flume.sink.hbase - - - - - - org.apache.flume - flume-ng-sdk - - - - org.apache.flume - flume-ng-core - - - - org.apache.flume - flume-ng-configuration - - - - org.slf4j - slf4j-api - - - - com.google.guava - guava - - - - org.hbase - asynchbase - - - - org.apache.logging.log4j - log4j-slf4j-impl - test - - - - org.apache.logging.log4j - log4j-1.2-api - test - - - - junit - junit - test - - - - org.apache.hadoop - hadoop-common - true - - - - commons-io - commons-io - test - - - - commons-lang - commons-lang - - - - org.mockito - mockito-all - test - - - - org.apache.flume.flume-ng-sinks - flume-hdfs-sink - - - - org.apache.hadoop - hadoop-minicluster - - - - org.apache.hbase - hbase-client - true - - - - org.apache.hbase - hbase-annotations - - - - - - org.apache.hbase - hbase-client - tests - test - - - - org.apache.hbase - hbase-annotations - - - - - - - org.apache.hbase - hbase-server - test - - - - org.apache.hbase - hbase-annotations - - - - - - org.apache.hbase - hbase-server - tests - test - - - - org.apache.hbase - hbase-annotations - - - - - - - - org.apache.hbase - hbase-common - true - - - - org.apache.hbase - hbase-annotations - - - - - - org.apache.hbase - hbase-testing-util - test - - - - org.apache.hbase - hbase-annotations - - - - - - org.apache.zookeeper - zookeeper - test - - - - - diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java deleted file mode 100644 index bd0efa9901..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java +++ /dev/null @@ -1,736 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.hbase; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.Maps; -import com.google.common.primitives.UnsignedBytes; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.stumbleupon.async.Callback; -import org.apache.flume.Channel; -import org.apache.flume.ChannelException; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.FlumeException; -import org.apache.flume.Transaction; -import org.apache.flume.conf.BatchSizeSupported; -import org.apache.flume.conf.Configurable; -import org.apache.flume.conf.ConfigurationException; -import org.apache.flume.instrumentation.SinkCounter; -import org.apache.flume.sink.AbstractSink; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.hbase.async.AtomicIncrementRequest; -import org.hbase.async.Config; -import org.hbase.async.HBaseClient; -import org.hbase.async.PutRequest; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * A simple sink which reads events from a channel and writes them to HBase. - * This Sink uses an asynchronous API internally and is likely to - * perform better. - * The Hbase configuration is picked up from the first hbase-site.xml - * encountered in the classpath. This sink supports batch reading of - * events from the channel, and writing them to Hbase, to minimize the number - * of flushes on the hbase tables. To use this sink, it has to be configured - * with certain mandatory parameters:

- *
- * table: The name of the table in Hbase to write to.
- * columnFamily: The column family in Hbase to write to.
- * Other optional parameters are:
- * serializer: A class implementing {@link AsyncHbaseEventSerializer}.
- * An instance of this class will be used to serialize events which are
- * written to hbase.
- * serializer.*: Passed in the configure() method to serializer
- * as an object of {@link org.apache.flume.Context}.
- * batchSize: This is the batch size used by the client. This is the
- * maximum number of events the sink will commit per transaction. The
- * default batch size is 100 events.
- * timeout: The length of time in milliseconds the sink waits for
- * callbacks from hbase for all events in a transaction.
- * If no timeout is specified, the sink will wait forever.
- *
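[For illustration only: a minimal agent configuration exercising the parameters documented above might have looked like the following; the agent/sink/channel names (a1, k1, c1) and the table and column family values are hypothetical, while the parameter keys and class names come from this patch:

    a1.sinks.k1.type = org.apache.flume.sink.hbase.AsyncHBaseSink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.table = flume_events
    a1.sinks.k1.columnFamily = cf
    a1.sinks.k1.batchSize = 100
    a1.sinks.k1.timeout = 60000
    a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

Here batchSize and timeout simply restate the defaults noted in this Javadoc and in HBaseSinkConfigurationConstants (100 events, 60000 ms).]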
- * Note: Hbase does not guarantee atomic commits on multiple - * rows. So if a subset of events in a batch are written to disk by Hbase and - * Hbase fails, the flume transaction is rolled back, causing flume to write - * all the events in the transaction all over again, which will cause - * duplicates. The serializer is expected to take care of the handling of - * duplicates etc. HBase also does not support batch increments, so if - * multiple increments are returned by the serializer, then HBase failure - * will cause them to be re-written, when HBase comes back up. - */ -public class AsyncHBaseSink extends AbstractSink implements Configurable, BatchSizeSupported { - - private String tableName; - private byte[] columnFamily; - private long batchSize; - private static final Logger logger = LoggerFactory.getLogger(AsyncHBaseSink.class); - private AsyncHbaseEventSerializer serializer; - - @VisibleForTesting - Config asyncClientConfig; - private String eventSerializerType; - private Context serializerContext; - private HBaseClient client; - private Configuration conf; - private Transaction txn; - private volatile boolean open = false; - private SinkCounter sinkCounter; - private long timeout; - private String zkQuorum; - private String zkBaseDir; - private ExecutorService sinkCallbackPool; - private boolean isTimeoutTest; - private boolean isCoalesceTest; - private boolean enableWal = true; - private boolean batchIncrements = false; - private volatile int totalCallbacksReceived = 0; - private int maxConsecutiveFails; - private Map incrementBuffer; - // The HBaseClient buffers the requests until a callback is received. In the event of a - // timeout, there is no way to clear these buffers. If there is a major cluster issue, this - // buffer can become too big and cause crashes. So if we hit a fixed number of HBase write - // failures/timeouts, then close the HBase Client (gracefully or not) and force a GC to get rid - // of the buffered data. - private int consecutiveHBaseFailures = 0; - private boolean lastTxnFailed = false; - - // Does not need to be thread-safe. Always called only from the sink's - // process method. - private final Comparator COMPARATOR = UnsignedBytes.lexicographicalComparator(); - - public AsyncHBaseSink() { - this(null); - } - - public AsyncHBaseSink(Configuration conf) { - this(conf, false, false); - } - - @VisibleForTesting - AsyncHBaseSink(Configuration conf, boolean isTimeoutTest, - boolean isCoalesceTest) { - this.conf = conf; - this.isTimeoutTest = isTimeoutTest; - this.isCoalesceTest = isCoalesceTest; - } - - @Override - public Status process() throws EventDeliveryException { - /* - * Reference to the boolean representing failure of the current transaction. - * Since each txn gets a new boolean, failure of one txn will not affect - * the next even if errbacks for the current txn get called while - * the next one is being processed. - * - */ - if (!open) { - throw new EventDeliveryException("Sink was never opened. 
" + - "Please fix the configuration."); - } - if (client == null) { - client = initHBaseClient(); - if (client == null) { - throw new EventDeliveryException("Could not establish connection to HBase!"); - } - } - AtomicBoolean txnFail = new AtomicBoolean(false); - AtomicInteger callbacksReceived = new AtomicInteger(0); - AtomicInteger callbacksExpected = new AtomicInteger(0); - final Lock lock = new ReentrantLock(); - final Condition condition = lock.newCondition(); - if (incrementBuffer != null) { - incrementBuffer.clear(); - } - /* - * Callbacks can be reused per transaction, since they share the same - * locks and conditions. - */ - Callback putSuccessCallback = - new SuccessCallback( - lock, callbacksReceived, condition); - Callback putFailureCallback = - new FailureCallback( - lock, callbacksReceived, txnFail, condition); - - Callback incrementSuccessCallback = - new SuccessCallback( - lock, callbacksReceived, condition); - Callback incrementFailureCallback = - new FailureCallback( - lock, callbacksReceived, txnFail, condition); - - Status status = Status.READY; - Channel channel = getChannel(); - txn = channel.getTransaction(); - txn.begin(); - - int i = 0; - try { - for (; i < batchSize; i++) { - Event event = channel.take(); - if (event == null) { - status = Status.BACKOFF; - if (i == 0) { - sinkCounter.incrementBatchEmptyCount(); - } else { - sinkCounter.incrementBatchUnderflowCount(); - } - break; - } else { - serializer.setEvent(event); - List actions = serializer.getActions(); - List increments = serializer.getIncrements(); - callbacksExpected.addAndGet(actions.size()); - if (!batchIncrements) { - callbacksExpected.addAndGet(increments.size()); - } - - for (PutRequest action : actions) { - action.setDurable(enableWal); - client.put(action).addCallbacks(putSuccessCallback, putFailureCallback); - } - for (AtomicIncrementRequest increment : increments) { - if (batchIncrements) { - CellIdentifier identifier = new CellIdentifier(increment.key(), - increment.qualifier()); - AtomicIncrementRequest request - = incrementBuffer.get(identifier); - if (request == null) { - incrementBuffer.put(identifier, increment); - } else { - request.setAmount(request.getAmount() + increment.getAmount()); - } - } else { - client.atomicIncrement(increment).addCallbacks( - incrementSuccessCallback, incrementFailureCallback); - } - } - } - } - if (batchIncrements) { - Collection increments = incrementBuffer.values(); - for (AtomicIncrementRequest increment : increments) { - client.atomicIncrement(increment).addCallbacks( - incrementSuccessCallback, incrementFailureCallback); - } - callbacksExpected.addAndGet(increments.size()); - } - client.flush(); - } catch (Throwable e) { - this.handleTransactionFailure(txn); - this.checkIfChannelExceptionAndThrow(e); - } - if (i == batchSize) { - sinkCounter.incrementBatchCompleteCount(); - } - sinkCounter.addToEventDrainAttemptCount(i); - - lock.lock(); - long startTime = System.nanoTime(); - long timeRemaining; - try { - while ((callbacksReceived.get() < callbacksExpected.get()) - && !txnFail.get()) { - timeRemaining = timeout - (System.nanoTime() - startTime); - timeRemaining = (timeRemaining >= 0) ? timeRemaining : 0; - try { - if (!condition.await(timeRemaining, TimeUnit.NANOSECONDS)) { - txnFail.set(true); - logger.warn("HBase callbacks timed out. 
" - + "Transaction will be rolled back."); - } - } catch (Exception ex) { - logger.error("Exception while waiting for callbacks from HBase."); - this.handleTransactionFailure(txn); - Throwables.propagate(ex); - } - } - } finally { - lock.unlock(); - } - - if (isCoalesceTest) { - totalCallbacksReceived += callbacksReceived.get(); - } - - /* - * At this point, either the txn has failed - * or all callbacks received and txn is successful. - * - * This need not be in the monitor, since all callbacks for this txn - * have been received. So txnFail will not be modified any more(even if - * it is, it is set from true to true only - false happens only - * in the next process call). - * - */ - if (txnFail.get()) { - // We enter this if condition only if the failure was due to HBase failure, so we make sure - // we track the consecutive failures. - if (lastTxnFailed) { - consecutiveHBaseFailures++; - } - lastTxnFailed = true; - this.handleTransactionFailure(txn); - throw new EventDeliveryException("Could not write events to Hbase. " + - "Transaction failed, and rolled back."); - } else { - try { - lastTxnFailed = false; - consecutiveHBaseFailures = 0; - txn.commit(); - txn.close(); - sinkCounter.addToEventDrainSuccessCount(i); - } catch (Throwable e) { - this.handleTransactionFailure(txn); - this.checkIfChannelExceptionAndThrow(e); - } - } - - return status; - } - - @Override - public void configure(Context context) { - if (!HBaseVersionCheck.hasVersionLessThan2(logger)) { - throw new ConfigurationException( - "HBase major version number must be less than 2 for asynchbase sink. "); - } - tableName = context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE); - String cf = context.getString( - HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY); - batchSize = context.getLong( - HBaseSinkConfigurationConstants.CONFIG_BATCHSIZE, new Long(100)); - serializerContext = new Context(); - //If not specified, will use HBase defaults. - eventSerializerType = context.getString( - HBaseSinkConfigurationConstants.CONFIG_SERIALIZER); - Preconditions.checkNotNull(tableName, - "Table name cannot be empty, please specify in configuration file"); - Preconditions.checkNotNull(cf, - "Column family cannot be empty, please specify in configuration file"); - //Check foe event serializer, if null set event serializer type - if (eventSerializerType == null || eventSerializerType.isEmpty()) { - eventSerializerType = - "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer"; - logger.info("No serializer defined, Will use default"); - } - serializerContext.putAll(context.getSubProperties( - HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX)); - columnFamily = cf.getBytes(Charsets.UTF_8); - try { - @SuppressWarnings("unchecked") - Class clazz = - (Class) - Class.forName(eventSerializerType); - serializer = clazz.newInstance(); - serializer.configure(serializerContext); - serializer.initialize(tableName.getBytes(Charsets.UTF_8), columnFamily); - } catch (Exception e) { - logger.error("Could not instantiate event serializer.", e); - Throwables.propagate(e); - } - - if (sinkCounter == null) { - sinkCounter = new SinkCounter(this.getName()); - } - timeout = context.getLong(HBaseSinkConfigurationConstants.CONFIG_TIMEOUT, - HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT); - if (timeout <= 0) { - logger.warn("Timeout should be positive for Hbase sink. " - + "Sink will not timeout."); - timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT; - } - //Convert to nanos. 
- timeout = TimeUnit.MILLISECONDS.toNanos(timeout); - - zkQuorum = context.getString( - HBaseSinkConfigurationConstants.ZK_QUORUM, "").trim(); - if (!zkQuorum.isEmpty()) { - zkBaseDir = context.getString( - HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, - HBaseSinkConfigurationConstants.DEFAULT_ZK_ZNODE_PARENT); - } else { - if (conf == null) { //In tests, we pass the conf in. - conf = HBaseConfiguration.create(); - } - zkQuorum = ZKConfig.getZKQuorumServersString(conf); - zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, - HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT); - } - Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(), - "The Zookeeper quorum cannot be null and should be specified."); - - enableWal = context.getBoolean(HBaseSinkConfigurationConstants - .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL); - logger.info("The write to WAL option is set to: " + String.valueOf(enableWal)); - if (!enableWal) { - logger.warn("AsyncHBaseSink's enableWal configuration is set to false. " + - "All writes to HBase will have WAL disabled, and any data in the " + - "memstore of this region in the Region Server could be lost!"); - } - - batchIncrements = context.getBoolean( - HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, - HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS); - - if (batchIncrements) { - incrementBuffer = Maps.newHashMap(); - logger.info("Increment coalescing is enabled. Increments will be " + - "buffered."); - } - - maxConsecutiveFails = - context.getInteger(HBaseSinkConfigurationConstants.CONFIG_MAX_CONSECUTIVE_FAILS, - HBaseSinkConfigurationConstants.DEFAULT_MAX_CONSECUTIVE_FAILS); - - - Map asyncProperties - = context.getSubProperties(HBaseSinkConfigurationConstants.ASYNC_PREFIX); - asyncClientConfig = new Config(); - asyncClientConfig.overrideConfig( - HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY, zkQuorum - ); - asyncClientConfig.overrideConfig( - HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY, zkBaseDir - ); - for (String property: asyncProperties.keySet()) { - asyncClientConfig.overrideConfig(property, asyncProperties.get(property)); - } - } - - @VisibleForTesting - int getTotalCallbacksReceived() { - return totalCallbacksReceived; - } - - @VisibleForTesting - boolean isConfNull() { - return conf == null; - } - - @Override - public long getBatchSize() { - return batchSize; - } - - @Override - public void start() { - Preconditions.checkArgument(client == null, "Please call stop " - + "before calling start on an old instance."); - sinkCounter.start(); - sinkCounter.incrementConnectionCreatedCount(); - client = initHBaseClient(); - super.start(); - } - - private HBaseClient initHBaseClient() { - logger.info("Initializing HBase Client"); - - sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat(this.getName() + " HBase Call Pool").build()); - logger.info("Callback pool created"); - client = new HBaseClient(asyncClientConfig, - new NioClientSocketChannelFactory(sinkCallbackPool, sinkCallbackPool)); - - final CountDownLatch latch = new CountDownLatch(1); - final AtomicBoolean fail = new AtomicBoolean(false); - client.ensureTableFamilyExists( - tableName.getBytes(Charsets.UTF_8), columnFamily).addCallbacks( - new Callback() { - @Override - public Object call(Object arg) throws Exception { - latch.countDown(); - logger.info("table found"); - return null; - } - }, - new Callback() { - @Override - public Object call(Object arg) throws Exception { - fail.set(true); - 
latch.countDown(); - return null; - } - }); - - try { - logger.info("waiting on callback"); - latch.await(); - logger.info("callback received"); - } catch (InterruptedException e) { - sinkCounter.incrementConnectionFailedCount(); - throw new FlumeException( - "Interrupted while waiting for Hbase Callbacks", e); - } - if (fail.get()) { - sinkCounter.incrementConnectionFailedCount(); - if (client != null) { - shutdownHBaseClient(); - } - throw new FlumeException( - "Could not start sink. " + - "Table or column family does not exist in Hbase."); - } else { - open = true; - } - client.setFlushInterval((short) 0); - return client; - } - - @Override - public void stop() { - serializer.cleanUp(); - if (client != null) { - shutdownHBaseClient(); - } - sinkCounter.incrementConnectionClosedCount(); - sinkCounter.stop(); - - try { - if (sinkCallbackPool != null) { - sinkCallbackPool.shutdown(); - if (!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) { - sinkCallbackPool.shutdownNow(); - } - } - } catch (InterruptedException e) { - logger.error("Interrupted while waiting for asynchbase sink pool to " + - "die", e); - if (sinkCallbackPool != null) { - sinkCallbackPool.shutdownNow(); - } - } - sinkCallbackPool = null; - client = null; - conf = null; - open = false; - super.stop(); - } - - private void shutdownHBaseClient() { - logger.info("Shutting down HBase Client"); - final CountDownLatch waiter = new CountDownLatch(1); - try { - client.shutdown().addCallback(new Callback() { - @Override - public Object call(Object arg) throws Exception { - waiter.countDown(); - return null; - } - }).addErrback(new Callback() { - @Override - public Object call(Object arg) throws Exception { - logger.error("Failed to shutdown HBase client cleanly! HBase cluster might be down"); - waiter.countDown(); - return null; - } - }); - if (!waiter.await(timeout, TimeUnit.NANOSECONDS)) { - logger.error("HBase connection could not be closed within timeout! HBase cluster might " + - "be down!"); - } - } catch (Exception ex) { - logger.warn("Error while attempting to close connections to HBase"); - } finally { - // Dereference the client to force GC to clear up any buffered requests. - client = null; - } - } - - private void handleTransactionFailure(Transaction txn) - throws EventDeliveryException { - if (maxConsecutiveFails > 0 && consecutiveHBaseFailures >= maxConsecutiveFails) { - if (client != null) { - shutdownHBaseClient(); - } - consecutiveHBaseFailures = 0; - } - try { - txn.rollback(); - } catch (Throwable e) { - logger.error("Failed to commit transaction." + - "Transaction rolled back.", e); - if (e instanceof Error || e instanceof RuntimeException) { - logger.error("Failed to commit transaction." + - "Transaction rolled back.", e); - Throwables.propagate(e); - } else { - logger.error("Failed to commit transaction." + - "Transaction rolled back.", e); - throw new EventDeliveryException("Failed to commit transaction." 
+ - "Transaction rolled back.", e); - } - } finally { - txn.close(); - } - } - - private class SuccessCallback implements Callback { - private Lock lock; - private AtomicInteger callbacksReceived; - private Condition condition; - private final boolean isTimeoutTesting; - - public SuccessCallback(Lock lck, AtomicInteger callbacksReceived, - Condition condition) { - lock = lck; - this.callbacksReceived = callbacksReceived; - this.condition = condition; - isTimeoutTesting = isTimeoutTest; - } - - @Override - public R call(T arg) throws Exception { - if (isTimeoutTesting) { - try { - //tests set timeout to 10 seconds, so sleep for 4 seconds - TimeUnit.NANOSECONDS.sleep(TimeUnit.SECONDS.toNanos(4)); - } catch (InterruptedException e) { - //ignore - } - } - doCall(); - return null; - } - - private void doCall() throws Exception { - callbacksReceived.incrementAndGet(); - lock.lock(); - try { - condition.signal(); - } finally { - lock.unlock(); - } - } - } - - private class FailureCallback implements Callback { - private Lock lock; - private AtomicInteger callbacksReceived; - private AtomicBoolean txnFail; - private Condition condition; - private final boolean isTimeoutTesting; - - public FailureCallback(Lock lck, AtomicInteger callbacksReceived, - AtomicBoolean txnFail, Condition condition) { - this.lock = lck; - this.callbacksReceived = callbacksReceived; - this.txnFail = txnFail; - this.condition = condition; - isTimeoutTesting = isTimeoutTest; - } - - @Override - public R call(T arg) throws Exception { - logger.error("failure callback:", arg); - if (isTimeoutTesting) { - //tests set timeout to 10 seconds, so sleep for 4 seconds - try { - TimeUnit.NANOSECONDS.sleep(TimeUnit.SECONDS.toNanos(4)); - } catch (InterruptedException e) { - //ignore - } - } - doCall(); - return null; - } - - private void doCall() throws Exception { - callbacksReceived.incrementAndGet(); - this.txnFail.set(true); - lock.lock(); - try { - condition.signal(); - } finally { - lock.unlock(); - } - } - } - - private void checkIfChannelExceptionAndThrow(Throwable e) - throws EventDeliveryException { - if (e instanceof ChannelException) { - throw new EventDeliveryException("Error in processing transaction.", e); - } else if (e instanceof Error || e instanceof RuntimeException) { - Throwables.propagate(e); - } - throw new EventDeliveryException("Error in processing transaction.", e); - } - - private class CellIdentifier { - private final byte[] row; - private final byte[] column; - private final int hashCode; - - // Since the sink operates only on one table and one cf, - // we use the data from the owning sink - public CellIdentifier(byte[] row, byte[] column) { - this.row = row; - this.column = column; - this.hashCode = - (Arrays.hashCode(row) * 31) * (Arrays.hashCode(column) * 31); - } - - @Override - public int hashCode() { - return hashCode; - } - - // Since we know that this class is used from only this class, - // skip the class comparison to save time - @Override - public boolean equals(Object other) { - CellIdentifier o = (CellIdentifier) other; - if (other == null) { - return false; - } else { - return (COMPARATOR.compare(row, o.row) == 0 - && COMPARATOR.compare(column, o.column) == 0); - } - } - } -} diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java deleted file mode 100644 index 481fce8f89..0000000000 --- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.hbase; - -import java.util.List; - -import org.apache.flume.Event; -import org.apache.flume.conf.Configurable; -import org.apache.flume.conf.ConfigurableComponent; -import org.hbase.async.AtomicIncrementRequest; -import org.hbase.async.PutRequest; - -/** - * Interface for an event serializer which serializes the headers and body - * of an event to write them to hbase. This is configurable, so any config - * params required should be taken through this. - * The table should be valid on the column family. An implementation - * of this interface is expected by the {@linkplain AsyncHBaseSink} to serialize - * the events. - */ -public interface AsyncHbaseEventSerializer extends Configurable, ConfigurableComponent { - - /** - * Initialize the event serializer. - * @param table - The table the serializer should use when creating - * {@link org.hbase.async.PutRequest} or - * {@link org.hbase.async.AtomicIncrementRequest}. - * @param cf - The column family to be used. - */ - public void initialize(byte[] table, byte[] cf); - - /** - * @param event Event to be written to HBase - */ - public void setEvent(Event event); - - /** - * Get the actions that should be written out to hbase as a result of this - * event. This list is written to hbase. - * @return List of {@link org.hbase.async.PutRequest} which - * are written as such to HBase. - * - * - */ - public List getActions(); - - /** - * Get the increments that should be made in hbase as a result of this - * event. This list is written to hbase. - * @return List of {@link org.hbase.async.AtomicIncrementRequest} which - * are written as such to HBase. - * - * - */ - public List getIncrements(); - - /** - * Clean up any state. This will be called when the sink is being stopped. - */ - public void cleanUp(); -} diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java deleted file mode 100644 index 0974241db3..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.hbase; - -/** - * This interface allows for implementing HBase serializers that are aware of - * batching. {@link #onBatchStart()} is called at the beginning of each batch - * by the sink. - */ -public interface BatchAware { - public void onBatchStart(); -} diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java deleted file mode 100644 index 9b9bce9675..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java +++ /dev/null @@ -1,569 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.flume.sink.hbase; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.flume.Channel; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.FlumeException; -import org.apache.flume.Transaction; -import org.apache.flume.annotations.InterfaceAudience; -import org.apache.flume.auth.FlumeAuthenticationUtil; -import org.apache.flume.auth.PrivilegedExecutor; -import org.apache.flume.conf.BatchSizeSupported; -import org.apache.flume.conf.Configurable; -import org.apache.flume.conf.ConfigurationException; -import org.apache.flume.instrumentation.SinkCounter; -import org.apache.flume.sink.AbstractSink; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.util.Bytes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.security.PrivilegedExceptionAction; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; - -/** - * A simple sink which reads events from a channel and writes them to HBase. - * The Hbase configuration is picked up from the first hbase-site.xml - * encountered in the classpath. This sink supports batch reading of - * events from the channel, and writing them to Hbase, to minimize the number - * of flushes on the hbase tables. To use this sink, it has to be configured - * with certain mandatory parameters:

- * table: The name of the table in Hbase to write to.
- * columnFamily: The column family in Hbase to write to.
- * This sink will commit each transaction if the table's write buffer size
- * is reached or if the number of events in the current transaction reaches
- * the batch size, whichever comes first.
- * Other optional parameters are:
- * serializer: A class implementing {@link HbaseEventSerializer}.
- * An instance of this class will be used to write out events to hbase.
- * serializer.*: Passed in the configure() method to serializer
- * as an object of {@link org.apache.flume.Context}.
- * batchSize: This is the batch size used by the client. This is the
- * maximum number of events the sink will commit per transaction. The
- * default batch size is 100 events.
- *
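[As a sketch, the synchronous HBaseSink documented above takes the same mandatory keys; the optional zookeeperQuorum and coalesceIncrements keys shown here are defined in HBaseSinkConfigurationConstants later in this patch, and all names and host values are hypothetical:

    a1.sinks.k1.type = org.apache.flume.sink.hbase.HBaseSink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.table = flume_events
    a1.sinks.k1.columnFamily = cf
    a1.sinks.k1.zookeeperQuorum = zkhost1:2181,zkhost2:2181
    a1.sinks.k1.coalesceIncrements = true

Per the quorum-parsing logic in configure() below, every node listed in zookeeperQuorum must use the same client port.]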
- * Note: While this sink flushes all events in a transaction - * to HBase in one shot, Hbase does not guarantee atomic commits on multiple - * rows. So if a subset of events in a batch are written to disk by Hbase and - * Hbase fails, the flume transaction is rolled back, causing flume to write - * all the events in the transaction all over again, which will cause - * duplicates. The serializer is expected to take care of the handling of - * duplicates etc. HBase also does not support batch increments, so if - * multiple increments are returned by the serializer, then HBase failure - * will cause them to be re-written, when HBase comes back up. - */ -public class HBaseSink extends AbstractSink implements Configurable, BatchSizeSupported { - private String tableName; - private byte[] columnFamily; - private HTable table; - private long batchSize; - private Configuration config; - private static final Logger logger = LoggerFactory.getLogger(HBaseSink.class); - private HbaseEventSerializer serializer; - private String eventSerializerType; - private Context serializerContext; - private String kerberosPrincipal; - private String kerberosKeytab; - private boolean enableWal = true; - private boolean batchIncrements = false; - private Method refGetFamilyMap = null; - private SinkCounter sinkCounter; - private PrivilegedExecutor privilegedExecutor; - - // Internal hooks used for unit testing. - private DebugIncrementsCallback debugIncrCallback = null; - - public HBaseSink() { - this(HBaseConfiguration.create()); - } - - public HBaseSink(Configuration conf) { - this.config = conf; - } - - @VisibleForTesting - @InterfaceAudience.Private - HBaseSink(Configuration conf, DebugIncrementsCallback cb) { - this(conf); - this.debugIncrCallback = cb; - } - - @Override - public void start() { - Preconditions.checkArgument(table == null, "Please call stop " + - "before calling start on an old instance."); - try { - privilegedExecutor = - FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab); - } catch (Exception ex) { - sinkCounter.incrementConnectionFailedCount(); - throw new FlumeException("Failed to login to HBase using " - + "provided credentials.", ex); - } - try { - table = privilegedExecutor.execute(new PrivilegedExceptionAction() { - @Override - public HTable run() throws Exception { - HTable table = new HTable(config, tableName); - table.setAutoFlush(false); - // Flush is controlled by us. This ensures that HBase changing - // their criteria for flushing does not change how we flush. - return table; - } - }); - } catch (Exception e) { - sinkCounter.incrementConnectionFailedCount(); - logger.error("Could not load table, " + tableName + - " from HBase", e); - throw new FlumeException("Could not load table, " + tableName + - " from HBase", e); - } - try { - if (!privilegedExecutor.execute(new PrivilegedExceptionAction() { - @Override - public Boolean run() throws IOException { - return table.getTableDescriptor().hasFamily(columnFamily); - } - })) { - throw new IOException("Table " + tableName - + " has no such column family " + Bytes.toString(columnFamily)); - } - } catch (Exception e) { - //Get getTableDescriptor also throws IOException, so catch the IOException - //thrown above or by the getTableDescriptor() call. - sinkCounter.incrementConnectionFailedCount(); - throw new FlumeException("Error getting column family from HBase." 
- + "Please verify that the table " + tableName + " and Column Family, " - + Bytes.toString(columnFamily) + " exists in HBase, and the" - + " current user has permissions to access that table.", e); - } - - super.start(); - sinkCounter.incrementConnectionCreatedCount(); - sinkCounter.start(); - } - - @Override - public void stop() { - try { - if (table != null) { - table.close(); - } - table = null; - } catch (IOException e) { - throw new FlumeException("Error closing table.", e); - } - sinkCounter.incrementConnectionClosedCount(); - sinkCounter.stop(); - } - - @SuppressWarnings("unchecked") - @Override - public void configure(Context context) { - if (!HBaseVersionCheck.hasVersionLessThan2(logger)) { - throw new ConfigurationException( - "HBase major version number must be less than 2 for hbase-sink."); - } - tableName = context.getString(HBaseSinkConfigurationConstants.CONFIG_TABLE); - String cf = context.getString( - HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY); - batchSize = context.getLong( - HBaseSinkConfigurationConstants.CONFIG_BATCHSIZE, new Long(100)); - serializerContext = new Context(); - //If not specified, will use HBase defaults. - eventSerializerType = context.getString( - HBaseSinkConfigurationConstants.CONFIG_SERIALIZER); - Preconditions.checkNotNull(tableName, - "Table name cannot be empty, please specify in configuration file"); - Preconditions.checkNotNull(cf, - "Column family cannot be empty, please specify in configuration file"); - //Check foe event serializer, if null set event serializer type - if (eventSerializerType == null || eventSerializerType.isEmpty()) { - eventSerializerType = - "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer"; - logger.info("No serializer defined, Will use default"); - } - serializerContext.putAll(context.getSubProperties( - HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX)); - columnFamily = cf.getBytes(Charsets.UTF_8); - try { - Class clazz = - (Class) - Class.forName(eventSerializerType); - serializer = clazz.newInstance(); - serializer.configure(serializerContext); - } catch (Exception e) { - logger.error("Could not instantiate event serializer.", e); - Throwables.propagate(e); - } - kerberosKeytab = context.getString(HBaseSinkConfigurationConstants.CONFIG_KEYTAB); - kerberosPrincipal = context.getString(HBaseSinkConfigurationConstants.CONFIG_PRINCIPAL); - - enableWal = context.getBoolean(HBaseSinkConfigurationConstants - .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL); - logger.info("The write to WAL option is set to: " + String.valueOf(enableWal)); - if (!enableWal) { - logger.warn("HBase Sink's enableWal configuration is set to false. All " + - "writes to HBase will have WAL disabled, and any data in the " + - "memstore of this region in the Region Server could be lost!"); - } - - batchIncrements = context.getBoolean( - HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, - HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS); - - if (batchIncrements) { - logger.info("Increment coalescing is enabled. Increments will be " + - "buffered."); - refGetFamilyMap = reflectLookupGetFamilyMap(); - } - - String zkQuorum = context.getString(HBaseSinkConfigurationConstants - .ZK_QUORUM); - Integer port = null; - /** - * HBase allows multiple nodes in the quorum, but all need to use the - * same client port. So get the nodes in host:port format, - * and ignore the ports for all nodes except the first one. If no port is - * specified, use default. 
- */ - if (zkQuorum != null && !zkQuorum.isEmpty()) { - StringBuilder zkBuilder = new StringBuilder(); - logger.info("Using ZK Quorum: " + zkQuorum); - String[] zkHosts = zkQuorum.split(","); - int length = zkHosts.length; - for (int i = 0; i < length; i++) { - String[] zkHostAndPort = zkHosts[i].split(":"); - zkBuilder.append(zkHostAndPort[0].trim()); - if (i != length - 1) { - zkBuilder.append(","); - } else { - zkQuorum = zkBuilder.toString(); - } - if (zkHostAndPort[1] == null) { - throw new FlumeException("Expected client port for the ZK node!"); - } - if (port == null) { - port = Integer.parseInt(zkHostAndPort[1].trim()); - } else if (!port.equals(Integer.parseInt(zkHostAndPort[1].trim()))) { - throw new FlumeException("All Zookeeper nodes in the quorum must " + - "use the same client port."); - } - } - if (port == null) { - port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT; - } - this.config.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); - this.config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, port); - } - String hbaseZnode = context.getString( - HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT); - if (hbaseZnode != null && !hbaseZnode.isEmpty()) { - this.config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hbaseZnode); - } - sinkCounter = new SinkCounter(this.getName()); - } - - public Configuration getConfig() { - return config; - } - - @Override - public Status process() throws EventDeliveryException { - Status status = Status.READY; - Channel channel = getChannel(); - Transaction txn = channel.getTransaction(); - List actions = new LinkedList(); - List incs = new LinkedList(); - try { - txn.begin(); - - if (serializer instanceof BatchAware) { - ((BatchAware) serializer).onBatchStart(); - } - - long i = 0; - for (; i < batchSize; i++) { - Event event = channel.take(); - if (event == null) { - if (i == 0) { - status = Status.BACKOFF; - sinkCounter.incrementBatchEmptyCount(); - } else { - sinkCounter.incrementBatchUnderflowCount(); - } - break; - } else { - serializer.initialize(event, columnFamily); - actions.addAll(serializer.getActions()); - incs.addAll(serializer.getIncrements()); - } - } - if (i == batchSize) { - sinkCounter.incrementBatchCompleteCount(); - } - sinkCounter.addToEventDrainAttemptCount(i); - - putEventsAndCommit(actions, incs, txn); - - } catch (Throwable e) { - try { - txn.rollback(); - } catch (Exception e2) { - logger.error("Exception in rollback. Rollback might not have been " + - "successful.", e2); - } - logger.error("Failed to commit transaction." + - "Transaction rolled back.", e); - if (e instanceof Error || e instanceof RuntimeException) { - logger.error("Failed to commit transaction." + - "Transaction rolled back.", e); - Throwables.propagate(e); - } else { - logger.error("Failed to commit transaction." + - "Transaction rolled back.", e); - throw new EventDeliveryException("Failed to commit transaction." + - "Transaction rolled back.", e); - } - } finally { - txn.close(); - } - return status; - } - - private void putEventsAndCommit(final List actions, - final List incs, Transaction txn) throws Exception { - - privilegedExecutor.execute(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - for (Row r : actions) { - if (r instanceof Put) { - ((Put) r).setWriteToWAL(enableWal); - } - // Newer versions of HBase - Increment implements Row. 
- if (r instanceof Increment) { - ((Increment) r).setWriteToWAL(enableWal); - } - } - table.batch(actions); - return null; - } - }); - - privilegedExecutor.execute(new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - - List processedIncrements; - if (batchIncrements) { - processedIncrements = coalesceIncrements(incs); - } else { - processedIncrements = incs; - } - - // Only used for unit testing. - if (debugIncrCallback != null) { - debugIncrCallback.onAfterCoalesce(processedIncrements); - } - - for (final Increment i : processedIncrements) { - i.setWriteToWAL(enableWal); - table.increment(i); - } - return null; - } - }); - - txn.commit(); - sinkCounter.addToEventDrainSuccessCount(actions.size()); - } - - /** - * The method getFamilyMap() is no longer available in Hbase 0.96. - * We must use reflection to determine which version we may use. - */ - @VisibleForTesting - static Method reflectLookupGetFamilyMap() { - Method m = null; - String[] methodNames = {"getFamilyMapOfLongs", "getFamilyMap"}; - for (String methodName : methodNames) { - try { - m = Increment.class.getMethod(methodName); - if (m != null && m.getReturnType().equals(Map.class)) { - logger.debug("Using Increment.{} for coalesce", methodName); - break; - } - } catch (NoSuchMethodException e) { - logger.debug("Increment.{} does not exist. Exception follows.", - methodName, e); - } catch (SecurityException e) { - logger.debug("No access to Increment.{}; Exception follows.", - methodName, e); - } - } - if (m == null) { - throw new UnsupportedOperationException( - "Cannot find Increment.getFamilyMap()"); - } - return m; - } - - @SuppressWarnings("unchecked") - private Map> getFamilyMap(Increment inc) { - Preconditions.checkNotNull(refGetFamilyMap, - "Increment.getFamilymap() not found"); - Preconditions.checkNotNull(inc, "Increment required"); - Map> familyMap = null; - try { - Object familyObj = refGetFamilyMap.invoke(inc); - familyMap = (Map>) familyObj; - } catch (IllegalAccessException e) { - logger.warn("Unexpected error calling getFamilyMap()", e); - Throwables.propagate(e); - } catch (InvocationTargetException e) { - logger.warn("Unexpected error calling getFamilyMap()", e); - Throwables.propagate(e); - } - return familyMap; - } - - /** - * Perform "compression" on the given set of increments so that Flume sends - * the minimum possible number of RPC operations to HBase per batch. - * - * @param incs Input: Increment objects to coalesce. - * @return List of new Increment objects after coalescing the unique counts. - */ - private List coalesceIncrements(Iterable incs) { - Preconditions.checkNotNull(incs, "List of Increments must not be null"); - // Aggregate all of the increment row/family/column counts. - // The nested map is keyed like this: {row, family, qualifier} => count. - Map>> counters = - Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - for (Increment inc : incs) { - byte[] row = inc.getRow(); - Map> families = getFamilyMap(inc); - for (Map.Entry> familyEntry : families.entrySet()) { - byte[] family = familyEntry.getKey(); - NavigableMap qualifiers = familyEntry.getValue(); - for (Map.Entry qualifierEntry : qualifiers.entrySet()) { - byte[] qualifier = qualifierEntry.getKey(); - Long count = qualifierEntry.getValue(); - incrementCounter(counters, row, family, qualifier, count); - } - } - } - - // Reconstruct list of Increments per unique row/family/qualifier. 
- List coalesced = Lists.newLinkedList(); - for (Map.Entry>> rowEntry : - counters.entrySet()) { - byte[] row = rowEntry.getKey(); - Map> families = rowEntry.getValue(); - Increment inc = new Increment(row); - for (Map.Entry> familyEntry : families.entrySet()) { - byte[] family = familyEntry.getKey(); - NavigableMap qualifiers = familyEntry.getValue(); - for (Map.Entry qualifierEntry : qualifiers.entrySet()) { - byte[] qualifier = qualifierEntry.getKey(); - long count = qualifierEntry.getValue(); - inc.addColumn(family, qualifier, count); - } - } - coalesced.add(inc); - } - - return coalesced; - } - - /** - * Helper function for {@link #coalesceIncrements} to increment a counter - * value in the passed data structure. - * - * @param counters Nested data structure containing the counters. - * @param row Row key to increment. - * @param family Column family to increment. - * @param qualifier Column qualifier to increment. - * @param count Amount to increment by. - */ - private void incrementCounter( - Map>> counters, - byte[] row, byte[] family, byte[] qualifier, Long count) { - - Map> families = counters.get(row); - if (families == null) { - families = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - counters.put(row, families); - } - - NavigableMap qualifiers = families.get(family); - if (qualifiers == null) { - qualifiers = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - families.put(family, qualifiers); - } - - Long existingValue = qualifiers.get(qualifier); - if (existingValue == null) { - qualifiers.put(qualifier, count); - } else { - qualifiers.put(qualifier, existingValue + count); - } - } - - @VisibleForTesting - @InterfaceAudience.Private - HbaseEventSerializer getSerializer() { - return serializer; - } - - @Override - public long getBatchSize() { - return batchSize; - } - - @VisibleForTesting - @InterfaceAudience.Private - interface DebugIncrementsCallback { - public void onAfterCoalesce(Iterable increments); - } -} diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java deleted file mode 100644 index f9ca4bf497..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.hbase; - -import org.apache.hadoop.hbase.HConstants; - -/** - * Constants used for configuration of HBaseSink and AsyncHBaseSink - * - */ -public class HBaseSinkConfigurationConstants { - /** - * The Hbase table which the sink should write to. 
- */ - public static final String CONFIG_TABLE = "table"; - /** - * The column family which the sink should use. - */ - public static final String CONFIG_COLUMN_FAMILY = "columnFamily"; - /** - * Maximum number of events the sink should take from the channel per - * transaction, if available. - */ - public static final String CONFIG_BATCHSIZE = "batchSize"; - /** - * The fully qualified class name of the serializer the sink should use. - */ - public static final String CONFIG_SERIALIZER = "serializer"; - /** - * Configuration to pass to the serializer. - */ - public static final String CONFIG_SERIALIZER_PREFIX = CONFIG_SERIALIZER + "."; - - public static final String CONFIG_TIMEOUT = "timeout"; - - public static final String CONFIG_ENABLE_WAL = "enableWal"; - - public static final boolean DEFAULT_ENABLE_WAL = true; - - public static final long DEFAULT_TIMEOUT = 60000; - - public static final String CONFIG_KEYTAB = "kerberosKeytab"; - - public static final String CONFIG_PRINCIPAL = "kerberosPrincipal"; - - public static final String ZK_QUORUM = "zookeeperQuorum"; - - public static final String ZK_ZNODE_PARENT = "znodeParent"; - - public static final String DEFAULT_ZK_ZNODE_PARENT = - HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; - - public static final String CONFIG_COALESCE_INCREMENTS = "coalesceIncrements"; - - public static final Boolean DEFAULT_COALESCE_INCREMENTS = false; - - public static final int DEFAULT_MAX_CONSECUTIVE_FAILS = 10; - - public static final String CONFIG_MAX_CONSECUTIVE_FAILS = "maxConsecutiveFails"; - - public static final String ASYNC_PREFIX = "async."; - - public static final String ASYNC_ZK_QUORUM_KEY = "hbase.zookeeper.quorum"; - - public static final String ASYNC_ZK_BASEPATH_KEY = "hbase.zookeeper.znode.parent"; - -} diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java deleted file mode 100644 index 25d9faa6f1..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseVersionCheck.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
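For context, these keys were consumed through a Flume Context, the same way the unit tests later in this patch wire them up. A minimal sketch of ours (table and family names are illustrative; it assumes HBaseSink's no-argument constructor, which Flume's reflective component instantiation relies on):

    package org.apache.flume.sink.hbase;

    import org.apache.flume.Context;
    import org.apache.flume.conf.Configurables;

    public class HBaseSinkConfigSketch {
      // Builds a sink configured through the constants defined above.
      public static HBaseSink configuredSink() {
        Context ctx = new Context();
        ctx.put(HBaseSinkConfigurationConstants.CONFIG_TABLE, "events");
        ctx.put(HBaseSinkConfigurationConstants.CONFIG_COLUMN_FAMILY, "cf");
        ctx.put(HBaseSinkConfigurationConstants.CONFIG_BATCHSIZE, "100");
        ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER,
            SimpleHbaseEventSerializer.class.getName());
        ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, "true");
        HBaseSink sink = new HBaseSink();
        Configurables.configure(sink, ctx);
        return sink;
      }
    }

Serializer-specific options ride on the CONFIG_SERIALIZER_PREFIX ("serializer.") namespace, e.g. "serializer.payloadColumn", as the tests below demonstrate.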
- */
-
-package org.apache.flume.sink.hbase;
-
-import org.apache.hadoop.hbase.util.VersionInfo;
-import org.slf4j.Logger;
-
-class HBaseVersionCheck {
-
-  private static int getMajorVersion(String version) throws NumberFormatException {
-    return Integer.parseInt(version.split("\\.")[0]);
-  }
-
-  static boolean hasVersionLessThan2(Logger logger) {
-    String version = VersionInfo.getVersion();
-    try {
-      if (getMajorVersion(version) < 2) {
-        return true;
-      }
-    } catch (NumberFormatException ex) {
-      logger.error(ex.getMessage());
-    }
-    logger.error("Invalid HBase version:" + version);
-    return false;
-  }
-}
\ No newline at end of file
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java
deleted file mode 100644
index d4e3f84937..0000000000
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HbaseEventSerializer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.hbase;
-
-import java.util.List;
-
-import org.apache.flume.Event;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.conf.ConfigurableComponent;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Row;
-
-/**
- * Interface for an event serializer which serializes the headers and body
- * of an event to write them to hbase. This is configurable, so any config
- * params required should be taken through this. Only the column family is
- * passed in. The columns should exist in the table and column family
- * specified in the configuration for the HbaseSink.
- */
-public interface HbaseEventSerializer extends Configurable, ConfigurableComponent {
-  /**
-   * Initialize the event serializer.
-   * @param event Event to be written to HBase
-   * @param columnFamily Column family to write to
-   */
-  public void initialize(Event event, byte[] columnFamily);
-
-  /**
-   * Get the actions that should be written out to hbase as a result of this
-   * event. This list is written to hbase using the HBase batch API.
-   * @return List of {@link org.apache.hadoop.hbase.client.Row} which
-   * are written as such to HBase.
-   *
-   * 0.92 increments do not implement Row, so this is not generic.
-   *
-   */
-  public List<Row> getActions();
-
-  public List<Increment> getIncrements();
-
-  /*
-   * Clean up any state. This will be called when the sink is being stopped.
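To illustrate the contract, here is a minimal, hypothetical implementation — not part of this patch — that writes the raw event body into one configurable column and emits no increments. It uses the same pre-1.0 Put.add(family, qualifier, value) call as the serializers removed here.

    package org.apache.flume.sink.hbase;

    import java.util.Collections;
    import java.util.List;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.conf.ComponentConfiguration;
    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Row;

    public class BodyOnlyHbaseEventSerializer implements HbaseEventSerializer {
      private byte[] cf;
      private byte[] body;
      private byte[] column;

      @Override
      public void configure(Context context) {
        // The column qualifier is configurable; "payload" is just this sketch's default.
        column = context.getString("column", "payload").getBytes();
      }

      @Override
      public void configure(ComponentConfiguration conf) { }

      @Override
      public void initialize(Event event, byte[] columnFamily) {
        this.body = event.getBody();
        this.cf = columnFamily;
      }

      @Override
      public List<Row> getActions() {
        // One Put per event, keyed by the body itself. Fine for a demo; a real
        // serializer should generate unique row keys (see SimpleRowKeyGenerator below).
        Put put = new Put(body);
        put.add(cf, column, body);
        return Collections.<Row>singletonList(put);
      }

      @Override
      public List<Increment> getIncrements() {
        return Collections.emptyList();
      }

      @Override
      public void close() { }
    }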
- */ - public void close(); -} diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java deleted file mode 100644 index d1d5aab3df..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/RegexHbaseEventSerializer.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.hbase; - -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; -import org.apache.commons.lang.RandomStringUtils; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.FlumeException; -import org.apache.flume.conf.ComponentConfiguration; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Row; - -import java.nio.charset.Charset; -import java.util.Calendar; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * An {@link HbaseEventSerializer} which parses columns based on a supplied - * regular expression and column name list. - *

<p>
- * Note that if the regular expression does not return the correct number of
- * groups for a particular event, or it does not correctly match an event,
- * the event is silently dropped.
- * <p>
- * Row keys for each event consist of a timestamp concatenated with an
- * identifier which enforces uniqueness of keys across flume agents.
- * <p>
- * See static constant variables for configuration options. - */ -public class RegexHbaseEventSerializer implements HbaseEventSerializer { - // Config vars - /** Regular expression used to parse groups from event data. */ - public static final String REGEX_CONFIG = "regex"; - public static final String REGEX_DEFAULT = "(.*)"; - - /** Whether to ignore case when performing regex matches. */ - public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase"; - public static final boolean IGNORE_CASE_DEFAULT = false; - - /** Comma separated list of column names to place matching groups in. */ - public static final String COL_NAME_CONFIG = "colNames"; - public static final String COLUMN_NAME_DEFAULT = "payload"; - - /** Index of the row key in matched regex groups */ - public static final String ROW_KEY_INDEX_CONFIG = "rowKeyIndex"; - - /** Placeholder in colNames for row key */ - public static final String ROW_KEY_NAME = "ROW_KEY"; - - /** Whether to deposit event headers into corresponding column qualifiers */ - public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders"; - public static final boolean DEPOSIT_HEADERS_DEFAULT = false; - - /** What charset to use when serializing into HBase's byte arrays */ - public static final String CHARSET_CONFIG = "charset"; - public static final String CHARSET_DEFAULT = "UTF-8"; - - /* This is a nonce used in HBase row-keys, such that the same row-key - * never gets written more than once from within this JVM. */ - protected static final AtomicInteger nonce = new AtomicInteger(0); - protected static String randomKey = RandomStringUtils.randomAlphanumeric(10); - - protected byte[] cf; - private byte[] payload; - private List colNames = Lists.newArrayList(); - private Map headers; - private boolean regexIgnoreCase; - private boolean depositHeaders; - private Pattern inputPattern; - private Charset charset; - private int rowKeyIndex; - - @Override - public void configure(Context context) { - String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT); - regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, - IGNORE_CASE_DEFAULT); - depositHeaders = context.getBoolean(DEPOSIT_HEADERS_CONFIG, - DEPOSIT_HEADERS_DEFAULT); - inputPattern = Pattern.compile(regex, Pattern.DOTALL - + (regexIgnoreCase ? 
Pattern.CASE_INSENSITIVE : 0));
-    charset = Charset.forName(context.getString(CHARSET_CONFIG,
-        CHARSET_DEFAULT));
-
-    String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);
-    String[] columnNames = colNameStr.split(",");
-    for (String s : columnNames) {
-      colNames.add(s.getBytes(charset));
-    }
-
-    //Row key is optional; default is -1
-    rowKeyIndex = context.getInteger(ROW_KEY_INDEX_CONFIG, -1);
-    //if a row key is being used, make sure it is specified correctly
-    if (rowKeyIndex >= 0) {
-      if (rowKeyIndex >= columnNames.length) {
-        throw new IllegalArgumentException(ROW_KEY_INDEX_CONFIG + " must be " +
-            "less than num columns " + columnNames.length);
-      }
-      if (!ROW_KEY_NAME.equalsIgnoreCase(columnNames[rowKeyIndex])) {
-        throw new IllegalArgumentException("Column at " + rowKeyIndex + " must be "
-            + ROW_KEY_NAME + " and is " + columnNames[rowKeyIndex]);
-      }
-    }
-  }
-
-  @Override
-  public void configure(ComponentConfiguration conf) {
-  }
-
-  @Override
-  public void initialize(Event event, byte[] columnFamily) {
-    this.headers = event.getHeaders();
-    this.payload = event.getBody();
-    this.cf = columnFamily;
-  }
-
-  /**
-   * Returns a row-key with the following format:
-   * [time in millis]-[random key]-[nonce]
-   */
-  protected byte[] getRowKey(Calendar cal) {
-    /* NOTE: This key generation strategy has the following properties:
-     *
-     * 1) Within a single JVM, the same row key will never be duplicated.
-     * 2) Amongst any two JVM's operating at different time periods (according
-     *    to their respective clocks), the same row key will never be
-     *    duplicated.
-     * 3) Amongst any two JVM's operating concurrently (according to their
-     *    respective clocks), the odds of duplicating a row-key are non-zero
-     *    but infinitesimal. This would require simultaneous collision in (a)
-     *    the timestamp (b) the respective nonce and (c) the random string.
-     *    The string is necessary since (a) and (b) could collide if a fleet
-     *    of Flume agents are restarted in tandem.
-     *
-     * Row-key uniqueness is important because conflicting row-keys will cause
-     * data loss.
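The configure() validation just shown (rowKeyIndex must point at a column literally named ROW_KEY, and must stay within the colNames list) is easiest to see with a concrete Context. A sketch of ours, with an illustrative log-line regex: group 1 feeds the ROW_KEY placeholder, group 2 lands in the msg column.

    package org.apache.flume.sink.hbase;

    import org.apache.flume.Context;
    import org.apache.flume.conf.Configurables;

    public class RegexSerializerConfigSketch {
      public static RegexHbaseEventSerializer configured() {
        Context ctx = new Context();
        // Two capture groups: group 1 becomes the row key, group 2 the "msg" column.
        ctx.put(RegexHbaseEventSerializer.REGEX_CONFIG, "^(\\S+)\\s+(.*)$");
        ctx.put(RegexHbaseEventSerializer.COL_NAME_CONFIG, "ROW_KEY,msg");
        ctx.put(RegexHbaseEventSerializer.ROW_KEY_INDEX_CONFIG, "0");
        RegexHbaseEventSerializer serializer = new RegexHbaseEventSerializer();
        Configurables.configure(serializer, ctx);
        return serializer;
      }
    }

An event body such as "row42 hello world" then yields a Put keyed row42 with msg = "hello world"; a body that fails to match, or matches with the wrong group count, is silently dropped, exactly as the class javadoc above warns.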
*/ - String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), - randomKey, nonce.getAndIncrement()); - return rowKey.getBytes(charset); - } - - protected byte[] getRowKey() { - return getRowKey(Calendar.getInstance()); - } - - @Override - public List getActions() throws FlumeException { - List actions = Lists.newArrayList(); - byte[] rowKey; - Matcher m = inputPattern.matcher(new String(payload, charset)); - if (!m.matches()) { - return Lists.newArrayList(); - } - - if (m.groupCount() != colNames.size()) { - return Lists.newArrayList(); - } - - try { - if (rowKeyIndex < 0) { - rowKey = getRowKey(); - } else { - rowKey = m.group(rowKeyIndex + 1).getBytes(Charsets.UTF_8); - } - Put put = new Put(rowKey); - - for (int i = 0; i < colNames.size(); i++) { - if (i != rowKeyIndex) { - put.add(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8)); - } - } - if (depositHeaders) { - for (Map.Entry entry : headers.entrySet()) { - put.add(cf, entry.getKey().getBytes(charset), entry.getValue().getBytes(charset)); - } - } - actions.add(put); - } catch (Exception e) { - throw new FlumeException("Could not get row key!", e); - } - return actions; - } - - @Override - public List getIncrements() { - return Lists.newArrayList(); - } - - @Override - public void close() { - } -} \ No newline at end of file diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java deleted file mode 100644 index 3f442e8409..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleAsyncHbaseEventSerializer.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.hbase; - -import com.google.common.base.Charsets; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.FlumeException; -import org.apache.flume.conf.ComponentConfiguration; -import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType; -import org.hbase.async.AtomicIncrementRequest; -import org.hbase.async.PutRequest; - -import java.util.ArrayList; -import java.util.List; - -/** - * A simple serializer to be used with the AsyncHBaseSink - * that returns puts from an event, by writing the event - * body into it. The headers are discarded. It also updates a row in hbase - * which acts as an event counter. - * - * Takes optional parameters:

- * rowPrefix: The prefix to be used. Default: default

- * incrementRow The row to increment. Default: incRow

- * suffix: uuid/random/timestamp.Default: uuid

- * - * Mandatory parameters:

- * cf:Column family.

- * Components that have no defaults and will not be used if absent: - * payloadColumn: Which column to put payload in. If it is not present, - * event data will not be written.

- * incrementColumn: Which column to increment. If this is absent, it - * means no column is incremented. - */ -public class SimpleAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { - private byte[] table; - private byte[] cf; - private byte[] payload; - private byte[] payloadColumn; - private byte[] incrementColumn; - private String rowPrefix; - private byte[] incrementRow; - private KeyType keyType; - - @Override - public void initialize(byte[] table, byte[] cf) { - this.table = table; - this.cf = cf; - } - - @Override - public List getActions() { - List actions = new ArrayList(); - if (payloadColumn != null) { - byte[] rowKey; - try { - switch (keyType) { - case TS: - rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix); - break; - case TSNANO: - rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix); - break; - case RANDOM: - rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix); - break; - default: - rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix); - break; - } - PutRequest putRequest = new PutRequest(table, rowKey, cf, - payloadColumn, payload); - actions.add(putRequest); - } catch (Exception e) { - throw new FlumeException("Could not get row key!", e); - } - } - return actions; - } - - public List getIncrements() { - List actions = new ArrayList(); - if (incrementColumn != null) { - AtomicIncrementRequest inc = new AtomicIncrementRequest(table, - incrementRow, cf, incrementColumn); - actions.add(inc); - } - return actions; - } - - @Override - public void cleanUp() { - // TODO Auto-generated method stub - - } - - @Override - public void configure(Context context) { - String pCol = context.getString("payloadColumn", "pCol"); - String iCol = context.getString("incrementColumn", "iCol"); - rowPrefix = context.getString("rowPrefix", "default"); - String suffix = context.getString("suffix", "uuid"); - if (pCol != null && !pCol.isEmpty()) { - if (suffix.equals("timestamp")) { - keyType = KeyType.TS; - } else if (suffix.equals("random")) { - keyType = KeyType.RANDOM; - } else if (suffix.equals("nano")) { - keyType = KeyType.TSNANO; - } else { - keyType = KeyType.UUID; - } - payloadColumn = pCol.getBytes(Charsets.UTF_8); - } - if (iCol != null && !iCol.isEmpty()) { - incrementColumn = iCol.getBytes(Charsets.UTF_8); - } - incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); - } - - @Override - public void setEvent(Event event) { - this.payload = event.getBody(); - } - - @Override - public void configure(ComponentConfiguration conf) { - // TODO Auto-generated method stub - } - -} diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java deleted file mode 100644 index dc89fd725a..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleHbaseEventSerializer.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
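Putting the pieces together, one event flows through this serializer as at most one PutRequest plus one AtomicIncrementRequest. A usage sketch of ours (table, family, and body are illustrative; it assumes the AsyncHbaseEventSerializer signatures this class implements, i.e. List<PutRequest> from getActions() and List<AtomicIncrementRequest> from getIncrements()):

    package org.apache.flume.sink.hbase;

    import java.util.List;

    import org.apache.flume.Context;
    import org.apache.flume.event.EventBuilder;
    import org.hbase.async.AtomicIncrementRequest;
    import org.hbase.async.PutRequest;

    public class AsyncSerializerUsageSketch {
      public static void main(String[] args) {
        SimpleAsyncHbaseEventSerializer serializer = new SimpleAsyncHbaseEventSerializer();
        Context ctx = new Context();
        ctx.put("payloadColumn", "pCol");   // where the event body lands
        ctx.put("incrementColumn", "iCol"); // per-event counter column
        ctx.put("suffix", "timestamp");     // row key = rowPrefix + current millis
        serializer.configure(ctx);
        serializer.initialize("demoTable".getBytes(), "demoCf".getBytes());
        serializer.setEvent(EventBuilder.withBody("hello hbase".getBytes()));
        List<PutRequest> puts = serializer.getActions();
        List<AtomicIncrementRequest> incs = serializer.getIncrements();
        System.out.println(puts.size() + " put(s), " + incs.size() + " increment(s)");
      }
    }

With both columns configured as above, the sketch prints one put and one increment per event.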
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.sink.hbase; - -import com.google.common.base.Charsets; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.FlumeException; -import org.apache.flume.conf.ComponentConfiguration; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Row; - -import java.util.LinkedList; -import java.util.List; - -/** - * A simple serializer that returns puts from an event, by writing the event - * body into it. The headers are discarded. It also updates a row in hbase - * which acts as an event counter. - *

Takes optional parameters:

- * rowPrefix: The prefix to be used. Default: default

- * incrementRow The row to increment. Default: incRow

- * suffix: uuid/random/timestamp.Default: uuid

- *

Mandatory parameters:

- * cf:Column family.

- * Components that have no defaults and will not be used if null: - * payloadColumn: Which column to put payload in. If it is null, - * event data will not be written.

- * incColumn: Which column to increment. Null means no column is - * incremented. - */ -public class SimpleHbaseEventSerializer implements HbaseEventSerializer { - private String rowPrefix; - private byte[] incrementRow; - private byte[] cf; - private byte[] plCol; - private byte[] incCol; - private KeyType keyType; - private byte[] payload; - - public SimpleHbaseEventSerializer() { - } - - @Override - public void configure(Context context) { - rowPrefix = context.getString("rowPrefix", "default"); - incrementRow = - context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); - String suffix = context.getString("suffix", "uuid"); - - String payloadColumn = context.getString("payloadColumn", "pCol"); - String incColumn = context.getString("incrementColumn", "iCol"); - if (payloadColumn != null && !payloadColumn.isEmpty()) { - if (suffix.equals("timestamp")) { - keyType = KeyType.TS; - } else if (suffix.equals("random")) { - keyType = KeyType.RANDOM; - } else if (suffix.equals("nano")) { - keyType = KeyType.TSNANO; - } else { - keyType = KeyType.UUID; - } - plCol = payloadColumn.getBytes(Charsets.UTF_8); - } - if (incColumn != null && !incColumn.isEmpty()) { - incCol = incColumn.getBytes(Charsets.UTF_8); - } - } - - @Override - public void configure(ComponentConfiguration conf) { - } - - @Override - public void initialize(Event event, byte[] cf) { - this.payload = event.getBody(); - this.cf = cf; - } - - @Override - public List getActions() throws FlumeException { - List actions = new LinkedList(); - if (plCol != null) { - byte[] rowKey; - try { - if (keyType == KeyType.TS) { - rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix); - } else if (keyType == KeyType.RANDOM) { - rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix); - } else if (keyType == KeyType.TSNANO) { - rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix); - } else { - rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix); - } - Put put = new Put(rowKey); - put.add(cf, plCol, payload); - actions.add(put); - } catch (Exception e) { - throw new FlumeException("Could not get row key!", e); - } - - } - return actions; - } - - @Override - public List getIncrements() { - List incs = new LinkedList(); - if (incCol != null) { - Increment inc = new Increment(incrementRow); - inc.addColumn(cf, incCol, 1); - incs.add(inc); - } - return incs; - } - - @Override - public void close() { - } - - public enum KeyType { - UUID, - RANDOM, - TS, - TSNANO; - } - -} diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java deleted file mode 100644 index 2d654f274f..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/SimpleRowKeyGenerator.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.hbase; - -import java.io.UnsupportedEncodingException; -import java.util.Random; -import java.util.UUID; - -/** - * Utility class for users to generate their own keys. Any key can be used, - * this is just a utility that provides a set of simple keys. - */ -public class SimpleRowKeyGenerator { - - public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException { - return (prefix + UUID.randomUUID().toString()).getBytes("UTF8"); - } - - public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException { - return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF8"); - } - - public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException { - return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF8"); - } - - public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException { - return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF8"); - } -} diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java deleted file mode 100644 index 9a2be5a014..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.flume.sink.hbase; - -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.conf.ComponentConfiguration; -import org.hbase.async.AtomicIncrementRequest; -import org.hbase.async.PutRequest; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * An AsyncHBaseEventSerializer implementation that increments a configured - * column for the row whose row key is the event's body bytes. 
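Each generator above corresponds to one value of the suffix option ("uuid", "random", "timestamp", "nano") accepted by the simple serializers. A quick demonstration of the four key shapes, using only the static methods shown:

    import org.apache.flume.sink.hbase.SimpleRowKeyGenerator;

    public class RowKeySketch {
      public static void main(String[] args) throws Exception {
        // Each call returns prefix + a suffix chosen by the generator.
        System.out.println(new String(SimpleRowKeyGenerator.getUUIDKey("evt-"), "UTF8"));
        System.out.println(new String(SimpleRowKeyGenerator.getRandomKey("evt-"), "UTF8"));
        System.out.println(new String(SimpleRowKeyGenerator.getTimestampKey("evt-"), "UTF8"));
        System.out.println(new String(SimpleRowKeyGenerator.getNanoTimestampKey("evt-"), "UTF8"));
      }
    }

Note that only the UUID flavor is collision-proof on its own; the timestamp and random flavors lean on the caller's prefix for uniqueness across agents.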
- */
-public class IncrementAsyncHBaseSerializer implements AsyncHbaseEventSerializer {
-  private byte[] table;
-  private byte[] cf;
-  private byte[] column;
-  private Event currentEvent;
-
-  @Override
-  public void initialize(byte[] table, byte[] cf) {
-    this.table = table;
-    this.cf = cf;
-  }
-
-  @Override
-  public void setEvent(Event event) {
-    this.currentEvent = event;
-  }
-
-  @Override
-  public List<PutRequest> getActions() {
-    return Collections.emptyList();
-  }
-
-  @Override
-  public List<AtomicIncrementRequest> getIncrements() {
-    List<AtomicIncrementRequest> incrs = new ArrayList<AtomicIncrementRequest>();
-    AtomicIncrementRequest incr = new AtomicIncrementRequest(table,
-        currentEvent.getBody(), cf, column, 1);
-    incrs.add(incr);
-    return incrs;
-  }
-
-  @Override
-  public void cleanUp() {
-  }
-
-  @Override
-  public void configure(Context context) {
-    column = context.getString("column", "col").getBytes();
-  }
-
-  @Override
-  public void configure(ComponentConfiguration conf) {
-  }
-}
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
deleted file mode 100644
index ae20350578..0000000000
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.hbase;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.conf.ComponentConfiguration;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Row;
-
-import java.util.List;
-
-/**
- * For Increment-related unit tests.
- */
-class IncrementHBaseSerializer implements HbaseEventSerializer, BatchAware {
-  private Event event;
-  private byte[] family;
-  private int numBatchesStarted = 0;
-
-  @Override public void configure(Context context) {
-  }
-  @Override public void configure(ComponentConfiguration conf) {
-  }
-  @Override public void close() {
-  }
-
-  @Override
-  public void initialize(Event event, byte[] columnFamily) {
-    this.event = event;
-    this.family = columnFamily;
-  }
-
-  // This class only creates Increments.
-  @Override
-  public List<Row> getActions() {
-    return Collections.emptyList();
-  }
-
-  // Treat each Event as a String, i.e., "row:qualifier".
-  @Override
-  public List<Increment> getIncrements() {
-    List<Increment> increments = Lists.newArrayList();
-    String body = new String(event.getBody(), Charsets.UTF_8);
-    String[] pieces = body.split(":");
-    String row = pieces[0];
-    String qualifier = pieces[1];
-    Increment inc = new Increment(row.getBytes(Charsets.UTF_8));
-    inc.addColumn(family, qualifier.getBytes(Charsets.UTF_8), 1L);
-    increments.add(inc);
-    return increments;
-  }
-
-  @Override
-  public void onBatchStart() {
-    numBatchesStarted++;
-  }
-
-  @VisibleForTesting
-  public int getNumBatchesStarted() {
-    return numBatchesStarted;
-  }
-}
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java
deleted file mode 100644
index 9b2a8506c5..0000000000
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/MockSimpleHbaseEventSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.sink.hbase;
-
-import java.util.List;
-
-import org.apache.flume.FlumeException;
-import org.apache.hadoop.hbase.client.Row;
-
-class MockSimpleHbaseEventSerializer extends SimpleHbaseEventSerializer {
-
-  public static boolean throwException = false;
-
-  @Override
-  public List<Row> getActions() throws FlumeException {
-    if (throwException) {
-      throw new FlumeException("Exception for testing");
-    }
-    return super.getActions();
-  }
-}
\ No newline at end of file
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
deleted file mode 100644
index 583f8d431c..0000000000
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ /dev/null
@@ -1,618 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.
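In other words, an event body of "r1:q1" run through the IncrementHBaseSerializer above becomes a single +1 Increment on row r1, column cf:q1. A small sketch of ours showing that round trip (the class is package-private, so the sketch lives in the same package; the family name is illustrative):

    package org.apache.flume.sink.hbase;

    import java.util.List;

    import org.apache.flume.event.EventBuilder;
    import org.apache.hadoop.hbase.client.Increment;

    public class IncrementSerializerSketch {
      public static void main(String[] args) {
        IncrementHBaseSerializer serializer = new IncrementHBaseSerializer();
        // Body "r1:q1" means: +1 on row "r1", qualifier "q1" in the given family.
        serializer.initialize(EventBuilder.withBody("r1:q1".getBytes()), "cf".getBytes());
        List<Increment> incs = serializer.getIncrements();
        System.out.println(incs.size() + " increment on row "
            + new String(incs.get(0).getRow()));
      }
    }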
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flume.sink.hbase; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.OperatingSystemMXBean; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -import org.apache.flume.Channel; -import org.apache.flume.Context; -import org.apache.flume.Event; -import org.apache.flume.EventDeliveryException; -import org.apache.flume.FlumeException; -import org.apache.flume.Transaction; -import org.apache.flume.Sink.Status; -import org.apache.flume.channel.MemoryChannel; -import org.apache.flume.conf.Configurables; -import org.apache.flume.event.EventBuilder; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import com.google.common.primitives.Longs; -import com.sun.management.UnixOperatingSystemMXBean; - -import org.junit.After; - -public class TestAsyncHBaseSink { - private static HBaseTestingUtility testUtility = new HBaseTestingUtility(); - - private static String tableName = "TestHbaseSink"; - private static String columnFamily = "TestColumnFamily"; - private static String inColumn = "iCol"; - private static String plCol = "pCol"; - private static Context ctx = new Context(); - private static String valBase = "testing hbase sink: jham"; - private boolean deleteTable = true; - private static OperatingSystemMXBean os; - - - @BeforeClass - public static void setUp() throws Exception { - testUtility.startMiniCluster(); - - Map ctxMap = new HashMap(); - ctxMap.put("table", tableName); - ctxMap.put("columnFamily", columnFamily); - ctxMap.put("serializer", - "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer"); - ctxMap.put("serializer.payloadColumn", plCol); - ctxMap.put("serializer.incrementColumn", inColumn); - ctxMap.put("keep-alive", "0"); - ctxMap.put("timeout", "10000"); - ctx.putAll(ctxMap); - - os = ManagementFactory.getOperatingSystemMXBean(); - } - - @AfterClass - public static void tearDown() throws Exception { - testUtility.shutdownMiniCluster(); - } - - @After - public void tearDownTest() throws Exception { - if (deleteTable) { - testUtility.deleteTable(tableName.getBytes()); - } - } - - @Test - public void testOneEventWithDefaults() throws Exception { - Map ctxMap = new HashMap(); - ctxMap.put("table", tableName); - ctxMap.put("columnFamily", columnFamily); - ctxMap.put("serializer", - "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer"); - ctxMap.put("keep-alive", "0"); - ctxMap.put("timeout", "10000"); - Context tmpctx = new Context(); - tmpctx.putAll(ctxMap); - - testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - deleteTable = true; - AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); - Configurables.configure(sink, tmpctx); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, tmpctx); - sink.setChannel(channel); - sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - Event e = EventBuilder.withBody( - 
Bytes.toBytes(valBase)); - channel.put(e); - tx.commit(); - tx.close(); - Assert.assertFalse(sink.isConfNull()); - sink.process(); - sink.stop(); - HTable table = new HTable(testUtility.getConfiguration(), tableName); - byte[][] results = getResults(table, 1); - byte[] out = results[0]; - Assert.assertArrayEquals(e.getBody(), out); - out = results[1]; - Assert.assertArrayEquals(Longs.toByteArray(1), out); - } - - @Test - public void testOneEvent() throws Exception { - testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - deleteTable = true; - AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); - Configurables.configure(sink, ctx); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, ctx); - sink.setChannel(channel); - sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - Event e = EventBuilder.withBody( - Bytes.toBytes(valBase)); - channel.put(e); - tx.commit(); - tx.close(); - Assert.assertFalse(sink.isConfNull()); - sink.process(); - sink.stop(); - HTable table = new HTable(testUtility.getConfiguration(), tableName); - byte[][] results = getResults(table, 1); - byte[] out = results[0]; - Assert.assertArrayEquals(e.getBody(), out); - out = results[1]; - Assert.assertArrayEquals(Longs.toByteArray(1), out); - } - - @Test - public void testThreeEvents() throws Exception { - testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - deleteTable = true; - AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); - Configurables.configure(sink, ctx); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, ctx); - sink.setChannel(channel); - sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - for (int i = 0; i < 3; i++) { - Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); - channel.put(e); - } - tx.commit(); - tx.close(); - Assert.assertFalse(sink.isConfNull()); - sink.process(); - sink.stop(); - HTable table = new HTable(testUtility.getConfiguration(), tableName); - byte[][] results = getResults(table, 3); - byte[] out; - int found = 0; - for (int i = 0; i < 3; i++) { - for (int j = 0; j < 3; j++) { - if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { - found++; - break; - } - } - } - Assert.assertEquals(3, found); - out = results[3]; - Assert.assertArrayEquals(Longs.toByteArray(3), out); - } - - //This will without FLUME-1842's timeout fix - but with FLUME-1842's testing - //oriented changes to the callback classes and using single threaded executor - //for tests. 
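Several of the tests that follow repeat one idiom: call process() until the sink reports BACKOFF, counting iterations to verify how many batches were needed. Factored out, the loop looks like this (the helper is ours, not Flume API):

    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Sink;

    public final class SinkDrainSketch {
      /**
       * Drives a started sink until it reports BACKOFF. Counts iterations the
       * same way the tests do: the call that returns BACKOFF is included.
       */
      public static int drain(Sink sink) throws EventDeliveryException {
        int count = 0;
        Sink.Status status = Sink.Status.READY;
        while (status != Sink.Status.BACKOFF) {
          count++;
          status = sink.process();
        }
        return count;
      }
    }

With three events and batchSize 2, this returns 2: the second process() call both drains the last event and reports BACKOFF, which is what testMultipleBatches asserts below.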
- @Test (expected = EventDeliveryException.class) - public void testTimeOut() throws Exception { - testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - deleteTable = true; - AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), true, false); - Configurables.configure(sink, ctx); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, ctx); - sink.setChannel(channel); - channel.start(); - sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - for (int i = 0; i < 3; i++) { - Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); - channel.put(e); - } - tx.commit(); - tx.close(); - Assert.assertFalse(sink.isConfNull()); - sink.process(); - Assert.fail(); - } - - @Test - public void testMultipleBatches() throws Exception { - testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - deleteTable = true; - ctx.put("batchSize", "2"); - AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); - Configurables.configure(sink, ctx); - //Reset the context to a higher batchSize - ctx.put("batchSize", "100"); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, ctx); - sink.setChannel(channel); - sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - for (int i = 0; i < 3; i++) { - Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); - channel.put(e); - } - tx.commit(); - tx.close(); - int count = 0; - Status status = Status.READY; - while (status != Status.BACKOFF) { - count++; - status = sink.process(); - } - Assert.assertFalse(sink.isConfNull()); - sink.stop(); - Assert.assertEquals(2, count); - HTable table = new HTable(testUtility.getConfiguration(), tableName); - byte[][] results = getResults(table, 3); - byte[] out; - int found = 0; - for (int i = 0; i < 3; i++) { - for (int j = 0; j < 3; j++) { - if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { - found++; - break; - } - } - } - Assert.assertEquals(3, found); - out = results[3]; - Assert.assertArrayEquals(Longs.toByteArray(3), out); - } - - @Test - public void testMultipleBatchesBatchIncrementsWithCoalescing() throws Exception { - doTestMultipleBatchesBatchIncrements(true); - } - - @Test - public void testMultipleBatchesBatchIncrementsNoCoalescing() throws Exception { - doTestMultipleBatchesBatchIncrements(false); - } - - public void doTestMultipleBatchesBatchIncrements(boolean coalesce) throws Exception { - testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - deleteTable = true; - AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), false, true); - if (coalesce) { - ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, "true"); - } - ctx.put("batchSize", "2"); - ctx.put("serializer", IncrementAsyncHBaseSerializer.class.getName()); - ctx.put("serializer.column", "test"); - Configurables.configure(sink, ctx); - //Reset the context to a higher batchSize - ctx.put("batchSize", "100"); - // Restore the original serializer - ctx.put("serializer", SimpleAsyncHbaseEventSerializer.class.getName()); - //Restore the no coalescing behavior - ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, - "false"); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, ctx); - sink.setChannel(channel); - sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - for (int i = 0; i < 4; i++) { - for (int j = 0; j < 3; j++) { - Event e = 
EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); - channel.put(e); - } - } - tx.commit(); - tx.close(); - int count = 0; - Status status = Status.READY; - while (status != Status.BACKOFF) { - count++; - status = sink.process(); - } - Assert.assertFalse(sink.isConfNull()); - sink.stop(); - Assert.assertEquals(7, count); - HTable table = new HTable(testUtility.getConfiguration(), tableName); - Scan scan = new Scan(); - scan.addColumn(columnFamily.getBytes(), "test".getBytes()); - scan.setStartRow(Bytes.toBytes(valBase)); - ResultScanner rs = table.getScanner(scan); - int i = 0; - try { - for (Result r = rs.next(); r != null; r = rs.next()) { - byte[] out = r.getValue(columnFamily.getBytes(), "test".getBytes()); - Assert.assertArrayEquals(Longs.toByteArray(3), out); - Assert.assertTrue(new String(r.getRow()).startsWith(valBase)); - i++; - } - } finally { - rs.close(); - } - Assert.assertEquals(4, i); - if (coalesce) { - Assert.assertEquals(8, sink.getTotalCallbacksReceived()); - } else { - Assert.assertEquals(12, sink.getTotalCallbacksReceived()); - } - } - - @Test - public void testWithoutConfigurationObject() throws Exception { - testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - deleteTable = true; - ctx.put("batchSize", "2"); - ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, - ZKConfig.getZKQuorumServersString(testUtility.getConfiguration())); - ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, - testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); - AsyncHBaseSink sink = new AsyncHBaseSink(); - Configurables.configure(sink, ctx); - // Reset context to values usable by other tests. - ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, null); - ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, null); - ctx.put("batchSize", "100"); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, ctx); - sink.setChannel(channel); - sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - for (int i = 0; i < 3; i++) { - Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); - channel.put(e); - } - tx.commit(); - tx.close(); - int count = 0; - Status status = Status.READY; - while (status != Status.BACKOFF) { - count++; - status = sink.process(); - } - /* - * Make sure that the configuration was picked up from the context itself - * and not from a configuration object which was created by the sink. 
- */ - Assert.assertTrue(sink.isConfNull()); - sink.stop(); - Assert.assertEquals(2, count); - HTable table = new HTable(testUtility.getConfiguration(), tableName); - byte[][] results = getResults(table, 3); - byte[] out; - int found = 0; - for (int i = 0; i < 3; i++) { - for (int j = 0; j < 3; j++) { - if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { - found++; - break; - } - } - } - Assert.assertEquals(3, found); - out = results[3]; - Assert.assertArrayEquals(Longs.toByteArray(3), out); - } - - @Test(expected = FlumeException.class) - public void testMissingTable() throws Exception { - deleteTable = false; - ctx.put("batchSize", "2"); - AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); - Configurables.configure(sink, ctx); - //Reset the context to a higher batchSize - ctx.put("batchSize", "100"); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, ctx); - sink.setChannel(channel); - sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - for (int i = 0; i < 3; i++) { - Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); - channel.put(e); - } - tx.commit(); - tx.close(); - sink.process(); - Assert.assertFalse(sink.isConfNull()); - HTable table = new HTable(testUtility.getConfiguration(), tableName); - byte[][] results = getResults(table, 2); - byte[] out; - int found = 0; - for (int i = 0; i < 2; i++) { - for (int j = 0; j < 2; j++) { - if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { - found++; - break; - } - } - } - Assert.assertEquals(2, found); - out = results[2]; - Assert.assertArrayEquals(Longs.toByteArray(2), out); - sink.process(); - sink.stop(); - } - - // We only have support for getting File Descriptor count for Unix from the JDK - private long getOpenFileDescriptorCount() { - if (os instanceof UnixOperatingSystemMXBean) { - return ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount(); - } else { - return -1; - } - } - - /* - * Before the fix for FLUME-2738, consistently File Descriptors were leaked with at least - * > 10 FDs being leaked for every single shutdown-reinitialize routine - * If there is a leak, then the increase in FDs should be way higher than - * 50 and if there is no leak, there should not be any substantial increase in - * FDs. 
This is over a set of 10 shutdown-reinitialize runs - * This test makes sure that there is no File Descriptor leak, by continuously - * failing transactions and shutting down and reinitializing the client every time - * and this test will fail if a leak is detected - */ - @Test - public void testFDLeakOnShutdown() throws Exception { - if (getOpenFileDescriptorCount() < 0) { - return; - } - testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - deleteTable = true; - AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), - true, false); - ctx.put("maxConsecutiveFails", "1"); - Configurables.configure(sink, ctx); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, ctx); - sink.setChannel(channel); - channel.start(); - sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - for (int i = 0; i < 3; i++) { - Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); - channel.put(e); - } - tx.commit(); - tx.close(); - Assert.assertFalse(sink.isConfNull()); - long initialFDCount = getOpenFileDescriptorCount(); - - // Since the isTimeOutTest is set to true, transaction will fail - // with EventDeliveryException - for (int i = 0; i < 10; i++) { - try { - sink.process(); - } catch (EventDeliveryException ex) { - } - } - long increaseInFD = getOpenFileDescriptorCount() - initialFDCount; - Assert.assertTrue("File Descriptor leak detected. FDs have increased by " + - increaseInFD + " from an initial FD count of " + initialFDCount, - increaseInFD < 50); - } - - /** - * This test must run last - it shuts down the minicluster :D - * - * @throws Exception - */ - @Ignore("For dev builds only:" + - "This test takes too long, and this has to be run after all other" + - "tests, since it shuts down the minicluster. " + - "Comment out all other tests" + - "and uncomment this annotation to run this test.") - @Test(expected = EventDeliveryException.class) - public void testHBaseFailure() throws Exception { - ctx.put("batchSize", "2"); - testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); - deleteTable = false; - AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration()); - Configurables.configure(sink, ctx); - //Reset the context to a higher batchSize - ctx.put("batchSize", "100"); - Channel channel = new MemoryChannel(); - Configurables.configure(channel, ctx); - sink.setChannel(channel); - sink.start(); - Transaction tx = channel.getTransaction(); - tx.begin(); - for (int i = 0; i < 3; i++) { - Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); - channel.put(e); - } - tx.commit(); - tx.close(); - sink.process(); - Assert.assertFalse(sink.isConfNull()); - HTable table = new HTable(testUtility.getConfiguration(), tableName); - byte[][] results = getResults(table, 2); - byte[] out; - int found = 0; - for (int i = 0; i < 2; i++) { - for (int j = 0; j < 2; j++) { - if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { - found++; - break; - } - } - } - Assert.assertEquals(2, found); - out = results[2]; - Assert.assertArrayEquals(Longs.toByteArray(2), out); - testUtility.shutdownMiniCluster(); - sink.process(); - sink.stop(); - } - - /** - * Makes Hbase scans to get rows in the payload column and increment column - * in the table given. Expensive, so tread lightly. - * Calling this function multiple times for the same result set is a bad - * idea. Cache the result set once it is returned by this function. 
- * - * @param table - * @param numEvents Number of events inserted into the table - * @return array of byte array - * @throws IOException - */ - private byte[][] getResults(HTable table, int numEvents) throws IOException { - byte[][] results = new byte[numEvents + 1][]; - Scan scan = new Scan(); - scan.addColumn(columnFamily.getBytes(), plCol.getBytes()); - scan.setStartRow(Bytes.toBytes("default")); - ResultScanner rs = table.getScanner(scan); - byte[] out = null; - int i = 0; - try { - for (Result r = rs.next(); r != null; r = rs.next()) { - out = r.getValue(columnFamily.getBytes(), plCol.getBytes()); - - if (i >= results.length - 1) { - rs.close(); - throw new FlumeException("More results than expected in the table." + - "Expected = " + numEvents + ". Found = " + i); - } - results[i++] = out; - System.out.println(out); - } - } finally { - rs.close(); - } - - Assert.assertEquals(i, results.length - 1); - scan = new Scan(); - scan.addColumn(columnFamily.getBytes(), inColumn.getBytes()); - scan.setStartRow(Bytes.toBytes("incRow")); - rs = table.getScanner(scan); - out = null; - try { - for (Result r = rs.next(); r != null; r = rs.next()) { - out = r.getValue(columnFamily.getBytes(), inColumn.getBytes()); - results[i++] = out; - System.out.println(out); - } - } finally { - rs.close(); - } - return results; - } -} - diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java deleted file mode 100644 index d4cc360da3..0000000000 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
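A side note on the getResults() helper above: it closes the scanner in the finally block and once more before throwing on overflow. ResultScanner is Closeable, so on Java 7+ the same scan can be written with try-with-resources; a sketch of ours against the same era HTable API:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;

    public class ScanSketch {
      /** Collect all values of one column; the scanner is closed even if next() throws. */
      static List<byte[]> readColumn(HTable table, byte[] cf, byte[] qual) throws IOException {
        Scan scan = new Scan();
        scan.addColumn(cf, qual);
        List<byte[]> values = new ArrayList<>();
        try (ResultScanner rs = table.getScanner(scan)) {
          for (Result r : rs) {
            values.add(r.getValue(cf, qual));
          }
        }
        return values;
      }
    }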
- */
-
-package org.apache.flume.sink.hbase;
-
-import org.apache.flume.Context;
-import org.apache.flume.conf.Configurables;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class TestAsyncHBaseSinkConfiguration {
-
-  private static final String tableName = "TestHbaseSink";
-  private static final String columnFamily = "TestColumnFamily";
-  private static Context ctx = new Context();
-
-
-  @Before
-  public void setUp() throws Exception {
-    Map<String, String> ctxMap = new HashMap<>();
-    ctxMap.put("table", tableName);
-    ctxMap.put("columnFamily", columnFamily);
-    ctx = new Context();
-    ctx.putAll(ctxMap);
-  }
-
-
-  //FLUME-3186 Make asyncHbaseClient configuration parameters available from flume config
-  @Test
-  public void testAsyncConfigBackwardCompatibility() throws Exception {
-    //Old way: zookeeperQuorum
-    String oldZkQuorumTestValue = "old_zookeeper_quorum_test_value";
-    String oldZkZnodeParentValue = "old_zookeeper_znode_parent_test_value";
-    ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, oldZkQuorumTestValue);
-    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,oldZkZnodeParentValue);
-    AsyncHBaseSink sink = new AsyncHBaseSink();
-    Configurables.configure(sink, ctx);
-    Assert.assertEquals(
-        oldZkQuorumTestValue,
-        sink.asyncClientConfig.getString(HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY));
-    Assert.assertEquals(
-        oldZkZnodeParentValue,
-        sink.asyncClientConfig.getString(
-            HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY));
-  }
-
-  @Test
-  public void testAsyncConfigNewStyleOverwriteOldOne() throws Exception {
-    //Old way: zookeeperQuorum
-    String oldZkQuorumTestValue = "old_zookeeper_quorum_test_value";
-    String oldZkZnodeParentValue = "old_zookeeper_znode_parent_test_value";
-    ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, oldZkQuorumTestValue);
-    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,oldZkZnodeParentValue);
-
-    String newZkQuorumTestValue = "new_zookeeper_quorum_test_value";
-    String newZkZnodeParentValue = "new_zookeeper_znode_parent_test_value";
-    ctx.put(
-        HBaseSinkConfigurationConstants.ASYNC_PREFIX +
-        HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY,
-        newZkQuorumTestValue);
-    ctx.put(
-        HBaseSinkConfigurationConstants.ASYNC_PREFIX +
-        HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY,
-        newZkZnodeParentValue);
-    AsyncHBaseSink sink = new AsyncHBaseSink();
-    Configurables.configure(sink, ctx);
-    Assert.assertEquals(
-        newZkQuorumTestValue,
-        sink.asyncClientConfig.getString(HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY));
-    Assert.assertEquals(
-        newZkZnodeParentValue,
-        sink.asyncClientConfig.getString(
-            HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY));
-  }
-
-  @Test
-  public void testAsyncConfigAnyKeyCanBePassed() throws Exception {
-    String valueOfANewProp = "vale of the new property";
-    String keyOfANewProp = "some.key.to.be.passed";
-    ctx.put(HBaseSinkConfigurationConstants.ASYNC_PREFIX + keyOfANewProp, valueOfANewProp);
-    AsyncHBaseSink sink = new AsyncHBaseSink();
-    Configurables.configure(sink, ctx);
-    Assert.assertEquals(valueOfANewProp, sink.asyncClientConfig.getString(keyOfANewProp));
-  }
-}
-
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
deleted file mode 100644
index 3c366e2a79..0000000000
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ /dev/null
@@ -1,744 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.hbase;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.primitives.Longs;
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Sink.Status;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.spy;
-
-public class TestHBaseSink {
-  private static final Logger logger =
-      LoggerFactory.getLogger(TestHBaseSink.class);
-
-  private static final HBaseTestingUtility testUtility = new HBaseTestingUtility();
-  private static final String tableName = "TestHbaseSink";
-  private static final String columnFamily = "TestColumnFamily";
-  private static final String inColumn = "iCol";
-  private static final String plCol = "pCol";
-  private static final String valBase = "testing hbase sink: jham";
-
-  private Configuration conf;
-  private Context ctx;
-
-  @BeforeClass
-  public static void setUpOnce() throws Exception {
-    testUtility.startMiniCluster();
-  }
-
-  @AfterClass
-  public static void tearDownOnce() throws Exception {
-    testUtility.shutdownMiniCluster();
-  }
-
-  /**
-   * Most common context setup for unit tests using
-   * {@link SimpleHbaseEventSerializer}.
-   */
-  @Before
-  public void setUp() throws IOException {
-    conf = new Configuration(testUtility.getConfiguration());
-    ctx = new Context();
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    testUtility.deleteTable(tableName.getBytes());
-  }
-
-  /**
-   * Set up {@link Context} for use with {@link SimpleHbaseEventSerializer}.
-   */
-  private void initContextForSimpleHbaseEventSerializer() {
-    ctx = new Context();
-    ctx.put("table", tableName);
-    ctx.put("columnFamily", columnFamily);
-    ctx.put("serializer", SimpleHbaseEventSerializer.class.getName());
-    ctx.put("serializer.payloadColumn", plCol);
-    ctx.put("serializer.incrementColumn", inColumn);
-  }
-
-  /**
-   * Set up {@link Context} for use with {@link IncrementHBaseSerializer}.
-   */
-  private void initContextForIncrementHBaseSerializer() {
-    ctx = new Context();
-    ctx.put("table", tableName);
-    ctx.put("columnFamily", columnFamily);
-    ctx.put("serializer", IncrementHBaseSerializer.class.getName());
-  }
-
-  @Test
-  public void testOneEventWithDefaults() throws Exception {
-    //Create a context without setting increment column and payload Column
-    ctx = new Context();
-    ctx.put("table", tableName);
-    ctx.put("columnFamily", columnFamily);
-    ctx.put("serializer", SimpleHbaseEventSerializer.class.getName());
-
-    HBaseSink sink = new HBaseSink(conf);
-    Configurables.configure(sink, ctx);
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    Event e = EventBuilder.withBody(Bytes.toBytes(valBase));
-    channel.put(e);
-    tx.commit();
-    tx.close();
-
-    sink.process();
-    sink.stop();
-    HTable table = new HTable(conf, tableName);
-    byte[][] results = getResults(table, 1);
-    byte[] out = results[0];
-    Assert.assertArrayEquals(e.getBody(), out);
-    out = results[1];
-    Assert.assertArrayEquals(Longs.toByteArray(1), out);
-  }
-
-  @Test
-  public void testOneEvent() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
-    HBaseSink sink = new HBaseSink(conf);
-    Configurables.configure(sink, ctx);
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    Event e = EventBuilder.withBody(
-        Bytes.toBytes(valBase));
-    channel.put(e);
-    tx.commit();
-    tx.close();
-
-    sink.process();
-    sink.stop();
-    HTable table = new HTable(conf, tableName);
-    byte[][] results = getResults(table, 1);
-    byte[] out = results[0];
-    Assert.assertArrayEquals(e.getBody(), out);
-    out = results[1];
-    Assert.assertArrayEquals(Longs.toByteArray(1), out);
-  }
-
-  @Test
-  public void testThreeEvents() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
-    ctx.put("batchSize", "3");
-    HBaseSink sink = new HBaseSink(conf);
-    Configurables.configure(sink, ctx);
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (int i = 0; i < 3; i++) {
-      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
-      channel.put(e);
-    }
-    tx.commit();
-    tx.close();
-    sink.process();
-    sink.stop();
-    HTable table = new HTable(conf, tableName);
-    byte[][] results = getResults(table, 3);
-    byte[] out;
-    int found = 0;
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 3; j++) {
-        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
-          found++;
-          break;
-        }
-      }
-    }
-    Assert.assertEquals(3, found);
-    out = results[3];
-    Assert.assertArrayEquals(Longs.toByteArray(3), out);
-  }
-
-  @Test
-  public void testMultipleBatches() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
-    ctx.put("batchSize", "2");
-    HBaseSink sink = new HBaseSink(conf);
-    Configurables.configure(sink, ctx);
-    //Reset the context to a higher batchSize
-    ctx.put("batchSize", "100");
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (int i = 0; i < 3; i++) {
-      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
-      channel.put(e);
-    }
-    tx.commit();
-    tx.close();
-    int count = 0;
-    while (sink.process() != Status.BACKOFF) {
-      count++;
-    }
-    sink.stop();
-    Assert.assertEquals(2, count);
-    HTable table = new HTable(conf, tableName);
-    byte[][] results = getResults(table, 3);
-    byte[] out;
-    int found = 0;
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 3; j++) {
-        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
-          found++;
-          break;
-        }
-      }
-    }
-    Assert.assertEquals(3, found);
-    out = results[3];
-    Assert.assertArrayEquals(Longs.toByteArray(3), out);
-  }
-
-  @Test(expected = FlumeException.class)
-  public void testMissingTable() throws Exception {
-    logger.info("Running testMissingTable()");
-    initContextForSimpleHbaseEventSerializer();
-
-    // setUp() will create the table, so we delete it.
-    logger.info("Deleting table {}", tableName);
-    testUtility.deleteTable(tableName.getBytes());
-
-    ctx.put("batchSize", "2");
-    HBaseSink sink = new HBaseSink(conf);
-    Configurables.configure(sink, ctx);
-    //Reset the context to a higher batchSize
-    ctx.put("batchSize", "100");
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-
-    logger.info("Writing data into channel");
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (int i = 0; i < 3; i++) {
-      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
-      channel.put(e);
-    }
-    tx.commit();
-    tx.close();
-
-    logger.info("Starting sink and processing events");
-    try {
-      logger.info("Calling sink.start()");
-      sink.start(); // This method will throw.
-
-      // We never get here, but we log in case the behavior changes.
-      logger.error("Unexpected error: Calling sink.process()");
-      sink.process();
-      logger.error("Unexpected error: Calling sink.stop()");
-      sink.stop();
-    } finally {
-      // Re-create the table so tearDown() doesn't throw.
-      testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
-    }
-
-    // FIXME: The test should never get here, the below code doesn't run.
-    Assert.fail();
-
-    HTable table = new HTable(conf, tableName);
-    byte[][] results = getResults(table, 2);
-    byte[] out;
-    int found = 0;
-    for (int i = 0; i < 2; i++) {
-      for (int j = 0; j < 2; j++) {
-        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
-          found++;
-          break;
-        }
-      }
-    }
-    Assert.assertEquals(2, found);
-    out = results[2];
-    Assert.assertArrayEquals(Longs.toByteArray(2), out);
-    sink.process();
-  }
-
-  // TODO: Move this test to a different class and run it stand-alone.
-
-  /**
-   * This test must run last - it shuts down the minicluster :D
-   *
-   * @throws Exception
-   */
-  @Ignore("For dev builds only:" +
-      "This test takes too long, and this has to be run after all other" +
-      "tests, since it shuts down the minicluster. " +
-      "Comment out all other tests" +
-      "and uncomment this annotation to run this test.")
-  @Test(expected = EventDeliveryException.class)
-  public void testHBaseFailure() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
-    ctx.put("batchSize", "2");
-    HBaseSink sink = new HBaseSink(conf);
-    Configurables.configure(sink, ctx);
-    //Reset the context to a higher batchSize
-    ctx.put("batchSize", "100");
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (int i = 0; i < 3; i++) {
-      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
-      channel.put(e);
-    }
-    tx.commit();
-    tx.close();
-    sink.process();
-    HTable table = new HTable(conf, tableName);
-    byte[][] results = getResults(table, 2);
-    byte[] out;
-    int found = 0;
-    for (int i = 0; i < 2; i++) {
-      for (int j = 0; j < 2; j++) {
-        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
-          found++;
-          break;
-        }
-      }
-    }
-    Assert.assertEquals(2, found);
-    out = results[2];
-    Assert.assertArrayEquals(Longs.toByteArray(2), out);
-    testUtility.shutdownMiniCluster();
-    sink.process();
-    sink.stop();
-  }
-
-  /**
-   * Makes Hbase scans to get rows in the payload column and increment column
-   * in the table given. Expensive, so tread lightly.
-   * Calling this function multiple times for the same result set is a bad
-   * idea. Cache the result set once it is returned by this function.
-   *
-   * @param table
-   * @param numEvents Number of events inserted into the table
-   * @return array of byte arrays
-   * @throws IOException
-   */
-  private byte[][] getResults(HTable table, int numEvents) throws IOException {
-    byte[][] results = new byte[numEvents + 1][];
-    Scan scan = new Scan();
-    scan.addColumn(columnFamily.getBytes(), plCol.getBytes());
-    scan.setStartRow(Bytes.toBytes("default"));
-    ResultScanner rs = table.getScanner(scan);
-    byte[] out = null;
-    int i = 0;
-    try {
-      for (Result r = rs.next(); r != null; r = rs.next()) {
-        out = r.getValue(columnFamily.getBytes(), plCol.getBytes());
-
-        if (i >= results.length - 1) {
-          rs.close();
-          throw new FlumeException("More results than expected in the table." +
-              "Expected = " + numEvents + ". Found = " + i);
-        }
-        results[i++] = out;
-        System.out.println(out);
-      }
-    } finally {
-      rs.close();
-    }
-
-    Assert.assertEquals(i, results.length - 1);
-    scan = new Scan();
-    scan.addColumn(columnFamily.getBytes(), inColumn.getBytes());
-    scan.setStartRow(Bytes.toBytes("incRow"));
-    rs = table.getScanner(scan);
-    out = null;
-    try {
-      for (Result r = rs.next(); r != null; r = rs.next()) {
-        out = r.getValue(columnFamily.getBytes(), inColumn.getBytes());
-        results[i++] = out;
-        System.out.println(out);
-      }
-    } finally {
-      rs.close();
-    }
-    return results;
-  }
-
-  @Test
-  public void testTransactionStateOnChannelException() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
-    ctx.put("batchSize", "1");
-
-    HBaseSink sink = new HBaseSink(conf);
-    Configurables.configure(sink, ctx);
-    // Reset the context to a higher batchSize
-    Channel channel = spy(new MemoryChannel());
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + 0));
-    channel.put(e);
-    tx.commit();
-    tx.close();
-    doThrow(new ChannelException("Mock Exception")).when(channel).take();
-    try {
-      sink.process();
-      Assert.fail("take() method should throw exception");
-    } catch (ChannelException ex) {
-      Assert.assertEquals("Mock Exception", ex.getMessage());
-    }
-    doReturn(e).when(channel).take();
-    sink.process();
-    sink.stop();
-    HTable table = new HTable(conf, tableName);
-    byte[][] results = getResults(table, 1);
-    byte[] out = results[0];
-    Assert.assertArrayEquals(e.getBody(), out);
-    out = results[1];
-    Assert.assertArrayEquals(Longs.toByteArray(1), out);
-  }
-
-  @Test
-  public void testTransactionStateOnSerializationException() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
-    ctx.put("batchSize", "1");
-    ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER,
-        "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer");
-
-    HBaseSink sink = new HBaseSink(conf);
-    Configurables.configure(sink, ctx);
-    // Reset the context to a higher batchSize
-    ctx.put("batchSize", "100");
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
-    sink.setChannel(channel);
-    sink.start();
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + 0));
-    channel.put(e);
-    tx.commit();
-    tx.close();
-    try {
-      MockSimpleHbaseEventSerializer.throwException = true;
-      sink.process();
-      Assert.fail("FlumeException expected from serilazer");
-    } catch (FlumeException ex) {
-      Assert.assertEquals("Exception for testing", ex.getMessage());
-    }
-    MockSimpleHbaseEventSerializer.throwException = false;
-    sink.process();
-    sink.stop();
-    HTable table = new HTable(conf, tableName);
-    byte[][] results = getResults(table, 1);
-    byte[] out = results[0];
-    Assert.assertArrayEquals(e.getBody(), out);
-    out = results[1];
-    Assert.assertArrayEquals(Longs.toByteArray(1), out);
-  }
-
-  @Test
-  public void testWithoutConfigurationObject() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
-    Context tmpContext = new Context(ctx.getParameters());
-    tmpContext.put("batchSize", "2");
-    tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
-        ZKConfig.getZKQuorumServersString(conf));
-    System.out.print(ctx.getString(HBaseSinkConfigurationConstants.ZK_QUORUM));
-    tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
-        conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
-            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
-
-    HBaseSink sink = new HBaseSink();
-    Configurables.configure(sink, tmpContext);
-    Channel channel = new MemoryChannel();
-    Configurables.configure(channel, ctx);
-    sink.setChannel(channel);
-    sink.start();
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (int i = 0; i < 3; i++) {
-      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
-      channel.put(e);
-    }
-    tx.commit();
-    tx.close();
-    Status status = Status.READY;
-    while (status != Status.BACKOFF) {
-      status = sink.process();
-    }
-    sink.stop();
-    HTable table = new HTable(conf, tableName);
-    byte[][] results = getResults(table, 3);
-    byte[] out;
-    int found = 0;
-    for (int i = 0; i < 3; i++) {
-      for (int j = 0; j < 3; j++) {
-        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
-          found++;
-          break;
-        }
-      }
-    }
-    Assert.assertEquals(3, found);
-    out = results[3];
-    Assert.assertArrayEquals(Longs.toByteArray(3), out);
-  }
-
-  @Test
-  public void testZKQuorum() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
-    Context tmpContext = new Context(ctx.getParameters());
-    String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " +
-        "zk3.flume.apache.org:3342";
-    tmpContext.put("batchSize", "2");
-    tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
-    tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
-        conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
-            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
-    HBaseSink sink = new HBaseSink();
-    Configurables.configure(sink, tmpContext);
-    Assert.assertEquals("zk1.flume.apache.org,zk2.flume.apache.org," +
-        "zk3.flume.apache.org", sink.getConfig().get(HConstants.ZOOKEEPER_QUORUM));
-    Assert.assertEquals(String.valueOf(3342),
-        sink.getConfig().get(HConstants.ZOOKEEPER_CLIENT_PORT));
-  }
-
-  @Test(expected = FlumeException.class)
-  public void testZKQuorumIncorrectPorts() throws Exception {
-    initContextForSimpleHbaseEventSerializer();
-    Context tmpContext = new Context(ctx.getParameters());
-
-    String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " +
-        "zk3.flume.apache.org:3342";
-    tmpContext.put("batchSize", "2");
-    tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
-    tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
-        conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
-            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
-    HBaseSink sink = new HBaseSink();
-    Configurables.configure(sink, tmpContext);
-    Assert.fail();
-  }
-
-  @Test
-  public void testCoalesce() throws EventDeliveryException {
-    initContextForIncrementHBaseSerializer();
-    ctx.put("batchSize", "100");
-    ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
-        String.valueOf(true));
-
-    final Map<String, Long> expectedCounts = Maps.newHashMap();
-    expectedCounts.put("r1:c1", 10L);
-    expectedCounts.put("r1:c2", 20L);
-    expectedCounts.put("r2:c1", 7L);
-    expectedCounts.put("r2:c3", 63L);
-    HBaseSink.DebugIncrementsCallback cb = new CoalesceValidator(expectedCounts);
-
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration(), cb);
-    Configurables.configure(sink, ctx);
-    Channel channel = createAndConfigureMemoryChannel(sink);
-
-    List<Event> events = Lists.newLinkedList();
-    generateEvents(events, expectedCounts);
-    putEvents(channel, events);
-
-    sink.start();
-    sink.process(); // Calls CoalesceValidator instance.
-    sink.stop();
-  }
-
-  @Test(expected = AssertionError.class)
-  public void negativeTestCoalesce() throws EventDeliveryException {
-    initContextForIncrementHBaseSerializer();
-    ctx.put("batchSize", "10");
-
-    final Map<String, Long> expectedCounts = Maps.newHashMap();
-    expectedCounts.put("r1:c1", 10L);
-    HBaseSink.DebugIncrementsCallback cb = new CoalesceValidator(expectedCounts);
-
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration(), cb);
-    Configurables.configure(sink, ctx);
-    Channel channel = createAndConfigureMemoryChannel(sink);
-
-    List<Event> events = Lists.newLinkedList();
-    generateEvents(events, expectedCounts);
-    putEvents(channel, events);
-
-    sink.start();
-    sink.process(); // Calls CoalesceValidator instance.
-    sink.stop();
-  }
-
-  @Test
-  public void testBatchAware() throws EventDeliveryException {
-    logger.info("Running testBatchAware()");
-    initContextForIncrementHBaseSerializer();
-    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
-    Configurables.configure(sink, ctx);
-    Channel channel = createAndConfigureMemoryChannel(sink);
-
-    sink.start();
-    int batchCount = 3;
-    for (int i = 0; i < batchCount; i++) {
-      sink.process();
-    }
-    sink.stop();
-    Assert.assertEquals(batchCount,
-        ((IncrementHBaseSerializer) sink.getSerializer()).getNumBatchesStarted());
-  }
-
-  /**
-   * For testing that the rows coalesced, serialized by
-   * {@link IncrementHBaseSerializer}, are of the expected batch size.
-   */
-  private static class CoalesceValidator
-      implements HBaseSink.DebugIncrementsCallback {
-
-    private final Map<String, Long> expectedCounts;
-    private final Method refGetFamilyMap;
-
-    public CoalesceValidator(Map<String, Long> expectedCounts) {
-      this.expectedCounts = expectedCounts;
-      this.refGetFamilyMap = HBaseSink.reflectLookupGetFamilyMap();
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public void onAfterCoalesce(Iterable<Increment> increments) {
-      for (Increment inc : increments) {
-        byte[] row = inc.getRow();
-        Map<byte[], NavigableMap<byte[], Long>> families = null;
-        try {
-          families = (Map<byte[], NavigableMap<byte[], Long>>)
-              refGetFamilyMap.invoke(inc);
-        } catch (Exception e) {
-          Throwables.propagate(e);
-        }
-        for (byte[] family : families.keySet()) {
-          NavigableMap<byte[], Long> qualifiers = families.get(family);
-          for (Map.Entry<byte[], Long> entry : qualifiers.entrySet()) {
-            byte[] qualifier = entry.getKey();
-            Long count = entry.getValue();
-            StringBuilder b = new StringBuilder(20);
-            b.append(new String(row, Charsets.UTF_8));
-            b.append(':');
-            b.append(new String(qualifier, Charsets.UTF_8));
-            String key = b.toString();
-            Assert.assertEquals("Expected counts don't match observed for " + key,
-                expectedCounts.get(key), count);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Add number of Events corresponding to counts to the events list.
-   * @param events Destination list.
-   * @param counts How many events to generate for each row:qualifier pair.
-   */
-  private void generateEvents(List<Event> events, Map<String, Long> counts) {
-    for (String key : counts.keySet()) {
-      long count = counts.get(key);
-      for (long i = 0; i < count; i++) {
-        events.add(EventBuilder.withBody(key, Charsets.UTF_8));
-      }
-    }
-  }
-
-  private Channel createAndConfigureMemoryChannel(HBaseSink sink) {
-    Channel channel = new MemoryChannel();
-    Context channelCtx = new Context();
-    channelCtx.put("capacity", String.valueOf(1000L));
-    channelCtx.put("transactionCapacity", String.valueOf(1000L));
-    Configurables.configure(channel, channelCtx);
-    sink.setChannel(channel);
-    channel.start();
-    return channel;
-  }
-
-  private void putEvents(Channel channel, Iterable<Event> events) {
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (Event event : events) {
-      channel.put(event);
-    }
-    tx.commit();
-    tx.close();
-  }
-
-}
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSinkCreation.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSinkCreation.java
deleted file mode 100644
index 115bc62e96..0000000000
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSinkCreation.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.hbase;
-
-import org.apache.flume.FlumeException;
-import org.apache.flume.Sink;
-import org.apache.flume.SinkFactory;
-import org.apache.flume.sink.DefaultSinkFactory;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestHBaseSinkCreation {
-
-  private SinkFactory sinkFactory;
-
-  @Before
-  public void setUp() {
-    sinkFactory = new DefaultSinkFactory();
-  }
-
-  private void verifySinkCreation(String name, String type,
-      Class typeClass) throws FlumeException {
-    Sink sink = sinkFactory.create(name, type);
-    Assert.assertNotNull(sink);
-    Assert.assertTrue(typeClass.isInstance(sink));
-  }
-
-  @Test
-  public void testSinkCreation() {
-    verifySinkCreation("hbase-sink", "hbase", HBaseSink.class);
-    verifySinkCreation("asynchbase-sink", "asynchbase", AsyncHBaseSink.class);
-  }
-}
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
deleted file mode 100644
index 24bcf37cba..0000000000
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.hbase;
-
-import com.google.common.collect.Maps;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.event.EventBuilder;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-
-import java.nio.charset.Charset;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestRegexHbaseEventSerializer {
-
-  @Test
-  /** Ensure that when no config is specified, the a catch-all regex is used
-   * with default column name. */
-  public void testDefaultBehavior() throws Exception {
-    RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
-    Context context = new Context();
-    s.configure(context);
-    String logMsg = "The sky is falling!";
-    Event e = EventBuilder.withBody(Bytes.toBytes(logMsg));
-    s.initialize(e, "CF".getBytes());
-    List<Row> actions = s.getActions();
-    assertTrue(actions.size() == 1);
-    assertTrue(actions.get(0) instanceof Put);
-    Put put = (Put) actions.get(0);
-
-    assertTrue(put.getFamilyMap().containsKey(s.cf));
-    List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf);
-    assertTrue(kvPairs.size() == 1);
-
-    Map<String, String> resultMap = Maps.newHashMap();
-    for (KeyValue kv : kvPairs) {
-      resultMap.put(new String(kv.getQualifier()), new String(kv.getValue()));
-    }
-
-    assertTrue(resultMap.containsKey(
-        RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT));
-    assertEquals("The sky is falling!",
-        resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT));
-  }
-  @Test
-  public void testRowIndexKey() throws Exception {
-    RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
-    Context context = new Context();
-    context.put(RegexHbaseEventSerializer.REGEX_CONFIG,"^([^\t]+)\t([^\t]+)\t" + "([^\t]+)$");
-    context.put(RegexHbaseEventSerializer.COL_NAME_CONFIG, "col1,col2,ROW_KEY");
-    context.put("rowKeyIndex", "2");
-    s.configure(context);
-
-    String body = "val1\tval2\trow1";
-    Event e = EventBuilder.withBody(Bytes.toBytes(body));
-    s.initialize(e, "CF".getBytes());
-    List<Row> actions = s.getActions();
-
-    Put put = (Put)actions.get(0);
-
-    List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf);
-    assertTrue(kvPairs.size() == 2);
-
-    Map<String, String> resultMap = Maps.newHashMap();
-    for (KeyValue kv : kvPairs) {
-      resultMap.put(new String(kv.getQualifier()), new String(kv.getValue()));
-    }
-    assertEquals("val1", resultMap.get("col1"));
-    assertEquals("val2", resultMap.get("col2"));
-    assertEquals("row1", Bytes.toString(put.getRow()));
-  }
-
-  @Test
-  /** Test a common case where regex is used to parse apache log format. */
-  public void testApacheRegex() throws Exception {
-    RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
-    Context context = new Context();
-    context.put(RegexHbaseEventSerializer.REGEX_CONFIG,
-        "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) \"([^ ]+) ([^ ]+)" +
-        " ([^\"]+)\" (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\")" +
-        " ([^ \"]*|\"[^\"]*\"))?");
-    context.put(RegexHbaseEventSerializer.COL_NAME_CONFIG,
-        "host,identity,user,time,method,request,protocol,status,size," +
-        "referer,agent");
-    s.configure(context);
-    String logMsg = "33.22.11.00 - - [20/May/2011:07:01:19 +0000] " +
-        "\"GET /wp-admin/css/install.css HTTP/1.0\" 200 813 " +
-        "\"http://www.cloudera.com/wp-admin/install.php\" \"Mozilla/5.0 (comp" +
-        "atible; Yahoo! Slurp; http://help.yahoo.com/help/us/ysearch/slurp)\"";
-
-    Event e = EventBuilder.withBody(Bytes.toBytes(logMsg));
-    s.initialize(e, "CF".getBytes());
-    List<Row> actions = s.getActions();
-    assertEquals(1, s.getActions().size());
-    assertTrue(actions.get(0) instanceof Put);
-
-    Put put = (Put) actions.get(0);
-    assertTrue(put.getFamilyMap().containsKey(s.cf));
-    List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf);
-    assertTrue(kvPairs.size() == 11);
-
-    Map<String, String> resultMap = Maps.newHashMap();
-    for (KeyValue kv : kvPairs) {
-      resultMap.put(new String(kv.getQualifier()), new String(kv.getValue()));
-    }
-
-    assertEquals("33.22.11.00", resultMap.get("host"));
-    assertEquals("-", resultMap.get("identity"));
-    assertEquals("-", resultMap.get("user"));
-    assertEquals("[20/May/2011:07:01:19 +0000]", resultMap.get("time"));
-    assertEquals("GET", resultMap.get("method"));
-    assertEquals("/wp-admin/css/install.css", resultMap.get("request"));
-    assertEquals("HTTP/1.0", resultMap.get("protocol"));
-    assertEquals("200", resultMap.get("status"));
-    assertEquals("813", resultMap.get("size"));
-    assertEquals("\"http://www.cloudera.com/wp-admin/install.php\"",
-        resultMap.get("referer"));
-    assertEquals("\"Mozilla/5.0 (compatible; Yahoo! Slurp; " +
-        "http://help.yahoo.com/help/us/ysearch/slurp)\"",
-        resultMap.get("agent"));
-
-    List<Increment> increments = s.getIncrements();
-    assertEquals(0, increments.size());
-  }
-
-  @Test
-  public void testRowKeyGeneration() {
-    Context context = new Context();
-    RegexHbaseEventSerializer s1 = new RegexHbaseEventSerializer();
-    s1.configure(context);
-    RegexHbaseEventSerializer s2 = new RegexHbaseEventSerializer();
-    s2.configure(context);
-
-    // Reset shared nonce to zero
-    RegexHbaseEventSerializer.nonce.set(0);
-    String randomString = RegexHbaseEventSerializer.randomKey;
-
-    Event e1 = EventBuilder.withBody(Bytes.toBytes("body"));
-    Event e2 = EventBuilder.withBody(Bytes.toBytes("body"));
-    Event e3 = EventBuilder.withBody(Bytes.toBytes("body"));
-
-    Calendar cal = mock(Calendar.class);
-    when(cal.getTimeInMillis()).thenReturn(1L);
-
-    s1.initialize(e1, "CF".getBytes());
-    String rk1 = new String(s1.getRowKey(cal));
-    assertEquals("1-" + randomString + "-0", rk1);
-
-    when(cal.getTimeInMillis()).thenReturn(10L);
-    s1.initialize(e2, "CF".getBytes());
-    String rk2 = new String(s1.getRowKey(cal));
-    assertEquals("10-" + randomString + "-1", rk2);
-
-    when(cal.getTimeInMillis()).thenReturn(100L);
-    s2.initialize(e3, "CF".getBytes());
-    String rk3 = new String(s2.getRowKey(cal));
-    assertEquals("100-" + randomString + "-2", rk3);
-
-  }
-
-  @Test
-  /** Test depositing of the header information. */
-  public void testDepositHeaders() throws Exception {
-    Charset charset = Charset.forName("KOI8-R");
-    RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
-    Context context = new Context();
-    context.put(RegexHbaseEventSerializer.DEPOSIT_HEADERS_CONFIG,
-        "true");
-    context.put(RegexHbaseEventSerializer.CHARSET_CONFIG,
-        charset.toString());
-    s.configure(context);
-
-    String body = "body";
-    Map<String, String> headers = Maps.newHashMap();
-    headers.put("header1", "value1");
-    headers.put("заголовок2", "значение2");
-
-    Event e = EventBuilder.withBody(Bytes.toBytes(body), headers);
-    s.initialize(e, "CF".getBytes());
-    List<Row> actions = s.getActions();
-    assertEquals(1, s.getActions().size());
-    assertTrue(actions.get(0) instanceof Put);
-
-    Put put = (Put) actions.get(0);
-    assertTrue(put.getFamilyMap().containsKey(s.cf));
-    List<KeyValue> kvPairs = put.getFamilyMap().get(s.cf);
-    assertTrue(kvPairs.size() == 3);
-
-    Map<String, byte[]> resultMap = Maps.newHashMap();
-    for (KeyValue kv : kvPairs) {
-      resultMap.put(new String(kv.getQualifier(), charset), kv.getValue());
-    }
-
-    assertEquals(body,
-        new String(resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT), charset));
-    assertEquals("value1", new String(resultMap.get("header1"), charset));
-    assertArrayEquals("значение2".getBytes(charset), resultMap.get("заголовок2"));
-    assertEquals("значение2".length(), resultMap.get("заголовок2").length);
-
-    List<Increment> increments = s.getIncrements();
-    assertEquals(0, increments.size());
-  }
-}
diff --git a/flume-ng-sinks/pom.xml b/flume-ng-sinks/pom.xml
index 1c350ce7e4..f148855d89 100644
--- a/flume-ng-sinks/pom.xml
+++ b/flume-ng-sinks/pom.xml
@@ -38,7 +38,6 @@ limitations under the License.
     <module>flume-hdfs-sink</module>
     <module>flume-irc-sink</module>
-    <module>flume-ng-hbase-sink</module>
    <module>flume-ng-hbase2-sink</module>
    <module>flume-ng-morphline-solr-sink</module>
    <module>flume-http-sink</module>
diff --git a/flume-parent/pom.xml b/flume-parent/pom.xml
index 51aabbc788..0ff2b3fc68 100644
--- a/flume-parent/pom.xml
+++ b/flume-parent/pom.xml
@@ -74,7 +74,7 @@ limitations under the License.
    11.0.2
    3.2.2
    ${hadoop3.version}
-    1.7.1
+    1.7.2
    2.5.3-hadoop3
    3.1.2
    4.4.15