Skip to content

Commit

Permalink
Pipe: Support table pattern matching when sync data between clusters (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveYurongSu authored Oct 18, 2024
1 parent dcde7f9 commit 82a77b6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import org.apache.tsfile.write.schema.IMeasurementSchema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;

public class TsFileInsertionEventTableParserTabletIterator implements Iterator<Tablet> {

Expand All @@ -43,6 +43,7 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator<T
private final long endTime;

private final List<IMeasurementSchema> columnSchemas;
private final List<Tablet.ColumnType> columnTypes;
private final List<String> columnNames;
private final TsBlockReader tsBlockReader;

Expand All @@ -56,18 +57,19 @@ public TsFileInsertionEventTableParserTabletIterator(
this.startTime = startTime;
this.endTime = endTime;

columnSchemas = new ArrayList<>();
columnTypes = new ArrayList<>();
columnNames = new ArrayList<>();
try {
columnSchemas =
tableSchema.getColumnSchemas().stream()
// time column in aligned time-series should not be a query column
.filter(
schema ->
schema.getMeasurementId() != null && !schema.getMeasurementId().isEmpty())
.collect(Collectors.toList());
columnNames =
columnSchemas.stream()
.map(IMeasurementSchema::getMeasurementId)
.collect(Collectors.toList());
for (int i = 0, size = tableSchema.getColumnSchemas().size(); i < size; i++) {
final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i);
if (schema.getMeasurementId() != null && !schema.getMeasurementId().isEmpty()) {
columnSchemas.add(schema);
columnTypes.add(tableSchema.getColumnTypes().get(i));
columnNames.add(schema.getMeasurementId());
}
}

tsBlockReader = tableQueryExecutor.query(tableName, columnNames, null, null, null);
} catch (final ReadProcessException e) {
throw new PipeException("Failed to build query data set", e);
Expand Down Expand Up @@ -96,7 +98,8 @@ public Tablet next() {
private Tablet buildNextTablet() throws IOException {
final TsBlock tsBlock = tsBlockReader.next();

final Tablet tablet = new Tablet(tableName, columnSchemas, tsBlock.getPositionCount());
final Tablet tablet =
new Tablet(tableName, columnSchemas, columnTypes, tsBlock.getPositionCount());
tablet.initBitMaps();

final TsBlock.TsBlockRowIterator rowIterator = tsBlock.getTsBlockRowIterator();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
<thrift.version>0.14.1</thrift.version>
<xz.version>1.9</xz.version>
<zstd-jni.version>1.5.6-3</zstd-jni.version>
<tsfile.version>1.2.0-d28fef17-SNAPSHOT</tsfile.version>
<tsfile.version>1.2.0-241018-SNAPSHOT</tsfile.version>
</properties>
<!--
if we claim dependencies in dependencyManagement, then we do not claim
Expand Down

0 comments on commit 82a77b6

Please sign in to comment.