diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java index 1befb2b765d7..f83699f30b7a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java @@ -24,7 +24,9 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; +import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.MultiEnvFactory; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema; @@ -32,6 +34,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -51,6 +54,39 @@ @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2AutoCreateSchema.class}) public class IoTDBPipeExtractorIT extends AbstractPipeDualAutoIT { + + @Before + public void setUp() { + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + + // TODO: delete ratis configurations + senderEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + // Disable sender compaction for tsfile determination in loose range test + .setEnableSeqSpaceCompaction(false) + .setEnableUnseqSpaceCompaction(false) + .setEnableCrossSpaceCompaction(false); + receiverEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS); + + // 10 min, assert that the operations will not time out + senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000); + receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000); + + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); + } + @Test public void testExtractorValidParameter() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); @@ -867,6 +903,67 @@ public void testSourceStartTimeAndEndTimeWorkingWithOrWithoutPattern() throws Ex } } + @Test + public void testLooseRange() throws Exception { + final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); + + final String receiverIp = receiverDataNode.getIp(); + final int receiverPort = receiverDataNode.getPort(); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + // TsFile 1, extracted without parse + "insert into root.db.d1 (time, at1, at2)" + " values (1000, 1, 2), (2000, 3, 4)", + // TsFile 2, not extracted because pattern not overlapped + "insert into root.db1.d1 (time, at1, at2)" + " values (1000, 1, 2), (2000, 3, 4)", + "flush"))) { + return; + } + + if (!TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + // TsFile 3, not extracted because time range not overlapped + "insert into root.db.d1 (time, at1, at2)" + " values (3000, 1, 2), (4000, 3, 4)", + "flush"))) { + return; + } + + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("source.path", "root.db.d1.at1"); + extractorAttributes.put("source.inclusion", "data.insert"); + extractorAttributes.put("source.history.start-time", "1500"); + extractorAttributes.put("source.history.end-time", "2500"); + extractorAttributes.put("source.history.loose-range", "time, path"); + + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("connector.batch.enable", "false"); + connectorAttributes.put("connector.ip", receiverIp); + connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + + TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select count(*) from root.** group by level=0", + "count(root.*.*.*),", + Collections.singleton("4,")); + } + } + private void assertTimeseriesCountOnReceiver(BaseEnv receiverEnv, int count) { TestUtils.assertDataEventuallyOnEnv( receiverEnv, "count timeseries", "count(timeseries),", Collections.singleton(count + ",")); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java index dc86e82e361d..b1e2649e6db7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -33,6 +33,7 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter; import org.apache.iotdb.db.pipe.resource.PipeResourceManager; +import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; @@ -45,6 +46,8 @@ import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.PlainDeviceID; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,13 +62,17 @@ import java.util.Map; import java.util.Objects; import java.util.Queue; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY; @@ -100,6 +107,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time private long historicalDataExtractionTimeLowerBound; // Arrival time + private boolean sloppyPattern; private boolean sloppyTimeRange; // true to disable time range filter after extraction private Pair listeningOptionPair; @@ -123,6 +131,27 @@ public void validate(final PipeParameterValidator validator) { throw new PipeParameterNotValidException(e.getMessage()); } + final Set sloppyOptionSet = + Arrays.stream( + parameters + .getStringOrDefault( + Arrays.asList( + EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY), + EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE) + .split(",")) + .map(String::trim) + .map(String::toLowerCase) + .collect(Collectors.toSet()); + // Avoid empty string + sloppyOptionSet.remove(""); + sloppyTimeRange = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE); + sloppyPattern = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE); + if (!sloppyOptionSet.isEmpty()) { + throw new PipeParameterNotValidException( + String.format( + "Parameters in set %s are not allowed in 'history.loose-range'", sloppyOptionSet)); + } + if (parameters.hasAnyAttributes( SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY, @@ -280,19 +309,6 @@ public void customize( } } - sloppyTimeRange = - Arrays.stream( - parameters - .getStringOrDefault( - Arrays.asList( - EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY), - "") - .split(",")) - .map(String::trim) - .map(String::toLowerCase) - .collect(Collectors.toSet()) - .contains("time"); - shouldTransferModFile = parameters.getBooleanOrDefault( Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY), @@ -309,17 +325,20 @@ public void customize( PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE) .equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE); - LOGGER.info( - "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy time range {}, should transfer mod file {}, should terminate pipe on all historical events consumed {}", - pipeName, - dataRegionId, - DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime), - historicalDataExtractionStartTime, - DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime), - historicalDataExtractionEndTime, - sloppyTimeRange, - shouldTransferModFile, - shouldTerminatePipeOnAllHistoricalEventsConsumed); + if (LOGGER.isInfoEnabled()) { + LOGGER.info( + "Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should transfer mod file {}, should terminate pipe on all historical events consumed {}", + pipeName, + dataRegionId, + DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime), + historicalDataExtractionStartTime, + DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime), + historicalDataExtractionEndTime, + sloppyPattern, + sloppyTimeRange, + shouldTransferModFile, + shouldTerminatePipeOnAllHistoricalEventsConsumed); + } } private void flushDataRegionAllTsFiles() { @@ -399,7 +418,8 @@ public synchronized void start() { !resource.isClosed() || mayTsFileContainUnprocessedData(resource) && isTsFileResourceOverlappedWithTimeRange(resource) - && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) + && isTsFileGeneratedAfterExtractionTimeLowerBound(resource) + && mayTsFileResourceOverlappedWithPattern(resource)) .collect(Collectors.toList()); resourceList.addAll(sequenceTsFileResources); @@ -412,7 +432,8 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) !resource.isClosed() || mayTsFileContainUnprocessedData(resource) && isTsFileResourceOverlappedWithTimeRange(resource) - && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) + && isTsFileGeneratedAfterExtractionTimeLowerBound(resource) + && mayTsFileResourceOverlappedWithPattern(resource)) .collect(Collectors.toList()); resourceList.addAll(unsequenceTsFileResources); @@ -474,6 +495,35 @@ private boolean mayTsFileContainUnprocessedData(final TsFileResource resource) { return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()); } + private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource resource) { + if (!sloppyPattern) { + return true; + } + + final Set deviceSet; + try { + final Map deviceIsAlignedMap = + PipeResourceManager.tsfile() + .getDeviceIsAlignedMapFromCache( + PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile())); + deviceSet = + Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap.keySet() : resource.getDevices(); + } catch (final IOException e) { + LOGGER.warn( + "Pipe {}@{}: failed to get devices from TsFile {}, extract it anyway", + pipeName, + dataRegionId, + resource.getTsFilePath(), + e); + return true; + } + + return deviceSet.stream() + .anyMatch( + // TODO: use IDeviceID + deviceID -> pipePattern.mayOverlapWithDevice(((PlainDeviceID) deviceID).toStringID())); + } + private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource resource) { return !(resource.getFileEndTime() < historicalDataExtractionStartTime || historicalDataExtractionEndTime < resource.getFileStartTime()); @@ -530,7 +580,7 @@ public synchronized Event supply() { pipePattern, historicalDataExtractionStartTime, historicalDataExtractionEndTime); - if (isDbNameCoveredByPattern) { + if (sloppyPattern || isDbNameCoveredByPattern) { event.skipParsingPattern(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index 0d064cd8b067..5e75e068fa27 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -71,7 +71,7 @@ private void tryTtlCheck() { } else { LOGGER.warn("failed to try lock when checking TTL because of timeout ({}s)", timeout); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { Thread.currentThread().interrupt(); LOGGER.warn("failed to try lock when checking TTL because of interruption", e); } @@ -102,7 +102,7 @@ private void ttlCheck() { entry.getKey(), entry.getValue().getReferenceCount())); } - } catch (IOException e) { + } catch (final IOException e) { LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", e); } } @@ -129,7 +129,8 @@ private void ttlCheck() { * @return the hardlink or copied file * @throws IOException when create hardlink or copy file failed */ - public File increaseFileReference(File file, boolean isTsFile, TsFileResource tsFileResource) + public File increaseFileReference( + final File file, final boolean isTsFile, final TsFileResource tsFileResource) throws IOException { lock.lock(); try { @@ -165,7 +166,7 @@ public File increaseFileReference(File file, boolean isTsFile, TsFileResource ts } } - private boolean increaseReferenceIfExists(String path) { + private boolean increaseReferenceIfExists(final String path) { final PipeTsFileResource resource = hardlinkOrCopiedFileToPipeTsFileResourceMap.get(path); if (resource != null) { resource.increaseAndGetReference(); @@ -174,10 +175,10 @@ private boolean increaseReferenceIfExists(String path) { return false; } - private static File getHardlinkOrCopiedFileInPipeDir(File file) throws IOException { + public static File getHardlinkOrCopiedFileInPipeDir(final File file) throws IOException { try { return new File(getPipeTsFileDirPath(file), getRelativeFilePath(file)); - } catch (Exception e) { + } catch (final Exception e) { throw new IOException( String.format( "failed to get hardlink or copied file in pipe dir " @@ -218,7 +219,7 @@ private static String getRelativeFilePath(File file) { * * @param hardlinkOrCopiedFile the copied or hardlinked file */ - public void decreaseFileReference(File hardlinkOrCopiedFile) { + public void decreaseFileReference(final File hardlinkOrCopiedFile) { lock.lock(); try { final String filePath = hardlinkOrCopiedFile.getPath(); @@ -237,7 +238,7 @@ public void decreaseFileReference(File hardlinkOrCopiedFile) { * @param hardlinkOrCopiedFile the copied or hardlinked file * @return the reference count of the file */ - public int getFileReferenceCount(File hardlinkOrCopiedFile) { + public int getFileReferenceCount(final File hardlinkOrCopiedFile) { lock.lock(); try { final String filePath = hardlinkOrCopiedFile.getPath(); @@ -254,7 +255,7 @@ public int getFileReferenceCount(File hardlinkOrCopiedFile) { * @return {@code true} if the maps are successfully put into cache or already cached. {@code * false} if they can not be cached. */ - public boolean cacheObjectsIfAbsent(File hardlinkOrCopiedTsFile) throws IOException { + public boolean cacheObjectsIfAbsent(final File hardlinkOrCopiedTsFile) throws IOException { lock.lock(); try { final PipeTsFileResource resource = @@ -265,8 +266,8 @@ public boolean cacheObjectsIfAbsent(File hardlinkOrCopiedTsFile) throws IOExcept } } - public Map> getDeviceMeasurementsMapFromCache(File hardlinkOrCopiedTsFile) - throws IOException { + public Map> getDeviceMeasurementsMapFromCache( + final File hardlinkOrCopiedTsFile) throws IOException { lock.lock(); try { final PipeTsFileResource resource = @@ -277,7 +278,7 @@ public Map> getDeviceMeasurementsMapFromCache(File hardl } } - public Map getDeviceIsAlignedMapFromCache(File hardlinkOrCopiedTsFile) + public Map getDeviceIsAlignedMapFromCache(final File hardlinkOrCopiedTsFile) throws IOException { lock.lock(); try { @@ -289,8 +290,8 @@ public Map getDeviceIsAlignedMapFromCache(File hardlinkOrCop } } - public Map getMeasurementDataTypeMapFromCache(File hardlinkOrCopiedTsFile) - throws IOException { + public Map getMeasurementDataTypeMapFromCache( + final File hardlinkOrCopiedTsFile) throws IOException { lock.lock(); try { final PipeTsFileResource resource = @@ -301,7 +302,8 @@ public Map getMeasurementDataTypeMapFromCache(File hardlinkO } } - public void pinTsFileResource(TsFileResource resource, boolean withMods) throws IOException { + public void pinTsFileResource(final TsFileResource resource, final boolean withMods) + throws IOException { lock.lock(); try { increaseFileReference(resource.getTsFile(), true, resource); @@ -313,13 +315,13 @@ public void pinTsFileResource(TsFileResource resource, boolean withMods) throws } } - public void unpinTsFileResource(TsFileResource resource) throws IOException { + public void unpinTsFileResource(final TsFileResource resource) throws IOException { lock.lock(); try { - File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()); + final File pinnedFile = getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()); decreaseFileReference(pinnedFile); - File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX); + final File modFile = new File(pinnedFile + ModificationFile.FILE_SUFFIX); if (modFile.exists()) { decreaseFileReference(modFile); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java index a67b594d361d..99efcd48d613 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeExtractorConstant.java @@ -64,6 +64,9 @@ public class PipeExtractorConstant { public static final String SOURCE_HISTORY_END_TIME_KEY = "source.history.end-time"; public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_KEY = "extractor.history.loose-range"; public static final String SOURCE_HISTORY_LOOSE_RANGE_KEY = "source.history.loose-range"; + public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE = "time"; + public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE = "path"; + public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE = ""; public static final String EXTRACTOR_MODS_ENABLE_KEY = "extractor.mods.enable"; public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable"; public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false;