Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Replaced IDeviceID in pipe module #12556

Merged
merged 16 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,8 @@ public boolean shouldParsePattern() {
? ((InsertRowsNode) node)
.getInsertRowNodeList().stream()
.anyMatch(
insertRowNode ->
!pipePattern.coversDevice(
insertRowNode.getDevicePath().getFullPath()))
: !pipePattern.coversDevice(node.getDevicePath().getFullPath())));
insertRowNode -> !pipePattern.coversDevice(insertRowNode.getDeviceID()))
: !pipePattern.coversDevice(node.getDeviceID())));
}

public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.write.UnSupportedDataTypeException;
Expand Down Expand Up @@ -62,7 +64,9 @@ public class TabletInsertionDataContainer {
private final EnrichedEvent
sourceEvent; // used to report progress and filter value columns by time range

private String deviceId;
// Used to preserve performance
private String deviceStr;
private IDeviceID deviceId;
private boolean isAligned;
private IMeasurementSchema[] measurementSchemaList;
private String[] columnNameStringList;
Expand Down Expand Up @@ -138,7 +142,9 @@ private void parse(final InsertRowNode insertRowNode, final PipePattern pattern)
final int originColumnSize = insertRowNode.getMeasurements().length;
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize];

this.deviceId = insertRowNode.getDevicePath().getFullPath();
// The full path is always cached when device path is deserialized
this.deviceStr = insertRowNode.getDevicePath().getFullPath();
this.deviceId = insertRowNode.getDeviceID();
this.isAligned = insertRowNode.isAligned();

final long[] originTimestampColumn = new long[] {insertRowNode.getTime()};
Expand Down Expand Up @@ -205,7 +211,9 @@ private void parse(final InsertTabletNode insertTabletNode, final PipePattern pa
final int originColumnSize = insertTabletNode.getMeasurements().length;
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize];

this.deviceId = insertTabletNode.getDevicePath().getFullPath();
// The full path is always cached when device path is deserialized
this.deviceStr = insertTabletNode.getDevicePath().getFullPath();
this.deviceId = insertTabletNode.getDeviceID();
this.isAligned = insertTabletNode.isAligned();

