Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unexpected column order in ExportTsFile when wildcard is not used #13662

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ExportTsFile extends AbstractTsFileTool {

Expand Down Expand Up @@ -299,7 +301,7 @@ private static void dumpResult(String sql, int index) {
final String path = targetDirectory + targetFile + index + ".tsfile";
try (SessionDataSet sessionDataSet = session.executeQueryStatement(sql, timeout)) {
long start = System.currentTimeMillis();
writeTsFileFile(sessionDataSet, path);
writeWithTablets(sessionDataSet, path);
long end = System.currentTimeMillis();
ioTPrinter.println("Export completely!cost: " + (end - start) + " ms.");
} catch (StatementExecutionException
Expand All @@ -310,11 +312,119 @@ private static void dumpResult(String sql, int index) {
}
}

/**
 * Collects, per device, the measurement schemas of the queried timeseries columns together with
 * the position of each column in {@code columnNames}.
 *
 * <p>Columns whose name does not start with {@code "root."} are skipped. For every remaining
 * column the server is asked for the encoding and compressor of the timeseries ({@code show
 * timeseries}), and once per device whether the device is aligned ({@code show devices}).
 *
 * @param columnNames column names of the query result
 * @param columnTypes type names of the columns, read at the same index as {@code columnNames}
 * @param deviceSchemaMap output: device id to the schemas of its columns, in column order
 * @param alignedDevices output: ids of the devices reported as aligned
 * @param deviceColumnIndices output: device id to the indices (into {@code columnNames}) of its
 *     columns; entry {@code k} of this list belongs to entry {@code k} of the schema list
 * @throws IoTDBConnectionException if a metadata query fails on the connection level
 * @throws StatementExecutionException if a metadata query is rejected by the server
 */
private static void collectSchemas(
    List<String> columnNames,
    List<String> columnTypes,
    Map<String, List<IMeasurementSchema>> deviceSchemaMap,
    Set<String> alignedDevices,
    Map<String, List<Integer>> deviceColumnIndices)
    throws IoTDBConnectionException, StatementExecutionException {
  // devices whose alignment has already been looked up during this call; the answer does not
  // depend on the column, so one query per device is enough
  Set<String> checkedDevices = new HashSet<>();

  for (int i = 0; i < columnNames.size(); i++) {
    String column = columnNames.get(i);
    if (!column.startsWith("root.")) {
      continue;
    }
    TSDataType tsDataType = getTsDataType(columnTypes.get(i));
    Path path = new Path(column, true);
    String deviceId = path.getDeviceString();

    // query whether the device is aligned or not (only the first time the device is seen)
    if (checkedDevices.add(deviceId)) {
      try (SessionDataSet deviceDataSet =
          session.executeQueryStatement("show devices " + deviceId, timeout)) {
        // NOTE(review): next() is dereferenced without a check; presumably the device always
        // exists because the column came from a query result - confirm
        List<Field> deviceList = deviceDataSet.next().getFields();
        // the second field of the row is compared against "true" to detect aligned devices
        if (deviceList.size() > 1 && "true".equals(deviceList.get(1).getStringValue())) {
          alignedDevices.add(deviceId);
        }
      }
    }

    // query timeseries metadata; the data set is closed as soon as the row has been read
    MeasurementSchema measurementSchema =
        new MeasurementSchema(path.getMeasurement(), tsDataType);
    try (SessionDataSet seriesDataSet =
        session.executeQueryStatement("show timeseries " + column, timeout)) {
      List<Field> seriesList = seriesDataSet.next().getFields();
      // fields 4 and 5 of the row are read as encoding and compressor names
      measurementSchema.setEncoding(
          TSEncoding.valueOf(seriesList.get(4).getStringValue()).serialize());
      measurementSchema.setCompressor(
          CompressionType.valueOf(seriesList.get(5).getStringValue()).serialize());
    }

    deviceSchemaMap.computeIfAbsent(deviceId, key -> new ArrayList<>()).add(measurementSchema);
    deviceColumnIndices.computeIfAbsent(deviceId, key -> new ArrayList<>()).add(i);
  }
}

/**
 * Creates one tablet per entry of {@code deviceSchemaMap} and registers the entry's schemas with
 * the writer, as aligned timeseries when the device id is contained in {@code alignedDevices}
 * and as regular timeseries otherwise.
 *
 * @param deviceSchemaMap device id to the schemas written for that device
 * @param alignedDevices ids of the devices to register as aligned
 * @param tsFileWriter writer the timeseries are registered with
 * @return the created tablets, in the iteration order of {@code deviceSchemaMap}
 * @throws WriteProcessException if the writer rejects a registration
 */
private static List<Tablet> constructTablets(
    Map<String, List<IMeasurementSchema>> deviceSchemaMap,
    Set<String> alignedDevices,
    TsFileWriter tsFileWriter)
    throws WriteProcessException {
  final List<Tablet> tablets = new ArrayList<>(deviceSchemaMap.size());

  for (Map.Entry<String, List<IMeasurementSchema>> entry : deviceSchemaMap.entrySet()) {
    final List<IMeasurementSchema> schemas = entry.getValue();
    final Tablet tablet = new Tablet(entry.getKey(), schemas);
    tablet.initBitMaps();

    final Path devicePath = new Path(tablet.getDeviceId());
    final boolean aligned = alignedDevices.contains(tablet.getDeviceId());
    if (!aligned) {
      tsFileWriter.registerTimeseries(devicePath, schemas);
    } else {
      tsFileWriter.registerAlignedTimeseries(devicePath, schemas);
    }

    tablets.add(tablet);
  }

  return tablets;
}

/**
 * Drains {@code sessionDataSet} row by row into the given tablets and writes them to the TsFile.
 *
 * <p>Every row is appended to every tablet: the row timestamp is added, then each value is read
 * from the row field selected by the device's entry in {@code deviceColumnIndices}. A tablet is
 * written and reset once its row count equals its maximum row number; tablets that still hold
 * rows after the last record are written at the end.
 *
 * @param sessionDataSet query result to export
 * @param tabletList tablets to fill, one per device
 * @param alignedDevices device ids passed on to {@code writeToTsFile}
 * @param tsFileWriter writer receiving the tablets
 * @param deviceColumnIndices device id to column indices; entry {@code k} is used for schema
 *     {@code k} of the device's tablet
 * @throws IoTDBConnectionException if reading the data set fails on the connection level
 * @throws StatementExecutionException if reading the data set fails on the server side
 * @throws IOException if writing to the file fails
 * @throws WriteProcessException if the writer rejects a tablet
 */
private static void writeWithTablets(
    SessionDataSet sessionDataSet,
    List<Tablet> tabletList,
    Set<String> alignedDevices,
    TsFileWriter tsFileWriter,
    Map<String, List<Integer>> deviceColumnIndices)
    throws IoTDBConnectionException,
        StatementExecutionException,
        IOException,
        WriteProcessException {
  while (sessionDataSet.hasNext()) {
    final RowRecord row = sessionDataSet.next();
    final List<Field> rowFields = row.getFields();

    for (Tablet tablet : tabletList) {
      final List<Integer> sourceColumns = deviceColumnIndices.get(tablet.getDeviceId());
      final int targetRow = tablet.rowSize++;
      tablet.addTimestamp(targetRow, row.getTimestamp());
      final List<IMeasurementSchema> tabletSchemas = tablet.getSchemas();

      final int columnCount = sourceColumns.size();
      for (int slot = 0; slot < columnCount; slot++) {
        final IMeasurementSchema schema = tabletSchemas.get(slot);
        // column indices count the time column, which is not part of the row fields
        final int fieldPosition = sourceColumns.get(slot) - 1;
        final Object cell = rowFields.get(fieldPosition).getObjectValue(schema.getType());
        if (cell == null) {
          tablet.bitMaps[slot].mark(targetRow);
        }
        tablet.addValue(schema.getMeasurementId(), targetRow, cell);
      }

      // tablet is full: write it out and start over with fresh bitmaps
      if (tablet.rowSize == tablet.getMaxRowNumber()) {
        writeToTsFile(alignedDevices, tsFileWriter, tablet);
        tablet.initBitMaps();
        tablet.reset();
      }
    }
  }

  // write whatever is left in partially filled tablets
  for (Tablet tablet : tabletList) {
    if (tablet.rowSize == 0) {
      continue;
    }
    writeToTsFile(alignedDevices, tsFileWriter, tablet);
  }
}

@SuppressWarnings({
"squid:S3776",
"squid:S6541"
}) // Suppress high Cognitive Complexity warning, Suppress many task in one method warning
public static void writeTsFileFile(SessionDataSet sessionDataSet, String filePath)
public static void writeWithTablets(SessionDataSet sessionDataSet, String filePath)
throws IOException,
IoTDBConnectionException,
StatementExecutionException,
Expand All @@ -325,91 +435,32 @@ public static void writeTsFileFile(SessionDataSet sessionDataSet, String filePat
if (f.exists()) {
Files.delete(f.toPath());
}
HashSet<String> deviceFilterSet = new HashSet<>();

try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
Map<String, List<IMeasurementSchema>> schemaMap = new LinkedHashMap<>();
for (int i = 0; i < columnNames.size(); i++) {
String column = columnNames.get(i);
if (!column.startsWith("root.")) {
continue;
}
TSDataType tsDataType = getTsDataType(columnTypes.get(i));
Path path = new Path(column, true);
String deviceId = path.getDeviceString();
try (SessionDataSet deviceDataSet =
session.executeQueryStatement("show devices " + deviceId, timeout)) {
List<Field> deviceList = deviceDataSet.next().getFields();
if (deviceList.size() > 1 && "true".equals(deviceList.get(1).getStringValue())) {
deviceFilterSet.add(deviceId);
}
}
MeasurementSchema measurementSchema =
new MeasurementSchema(path.getMeasurement(), tsDataType);
// device -> column indices in columnNames
Map<String, List<Integer>> deviceColumnIndices = new HashMap<>();
Set<String> alignedDevices = new HashSet<>();
Map<String, List<IMeasurementSchema>> deviceSchemaMap = new LinkedHashMap<>();

List<Field> seriesList =
session.executeQueryStatement("show timeseries " + column, timeout).next().getFields();
collectSchemas(
columnNames, columnTypes, deviceSchemaMap, alignedDevices, deviceColumnIndices);

List<Tablet> tabletList = constructTablets(deviceSchemaMap, alignedDevices, tsFileWriter);

measurementSchema.setEncoding(
TSEncoding.valueOf(seriesList.get(4).getStringValue()).serialize());
measurementSchema.setCompressor(
CompressionType.valueOf(seriesList.get(5).getStringValue()).serialize());
schemaMap.computeIfAbsent(deviceId, key -> new ArrayList<>()).add(measurementSchema);
}
List<Tablet> tabletList = new ArrayList<>();
for (Map.Entry<String, List<IMeasurementSchema>> stringListEntry : schemaMap.entrySet()) {
String deviceId = stringListEntry.getKey();
List<IMeasurementSchema> schemaList = stringListEntry.getValue();
Tablet tablet = new Tablet(deviceId, schemaList);
tablet.initBitMaps();
Path path = new Path(tablet.getDeviceId());
if (deviceFilterSet.contains(tablet.getDeviceId())) {
tsFileWriter.registerAlignedTimeseries(path, schemaList);
} else {
tsFileWriter.registerTimeseries(path, schemaList);
}
tabletList.add(tablet);
}
if (tabletList.isEmpty()) {
ioTPrinter.println("!!!Warning:Tablet is empty,no data can be exported.");
System.exit(CODE_ERROR);
}
while (sessionDataSet.hasNext()) {
RowRecord rowRecord = sessionDataSet.next();
List<Field> fields = rowRecord.getFields();
int i = 0;
while (i < fields.size()) {
for (Tablet tablet : tabletList) {
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
List<IMeasurementSchema> schemas = tablet.getSchemas();
for (int j = 0; j < schemas.size(); j++) {
IMeasurementSchema measurementSchema = schemas.get(j);
Object value = fields.get(i).getObjectValue(measurementSchema.getType());
if (value == null) {
tablet.bitMaps[j].mark(rowIndex);
}
tablet.addValue(measurementSchema.getMeasurementId(), rowIndex, value);
i++;
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
writeToTsfile(deviceFilterSet, tsFileWriter, tablet);
tablet.initBitMaps();
tablet.reset();
}
}
}
}
for (Tablet tablet : tabletList) {
if (tablet.rowSize != 0) {
writeToTsfile(deviceFilterSet, tsFileWriter, tablet);
}
}

writeWithTablets(
sessionDataSet, tabletList, alignedDevices, tsFileWriter, deviceColumnIndices);

tsFileWriter.flushAllChunkGroups();
}
}

private static void writeToTsfile(
HashSet<String> deviceFilterSet, TsFileWriter tsFileWriter, Tablet tablet)
private static void writeToTsFile(
Set<String> deviceFilterSet, TsFileWriter tsFileWriter, Tablet tablet)
throws IOException, WriteProcessException {
if (deviceFilterSet.contains(tablet.getDeviceId())) {
tsFileWriter.writeAligned(tablet);
Expand Down
Loading