Commit 2658ea0

[log] Change compression type as a storage configuration and support compression level (#314)

wuchong committed Jan 19, 2025
1 parent 8a95d28
Showing 43 changed files with 456 additions and 248 deletions.
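
The net effect of the commit: the Arrow compression codec (and, for zstd, the compression level) now travels with the table as a storage property instead of the client-side writer option CLIENT_WRITER_ARROW_COMPRESSION_TYPE, whose uses are removed below. As a quick orientation before the per-file diffs, here is a minimal sketch of the new table-creation usage, pieced together from the test changes further down; the builder calls mirror the test, but treat the snippet as illustrative rather than as code from this commit:

    Schema schema =
            Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("c", DataTypes.STRING())
                    .build();
    // Compression is declared once, on the table, and applies to every writer.
    TableDescriptor descriptor =
            TableDescriptor.builder()
                    .schema(schema)
                    // supported types per the test below: none, lz4_frame, zstd
                    .property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE.key(), "zstd")
                    // zstd accepts levels 1..22 (validated in ArrowCompressionInfo.fromConf)
                    .property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL.key(), "3")
                    .build();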
@@ -100,8 +100,8 @@ public boolean isClosed() {
     }
 
     @Override
-    public int sizeInBytes() {
-        return recordsBuilder.getSizeInBytes();
+    public int estimatedSizeInBytes() {
+        return recordsBuilder.estimatedSizeInBytes();
     }
 
     @Override
@@ -124,7 +124,7 @@ public void resetWriterState(long writerId, int batchSequence) {
     }
 
     @Override
-    public int sizeInBytes() {
+    public int estimatedSizeInBytes() {
         return recordsBuilder.getSizeInBytes();
     }
 }
@@ -116,7 +116,7 @@ public boolean isClosed() {
     }
 
     @Override
-    public int sizeInBytes() {
+    public int estimatedSizeInBytes() {
         return recordsBuilder.getSizeInBytes();
     }
 
@@ -22,7 +22,6 @@
 import com.alibaba.fluss.cluster.BucketLocation;
 import com.alibaba.fluss.cluster.Cluster;
 import com.alibaba.fluss.cluster.ServerNode;
-import com.alibaba.fluss.compression.ArrowCompressionType;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.FlussRuntimeException;
@@ -108,8 +107,6 @@ public final class RecordAccumulator {
 
     private final IdempotenceManager idempotenceManager;
 
-    private final ArrowCompressionType arrowCompressionType;
-
     // TODO add retryBackoffMs to retry the produce request upon receiving an error.
     // TODO add deliveryTimeoutMs to report success or failure on record delivery.
     // TODO add nextBatchExpiryTimeMs
@@ -129,8 +126,6 @@ public final class RecordAccumulator {
         this.batchSize =
                 Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes());
 
-        this.arrowCompressionType = conf.get(ConfigOptions.CLIENT_WRITER_ARROW_COMPRESSION_TYPE);
-
         this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf);
         this.pagesPerBatch = Math.max(1, batchSize / writerBufferPool.pageSize());
         this.bufferAllocator = new RootAllocator(Long.MAX_VALUE);
@@ -509,7 +504,7 @@ private RecordAppendResult appendNewBatch(
                             schemaId,
                             outputView.getPreAllocatedSize(),
                             tableInfo.getTableDescriptor().getSchema().toRowType(),
-                            arrowCompressionType);
+                            tableInfo.getTableDescriptor().getArrowCompressionInfo());
             batch =
                     new ArrowLogWriteBatch(
                             tb, physicalTablePath, schemaId, arrowWriter, outputView);
@@ -578,7 +573,7 @@ private List<WriteBatch> drainBatchesForOneNode(Cluster cluster, ServerNode node
 
             // TODO retry back off check.
 
-            if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
+            if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                 // there is a rare case that a single batch size is larger than the request size
                 // due to compression; in this case we will still eventually send this batch in
                 // a single request.
@@ -626,7 +621,7 @@ private List<WriteBatch> drainBatchesForOneNode(Cluster cluster, ServerNode node
         // the rest of the work by processing outside the lock close() is particularly expensive
         Preconditions.checkNotNull(batch, "batch should not be null");
         batch.close();
-        size += batch.sizeInBytes();
+        size += batch.estimatedSizeInBytes();
         ready.add(batch);
         // mark the batch as drained.
         batch.drained(System.currentTimeMillis());
@@ -214,11 +214,13 @@ private void sendWriteData() throws Exception {
 
         if (!batches.isEmpty()) {
             addToInflightBatches(batches);
-            updateWriterMetrics(batches);
 
             // TODO add logic for batch expire.
 
             sendWriteRequests(batches);
+
+            // move metrics update to the end to make sure the batches have been built.
+            updateWriterMetrics(batches);
         }
     }
 
@@ -527,10 +529,14 @@ private void updateWriterMetrics(Map<Integer, List<WriteBatch>> batches) {
                         int recordCount = batch.getRecordCount();
                         writerMetricGroup.recordsSendTotal().inc(recordCount);
                         writerMetricGroup.setBatchQueueTimeMs(batch.getQueueTimeMs());
-                        writerMetricGroup.bytesSendTotal().inc(batch.sizeInBytes());
+                        writerMetricGroup
+                                .bytesSendTotal()
+                                .inc(batch.estimatedSizeInBytes());
 
                         writerMetricGroup.recordPerBatch().update(recordCount);
-                        writerMetricGroup.bytesPerBatch().update(batch.sizeInBytes());
+                        writerMetricGroup
+                                .bytesPerBatch()
+                                .update(batch.estimatedSizeInBytes());
                     }
                 });
     }
@@ -88,11 +88,11 @@ public abstract boolean tryAppend(WriteRecord writeRecord, WriteCallback callback)
     public abstract boolean isClosed();
 
     /**
-     * get size in bytes.
-     *
-     * @return the size in bytes
+     * Get an estimate of the number of bytes written to the underlying buffer. The returned value
+     * is exactly correct if the record set is not compressed or if the batch has been {@link
+     * #build()}.
      */
-    public abstract int sizeInBytes();
+    public abstract int estimatedSizeInBytes();
 
     /**
      * get pooled memory segments to de-allocate. After produceLog/PutKv acks, the {@link
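
The rename above also tightens the contract: while a compressed Arrow batch is still open, the builder can only estimate its final serialized size, and the value becomes exact only for uncompressed record sets or once the batch is built. A hypothetical sketch of how a caller must treat that value, modeled on drainBatchesForOneNode in the RecordAccumulator diff above (maxRequestSize, candidates, and ready are assumed local state, not names from this commit):

    int size = 0;
    List<WriteBatch> ready = new ArrayList<>();
    for (WriteBatch batch : candidates) {
        // The estimate may shift once compression finishes, so a single batch can
        // still exceed maxRequestSize after build(); it is then sent on its own.
        if (size + batch.estimatedSizeInBytes() > maxRequestSize && !ready.isEmpty()) {
            break;
        }
        batch.close();
        size += batch.estimatedSizeInBytes(); // exact for a built batch
        ready.add(batch);
    }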
@@ -48,6 +48,7 @@
 import java.util.Map;
 
 import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.toByteBuffer;
+import static com.alibaba.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION;
 import static com.alibaba.fluss.record.TestData.DATA2;
 import static com.alibaba.fluss.record.TestData.DATA2_ROW_TYPE;
 import static com.alibaba.fluss.record.TestData.DATA2_TABLE_ID;
@@ -235,7 +236,7 @@ private MemoryLogRecords genRecordsWithProjection(List<Object[]> objects, Projection projection)
 
         FileLogProjection fileLogProjection = new FileLogProjection();
         fileLogProjection.setCurrentProjection(
-                DATA2_TABLE_ID, rowType, projection.getProjectionInOrder());
+                DATA2_TABLE_ID, rowType, NO_COMPRESSION, projection.getProjectionInOrder());
         ByteBuffer buffer =
                 toByteBuffer(
                         fileLogProjection
@@ -28,7 +28,6 @@
 import com.alibaba.fluss.client.table.writer.TableWriter;
 import com.alibaba.fluss.client.table.writer.UpsertWrite;
 import com.alibaba.fluss.client.table.writer.UpsertWriter;
-import com.alibaba.fluss.compression.ArrowCompressionType;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.config.MemorySize;
@@ -54,19 +53,17 @@
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
 import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
@@ -937,28 +934,30 @@ void testFirstRowMergeEngine() throws Exception {
     }
 
     @ParameterizedTest
-    @MethodSource("arrowCompressionTypes")
-    void testArrowCompressionAndProject(ArrowCompressionType arrowCompressionType)
-            throws Exception {
+    @CsvSource({"none,3", "lz4_frame,3", "zstd,3", "zstd,9"})
+    void testArrowCompressionAndProject(String compression, String level) throws Exception {
         Schema schema =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT())
                         .column("b", DataTypes.INT())
                         .column("c", DataTypes.STRING())
                         .column("d", DataTypes.BIGINT())
                         .build();
-        TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).build();
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
                        .schema(schema)
                        .property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE.key(), compression)
                        .property(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL.key(), level)
                        .build();
         TablePath tablePath = TablePath.of("test_db_1", "test_arrow_compression_and_project");
         createTable(tablePath, tableDescriptor, false);
 
-        Configuration conf = new Configuration(clientConf);
-        conf.set(ConfigOptions.CLIENT_WRITER_ARROW_COMPRESSION_TYPE, arrowCompressionType);
-        try (Connection conn = ConnectionFactory.createConnection(conf);
+        try (Connection conn = ConnectionFactory.createConnection(clientConf);
                 Table table = conn.getTable(tablePath)) {
             AppendWriter appendWriter = table.getAppendWriter();
             int expectedSize = 30;
             for (int i = 0; i < expectedSize; i++) {
-                String value = i % 2 == 0 ? "hello, friend " + i : null;
+                String value = i % 2 == 0 ? "hello, friend " + i : null;
                 InternalRow row = row(schema.toRowType(), new Object[] {i, 100, value, i * 10L});
                 appendWriter.append(row);
                 if (i % 10 == 0) {
@@ -980,7 +979,7 @@ void testArrowCompressionAndProject(ArrowCompressionType arrowCompressionType)
             assertThat(scanRecord.getRow().getInt(1)).isEqualTo(100);
             if (count % 2 == 0) {
                 assertThat(scanRecord.getRow().getString(2).toString())
-                        .isEqualTo("hello, friend " + count);
+                        .isEqualTo("hello, friend " + count);
             } else {
                 // check null values
                 assertThat(scanRecord.getRow().isNullAt(2)).isTrue();
@@ -1003,7 +1002,7 @@ void testArrowCompressionAndProject(ArrowCompressionType arrowCompressionType)
             assertThat(scanRecord.getRow().getInt(0)).isEqualTo(count);
             if (count % 2 == 0) {
                 assertThat(scanRecord.getRow().getString(1).toString())
-                        .isEqualTo("hello, friend " + count);
+                        .isEqualTo("hello, friend " + count);
             } else {
                 // check null values
                 assertThat(scanRecord.getRow().isNullAt(1)).isTrue();
@@ -1015,8 +1014,4 @@ void testArrowCompressionAndProject(ArrowCompressionType arrowCompressionType)
             logScanner.close();
         }
     }
-
-    private static Stream<ArrowCompressionType> arrowCompressionTypes() {
-        return Arrays.stream(ArrowCompressionType.values());
-    }
 }
@@ -16,7 +16,7 @@
 
 package com.alibaba.fluss.client.write;
 
-import com.alibaba.fluss.compression.ArrowCompressionType;
+import com.alibaba.fluss.compression.ArrowCompressionInfo;
 import com.alibaba.fluss.memory.MemorySegment;
 import com.alibaba.fluss.memory.PreAllocatedPagedOutputView;
 import com.alibaba.fluss.memory.TestingMemorySegmentPool;
@@ -128,7 +128,7 @@ void testAppendWithPreAllocatedMemorySegments() throws Exception {
                         DATA1_TABLE_INFO.getSchemaId(),
                         maxSizeInBytes,
                         DATA1_ROW_TYPE,
-                        ArrowCompressionType.NO),
+                        ArrowCompressionInfo.NO_COMPRESSION),
                 new PreAllocatedPagedOutputView(memorySegmentList));
         assertThat(arrowLogWriteBatch.pooledMemorySegments()).isEqualTo(memorySegmentList);
 
@@ -180,7 +180,7 @@ private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int maxSizeInBytes)
                         DATA1_TABLE_INFO.getSchemaId(),
                         maxSizeInBytes,
                         DATA1_ROW_TYPE,
-                        ArrowCompressionType.NO),
+                        ArrowCompressionInfo.NO_COMPRESSION),
                 new UnmanagedPagedOutputView(128));
     }
 
@@ -32,9 +32,9 @@
  * CommonsCompressionFactory.
  */
 @Internal
-public class FlussArrowCompressionFactory implements CompressionCodec.Factory {
+public class ArrowCompressionFactory implements CompressionCodec.Factory {
 
-    public static final FlussArrowCompressionFactory INSTANCE = new FlussArrowCompressionFactory();
+    public static final ArrowCompressionFactory INSTANCE = new ArrowCompressionFactory();
 
     @Override
     public CompressionCodec createCodec(CompressionUtil.CodecType codecType) {
@@ -67,7 +67,7 @@ public CompressionCodec createCodec(CompressionUtil.CodecType codecType, int compressionLevel)
     public static CompressionUtil.CodecType toArrowCompressionCodecType(
             ArrowCompressionType compressionType) {
         switch (compressionType) {
-            case NO:
+            case NONE:
                 return CompressionUtil.CodecType.NO_COMPRESSION;
             case LZ4_FRAME:
                 return CompressionUtil.CodecType.LZ4_FRAME;
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.compression;
+
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.shaded.arrow.org.apache.arrow.vector.compression.CompressionCodec;
+
+/** Compression information for Arrow record batches. */
+public class ArrowCompressionInfo {
+
+    public static final ArrowCompressionInfo NO_COMPRESSION =
+            new ArrowCompressionInfo(ArrowCompressionType.NONE, -1);
+
+    private final ArrowCompressionType compressionType;
+    private final int compressionLevel;
+
+    public ArrowCompressionInfo(ArrowCompressionType compressionType, int compressionLevel) {
+        this.compressionType = compressionType;
+        this.compressionLevel = compressionLevel;
+    }
+
+    public ArrowCompressionType getCompressionType() {
+        return compressionType;
+    }
+
+    /**
+     * Get the compression level. If the compression level is not supported by the compression
+     * type, -1 is returned.
+     */
+    public int getCompressionLevel() {
+        return compressionLevel;
+    }
+
+    /** Create an Arrow compression codec based on the compression type and level. */
+    public CompressionCodec createCompressionCodec() {
+        return ArrowCompressionFactory.INSTANCE.createCodec(
+                ArrowCompressionFactory.toArrowCompressionCodecType(compressionType),
+                compressionLevel);
+    }
+
+    @Override
+    public String toString() {
+        return compressionLevel == -1
+                ? compressionType.toString()
+                : compressionType + "-" + compressionLevel;
+    }
+
+    public static ArrowCompressionInfo fromConf(Configuration conf) {
+        ArrowCompressionType compressionType =
+                conf.get(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE);
+        if (compressionType == ArrowCompressionType.ZSTD) {
+            int compressionLevel = conf.get(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL);
+            if (compressionLevel < 1 || compressionLevel > 22) {
+                throw new IllegalArgumentException(
+                        "Invalid ZSTD compression level: "
+                                + compressionLevel
+                                + ". Expected a value between 1 and 22.");
+            }
+            return new ArrowCompressionInfo(compressionType, compressionLevel);
+        } else {
+            return new ArrowCompressionInfo(compressionType, -1);
+        }
+    }
+}
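
Since ArrowCompressionInfo is new in this commit, a short usage sketch may help; the typed conf.set calls are assumed standard Configuration API, while everything else comes from the class above:

    Configuration conf = new Configuration();
    conf.set(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_TYPE, ArrowCompressionType.ZSTD);
    conf.set(ConfigOptions.TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL, 3);

    // fromConf validates the zstd level (1..22); other types get level -1.
    ArrowCompressionInfo info = ArrowCompressionInfo.fromConf(conf);
    CompressionCodec codec = info.createCompressionCodec(); // shaded Arrow codec
    System.out.println(info); // "ZSTD-3"; non-zstd types print the bare type name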
@@ -18,15 +18,16 @@
 package com.alibaba.fluss.compression;
 
 import com.alibaba.fluss.annotation.PublicEvolving;
+import com.alibaba.fluss.metadata.LogFormat;
 
 /**
- * Supported compression types for ARROW format.
+ * Supported compression types for ARROW {@link LogFormat}.
  *
  * @since 0.6
  */
 @PublicEvolving
 public enum ArrowCompressionType {
-    NO,
+    NONE,
     LZ4_FRAME,
-    ZSTD;
+    ZSTD
 }