Skip to content

Commit

Permalink
Fix unexpected column order in ExportTsFile when wildcard is not used
Browse files Browse the repository at this point in the history
  • Loading branch information
jt2594838 committed Sep 30, 2024
1 parent 6742d4b commit af6e4fc
Showing 1 changed file with 127 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ExportTsFile extends AbstractTsFileTool {

Expand Down Expand Up @@ -299,7 +301,7 @@ private static void dumpResult(String sql, int index) {
final String path = targetDirectory + targetFile + index + ".tsfile";
try (SessionDataSet sessionDataSet = session.executeQueryStatement(sql, timeout)) {
long start = System.currentTimeMillis();
writeTsFileFile(sessionDataSet, path);
writeWithTablets(sessionDataSet, path);
long end = System.currentTimeMillis();
ioTPrinter.println("Export completely!cost: " + (end - start) + " ms.");
} catch (StatementExecutionException
Expand All @@ -310,11 +312,119 @@ private static void dumpResult(String sql, int index) {
}
}

/**
 * Builds the per-device schema information for the columns of a query result.
 *
 * <p>Every column whose name starts with {@code root.} is treated as a time series path; all
 * other columns are skipped. For each such column the method creates a measurement schema
 * (data type from {@code columnTypes}, encoding and compression from a {@code show timeseries}
 * query) and records the column's index, both grouped by the device the column belongs to.
 *
 * @param columnNames names of the result columns
 * @param columnTypes type names of the result columns, read at the same index as {@code
 *     columnNames}
 * @param deviceSchemaMap output: device id to its measurement schemas, in column order
 * @param alignedDevices output: ids of the devices whose {@code show devices} row has {@code
 *     "true"} in its second field
 * @param deviceColumnIndices output: device id to the indices (into {@code columnNames}) of its
 *     columns, in the same order as the schemas in {@code deviceSchemaMap}
 * @throws IoTDBConnectionException if one of the metadata queries fails on the connection
 * @throws StatementExecutionException if one of the metadata queries cannot be executed
 */
private static void collectSchemas(
    List<String> columnNames,
    List<String> columnTypes,
    Map<String, List<IMeasurementSchema>> deviceSchemaMap,
    Set<String> alignedDevices,
    Map<String, List<Integer>> deviceColumnIndices)
    throws IoTDBConnectionException, StatementExecutionException {
  // devices whose alignment has already been looked up during this call, so that the
  // "show devices" query is issued once per device instead of once per column
  Set<String> checkedDevices = new HashSet<>();

  for (int i = 0; i < columnNames.size(); i++) {
    String column = columnNames.get(i);
    if (!column.startsWith("root.")) {
      continue;
    }
    TSDataType tsDataType = getTsDataType(columnTypes.get(i));
    Path path = new Path(column, true);
    String deviceId = path.getDeviceString();

    // query whether the device is aligned or not (only the first time the device is seen)
    if (checkedDevices.add(deviceId)) {
      try (SessionDataSet deviceDataSet =
          session.executeQueryStatement("show devices " + deviceId, timeout)) {
        List<Field> deviceList = deviceDataSet.next().getFields();
        if (deviceList.size() > 1 && "true".equals(deviceList.get(1).getStringValue())) {
          alignedDevices.add(deviceId);
        }
      }
    }

    // query timeseries metadata; the result set is closed as soon as the row has been read
    MeasurementSchema measurementSchema =
        new MeasurementSchema(path.getMeasurement(), tsDataType);
    try (SessionDataSet seriesDataSet =
        session.executeQueryStatement("show timeseries " + column, timeout)) {
      List<Field> seriesList = seriesDataSet.next().getFields();
      // fields 4 and 5 of the row are parsed as the encoding and the compression type names
      measurementSchema.setEncoding(
          TSEncoding.valueOf(seriesList.get(4).getStringValue()).serialize());
      measurementSchema.setCompressor(
          CompressionType.valueOf(seriesList.get(5).getStringValue()).serialize());
    }

    deviceSchemaMap.computeIfAbsent(deviceId, key -> new ArrayList<>()).add(measurementSchema);
    deviceColumnIndices.computeIfAbsent(deviceId, key -> new ArrayList<>()).add(i);
  }
}

/**
 * Creates one tablet per device and registers the device's time series with the writer.
 *
 * <p>Devices are processed in the iteration order of {@code deviceSchemaMap}. A device contained
 * in {@code alignedDevices} is registered through {@code registerAlignedTimeseries}, every other
 * device through {@code registerTimeseries}.
 *
 * @param deviceSchemaMap device id to the measurement schemas of that device
 * @param alignedDevices ids of the devices to register as aligned
 * @param tsFileWriter writer the time series are registered with
 * @return the created tablets, one per entry of {@code deviceSchemaMap}, in iteration order
 * @throws WriteProcessException if registering a time series fails
 */
private static List<Tablet> constructTablets(
    Map<String, List<IMeasurementSchema>> deviceSchemaMap,
    Set<String> alignedDevices,
    TsFileWriter tsFileWriter)
    throws WriteProcessException {
  final List<Tablet> tablets = new ArrayList<>(deviceSchemaMap.size());

  for (Map.Entry<String, List<IMeasurementSchema>> entry : deviceSchemaMap.entrySet()) {
    final List<IMeasurementSchema> schemas = entry.getValue();

    final Tablet tablet = new Tablet(entry.getKey(), schemas);
    tablet.initBitMaps();

    // register under the device id reported by the tablet itself
    final Path devicePath = new Path(tablet.getDeviceId());
    final boolean aligned = alignedDevices.contains(tablet.getDeviceId());
    if (!aligned) {
      tsFileWriter.registerTimeseries(devicePath, schemas);
    } else {
      tsFileWriter.registerAlignedTimeseries(devicePath, schemas);
    }

    tablets.add(tablet);
  }

  return tablets;
}

/**
 * Drains the query result into the tablets and writes them to the TsFile.
 *
 * <p>Each result row adds one row to every tablet: the row's timestamp plus, for each schema of
 * the tablet, the value of the column recorded at the same position in {@code
 * deviceColumnIndices}. A tablet is written and reset as soon as it reaches its maximum row
 * number; tablets that still hold rows after the last result row are written at the end.
 *
 * @param sessionDataSet query result to export
 * @param tabletList tablets to fill, one per device
 * @param alignedDevices ids of aligned devices, passed on to {@code writeToTsFile}
 * @param tsFileWriter writer receiving the tablets
 * @param deviceColumnIndices device id to the column indices of its measurements; an index is
 *     reduced by one before it is used to address the row's fields
 * @throws IoTDBConnectionException if reading the query result fails on the connection
 * @throws StatementExecutionException if reading the query result fails on execution
 * @throws IOException if writing a tablet fails
 * @throws WriteProcessException if writing a tablet fails
 */
private static void writeWithTablets(
    SessionDataSet sessionDataSet,
    List<Tablet> tabletList,
    Set<String> alignedDevices,
    TsFileWriter tsFileWriter,
    Map<String, List<Integer>> deviceColumnIndices)
    throws IoTDBConnectionException,
        StatementExecutionException,
        IOException,
        WriteProcessException {
  while (sessionDataSet.hasNext()) {
    final RowRecord row = sessionDataSet.next();
    final List<Field> rowFields = row.getFields();

    for (Tablet tablet : tabletList) {
      final List<Integer> indices = deviceColumnIndices.get(tablet.getDeviceId());
      final int targetRow = tablet.rowSize++;
      tablet.addTimestamp(targetRow, row.getTimestamp());
      final List<IMeasurementSchema> tabletSchemas = tablet.getSchemas();

      // the n-th recorded column index belongs to the n-th schema of the tablet
      final int measurementCount = indices.size();
      int slot = 0;
      while (slot < measurementCount) {
        final IMeasurementSchema schema = tabletSchemas.get(slot);
        // -1 for time not in fields
        final Field cell = rowFields.get(indices.get(slot) - 1);
        final Object cellValue = cell.getObjectValue(schema.getType());
        if (cellValue == null) {
          tablet.bitMaps[slot].mark(targetRow);
        }
        tablet.addValue(schema.getMeasurementId(), targetRow, cellValue);
        slot++;
      }

      // write the tablet out once it is full and start over with an empty one
      final boolean full = tablet.rowSize == tablet.getMaxRowNumber();
      if (full) {
        writeToTsFile(alignedDevices, tsFileWriter, tablet);
        tablet.initBitMaps();
        tablet.reset();
      }
    }
  }

  // write whatever is left in the partially filled tablets
  for (Tablet remaining : tabletList) {
    if (remaining.rowSize == 0) {
      continue;
    }
    writeToTsFile(alignedDevices, tsFileWriter, remaining);
  }
}

@SuppressWarnings({
"squid:S3776",
"squid:S6541"
}) // Suppress high Cognitive Complexity warning, Suppress many task in one method warning
public static void writeTsFileFile(SessionDataSet sessionDataSet, String filePath)
public static void writeWithTablets(SessionDataSet sessionDataSet, String filePath)
throws IOException,
IoTDBConnectionException,
StatementExecutionException,
Expand All @@ -325,91 +435,32 @@ public static void writeTsFileFile(SessionDataSet sessionDataSet, String filePat
if (f.exists()) {
Files.delete(f.toPath());
}
HashSet<String> deviceFilterSet = new HashSet<>();

try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
Map<String, List<IMeasurementSchema>> schemaMap = new LinkedHashMap<>();
for (int i = 0; i < columnNames.size(); i++) {
String column = columnNames.get(i);
if (!column.startsWith("root.")) {
continue;
}
TSDataType tsDataType = getTsDataType(columnTypes.get(i));
Path path = new Path(column, true);
String deviceId = path.getDeviceString();
try (SessionDataSet deviceDataSet =
session.executeQueryStatement("show devices " + deviceId, timeout)) {
List<Field> deviceList = deviceDataSet.next().getFields();
if (deviceList.size() > 1 && "true".equals(deviceList.get(1).getStringValue())) {
deviceFilterSet.add(deviceId);
}
}
MeasurementSchema measurementSchema =
new MeasurementSchema(path.getMeasurement(), tsDataType);
// device -> column indices in columnNames
Map<String, List<Integer>> deviceColumnIndices = new HashMap<>();
Set<String> alignedDevices = new HashSet<>();
Map<String, List<IMeasurementSchema>> deviceSchemaMap = new LinkedHashMap<>();

List<Field> seriesList =
session.executeQueryStatement("show timeseries " + column, timeout).next().getFields();
collectSchemas(
columnNames, columnTypes, deviceSchemaMap, alignedDevices, deviceColumnIndices);

List<Tablet> tabletList = constructTablets(deviceSchemaMap, alignedDevices, tsFileWriter);

measurementSchema.setEncoding(
TSEncoding.valueOf(seriesList.get(4).getStringValue()).serialize());
measurementSchema.setCompressor(
CompressionType.valueOf(seriesList.get(5).getStringValue()).serialize());
schemaMap.computeIfAbsent(deviceId, key -> new ArrayList<>()).add(measurementSchema);
}
List<Tablet> tabletList = new ArrayList<>();
for (Map.Entry<String, List<IMeasurementSchema>> stringListEntry : schemaMap.entrySet()) {
String deviceId = stringListEntry.getKey();
List<IMeasurementSchema> schemaList = stringListEntry.getValue();
Tablet tablet = new Tablet(deviceId, schemaList);
tablet.initBitMaps();
Path path = new Path(tablet.getDeviceId());
if (deviceFilterSet.contains(tablet.getDeviceId())) {
tsFileWriter.registerAlignedTimeseries(path, schemaList);
} else {
tsFileWriter.registerTimeseries(path, schemaList);
}
tabletList.add(tablet);
}
if (tabletList.isEmpty()) {
ioTPrinter.println("!!!Warning:Tablet is empty,no data can be exported.");
System.exit(CODE_ERROR);
}
while (sessionDataSet.hasNext()) {
RowRecord rowRecord = sessionDataSet.next();
List<Field> fields = rowRecord.getFields();
int i = 0;
while (i < fields.size()) {
for (Tablet tablet : tabletList) {
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
List<IMeasurementSchema> schemas = tablet.getSchemas();
for (int j = 0; j < schemas.size(); j++) {
IMeasurementSchema measurementSchema = schemas.get(j);
Object value = fields.get(i).getObjectValue(measurementSchema.getType());
if (value == null) {
tablet.bitMaps[j].mark(rowIndex);
}
tablet.addValue(measurementSchema.getMeasurementId(), rowIndex, value);
i++;
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
writeToTsfile(deviceFilterSet, tsFileWriter, tablet);
tablet.initBitMaps();
tablet.reset();
}
}
}
}
for (Tablet tablet : tabletList) {
if (tablet.rowSize != 0) {
writeToTsfile(deviceFilterSet, tsFileWriter, tablet);
}
}

writeWithTablets(
sessionDataSet, tabletList, alignedDevices, tsFileWriter, deviceColumnIndices);

tsFileWriter.flushAllChunkGroups();
}
}

private static void writeToTsfile(
HashSet<String> deviceFilterSet, TsFileWriter tsFileWriter, Tablet tablet)
private static void writeToTsFile(
Set<String> deviceFilterSet, TsFileWriter tsFileWriter, Tablet tablet)
throws IOException, WriteProcessException {
if (deviceFilterSet.contains(tablet.getDeviceId())) {
tsFileWriter.writeAligned(tablet);
Expand Down

0 comments on commit af6e4fc

Please sign in to comment.