Bulkload (#140)
* Initial bulkload test passed

* Removed some hardcoded values; a little remains

* Tidied up the code

* Added logger and consumer

* Updated connection configuration

* Added remote HDFS configuration

* Cleaned up the code

* Configuration handling, logging, and monitoring

* Load settings via Configuration

* Dropped the double write; upload via bulkload only

* Fixed a key-generation bug

* Close connection

* Generate each HFile under a file name derived from the rowkey and delete it after loading; create the connection only once

* Record backup

* HFile cache; removed the per-message log line

* Moved the temporary HDFS path under the trace directory

Co-authored-by: zhipeng.cai <[email protected]>
zhipeng-cai and zhipeng.cai authored Jul 21, 2021
1 parent e02eedf commit 3e48dbf
Showing 9 changed files with 547 additions and 13 deletions.
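The core of the change is the standard HBase bulk-load pattern: buffer cells in row-key order, write them to an HFile under a column-family subdirectory on HDFS, then hand the parent directory to the bulk loader, which moves the files directly into the table's regions. A minimal self-contained sketch of that pattern with the HBase 1.x API used below (the "demo" table, "cf" family, and /tmp/demo staging path are placeholders, not values from this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

final class BulkLoadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Path stagingDir = new Path("/tmp/demo");          // directory handed to the loader
        Path hfile = new Path(stagingDir, "cf/hfile-0");  // one HFile inside a family subdir
        try (Connection conn = ConnectionFactory.createConnection(conf)) {
            HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
                    .withPath(FileSystem.get(conf), hfile)
                    .withFileContext(new HFileContext()).create();
            // cells must be appended in ascending row-key order
            writer.append(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
                    Bytes.toBytes("q"), Bytes.toBytes("value1")));
            writer.close();
            try (Table table = conn.getTable(TableName.valueOf("demo"));
                 Admin admin = conn.getAdmin()) {
                new LoadIncrementalHFiles(conf).doBulkLoad(
                        stagingDir, admin, table, conn.getRegionLocator(TableName.valueOf("demo")));
            }
        }
    }
}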
13 changes: 9 additions & 4 deletions qmq-backup/pom.xml
@@ -21,10 +21,6 @@
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
-<dependency>
-<groupId>org.hbase</groupId>
-<artifactId>asynchbase</artifactId>
-</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
@@ -80,6 +76,15 @@
<groupId>${project.groupId}</groupId>
<artifactId>qmq-sync</artifactId>
</dependency>
+<dependency>
+<groupId>org.hbase</groupId>
+<artifactId>asynchbase</artifactId>
+</dependency>
+<dependency>
+<groupId>org.apache.hbase</groupId>
+<artifactId>hbase-server</artifactId>
+<version>1.3.3</version>
+</dependency>
</dependencies>

</project>
DefaultBackupConfig.java
@@ -33,6 +33,7 @@ public class DefaultBackupConfig implements BackupConfig {
public static final String DEFAULT_DELAY_DB_DIC_TABLE = "qmq_delay_dic";
public static final String DEFAULT_STORE_FACTORY_TYPE = "hbase";
public static final int DEFAULT_ROCKS_DB_TTL = 7200;
+public static final int DEFAULT_MESSAGE_SIZE_PER_HFILE = 10000;

public static final String HBASE_MESSAGE_INDEX_TABLE_CONFIG_KEY = "hbase.message.table";
public static final String HBASE_DELAY_MESSAGE_INDEX_TABLE_CONFIG_KEY = "hbase.delay.message.table";
@@ -51,6 +52,7 @@ public class DefaultBackupConfig implements BackupConfig {
public static final String ROCKS_DB_PATH_CONFIG_KEY = "rocks.db.path";
public static final String ROCKS_DB_TTL_CONFIG_KEY = "rocks.db.ttl";
public static final String ACQUIRE_BACKUP_META_URL = "acquire.server.meta.url";
+public static final String MESSAGE_SIZE_PER_HFILE_CONFIG_KEY = "message.backup.hfile.size";

private volatile String brokerGroup;

IndexEventBusListener.java
@@ -16,10 +16,13 @@

package qunar.tc.qmq.backup.service.impl;

+import qunar.tc.qmq.backup.service.BackupKeyGenerator;
import qunar.tc.qmq.backup.service.BatchBackup;
+import qunar.tc.qmq.backup.store.impl.HFileIndexStore;
import qunar.tc.qmq.store.MessageQueryIndex;
import qunar.tc.qmq.utils.RetrySubjectUtils;

+import java.io.IOException;
import java.util.function.Consumer;

/**
@@ -32,9 +35,12 @@ public class IndexEventBusListener extends AbstractEventBusListener {

private final Consumer<MessageQueryIndex> consumer;

-public IndexEventBusListener(BatchBackup<MessageQueryIndex> indexBatchBackup, Consumer<MessageQueryIndex> consumer) {
+private final HFileIndexStore hFileIndexStore;
+
+public IndexEventBusListener(BatchBackup<MessageQueryIndex> indexBatchBackup, Consumer<MessageQueryIndex> consumer, BackupKeyGenerator keyGenerator) throws IOException {
this.indexBatchBackup = indexBatchBackup;
this.consumer = consumer;
+this.hFileIndexStore = new HFileIndexStore(keyGenerator);
}

@Override
@@ -44,8 +50,10 @@ void post(MessageQueryIndex index) {
consumer.accept(index);
return;
}
+// upload via bulkload instead of the batch backup path
+hFileIndexStore.appendData(index, consumer);
-indexBatchBackup.add(index, consumer);
+//indexBatchBackup.add(index, consumer);
}

@Override
@@ -19,6 +19,7 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.backup.base.ActionRecord;
@@ -47,6 +48,7 @@
import qunar.tc.qmq.sync.SyncType;
import qunar.tc.qmq.utils.NetworkUtils;

+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
@@ -102,7 +104,7 @@ public void start() {
}
}

-private void register(final DynamicConfig config, final DynamicConfig deadConfig) {
+private void register(final DynamicConfig config, final DynamicConfig deadConfig) throws IOException {
BrokerRole role = BrokerConfig.getBrokerRole();
if (role != BrokerRole.BACKUP) throw new RuntimeException("Only support backup");

@@ -155,7 +157,7 @@ private void register(final DynamicConfig config, final DynamicConfig deadConfig
backupManager.registerBatchBackup(recordBackup);

final SyncLogIterator<Action, ByteBuf> actionIterator = new ActionSyncLogIterator();
-BackupActionLogSyncProcessor actionLogSyncProcessor = new BackupActionLogSyncProcessor(checkpointManager, config, actionIterator, recordBackup);
+BackupActionLogSyncProcessor actionLogSyncProcessor = new BackupActionLogSyncProcessor(checkpointManager, config, actionIterator, recordBackup, keyGenerator, rocksDBStore);
masterSlaveSyncManager.registerProcessor(SyncType.action, actionLogSyncProcessor);

scheduleFlushManager.register(actionLogSyncProcessor);
@@ -185,13 +187,13 @@ private BackupStorageConfig(DynamicConfig config) {
return new BackupStorageConfig(config);
}

-private FixedExecOrderEventBus.Listener<MessageQueryIndex> getConstructIndexListener(final BackupKeyGenerator keyGenerator, Consumer<MessageQueryIndex> consumer) {
+private FixedExecOrderEventBus.Listener<MessageQueryIndex> getConstructIndexListener(final BackupKeyGenerator keyGenerator, Consumer<MessageQueryIndex> consumer) throws IOException {

final BatchBackup<MessageQueryIndex> indexBackup = new MessageIndexBatchBackup(config, indexStore, keyGenerator);
backupManager.registerBatchBackup(indexBackup);


-return new IndexEventBusListener(indexBackup, consumer);
+return new IndexEventBusListener(indexBackup, consumer, keyGenerator);
}

private FixedExecOrderEventBus.Listener<MessageQueryIndex> getConstructDeadIndexListener(final BackupKeyGenerator keyGenerator, Consumer<MessageQueryIndex> consumer) {
HFileIndexStore.java (new file)
@@ -0,0 +1,188 @@
package qunar.tc.qmq.backup.store.impl;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.hbase.async.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import qunar.tc.qmq.backup.config.BackupConfig;
import qunar.tc.qmq.backup.config.DefaultBackupConfig;
import qunar.tc.qmq.backup.service.BackupKeyGenerator;
import qunar.tc.qmq.configuration.BrokerConfig;
import qunar.tc.qmq.configuration.DynamicConfig;
import qunar.tc.qmq.configuration.DynamicConfigLoader;
import qunar.tc.qmq.metrics.Metrics;
import qunar.tc.qmq.store.MessageQueryIndex;
import qunar.tc.qmq.utils.RetrySubjectUtils;

import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static qunar.tc.qmq.backup.config.DefaultBackupConfig.*;
import static qunar.tc.qmq.backup.store.impl.AbstractHBaseStore.B_FAMILY;
import static qunar.tc.qmq.backup.store.impl.AbstractHBaseStore.B_MESSAGE_QUALIFIERS;
import static qunar.tc.qmq.metrics.MetricsConstants.SUBJECT_ARRAY;

/**
* @Classname HFileIndexStore
* @Description Writes message index data into HFiles and bulk-loads them into HBase
* @Date 16.6.21 2:11 PM
* @Created by zhipeng.cai
*/
public class HFileIndexStore {
private static final Logger LOGGER = LoggerFactory.getLogger(HFileIndexStore.class);

protected static final String[] TYPE_ARRAY = new String[]{"type"};
protected static final String[] INDEX_TYPE = new String[]{"messageIndex"};

private final BackupConfig config;
private final DynamicConfig hbaseConfig;
private final DynamicConfig skipBackSubjects;
private final BackupKeyGenerator keyGenerator;
private final String brokerGroup;
private final byte[] brokerGroupBytes;
private final int brokerGroupLength;
private final Configuration conf;
private final Configuration tempConf;
private final String TABLE_NAME;
private final byte[] FAMILY_NAME;
private final byte[] QUALIFIERS_NAME;
private final Path HFILE_PARENT_PARENT_DIR;
private final Path HFILE_PATH;
private final int MESSAGE_SIZE_PER_HFILE;
private Connection conn;
private FileSystem fs;
private MessageQueryIndex lastIndex;
private HFile.Writer writer;
// sorted buffer of pending cells; HFile requires keys in ascending order
private Map<byte[], KeyValue> map = new TreeMap<>(new org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator());

public HFileIndexStore(BackupKeyGenerator keyGenerator) throws IOException {
this.config = new DefaultBackupConfig(DynamicConfigLoader.load("backup.properties", false));
this.brokerGroup = BrokerConfig.getBrokerName();
this.brokerGroupBytes = Bytes.UTF8(brokerGroup);
this.brokerGroupLength = this.brokerGroupBytes.length;
this.keyGenerator = keyGenerator;
this.skipBackSubjects = DynamicConfigLoader.load("skip_backup.properties", false);
this.hbaseConfig = DynamicConfigLoader.load(DEFAULT_HBASE_CONFIG_FILE, false);
this.conf = HBaseConfiguration.create();
this.conf.addResource("core-site.xml");
this.conf.addResource("hdfs-site.xml");
this.conf.set("hbase.zookeeper.quorum", hbaseConfig.getString("hbase.zookeeper.quorum", "localhost"));
this.conf.set("zookeeper.znode.parent", hbaseConfig.getString("hbase.zookeeper.znode.parent", "/hbase"));
this.conf.set("hbase.bulkload.retries.retryOnIOException", "true");
this.TABLE_NAME = this.config.getDynamicConfig().getString(HBASE_MESSAGE_INDEX_TABLE_CONFIG_KEY, DEFAULT_HBASE_MESSAGE_INDEX_TABLE);
this.FAMILY_NAME = B_FAMILY;
this.QUALIFIERS_NAME = B_MESSAGE_QUALIFIERS[0]; // column qualifier; TODO: this may need to change
this.HFILE_PARENT_PARENT_DIR = new Path("/tmp/trace/index");
this.HFILE_PATH = new Path("/tmp/trace/index/" + new String(FAMILY_NAME));
this.tempConf = new Configuration(this.conf);
this.tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
this.MESSAGE_SIZE_PER_HFILE = this.config.getDynamicConfig().getInt(MESSAGE_SIZE_PER_HFILE_CONFIG_KEY, DEFAULT_MESSAGE_SIZE_PER_HFILE);
this.conn = ConnectionFactory.createConnection(this.conf);
this.fs = FileSystem.get(this.conf);
}

public void appendData(MessageQueryIndex index, Consumer<MessageQueryIndex> consumer) {
lastIndex = index;
String subject = index.getSubject();
String realSubject = RetrySubjectUtils.getRealSubject(subject);
if (skipBackup(realSubject)) {
return;
}
monitorBackupIndexQps(subject);
String subjectKey = realSubject;
String consumerGroup = null;
if (RetrySubjectUtils.isRetrySubject(subject)) {
subjectKey = RetrySubjectUtils.buildRetrySubject(realSubject);
consumerGroup = RetrySubjectUtils.getConsumerGroup(subject);
}
final byte[] key = keyGenerator.generateMessageKey(subjectKey, new Date(index.getCreateTime()), index.getMessageId(), brokerGroup, consumerGroup, index.getSequence());
final String messageId = index.getMessageId();
final byte[] messageIdBytes = Bytes.UTF8(messageId);

// value layout: sequence (8 bytes) | createTime (8) | brokerGroup length (4) | brokerGroup | messageId
final byte[] value = new byte[20 + brokerGroupLength + messageIdBytes.length];
Bytes.setLong(value, index.getSequence(), 0);
Bytes.setLong(value, index.getCreateTime(), 8);
Bytes.setInt(value, brokerGroupLength, 16);
System.arraycopy(brokerGroupBytes, 0, value, 20, brokerGroupLength);
System.arraycopy(messageIdBytes, 0, value, 20 + brokerGroupLength, messageIdBytes.length);

long currentTime = System.currentTimeMillis();
KeyValue kv = new KeyValue(key, FAMILY_NAME, QUALIFIERS_NAME, currentTime, value);
//LOGGER.info("subjectKey:" + subjectKey + " messageId:" + messageId + " key:" + new String(key));
// buffer into the sorted map first
map.put(key, kv);
if (map.size() >= MESSAGE_SIZE_PER_HFILE) {
// bulk load start time, for the timer metric below
long startTime = System.currentTimeMillis();
try {
Path hfilePath = new Path(HFILE_PATH, new String(key));
writeToHfile(hfilePath);
bulkLoad(HFILE_PATH);
map.clear();
if (consumer != null) consumer.accept(lastIndex);
} catch (IOException e) {
LOGGER.error("Message Index Bulk Load fail", e);
} finally {
Metrics.timer("Index.Bulkload.Timer", TYPE_ARRAY, INDEX_TYPE).update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS);
}
}
}

private void writeToHfile(Path path) throws IOException {
HFileContext fileContext = new HFileContext();
try {
writer = HFile.getWriterFactory(conf, new CacheConfig(tempConf))
.withPath(fs, path)
.withFileContext(fileContext).create();
// entries come from a TreeMap, so they are appended in the sorted key order HFile requires
for (Map.Entry<byte[], KeyValue> entry : map.entrySet()) {
writer.append(entry.getValue());
}
LOGGER.info("Message index written to HFile successfully");
} catch (IOException e) {
LOGGER.error("Failed to write message index to HFile", e);
throw e; // rethrow so the caller skips the bulk load for this batch
} finally {
if (writer != null) writer.close(); // guard against NPE when the writer factory itself fails
}
}

private void bulkLoad(Path pathToDelete) throws IOException {
// upload to HBase via bulkload
try (Table htable = conn.getTable(TableName.valueOf(TABLE_NAME));
Admin admin = conn.getAdmin()) {
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
// newer HBase versions (2.x) replace this with BulkLoadHFilesTool:
//BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
loader.doBulkLoad(HFILE_PARENT_PARENT_DIR, admin, htable, conn.getRegionLocator(TableName.valueOf(TABLE_NAME)));
fs.delete(pathToDelete, true);
LOGGER.info("Message index bulk-loaded into HBase successfully");
} catch (Exception e) {
LOGGER.error("Message index bulk load into HBase failed", e);
}
}

private boolean skipBackup(String subject) {
return skipBackSubjects.getBoolean(subject, false);
}

private static void monitorBackupIndexQps(String subject) {
Metrics.meter("backup.message.index.qps", SUBJECT_ARRAY, new String[]{subject}).mark();
}
}
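For orientation, a sketch of how this store gets driven, mirroring the IndexEventBusListener wiring earlier in this commit (the BackupKeyGenerator instance and the stream of MessageQueryIndex entries are assumed to come from the backup server's existing setup; HFileIndexStoreUsage is a hypothetical harness, not part of the commit):

import java.util.function.Consumer;

import qunar.tc.qmq.backup.service.BackupKeyGenerator;
import qunar.tc.qmq.backup.store.impl.HFileIndexStore;
import qunar.tc.qmq.store.MessageQueryIndex;

final class HFileIndexStoreUsage {
    static void drive(BackupKeyGenerator keyGenerator, Iterable<MessageQueryIndex> indexes) throws Exception {
        HFileIndexStore store = new HFileIndexStore(keyGenerator);
        // invoked only after a batch has been written and bulk-loaded, so it can advance a replay checkpoint
        Consumer<MessageQueryIndex> onFlushed =
                last -> System.out.println("bulk-loaded up to sequence " + last.getSequence());
        for (MessageQueryIndex index : indexes) {
            store.appendData(index, onFlushed); // buffers; flushes once MESSAGE_SIZE_PER_HFILE entries accumulate
        }
    }
}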
