Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
Signed-off-by: Weihao Li <[email protected]>
  • Loading branch information
Wei-hao-Li committed Oct 18, 2024
2 parents 27121ce + 48593d7 commit 6d3f3dd
Show file tree
Hide file tree
Showing 33 changed files with 1,646 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,67 @@ public void lastFirstMaxMinTest() {
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
}

@Test
public void lastByFirstByTest() {
String[] expectedHeader1 = buildHeaders(13);
String[] expectedHeader2 = buildHeaders(14);

sql =
"select last_by(time,time),last_by(device,time),last_by(level,time),last_by(attr1,time),last_by(attr2,time),last_by(num,time),last_by(bignum,time),last_by(floatnum,time),last_by(str,time),last_by(bool,time),last_by(date,time),last_by(ts,time),last_by(stringv,time) from table0 where device='d2'";
retArray =
new String[] {
"1971-08-20T11:33:20.000Z,d2,l5,null,null,15,3147483648,235.213,watermelon,true,2023-01-01,null,null,",
};
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
sql =
"select last_by(time,time),last_by(device,time),last_by(level,time),last_by(attr1,time),last_by(attr2,time),last_by(num,time),last_by(bignum,time),last_by(floatnum,time),last_by(str,time),last_by(bool,time),last_by(date,time),last_by(ts,time),last_by(stringv,time),last_by(blob,time) from table0 where device='d2'";
retArray =
new String[] {
"1971-08-20T11:33:20.000Z,d2,l5,null,null,15,3147483648,235.213,watermelon,true,2023-01-01,null,null,null,",
};
tableResultSetEqualTest(sql, expectedHeader2, retArray, DATABASE_NAME);

sql =
"select last_by(time,time),last_by(time,device),last_by(time,level),last_by(time,attr1),last_by(time,attr2),last_by(time,num),last_by(time,bignum),last_by(time,floatnum),last_by(time,str),last_by(time,bool),last_by(time,date),last_by(time,ts),last_by(time,stringv) from table0 where device='d2'";
retArray =
new String[] {
"1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-04-26T17:46:40.000Z,1971-01-01T00:01:40.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,",
};
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
sql =
"select last_by(time,time),last_by(time,device),last_by(time,level),last_by(time,attr1),last_by(time,attr2),last_by(time,num),last_by(time,bignum),last_by(time,floatnum),last_by(time,str),last_by(time,bool),last_by(time,date),last_by(time,ts),last_by(time,stringv),last_by(time,blob) from table0 where device='d2'";
retArray =
new String[] {
"1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-04-26T17:46:40.000Z,1971-01-01T00:01:40.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,1970-01-01T00:00:00.080Z,",
};
tableResultSetEqualTest(sql, expectedHeader2, retArray, DATABASE_NAME);

String[] expectedHeader11 = buildHeaders(expectedHeader1.length * 2);
sql =
"select last_by(time,time),last_by(device,time),last_by(level,time),last_by(attr1,time),last_by(attr2,time),last_by(num,time),last_by(bignum,time),last_by(floatnum,time),last_by(str,time),last_by(bool,time),last_by(date,time),last_by(ts,time),last_by(stringv,time),last_by(time,time),last_by(time,device),last_by(time,level),last_by(time,attr1),last_by(time,attr2),last_by(time,num),last_by(time,bignum),last_by(time,floatnum),last_by(time,str),last_by(time,bool),last_by(time,date),last_by(time,ts),last_by(time,stringv) from table0 where device='d2'";
retArray =
new String[] {
"1971-08-20T11:33:20.000Z,d2,l5,null,null,15,3147483648,235.213,watermelon,true,2023-01-01,null,null,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-04-26T17:46:40.000Z,1971-01-01T00:01:40.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,",
};
tableResultSetEqualTest(sql, expectedHeader11, retArray, DATABASE_NAME);

sql =
"select first_by(time,time),first_by(device,time),first_by(level,time),first_by(attr1,time),first_by(attr2,time),first_by(num,time),first_by(bignum,time),first_by(floatnum,time),first_by(str,time),first_by(bool,time),first_by(date,time),first_by(ts,time),first_by(stringv,time) from table0 where device='d2' and time>80";
retArray =
new String[] {
"1970-01-01T00:00:00.100Z,d2,l5,null,null,8,2147483964,4654.231,papaya,true,null,null,null,",
};
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);

