[to rc/1.3.3] Check decoded page size in compaction #13432

Merged
@@ -496,9 +496,13 @@ private boolean canSealCurrentPageWriter() {
     }
 
     private boolean canFlushPage(PageLoader timePage, List<PageLoader> valuePages) {
+      long count = timePage.getHeader().getStatistics().getCount();
       boolean largeEnough =
-          timePage.getHeader().getUncompressedSize() >= targetPageSize
-              || timePage.getHeader().getStatistics().getCount() >= targetPagePointNum;
+          count >= targetPagePointNum
+              || Math.max(
+                      estimateMemorySizeAsPageWriter(timePage),
+                      timePage.getHeader().getUncompressedSize())
+                  >= targetPageSize;
       if (timeSchema.getEncodingType() != timePage.getEncoding()
           || timeSchema.getCompressor() != timePage.getCompressionType()) {
         return false;
@@ -516,11 +520,53 @@ private boolean canFlushPage(PageLoader timePage, List<PageLoader> valuePages) {
         if (valuePage.getModifiedStatus() == ModifiedStatus.PARTIAL_DELETED) {
           return false;
         }
-        if (valuePage.getHeader().getUncompressedSize() >= targetPageSize) {
+        if (Math.max(
+                valuePage.getHeader().getUncompressedSize(),
+                estimateMemorySizeAsPageWriter(valuePage))
+            >= targetPageSize) {
           largeEnough = true;
         }
       }
       return largeEnough;
     }
+
+    private long estimateMemorySizeAsPageWriter(PageLoader pageLoader) {
+      long count = pageLoader.getHeader().getStatistics().getCount();
+      long size;
+      switch (pageLoader.getDataType()) {
+        case INT32:
+        case DATE:
+          size = count * Integer.BYTES;
+          break;
+        case TIMESTAMP:
+        case INT64:
+        case VECTOR:
+          size = count * Long.BYTES;
+          break;
+        case FLOAT:
+          size = count * Float.BYTES;
+          break;
+        case DOUBLE:
+          size = count * Double.BYTES;
+          break;
+        case BOOLEAN:
+          size = count * Byte.BYTES;
+          break;
+        case TEXT:
+        case STRING:
+        case BLOB:
+          size = pageLoader.getHeader().getUncompressedSize();
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Unsupported data type: " + pageLoader.getDataType().toString());
+      }
+      // The page writer in memory also holds other objects and uses its own
+      // size-calculation method, so its real footprint is larger than this raw
+      // estimate. We therefore multiply by 1.05; without this, the result could
+      // come out just below the target page size even though the rewritten page
+      // would exceed it.
+      return (long) (size * 1.05);
+    }
   }
 }
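For orientation, here is an editor-added sketch (not part of the PR) that restates the time-page flush condition above with plain values instead of IoTDB's PageLoader and PageHeader; the target thresholds and the sample page are hypothetical.

// Editor-added sketch, not part of the PR: the time-page condition from
// canFlushPage, restated with plain longs. Constants and the sample page are
// hypothetical.
public class PageFlushEstimateSketch {

  // Mirrors estimateMemorySizeAsPageWriter for a fixed-width type: raw value
  // width times point count, padded by 5% for page-writer overhead.
  static long estimateAsPageWriter(long pointCount, int bytesPerValue) {
    return (long) (pointCount * bytesPerValue * 1.05);
  }

  public static void main(String[] args) {
    long targetPageSize = 65_536;     // hypothetical target page size in bytes
    long targetPagePointNum = 10_000; // hypothetical target point count

    // Hypothetical INT64 page: RLE keeps the encoded body small, but the
    // decoded points would occupy far more memory in a page writer.
    long pointCount = 7_900;
    long uncompressedSize = 8_000; // encoded page body size from the page header
    long decodedEstimate = estimateAsPageWriter(pointCount, Long.BYTES); // 66,360

    // Old condition: only the encoded size and the point count were checked,
    // so this page would not have counted as large enough.
    boolean oldLargeEnough =
        uncompressedSize >= targetPageSize || pointCount >= targetPagePointNum;

    // New condition: the larger of the decoded estimate and the encoded size is
    // compared against the target, so the same page now qualifies.
    boolean newLargeEnough =
        pointCount >= targetPagePointNum
            || Math.max(decodedEstimate, uncompressedSize) >= targetPageSize;

    System.out.println(oldLargeEnough + " -> " + newLargeEnough); // false -> true
  }
}

The point of the example is that an encoding such as RLE can leave a page small on disk yet large once decoded, which is the case the added Math.max(...) term accounts for.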
@@ -120,7 +120,6 @@ private void getEndOffset() throws IOException {
       channel.read(metadataSizeBuf, position);
       metadataSizeBuf.flip();
       int metadataSize = metadataSizeBuf.getInt();
-      // -1 is for the endmarker
       endOffset = channel.size() - version.getVersionBytes().length - Integer.BYTES - metadataSize;
     } finally {
       if (version == WALFileVersion.V2) {
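As a quick editor-added illustration (not part of the PR), the endOffset formula above works out as follows for a hypothetical file; the version-magic length and metadata size are assumptions, not values from the source.

// Editor-added illustration, not part of the PR: a worked instance of the
// endOffset formula. All values are hypothetical; versionBytesLength stands in
// for version.getVersionBytes().length.
public class WalEndOffsetSketch {
  public static void main(String[] args) {
    long fileSize = 10_000;     // channel.size()
    int versionBytesLength = 4; // assumed length of the version magic
    int metadataSize = 96;      // value read from metadataSizeBuf

    // Layout suggested by the formula: the WAL entries end at endOffset and are
    // followed by the metadata block, the metadata-size int, and the version bytes.
    long endOffset = fileSize - versionBytesLength - Integer.BYTES - metadataSize;
    System.out.println(endOffset); // 10000 - 4 - 4 - 96 = 9896
  }
}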
@@ -503,6 +503,36 @@ public void testSimpleCompactionByFlushPage() throws Exception {
             Collections.singletonList(targetResource), Collections.emptyList())));
   }
 
+  @Test
+  public void testCompactionByFlushPage() throws Exception {
+    // chunk1: [[1000,7000]] chunk2: [[8000,15000]]
+    TsFileResource seqResource1 =
+        generateSingleAlignedSeriesFile(
+            "d0",
+            Arrays.asList("s0", "s1", "s2"),
+            new TimeRange[] {new TimeRange(1000, 7000), new TimeRange(8000, 15000)},
+            TSEncoding.RLE,
+            CompressionType.LZ4,
+            Arrays.asList(false, false, false),
+            true);
+    seqResources.add(seqResource1);
+    CompactionTaskSummary summary = new CompactionTaskSummary();
+    TsFileResource targetResource = performCompaction(summary);
+    seqResources.clear();
+    Assert.assertEquals(8, summary.getDeserializeChunkCount());
+    Assert.assertEquals(0, summary.getDirectlyFlushPageCount());
+    seqResources.add(targetResource);
+    summary = new CompactionTaskSummary();
+    // the first page holds fewer than 10000 points because its page writer
+    // reaches the page size limit first
+    // chunk1: [[1000,10053], [10054,15000]]
+    performCompaction(summary);
+    Assert.assertEquals(4, summary.getDeserializeChunkCount());
+    // the first aligned page can be flushed directly
+    Assert.assertEquals(4, summary.getDirectlyFlushPageCount());
+    Assert.assertEquals(4, summary.getDeserializePageCount());
+  }
+
   @Test
   public void testSimpleCompactionByWritePoint() throws Exception {
     TsFileResource seqResource1 =