Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveYurongSu committed Oct 17, 2024
1 parent 8ff58c1 commit e88923d
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ private TsFileInsertionEventParser initEventParser() {
if (eventParser == null) {
eventParser =
new TsFileInsertionEventParserProvider(
tsFile, treePattern, startTime, endTime, pipeTaskMeta, this)
tsFile, treePattern, tablePattern, startTime, endTime, pipeTaskMeta, this)
.provide();
}
return eventParser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable {
protected final TreePattern treePattern; // used to filter data
protected final TablePattern tablePattern; // used to filter data
protected final GlobalTimeExpression timeFilterExpression; // used to filter data
protected final long startTime; // used to filter data
protected final long endTime; // used to filter data

protected final PipeTaskMeta pipeTaskMeta; // used to report progress
protected final PipeInsertionEvent sourceEvent; // used to report progress
Expand All @@ -63,6 +65,8 @@ protected TsFileInsertionEventParser(
(startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
? null
: new GlobalTimeExpression(TimeFilterApi.between(startTime, endTime));
this.startTime = startTime;
this.endTime = endTime;

this.pipeTaskMeta = pipeTaskMeta;
this.sourceEvent = sourceEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,20 @@
import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParserTabletIterator;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.controller.CachedChunkLoaderImpl;
import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.tsfile.read.query.executor.TableQueryExecutor;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
Expand All @@ -61,11 +62,12 @@ public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser
LoggerFactory.getLogger(TsFileInsertionEventTableParser.class);

private final PipeMemoryBlock allocatedMemoryBlock;
private final TsFileReader tsFileReader;
private final TableQueryExecutor tableQueryExecutor;

private final Map<String, TableSchema> tableSchemaMap;

private final Iterator<Map.Entry<IDeviceID, List<String>>> deviceMeasurementsMapIterator;
private final Map<IDeviceID, Boolean> deviceIsAlignedMap;
private final Map<String, TSDataType> measurementDataTypeMap;

public TsFileInsertionEventTableParser(
final File tsFile,
Expand All @@ -78,7 +80,7 @@ public TsFileInsertionEventTableParser(
this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null);
}

public TsFileInsertionEventTableParser(
private TsFileInsertionEventTableParser(
final File tsFile,
final TablePattern pattern,
final long startTime,
Expand All @@ -97,15 +99,21 @@ public TsFileInsertionEventTableParser(
long memoryRequiredInBytes =
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes();
tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, true);
tsFileReader = new TsFileReader(tsFileSequenceReader);

tableSchemaMap = tsFileSequenceReader.readFileMetadata().getTableSchemaMap();

tableQueryExecutor =
new TableQueryExecutor(
new MetadataQuerierByFileImpl(tsFileSequenceReader),
new CachedChunkLoaderImpl(tsFileSequenceReader),
TableQueryExecutor.TableQueryOrdering.DEVICE);

if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
// These read-only objects can be found in cache.
this.deviceIsAlignedMap =
Objects.nonNull(deviceIsAlignedMap)
? deviceIsAlignedMap
: tsFileResourceManager.getDeviceIsAlignedMapFromCache(tsFile, true);
measurementDataTypeMap = tsFileResourceManager.getMeasurementDataTypeMapFromCache(tsFile);
deviceMeasurementsMap = tsFileResourceManager.getDeviceMeasurementsMapFromCache(tsFile);
} else {
// We need to create these objects here and remove them later.
Expand All @@ -123,10 +131,6 @@ public TsFileInsertionEventTableParser(
devices = deviceIsAlignedMap.keySet();
}

measurementDataTypeMap = readFilteredFullPathDataTypeMap(devices);
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);

deviceMeasurementsMap = readFilteredDeviceMeasurementsMap(devices);
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
Expand All @@ -146,42 +150,6 @@ public TsFileInsertionEventTableParser(
}
}

private Map<IDeviceID, List<String>> filterDeviceMeasurementsMapByPattern(
final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap) {
final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new HashMap<>();
for (Map.Entry<IDeviceID, List<String>> entry : originalDeviceMeasurementsMap.entrySet()) {
final IDeviceID deviceId = entry.getKey();

// case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c
// in this case, all data can be matched without checking the measurements
if (Objects.isNull(treePattern)
|| treePattern.isRoot()
|| treePattern.coversDevice(deviceId)) {
if (!entry.getValue().isEmpty()) {
filteredDeviceMeasurementsMap.put(deviceId, entry.getValue());
}
}

// case 2: for example, pattern is root.a.b.c and device is root.a.b
// in this case, we need to check the full path
else if (treePattern.mayOverlapWithDevice(deviceId)) {
final List<String> filteredMeasurements = new ArrayList<>();

for (final String measurement : entry.getValue()) {
if (treePattern.matchesMeasurement(deviceId, measurement)) {
filteredMeasurements.add(measurement);
}
}

if (!filteredMeasurements.isEmpty()) {
filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements);
}
}
}

return filteredDeviceMeasurementsMap;
}

private Map<IDeviceID, Boolean> readDeviceIsAlignedMap() throws IOException {
final Map<IDeviceID, Boolean> deviceIsAlignedResultMap = new HashMap<>();
final TsFileDeviceIterator deviceIsAlignedIterator =
Expand All @@ -194,45 +162,20 @@ private Map<IDeviceID, Boolean> readDeviceIsAlignedMap() throws IOException {
}

private Set<IDeviceID> filterDevicesByPattern(final Set<IDeviceID> devices) {
if (Objects.isNull(treePattern) || treePattern.isRoot()) {
if (Objects.isNull(tablePattern)
|| !tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern()) {
return devices;
}

final Set<IDeviceID> filteredDevices = new HashSet<>();
for (final IDeviceID device : devices) {
if (treePattern.coversDevice(device) || treePattern.mayOverlapWithDevice(device)) {
if (tablePattern.matchesTable(device.getTableName())) {
filteredDevices.add(device);
}
}
return filteredDevices;
}

/**
* This method is similar to {@link TsFileSequenceReader#getFullPathDataTypeMap()}, but only reads
* the given devices.
*/
private Map<String, TSDataType> readFilteredFullPathDataTypeMap(final Set<IDeviceID> devices)
throws IOException {
final Map<String, TSDataType> result = new HashMap<>();

for (final IDeviceID device : devices) {
tsFileSequenceReader
.readDeviceMetadata(device)
.values()
.forEach(
timeseriesMetadata ->
result.put(
device.toString() + "." + timeseriesMetadata.getMeasurementId(),
timeseriesMetadata.getTsDataType()));
}

return result;
}

/**
* This method is similar to {@link TsFileSequenceReader#getDeviceMeasurementsMap()}, but only
* reads the given devices.
*/
private Map<IDeviceID, List<String>> readFilteredDeviceMeasurementsMap(
final Set<IDeviceID> devices) throws IOException {
final Map<IDeviceID, List<String>> result = new HashMap<>();
Expand All @@ -251,12 +194,24 @@ private Map<IDeviceID, List<String>> readFilteredDeviceMeasurementsMap(
return result;
}

private Map<IDeviceID, List<String>> filterDeviceMeasurementsMapByPattern(
final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap) {
final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new HashMap<>();
for (Map.Entry<IDeviceID, List<String>> entry : originalDeviceMeasurementsMap.entrySet()) {
final IDeviceID deviceId = entry.getKey();
if (Objects.isNull(tablePattern) || tablePattern.matchesTable(deviceId.getTableName())) {
filteredDeviceMeasurementsMap.put(deviceId, entry.getValue());
}
}
return filteredDeviceMeasurementsMap;
}

@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
return () ->
new Iterator<TabletInsertionEvent>() {

private TsFileInsertionEventQueryParserTabletIterator tabletIterator = null;
private TsFileInsertionEventTableParserTabletIterator tabletIterator = null;

@Override
public boolean hasNext() {
Expand All @@ -269,7 +224,16 @@ public boolean hasNext() {
final Map.Entry<IDeviceID, List<String>> entry = deviceMeasurementsMapIterator.next();

try {
tabletIterator = null;
tabletIterator =
new TsFileInsertionEventTableParserTabletIterator(
tableQueryExecutor,
entry.getKey(),
entry.getValue(),
tableSchemaMap,
timeFilterExpression,
startTime,
endTime,
allocatedMemoryBlockForTablet);
} catch (final Exception e) {
close();
throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e);
Expand All @@ -287,18 +251,15 @@ public TabletInsertionEvent next() {
}

final Tablet tablet = tabletIterator.next();
final boolean isAligned =
deviceIsAlignedMap.getOrDefault(
IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false);

final TabletInsertionEvent next;
if (!hasNext()) {
next =
new PipeRawTabletInsertionEvent(
sourceEvent != null ? sourceEvent.isTableModelEvent() : null,
Boolean.TRUE,
sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null,
tablet,
isAligned,
true,
sourceEvent != null ? sourceEvent.getPipeName() : null,
sourceEvent != null ? sourceEvent.getCreationTime() : 0,
pipeTaskMeta,
Expand All @@ -308,10 +269,10 @@ public TabletInsertionEvent next() {
} else {
next =
new PipeRawTabletInsertionEvent(
sourceEvent != null ? sourceEvent.isTableModelEvent() : null,
Boolean.TRUE,
sourceEvent != null ? sourceEvent.getTreeModelDatabaseName() : null,
tablet,
isAligned,
true,
sourceEvent != null ? sourceEvent.getPipeName() : null,
sourceEvent != null ? sourceEvent.getCreationTime() : 0,
pipeTaskMeta,
Expand All @@ -325,14 +286,6 @@ public TabletInsertionEvent next() {

@Override
public void close() {
try {
if (tsFileReader != null) {
tsFileReader.close();
}
} catch (final IOException e) {
LOGGER.warn("Failed to close TsFileReader", e);
}

super.close();

if (allocatedMemoryBlock != null) {
Expand Down
Loading

0 comments on commit e88923d

Please sign in to comment.