Pipe: Support "source.history.loose-range" = "path" in iotdb-source (#12651)

Co-authored-by: Steve Yurong Su <[email protected]>
Caideyipi and SteveYurongSu authored Jun 12, 2024
1 parent 0a3ca29 commit cf4aed7
Showing 4 changed files with 197 additions and 45 deletions.
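
For orientation before the diff: a minimal, hypothetical usage sketch of the new option, built from the same ConfigNode client API exercised in the integration test below. The pipe name, receiver IP, and port here are placeholders, not values from this commit.

// Hypothetical sketch (not part of the diff): create a pipe whose historical
// extraction only checks device-level overlap with the pattern ("path" loose range),
// so matching TsFiles are shipped without per-event pattern parsing.
final Map<String, String> extractorAttributes = new HashMap<>();
extractorAttributes.put("source.path", "root.db.d1.at1");
extractorAttributes.put("source.history.loose-range", "path"); // value enabled by this commit

final Map<String, String> connectorAttributes = new HashMap<>();
connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.ip", "127.0.0.1"); // placeholder receiver IP
connectorAttributes.put("connector.port", "6667");    // placeholder receiver port

// "client" is a SyncConfigNodeIServiceClient obtained as in the test below.
final TSStatus status =
    client.createPipe(
        new TCreatePipeReq("p1", connectorAttributes)
            .setExtractorAttributes(extractorAttributes)
            .setProcessorAttributes(new HashMap<>()));

With "path" set, the extractor changed in this diff only checks whether any device in a TsFile may overlap the pattern and then skips pattern parsing for the whole file.
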
@@ -24,14 +24,17 @@
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -51,6 +54,39 @@
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeExtractorIT extends AbstractPipeDualAutoIT {

@Before
public void setUp() {
MultiEnvFactory.createEnv(2);
senderEnv = MultiEnvFactory.getEnv(0);
receiverEnv = MultiEnvFactory.getEnv(1);

// TODO: delete ratis configurations
senderEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
        // Disable sender compaction so the set of TsFiles stays deterministic for the loose range test
.setEnableSeqSpaceCompaction(false)
.setEnableUnseqSpaceCompaction(false)
.setEnableCrossSpaceCompaction(false);
receiverEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS);

    // 10 min, to ensure the operations do not time out
senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);

senderEnv.initClusterEnvironment();
receiverEnv.initClusterEnvironment();
}

@Test
public void testExtractorValidParameter() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
@@ -867,6 +903,67 @@ public void testSourceStartTimeAndEndTimeWorkingWithOrWithoutPattern() throws Ex
}
}

@Test
public void testLooseRange() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
              // TsFile 1, extracted without parsing
"insert into root.db.d1 (time, at1, at2)" + " values (1000, 1, 2), (2000, 3, 4)",
              // TsFile 2, not extracted because the pattern does not overlap
"insert into root.db1.d1 (time, at1, at2)" + " values (1000, 1, 2), (2000, 3, 4)",
"flush"))) {
return;
}

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
              // TsFile 3, not extracted because the time range does not overlap
"insert into root.db.d1 (time, at1, at2)" + " values (3000, 1, 2), (4000, 3, 4)",
"flush"))) {
return;
}

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("source.path", "root.db.d1.at1");
extractorAttributes.put("source.inclusion", "data.insert");
extractorAttributes.put("source.history.start-time", "1500");
extractorAttributes.put("source.history.end-time", "2500");
extractorAttributes.put("source.history.loose-range", "time, path");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode());

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select count(*) from root.** group by level=0",
"count(root.*.*.*),",
Collections.singleton("4,"));
}
}

private void assertTimeseriesCountOnReceiver(BaseEnv receiverEnv, int count) {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "count timeseries", "count(timeseries),", Collections.singleton(count + ","));
@@ -33,6 +33,7 @@
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.DataRegionListeningFilter;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
@@ -45,6 +46,8 @@
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,13 +62,17 @@
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODS_ENABLE_KEY;
@@ -100,6 +107,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
private long historicalDataExtractionTimeLowerBound; // Arrival time

private boolean sloppyPattern;
private boolean sloppyTimeRange; // true to disable time range filter after extraction

private Pair<Boolean, Boolean> listeningOptionPair;
@@ -123,6 +131,27 @@ public void validate(final PipeParameterValidator validator) {
throw new PipeParameterNotValidException(e.getMessage());
}

final Set<String> sloppyOptionSet =
Arrays.stream(
parameters
.getStringOrDefault(
Arrays.asList(
EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY),
EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE)
.split(","))
.map(String::trim)
.map(String::toLowerCase)
.collect(Collectors.toSet());
// Avoid empty string
sloppyOptionSet.remove("");
sloppyTimeRange = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE);
sloppyPattern = sloppyOptionSet.remove(EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE);
if (!sloppyOptionSet.isEmpty()) {
throw new PipeParameterNotValidException(
String.format(
"Parameters in set %s are not allowed in 'history.loose-range'", sloppyOptionSet));
}

if (parameters.hasAnyAttributes(
SOURCE_START_TIME_KEY,
EXTRACTOR_START_TIME_KEY,
@@ -280,19 +309,6 @@ public void customize(
}
}

sloppyTimeRange =
Arrays.stream(
parameters
.getStringOrDefault(
Arrays.asList(
EXTRACTOR_HISTORY_LOOSE_RANGE_KEY, SOURCE_HISTORY_LOOSE_RANGE_KEY),
"")
.split(","))
.map(String::trim)
.map(String::toLowerCase)
.collect(Collectors.toSet())
.contains("time");

shouldTransferModFile =
parameters.getBooleanOrDefault(
Arrays.asList(SOURCE_MODS_ENABLE_KEY, EXTRACTOR_MODS_ENABLE_KEY),
@@ -309,17 +325,20 @@
PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE)
.equalsIgnoreCase(PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE);

LOGGER.info(
"Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy time range {}, should transfer mod file {}, should terminate pipe on all historical events consumed {}",
pipeName,
dataRegionId,
DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
historicalDataExtractionStartTime,
DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
historicalDataExtractionEndTime,
sloppyTimeRange,
shouldTransferModFile,
shouldTerminatePipeOnAllHistoricalEventsConsumed);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Pipe {}@{}: historical data extraction time range, start time {}({}), end time {}({}), sloppy pattern {}, sloppy time range {}, should transfer mod file {}, should terminate pipe on all historical events consumed {}",
pipeName,
dataRegionId,
DateTimeUtils.convertLongToDate(historicalDataExtractionStartTime),
historicalDataExtractionStartTime,
DateTimeUtils.convertLongToDate(historicalDataExtractionEndTime),
historicalDataExtractionEndTime,
sloppyPattern,
sloppyTimeRange,
shouldTransferModFile,
shouldTerminatePipeOnAllHistoricalEventsConsumed);
}
}

private void flushDataRegionAllTsFiles() {
@@ -399,7 +418,8 @@ public synchronized void start() {
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&& mayTsFileResourceOverlappedWithPattern(resource))
.collect(Collectors.toList());
resourceList.addAll(sequenceTsFileResources);

@@ -412,7 +432,8 @@ && isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
!resource.isClosed()
|| mayTsFileContainUnprocessedData(resource)
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource)
&& mayTsFileResourceOverlappedWithPattern(resource))
.collect(Collectors.toList());
resourceList.addAll(unsequenceTsFileResources);

@@ -474,6 +495,35 @@ private boolean mayTsFileContainUnprocessedData(final TsFileResource resource) {
return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
}

private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource resource) {
if (!sloppyPattern) {
return true;
}

final Set<IDeviceID> deviceSet;
try {
final Map<IDeviceID, Boolean> deviceIsAlignedMap =
PipeResourceManager.tsfile()
.getDeviceIsAlignedMapFromCache(
PipeTsFileResourceManager.getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
deviceSet =
Objects.nonNull(deviceIsAlignedMap) ? deviceIsAlignedMap.keySet() : resource.getDevices();
} catch (final IOException e) {
LOGGER.warn(
"Pipe {}@{}: failed to get devices from TsFile {}, extract it anyway",
pipeName,
dataRegionId,
resource.getTsFilePath(),
e);
return true;
}

return deviceSet.stream()
.anyMatch(
// TODO: use IDeviceID
deviceID -> pipePattern.mayOverlapWithDevice(((PlainDeviceID) deviceID).toStringID()));
}

private boolean isTsFileResourceOverlappedWithTimeRange(final TsFileResource resource) {
return !(resource.getFileEndTime() < historicalDataExtractionStartTime
|| historicalDataExtractionEndTime < resource.getFileStartTime());
@@ -530,7 +580,7 @@ public synchronized Event supply() {
pipePattern,
historicalDataExtractionStartTime,
historicalDataExtractionEndTime);
if (isDbNameCoveredByPattern) {
if (sloppyPattern || isDbNameCoveredByPattern) {
event.skipParsingPattern();
}