final long[] originTimestampColumn = insertTabletNode.getTimes();
Expand Down Expand Up @@ -288,7 +296,9 @@ private void parse(final Tablet tablet, final boolean isAligned, final PipePatte
final int originColumnSize = tablet.getSchemas().size();
final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[originColumnSize];

this.deviceId = tablet.getDeviceId();
// Only support tree-model tablet
this.deviceStr = tablet.getDeviceId();
this.deviceId = new StringArrayDeviceID(tablet.getDeviceId());
this.isAligned = isAligned;

final long[] originTimestampColumn =
Expand Down Expand Up @@ -567,9 +577,10 @@ public List<TabletInsertionEvent> processRowByRow(BiConsumer<Row, RowCollector>
final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, sourceEvent);
for (int i = 0; i < rowCount; i++) {
consumer.accept(
// Used for tree model
new PipeRow(
i,
deviceId,
getDeviceStr(),
isAligned,
measurementSchemaList,
timestampColumn,
Expand All @@ -595,7 +606,8 @@ public Tablet convertToTablet() {
return tablet;
}

final Tablet newTablet = new Tablet(deviceId, Arrays.asList(measurementSchemaList), rowCount);
final Tablet newTablet =
new Tablet(getDeviceStr(), Arrays.asList(measurementSchemaList), rowCount);
newTablet.timestamps = timestampColumn;
newTablet.bitMaps = nullValueColumnBitmaps;
newTablet.values = valueColumns;
Expand All @@ -605,4 +617,8 @@ public Tablet convertToTablet() {

return tablet;
}

private String getDeviceStr() {
return Objects.nonNull(deviceStr) ? deviceStr : deviceId.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,23 +146,22 @@ private Map<IDeviceID, List<String>> filterDeviceMeasurementsMapByPattern(
final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new HashMap<>();
for (Map.Entry<IDeviceID, List<String>> entry : originalDeviceMeasurementsMap.entrySet()) {
final IDeviceID deviceId = entry.getKey();
String deviceStr = deviceId.toString();

// case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c
// in this case, all data can be matched without checking the measurements
if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceStr)) {
if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) {
if (!entry.getValue().isEmpty()) {
filteredDeviceMeasurementsMap.put(deviceId, entry.getValue());
}
}

// case 2: for example, pattern is root.a.b.c and device is root.a.b
// in this case, we need to check the full path
else if (pattern.mayOverlapWithDevice(deviceStr)) {
else if (pattern.mayOverlapWithDevice(deviceId)) {
final List<String> filteredMeasurements = new ArrayList<>();

for (final String measurement : entry.getValue()) {
if (pattern.matchesMeasurement(deviceStr, measurement)) {
if (pattern.matchesMeasurement(deviceId, measurement)) {
filteredMeasurements.add(measurement);
} else {
// Parse pattern iff there are measurements filtered out
Expand Down Expand Up @@ -221,7 +220,7 @@ public boolean hasNext() {
new TsFileInsertionDataTabletIterator(
tsFileReader,
measurementDataTypeMap,
entry.getKey().toString(),
entry.getKey(),
entry.getValue(),
timeFilterExpression);
} catch (final IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.TsFileReader;
import org.apache.tsfile.read.common.Field;
import org.apache.tsfile.read.common.Path;
Expand All @@ -48,7 +49,7 @@ public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> {
private final TsFileReader tsFileReader;
private final Map<String, TSDataType> measurementDataTypeMap;

private final String deviceId;
private final IDeviceID deviceId;
private final List<String> measurements;

private final IExpression timeFilterExpression;
Expand All @@ -58,7 +59,7 @@ public class TsFileInsertionDataTabletIterator implements Iterator<Tablet> {
public TsFileInsertionDataTabletIterator(
TsFileReader tsFileReader,
Map<String, TSDataType> measurementDataTypeMap,
String deviceId,
IDeviceID deviceId,
List<String> measurements,
IExpression timeFilterExpression)
throws IOException {
Expand Down Expand Up @@ -118,7 +119,11 @@ private Tablet buildNextTablet() throws IOException {
schemas.add(new MeasurementSchema(measurement, dataType));
}
final Tablet tablet =
new Tablet(deviceId, schemas, PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
new Tablet(
// Used for tree model
deviceId.toString(),
schemas,
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
tablet.initBitMaps();

while (queryDataSet.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.iotdb.commons.pipe.pattern.PipePattern;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.slf4j.Logger;
Expand Down Expand Up @@ -83,14 +82,13 @@ private Map<IDeviceID, Set<String>> filterDeviceMeasurementsMapByPattern() throw

for (final Map.Entry<IDeviceID, List<String>> entry :
originalDeviceMeasurementsMap.entrySet()) {
final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID();
final IDeviceID deviceId = entry.getKey();

// case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c
// in this case, all data can be matched without checking the measurements
if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) {
if (!entry.getValue().isEmpty()) {
filteredDeviceMeasurementsMap.put(
new PlainDeviceID(deviceId), new HashSet<>(entry.getValue()));
filteredDeviceMeasurementsMap.put(deviceId, new HashSet<>(entry.getValue()));
}
}

Expand All @@ -109,7 +107,7 @@ else if (pattern.mayOverlapWithDevice(deviceId)) {
}

if (!filteredMeasurements.isEmpty()) {
filteredDeviceMeasurementsMap.put(new PlainDeviceID(deviceId), filteredMeasurements);
filteredDeviceMeasurementsMap.put(deviceId, filteredMeasurements);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;

import org.apache.tsfile.file.metadata.IDeviceID;

import java.util.Map;

/**
Expand All @@ -37,20 +39,20 @@ public class PipeRealtimeEvent extends EnrichedEvent {
private final EnrichedEvent event;
private final TsFileEpoch tsFileEpoch;

private Map<String, String[]> device2Measurements;
private Map<IDeviceID, String[]> device2Measurements;

public PipeRealtimeEvent(
final EnrichedEvent event,
final TsFileEpoch tsFileEpoch,
final Map<String, String[]> device2Measurements,
final Map<IDeviceID, String[]> device2Measurements,
final PipePattern pattern) {
this(event, tsFileEpoch, device2Measurements, null, pattern, Long.MIN_VALUE, Long.MAX_VALUE);
}

public PipeRealtimeEvent(
final EnrichedEvent event,
final TsFileEpoch tsFileEpoch,
final Map<String, String[]> device2Measurements,
final Map<IDeviceID, String[]> device2Measurements,
final PipeTaskMeta pipeTaskMeta,
final PipePattern pattern,
final long startTime,
Expand All @@ -73,7 +75,7 @@ public TsFileEpoch getTsFileEpoch() {
return tsFileEpoch;
}

public Map<String, String[]> getSchemaInfo() {
public Map<IDeviceID, String[]> getSchemaInfo() {
return device2Measurements;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;

import com.google.common.base.Functions;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -68,7 +70,7 @@ public PipeRealtimeEvent bindPipeTsFileInsertionEvent(
event,
epoch,
resource.getDevices().stream()
.collect(Collectors.toMap(Object::toString, device -> EMPTY_MEASUREMENT_ARRAY)),
.collect(Collectors.toMap(Functions.identity(), device -> EMPTY_MEASUREMENT_ARRAY)),
event.getPipePattern());
}

Expand All @@ -82,16 +84,16 @@ public PipeRealtimeEvent bindPipeInsertNodeTabletInsertionEvent(
epoch,
node instanceof InsertRowsNode
? getDevice2MeasurementsMapFromInsertRowsNode((InsertRowsNode) node)
: Collections.singletonMap(node.getDevicePath().getFullPath(), node.getMeasurements()),
: Collections.singletonMap(node.getDeviceID(), node.getMeasurements()),
event.getPipePattern());
}

private Map<String, String[]> getDevice2MeasurementsMapFromInsertRowsNode(
private Map<IDeviceID, String[]> getDevice2MeasurementsMapFromInsertRowsNode(
InsertRowsNode insertRowsNode) {
return insertRowsNode.getInsertRowNodeList().stream()
.collect(
Collectors.toMap(
insertRowNode -> insertRowNode.getDevicePath().getFullPath(),
InsertNode::getDeviceID,
InsertNode::getMeasurements,
(oldMeasurements, newMeasurements) ->
Stream.of(Arrays.asList(oldMeasurements), Arrays.asList(newMeasurements))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;

import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.Pair;

import java.util.Arrays;
Expand Down Expand Up @@ -74,7 +75,7 @@ public Optional<PlanNode> visitPlan(final PlanNode node, final IoTDBPipePattern
public Optional<PlanNode> visitCreateTimeSeries(
final CreateTimeSeriesNode node, final IoTDBPipePattern pattern) {
return pattern.matchesMeasurement(
node.getPath().getDeviceString(), node.getPath().getMeasurement())
node.getPath().getIDeviceID(), node.getPath().getMeasurement())
? Optional.of(node)
: Optional.empty();
}
Expand All @@ -87,7 +88,8 @@ public Optional<PlanNode> visitCreateAlignedTimeSeries(
.filter(
index ->
pattern.matchesMeasurement(
node.getDevicePath().getFullPath(), node.getMeasurements().get(index)))
node.getDevicePath().getIDeviceIDAsFullDevice(),
node.getMeasurements().get(index)))
.toArray();
return filteredIndexes.length > 0
? Optional.of(
Expand Down Expand Up @@ -115,7 +117,7 @@ public Optional<PlanNode> visitCreateMultiTimeSeries(
new Pair<>(
entry.getKey(),
trimMeasurementGroup(
entry.getKey().getFullPath(), entry.getValue(), pattern)))
entry.getKey().getIDeviceIDAsFullDevice(), entry.getValue(), pattern)))
.filter(pair -> Objects.nonNull(pair.getRight()))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
return !filteredMeasurementGroupMap.isEmpty()
Expand All @@ -125,7 +127,7 @@ public Optional<PlanNode> visitCreateMultiTimeSeries(
}

private static MeasurementGroup trimMeasurementGroup(
final String device, final MeasurementGroup group, final IoTDBPipePattern pattern) {
final IDeviceID device, final MeasurementGroup group, final IoTDBPipePattern pattern) {
final int[] filteredIndexes =
IntStream.range(0, group.size())
.filter(index -> pattern.matchesMeasurement(device, group.getMeasurements().get(index)))
Expand Down Expand Up @@ -154,7 +156,7 @@ private static MeasurementGroup trimMeasurementGroup(
public Optional<PlanNode> visitAlterTimeSeries(
final AlterTimeSeriesNode node, final IoTDBPipePattern pattern) {
return pattern.matchesMeasurement(
node.getPath().getDeviceString(), node.getPath().getMeasurement())
node.getPath().getIDeviceID(), node.getPath().getMeasurement())
? Optional.of(node)
: Optional.empty();
}
Expand All @@ -165,7 +167,9 @@ public Optional<PlanNode> visitInternalCreateTimeSeries(
final MeasurementGroup group =
pattern.matchPrefixPath(node.getDevicePath().getFullPath())
? trimMeasurementGroup(
node.getDevicePath().getFullPath(), node.getMeasurementGroup(), pattern)
node.getDevicePath().getIDeviceIDAsFullDevice(),
node.getMeasurementGroup(),
pattern)
: null;
return Objects.nonNull(group)
? Optional.of(
Expand Down Expand Up @@ -209,7 +213,7 @@ public Optional<PlanNode> visitInternalCreateMultiTimeSeries(
new Pair<>(
entry.getValue().getLeft(),
trimMeasurementGroup(
entry.getKey().getFullPath(),
entry.getKey().getIDeviceIDAsFullDevice(),
entry.getValue().getRight(),
pattern))))
.filter(pair -> Objects.nonNull(pair.getRight().getRight()))
Expand Down Expand Up @@ -241,7 +245,7 @@ public Optional<PlanNode> visitCreateLogicalView(
.filter(
entry ->
pattern.matchesMeasurement(
entry.getKey().getDeviceString(), entry.getKey().getMeasurement()))
entry.getKey().getIDeviceID(), entry.getKey().getMeasurement()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return !filteredViewPathToSourceMap.isEmpty()
? Optional.of(new CreateLogicalViewNode(node.getPlanNodeId(), filteredViewPathToSourceMap))
Expand All @@ -256,7 +260,7 @@ public Optional<PlanNode> visitAlterLogicalView(
.filter(
entry ->
pattern.matchesMeasurement(
entry.getKey().getDeviceString(), entry.getKey().getMeasurement()))
entry.getKey().getIDeviceID(), entry.getKey().getMeasurement()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return !filteredViewPathToSourceMap.isEmpty()
? Optional.of(new AlterLogicalViewNode(node.getPlanNodeId(), filteredViewPathToSourceMap))
Expand Down
Loading
Loading