Skip to content

Commit

Permalink
Fix NodeInfo processLoadSnapshot method (avoid compatibility issues c…
Browse files Browse the repository at this point in the history
…aused by ainode serialization) (apache#13622) (apache#13637)
  • Loading branch information
liyuheng55555 authored Sep 26, 2024
1 parent 7b36d8f commit f27a249
Showing 1 changed file with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -722,21 +723,24 @@ public void processLoadSnapshot(File snapshotDir) throws IOException, TException
aiNodeInfoReadWriteLock.writeLock().lock();
versionInfoReadWriteLock.writeLock().lock();

try (FileInputStream fileInputStream = new FileInputStream(snapshotFile);
TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileInputStream)) {
try (ByteArrayInputStream inputStream =
new ByteArrayInputStream(Files.readAllBytes(snapshotFile.toPath()));
TIOStreamTransport tioStreamTransport = new TIOStreamTransport(inputStream)) {
TProtocol protocol = new TBinaryProtocol(tioStreamTransport);

clear();

nextNodeId.set(ReadWriteIOUtils.readInt(fileInputStream));
nextNodeId.set(ReadWriteIOUtils.readInt(inputStream));

deserializeRegisteredConfigNode(fileInputStream, protocol);
deserializeRegisteredConfigNode(inputStream, protocol);

deserializeRegisteredDataNode(fileInputStream, protocol);
deserializeRegisteredDataNode(inputStream, protocol);

deserializeRegisteredAINode(fileInputStream, protocol);
// TODO: Compatibility design. Should replace this function to actual deserialization method
// in IoTDB 2.2 / 1.5
tryDeserializeRegisteredAINode(inputStream, protocol);

deserializeBuildInfo(fileInputStream);
deserializeBuildInfo(inputStream);

} finally {
versionInfoReadWriteLock.writeLock().unlock();
Expand Down Expand Up @@ -770,6 +774,18 @@ private void deserializeRegisteredDataNode(InputStream inputStream, TProtocol pr
}
}

private void tryDeserializeRegisteredAINode(ByteArrayInputStream inputStream, TProtocol protocol)
throws IOException {
try {
// 0 has no meaning here
inputStream.mark(0);
deserializeRegisteredAINode(inputStream, protocol);
} catch (IOException | TException ignore) {
// Exception happens here means that the data is upgraded from the old version
inputStream.reset();
}
}

private void deserializeRegisteredAINode(InputStream inputStream, TProtocol protocol)
throws IOException, TException {
int size = ReadWriteIOUtils.readInt(inputStream);
Expand Down Expand Up @@ -805,6 +821,7 @@ public void clear() {
nextNodeId.set(-1);
registeredDataNodes.clear();
registeredConfigNodes.clear();
registeredAINodes.clear();
nodeVersionInfo.clear();
}

Expand Down

0 comments on commit f27a249

Please sign in to comment.