Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveYurongSu committed Oct 17, 2024
1 parent e88923d commit 8b24e60
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,54 +20,32 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;

import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.controller.CachedChunkLoaderImpl;
import org.apache.tsfile.read.controller.MetadataQuerierByFileImpl;
import org.apache.tsfile.read.query.executor.TableQueryExecutor;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;

public class TsFileInsertionEventTableParser extends TsFileInsertionEventParser {

private static final Logger LOGGER =
LoggerFactory.getLogger(TsFileInsertionEventTableParser.class);

private final PipeMemoryBlock allocatedMemoryBlock;
private final TableQueryExecutor tableQueryExecutor;

private final Map<String, TableSchema> tableSchemaMap;

private final Iterator<Map.Entry<IDeviceID, List<String>>> deviceMeasurementsMapIterator;
private final Map<IDeviceID, Boolean> deviceIsAlignedMap;
private final Iterator<Map.Entry<String, TableSchema>> filteredTableSchemaIterator;

public TsFileInsertionEventTableParser(
final File tsFile,
Expand All @@ -77,135 +55,25 @@ public TsFileInsertionEventTableParser(
final PipeTaskMeta pipeTaskMeta,
final PipeInsertionEvent sourceEvent)
throws IOException {
this(tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, null);
}

private TsFileInsertionEventTableParser(
final File tsFile,
final TablePattern pattern,
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final PipeInsertionEvent sourceEvent,
final Map<IDeviceID, Boolean> deviceIsAlignedMap)
throws IOException {
super(null, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);

try {
final PipeTsFileResourceManager tsFileResourceManager = PipeDataNodeResourceManager.tsfile();
final Map<IDeviceID, List<String>> deviceMeasurementsMap;

// TsFileReader is not thread-safe, so we need to create it here and close it later.
long memoryRequiredInBytes =
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes();
tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, true);

tableSchemaMap = tsFileSequenceReader.readFileMetadata().getTableSchemaMap();

filteredTableSchemaIterator =
tsFileSequenceReader.readFileMetadata().getTableSchemaMap().entrySet().stream()
.filter(entry -> Objects.isNull(pattern) || pattern.matchesTable(entry.getKey()))
.iterator();
tableQueryExecutor =
new TableQueryExecutor(
new MetadataQuerierByFileImpl(tsFileSequenceReader),
new CachedChunkLoaderImpl(tsFileSequenceReader),
TableQueryExecutor.TableQueryOrdering.DEVICE);

if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
// These read-only objects can be found in cache.
this.deviceIsAlignedMap =
Objects.nonNull(deviceIsAlignedMap)
? deviceIsAlignedMap
: tsFileResourceManager.getDeviceIsAlignedMapFromCache(tsFile, true);
deviceMeasurementsMap = tsFileResourceManager.getDeviceMeasurementsMapFromCache(tsFile);
} else {
// We need to create these objects here and remove them later.
final Set<IDeviceID> devices;
if (Objects.isNull(deviceIsAlignedMap)) {
this.deviceIsAlignedMap = readDeviceIsAlignedMap();
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(this.deviceIsAlignedMap);

// Filter devices that may overlap with pattern first
// to avoid reading all time-series of all devices.
devices = filterDevicesByPattern(this.deviceIsAlignedMap.keySet());
} else {
this.deviceIsAlignedMap = deviceIsAlignedMap;
devices = deviceIsAlignedMap.keySet();
}

deviceMeasurementsMap = readFilteredDeviceMeasurementsMap(devices);
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
}
allocatedMemoryBlock =
PipeDataNodeResourceManager.memory().forceAllocate(memoryRequiredInBytes);

// Filter again to get the final deviceMeasurementsMap that exactly matches the pattern.
deviceMeasurementsMapIterator =
filterDeviceMeasurementsMapByPattern(deviceMeasurementsMap).entrySet().iterator();

// No longer need this. Help GC.
tsFileSequenceReader.clearCachedDeviceMetadata();
} catch (final Exception e) {
close();
throw e;
}
}

private Map<IDeviceID, Boolean> readDeviceIsAlignedMap() throws IOException {
final Map<IDeviceID, Boolean> deviceIsAlignedResultMap = new HashMap<>();
final TsFileDeviceIterator deviceIsAlignedIterator =
tsFileSequenceReader.getAllDevicesIteratorWithIsAligned();
while (deviceIsAlignedIterator.hasNext()) {
final Pair<IDeviceID, Boolean> deviceIsAlignedPair = deviceIsAlignedIterator.next();
deviceIsAlignedResultMap.put(deviceIsAlignedPair.getLeft(), deviceIsAlignedPair.getRight());
}
return deviceIsAlignedResultMap;
}

private Set<IDeviceID> filterDevicesByPattern(final Set<IDeviceID> devices) {
if (Objects.isNull(tablePattern)
|| !tablePattern.hasUserSpecifiedDatabasePatternOrTablePattern()) {
return devices;
}

final Set<IDeviceID> filteredDevices = new HashSet<>();
for (final IDeviceID device : devices) {
if (tablePattern.matchesTable(device.getTableName())) {
filteredDevices.add(device);
}
}
return filteredDevices;
}

private Map<IDeviceID, List<String>> readFilteredDeviceMeasurementsMap(
final Set<IDeviceID> devices) throws IOException {
final Map<IDeviceID, List<String>> result = new HashMap<>();

for (final IDeviceID device : devices) {
tsFileSequenceReader
.readDeviceMetadata(device)
.values()
.forEach(
timeseriesMetadata ->
result
.computeIfAbsent(device, d -> new ArrayList<>())
.add(timeseriesMetadata.getMeasurementId()));
}

return result;
}

private Map<IDeviceID, List<String>> filterDeviceMeasurementsMapByPattern(
final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap) {
final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new HashMap<>();
for (Map.Entry<IDeviceID, List<String>> entry : originalDeviceMeasurementsMap.entrySet()) {
final IDeviceID deviceId = entry.getKey();
if (Objects.isNull(tablePattern) || tablePattern.matchesTable(deviceId.getTableName())) {
filteredDeviceMeasurementsMap.put(deviceId, entry.getValue());
}
}
return filteredDeviceMeasurementsMap;
}

@Override
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
return () ->
Expand All @@ -216,24 +84,17 @@ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
@Override
public boolean hasNext() {
while (tabletIterator == null || !tabletIterator.hasNext()) {
if (!deviceMeasurementsMapIterator.hasNext()) {
if (!filteredTableSchemaIterator.hasNext()) {
close();
return false;
}

final Map.Entry<IDeviceID, List<String>> entry = deviceMeasurementsMapIterator.next();
final Map.Entry<String, TableSchema> entry = filteredTableSchemaIterator.next();

try {
tabletIterator =
new TsFileInsertionEventTableParserTabletIterator(
tableQueryExecutor,
entry.getKey(),
entry.getValue(),
tableSchemaMap,
timeFilterExpression,
startTime,
endTime,
allocatedMemoryBlockForTablet);
tableQueryExecutor, entry.getKey(), entry.getValue(), startTime, endTime);
} catch (final Exception e) {
close();
throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e);
Expand Down Expand Up @@ -283,13 +144,4 @@ public TabletInsertionEvent next() {
}
};
}

@Override
public void close() {
super.close();

if (allocatedMemoryBlock != null) {
allocatedMemoryBlock.close();
}
}
}
Loading

0 comments on commit 8b24e60

Please sign in to comment.