diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java index 643c8360e847f..7dc3fb6e94d3e 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java @@ -211,6 +211,14 @@ public interface IConsensus { */ Peer getLeader(ConsensusGroupId groupId); + /** + * Return the replicationNum of the corresponding consensus group. + * + * @param groupId the consensus group + * @return 0 if the group doesn't exist, otherwise its replicationNum + */ + int getReplicationNum(ConsensusGroupId groupId); + /** * Return all consensus group ids. * diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 0526729033bee..e883aaa7e7962 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -427,6 +427,12 @@ public Peer getLeader(ConsensusGroupId groupId) { return new Peer(groupId, thisNodeId, thisNode); } + @Override + public int getReplicationNum(ConsensusGroupId groupId) { + IoTConsensusServerImpl impl = stateMachineMap.get(groupId); + return impl != null ? 
impl.getConfiguration().size() : 0; + } + @Override public List getAllConsensusGroupIds() { return new ArrayList<>(stateMachineMap.keySet()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java index fccbef50c6795..2ee126970446d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java @@ -433,6 +433,12 @@ public Peer getLeader(ConsensusGroupId groupId) { return new Peer(groupId, thisNodeId, thisNode); } + @Override + public int getReplicationNum(ConsensusGroupId groupId) { + PipeConsensusServerImpl impl = stateMachineMap.get(groupId); + return impl != null ? impl.getPeers().size() : 0; + } + @Override public List getAllConsensusGroupIds() { return new ArrayList<>(stateMachineMap.keySet()); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index c8fbeea4ced26..863f9c5ebed42 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -708,6 +708,16 @@ public Peer getLeader(ConsensusGroupId groupId) { return new Peer(groupId, nodeId, null); } + @Override + public int getReplicationNum(ConsensusGroupId groupId) { + RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId); + try { + return server.get().getDivision(raftGroupId).getGroup().getPeers().size(); + } catch (IOException e) { + return 0; + } + } + @Override public List getAllConsensusGroupIds() { List ids = new ArrayList<>(); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java index a383687c2590e..8547b52b7c453 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensus.java @@ -244,6 +244,11 @@ public Peer getLeader(ConsensusGroupId groupId) { return new Peer(groupId, thisNodeId, thisNode); } + @Override + public int getReplicationNum(ConsensusGroupId groupId) { + return stateMachineMap.containsKey(groupId) ? 1 : 0; + } + @Override public List getAllConsensusGroupIds() { return new ArrayList<>(stateMachineMap.keySet()); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java index 7e176242dc7ff..43e328e383351 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java @@ -100,9 +100,11 @@ public void allTest() throws Exception { public void addConsensusGroup() { try { + Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId)); consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); + Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId)); } catch (ConsensusException e) { Assert.fail(); } @@ -151,7 +153,9 @@ public void removeConsensusGroup() { consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort)))); + Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId)); consensusImpl.deleteLocalPeer(dataRegionId); + Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId)); } catch (ConsensusException e) { Assert.fail(); } diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java index 6883f78244324..685f580e4baee 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java @@ -117,7 +117,9 @@ public void basicConsensus3Copy() throws Exception { public void addMemberToGroup() throws Exception { List original = peers.subList(0, 1); + Assert.assertEquals(0, servers.get(0).getReplicationNum(group.getGroupId())); servers.get(0).createLocalPeer(group.getGroupId(), original); + Assert.assertEquals(1, servers.get(0).getReplicationNum(group.getGroupId())); doConsensus(0, 10, 10); Assert.assertThrows( @@ -127,9 +129,11 @@ public void addMemberToGroup() throws Exception { // add 2 members servers.get(1).createLocalPeer(group.getGroupId(), Collections.emptyList()); servers.get(0).addRemotePeer(group.getGroupId(), peers.get(1)); + Assert.assertEquals(2, servers.get(1).getReplicationNum(group.getGroupId())); servers.get(2).createLocalPeer(group.getGroupId(), Collections.emptyList()); servers.get(0).addRemotePeer(group.getGroupId(), peers.get(2)); + Assert.assertEquals(3, servers.get(1).getReplicationNum(group.getGroupId())); miniCluster.waitUntilActiveLeaderElectedAndReady(); @@ -157,9 +161,12 @@ public void removeMemberFromGroup() throws Exception { doConsensus(0, 10, 10); servers.get(0).transferLeader(gid, peers.get(0)); + Assert.assertEquals(3, servers.get(0).getReplicationNum(gid)); servers.get(0).removeRemotePeer(gid, peers.get(1)); + Assert.assertEquals(2, servers.get(0).getReplicationNum(gid)); servers.get(1).deleteLocalPeer(gid); servers.get(0).removeRemotePeer(gid, peers.get(2)); + Assert.assertEquals(1, servers.get(0).getReplicationNum(gid)); servers.get(2).deleteLocalPeer(gid); 
miniCluster.waitUntilActiveLeaderElectedAndReady(); diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java index 8bd8e64b4f0ef..047465a3d4f69 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java @@ -165,9 +165,11 @@ public void tearDown() throws Exception { @Test public void addConsensusGroup() { try { + Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId)); consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); + Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId)); } catch (ConsensusException e) { Assert.fail(); } @@ -211,7 +213,7 @@ public void addConsensusGroup() { } @Test - public void removeConsensusGroup() throws ConsensusException { + public void removeConsensusGroup() { try { consensusImpl.deleteLocalPeer(dataRegionId); Assert.fail(); @@ -223,7 +225,9 @@ public void removeConsensusGroup() throws ConsensusException { consensusImpl.createLocalPeer( dataRegionId, Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667)))); + Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId)); consensusImpl.deleteLocalPeer(dataRegionId); + Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId)); } catch (ConsensusException e) { Assert.fail(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java index 61de9f2c90500..c068e7696c10b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java 
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java @@ -19,8 +19,10 @@ package org.apache.iotdb.db.queryengine.execution.load; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.commons.service.metric.MetricService; @@ -29,6 +31,7 @@ import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; @@ -312,6 +315,7 @@ private void forceCloseWriterManager(String uuid) { } private static class TsFileWriterManager { + private final File taskDir; private Map dataPartition2Writer; private Map dataPartition2LastDevice; @@ -421,7 +425,6 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex) // Report load tsFile points to IoTDB flush metrics MemTableFlushTask.recordFlushPointsMetricInternal( writePointCount, databaseName, dataRegion.getDataRegionId()); - MetricService.getInstance() .count( writePointCount, @@ -435,6 +438,32 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex) dataRegion.getDataRegionId(), Tag.TYPE.toString(), Metric.LOAD_POINT_COUNT.toString()); + // Because we cannot accurately judge who is the leader here, + // we directly divide the writePointCount by the replicationNum to ensure the + // correctness of this metric, which will be accurate in most cases + int replicationNum = + 
DataRegionConsensusImpl.getInstance() + .getReplicationNum( + ConsensusGroupId.Factory.create( + TConsensusGroupType.DataRegion.getValue(), + Integer.parseInt(dataRegion.getDataRegionId()))); + // The replicationNum may be 0 when a load and a database deletion occur + // concurrently; in that case we simply skip counting the points + if (replicationNum != 0) { + MetricService.getInstance() + .count( + writePointCount / replicationNum, + Metric.LEADER_QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + Metric.POINTS_IN.toString(), + Tag.DATABASE.toString(), + databaseName, + Tag.REGION.toString(), + dataRegion.getDataRegionId(), + Tag.TYPE.toString(), + Metric.LOAD_POINT_COUNT.toString()); + } }); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 91b01c4794988..165be5fb8ba28 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -462,21 +462,19 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException { dataRegion.getDataRegionId(), Tag.TYPE.toString(), Metric.LOAD_POINT_COUNT.toString()); - if (!node.isGeneratedByRemoteConsensusLeader()) { - MetricService.getInstance() - .count( - node.getWritePointCount(), - Metric.LEADER_QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - Metric.POINTS_IN.toString(), - Tag.DATABASE.toString(), - databaseName, - Tag.REGION.toString(), - dataRegion.getDataRegionId(), - Tag.TYPE.toString(), - Metric.LOAD_POINT_COUNT.toString()); - } + MetricService.getInstance() + .count( + node.getWritePointCount(), + Metric.LEADER_QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + 
Metric.POINTS_IN.toString(), + Tag.DATABASE.toString(), + databaseName, + Tag.REGION.toString(), + dataRegion.getDataRegionId(), + Tag.TYPE.toString(), + Metric.LOAD_POINT_COUNT.toString()); }); return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 1ce2213ad6eef..23e35b2c6498f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -238,7 +238,9 @@ public void insert(InsertRowNode insertRowNode) { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) { MetricService.getInstance() .count( @@ -250,7 +252,9 @@ public void insert(InsertRowNode insertRowNode) { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); } } @@ -290,7 +294,9 @@ public void insertAlignedRow(InsertRowNode insertRowNode) { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) { MetricService.getInstance() .count( @@ -302,7 +308,9 @@ public void insertAlignedRow(InsertRowNode insertRowNode) { Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); } } @@ -326,7 +334,9 @@ public void insertTablet(InsertTabletNode insertTabletNode, int start, int end) Tag.DATABASE.toString(), database, 
Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) { MetricService.getInstance() .count( @@ -338,7 +348,9 @@ public void insertTablet(InsertTabletNode insertTabletNode, int start, int end) Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); } } catch (RuntimeException e) { throw new WriteProcessException(e); @@ -365,7 +377,9 @@ public void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, in Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) { MetricService.getInstance() .count( @@ -377,7 +391,9 @@ public void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, in Tag.DATABASE.toString(), database, Tag.REGION.toString(), - dataRegionId); + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); } } catch (RuntimeException e) { throw new WriteProcessException(e); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 219ead6945182..7966617104ef0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -180,6 +180,7 @@ public enum Metric { LOAD_DISK_IO("load_disk_io"), LOAD_TIME_COST("load_time_cost"), LOAD_POINT_COUNT("load_point_count"), + MEMTABLE_POINT_COUNT("memtable_point_count"), ; final String value;