Skip to content

Commit

Permalink
enhance
Browse files Browse the repository at this point in the history
Signed-off-by: OneSizeFitQuorum <[email protected]>
  • Loading branch information
OneSizeFitsQuorum committed Jun 19, 2024
1 parent acaad85 commit fef03fb
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1138,7 +1138,7 @@ public class IoTDBConfig {
*/
private String RateLimiterType = "FixedIntervalRateLimiter";

private CompressionType WALCompressionAlgorithm = CompressionType.UNCOMPRESSED;
private CompressionType WALCompressionAlgorithm = CompressionType.LZ4;

IoTDBConfig() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
"io_task_queue_size_for_flushing",
Integer.toString(conf.getIoTaskQueueSizeForFlushing()))));
boolean enableWALCompression =
Boolean.parseBoolean(properties.getProperty("enable_wal_compression", "false"));
Boolean.parseBoolean(properties.getProperty("enable_wal_compression", "true"));
conf.setWALCompressionAlgorithm(
enableWALCompression ? CompressionType.LZ4 : CompressionType.UNCOMPRESSED);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,37 +49,33 @@ public abstract class LogWriter implements ILogWriter {
protected final FileChannel logChannel;
protected long size = 0;
protected long originalSize = 0;
private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES * 2 + 1);
private final int COMPRESSED_HEADER_SIZE = Byte.BYTES + Integer.BYTES * 2;
private final int UN_COMPRESSED_HEADER_SIZE = Byte.BYTES + Integer.BYTES;
private final ByteBuffer headerBuffer = ByteBuffer.allocate(COMPRESSED_HEADER_SIZE);
private ICompressor compressor =
ICompressor.getCompressor(
IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm());
private ByteBuffer compressedByteBuffer;
// Minimum size to compress, default is 32 KB
private static long minCompressionSize = 32 * 1024L;

/** Minimum size to compress, default is 32 KB */
private static long MIN_COMPRESSION_SIZE = 32 * 1024L;

