Skip to content

Commit

Permalink
Pipe: Support syncing table model data between clusters (TsFileInsert…
Browse files Browse the repository at this point in the history
…ionEventTableParser impl & following fixup) (#13806)

Co-authored-by: Zhenyu Luo <[email protected]>
  • Loading branch information
SteveYurongSu and luoluoyuyu authored Oct 18, 2024
1 parent 3ef4ac6 commit bd5b30b
Show file tree
Hide file tree
Showing 17 changed files with 405 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent.isTabletEmpty;

Expand Down Expand Up @@ -85,8 +86,12 @@ public InsertTabletStatement constructStatement() {
}

// Table model
request.setWriteToTable(true);
request.columnCategories =
tablet.getColumnTypes().stream()
.map(t -> (byte) t.ordinal())
.collect(Collectors.toList());
final InsertTabletStatement statement = StatementGenerator.createStatement(request);
statement.setWriteToTable(true);
statement.setDatabaseName(dataBaseName);
return statement;
} catch (final MetadataException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public Tablet convertToTablet() {
newTablet.bitMaps = nullValueColumnBitmaps;
newTablet.values = valueColumns;
newTablet.rowSize = rowCount;
newTablet.setColumnTypes(Arrays.asList(valueColumnTypes));

tablet = newTablet;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,11 +424,6 @@ public boolean mayEventPathsOverlappedWithPattern() {

/////////////////////////// PipeInsertionEvent ///////////////////////////

/**
* Judge whether the TsFile is table model or tree model. If the TsFile is table model, the method
* will return true. If the TsFile is tree model, the method will return false. If the TsFile is
* mixed model, the method will return true and mark the event as table model event.
*/
@Override
public boolean isTableModelEvent() {
if (getRawIsTableModelEvent() == null) {
Expand All @@ -450,8 +445,8 @@ public boolean isTableModelEvent() {
markAsTreeModelEvent();
} else {
markAsTableModelEvent();
break;
}
break;
}
} catch (final Exception e) {
throw new PipeException(
Expand Down Expand Up @@ -502,7 +497,7 @@ private TsFileInsertionEventParser initEventParser() {
if (eventParser == null) {
eventParser =
new TsFileInsertionEventParserProvider(
tsFile, treePattern, startTime, endTime, pipeTaskMeta, this)
tsFile, treePattern, tablePattern, startTime, endTime, pipeTaskMeta, this)
.provide();
}
return eventParser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.parser;

import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.event.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
Expand All @@ -38,8 +39,11 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(TsFileInsertionEventParser.class);

protected final TreePattern pattern; // used to filter data
protected final TreePattern treePattern; // used to filter data
protected final TablePattern tablePattern; // used to filter data
protected final GlobalTimeExpression timeFilterExpression; // used to filter data
protected final long startTime; // used to filter data
protected final long endTime; // used to filter data

protected final PipeTaskMeta pipeTaskMeta; // used to report progress
protected final PipeInsertionEvent sourceEvent; // used to report progress
Expand All @@ -49,16 +53,20 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable {
protected TsFileSequenceReader tsFileSequenceReader;

protected TsFileInsertionEventParser(
final TreePattern pattern,
final TreePattern treePattern,
final TablePattern tablePattern,
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final PipeInsertionEvent sourceEvent) {
this.pattern = pattern;
this.treePattern = treePattern;
this.tablePattern = tablePattern;
timeFilterExpression =
(startTime == Long.MIN_VALUE && endTime == Long.MAX_VALUE)
? null
: new GlobalTimeExpression(TimeFilterApi.between(startTime, endTime));
this.startTime = startTime;
this.endTime = endTime;

this.pipeTaskMeta = pipeTaskMeta;
this.sourceEvent = sourceEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;

import org.apache.tsfile.file.metadata.IDeviceID;
Expand All @@ -39,7 +41,8 @@
public class TsFileInsertionEventParserProvider {

private final File tsFile;
private final TreePattern pattern;
private final TreePattern treePattern;
private final TablePattern tablePattern;
private final long startTime;
private final long endTime;

Expand All @@ -49,23 +52,30 @@ public class TsFileInsertionEventParserProvider {
public TsFileInsertionEventParserProvider(
final File tsFile,
final TreePattern treePattern,
final TablePattern tablePattern,
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final PipeTsFileInsertionEvent sourceEvent) {
this.tsFile = tsFile;
this.pattern = treePattern;
this.treePattern = treePattern;
this.tablePattern = tablePattern;
this.startTime = startTime;
this.endTime = endTime;
this.pipeTaskMeta = pipeTaskMeta;
this.sourceEvent = sourceEvent;
}

public TsFileInsertionEventParser provide() throws IOException {
if (sourceEvent.isTableModelEvent()) {
return new TsFileInsertionEventTableParser(
tsFile, tablePattern, startTime, endTime, pipeTaskMeta, sourceEvent);
}

if (startTime != Long.MIN_VALUE
|| endTime != Long.MAX_VALUE
|| pattern instanceof IoTDBTreePattern
&& !((IoTDBTreePattern) pattern).mayMatchMultipleTimeSeriesInOneDevice()) {
|| treePattern instanceof IoTDBTreePattern
&& !((IoTDBTreePattern) treePattern).mayMatchMultipleTimeSeriesInOneDevice()) {
// 1. If time filter exists, use query here because the scan container may filter it
// row by row in single page chunk.
// 2. If the pattern matches only one time series in one device, use query container here
Expand All @@ -75,7 +85,7 @@ public TsFileInsertionEventParser provide() throws IOException {
// hard to know whether it only matches one timeseries, while matching multiple is often the
// case.
return new TsFileInsertionEventQueryParser(
tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent);
}

final Map<IDeviceID, Boolean> deviceIsAlignedMap =
Expand All @@ -84,7 +94,7 @@ public TsFileInsertionEventParser provide() throws IOException {
// If we failed to get from cache, it indicates that the memory usage is high.
// We use scan data container because it requires less memory.
return new TsFileInsertionEventScanParser(
tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent);
}

final int originalSize = deviceIsAlignedMap.size();
Expand All @@ -94,10 +104,10 @@ public TsFileInsertionEventParser provide() throws IOException {
return (double) filteredDeviceIsAlignedMap.size() / originalSize
> PipeConfig.getInstance().getPipeTsFileScanParsingThreshold()
? new TsFileInsertionEventScanParser(
tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent)
tsFile, treePattern, startTime, endTime, pipeTaskMeta, sourceEvent)
: new TsFileInsertionEventQueryParser(
tsFile,
pattern,
treePattern,
startTime,
endTime,
pipeTaskMeta,
Expand All @@ -107,15 +117,16 @@ public TsFileInsertionEventParser provide() throws IOException {

private Map<IDeviceID, Boolean> filterDeviceIsAlignedMapByPattern(
final Map<IDeviceID, Boolean> deviceIsAlignedMap) {
if (Objects.isNull(pattern) || pattern.isRoot()) {
if (Objects.isNull(treePattern) || treePattern.isRoot()) {
return deviceIsAlignedMap;
}

return deviceIsAlignedMap.entrySet().stream()
.filter(
entry -> {
final IDeviceID deviceId = entry.getKey();
return pattern.coversDevice(deviceId) || pattern.mayOverlapWithDevice(deviceId);
return treePattern.coversDevice(deviceId)
|| treePattern.mayOverlapWithDevice(deviceId);
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public TsFileInsertionEventQueryParser(
final PipeInsertionEvent sourceEvent,
final Map<IDeviceID, Boolean> deviceIsAlignedMap)
throws IOException {
super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
super(pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent);

try {
final PipeTsFileResourceManager tsFileResourceManager = PipeDataNodeResourceManager.tsfile();
Expand Down Expand Up @@ -165,19 +165,21 @@ private Map<IDeviceID, List<String>> filterDeviceMeasurementsMapByPattern(

// case 1: for example, pattern is root.a.b or pattern is null and device is root.a.b.c
// in this case, all data can be matched without checking the measurements
if (Objects.isNull(pattern) || pattern.isRoot() || pattern.coversDevice(deviceId)) {
if (Objects.isNull(treePattern)
|| treePattern.isRoot()
|| treePattern.coversDevice(deviceId)) {
if (!entry.getValue().isEmpty()) {
filteredDeviceMeasurementsMap.put(deviceId, entry.getValue());
}
}

// case 2: for example, pattern is root.a.b.c and device is root.a.b
// in this case, we need to check the full path
else if (pattern.mayOverlapWithDevice(deviceId)) {
else if (treePattern.mayOverlapWithDevice(deviceId)) {
final List<String> filteredMeasurements = new ArrayList<>();

for (final String measurement : entry.getValue()) {
if (pattern.matchesMeasurement(deviceId, measurement)) {
if (treePattern.matchesMeasurement(deviceId, measurement)) {
filteredMeasurements.add(measurement);
}
}
Expand All @@ -203,13 +205,13 @@ private Map<IDeviceID, Boolean> readDeviceIsAlignedMap() throws IOException {
}

private Set<IDeviceID> filterDevicesByPattern(final Set<IDeviceID> devices) {
if (Objects.isNull(pattern) || pattern.isRoot()) {
if (Objects.isNull(treePattern) || treePattern.isRoot()) {
return devices;
}

final Set<IDeviceID> filteredDevices = new HashSet<>();
for (final IDeviceID device : devices) {
if (pattern.coversDevice(device) || pattern.mayOverlapWithDevice(device)) {
if (treePattern.coversDevice(device) || treePattern.mayOverlapWithDevice(device)) {
filteredDevices.add(device);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public TsFileInsertionEventScanParser(
final PipeTaskMeta pipeTaskMeta,
final PipeInsertionEvent sourceEvent)
throws IOException {
super(pattern, startTime, endTime, pipeTaskMeta, sourceEvent);
super(pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent);

this.startTime = startTime;
this.endTime = endTime;
Expand Down Expand Up @@ -381,7 +381,7 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException {
break;
}

if (!pattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) {
if (!treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) {
tsFileSequenceReader.position(
tsFileSequenceReader.position() + chunkHeader.getDataSize());
break;
Expand All @@ -408,7 +408,7 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException {
chunkHeader = tsFileSequenceReader.readChunkHeader(marker);

if (Objects.isNull(currentDevice)
|| !pattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) {
|| !treePattern.matchesMeasurement(currentDevice, chunkHeader.getMeasurementID())) {
tsFileSequenceReader.position(
tsFileSequenceReader.position() + chunkHeader.getDataSize());
break;
Expand Down Expand Up @@ -456,7 +456,7 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException {
isMultiPageList.clear();
measurementIndexMap.clear();
final IDeviceID deviceID = tsFileSequenceReader.readChunkGroupHeader().getDeviceID();
currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID : null;
currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? deviceID : null;
break;
case MetaMarker.OPERATION_INDEX_RANGE:
tsFileSequenceReader.readPlanIndex();
Expand Down
Loading

0 comments on commit bd5b30b

Please sign in to comment.