diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
index 22cdcb907da3..0a6586863b1d 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java
@@ -55,10 +55,12 @@
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class ExportTsFile extends AbstractTsFileTool {
 
@@ -299,7 +301,7 @@ private static void dumpResult(String sql, int index) {
     final String path = targetDirectory + targetFile + index + ".tsfile";
     try (SessionDataSet sessionDataSet = session.executeQueryStatement(sql, timeout)) {
       long start = System.currentTimeMillis();
-      writeTsFileFile(sessionDataSet, path);
+      writeWithTablets(sessionDataSet, path);
       long end = System.currentTimeMillis();
       ioTPrinter.println("Export completely!cost: " + (end - start) + " ms.");
     } catch (StatementExecutionException
@@ -310,11 +312,144 @@ private static void dumpResult(String sql, int index) {
     }
   }
 
+  /**
+   * Collects, for every result column whose name starts with "root.", its measurement schema,
+   * whether its device is aligned, and its index in {@code columnNames}.
+   *
+   * @param columnNames names of the result columns
+   * @param columnTypes type names of the result columns, indexed like {@code columnNames}
+   * @param deviceSchemaMap filled with device id -> measurement schemas of that device
+   * @param alignedDevices filled with the ids of the devices reported as aligned
+   * @param deviceColumnIndices filled with device id -> indices of its columns in columnNames
+   */
+  private static void collectSchemas(
+      List<String> columnNames,
+      List<String> columnTypes,
+      Map<String, List<IMeasurementSchema>> deviceSchemaMap,
+      Set<String> alignedDevices,
+      Map<String, List<Integer>> deviceColumnIndices)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < columnNames.size(); i++) {
+      String column = columnNames.get(i);
+      if (!column.startsWith("root.")) {
+        continue;
+      }
+      TSDataType tsDataType = getTsDataType(columnTypes.get(i));
+      Path path = new Path(column, true);
+      String deviceId = path.getDeviceString();
+      // query whether the device is aligned or not
+      try (SessionDataSet deviceDataSet =
+          session.executeQueryStatement("show devices " + deviceId, timeout)) {
+        List<Field> deviceList = deviceDataSet.next().getFields();
+        if (deviceList.size() > 1 && "true".equals(deviceList.get(1).getStringValue())) {
+          alignedDevices.add(deviceId);
+        }
+      }
+
+      // query timeseries metadata
+      MeasurementSchema measurementSchema =
+          new MeasurementSchema(path.getMeasurement(), tsDataType);
+      // the result set is closed once encoding and compressor have been read
+      try (SessionDataSet seriesDataSet =
+          session.executeQueryStatement("show timeseries " + column, timeout)) {
+        List<Field> seriesList = seriesDataSet.next().getFields();
+        measurementSchema.setEncoding(
+            TSEncoding.valueOf(seriesList.get(4).getStringValue()).serialize());
+        measurementSchema.setCompressor(
+            CompressionType.valueOf(seriesList.get(5).getStringValue()).serialize());
+      }
+
+      deviceSchemaMap.computeIfAbsent(deviceId, key -> new ArrayList<>()).add(measurementSchema);
+      deviceColumnIndices.computeIfAbsent(deviceId, key -> new ArrayList<>()).add(i);
+    }
+  }
+
+  /**
+   * Creates one tablet per device and registers the device's timeseries with the writer, using
+   * the aligned registration for devices contained in {@code alignedDevices}.
+   *
+   * @return the created tablets, in the iteration order of {@code deviceSchemaMap}
+   */
+  private static List<Tablet> constructTablets(
+      Map<String, List<IMeasurementSchema>> deviceSchemaMap,
+      Set<String> alignedDevices,
+      TsFileWriter tsFileWriter)
+      throws WriteProcessException {
+    List<Tablet> tabletList = new ArrayList<>(deviceSchemaMap.size());
+    for (Map.Entry<String, List<IMeasurementSchema>> stringListEntry : deviceSchemaMap.entrySet()) {
+      String deviceId = stringListEntry.getKey();
+      List<IMeasurementSchema> schemaList = stringListEntry.getValue();
+      Tablet tablet = new Tablet(deviceId, schemaList);
+      tablet.initBitMaps();
+      Path path = new Path(tablet.getDeviceId());
+      if (alignedDevices.contains(tablet.getDeviceId())) {
+        tsFileWriter.registerAlignedTimeseries(path, schemaList);
+      } else {
+        tsFileWriter.registerTimeseries(path, schemaList);
+      }
+      tabletList.add(tablet);
+    }
+    return tabletList;
+  }
+
+  /**
+   * Copies every row of the data set into the tablets and writes a tablet to the TsFile whenever
+   * it reaches its maximum row number; tablets that still hold rows afterwards are written last.
+   *
+   * @param deviceColumnIndices device id -> indices of that device's columns in the column names
+   */
+  private static void writeWithTablets(
+      SessionDataSet sessionDataSet,
+      List<Tablet> tabletList,
+      Set<String> alignedDevices,
+      TsFileWriter tsFileWriter,
+      Map<String, List<Integer>> deviceColumnIndices)
+      throws IoTDBConnectionException,
+          StatementExecutionException,
+          IOException,
+          WriteProcessException {
+    while (sessionDataSet.hasNext()) {
+      RowRecord rowRecord = sessionDataSet.next();
+      List<Field> fields = rowRecord.getFields();
+
+      for (Tablet tablet : tabletList) {
+        String deviceId = tablet.getDeviceId();
+        List<Integer> columnIndices = deviceColumnIndices.get(deviceId);
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
+        List<IMeasurementSchema> schemas = tablet.getSchemas();
+
+        for (int i = 0, columnIndicesSize = columnIndices.size(); i < columnIndicesSize; i++) {
+          Integer columnIndex = columnIndices.get(i);
+          IMeasurementSchema measurementSchema = schemas.get(i);
+          // -1 for time not in fields
+          Object value = fields.get(columnIndex - 1).getObjectValue(measurementSchema.getType());
+          if (value == null) {
+            tablet.bitMaps[i].mark(rowIndex);
+          }
+          tablet.addValue(measurementSchema.getMeasurementId(), rowIndex, value);
+        }
+
+        if (tablet.rowSize == tablet.getMaxRowNumber()) {
+          writeToTsFile(alignedDevices, tsFileWriter, tablet);
+          tablet.initBitMaps();
+          tablet.reset();
+        }
+      }
+    }
+
+    for (Tablet tablet : tabletList) {
+      if (tablet.rowSize != 0) {
+        writeToTsFile(alignedDevices, tsFileWriter, tablet);
+      }
+    }
+  }
+
   @SuppressWarnings({
     "squid:S3776",
     "squid:S6541"
   }) // Suppress high Cognitive Complexity warning, Suppress many task in one method warning
