Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
THUMarkLau committed May 24, 2024
1 parent 94152b2 commit 07e1c96
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public abstract class LogWriter implements ILogWriter {
private final ICompressor compressor = ICompressor.getCompressor(compressionAlg);
private final ByteBuffer compressedByteBuffer;
// Minimum size to compress, default is 32 KB
private static final long MIN_COMPRESS_SIZE = 32 * 1024;
private static final long MIN_COMPRESS_SIZE = 0;

protected LogWriter(File logFile) throws IOException {
this.logFile = logFile;
Expand All @@ -74,6 +74,9 @@ protected LogWriter(File logFile) throws IOException {
@Override
public double write(ByteBuffer buffer) throws IOException {
int bufferSize = buffer.position();
if (bufferSize == 0) {
return 1.0;
}
buffer.flip();
boolean compressed = false;
int uncompressedSize = bufferSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class WALInputStream extends InputStream implements AutoCloseable {
private final ByteBuffer headerBuffer = ByteBuffer.allocate(Integer.BYTES + 1);
private final ByteBuffer compressedHeader = ByteBuffer.allocate(Integer.BYTES);
private ByteBuffer dataBuffer = null;
private long fileSize;
File logFile;

enum FileVersion {
Expand All @@ -49,6 +50,7 @@ enum FileVersion {

public WALInputStream(File logFile) throws IOException {
channel = FileChannel.open(logFile.toPath());
fileSize = channel.size();
analyzeFileVersion();
this.logFile = logFile;
}
Expand Down Expand Up @@ -111,7 +113,11 @@ public void close() throws IOException {

@Override
public int available() throws IOException {
return (int) (channel.size() - channel.position());
long size = (channel.size() - channel.position());
if (!Objects.isNull(dataBuffer)) {
size += dataBuffer.limit() - dataBuffer.position();
}
return (int) size;
}

private void loadNextSegment() throws IOException {
Expand All @@ -126,6 +132,14 @@ private void loadNextSegment() throws IOException {

private void loadNextSegmentV1() throws IOException {
// just read raw data as input
if (channel.position() >= fileSize) {
throw new IOException("Unexpected end of file");
}
if (Objects.isNull(dataBuffer)) {
// read 128 KB
dataBuffer = ByteBuffer.allocate(128 * 1024);
}
dataBuffer.clear();
channel.read(dataBuffer);
dataBuffer.flip();
}
Expand Down Expand Up @@ -180,4 +194,41 @@ private void tryLoadSegment() throws IOException {
version = FileVersion.V1;
}
}

public void skipToGivenPosition(long pos) throws IOException {
if (version == FileVersion.V2) {
channel.position(WALWriter.MAGIC_STRING_BYTES);
ByteBuffer buffer = ByteBuffer.allocate(Byte.BYTES + Integer.BYTES);
long posRemain = pos;
int currSegmentSize = 0;
while (posRemain > 0) {
buffer.clear();
channel.read(buffer);
buffer.flip();
buffer.get();
currSegmentSize = buffer.getInt();
if (posRemain >= currSegmentSize) {
posRemain -= currSegmentSize;
} else {
break;
}
}
dataBuffer = ByteBuffer.allocate(currSegmentSize);
channel.read(dataBuffer);
dataBuffer.position((int) posRemain);
} else {
dataBuffer.clear();
channel.position(pos);
}
}

public void read(ByteBuffer buffer) throws IOException {
int totalBytesToBeRead = buffer.remaining();
int currReadBytes = Math.min(dataBuffer.remaining(), buffer.remaining());
dataBuffer.get(buffer.array(), buffer.position(), currReadBytes);
if (totalBytesToBeRead - currReadBytes > 0) {
loadNextSegment();
read(buffer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.dataregion.wal.io.WALInputStream;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.WALNode;

import org.apache.tsfile.utils.Pair;
Expand Down Expand Up @@ -100,11 +101,11 @@ ByteBuffer read() throws IOException {
if (!canRead()) {
throw new IOException("Target file hasn't been specified.");
}
try (FileChannel channel = openReadFileChannel()) {
try (WALInputStream is = openReadFileStream()) {
is.skipToGivenPosition(position);
ByteBuffer buffer = ByteBuffer.allocate(size);
channel.position(position);
channel.read(buffer);
buffer.clear();
is.read(buffer);
buffer.flip();
return buffer;
}
}
Expand Down Expand Up @@ -135,6 +136,26 @@ public FileChannel openReadFileChannel() throws IOException {
}
}

public WALInputStream openReadFileStream() throws IOException {
if (isInSealedFile()) {
walFile = walNode.getWALFile(walFileVersionId);
return new WALInputStream(walFile);
} else {
try {
walFile = walNode.getWALFile(walFileVersionId);
return new WALInputStream(walFile);
} catch (IOException e) {
// unsealed file may be renamed after sealed, so we should try again
if (isInSealedFile()) {
walFile = walNode.getWALFile(walFileVersionId);
return new WALInputStream(walFile);
} else {
throw e;
}
}
}
}

public File getWalFile() {
return walFile;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,13 @@ public void testEmptyFile() throws IOException {
// recover
WALRecoverWriter walRecoverWriter = new WALRecoverWriter(logFile);
walRecoverWriter.recover(walMetaData);
// verify file, marker + metadata(search index + size number) + metadata size + magic string
// verify file, marker + metadata(search index + size number) + metadata size + head magic
// string + tail magic string
Assert.assertEquals(
Byte.BYTES + (Long.BYTES + Integer.BYTES) + Integer.BYTES + WALWriter.MAGIC_STRING_BYTES,
Byte.BYTES
+ (Long.BYTES + Integer.BYTES)
+ Integer.BYTES
+ WALWriter.MAGIC_STRING_BYTES * 2,
logFile.length());
try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
Assert.assertFalse(reader.hasNext());
Expand All @@ -95,7 +99,10 @@ public void testFileWithoutMagicString() throws IOException {
walRecoverWriter.recover(walMetaData);
// verify file, marker + metadata(search index + size number) + metadata size + magic string
Assert.assertEquals(
Byte.BYTES + (Long.BYTES + Integer.BYTES) + Integer.BYTES + WALWriter.MAGIC_STRING_BYTES,
Byte.BYTES
+ (Long.BYTES + Integer.BYTES)
+ Integer.BYTES
+ WALWriter.MAGIC_STRING_BYTES * 2,
logFile.length());
try (WALByteBufReader reader = new WALByteBufReader(logFile)) {
Assert.assertFalse(reader.hasNext());
Expand Down

0 comments on commit 07e1c96

Please sign in to comment.