diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index 53e38ce37507..d7de73b7283d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -989,6 +989,67 @@ public void lastFirstMaxMinTest() { tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); } + @Test + public void lastByFirstByTest() { + String[] expectedHeader1 = buildHeaders(13); + String[] expectedHeader2 = buildHeaders(14); + + sql = + "select last_by(time,time),last_by(device,time),last_by(level,time),last_by(attr1,time),last_by(attr2,time),last_by(num,time),last_by(bignum,time),last_by(floatnum,time),last_by(str,time),last_by(bool,time),last_by(date,time),last_by(ts,time),last_by(stringv,time) from table0 where device='d2'"; + retArray = + new String[] { + "1971-08-20T11:33:20.000Z,d2,l5,null,null,15,3147483648,235.213,watermelon,true,2023-01-01,null,null,", + }; + tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME); + sql = + "select last_by(time,time),last_by(device,time),last_by(level,time),last_by(attr1,time),last_by(attr2,time),last_by(num,time),last_by(bignum,time),last_by(floatnum,time),last_by(str,time),last_by(bool,time),last_by(date,time),last_by(ts,time),last_by(stringv,time),last_by(blob,time) from table0 where device='d2'"; + retArray = + new String[] { + "1971-08-20T11:33:20.000Z,d2,l5,null,null,15,3147483648,235.213,watermelon,true,2023-01-01,null,null,null,", + }; + tableResultSetEqualTest(sql, expectedHeader2, retArray, DATABASE_NAME); + + sql = + "select last_by(time,time),last_by(time,device),last_by(time,level),last_by(time,attr1),last_by(time,attr2),last_by(time,num),last_by(time,bignum),last_by(time,floatnum),last_by(time,str),last_by(time,bool),last_by(time,date),last_by(time,ts),last_by(time,stringv) from table0 where device='d2'"; + retArray = + new String[] { + "1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-04-26T17:46:40.000Z,1971-01-01T00:01:40.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,", + }; + tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME); + sql = + "select last_by(time,time),last_by(time,device),last_by(time,level),last_by(time,attr1),last_by(time,attr2),last_by(time,num),last_by(time,bignum),last_by(time,floatnum),last_by(time,str),last_by(time,bool),last_by(time,date),last_by(time,ts),last_by(time,stringv),last_by(time,blob) from table0 where device='d2'"; + retArray = + new String[] { + "1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-04-26T17:46:40.000Z,1971-01-01T00:01:40.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,1970-01-01T00:00:00.080Z,", + }; + tableResultSetEqualTest(sql, expectedHeader2, retArray, DATABASE_NAME); + + String[] expectedHeader11 = buildHeaders(expectedHeader1.length * 2); + sql = + "select last_by(time,time),last_by(device,time),last_by(level,time),last_by(attr1,time),last_by(attr2,time),last_by(num,time),last_by(bignum,time),last_by(floatnum,time),last_by(str,time),last_by(bool,time),last_by(date,time),last_by(ts,time),last_by(stringv,time),last_by(time,time),last_by(time,device),last_by(time,level),last_by(time,attr1),last_by(time,attr2),last_by(time,num),last_by(time,bignum),last_by(time,floatnum),last_by(time,str),last_by(time,bool),last_by(time,date),last_by(time,ts),last_by(time,stringv) from table0 where device='d2'"; + retArray = + new String[] { + "1971-08-20T11:33:20.000Z,d2,l5,null,null,15,3147483648,235.213,watermelon,true,2023-01-01,null,null,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-04-26T17:46:40.000Z,1971-01-01T00:01:40.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,", + }; + tableResultSetEqualTest(sql, expectedHeader11, retArray, DATABASE_NAME); + + sql = + "select first_by(time,time),first_by(device,time),first_by(level,time),first_by(attr1,time),first_by(attr2,time),first_by(num,time),first_by(bignum,time),first_by(floatnum,time),first_by(str,time),first_by(bool,time),first_by(date,time),first_by(ts,time),first_by(stringv,time) from table0 where device='d2' and time>80"; + retArray = + new String[] { + "1970-01-01T00:00:00.100Z,d2,l5,null,null,8,2147483964,4654.231,papaya,true,null,null,null,", + }; + tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME); + + sql = + "select first_by(time,time),first_by(time,device),first_by(time,level),first_by(time,attr1),first_by(time,attr2),first_by(time,num),first_by(time,bignum),first_by(time,floatnum),first_by(time,str),first_by(time,bool),first_by(time,date),first_by(time,ts),first_by(time,stringv) from table0 where device='d2' and time>80"; + retArray = + new String[] { + "1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-01-01T00:00:00.000Z,1971-01-01T00:00:00.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.100Z,1971-08-20T11:33:20.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,", + }; + tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME); + } + // ================================================================== // ============================ Join Test =========================== // ================================================================== @@ -1274,4 +1335,12 @@ public void fullOuterJoinTest2() { + "ORDER BY time, t1.device, t2.device"; tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); } + + public static String[] buildHeaders(int length) { + String[] expectedHeader = new String[length]; + for (int i = 0; i < length; i++) { + expectedHeader[i] = "_col" + i; + } + return expectedHeader; + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java index bf9fd40bc524..dca6fac949e4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java @@ -67,6 +67,7 @@ import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS; import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS; +import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2; import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS; public class RegionMaintainHandler { @@ -146,7 +147,8 @@ public TSStatus createNewRegionPeer(TConsensusGroupId regionId, TDataNodeLocatio List currentPeerNodes; if (TConsensusGroupType.DataRegion.equals(regionId.getType()) - && IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) { + && (IOT_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass()) + || IOT_CONSENSUS_V2.equals(CONF.getDataRegionConsensusProtocolClass()))) { // parameter of createPeer for MultiLeader should be all peers currentPeerNodes = new ArrayList<>(regionReplicaNodes); currentPeerNodes.add(destDataNode); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java index bea781772c3d..cc42a35431fb 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java @@ -30,8 +30,11 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.service.RegisterManager; import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.commons.utils.KillPoint.DataNodeKillPoints; +import org.apache.iotdb.commons.utils.KillPoint.IoTConsensusDeleteLocalPeerKillPoints; +import org.apache.iotdb.commons.utils.KillPoint.IoTConsensusRemovePeerCoordinatorKillPoints; +import org.apache.iotdb.commons.utils.KillPoint.KillPoint; import org.apache.iotdb.commons.utils.StatusUtils; -import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.consensus.IConsensus; import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; @@ -258,12 +261,12 @@ public void createLocalPeer(ConsensusGroupId groupId, List peers) if (!peers.contains(new Peer(groupId, thisNodeId, thisNode))) { throw new IllegalPeerEndpointException(thisNode, peers); } - if (stateMachineMap.containsKey(groupId)) { - throw new ConsensusGroupAlreadyExistException(groupId); - } try { stateMachineMapLock.lock(); + if (stateMachineMap.containsKey(groupId)) { + throw new ConsensusGroupAlreadyExistException(groupId); + } final String path = getPeerDir(groupId); if (!new File(path).mkdirs()) { @@ -283,6 +286,7 @@ public void createLocalPeer(ConsensusGroupId groupId, List peers) syncClientManager); stateMachineMap.put(groupId, consensus); consensus.start(false); // pipe will start after creating + KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER); } catch (IOException e) { LOGGER.warn("Cannot create local peer for group {} with peers {}", groupId, peers, e); throw new ConsensusException(e); @@ -293,17 +297,18 @@ public void createLocalPeer(ConsensusGroupId groupId, List peers) @Override public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException { - if (!stateMachineMap.containsKey(groupId)) { - throw new ConsensusGroupNotExistException(groupId); - } - + KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.BEFORE_DELETE); try { stateMachineMapLock.lock(); + if (!stateMachineMap.containsKey(groupId)) { + throw new ConsensusGroupNotExistException(groupId); + } final PipeConsensusServerImpl consensus = stateMachineMap.get(groupId); consensus.clear(); FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId))); + KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE); } catch (IOException e) { LOGGER.warn("Cannot delete local peer for group {}", groupId, e); throw new ConsensusException(e); @@ -328,6 +333,7 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE // step 2: notify all the other Peers to create consensus pipes to newPeer LOGGER.info("[{}] notify current peers to create consensus pipes...", CLASS_NAME); impl.notifyPeersToCreateConsensusPipes(peer); + KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_TRANSITION); // step 3: wait until all the other Peers finish transferring LOGGER.info("[{}] wait until all the other peers finish transferring...", CLASS_NAME); @@ -336,6 +342,7 @@ public void addRemotePeer(ConsensusGroupId groupId, Peer peer) throws ConsensusE // step 4: active new Peer LOGGER.info("[{}] activate new peer...", CLASS_NAME); impl.setRemotePeerActive(peer, true); + KillPoint.setKillPoint(DataNodeKillPoints.COORDINATOR_ADD_PEER_DONE); } catch (ConsensusGroupModifyPeerException e) { try { LOGGER.info("[{}] add remote peer failed, automatic cleanup side effects...", CLASS_NAME); @@ -359,22 +366,28 @@ public void removeRemotePeer(ConsensusGroupId groupId, Peer peer) throws Consens if (!impl.containsPeer(peer)) { throw new PeerNotInConsensusGroupException(groupId, peer.toString()); } + KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.INIT); try { - // let other peers remove the consensus pipe to target peer - impl.notifyPeersToDropConsensusPipe(peer); // let target peer reject new write impl.setRemotePeerActive(peer, false); + KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER); // wait its consensus pipes to complete impl.waitTargetPeerToPeersTransmissionCompleted(peer); + // remove consensus pipes between target peer and other peers + impl.notifyPeersToDropConsensusPipe(peer); + // wait target peer to release all resource + impl.waitReleaseAllRegionRelatedResource(peer); } catch (ConsensusGroupModifyPeerException e) { throw new ConsensusException(e.getMessage()); } + KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.FINISH); } @Override public void resetPeerList(ConsensusGroupId groupId, List correctPeers) throws ConsensusException { + LOGGER.info("[RESET PEER LIST] Start to reset peer list to {}", correctPeers); PipeConsensusServerImpl impl = Optional.ofNullable(stateMachineMap.get(groupId)) .orElseThrow(() -> new ConsensusGroupNotExistException(groupId)); @@ -470,16 +483,11 @@ public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) { @Override public void reloadConsensusConfig(ConsensusConfig consensusConfig) { - // TODO: impl for hot config loading + // PipeConsensus doesn't support reload consensus config, related config can be reloaded in + // iotdb-core layer. } public PipeConsensusServerImpl getImpl(ConsensusGroupId groupId) { return stateMachineMap.get(groupId); } - - //////////////////////////// APIs provided for Test //////////////////////////// - @TestOnly - public int getPipeCount() { - return this.consensusPipeManager.getAllConsensusPipe().size(); - } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java index b6b6829894af..488852994e28 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncPipeConsensusServiceClient; +import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; @@ -49,10 +50,13 @@ import org.apache.iotdb.consensus.pipe.thrift.TNotifyPeerToDropConsensusPipeResp; import org.apache.iotdb.consensus.pipe.thrift.TSetActiveReq; import org.apache.iotdb.consensus.pipe.thrift.TSetActiveResp; +import org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceReq; +import org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceResp; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.RpcUtils; import com.google.common.collect.ImmutableMap; +import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -442,6 +446,9 @@ public void notifyPeersToDropConsensusPipe(Peer targetPeer) final List otherPeers = peerManager.getOtherPeers(thisNode); Exception exception = null; for (Peer peer : otherPeers) { + if (peer.equals(targetPeer)) { + continue; + } try (SyncPipeConsensusServiceClient client = syncClientManager.borrowClient(peer.getEndpoint())) { TNotifyPeerToDropConsensusPipeResp resp = @@ -591,6 +598,43 @@ public synchronized boolean isConsensusPipesTransmissionCompleted( } } + public void waitReleaseAllRegionRelatedResource(Peer targetPeer) + throws ConsensusGroupModifyPeerException { + long checkIntervalInMs = 10_000L; + try (SyncPipeConsensusServiceClient client = + syncClientManager.borrowClient(targetPeer.getEndpoint())) { + while (true) { + TWaitReleaseAllRegionRelatedResourceResp res = + client.waitReleaseAllRegionRelatedResource( + new TWaitReleaseAllRegionRelatedResourceReq( + targetPeer.getGroupId().convertToTConsensusGroupId())); + if (res.releaseAllResource) { + LOGGER.info("[WAIT RELEASE] {} has released all region related resource", targetPeer); + return; + } + LOGGER.info("[WAIT RELEASE] {} is still releasing all region related resource", targetPeer); + Thread.sleep(checkIntervalInMs); + } + } catch (ClientManagerException | TException e) { + throw new ConsensusGroupModifyPeerException( + String.format( + "error when waiting %s to release all region related resource. %s", + targetPeer, e.getMessage()), + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new ConsensusGroupModifyPeerException( + String.format( + "thread interrupted when waiting %s to release all region related resource. %s", + targetPeer, e.getMessage()), + e); + } + } + + public boolean hasReleaseAllRegionRelatedResource(ConsensusGroupId groupId) { + return stateMachine.hasReleaseAllRegionRelatedResource(groupId); + } + public boolean isReadOnly() { return stateMachine.isReadOnly(); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java index 84a7cd8dbe32..5a7099bf8e0d 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/service/PipeConsensusRPCServiceProcessor.java @@ -39,6 +39,8 @@ import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferResp; import org.apache.iotdb.consensus.pipe.thrift.TSetActiveReq; import org.apache.iotdb.consensus.pipe.thrift.TSetActiveResp; +import org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceReq; +import org.apache.iotdb.consensus.pipe.thrift.TWaitReleaseAllRegionRelatedResourceResp; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -186,5 +188,23 @@ public TCheckConsensusPipeCompletedResp checkConsensusPipeCompleted( return new TCheckConsensusPipeCompletedResp(responseStatus, isCompleted); } + @Override + public TWaitReleaseAllRegionRelatedResourceResp waitReleaseAllRegionRelatedResource( + TWaitReleaseAllRegionRelatedResourceReq req) { + ConsensusGroupId groupId = + ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId()); + PipeConsensusServerImpl impl = pipeConsensus.getImpl(groupId); + if (impl == null) { + String message = + String.format( + "unexpected consensusGroupId %s for TWaitReleaseAllRegionRelatedResourceRes request", + groupId); + LOGGER.error(message); + return new TWaitReleaseAllRegionRelatedResourceResp(true); + } + return new TWaitReleaseAllRegionRelatedResourceResp( + impl.hasReleaseAllRegionRelatedResource(groupId)); + } + public void handleExit() {} } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java index 4f54ec26f033..5ae2cf6188a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -30,10 +30,10 @@ import org.apache.tsfile.write.schema.IMeasurementSchema; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; -import java.util.stream.Collectors; public class TsFileInsertionEventTableParserTabletIterator implements Iterator { @@ -43,6 +43,7 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator columnSchemas; + private final List columnTypes; private final List columnNames; private final TsBlockReader tsBlockReader; @@ -56,18 +57,19 @@ public TsFileInsertionEventTableParserTabletIterator( this.startTime = startTime; this.endTime = endTime; + columnSchemas = new ArrayList<>(); + columnTypes = new ArrayList<>(); + columnNames = new ArrayList<>(); try { - columnSchemas = - tableSchema.getColumnSchemas().stream() - // time column in aligned time-series should not be a query column - .filter( - schema -> - schema.getMeasurementId() != null && !schema.getMeasurementId().isEmpty()) - .collect(Collectors.toList()); - columnNames = - columnSchemas.stream() - .map(IMeasurementSchema::getMeasurementId) - .collect(Collectors.toList()); + for (int i = 0, size = tableSchema.getColumnSchemas().size(); i < size; i++) { + final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i); + if (schema.getMeasurementId() != null && !schema.getMeasurementId().isEmpty()) { + columnSchemas.add(schema); + columnTypes.add(tableSchema.getColumnTypes().get(i)); + columnNames.add(schema.getMeasurementId()); + } + } + tsBlockReader = tableQueryExecutor.query(tableName, columnNames, null, null, null); } catch (final ReadProcessException e) { throw new PipeException("Failed to build query data set", e); @@ -96,7 +98,8 @@ public Tablet next() { private Tablet buildNextTablet() throws IOException { final TsBlock tsBlock = tsBlockReader.next(); - final Tablet tablet = new Tablet(tableName, columnSchemas, tsBlock.getPositionCount()); + final Tablet tablet = + new Tablet(tableName, columnSchemas, columnTypes, tsBlock.getPositionCount()); tablet.initBitMaps(); final TsBlock.TsBlockRowIterator rowIterator = tsBlock.getTsBlockRowIterator(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java index d7e33ee0a49f..7065324aa35d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/IoTDBDataRegionExtractor.java @@ -61,16 +61,25 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STREAMING_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_IOTDB_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_FORMAT_PREFIX_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FILE_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_MODE_FORCED_LOG_VALUE; @@ -84,13 +93,22 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_SNAPSHOT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STREAMING_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STRICT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_PATTERN_FORMAT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_WATERMARK_INTERVAL_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._EXTRACTOR_WATERMARK_INTERVAL_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant._SOURCE_WATERMARK_INTERVAL_KEY; public class IoTDBDataRegionExtractor extends IoTDBExtractor { @@ -225,7 +243,13 @@ public void validate(final PipeParameterValidator validator) throws Exception { .getBooleanOrDefault( Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE) - || validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, SOURCE_END_TIME_KEY)) { + || validator + .getParameters() + .hasAnyAttributes( + SOURCE_START_TIME_KEY, + EXTRACTOR_START_TIME_KEY, + SOURCE_END_TIME_KEY, + EXTRACTOR_END_TIME_KEY)) { validator.validateAttributeValueRange( validator.getParameters().hasAttribute(EXTRACTOR_REALTIME_MODE_KEY) ? EXTRACTOR_REALTIME_MODE_KEY @@ -239,26 +263,7 @@ public void validate(final PipeParameterValidator validator) throws Exception { EXTRACTOR_REALTIME_MODE_BATCH_MODE_VALUE); } - // Validate source.start-time and source.end-time - if (validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, SOURCE_END_TIME_KEY) - && validator - .getParameters() - .hasAnyAttributes( - EXTRACTOR_HISTORY_ENABLE_KEY, - EXTRACTOR_REALTIME_ENABLE_KEY, - SOURCE_HISTORY_ENABLE_KEY, - SOURCE_REALTIME_ENABLE_KEY)) { - LOGGER.warn( - "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is invalid.", - SOURCE_START_TIME_KEY, - EXTRACTOR_START_TIME_KEY, - SOURCE_END_TIME_KEY, - EXTRACTOR_END_TIME_KEY, - SOURCE_HISTORY_START_TIME_KEY, - EXTRACTOR_HISTORY_START_TIME_KEY, - SOURCE_HISTORY_END_TIME_KEY, - EXTRACTOR_HISTORY_END_TIME_KEY); - } + checkInvalidParameters(validator.getParameters()); constructHistoricalExtractor(); constructRealtimeExtractor(validator.getParameters()); @@ -290,6 +295,120 @@ private void validatePattern(final TreePattern treePattern, final TablePattern t } } + private void checkInvalidParameters(final PipeParameters parameters) { + // Enable history and realtime if specifying start-time or end-time + if (parameters.hasAnyAttributes( + SOURCE_START_TIME_KEY, + EXTRACTOR_START_TIME_KEY, + SOURCE_END_TIME_KEY, + EXTRACTOR_END_TIME_KEY) + && parameters.hasAnyAttributes( + EXTRACTOR_HISTORY_ENABLE_KEY, + EXTRACTOR_REALTIME_ENABLE_KEY, + SOURCE_HISTORY_ENABLE_KEY, + SOURCE_REALTIME_ENABLE_KEY)) { + LOGGER.warn( + "When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is invalid.", + SOURCE_START_TIME_KEY, + EXTRACTOR_START_TIME_KEY, + SOURCE_END_TIME_KEY, + EXTRACTOR_END_TIME_KEY, + SOURCE_HISTORY_START_TIME_KEY, + EXTRACTOR_HISTORY_START_TIME_KEY, + SOURCE_HISTORY_END_TIME_KEY, + EXTRACTOR_HISTORY_END_TIME_KEY); + } + + // Check coexistence of mode.snapshot and mode + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY) + && parameters.hasAnyAttributes(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_MODE_SNAPSHOT_KEY, + SOURCE_MODE_SNAPSHOT_KEY, + EXTRACTOR_MODE_KEY, + SOURCE_MODE_KEY); + } + + // Check coexistence of mode.streaming and realtime.mode + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY) + && parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_MODE_STREAMING_KEY, + SOURCE_MODE_STREAMING_KEY, + EXTRACTOR_REALTIME_MODE_KEY, + SOURCE_REALTIME_MODE_KEY); + } + + // Check coexistence of mode.strict, history.loose-range and realtime.loose-range + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) { + if (parameters.hasAnyAttributes( + EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_MODE_STRICT_KEY, + SOURCE_MODE_STRICT_KEY, + EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, + SOURCE_HISTORY_LOOSE_RANGE_KEY); + } + if (parameters.hasAnyAttributes( + EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, SOURCE_REALTIME_LOOSE_RANGE_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_MODE_STRICT_KEY, + SOURCE_MODE_STRICT_KEY, + EXTRACTOR_REALTIME_LOOSE_RANGE_KEY, + SOURCE_REALTIME_LOOSE_RANGE_KEY); + } + } + + // Check coexistence of mods and mods.enable + if (parameters.hasAnyAttributes(EXTRACTOR_MODS_ENABLE_KEY, SOURCE_MODS_ENABLE_KEY) + && parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_MODS_KEY, + SOURCE_MODS_KEY, + EXTRACTOR_MODS_ENABLE_KEY, + SOURCE_MODS_ENABLE_KEY); + } + + // Check coexistence of watermark.interval-ms and watermark-interval-ms + if (parameters.hasAnyAttributes(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY) + && parameters.hasAnyAttributes( + _EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY)) { + LOGGER.warn( + "When {} or {} is specified, specifying {} and {} is invalid.", + EXTRACTOR_WATERMARK_INTERVAL_KEY, + SOURCE_WATERMARK_INTERVAL_KEY, + _EXTRACTOR_WATERMARK_INTERVAL_KEY, + _SOURCE_WATERMARK_INTERVAL_KEY); + } + + // Check if specifying mode.snapshot or mode.streaming when disable realtime extractor + if (!parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY), + EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) { + LOGGER.info( + "When '{}' ('{}') is set to false, specifying {} and {} is invalid.", + EXTRACTOR_REALTIME_ENABLE_KEY, + SOURCE_REALTIME_ENABLE_KEY, + EXTRACTOR_MODE_SNAPSHOT_KEY, + SOURCE_MODE_SNAPSHOT_KEY); + } + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)) { + LOGGER.info( + "When '{}' ('{}') is set to false, specifying {} and {} is invalid.", + EXTRACTOR_REALTIME_ENABLE_KEY, + SOURCE_REALTIME_ENABLE_KEY, + EXTRACTOR_MODE_STREAMING_KEY, + SOURCE_MODE_STREAMING_KEY); + } + } + } + private void constructHistoricalExtractor() { // Enable historical extractor by default historicalExtractor = new PipeHistoricalDataRegionTsFileExtractor(); @@ -303,31 +422,59 @@ private void constructRealtimeExtractor(final PipeParameters parameters) EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE)) { realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor(); LOGGER.info( - "Pipe: '{}' is set to false, use heartbeat realtime extractor.", - EXTRACTOR_REALTIME_ENABLE_KEY); + "Pipe: '{}' ('{}') is set to false, use heartbeat realtime extractor.", + EXTRACTOR_REALTIME_ENABLE_KEY, + SOURCE_REALTIME_ENABLE_KEY); return; } + final boolean isSnapshotMode; + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY)) { + isSnapshotMode = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODE_SNAPSHOT_KEY, SOURCE_MODE_SNAPSHOT_KEY), + EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE); + } else { + final String extractorModeValue = + parameters.getStringOrDefault( + Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE); + isSnapshotMode = + extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE) + || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE); + } + // Use heartbeat only extractor if enable snapshot mode - final String extractorModeValue = - parameters.getStringOrDefault( - Arrays.asList(EXTRACTOR_MODE_KEY, SOURCE_MODE_KEY), EXTRACTOR_MODE_DEFAULT_VALUE); - if (extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE) - || extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)) { + if (isSnapshotMode) { realtimeExtractor = new PipeRealtimeDataRegionHeartbeatExtractor(); - LOGGER.info( - "Pipe: '{}' is set to {}, use heartbeat realtime extractor.", - EXTRACTOR_MODE_KEY, - EXTRACTOR_MODE_SNAPSHOT_VALUE); + LOGGER.info("Pipe: snapshot mode is enabled, use heartbeat realtime extractor."); return; } // Use hybrid mode by default - if (!parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { + if (!parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY) + && !parameters.hasAnyAttributes(EXTRACTOR_REALTIME_MODE_KEY, SOURCE_REALTIME_MODE_KEY)) { checkWalEnable(parameters); realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); LOGGER.info( - "Pipe: '{}' is not set, use hybrid mode by default.", EXTRACTOR_REALTIME_MODE_KEY); + "Pipe: '{}' ('{}') and '{}' ('{}') is not set, use hybrid mode by default.", + EXTRACTOR_MODE_STREAMING_KEY, + SOURCE_MODE_STREAMING_KEY, + EXTRACTOR_REALTIME_MODE_KEY, + SOURCE_REALTIME_MODE_KEY); + return; + } + + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY)) { + final boolean isStreamingMode = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODE_STREAMING_KEY, SOURCE_MODE_STREAMING_KEY), + EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE); + if (isStreamingMode) { + checkWalEnable(parameters); + realtimeExtractor = new PipeRealtimeDataRegionHybridExtractor(); + } else { + realtimeExtractor = new PipeRealtimeDataRegionTsFileExtractor(); + } return; } @@ -381,20 +528,27 @@ public void customize( realtimeExtractor.customize(parameters, configuration); // Set watermark injector + long watermarkIntervalInMs = EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE; if (parameters.hasAnyAttributes( EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY)) { - final long watermarkIntervalInMs = + watermarkIntervalInMs = parameters.getLongOrDefault( - Arrays.asList(EXTRACTOR_WATERMARK_INTERVAL_KEY, SOURCE_WATERMARK_INTERVAL_KEY), + Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY), EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE); - if (watermarkIntervalInMs > 0) { - watermarkInjector = new DataRegionWatermarkInjector(regionId, watermarkIntervalInMs); - LOGGER.info( - "Pipe {}@{}: Set watermark injector with interval {} ms.", - pipeName, - regionId, - watermarkInjector.getInjectionIntervalInMs()); - } + } else if (parameters.hasAnyAttributes( + _EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY)) { + watermarkIntervalInMs = + parameters.getLongOrDefault( + Arrays.asList(_EXTRACTOR_WATERMARK_INTERVAL_KEY, _SOURCE_WATERMARK_INTERVAL_KEY), + EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE); + } + if (watermarkIntervalInMs > 0) { + watermarkInjector = new DataRegionWatermarkInjector(regionId, watermarkIntervalInMs); + LOGGER.info( + "Pipe {}@{}: Set watermark injector with interval {} ms.", + pipeName, + regionId, + watermarkInjector.getInjectionIntervalInMs()); } // register metric after generating taskID diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index 3acc589ca313..f56ec5c7199a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -76,15 +76,21 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_STRICT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT; import static org.apache.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR; @@ -144,28 +150,39 @@ public void validate(final PipeParameterValidator validator) { throw new PipeParameterNotValidException(e.getMessage()); } - final String extractorHistoryLooseRangeValue = - parameters - .getStringOrDefault( - Arrays.asList(EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY), - EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE) - .trim(); - if (EXTRACTOR_HISTORY_LOOSE_RANGE_ALL_VALUE.equalsIgnoreCase(extractorHistoryLooseRangeValue)) { - sloppyTimeRange = true; - sloppyPattern = true; + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) { + final boolean isStrictMode = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY), + EXTRACTOR_MODE_STRICT_DEFAULT_VALUE); + sloppyTimeRange = !isStrictMode; + sloppyPattern = !isStrictMode; } else { - final Set sloppyOptionSet = - Arrays.stream(extractorHistoryLooseRangeValue.split(",")) - .map(String::trim) - .filter(s -> !s.isEmpty()) - .map(String::toLowerCase) - .collect(Collectors.toSet()); - sloppyTimeRange = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE); - sloppyPattern = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE); - if (!sloppyOptionSet.isEmpty()) { - throw new PipeParameterNotValidException( - String.format( - "Parameters in set %s are not allowed in 'history.loose-range'", sloppyOptionSet)); + final String extractorHistoryLooseRangeValue = + parameters + .getStringOrDefault( + Arrays.asList(EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY), + EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE) + .trim(); + if (EXTRACTOR_HISTORY_LOOSE_RANGE_ALL_VALUE.equalsIgnoreCase( + extractorHistoryLooseRangeValue)) { + sloppyTimeRange = true; + sloppyPattern = true; + } else { + final Set sloppyOptionSet = + Arrays.stream(extractorHistoryLooseRangeValue.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .map(String::toLowerCase) + .collect(Collectors.toSet()); + sloppyTimeRange = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE); + sloppyPattern = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE); + if (!sloppyOptionSet.isEmpty()) { + throw new PipeParameterNotValidException( + String.format( + "Parameters in set %s are not allowed in 'history.loose-range'", + sloppyOptionSet)); + } } } @@ -333,12 +350,21 @@ public void customize( } } - shouldTransferModFile = - parameters.getBooleanOrDefault( - Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY), - EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE - || // Should extract deletion - listeningOptionPair.getRight()); + if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { + shouldTransferModFile = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY), + EXTRACTOR_MODS_DEFAULT_VALUE + || // Should extract deletion + listeningOptionPair.getRight()); + } else { + shouldTransferModFile = + parameters.getBooleanOrDefault( + Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY), + EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE + || // Should extract deletion + listeningOptionPair.getRight()); + } final String extractorModeValue = parameters.getStringOrDefault( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java index d992d87b3d78..2924eb4fc0ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java @@ -63,8 +63,10 @@ import static com.google.common.base.MoreObjects.toStringHelper; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_ALL_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_LOOSE_RANGE_KEY; @@ -73,6 +75,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_ENABLE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_START_TIME_KEY; @@ -240,10 +243,17 @@ public void customize( PipeExtractorConstant.SOURCE_FORWARDING_PIPE_REQUESTS_KEY), PipeExtractorConstant.EXTRACTOR_FORWARDING_PIPE_REQUESTS_DEFAULT_VALUE); - shouldTransferModFile = - parameters.getBooleanOrDefault( - Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY), - EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion); + if (parameters.hasAnyAttributes(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY)) { + shouldTransferModFile = + parameters.getBooleanOrDefault( + Arrays.asList(EXTRACTOR_MODS_KEY, SOURCE_MODS_KEY), + EXTRACTOR_MODS_DEFAULT_VALUE || shouldExtractDeletion); + } else { + shouldTransferModFile = + parameters.getBooleanOrDefault( + Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY), + EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE || shouldExtractDeletion); + } if (LOGGER.isInfoEnabled()) { LOGGER.info( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 9cfdc249df11..1b6ec932eea4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -165,11 +165,13 @@ private void bindOrUpdateProgressIndexForTsFileInsertionEvent( .isProgressIndexAfterOrEquals( dataRegionId, event.getTimePartitionId(), event.getProgressIndex())) { event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.get()); - LOGGER.info( - "Data region {} bind {} to event {} because it was flushed prematurely.", - dataRegionId, - maxProgressIndexForTsFileInsertionEvent, - event.coreReportMessage()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "Data region {} bind {} to event {} because it was flushed prematurely.", + dataRegionId, + maxProgressIndexForTsFileInsertionEvent, + event.coreReportMessage()); + } } else { maxProgressIndexForTsFileInsertionEvent.updateAndGet( index -> index.updateToMinimumEqualOrIsAfterProgressIndex(event.getProgressIndex())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java index 59f8cb498c98..f5a2a84c4739 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java @@ -25,7 +25,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.utils.PathUtils; @@ -71,6 +70,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant._PROCESSOR_OUTPUT_SERIES_KEY; + public class TwoStageCountProcessor implements PipeProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageCountProcessor.class); @@ -98,10 +100,16 @@ public class TwoStageCountProcessor implements PipeProcessor { @Override public void validate(PipeParameterValidator validator) throws Exception { - validator.validateRequiredAttribute(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY); + checkInvalidParameters(validator.getParameters()); + + final String rawOutputSeries; + if (!validator.getParameters().hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY)) { + validator.validateRequiredAttribute(_PROCESSOR_OUTPUT_SERIES_KEY); + rawOutputSeries = validator.getParameters().getString(_PROCESSOR_OUTPUT_SERIES_KEY); + } else { + rawOutputSeries = validator.getParameters().getString(PROCESSOR_OUTPUT_SERIES_KEY); + } - final String rawOutputSeries = - validator.getParameters().getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY); try { PathUtils.isLegalPath(rawOutputSeries); } catch (IllegalPathException e) { @@ -109,6 +117,17 @@ public void validate(PipeParameterValidator validator) throws Exception { } } + private void checkInvalidParameters(final PipeParameters parameters) { + // Check coexistence of output.series and output-series + if (parameters.hasAttribute(PROCESSOR_OUTPUT_SERIES_KEY) + && parameters.hasAttribute(_PROCESSOR_OUTPUT_SERIES_KEY)) { + LOGGER.warn( + "When {} is specified, specifying {} is invalid.", + PROCESSOR_OUTPUT_SERIES_KEY, + _PROCESSOR_OUTPUT_SERIES_KEY); + } + } + @Override public void customize(PipeParameters parameters, PipeProcessorRuntimeConfiguration configuration) throws Exception { @@ -119,8 +138,7 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati regionId = runtimeEnvironment.getRegionId(); pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta(); - outputSeries = - new PartialPath(parameters.getString(PipeProcessorConstant.PROCESSOR_OUTPUT_SERIES_KEY)); + outputSeries = new PartialPath(parameters.getString(_PROCESSOR_OUTPUT_SERIES_KEY)); if (Objects.nonNull(pipeTaskMeta) && Objects.nonNull(pipeTaskMeta.getProgressIndex())) { if (pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 2fe29f0088a5..42e6d46d9815 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -1265,11 +1265,11 @@ private class RequestExecutor { private final Condition condition; private final PipeConsensusReceiverMetrics metric; private final PipeConsensusTsFileWriterPool tsFileWriterPool; + private final AtomicInteger WALEventCount = new AtomicInteger(0); + private final AtomicInteger tsFileEventCount = new AtomicInteger(0); private long onSyncedCommitIndex = 0; private int connectorRebootTimes = 0; private int pipeTaskRestartTimes = 0; - private AtomicInteger WALEventCount = new AtomicInteger(0); - private AtomicInteger tsFileEventCount = new AtomicInteger(0); public RequestExecutor( PipeConsensusReceiverMetrics metric, PipeConsensusTsFileWriterPool tsFileWriterPool) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java index 4d6d02d466c6..fc672796b047 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java @@ -29,9 +29,8 @@ import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedMaxAccumulator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedMinAccumulator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedSumAccumulator; -import org.apache.iotdb.db.queryengine.plan.expression.Expression; -import org.apache.iotdb.db.queryengine.plan.expression.binary.CompareBinaryExpression; -import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; import org.apache.tsfile.enums.TSDataType; @@ -39,7 +38,9 @@ import java.util.Map; import static com.google.common.base.Preconditions.checkState; -import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.LAST; +import static org.apache.iotdb.commons.schema.table.TsTable.TIME_COLUMN_NAME; +import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.FIRST_BY; +import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.LAST_BY; public class AccumulatorFactory { @@ -53,6 +54,23 @@ public static TableAccumulator createAccumulator( if (aggregationType == TAggregationType.UDAF) { // If UDAF accumulator receives raw input, it needs to check input's attribute throw new UnsupportedOperationException(); + } else if ((LAST_BY.getFunctionName().equals(functionName) + || FIRST_BY.getFunctionName().equals(functionName)) + && inputExpressions.size() > 1) { + boolean xIsTimeColumn = false; + boolean yIsTimeColumn = false; + if (isTimeColumn(inputExpressions.get(1))) { + yIsTimeColumn = true; + } else if (isTimeColumn(inputExpressions.get(0))) { + xIsTimeColumn = true; + } + if (LAST_BY.getFunctionName().equals(functionName)) { + return new LastByAccumulator( + inputDataTypes.get(0), inputDataTypes.get(1), xIsTimeColumn, yIsTimeColumn); + } else { + return new FirstByAccumulator( + inputDataTypes.get(0), inputDataTypes.get(1), xIsTimeColumn, yIsTimeColumn); + } } else { return createBuiltinAccumulator( aggregationType, inputDataTypes, inputExpressions, inputAttributes, ascending); @@ -124,6 +142,10 @@ public static TableAccumulator createBuiltinAccumulator( return new MaxAccumulator(inputDataTypes.get(0)); case MIN: return new MinAccumulator(inputDataTypes.get(0)); + case LAST_BY: + return new LastByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1), false, false); + case FIRST_BY: + return new FirstByAccumulator(inputDataTypes.get(0), inputDataTypes.get(1), false, false); default: throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType); } @@ -237,34 +259,8 @@ public interface KeepEvaluator { boolean apply(long keep); } - public static KeepEvaluator initKeepEvaluator(Expression keepExpression) { - // We have checked semantic in FE, - // keep expression must be ConstantOperand or CompareBinaryExpression here - if (keepExpression instanceof ConstantOperand) { - return keep -> keep >= Long.parseLong(keepExpression.getExpressionString()); - } else { - long constant = - Long.parseLong( - ((CompareBinaryExpression) keepExpression) - .getRightExpression() - .getExpressionString()); - switch (keepExpression.getExpressionType()) { - case LESS_THAN: - return keep -> keep < constant; - case LESS_EQUAL: - return keep -> keep <= constant; - case GREATER_THAN: - return keep -> keep > constant; - case GREATER_EQUAL: - return keep -> keep >= constant; - case EQUAL_TO: - return keep -> keep == constant; - case NON_EQUAL: - return keep -> keep != constant; - default: - throw new IllegalArgumentException( - "unsupported expression type: " + keepExpression.getExpressionType()); - } - } + public static boolean isTimeColumn(Expression expression) { + return expression instanceof SymbolReference + && TIME_COLUMN_NAME.equals(((SymbolReference) expression).getName()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java index 4c33e6ac3714..427c891e9a19 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java @@ -138,7 +138,7 @@ public boolean hasFinalResult() { @Override public void addStatistics(Statistics[] statistics) { - if (statistics == null) { + if (statistics == null || statistics[0] == null) { return; } initResult = true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/CountAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/CountAccumulator.java index 0d1c8de9e471..0111beba7173 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/CountAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/CountAccumulator.java @@ -82,9 +82,10 @@ public boolean hasFinalResult() { @Override public void addStatistics(Statistics[] statistics) { - if (statistics[0] == null) { + if (statistics == null || statistics[0] == null) { return; } + countState += statistics[0].getCount(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java index b3b94ca2f052..13bdf79a68b8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java @@ -196,12 +196,12 @@ public void evaluateFinal(ColumnBuilder columnBuilder) { @Override public boolean hasFinalResult() { - return initResult; + return false; } @Override public void addStatistics(Statistics[] statistics) { - if (statistics[0] == null) { + if (statistics == null || statistics[0] == null) { return; } switch (seriesDataType) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java new file mode 100644 index 000000000000..166512328165 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.file.metadata.statistics.TimeStatistics; +import org.apache.tsfile.read.common.block.column.BinaryColumn; +import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.write.UnSupportedDataTypeException; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_BY_AGGREGATION; + +public class FirstByAccumulator implements TableAccumulator { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(FirstByAccumulator.class); + + private final TSDataType xDataType; + private final TSDataType yDataType; + + private final boolean xIsTimeColumn; + private final boolean yIsTimeColumn; + + private long yFirstTime = Long.MAX_VALUE; + + private final TsPrimitiveType xResult; + private boolean xIsNull = true; + + private boolean initResult = false; + + public FirstByAccumulator( + TSDataType xDataType, TSDataType yDataType, boolean xIsTimeColumn, boolean yIsTimeColumn) { + this.xDataType = xDataType; + this.yDataType = yDataType; + this.xIsTimeColumn = xIsTimeColumn; + this.yIsTimeColumn = yIsTimeColumn; + + this.xResult = TsPrimitiveType.getByType(xDataType); + } + + @Override + public long getEstimatedSize() { + return INSTANCE_SIZE; + } + + @Override + public TableAccumulator copy() { + return new FirstByAccumulator(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn); + } + + @Override + public void addInput(Column[] arguments) { + checkArgument(arguments.length == 3, "Length of input Column[] for LastBy/FirstBy should be 3"); + + // arguments[0] is x column, arguments[1] is y column, arguments[2] is time column + switch (xDataType) { + case INT32: + case DATE: + addIntInput(arguments[0], arguments[1], arguments[2]); + return; + case INT64: + case TIMESTAMP: + addLongInput(arguments[0], arguments[1], arguments[2]); + return; + case FLOAT: + addFloatInput(arguments[0], arguments[1], arguments[2]); + return; + case DOUBLE: + addDoubleInput(arguments[0], arguments[1], arguments[2]); + return; + case TEXT: + case STRING: + case BLOB: + addBinaryInput(arguments[0], arguments[1], arguments[2]); + return; + case BOOLEAN: + addBooleanInput(arguments[0], arguments[1], arguments[2]); + return; + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in LastBy: %s", yDataType)); + } + } + + @Override + public void addIntermediate(Column argument) { + checkArgument( + argument instanceof BinaryColumn || argument instanceof RunLengthEncodedColumn, + "intermediate input and output of LastBy should be BinaryColumn"); + + for (int i = 0; i < argument.getPositionCount(); i++) { + if (argument.isNull(i)) { + continue; + } + + byte[] bytes = argument.getBinary(i).getValues(); + long curTime = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0); + int offset = Long.BYTES; + boolean isXNull = BytesUtils.bytesToBool(bytes, offset); + offset += 1; + + if (isXNull) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + xIsNull = true; + } + continue; + } + + switch (xDataType) { + case INT32: + case DATE: + int xIntVal = BytesUtils.bytesToInt(bytes, offset); + updateIntLastValue(xIntVal, curTime); + break; + case INT64: + case TIMESTAMP: + long longVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, offset); + updateLongLastValue(longVal, curTime); + break; + case FLOAT: + float floatVal = BytesUtils.bytesToFloat(bytes, offset); + updateFloatLastValue(floatVal, curTime); + break; + case DOUBLE: + double doubleVal = BytesUtils.bytesToDouble(bytes, offset); + updateDoubleLastValue(doubleVal, curTime); + break; + case TEXT: + case BLOB: + case STRING: + int length = BytesUtils.bytesToInt(bytes, offset); + offset += Integer.BYTES; + Binary binaryVal = new Binary(BytesUtils.subBytes(bytes, offset, length)); + updateBinaryLastValue(binaryVal, curTime); + break; + case BOOLEAN: + boolean boolVal = BytesUtils.bytesToBool(bytes, offset); + updateBooleanLastValue(boolVal, curTime); + break; + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in First Aggregation: %s", yDataType)); + } + } + } + + @Override + public void evaluateIntermediate(ColumnBuilder columnBuilder) { + checkArgument( + columnBuilder instanceof BinaryColumnBuilder, + "intermediate input and output of FirstBy should be BinaryColumn"); + + if (!initResult) { + columnBuilder.appendNull(); + } else { + columnBuilder.writeBinary(new Binary(serializeTimeWithValue())); + } + } + + @Override + public void evaluateFinal(ColumnBuilder columnBuilder) { + if (!initResult || xIsNull) { + columnBuilder.appendNull(); + return; + } + + switch (xDataType) { + case INT32: + case DATE: + columnBuilder.writeInt(xResult.getInt()); + break; + case INT64: + case TIMESTAMP: + columnBuilder.writeLong(xResult.getLong()); + break; + case FLOAT: + columnBuilder.writeFloat(xResult.getFloat()); + break; + case DOUBLE: + columnBuilder.writeDouble(xResult.getDouble()); + break; + case TEXT: + case BLOB: + case STRING: + columnBuilder.writeBinary(xResult.getBinary()); + break; + case BOOLEAN: + columnBuilder.writeBoolean(xResult.getBoolean()); + break; + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in LastBy: %s", xDataType)); + } + } + + @Override + public boolean hasFinalResult() { + return false; + } + + @Override + public void addStatistics(Statistics[] statistics) { + + // only last_by(x, time) and last_by(time, x) can use statistics optimization + + Statistics xStatistics = statistics[0]; + Statistics yStatistics = statistics[1]; + + if (yIsTimeColumn && yStatistics == null || xIsTimeColumn && xStatistics == null) { + return; + } + + if (yIsTimeColumn) { + if (xStatistics == null || xStatistics.getStartTime() < yStatistics.getStartTime()) { + if (!initResult || yStatistics.getStartTime() < yFirstTime) { + initResult = true; + yFirstTime = yStatistics.getStartTime(); + xIsNull = true; + } + } else { + if (!initResult || yStatistics.getStartTime() < yFirstTime) { + initResult = true; + yFirstTime = yStatistics.getStartTime(); + xIsNull = false; + + if (xStatistics instanceof TimeStatistics) { + xResult.setLong(xStatistics.getStartTime()); + return; + } + + switch (xDataType) { + case INT32: + case DATE: + xResult.setInt((int) xStatistics.getLastValue()); + break; + case INT64: + case TIMESTAMP: + xResult.setLong((Long) xStatistics.getLastValue()); + break; + case FLOAT: + xResult.setFloat((float) statistics[0].getLastValue()); + break; + case DOUBLE: + xResult.setDouble((double) statistics[0].getLastValue()); + break; + case TEXT: + case BLOB: + case STRING: + xResult.setBinary((Binary) statistics[0].getLastValue()); + break; + case BOOLEAN: + xResult.setBoolean((boolean) statistics[0].getLastValue()); + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "Unsupported data type: %s in Aggregation: %s", + yDataType, LAST_BY_AGGREGATION)); + } + } + } + } else { + // x is time column + if (yStatistics != null && (!initResult || yStatistics.getStartTime() < yFirstTime)) { + initResult = true; + xIsNull = false; + yFirstTime = yStatistics.getStartTime(); + xResult.setLong(yStatistics.getStartTime()); + } + } + } + + @Override + public void reset() { + initResult = false; + xIsNull = true; + this.yFirstTime = Long.MAX_VALUE; + this.xResult.reset(); + } + + private byte[] serializeTimeWithValue() { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + try { + dataOutputStream.writeLong(yFirstTime); + dataOutputStream.writeBoolean(xIsNull); + if (!xIsNull) { + switch (xDataType) { + case INT32: + case DATE: + dataOutputStream.writeInt(xResult.getInt()); + break; + case INT64: + case TIMESTAMP: + dataOutputStream.writeLong(xResult.getLong()); + break; + case FLOAT: + dataOutputStream.writeFloat(xResult.getFloat()); + break; + case DOUBLE: + dataOutputStream.writeDouble(xResult.getDouble()); + break; + case TEXT: + case BLOB: + case STRING: + dataOutputStream.writeInt(xResult.getBinary().getValues().length); + dataOutputStream.write(xResult.getBinary().getValues()); + break; + case BOOLEAN: + dataOutputStream.writeBoolean(xResult.getBoolean()); + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "Unsupported data type: %s in aggregation %s", xDataType, LAST_BY_AGGREGATION)); + } + } + } catch (IOException e) { + throw new UnsupportedOperationException( + String.format( + "Failed to serialize intermediate result for Accumulator %s, errorMsg: %s.", + LAST_BY_AGGREGATION, e.getMessage())); + } + return byteArrayOutputStream.toByteArray(); + } + + // TODO can add last position optimization if last position is null ? + private void addIntInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateIntLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateIntLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setInt(xColumn.getInt(xIdx)); + } + } + } + + protected void updateIntLastValue(int val, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + xIsNull = false; + xResult.setInt(val); + } + } + + private void addLongInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateLongLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateLongLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setLong(xColumn.getLong(xIdx)); + } + } + } + + protected void updateLongLastValue(long value, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + xIsNull = false; + xResult.setLong(value); + } + } + + private void addFloatInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateFloatLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateFloatLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setFloat(xColumn.getFloat(xIdx)); + } + } + } + + protected void updateFloatLastValue(float value, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + xIsNull = false; + xResult.setFloat(value); + } + } + + private void addDoubleInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateDoubleLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateDoubleLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setDouble(xColumn.getDouble(xIdx)); + } + } + } + + protected void updateDoubleLastValue(double val, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + xIsNull = false; + xResult.setDouble(val); + } + } + + private void addBinaryInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateBinaryLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateBinaryLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setBinary(xColumn.getBinary(xIdx)); + } + } + } + + protected void updateBinaryLastValue(Binary val, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + xIsNull = false; + xResult.setBinary(val); + } + } + + private void addBooleanInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateBooleanLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateBooleanLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setBoolean(xColumn.getBoolean(xIdx)); + } + } + } + + protected void updateBooleanLastValue(boolean val, long curTime) { + if (!initResult || curTime < yFirstTime) { + initResult = true; + yFirstTime = curTime; + xIsNull = false; + xResult.setBoolean(val); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java index a6e58e78d0be..0a359de585aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java @@ -201,9 +201,10 @@ public boolean hasFinalResult() { @Override public void addStatistics(Statistics[] statistics) { - if (statistics[0] == null) { + if (statistics == null || statistics[0] == null) { return; } + switch (seriesDataType) { case INT32: case DATE: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java index dd4758645746..9add1617d439 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java @@ -21,31 +21,209 @@ import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.file.metadata.statistics.TimeStatistics; +import org.apache.tsfile.read.common.block.column.BinaryColumn; +import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.write.UnSupportedDataTypeException; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_BY_AGGREGATION; public class LastByAccumulator implements TableAccumulator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(LastByAccumulator.class); + + private final TSDataType xDataType; + private final TSDataType yDataType; + + private final boolean xIsTimeColumn; + private final boolean yIsTimeColumn; + + private long yLastTime = Long.MIN_VALUE; + + private final TsPrimitiveType xResult; + private boolean xIsNull = true; + + private boolean initResult = false; + + public LastByAccumulator( + TSDataType xDataType, TSDataType yDataType, boolean xIsTimeColumn, boolean yIsTimeColumn) { + this.xDataType = xDataType; + this.yDataType = yDataType; + this.xIsTimeColumn = xIsTimeColumn; + this.yIsTimeColumn = yIsTimeColumn; + + this.xResult = TsPrimitiveType.getByType(xDataType); + } + @Override public long getEstimatedSize() { - return 0; + return INSTANCE_SIZE; } @Override public TableAccumulator copy() { - return null; + return new LastByAccumulator(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn); } @Override - public void addInput(Column[] arguments) {} + public void addInput(Column[] arguments) { + checkArgument(arguments.length == 3, "Length of input Column[] for LastBy/FirstBy should be 3"); + + // arguments[0] is x column, arguments[1] is y column, arguments[2] is time column + switch (xDataType) { + case INT32: + case DATE: + addIntInput(arguments[0], arguments[1], arguments[2]); + return; + case INT64: + case TIMESTAMP: + addLongInput(arguments[0], arguments[1], arguments[2]); + return; + case FLOAT: + addFloatInput(arguments[0], arguments[1], arguments[2]); + return; + case DOUBLE: + addDoubleInput(arguments[0], arguments[1], arguments[2]); + return; + case TEXT: + case STRING: + case BLOB: + addBinaryInput(arguments[0], arguments[1], arguments[2]); + return; + case BOOLEAN: + addBooleanInput(arguments[0], arguments[1], arguments[2]); + return; + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in LastBy: %s", yDataType)); + } + } @Override - public void addIntermediate(Column argument) {} + public void addIntermediate(Column argument) { + checkArgument( + argument instanceof BinaryColumn || argument instanceof RunLengthEncodedColumn, + "intermediate input and output of LastBy should be BinaryColumn"); + + for (int i = 0; i < argument.getPositionCount(); i++) { + if (argument.isNull(i)) { + continue; + } + + byte[] bytes = argument.getBinary(i).getValues(); + long curTime = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0); + int offset = Long.BYTES; + boolean isXNull = BytesUtils.bytesToBool(bytes, offset); + offset += 1; + + if (isXNull) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + xIsNull = true; + } + continue; + } + + switch (xDataType) { + case INT32: + case DATE: + int xIntVal = BytesUtils.bytesToInt(bytes, offset); + updateIntLastValue(xIntVal, curTime); + break; + case INT64: + case TIMESTAMP: + long longVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, offset); + updateLongLastValue(longVal, curTime); + break; + case FLOAT: + float floatVal = BytesUtils.bytesToFloat(bytes, offset); + updateFloatLastValue(floatVal, curTime); + break; + case DOUBLE: + double doubleVal = BytesUtils.bytesToDouble(bytes, offset); + updateDoubleLastValue(doubleVal, curTime); + break; + case TEXT: + case BLOB: + case STRING: + int length = BytesUtils.bytesToInt(bytes, offset); + offset += Integer.BYTES; + Binary binaryVal = new Binary(BytesUtils.subBytes(bytes, offset, length)); + updateBinaryLastValue(binaryVal, curTime); + break; + case BOOLEAN: + boolean boolVal = BytesUtils.bytesToBool(bytes, offset); + updateBooleanLastValue(boolVal, curTime); + break; + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in Last Aggregation: %s", yDataType)); + } + } + } @Override - public void evaluateIntermediate(ColumnBuilder columnBuilder) {} + public void evaluateIntermediate(ColumnBuilder columnBuilder) { + checkArgument( + columnBuilder instanceof BinaryColumnBuilder, + "intermediate input and output of LastBy should be BinaryColumn"); + + if (!initResult) { + columnBuilder.appendNull(); + } else { + columnBuilder.writeBinary(new Binary(serializeTimeWithValue())); + } + } @Override - public void evaluateFinal(ColumnBuilder columnBuilder) {} + public void evaluateFinal(ColumnBuilder columnBuilder) { + if (!initResult || xIsNull) { + columnBuilder.appendNull(); + return; + } + + switch (xDataType) { + case INT32: + case DATE: + columnBuilder.writeInt(xResult.getInt()); + break; + case INT64: + case TIMESTAMP: + columnBuilder.writeLong(xResult.getLong()); + break; + case FLOAT: + columnBuilder.writeFloat(xResult.getFloat()); + break; + case DOUBLE: + columnBuilder.writeDouble(xResult.getDouble()); + break; + case TEXT: + case BLOB: + case STRING: + columnBuilder.writeBinary(xResult.getBinary()); + break; + case BOOLEAN: + columnBuilder.writeBoolean(xResult.getBoolean()); + break; + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in LastBy: %s", xDataType)); + } + } @Override public boolean hasFinalResult() { @@ -53,8 +231,309 @@ public boolean hasFinalResult() { } @Override - public void addStatistics(Statistics[] statistics) {} + public void addStatistics(Statistics[] statistics) { + + // only last_by(x, time) and last_by(time, x) can use statistics optimization + + Statistics xStatistics = statistics[0]; + Statistics yStatistics = statistics[1]; + + if (yIsTimeColumn && yStatistics == null || xIsTimeColumn && xStatistics == null) { + return; + } + + if (yIsTimeColumn) { + if (xStatistics == null || xStatistics.getEndTime() < yStatistics.getEndTime()) { + if (!initResult || yStatistics.getEndTime() > yLastTime) { + initResult = true; + yLastTime = yStatistics.getEndTime(); + xIsNull = true; + } + } else { + if (!initResult || yStatistics.getEndTime() > yLastTime) { + initResult = true; + yLastTime = yStatistics.getEndTime(); + xIsNull = false; + + if (xStatistics instanceof TimeStatistics) { + xResult.setLong(xStatistics.getEndTime()); + return; + } + + switch (xDataType) { + case INT32: + case DATE: + xResult.setInt((int) xStatistics.getLastValue()); + break; + case INT64: + case TIMESTAMP: + xResult.setLong((Long) xStatistics.getLastValue()); + break; + case FLOAT: + xResult.setFloat((float) statistics[0].getLastValue()); + break; + case DOUBLE: + xResult.setDouble((double) statistics[0].getLastValue()); + break; + case TEXT: + case BLOB: + case STRING: + xResult.setBinary((Binary) statistics[0].getLastValue()); + break; + case BOOLEAN: + xResult.setBoolean((boolean) statistics[0].getLastValue()); + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "Unsupported data type: %s in Aggregation: %s", + yDataType, LAST_BY_AGGREGATION)); + } + } + } + } else { + // x is time column + if (yStatistics != null && (!initResult || yStatistics.getEndTime() > yLastTime)) { + initResult = true; + xIsNull = false; + yLastTime = yStatistics.getEndTime(); + xResult.setLong(yStatistics.getEndTime()); + } + } + } @Override - public void reset() {} + public void reset() { + initResult = false; + xIsNull = true; + this.yLastTime = Long.MIN_VALUE; + this.xResult.reset(); + } + + private byte[] serializeTimeWithValue() { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + try { + dataOutputStream.writeLong(yLastTime); + dataOutputStream.writeBoolean(xIsNull); + if (!xIsNull) { + switch (xDataType) { + case INT32: + case DATE: + dataOutputStream.writeInt(xResult.getInt()); + break; + case INT64: + case TIMESTAMP: + dataOutputStream.writeLong(xResult.getLong()); + break; + case FLOAT: + dataOutputStream.writeFloat(xResult.getFloat()); + break; + case DOUBLE: + dataOutputStream.writeDouble(xResult.getDouble()); + break; + case TEXT: + case BLOB: + case STRING: + dataOutputStream.writeInt(xResult.getBinary().getValues().length); + dataOutputStream.write(xResult.getBinary().getValues()); + break; + case BOOLEAN: + dataOutputStream.writeBoolean(xResult.getBoolean()); + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "Unsupported data type: %s in aggregation %s", xDataType, LAST_BY_AGGREGATION)); + } + } + } catch (IOException e) { + throw new UnsupportedOperationException( + String.format( + "Failed to serialize intermediate result for Accumulator %s, errorMsg: %s.", + LAST_BY_AGGREGATION, e.getMessage())); + } + return byteArrayOutputStream.toByteArray(); + } + + // TODO can add last position optimization if last position is null ? + private void addIntInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateIntLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateIntLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setInt(xColumn.getInt(xIdx)); + } + } + } + + protected void updateIntLastValue(int val, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + xIsNull = false; + xResult.setInt(val); + } + } + + private void addLongInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateLongLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateLongLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setLong(xColumn.getLong(xIdx)); + } + } + } + + protected void updateLongLastValue(long value, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + xIsNull = false; + xResult.setLong(value); + } + } + + private void addFloatInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateFloatLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateFloatLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setFloat(xColumn.getFloat(xIdx)); + } + } + } + + protected void updateFloatLastValue(float value, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + xIsNull = false; + xResult.setFloat(value); + } + } + + private void addDoubleInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateDoubleLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateDoubleLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setDouble(xColumn.getDouble(xIdx)); + } + } + } + + protected void updateDoubleLastValue(double val, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + xIsNull = false; + xResult.setDouble(val); + } + } + + private void addBinaryInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateBinaryLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateBinaryLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setBinary(xColumn.getBinary(xIdx)); + } + } + } + + protected void updateBinaryLastValue(Binary val, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + xIsNull = false; + xResult.setBinary(val); + } + } + + private void addBooleanInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateBooleanLastValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + protected void updateBooleanLastValue(Column xColumn, int xIdx, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + if (xColumn.isNull(xIdx)) { + xIsNull = true; + } else { + xIsNull = false; + xResult.setBoolean(xColumn.getBoolean(xIdx)); + } + } + } + + protected void updateBooleanLastValue(boolean val, long curTime) { + if (!initResult || curTime > yLastTime) { + initResult = true; + yLastTime = curTime; + xIsNull = false; + xResult.setBoolean(val); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaxAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaxAccumulator.java index 7fa93ef8062b..1d07fab38095 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaxAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaxAccumulator.java @@ -200,7 +200,7 @@ public boolean hasFinalResult() { @Override public void addStatistics(Statistics[] statistics) { - if (statistics[0] == null) { + if (statistics == null || statistics[0] == null) { return; } switch (seriesDataType) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MinAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MinAccumulator.java index 74efe30260b9..7f86d73f783a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MinAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MinAccumulator.java @@ -200,7 +200,7 @@ public boolean hasFinalResult() { @Override public void addStatistics(Statistics[] statistics) { - if (statistics[0] == null) { + if (statistics == null || statistics[0] == null) { return; } switch (seriesDataType) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java index ad89c40c5f1a..591c1f6ad77b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java @@ -114,9 +114,10 @@ public boolean hasFinalResult() { @Override public void addStatistics(Statistics[] statistics) { - if (statistics == null) { + if (statistics == null || statistics[0] == null) { return; } + initResult = true; if (statistics[0] instanceof IntegerStatistics) { sumValue += statistics[0].getSumLongValue(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 718b0958f5fe..cdff7f25e96b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -175,6 +175,7 @@ import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.createAccumulator; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.createGroupedAccumulator; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.isTimeColumn; import static org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter; import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.ASC_TIME_COMPARATOR; import static org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.IDENTITY_FILL; @@ -184,8 +185,10 @@ import static org.apache.iotdb.db.queryengine.plan.relational.metadata.TableBuiltinAggregationFunction.getAggregationTypeByFuncName; import static org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder.ASC_NULLS_LAST; import static org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType; -import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST; -import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST; +import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_AGGREGATION; +import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_BY_AGGREGATION; +import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_AGGREGATION; +import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_BY_AGGREGATION; import static org.apache.tsfile.read.common.type.TimestampType.TIMESTAMP; /** This Visitor is responsible for transferring Table PlanNode Tree to Table Operator Tree. */ @@ -1374,7 +1377,7 @@ private TableAggregator buildAggregator( functionName, getAggregationTypeByFuncName(functionName), argumentTypes, - Collections.emptyList(), + aggregation.getArguments(), Collections.emptyMap(), true); @@ -1479,6 +1482,7 @@ public Operator visitAggregationTableScan( List aggregators = new ArrayList<>(node.getAggregations().size()); Map columnLayout = new HashMap<>(node.getAggregations().size()); + int distinctArgumentCount = node.getAssignments().size(); int aggregationsCount = node.getAggregations().size(); List aggColumnIndexes = new ArrayList<>(); int channel = 0; @@ -1488,23 +1492,37 @@ public Operator visitAggregationTableScan( Map idAndAttributeColumnsIndexMap = node.getIdAndAttributeIndexMap(); Map columnSchemaMap = node.getAssignments(); List columnSchemas = new ArrayList<>(aggregationsCount); - int[] columnsIndexArray = new int[aggregationsCount * 2]; + int[] columnsIndexArray = new int[distinctArgumentCount]; List measurementColumnNames = new ArrayList<>(); List measurementSchemas = new ArrayList<>(); for (Map.Entry entry : node.getAggregations().entrySet()) { - String funcName = entry.getValue().getResolvedFunction().getSignature().getName(); + AggregationNode.Aggregation aggregation = entry.getValue(); + String funcName = aggregation.getResolvedFunction().getSignature().getName(); + + // first/last/first_by/last_by aggregation with BLOB type can not use statistics + if (FIRST_AGGREGATION.equals(funcName) + || LAST_AGGREGATION.equals(funcName) + || LAST_BY_AGGREGATION.equals(funcName) + || FIRST_BY_AGGREGATION.equals(funcName)) { + Symbol argument = Symbol.from(aggregation.getArguments().get(0)); + if (!columnSchemaMap.containsKey(argument) + || BlobType.BLOB.equals(columnSchemaMap.get(argument).getType())) { + canUseStatistic = false; + } - for (Expression argument : entry.getValue().getArguments()) { - idx++; - Symbol symbol = Symbol.from(argument); - ColumnSchema schema = requireNonNull(columnSchemaMap.get(symbol), symbol + " is null"); - if (schema.getType().equals(BlobType.BLOB) - && (FIRST.equals(funcName) || LAST.equals(funcName))) { - // first/last aggregation with BLOB type can not use statistics + // only last_by(time, x) or last_by(x,time) can use statistic + if ((LAST_BY_AGGREGATION.equals(funcName) || FIRST_BY_AGGREGATION.equals(funcName)) + && !isTimeColumn(aggregation.getArguments().get(0)) + && !isTimeColumn(aggregation.getArguments().get(1))) { canUseStatistic = false; } + } + for (Expression argument : aggregation.getArguments()) { + idx++; + Symbol symbol = Symbol.from(argument); + ColumnSchema schema = requireNonNull(columnSchemaMap.get(symbol), symbol + " is null"); switch (schema.getColumnCategory()) { case ID: case ATTRIBUTE: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java index a4ccc8885e6a..f1384a1997a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java @@ -64,9 +64,9 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.isTimeColumn; import static org.apache.iotdb.db.queryengine.plan.expression.unary.LikeExpression.getEscapeCharacter; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.ConvertPredicateToTimeFilterVisitor.getLongValue; -import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.ConvertPredicateToTimeFilterVisitor.isTimeColumn; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoScanChecker.isLiteral; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoScanChecker.isSymbolReference; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java index 26040b407fc4..2b09c1e01cce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java @@ -34,7 +34,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullIfExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SearchedCaseExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SimpleCaseExpression; -import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.read.filter.factory.FilterFactory; @@ -47,7 +46,7 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.isTimeColumn; public class ConvertPredicateToTimeFilterVisitor extends PredicateVisitor { @@ -209,11 +208,6 @@ protected Filter visitBetweenPredicate(BetweenPredicate node, Void context) { } } - public static boolean isTimeColumn(Expression expression) { - return expression instanceof SymbolReference - && TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(((SymbolReference) expression).getName()); - } - public static long getLongValue(Expression expression) { return ((LongLiteral) expression).getParsedValue(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java index b47a2f984503..cad0eecd7e1c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java @@ -550,8 +550,8 @@ && isIntegerNumber(argumentTypes.get(2)))) { "Aggregate functions [%s] should only have one argument", functionName)); } break; - case SqlConstant.FIRST: - case SqlConstant.LAST: + case SqlConstant.FIRST_AGGREGATION: + case SqlConstant.LAST_AGGREGATION: if (argumentTypes.size() != 2) { throw new SemanticException( String.format( @@ -562,8 +562,14 @@ && isIntegerNumber(argumentTypes.get(2)))) { "Second argument of Aggregate functions [%s] should be orderable", functionName)); } break; - case SqlConstant.FIRST_BY: - case SqlConstant.LAST_BY: + case SqlConstant.FIRST_BY_AGGREGATION: + case SqlConstant.LAST_BY_AGGREGATION: + if (argumentTypes.size() != 3) { + throw new SemanticException( + String.format( + "Aggregate functions [%s] should only have three arguments", functionName)); + } + break; case SqlConstant.MAX_BY: case SqlConstant.MIN_BY: if (argumentTypes.size() != 2) { @@ -587,10 +593,10 @@ && isIntegerNumber(argumentTypes.get(2)))) { switch (functionName.toLowerCase(Locale.ENGLISH)) { case SqlConstant.COUNT: return INT64; - case SqlConstant.FIRST: - case SqlConstant.LAST: - case SqlConstant.FIRST_BY: - case SqlConstant.LAST_BY: + case SqlConstant.FIRST_AGGREGATION: + case SqlConstant.LAST_AGGREGATION: + case SqlConstant.FIRST_BY_AGGREGATION: + case SqlConstant.LAST_BY_AGGREGATION: case SqlConstant.EXTREME: case SqlConstant.MODE: case SqlConstant.MAX: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index b937c1d4619f..e95a8633d438 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -208,6 +208,10 @@ import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSets.Type.ROLLUP; import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName.mapIdentifier; import static org.apache.iotdb.db.queryengine.transformation.dag.column.unary.scalar.TableBuiltinScalarFunction.DATE_BIN; +import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_AGGREGATION; +import static org.apache.iotdb.db.utils.constant.SqlConstant.FIRST_BY_AGGREGATION; +import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_AGGREGATION; +import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_BY_AGGREGATION; public class AstBuilder extends RelationalSqlBaseVisitor { @@ -1881,7 +1885,8 @@ public Node visitFunctionCall(RelationalSqlParser.FunctionCallContext ctx) { new DereferenceExpression(getLocation(ctx.label), (Identifier) visit(ctx.label))); } - if (name.toString().equalsIgnoreCase("first") || name.toString().equalsIgnoreCase("last")) { + if (name.toString().equalsIgnoreCase(FIRST_AGGREGATION) + || name.toString().equalsIgnoreCase(LAST_AGGREGATION)) { if (arguments.size() == 1) { arguments.add( new Identifier( @@ -1897,6 +1902,23 @@ public Node visitFunctionCall(RelationalSqlParser.FunctionCallContext ctx) { } else { throw parseError("Invalid number of arguments for 'first' or 'last' function", ctx); } + } else if (name.toString().equalsIgnoreCase(FIRST_BY_AGGREGATION) + || name.toString().equalsIgnoreCase(LAST_BY_AGGREGATION)) { + if (arguments.size() == 2) { + arguments.add( + new Identifier( + TimestampOperand.TIMESTAMP_EXPRESSION_STRING.toLowerCase(Locale.ENGLISH))); + } else if (arguments.size() == 3) { + check( + arguments + .get(2) + .toString() + .equalsIgnoreCase(TimestampOperand.TIMESTAMP_EXPRESSION_STRING), + "The third argument of 'first_by' or 'last_by' function must be 'time'", + ctx); + } else { + throw parseError("Invalid number of arguments for 'first_by' or 'last_by' function", ctx); + } } return new FunctionCall(getLocation(ctx), name, distinct, arguments); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java index b30ae505e13d..43267f9e48e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java @@ -57,10 +57,10 @@ protected SqlConstant() { public static final String EXTREME = "extreme"; public static final String FIRST_VALUE = "first_value"; public static final String LAST_VALUE = "last_value"; - public static final String FIRST = "first"; - public static final String FIRST_BY = "first_by"; - public static final String LAST_BY = "last_by"; - public static final String LAST = "last"; + public static final String FIRST_BY_AGGREGATION = "first_by"; + public static final String LAST_BY_AGGREGATION = "last_by"; + public static final String LAST_AGGREGATION = "last"; + public static final String FIRST_AGGREGATION = "first"; public static final String COUNT = "count"; public static final String AVG = "avg"; public static final String SUM = "sum"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index 1ddca0c3edff..f1e32337368b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -82,6 +82,9 @@ public class PipeExtractorConstant { public static final String EXTRACTOR_MODS_ENABLE_KEY = "extractor.mods.enable"; public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable"; public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false; + public static final String EXTRACTOR_MODS_KEY = "extractor.mods"; + public static final String SOURCE_MODS_KEY = "source.mods"; + public static final boolean EXTRACTOR_MODS_DEFAULT_VALUE = EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; public static final String EXTRACTOR_REALTIME_ENABLE_KEY = "extractor.realtime.enable"; public static final String SOURCE_REALTIME_ENABLE_KEY = "source.realtime.enable"; @@ -101,16 +104,29 @@ public class PipeExtractorConstant { public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_ALL_VALUE = "all"; public static final String EXTRACTOR_REALTIME_LOOSE_RANGE_DEFAULT_VALUE = ""; + public static final String EXTRACTOR_MODE_STREAMING_KEY = "extractor.mode.streaming"; + public static final String SOURCE_MODE_STREAMING_KEY = "source.mode.streaming"; + public static final boolean EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE = true; + public static final String EXTRACTOR_MODE_STRICT_KEY = "extractor.mode.strict"; + public static final String SOURCE_MODE_STRICT_KEY = "source.mode.strict"; + public static final boolean EXTRACTOR_MODE_STRICT_DEFAULT_VALUE = true; + public static final String EXTRACTOR_MODE_SNAPSHOT_KEY = "extractor.mode.snapshot"; + public static final String SOURCE_MODE_SNAPSHOT_KEY = "source.mode.snapshot"; + public static final boolean EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE = false; + public static final String EXTRACTOR_START_TIME_KEY = "extractor.start-time"; public static final String SOURCE_START_TIME_KEY = "source.start-time"; public static final String EXTRACTOR_END_TIME_KEY = "extractor.end-time"; public static final String SOURCE_END_TIME_KEY = "source.end-time"; - public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark-interval-ms"; - public static final String SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark-interval-ms"; + public static final String _EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark-interval-ms"; + public static final String _SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark-interval-ms"; public static final long EXTRACTOR_WATERMARK_INTERVAL_DEFAULT_VALUE = -1; // -1 means no watermark + public static final String EXTRACTOR_WATERMARK_INTERVAL_KEY = "extractor.watermark.interval-ms"; + public static final String SOURCE_WATERMARK_INTERVAL_KEY = "source.watermark.interval-ms"; ///////////////////// pipe consensus ///////////////////// + public static final String EXTRACTOR_CONSENSUS_GROUP_ID_KEY = "extractor.consensus.group-id"; public static final String EXTRACTOR_CONSENSUS_SENDER_DATANODE_ID_KEY = "extractor.consensus.sender-dn-id"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java index 22bc87b2917b..f8aef880bd96 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java @@ -77,7 +77,8 @@ public class PipeProcessorConstant { public static final long PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE = Long.MAX_VALUE; - public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series"; + public static final String _PROCESSOR_OUTPUT_SERIES_KEY = "processor.output-series"; + public static final String PROCESSOR_OUTPUT_SERIES_KEY = "processor.output.series"; private PipeProcessorConstant() { throw new IllegalStateException("Utility class"); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java index cafc2a5288b8..7358ce32ec57 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java @@ -64,7 +64,7 @@ public boolean isRoot() { } /** - * Interpret from source parameters and get a {@link PipePattern}. + * Interpret from source parameters and get a {@link TreePattern}. * * @return The interpreted {@link TreePattern} which is not {@code null}. */ diff --git a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift index 65a2728e9aed..73b7f0075703 100644 --- a/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift +++ b/iotdb-protocol/thrift-consensus/src/main/thrift/pipeconsensus.thrift @@ -89,6 +89,14 @@ struct TCheckConsensusPipeCompletedResp { 2: required bool isCompleted } +struct TWaitReleaseAllRegionRelatedResourceReq { + 1: required common.TConsensusGroupId consensusGroupId +} + +struct TWaitReleaseAllRegionRelatedResourceResp { + 1: required bool releaseAllResource +} + service PipeConsensusIService { /** * Transfer stream data in a given ConsensusGroup, used by PipeConsensus @@ -107,4 +115,6 @@ service PipeConsensusIService { TNotifyPeerToDropConsensusPipeResp notifyPeerToDropConsensusPipe(TNotifyPeerToDropConsensusPipeReq req) TCheckConsensusPipeCompletedResp checkConsensusPipeCompleted(TCheckConsensusPipeCompletedReq req) + + TWaitReleaseAllRegionRelatedResourceResp waitReleaseAllRegionRelatedResource(TWaitReleaseAllRegionRelatedResourceReq req) } \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4aeaf6db76e7..2ea7ee7e908a 100644 --- a/pom.xml +++ b/pom.xml @@ -166,7 +166,7 @@ 0.14.1 1.9 1.5.6-3 - 1.2.0-d28fef17-SNAPSHOT + 1.2.0-241018-SNAPSHOT