-  public static void writeTsFileFile(SessionDataSet sessionDataSet, String filePath)
+  public static void writeWithTablets(SessionDataSet sessionDataSet, String filePath)
       throws IOException,
           IoTDBConnectionException,
           StatementExecutionException,
@@ -325,91 +460,32 @@ public static void writeTsFileFile(SessionDataSet sessionDataSet, String filePat
     if (f.exists()) {
       Files.delete(f.toPath());
     }
-    HashSet<String> deviceFilterSet = new HashSet<>();
+
     try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
-      Map<String, List<IMeasurementSchema>> schemaMap = new LinkedHashMap<>();
-      for (int i = 0; i < columnNames.size(); i++) {
-        String column = columnNames.get(i);
-        if (!column.startsWith("root.")) {
-          continue;
-        }
-        TSDataType tsDataType = getTsDataType(columnTypes.get(i));
-        Path path = new Path(column, true);
-        String deviceId = path.getDeviceString();
-        try (SessionDataSet deviceDataSet =
-            session.executeQueryStatement("show devices " + deviceId, timeout)) {
-          List<Field> deviceList = deviceDataSet.next().getFields();
-          if (deviceList.size() > 1 && "true".equals(deviceList.get(1).getStringValue())) {
-            deviceFilterSet.add(deviceId);
-          }
-        }
-        MeasurementSchema measurementSchema =
-            new MeasurementSchema(path.getMeasurement(), tsDataType);
+      // device -> column indices in columnNames
+      Map<String, List<Integer>> deviceColumnIndices = new HashMap<>();
+      Set<String> alignedDevices = new HashSet<>();
+      Map<String, List<IMeasurementSchema>> deviceSchemaMap = new LinkedHashMap<>();
 
-        List<Field> seriesList =
-            session.executeQueryStatement("show timeseries " + column, timeout).next().getFields();
+      collectSchemas(
+          columnNames, columnTypes, deviceSchemaMap, alignedDevices, deviceColumnIndices);
+
+      List<Tablet> tabletList = constructTablets(deviceSchemaMap, alignedDevices, tsFileWriter);
 
-        measurementSchema.setEncoding(
-            TSEncoding.valueOf(seriesList.get(4).getStringValue()).serialize());
-        measurementSchema.setCompressor(
-            CompressionType.valueOf(seriesList.get(5).getStringValue()).serialize());
-        schemaMap.computeIfAbsent(deviceId, key -> new ArrayList<>()).add(measurementSchema);
-      }
-      List<Tablet> tabletList = new ArrayList<>();
-      for (Map.Entry<String, List<IMeasurementSchema>> stringListEntry : schemaMap.entrySet()) {
-        String deviceId = stringListEntry.getKey();
-        List<IMeasurementSchema> schemaList = stringListEntry.getValue();
-        Tablet tablet = new Tablet(deviceId, schemaList);
-        tablet.initBitMaps();
-        Path path = new Path(tablet.getDeviceId());
-        if (deviceFilterSet.contains(tablet.getDeviceId())) {
-          tsFileWriter.registerAlignedTimeseries(path, schemaList);
-        } else {
-          tsFileWriter.registerTimeseries(path, schemaList);
-        }
-        tabletList.add(tablet);
-      }
       if (tabletList.isEmpty()) {
         ioTPrinter.println("!!!Warning:Tablet is empty,no data can be exported.");
         System.exit(CODE_ERROR);
       }
-      while (sessionDataSet.hasNext()) {
-        RowRecord rowRecord = sessionDataSet.next();
-        List<Field> fields = rowRecord.getFields();
-        int i = 0;
-        while (i < fields.size()) {
-          for (Tablet tablet : tabletList) {
-            int rowIndex = tablet.rowSize++;
-            tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
-            List<IMeasurementSchema> schemas = tablet.getSchemas();
-            for (int j = 0; j < schemas.size(); j++) {
-              IMeasurementSchema measurementSchema = schemas.get(j);
-              Object value = fields.get(i).getObjectValue(measurementSchema.getType());
-              if (value == null) {
-                tablet.bitMaps[j].mark(rowIndex);
-              }
-              tablet.addValue(measurementSchema.getMeasurementId(), rowIndex, value);
-              i++;
-            }
-            if (tablet.rowSize == tablet.getMaxRowNumber()) {
-              writeToTsfile(deviceFilterSet, tsFileWriter, tablet);
-              tablet.initBitMaps();
-              tablet.reset();
-            }
-          }
-        }
-      }
-      for (Tablet tablet : tabletList) {
-        if (tablet.rowSize != 0) {
-          writeToTsfile(deviceFilterSet, tsFileWriter, tablet);
-        }
-      }
+
+      writeWithTablets(
+          sessionDataSet, tabletList, alignedDevices, tsFileWriter, deviceColumnIndices);
+
       tsFileWriter.flushAllChunkGroups();
     }
   }
 
-  private static void writeToTsfile(
-      HashSet<String> deviceFilterSet, TsFileWriter tsFileWriter, Tablet tablet)
+  private static void writeToTsFile(
+      Set<String> deviceFilterSet, TsFileWriter tsFileWriter, Tablet tablet)
       throws IOException, WriteProcessException {
     if (deviceFilterSet.contains(tablet.getDeviceId())) {
       tsFileWriter.writeAligned(tablet);