Skip to content

Commit

Permalink
[dag indexer] Solve the problem of missing blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
welbon committed Jul 1, 2024
1 parent e5a3fbe commit 3ea2504
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ public class Constant {
public static final String MARKET_CAP_INDEX = "market_cap";
public static final String PAYLOAD_INDEX = "txn_payloads";
public static final String TOKEN_INFO_INDEX = "token_info";
public static final String DAG_INSPECTOR_BLOCK = "dag_inspector_block";
public static final String DAG_INSPECTOR_EDGE = "dag_inspector_edge";
public static final String DAG_INSPECT_HEIGHT_GROUP = "dag_inspector_height_group";
public static final String DAG_INSPECTOR_BLOCK_INDEX = "dag_inspector_block";
public static final String DAG_INSPECTOR_EDGE_INDEX = "dag_inspector_edge";
public static final String DAG_INSPECT_HEIGHT_GROUP_INDEX = "dag_inspector_height_group";


public static final String EVENT_FILTER_ADDRESS = "0x00000000000000000000000000000001";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package org.starcoin.indexer.handler;

import com.thetransactioncompany.jsonrpc2.client.JSONRPC2SessionException;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.starcoin.api.BlockRPCClient;
import org.starcoin.bean.Block;
import org.starcoin.bean.BlockHeader;
import org.starcoin.bean.BlockOffset;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Tracks the indexer's progress (block height + block hash) for one Elasticsearch index.
 *
 * <p>The offset is persisted inside the index's mapping under {@code _meta.tip}, so the
 * remote cluster is the source of truth; a local copy ({@link #localBlockOffset}) is kept
 * to avoid a round trip on every read. Not thread-safe: callers are expected to drive
 * this from a single indexer job.
 */
public class BlockIndexerOffset {

    private static final Logger logger = LoggerFactory.getLogger(BlockIndexerOffset.class);

    /** Name of the Elasticsearch index whose mapping stores the offset. */
    private final String offsetIndexer;

    /** Cached copy of the last offset read from / written to the remote mapping. */
    private BlockOffset localBlockOffset;

    /** Header of the block the indexer last handled; refreshed in {@link #initRemoteOffset()}. */
    private BlockHeader currentHandleHeader;

    private final RestHighLevelClient esClient;

    private final BlockRPCClient blockRPCClient;

    /**
     * @param offsetIndexer  Elasticsearch index name that carries the offset in its mapping
     * @param blockRPCClient RPC client used to resolve offsets against the chain
     * @param client         Elasticsearch high-level REST client
     */
    public BlockIndexerOffset(String offsetIndexer, BlockRPCClient blockRPCClient, RestHighLevelClient client) {
        this.offsetIndexer = offsetIndexer;
        this.esClient = client;
        this.blockRPCClient = blockRPCClient;
    }

    /**
     * Loads the offset from the remote mapping and resolves its block header on chain.
     * If no remote offset exists yet, resets to the genesis block (height 0) and
     * persists that as the initial offset.
     */
    public void initRemoteOffset() {
        //update current handle header
        try {
            localBlockOffset = getRemoteOffset();
            if (localBlockOffset != null) {
                Block block = blockRPCClient.getBlockByHeight(localBlockOffset.getBlockHeight());
                if (block != null) {
                    currentHandleHeader = block.getHeader();
                } else {
                    // Stored offset points at a block the chain no longer knows about.
                    logger.error("init offset block not exist on chain: {}", localBlockOffset);
                }
            } else {
                logger.warn("offset is null,init reset to genesis");
                currentHandleHeader = blockRPCClient.getBlockByHeight(0).getHeader();
                updateBlockOffset(0L, currentHandleHeader.getBlockHash());
                logger.info("init offset ok: {}", localBlockOffset);
            }
        } catch (JSONRPC2SessionException e) {
            logger.error("set current header error:", e);
        }
    }

    /**
     * Updates the local offset and immediately persists it to the remote mapping.
     *
     * @param blockHeight new offset height
     * @param blockHash   hash of the block at that height
     */
    public void updateBlockOffset(Long blockHeight, String blockHash) {
        if (localBlockOffset == null) {
            localBlockOffset = new BlockOffset(blockHeight, blockHash);
        } else {
            localBlockOffset.setBlockHeight(blockHeight);
            localBlockOffset.setBlockHash(blockHash);
        }
        setRemoteOffset(localBlockOffset);
    }

    /**
     * @return height of the cached local offset
     * @throws NullPointerException if called before the offset has been initialized
     */
    public Long getLocalBlockOffsetHeight() {
        return localBlockOffset.getBlockHeight();
    }

    /**
     * @return block hash of the cached local offset
     * @throws NullPointerException if called before the offset has been initialized
     */
    public String getLocalOffsetBlockHash() {
        return localBlockOffset.getBlockHash();
    }

    /**
     * Reads the offset from {@code _meta.tip} in the index mapping.
     *
     * @return the stored offset, or {@code null} when the mapping has no {@code _meta}
     *         entry or the read fails (errors are logged, not rethrown)
     */
    public BlockOffset getRemoteOffset() {
        GetMappingsRequest request = new GetMappingsRequest();
        try {
            request.indices(offsetIndexer);
            GetMappingsResponse response = esClient.indices().getMapping(request, RequestOptions.DEFAULT);
            MappingMetadata data = response.mappings().get(offsetIndexer);
            Object meta = data.getSourceAsMap().get("_meta");
            if (meta != null) {
                Map<String, Object> tip = (Map<String, Object>) ((LinkedHashMap<String, Object>) meta).get("tip");
                String blockHash = tip.get("block_hash").toString();
                // ES deserializes JSON numbers as Integer or Long depending on magnitude,
                // so go through Number instead of hard-casting to Integer.
                Number blockHeight = (Number) tip.get("block_number");
                return new BlockOffset(blockHeight.longValue(), blockHash);
            }
        } catch (Exception e) {
            logger.error("get remote offset error:", e);
        }
        return null;
    }

    /**
     * Persists the given offset into {@code _meta.tip} of the index mapping via a
     * put-mapping request. Failures are logged and swallowed (best-effort write).
     *
     * @param blockOffset offset to persist
     */
    public void setRemoteOffset(BlockOffset blockOffset) {
        PutMappingRequest request = new PutMappingRequest(offsetIndexer);
        try {
            // Build: { "_meta": { "tip": { "block_hash": ..., "block_number": ... } } }
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.startObject("_meta");
                {
                    builder.startObject("tip");
                    {
                        builder.field("block_hash", blockOffset.getBlockHash());
                        builder.field("block_number", blockOffset.getBlockHeight());
                    }
                    builder.endObject();
                }
                builder.endObject();
            }
            builder.endObject();
            request.source(builder);
            esClient.indices().putMapping(request, RequestOptions.DEFAULT);
            logger.info("remote offset update ok : {}", blockOffset);
        } catch (Exception e) {
            // Was a copy-paste of the getter's message; this is the write path.
            logger.error("set remote offset error:", e);
        }
    }

    @Override
    public String toString() {
        return "BlockIndexerOffset{" +
                "offsetIndexer='" + offsetIndexer + '\'' +
                ", localBlockOffset=" + localBlockOffset +
                ", currentHandleHeader=" + currentHandleHeader +
                '}';
    }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.starcoin.indexer.handler;

import org.elasticsearch.client.RestHighLevelClient;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -18,65 +19,53 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.LongStream;

import static org.starcoin.constant.Constant.DAG_INSPECTOR_BLOCK_INDEX;

public class DagInspectorIndexer extends QuartzJobBean {

private static final Logger logger = LoggerFactory.getLogger(DagInspectorIndexer.class);

@Autowired
private DagInspectorIndexerHandler inspectorHandler;

@Autowired
private ElasticSearchHandler elasticSearchHandler;
private BlockIndexerOffset blockIndexerOffset;

@Autowired
private BlockRPCClient blockRPCClient;

@Autowired
private RestHighLevelClient esClient;

@Value("${starcoin.indexer.bulk_size}")
private long bulkSize;

private BlockOffset localBlockOffset;

private BlockHeader currentHandleHeader;
@Value("${starcoin.network}")
private String network;

@PostConstruct
public void initOffset() {
localBlockOffset = elasticSearchHandler.getRemoteOffset();
//update current handle header
try {
if (localBlockOffset != null) {
Block block = blockRPCClient.getBlockByHeight(localBlockOffset.getBlockHeight());
if (block != null) {
currentHandleHeader = block.getHeader();
} else {
logger.error("init offset block not exist on chain: {}", localBlockOffset);
}

} else {
logger.warn("offset is null,init reset to genesis");
currentHandleHeader = blockRPCClient.getBlockByHeight(0).getHeader();
localBlockOffset = new BlockOffset(0, currentHandleHeader.getBlockHash());
elasticSearchHandler.setRemoteOffset(localBlockOffset);
logger.info("init offset ok: {}", localBlockOffset);
}
} catch (JSONRPC2SessionException e) {
logger.error("set current header error:", e);
}
blockIndexerOffset = new BlockIndexerOffset(
ServiceUtils.getIndex(network, DAG_INSPECTOR_BLOCK_INDEX),
blockRPCClient,
esClient
);
blockIndexerOffset.initRemoteOffset();
}

@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) {
//read current offset
if (localBlockOffset == null || currentHandleHeader == null) {
if (blockIndexerOffset == null) {
initOffset();
}
BlockOffset remoteBlockOffset = elasticSearchHandler.getRemoteOffset();
BlockOffset remoteBlockOffset = blockIndexerOffset.getRemoteOffset();
logger.info("current remote offset: {}", remoteBlockOffset);
if (remoteBlockOffset == null) {
logger.warn("offset must not null, please check blocks.mapping!!");
return;
}

if (remoteBlockOffset.getBlockHeight() > localBlockOffset.getBlockHeight()) {
if (remoteBlockOffset.getBlockHeight() > blockIndexerOffset.getLocalBlockOffsetHeight()) {
logger.info("indexer equalize chain blocks.");
return;
}
Expand All @@ -93,12 +82,12 @@ public void fetchAndProcessBlocksParallel() {

// Calculate bulk size
long headHeight = chainHeader.getHeight();
long bulkNumber = Math.min(headHeight - localBlockOffset.getBlockHeight(), bulkSize);
long bulkNumber = Math.min(headHeight - blockIndexerOffset.getLocalBlockOffsetHeight(), bulkSize);

ConcurrentHashMap<String, Block> blockMap = new ConcurrentHashMap<>();

LongStream.rangeClosed(1, bulkNumber).parallel().forEach(index -> {
long currentBlockHeight = localBlockOffset.getBlockHeight() + index;
long currentBlockHeight = blockIndexerOffset.getLocalBlockOffsetHeight() + index;

logger.info("Start Get block number: {}, currentBlockHeight: {}", index, currentBlockHeight);
Block block;
Expand All @@ -125,10 +114,10 @@ public void fetchAndProcessBlocksParallel() {
.map(Block::getHeader)
.max(Comparator.comparingLong(BlockHeader::getHeight)).orElseThrow();

localBlockOffset.setBlockHeight(lastBlockHeader.getHeight());
localBlockOffset.setBlockHash(lastBlockHeader.getBlockHash());
elasticSearchHandler.setRemoteOffset(localBlockOffset);
logger.info("indexer update success: {}", localBlockOffset);
blockIndexerOffset.updateBlockOffset(lastBlockHeader.getHeight(), lastBlockHeader.getBlockHash());

logger.info("Index update success: {}", blockIndexerOffset);

} catch (JSONRPC2SessionException | IOException e) {
logger.error("chain header error:", e);
} finally {
Expand All @@ -140,18 +129,18 @@ public void fetchAndProcessBlocksParallel() {
public void fetchAndProcessBlockSequel() {
// Read chain header
try {
BlockHeader chainHeader = blockRPCClient.getChainHeader();

// Calculate bulk size
long headHeight = chainHeader.getHeight();
long bulkNumber = Math.min(headHeight - localBlockOffset.getBlockHeight(), bulkSize);
long bulkNumber = Math.min(blockRPCClient.getChainHeader().getHeight() - blockIndexerOffset.getLocalBlockOffsetHeight(), bulkSize);
int index = 1;
List<Block> blockList = new ArrayList<>();
long minHeight = localBlockOffset.getBlockHeight();
long minHeight = blockIndexerOffset.getLocalBlockOffsetHeight();
String currentBlockHash = blockIndexerOffset.getLocalOffsetBlockHash();
Set<String> visit = new HashSet<>();
Deque<Block> deque = new ArrayDeque<>();

long currentBlockHeight = minHeight;

while (index <= bulkNumber) {
long currentBlockHeight = localBlockOffset.getBlockHeight() + index;
currentBlockHeight = minHeight + index;

logger.info("Start Get block number: {}, currentBlockHeight: {}", index, currentBlockHeight);
Block block = blockRPCClient.getBlockByHeight(currentBlockHeight);
Expand All @@ -169,18 +158,18 @@ public void fetchAndProcessBlockSequel() {
}
}
blockList.add(block);
//update current header
currentHandleHeader = block.getHeader();
index++;

currentBlockHash = block.getHeader().getBlockHash();

logger.info("add block: {}", block.getHeader());
}
inspectorHandler.upsertDagInfoFromBlocks(blockList);

// Update offset
localBlockOffset.setBlockHeight(currentHandleHeader.getHeight());
localBlockOffset.setBlockHash(currentHandleHeader.getBlockHash());
elasticSearchHandler.setRemoteOffset(localBlockOffset);
logger.info("indexer update success: {}", localBlockOffset);
blockIndexerOffset.updateBlockOffset(currentBlockHeight, currentBlockHash);

logger.info("indexer update success: {}", blockIndexerOffset);
} catch (JSONRPC2SessionException | IOException e) {
logger.error("chain header error:", e);
}
Expand All @@ -193,17 +182,30 @@ void fetchParentsBlock(
List<Block> blockList,
long minHeight
) throws JSONRPC2SessionException {
List<String> parents = block.getHeader().getParentsHash();
if (parents == null || parents.isEmpty()) {
return ;
}

for (String parent : block.getHeader().getParentsHash()) {
if (!visit.contains(parent)) {
visit.add(parent);
Block block_parent = elasticSearchHandler.getBlockContent(parent);
if (block_parent == null) {
block_parent = blockRPCClient.getBlockByHash(parent);
if (block_parent.getHeader().getHeight() >= minHeight) {
deque.addLast(block_parent);
blockList.add(block_parent);
}
}
if (visit.contains(parent)) {
continue;
}
visit.add(parent);
Block block_parent = blockList
.stream()
.filter(b -> b.getHeader().getBlockHash().compareToIgnoreCase(parent) == 0)
.findAny()
.orElse(null);

if (block_parent != null) {
continue;
}

block_parent = blockRPCClient.getBlockByHash(parent);
if (block_parent.getHeader().getHeight() >= minHeight) {
deque.addLast(block_parent);
blockList.add(block_parent);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public class DagInspectorIndexerHandler {

@PostConstruct
public void initIndexs() throws IOException {
dagInspectNodeIndex = ServiceUtils.createIndexIfNotExist(client, network, Constant.DAG_INSPECTOR_BLOCK);
dagInspectEdgeIndex = ServiceUtils.createIndexIfNotExist(client, network, Constant.DAG_INSPECTOR_EDGE);
dagInspectHeightGroupIndex = ServiceUtils.createIndexIfNotExist(client, network, Constant.DAG_INSPECT_HEIGHT_GROUP);
dagInspectNodeIndex = ServiceUtils.createIndexIfNotExist(client, network, Constant.DAG_INSPECTOR_BLOCK_INDEX);
dagInspectEdgeIndex = ServiceUtils.createIndexIfNotExist(client, network, Constant.DAG_INSPECTOR_EDGE_INDEX);
dagInspectHeightGroupIndex = ServiceUtils.createIndexIfNotExist(client, network, Constant.DAG_INSPECT_HEIGHT_GROUP_INDEX);
}

public void upsertDagInfoFromBlocks(List<Block> blockList) throws IOException, JSONRPC2SessionException {
Expand Down Expand Up @@ -139,11 +139,12 @@ List<DagInspectorEdge> buildEdgeDataFromDagBlockDataMaybeUpate(

List<DagInspectorBlock> newDagBlocks = new ArrayList<>();
for (DagInspectorBlock dagBlock : dagBlockMap.values()) {
if (dagBlock.getParentIds().isEmpty()) {
List<String> parentIds = dagBlock.getParentIds();
if (parentIds == null || parentIds.isEmpty()) {
continue;
}

for (String parentHash : dagBlock.getParentIds()) {
for (String parentHash : parentIds) {
DagInspectorBlock parentDagBlock = dagBlockMap.get(parentHash);
if (parentDagBlock == null) {
logger.info("Parent block not found: {} ", parentHash);
Expand Down
Loading

0 comments on commit 3ea2504

Please sign in to comment.