Skip to content

Commit

Permalink
Enhance leader quantity metric for load scenario (#12785)
Browse files Browse the repository at this point in the history
* finish

Signed-off-by: OneSizeFitQuorum <[email protected]>

* fix review issue

Signed-off-by: OneSizeFitQuorum <[email protected]>

* fix compile

Signed-off-by: OneSizeFitQuorum <[email protected]>

* fix bug

Signed-off-by: OneSizeFitQuorum <[email protected]>

---------

Signed-off-by: OneSizeFitQuorum <[email protected]>
  • Loading branch information
OneSizeFitsQuorum authored Jun 21, 2024
1 parent 0b8e314 commit a96349e
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,14 @@ public interface IConsensus {
*/
Peer getLeader(ConsensusGroupId groupId);

/**
* Return the replicationNum of the corresponding consensus group.
*
* @param groupId the consensus group
* @return return 0 if group doesn't exist, or return replicationNum
*/
int getReplicationNum(ConsensusGroupId groupId);

/**
* Return all consensus group ids.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,12 @@ public Peer getLeader(ConsensusGroupId groupId) {
return new Peer(groupId, thisNodeId, thisNode);
}

@Override
public int getReplicationNum(ConsensusGroupId groupId) {
IoTConsensusServerImpl impl = stateMachineMap.get(groupId);
return impl != null ? impl.getConfiguration().size() : 0;
}

@Override
public List<ConsensusGroupId> getAllConsensusGroupIds() {
return new ArrayList<>(stateMachineMap.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,12 @@ public Peer getLeader(ConsensusGroupId groupId) {
return new Peer(groupId, thisNodeId, thisNode);
}

@Override
public int getReplicationNum(ConsensusGroupId groupId) {
PipeConsensusServerImpl impl = stateMachineMap.get(groupId);
return impl != null ? impl.getPeers().size() : 0;
}

@Override
public List<ConsensusGroupId> getAllConsensusGroupIds() {
return new ArrayList<>(stateMachineMap.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,16 @@ public Peer getLeader(ConsensusGroupId groupId) {
return new Peer(groupId, nodeId, null);
}

@Override
public int getReplicationNum(ConsensusGroupId groupId) {
RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
try {
return server.get().getDivision(raftGroupId).getGroup().getPeers().size();
} catch (IOException e) {
return 0;
}
}

@Override
public List<ConsensusGroupId> getAllConsensusGroupIds() {
List<ConsensusGroupId> ids = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ public Peer getLeader(ConsensusGroupId groupId) {
return new Peer(groupId, thisNodeId, thisNode);
}

@Override
public int getReplicationNum(ConsensusGroupId groupId) {
return stateMachineMap.containsKey(groupId) ? 1 : 0;
}

@Override
public List<ConsensusGroupId> getAllConsensusGroupIds() {
return new ArrayList<>(stateMachineMap.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,11 @@ public void allTest() throws Exception {

public void addConsensusGroup() {
try {
Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
consensusImpl.createLocalPeer(
dataRegionId,
Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort))));
Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
} catch (ConsensusException e) {
Assert.fail();
}
Expand Down Expand Up @@ -151,7 +153,9 @@ public void removeConsensusGroup() {
consensusImpl.createLocalPeer(
dataRegionId,
Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort))));
Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
consensusImpl.deleteLocalPeer(dataRegionId);
Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
} catch (ConsensusException e) {
Assert.fail();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ public void basicConsensus3Copy() throws Exception {
public void addMemberToGroup() throws Exception {
List<Peer> original = peers.subList(0, 1);

Assert.assertEquals(0, servers.get(0).getReplicationNum(group.getGroupId()));
servers.get(0).createLocalPeer(group.getGroupId(), original);
Assert.assertEquals(1, servers.get(0).getReplicationNum(group.getGroupId()));
doConsensus(0, 10, 10);

Assert.assertThrows(
Expand All @@ -127,9 +129,11 @@ public void addMemberToGroup() throws Exception {
// add 2 members
servers.get(1).createLocalPeer(group.getGroupId(), Collections.emptyList());
servers.get(0).addRemotePeer(group.getGroupId(), peers.get(1));
Assert.assertEquals(2, servers.get(1).getReplicationNum(group.getGroupId()));

servers.get(2).createLocalPeer(group.getGroupId(), Collections.emptyList());
servers.get(0).addRemotePeer(group.getGroupId(), peers.get(2));
Assert.assertEquals(3, servers.get(1).getReplicationNum(group.getGroupId()));

miniCluster.waitUntilActiveLeaderElectedAndReady();

Expand Down Expand Up @@ -157,9 +161,12 @@ public void removeMemberFromGroup() throws Exception {
doConsensus(0, 10, 10);

servers.get(0).transferLeader(gid, peers.get(0));
Assert.assertEquals(3, servers.get(0).getReplicationNum(gid));
servers.get(0).removeRemotePeer(gid, peers.get(1));
Assert.assertEquals(2, servers.get(0).getReplicationNum(gid));
servers.get(1).deleteLocalPeer(gid);
servers.get(0).removeRemotePeer(gid, peers.get(2));
Assert.assertEquals(1, servers.get(0).getReplicationNum(gid));
servers.get(2).deleteLocalPeer(gid);

miniCluster.waitUntilActiveLeaderElectedAndReady();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,11 @@ public void tearDown() throws Exception {
@Test
public void addConsensusGroup() {
try {
Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
consensusImpl.createLocalPeer(
dataRegionId,
Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667))));
Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
} catch (ConsensusException e) {
Assert.fail();
}
Expand Down Expand Up @@ -211,7 +213,7 @@ public void addConsensusGroup() {
}

@Test
public void removeConsensusGroup() throws ConsensusException {
public void removeConsensusGroup() {
try {
consensusImpl.deleteLocalPeer(dataRegionId);
Assert.fail();
Expand All @@ -223,7 +225,9 @@ public void removeConsensusGroup() throws ConsensusException {
consensusImpl.createLocalPeer(
dataRegionId,
Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", 6667))));
Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
consensusImpl.deleteLocalPeer(dataRegionId);
Assert.assertEquals(0, consensusImpl.getReplicationNum(dataRegionId));
} catch (ConsensusException e) {
Assert.fail();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.apache.iotdb.db.queryengine.execution.load;

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.commons.service.metric.MetricService;
Expand All @@ -29,6 +31,7 @@
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
Expand Down Expand Up @@ -312,6 +315,7 @@ private void forceCloseWriterManager(String uuid) {
}

private static class TsFileWriterManager {

private final File taskDir;
private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
private Map<DataPartitionInfo, String> dataPartition2LastDevice;
Expand Down Expand Up @@ -421,7 +425,6 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)
// Report load tsFile points to IoTDB flush metrics
MemTableFlushTask.recordFlushPointsMetricInternal(
writePointCount, databaseName, dataRegion.getDataRegionId());

MetricService.getInstance()
.count(
writePointCount,
Expand All @@ -435,6 +438,32 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
// Because we cannot accurately judge who is the leader here,
// we directly divide the writePointCount by the replicationNum to ensure the
// correctness of this metric, which will be accurate in most cases
int replicationNum =
DataRegionConsensusImpl.getInstance()
.getReplicationNum(
ConsensusGroupId.Factory.create(
TConsensusGroupType.DataRegion.getValue(),
Integer.parseInt(dataRegion.getDataRegionId())));
// It may happen that the replicationNum is 0 when load and db deletion occurs
// concurrently, so we can just not to count the number of points in this case
if (replicationNum != 0) {
MetricService.getInstance()
.count(
writePointCount / replicationNum,
Metric.LEADER_QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
Metric.POINTS_IN.toString(),
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
}
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,21 +462,19 @@ private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException {
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
if (!node.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
node.getWritePointCount(),
Metric.LEADER_QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
Metric.POINTS_IN.toString(),
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
}
MetricService.getInstance()
.count(
node.getWritePointCount(),
Metric.LEADER_QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
Metric.POINTS_IN.toString(),
Tag.DATABASE.toString(),
databaseName,
Tag.REGION.toString(),
dataRegion.getDataRegionId(),
Tag.TYPE.toString(),
Metric.LOAD_POINT_COUNT.toString());
});

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,9 @@ public void insert(InsertRowNode insertRowNode) {
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId);
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
Expand All @@ -250,7 +252,9 @@ public void insert(InsertRowNode insertRowNode) {
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId);
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
}
}

Expand Down Expand Up @@ -290,7 +294,9 @@ public void insertAlignedRow(InsertRowNode insertRowNode) {
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId);
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
Expand All @@ -302,7 +308,9 @@ public void insertAlignedRow(InsertRowNode insertRowNode) {
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId);
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
}
}

Expand All @@ -326,7 +334,9 @@ public void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId);
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
Expand All @@ -338,7 +348,9 @@ public void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId);
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
}
} catch (RuntimeException e) {
throw new WriteProcessException(e);
Expand All @@ -365,7 +377,9 @@ public void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, in
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId);
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
Expand All @@ -377,7 +391,9 @@ public void insertAlignedTablet(InsertTabletNode insertTabletNode, int start, in
Tag.DATABASE.toString(),
database,
Tag.REGION.toString(),
dataRegionId);
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
}
} catch (RuntimeException e) {
throw new WriteProcessException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public enum Metric {
LOAD_DISK_IO("load_disk_io"),
LOAD_TIME_COST("load_time_cost"),
LOAD_POINT_COUNT("load_point_count"),
MEMTABLE_POINT_COUNT("memtable_point_count"),
;

final String value;
Expand Down

0 comments on commit a96349e

Please sign in to comment.