protected LogWriter(File logFile) throws IOException {
this.logFile = logFile;
this.logStream = new FileOutputStream(logFile, true);
this.logChannel = this.logStream.getChannel();
if (!logFile.exists() || logFile.length() == 0) {
this.logChannel.write(
ByteBuffer.wrap(WALWriter.MAGIC_STRING.getBytes(StandardCharsets.UTF_8)));
ByteBuffer.wrap(WALWriter.MAGIC_STRING_V2.getBytes(StandardCharsets.UTF_8)));
size += logChannel.position();
}
if (IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm()
!= CompressionType.UNCOMPRESSED) {
// TODO: Use a dynamic strategy to enlarge the buffer size
compressedByteBuffer =
ByteBuffer.allocate(
compressor.getMaxBytesForCompression(
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
} else {
compressedByteBuffer = null;
}
}

@Override
public double write(ByteBuffer buffer) throws IOException {
// To support hot loading, we can't define it as a variable,
// because we need to dynamically check whether wal compression is enabled
// each time the buffer is serialized
CompressionType compressionType =
IoTDBDescriptor.getInstance().getConfig().getWALCompressionAlgorithm();
int bufferSize = buffer.position();
Expand All @@ -92,12 +88,13 @@ public double write(ByteBuffer buffer) throws IOException {
int uncompressedSize = bufferSize;
if (compressionType != CompressionType.UNCOMPRESSED
/* Do not compress buffer that is less than min size */
&& bufferSize > minCompressionSize) {
&& bufferSize > MIN_COMPRESSION_SIZE) {
if (Objects.isNull(compressedByteBuffer)) {
// TODO: Use a dynamic strategy to enlarge the buffer size
compressedByteBuffer =
ByteBuffer.allocate(
compressor.getMaxBytesForCompression(
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize()));
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2));
}
compressedByteBuffer.clear();
if (compressor.getType() != compressionType) {
Expand All @@ -108,6 +105,11 @@ public double write(ByteBuffer buffer) throws IOException {
bufferSize = buffer.position();
buffer.flip();
compressed = true;
originalSize += COMPRESSED_HEADER_SIZE;
size += COMPRESSED_HEADER_SIZE;
} else {
originalSize += UN_COMPRESSED_HEADER_SIZE;
size += UN_COMPRESSED_HEADER_SIZE;
}
size += bufferSize;
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ public class WALInputStream extends InputStream implements AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(WALInputStream.class);
private final FileChannel channel;
private final ByteBuffer segmentHeaderBuffer = ByteBuffer.allocate(Integer.BYTES + Byte.BYTES);
private final ByteBuffer compressedHeader = ByteBuffer.allocate(Integer.BYTES);
private final ByteBuffer segmentHeaderWithoutConpressedSizeBuffer =
ByteBuffer.allocate(Integer.BYTES + Byte.BYTES);
private final ByteBuffer compressedSizeHeaderBuffer = ByteBuffer.allocate(Integer.BYTES);
private ByteBuffer dataBuffer = null;
private ByteBuffer compressedBuffer = null;
private long fileSize;
private final long fileSize;
File logFile;
/*
The WAL file consist of following parts:
Expand All @@ -69,7 +70,7 @@ public WALInputStream(File logFile) throws IOException {
}

private void getEndOffset() throws IOException {
if (channel.size() < WALWriter.MAGIC_STRING_BYTES + Integer.BYTES) {
if (channel.size() < WALWriter.MAGIC_STRING_V2_BYTES + Integer.BYTES) {
// An broken file
endOffset = channel.size();
return;
Expand All @@ -79,17 +80,17 @@ private void getEndOffset() throws IOException {
try {
if (version == FileVersion.V2) {
// New Version
ByteBuffer magicStringBuffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
channel.read(magicStringBuffer, channel.size() - WALWriter.MAGIC_STRING_BYTES);
ByteBuffer magicStringBuffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES);
channel.read(magicStringBuffer, channel.size() - WALWriter.MAGIC_STRING_V2_BYTES);
magicStringBuffer.flip();
if (!new String(magicStringBuffer.array(), StandardCharsets.UTF_8)
.equals(WALWriter.MAGIC_STRING)) {
.equals(WALWriter.MAGIC_STRING_V2)) {
// This is a broken wal file
endOffset = channel.size();
return;
} else {
// This is a normal wal file
position = channel.size() - WALWriter.MAGIC_STRING_BYTES - Integer.BYTES;
position = channel.size() - WALWriter.MAGIC_STRING_V2_BYTES - Integer.BYTES;
}
} else {
// Old version
Expand All @@ -115,29 +116,35 @@ private void getEndOffset() throws IOException {
channel.read(metadataSizeBuf, position);
metadataSizeBuf.flip();
int metadataSize = metadataSizeBuf.getInt();
endOffset = channel.size() - WALWriter.MAGIC_STRING_BYTES - Integer.BYTES - metadataSize - 1;
// -1 is for the endmarker
endOffset =
channel.size() - WALWriter.MAGIC_STRING_V2_BYTES - Integer.BYTES - metadataSize - 1;
} finally {
channel.position(WALWriter.MAGIC_STRING_BYTES);
if (version == FileVersion.V2) {
channel.position(WALWriter.MAGIC_STRING_V2_BYTES);
} else {
channel.position(WALWriter.MAGIC_STRING_V1_BYTES);
}
}
}

private void analyzeFileVersion() throws IOException {
if (channel.size() < WALWriter.MAGIC_STRING_BYTES) {
if (channel.size() < WALWriter.MAGIC_STRING_V2_BYTES) {
version = FileVersion.UNKNOWN;
return;
}
if (isCurrentVersion()) {
if (isV2Version()) {
this.version = FileVersion.V2;
return;
}
this.version = FileVersion.V1;
}

private boolean isCurrentVersion() throws IOException {
private boolean isV2Version() throws IOException {
channel.position(0);
ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
ByteBuffer buffer = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES);
channel.read(buffer);
return new String(buffer.array(), StandardCharsets.UTF_8).equals(WALWriter.MAGIC_STRING);
return new String(buffer.array(), StandardCharsets.UTF_8).equals(WALWriter.MAGIC_STRING_V2);
}

@Override
Expand Down Expand Up @@ -292,9 +299,9 @@ private void tryLoadSegment() throws IOException {
*/
public void skipToGivenLogicalPosition(long pos) throws IOException {
if (version == FileVersion.V2) {
channel.position(WALWriter.MAGIC_STRING_BYTES);
channel.position(WALWriter.MAGIC_STRING_V2_BYTES);
long posRemain = pos;
SegmentInfo segmentInfo = null;
SegmentInfo segmentInfo;
do {
segmentInfo = getNextSegmentInfo();
if (posRemain >= segmentInfo.uncompressedSize) {
Expand Down Expand Up @@ -340,24 +347,25 @@ public long getFileCurrentPos() throws IOException {
}

private SegmentInfo getNextSegmentInfo() throws IOException {
segmentHeaderBuffer.clear();
channel.read(segmentHeaderBuffer);
segmentHeaderBuffer.flip();
segmentHeaderWithoutConpressedSizeBuffer.clear();
channel.read(segmentHeaderWithoutConpressedSizeBuffer);
segmentHeaderWithoutConpressedSizeBuffer.flip();
SegmentInfo info = new SegmentInfo();
info.compressionType = CompressionType.deserialize(segmentHeaderBuffer.get());
info.dataInDiskSize = segmentHeaderBuffer.getInt();
info.compressionType =
CompressionType.deserialize(segmentHeaderWithoutConpressedSizeBuffer.get());
info.dataInDiskSize = segmentHeaderWithoutConpressedSizeBuffer.getInt();
if (info.compressionType != CompressionType.UNCOMPRESSED) {
compressedHeader.clear();
channel.read(compressedHeader);
compressedHeader.flip();
info.uncompressedSize = compressedHeader.getInt();
compressedSizeHeaderBuffer.clear();
channel.read(compressedSizeHeaderBuffer);
compressedSizeHeaderBuffer.flip();
info.uncompressedSize = compressedSizeHeaderBuffer.getInt();
} else {
info.uncompressedSize = info.dataInDiskSize;
}
return info;
}

private class SegmentInfo {
private static class SegmentInfo {
public CompressionType compressionType;
public int dataInDiskSize;
public int uncompressedSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public int serializedSize() {
+ (memTablesId.isEmpty() ? 0 : Integer.BYTES + memTablesId.size() * Long.BYTES);
}

public void serialize(File file, ByteBuffer buffer) {
public void serialize(ByteBuffer buffer) {
buffer.putLong(firstSearchIndex);
buffer.putInt(buffersSize.size());
for (int size : buffersSize) {
Expand Down Expand Up @@ -129,12 +129,12 @@ public long getFirstSearchIndex() {
}

public static WALMetaData readFromWALFile(File logFile, FileChannel channel) throws IOException {
if (channel.size() < WALWriter.MAGIC_STRING_BYTES || !isValidMagicString(channel)) {
if (channel.size() < WALWriter.MAGIC_STRING_V2_BYTES || !isValidMagicString(channel)) {
throw new IOException(String.format("Broken wal file %s", logFile));
}
// load metadata size
ByteBuffer metadataSizeBuf = ByteBuffer.allocate(Integer.BYTES);
long position = channel.size() - WALWriter.MAGIC_STRING_BYTES - Integer.BYTES;
long position = channel.size() - WALWriter.MAGIC_STRING_V2_BYTES - Integer.BYTES;
channel.read(metadataSizeBuf, position);
metadataSizeBuf.flip();
// load metadata
Expand All @@ -159,11 +159,11 @@ public static WALMetaData readFromWALFile(File logFile, FileChannel channel) thr
}

private static boolean isValidMagicString(FileChannel channel) throws IOException {
ByteBuffer magicStringBytes = ByteBuffer.allocate(WALWriter.MAGIC_STRING_BYTES);
channel.read(magicStringBytes, channel.size() - WALWriter.MAGIC_STRING_BYTES);
ByteBuffer magicStringBytes = ByteBuffer.allocate(WALWriter.MAGIC_STRING_V2_BYTES);
channel.read(magicStringBytes, channel.size() - WALWriter.MAGIC_STRING_V2_BYTES);
magicStringBytes.flip();
String magicString = new String(magicStringBytes.array(), StandardCharsets.UTF_8);
return magicString.equals(WALWriter.MAGIC_STRING)
return magicString.equals(WALWriter.MAGIC_STRING_V2)
|| magicString.contains(WALWriter.MAGIC_STRING_V1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@
public class WALWriter extends LogWriter {
private static final Logger logger = LoggerFactory.getLogger(WALWriter.class);
public static final String MAGIC_STRING_V1 = "WAL";
public static final String MAGIC_STRING = "V2-WAL";
public static final int MAGIC_STRING_BYTES = MAGIC_STRING.getBytes(StandardCharsets.UTF_8).length;
public static final String MAGIC_STRING_V2 = "V2-WAL";
public static final int MAGIC_STRING_V1_BYTES =
MAGIC_STRING_V1.getBytes(StandardCharsets.UTF_8).length;
public static final int MAGIC_STRING_V2_BYTES =
MAGIC_STRING_V2.getBytes(StandardCharsets.UTF_8).length;

private WALFileStatus walFileStatus = WALFileStatus.CONTAINS_NONE_SEARCH_INDEX;
// wal files' metadata
Expand Down Expand Up @@ -68,14 +71,14 @@ private void endFile() throws IOException {
int metaDataSize = metaData.serializedSize();
ByteBuffer buffer =
ByteBuffer.allocate(
endMarker.serializedSize() + metaDataSize + Integer.BYTES + MAGIC_STRING_BYTES);
endMarker.serializedSize() + metaDataSize + Integer.BYTES + MAGIC_STRING_V2_BYTES);
// mark info part ends
endMarker.serialize(buffer);
// flush meta data
metaData.serialize(logFile, buffer);
metaData.serialize(buffer);
buffer.putInt(metaDataSize);
// add magic string
buffer.put(MAGIC_STRING.getBytes(StandardCharsets.UTF_8));
buffer.put(MAGIC_STRING_V2.getBytes(StandardCharsets.UTF_8));
size += buffer.position();
writeMetadata(buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;

import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING;
import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_BYTES;
import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2;
import static org.apache.iotdb.db.storageengine.dataregion.wal.io.WALWriter.MAGIC_STRING_V2_BYTES;

/** Check whether the wal file is broken and recover it. */
public class WALRecoverWriter {
Expand All @@ -42,10 +42,10 @@ public WALRecoverWriter(File logFile) {
public void recover(WALMetaData metaData) throws IOException {
// locate broken data
long truncateSize;
if (logFile.length() < MAGIC_STRING_BYTES) { // file without magic string
if (logFile.length() < MAGIC_STRING_V2_BYTES) { // file without magic string
truncateSize = 0;
} else {
if (readTailMagic().equals(MAGIC_STRING)) { // complete file
if (readTailMagic().equals(MAGIC_STRING_V2)) { // complete file
return;
} else { // file with broken magic string
truncateSize = metaData.getTruncateOffSet();
Expand All @@ -63,8 +63,8 @@ public void recover(WALMetaData metaData) throws IOException {

private String readTailMagic() throws IOException {
try (FileChannel channel = FileChannel.open(logFile.toPath(), StandardOpenOption.READ)) {
ByteBuffer magicStringBytes = ByteBuffer.allocate(MAGIC_STRING_BYTES);
channel.read(magicStringBytes, channel.size() - MAGIC_STRING_BYTES);
ByteBuffer magicStringBytes = ByteBuffer.allocate(MAGIC_STRING_V2_BYTES);
channel.read(magicStringBytes, channel.size() - MAGIC_STRING_V2_BYTES);
magicStringBytes.flip();
return new String(magicStringBytes.array());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public static void setMinCompressionSize(long size)
throws NoSuchFieldException, ClassNotFoundException, IllegalAccessException {
Class<?> logWriterClass =
Class.forName("org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter");
Field minCompressionSizeField = logWriterClass.getDeclaredField("minCompressionSize");
Field minCompressionSizeField = logWriterClass.getDeclaredField("MIN_COMPRESSION_SIZE");
minCompressionSizeField.setAccessible(true);
minCompressionSizeField.setLong(null, size);
}
Expand All @@ -45,7 +45,7 @@ public static long getMinCompressionSize()
throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException {
Class<?> logWriterClass =
Class.forName("org.apache.iotdb.db.storageengine.dataregion.wal.io.LogWriter");
Field minCompressionSizeField = logWriterClass.getDeclaredField("minCompressionSize");
Field minCompressionSizeField = logWriterClass.getDeclaredField("MIN_COMPRESSION_SIZE");
minCompressionSizeField.setAccessible(true);
return minCompressionSizeField.getLong(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,10 @@ public void testUncompressedWALStructure()

try (DataInputStream dataInputStream =
new DataInputStream(new BufferedInputStream(Files.newInputStream(walFile.toPath())))) {
byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_BYTES];
byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_V2_BYTES];
// head magic string
dataInputStream.readFully(magicStringBytes);
Assert.assertEquals(WALWriter.MAGIC_STRING, new String(magicStringBytes));
Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes));
Assert.assertEquals(
CompressionType.UNCOMPRESSED, CompressionType.deserialize(dataInputStream.readByte()));
Assert.assertEquals(buf.array().length, dataInputStream.readInt());
Expand All @@ -209,7 +209,7 @@ public void testUncompressedWALStructure()
dataInputStream.readFully(metadataBuf.array());
// Tail magic string
dataInputStream.readFully(magicStringBytes);
Assert.assertEquals(WALWriter.MAGIC_STRING, new String(magicStringBytes));
Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes));
}
}

Expand Down Expand Up @@ -243,10 +243,10 @@ public void testCompressedWALStructure()

try (DataInputStream dataInputStream =
new DataInputStream(new BufferedInputStream(Files.newInputStream(walFile.toPath())))) {
byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_BYTES];
byte[] magicStringBytes = new byte[WALWriter.MAGIC_STRING_V2_BYTES];
// head magic string
dataInputStream.readFully(magicStringBytes);
Assert.assertEquals(WALWriter.MAGIC_STRING, new String(magicStringBytes));
Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes));
Assert.assertEquals(
CompressionType.LZ4, CompressionType.deserialize(dataInputStream.readByte()));
Assert.assertEquals(compressed.length, dataInputStream.readInt());
Expand All @@ -263,7 +263,7 @@ public void testCompressedWALStructure()
dataInputStream.readFully(metadataBuf.array());
// Tail magic string
dataInputStream.readFully(magicStringBytes);
Assert.assertEquals(WALWriter.MAGIC_STRING, new String(magicStringBytes));
Assert.assertEquals(WALWriter.MAGIC_STRING_V2, new String(magicStringBytes));
}
}

Expand Down
Loading

0 comments on commit fef03fb

Please sign in to comment.