sql =
"select first_by(time,time),first_by(time,device),first_by(time,level),first_by(time,attr1),first_by(time,attr2),first_by(time,num),first_by(time,bignum),first_by(time,floatnum),first_by(time,str),first_by(time,bool),first_by(time,date),first_by(time,ts),first_by(time,stringv) from table0 where device='d2' and time>80";
retArray =
new String[] {
"1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-01-01T00:00:00.000Z,1971-01-01T00:00:00.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,",
};
tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
}

// ==================================================================
// ============================ Join Test ===========================
// ==================================================================
Expand Down Expand Up @@ -1274,4 +1335,12 @@ public void fullOuterJoinTest2() {
+ "ORDER BY time, t1.device, t2.device";
tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
}

public static String[] buildHeaders(int length) {
String[] expectedHeader = new String[length];
for (int i = 0; i < length; i++) {
expectedHeader[i] = "_col" + i;
}
return expectedHeader;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;

public class RegionMaintainHandler {
Expand Down Expand Up @@ -146,7 +147,8 @@ public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocatio

List<TDataNodeLocation> currentPeerNodes;
if (TConsensusGroupType.DataRegion.equals(regionId.getType())
&& IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
&& (IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())
|| IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass()))) {
// parameter of createPeer for MultiLeader should be all peers
currentPeerNodes = new ArrayList<>(regionReplicaNodes);
currentPeerNodes.add(destDataNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints;
import org.apache.iotdb.commons.utils.KillPoint.IoTConsensusDeleteLocalPeerKillPoints;
import org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerCoordinatorKillPoints;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
Expand Down Expand Up @@ -258,12 +261,12 @@ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) {
throw new IllegalPeerEndpointException(thisNode, peers);
}
if (stateMachineMap.containsKey(groupId)) {
throw new ConsensusGroupAlreadyExistException(groupId);
}

try {
stateMachineMapLock.lock();
if (stateMachineMap.containsKey(groupId)) {
throw new ConsensusGroupAlreadyExistException(groupId);
}

final String path = getPeerDir(groupId);
if (!new File(path).mkdirs()) {
Expand All @@ -283,6 +286,7 @@ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
syncClientManager);
stateMachineMap.put(groupId, consensus);
consensus.start(false); // pipe will start after creating
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
} catch (IOException e) {
LOGGER.warn("Cannot create local peer for group {} with peers {}", groupId, peers, e);
throw new ConsensusException(e);
Expand All @@ -293,17 +297,18 @@ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)

@Override
public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException {
if (!stateMachineMap.containsKey(groupId)) {
throw new ConsensusGroupNotExistException(groupId);
}

KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.BEFORE_DELETE);
try {
stateMachineMapLock.lock();
if (!stateMachineMap.containsKey(groupId)) {
throw new ConsensusGroupNotExistException(groupId);
}

final PipeConsensusServerImpl consensus = stateMachineMap.get(groupId);
consensus.clear();

FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
} catch (IOException e) {
LOGGER.warn("Cannot delete local peer for group {}", groupId, e);
throw new ConsensusException(e);
Expand All @@ -328,6 +333,7 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE
// step 2: notify all the other Peers to create consensus pipes to newPeer
LOGGER.info("[{}] notify current peers to create consensus pipes...", CLASS_NAME);
impl.notifyPeersToCreateConsensusPipes(peer);
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION);

// step 3: wait until all the other Peers finish transferring
LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME);
Expand All @@ -336,6 +342,7 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE
// step 4: active new Peer
LOGGER.info("[{}] activate new peer...", CLASS_NAME);
impl.setRemotePeerActive(peer, true);
KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE);
} catch (ConsensusGroupModifyPeerException e) {
try {
LOGGER.info("[{}] add remote peer failed, automatic cleanup side effects...", CLASS_NAME);
Expand All @@ -359,22 +366,28 @@ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws Consens
if (!impl.containsPeer(peer)) {
throw new PeerNotInConsensusGroupException(groupId, peer.toString());
}
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT);

try {
// let other peers remove the consensus pipe to target peer
impl.notifyPeersToDropConsensusPipe(peer);
// let target peer reject new write
impl.setRemotePeerActive(peer, false);
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
// wait its consensus pipes to complete
impl.waitTargetPeerToPeersTransmissionCompleted(peer);
// remove consensus pipes between target peer and other peers
impl.notifyPeersToDropConsensusPipe(peer);
// wait target peer to release all resource
impl.waitReleaseAllRegionRelatedResource(peer);
} catch (ConsensusGroupModifyPeerException e) {
throw new ConsensusException(e.getMessage());
}
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH);
}

@Override
public void resetPeerList(ConsensusGroupId groupId, List<Peer> correctPeers)
throws ConsensusException {
LOGGER.info("[RESET PEER LIST] Start to reset peer list to {}", correctPeers);
PipeConsensusServerImpl impl =
Optional.ofNullable(stateMachineMap.get(groupId))
.orElseThrow(() -> new ConsensusGroupNotExistException(groupId));
Expand Down Expand Up @@ -470,16 +483,11 @@ public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {

@Override
public void reloadConsensusConfig(ConsensusConfig consensusConfig) {
// TODO: impl for hot config loading
// PipeConsensus doesn't support reload consensus config, related config can be reloaded in
// iotdb-core layer.
}

public PipeConsensusServerImpl getImpl(ConsensusGroupId groupId) {
return stateMachineMap.get(groupId);
}

//////////////////////////// APIs provided for Test ////////////////////////////
@TestOnly
public int getPipeCount() {
return this.consensusPipeManager.getAllConsensusPipe().size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
Expand All @@ -49,10 +50,13 @@
import org.apache.iotdb.consensus.pipe.thrift.TNotifyPeerToDropConsensusPipeResp;
import org.apache.iotdb.consensus.pipe.thrift.TSetActiveReq;
import org.apache.iotdb.consensus.pipe.thrift.TSetActiveResp;
import org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceReq;
import org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceResp;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.RpcUtils;

import com.google.common.collect.ImmutableMap;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -442,6 +446,9 @@ public void notifyPeersToDropConsensusPipe(Peer targetPeer)
final List<Peer> otherPeers = peerManager.getOtherPeers(thisNode);
Exception exception = null;
for (Peer peer : otherPeers) {
if (peer.equals(targetPeer)) {
continue;
}
try (SyncPipeConsensusServiceClient client =
syncClientManager.borrowClient(peer.getEndpoint())) {
TNotifyPeerToDropConsensusPipeResp resp =
Expand Down Expand Up @@ -591,6 +598,43 @@ public synchronized boolean isConsensusPipesTransmissionCompleted(
}
}

public void waitReleaseAllRegionRelatedResource(Peer targetPeer)
throws ConsensusGroupModifyPeerException {
long checkIntervalInMs = 10_000L;
try (SyncPipeConsensusServiceClient client =
syncClientManager.borrowClient(targetPeer.getEndpoint())) {
while (true) {
TWaitReleaseAllRegionRelatedResourceResp res =
client.waitReleaseAllRegionRelatedResource(
new TWaitReleaseAllRegionRelatedResourceReq(
targetPeer.getGroupId().convertToTConsensusGroupId()));
if (res.releaseAllResource) {
LOGGER.info("[WAIT RELEASE] {} has released all region related resource", targetPeer);
return;
}
LOGGER.info("[WAIT RELEASE] {} is still releasing all region related resource", targetPeer);
Thread.sleep(checkIntervalInMs);
}
} catch (ClientManagerException | TException e) {
throw new ConsensusGroupModifyPeerException(
String.format(
"error when waiting %s to release all region related resource. %s",
targetPeer, e.getMessage()),
e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ConsensusGroupModifyPeerException(
String.format(
"thread interrupted when waiting %s to release all region related resource. %s",
targetPeer, e.getMessage()),
e);
}
}

public boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId groupId) {
return stateMachine.hasReleaseAllRegionRelatedResource(groupId);
}

public boolean isReadOnly() {
return stateMachine.isReadOnly();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp;
import org.apache.iotdb.consensus.pipe.thrift.TSetActiveReq;
import org.apache.iotdb.consensus.pipe.thrift.TSetActiveResp;
import org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceReq;
import org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

Expand Down Expand Up @@ -186,5 +188,23 @@ public TCheckConsensusPipeCompletedResp checkConsensusPipeCompleted(
return new TCheckConsensusPipeCompletedResp(responseStatus, isCompleted);
}

@Override
public TWaitReleaseAllRegionRelatedResourceResp waitReleaseAllRegionRelatedResource(
TWaitReleaseAllRegionRelatedResourceReq req) {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId);
if (impl == null) {
String message =
String.format(
"unexpected consensusGroupId %s for TWaitReleaseAllRegionRelatedResourceRes request",
groupId);
LOGGER.error(message);
return new TWaitReleaseAllRegionRelatedResourceResp(true);
}
return new TWaitReleaseAllRegionRelatedResourceResp(
impl.hasReleaseAllRegionRelatedResource(groupId));
}

public void handleExit() {}
}
Loading

0 comments on commit 6d3f3dd

Please sign in to comment.