diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index 7e4a5337..749f4176 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -511,7 +511,8 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) { .table(getTableName()) .streamLoadDataFormat(dataFormat) .chunkLimit(getChunkLimit()) - .enableUpsertDelete(supportUpsertDelete()); + .enableUpsertDelete(supportUpsertDelete()) + .addCommonProperties(getSinkStreamLoadProperties()); if (hasColumnMappingProperty()) { defaultTablePropertiesBuilder.columns(streamLoadProps.get("columns")); diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java index 31852a0b..13253a21 100644 --- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java @@ -724,4 +724,17 @@ private String createJsonTable(String tablePrefix) throws Exception { executeSrSQL(createStarRocksTable); return tableName; } + + @Test + public void testJsonLz4Compression() throws Exception { + assumeTrue(isSinkV2); + Map map = new HashMap<>(); + map.put("sink.properties.format", "json"); + map.put("sink.properties.strip_outer_array", "true"); + map.put("sink.properties.compression", "lz4_frame"); + testConfigurationBase(map, env -> null); + + map.put("sink.at-least-once.use-transaction-stream-load", "false"); + testConfigurationBase(map, env -> null); + } } diff --git a/starrocks-stream-load-sdk/pom.xml b/starrocks-stream-load-sdk/pom.xml index 5640bd81..583b9051 100644 --- a/starrocks-stream-load-sdk/pom.xml +++ b/starrocks-stream-load-sdk/pom.xml @@ -18,6 +18,7 @@ 4.4.15 
1.2.83 2.12.4 + 1.8.0 @@ -58,6 +59,12 @@ ${fasterxml.version} + + org.lz4 + lz4-java + ${lz4.version} + + org.mockito mockito-core diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadDataFormat.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadDataFormat.java index 8b774bc3..7418eee2 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadDataFormat.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadDataFormat.java @@ -29,6 +29,10 @@ public interface StreamLoadDataFormat { StreamLoadDataFormat JSON = new JSONFormat(); StreamLoadDataFormat CSV = new CSVFormat(); + default String name() { + return ""; + } + byte[] first(); byte[] delimiter(); byte[] end(); @@ -51,6 +55,11 @@ public CSVFormat(String rowDelimiter) { this.delimiter = rowDelimiter.getBytes(StandardCharsets.UTF_8); } + @Override + public String name() { + return "csv"; + } + @Override public byte[] first() { return EMPTY_DELIMITER; @@ -89,6 +98,11 @@ class JSONFormat implements StreamLoadDataFormat, Serializable { private static final byte[] delimiter = ",".getBytes(StandardCharsets.UTF_8); private static final byte[] end = "]".getBytes(StandardCharsets.UTF_8); + @Override + public String name() { + return "json"; + } + @Override public byte[] first() { return first; diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/annotation/Evolving.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/annotation/Evolving.java new file mode 100644 index 00000000..d41f7407 --- /dev/null +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/annotation/Evolving.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.data.load.stream.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * APIs that are not stable yet.
+ */
+@Documented
+@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
+public @interface Evolving {}
\ No newline at end of file
diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionCodec.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionCodec.java
new file mode 100644
index 00000000..a6050bd1
--- /dev/null
+++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionCodec.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.data.load.stream.compress;
+
+import com.starrocks.data.load.stream.StreamLoadDataFormat;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Optional;
+
+/** Compress data with the specified compression algorithm. */
+public interface CompressionCodec {
+
+    /**
+     * Create an output stream to compress the raw output stream.
+     *
+     * @param rawOutputStream the output stream to compress
+     * @param contentSize the content size of the raw stream. -1 if the size is unknown
+     * @return an output stream with compressed data
+     */
+    OutputStream createCompressionStream(final OutputStream rawOutputStream, long contentSize) throws IOException;
+
+    /**
+     * Create the codec for the requested compression type, if one was requested.
+     *
+     * @param dataFormat the data format of the load; its name is used in the error message
+     * @param compressionType the requested compression type, matched case-insensitively; empty means no compression
+     * @param properties options handed to the codec factory
+     * @return an empty Optional when no compression type is given, otherwise the matching codec
+     * @throws UnsupportedOperationException if the format / compression type combination is not supported
+     */
+    static Optional<CompressionCodec> createCompressionCodec(StreamLoadDataFormat dataFormat,
+                                                             Optional<String> compressionType,
+                                                             Map<String, Object> properties) {
+        if (!compressionType.isPresent()) {
+            return Optional.empty();
+        }
+
+        // LZ4_FRAME is only wired up for the JSON format; every other combination falls through to the error.
+        if (LZ4FrameCompressionCodec.NAME.equalsIgnoreCase(compressionType.get())
+                && dataFormat instanceof StreamLoadDataFormat.JSONFormat) {
+            return Optional.of(LZ4FrameCompressionCodec.create(properties));
+        }
+
+        throw new UnsupportedOperationException(
+                String.format("Not support to compress format %s with compression type %s",
+                        dataFormat.name(), compressionType.get()));
+    }
+}
diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionHttpEntity.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionHttpEntity.java
new file mode 100644
index 
00000000..021851fb
--- /dev/null
+++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionHttpEntity.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.data.load.stream.compress;
+
+import com.starrocks.data.load.stream.v2.ChunkHttpEntity;
+import org.apache.http.entity.HttpEntityWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/** Wrapping entity that passes a {@link ChunkHttpEntity}'s content through a {@link CompressionCodec} when writing. */
+public class CompressionHttpEntity extends HttpEntityWrapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompressionHttpEntity.class);
+
+    private final CompressionCodec compressionCodec;
+
+    public CompressionHttpEntity(ChunkHttpEntity entity, CompressionCodec compressionCodec) {
+        super(entity);
+        this.compressionCodec = compressionCodec;
+        entity.setLogAfterWrite(false); // writeTo() below logs the sizes itself; presumably avoids a second log line
+    }
+
+    @Override
+    public long getContentLength() {
+        return -1; // negative length: the compressed size is not computed up front
+    }
+
+    @Override
+    public boolean isChunked() {
+        // force content chunking (no content length is reported, see getContentLength())
+        return true;
+    }
+
+    @Override
+    public InputStream getContent() throws IOException {
+        throw new UnsupportedOperationException(); // no pull-style access; output is only produced by writeTo()
+    }
+
+    @Override
+    public void writeTo(final OutputStream outStream) throws IOException {
+        long startTime = System.nanoTime();
+        ChunkHttpEntity entity = (ChunkHttpEntity) wrappedEntity; // safe: the constructor only accepts ChunkHttpEntity
+        final CountingOutputStream countingOutputStream = new CountingOutputStream(outStream); // counts compressed bytes
+        final OutputStream compressOutputStream =
+                compressionCodec.createCompressionStream(countingOutputStream, entity.getContentLength());
+        entity.writeTo(compressOutputStream);
+        compressOutputStream.close(); // skipped if writeTo() above throws; closed before the count is read below
+        long rawSize = entity.getContentLength();
+        long compressSize = countingOutputStream.getCount();
+        float compressRatio = compressSize == 0 ? 1 : (float) rawSize / compressSize; // 1 when nothing was written
+        LOG.info("Write entity for table {}, raw/compressed size:{}/{}, compress ratio:{}, time:{}us",
+                entity.getTableUniqueKey(), rawSize, compressSize, compressRatio, (System.nanoTime() - startTime) / 1000) ;
+    }
+
+    @Override
+    public boolean isStreaming() {
+        return false;
+    }
+}
diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionOptions.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionOptions.java
new file mode 100644
index 00000000..44db6a49
--- /dev/null
+++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.starrocks.data.load.stream.compress; + +import net.jpountz.lz4.LZ4FrameOutputStream; + +public class CompressionOptions { + + public static final String LZ4_BLOCK_SIZE = "compression.lz4.block.size"; + public static final LZ4FrameOutputStream.BLOCKSIZE DEFAULT_LZ4_BLOCK_SIZE = LZ4FrameOutputStream.BLOCKSIZE.SIZE_4MB; +} diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CountingOutputStream.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CountingOutputStream.java new file mode 100644 index 00000000..f1e024d1 --- /dev/null +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CountingOutputStream.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +/** Count how many bytes written. 
*/
+public class CountingOutputStream extends FilterOutputStream {
+    private long count;
+
+    public CountingOutputStream(OutputStream out) {
+        super(out);
+    }
+
+    public long getCount() {
+        return this.count;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        this.out.write(b, off, len);
+        this.count += len; // counted only after the delegate write returned normally
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        this.out.write(b);
+        ++this.count;
+    }
+}
diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodec.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodec.java
new file mode 100644
index 00000000..42e55d1c
--- /dev/null
+++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodec.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.data.load.stream.compress;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FrameOutputStream;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/** Compress data using LZ4_FRAME. */
+public class LZ4FrameCompressionCodec implements CompressionCodec {
+
+    public static final String NAME = "LZ4_FRAME";
+
+    private final LZ4FrameOutputStream.BLOCKSIZE blockSize;
+    private final LZ4Compressor compressor;
+    private final XXHash32 hash;
+
+    public LZ4FrameCompressionCodec(LZ4FrameOutputStream.BLOCKSIZE blockSize) {
+        this.blockSize = blockSize;
+        this.compressor = LZ4Factory.fastestInstance().fastCompressor();
+        this.hash = XXHashFactory.fastestInstance().hash32();
+    }
+
+    @Override
+    public OutputStream createCompressionStream(OutputStream rawOutputStream, long contentSize) throws IOException {
+        // The CONTENT_SIZE flag is only set when the raw size is known (non-negative).
+        return contentSize < 0 ?
+                new LZ4FrameOutputStream(rawOutputStream, blockSize, -1, compressor, hash,
+                        LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE) :
+                new LZ4FrameOutputStream(rawOutputStream, blockSize, contentSize, compressor, hash,
+                        LZ4FrameOutputStream.FLG.Bits.CONTENT_SIZE,
+                        LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE);
+    }
+
+    LZ4FrameOutputStream.BLOCKSIZE getBlockSize() {
+        return blockSize;
+    }
+
+    /**
+     * Create a codec from the given properties.
+     *
+     * <p>The block size is read from {@link CompressionOptions#LZ4_BLOCK_SIZE}. When the key is absent or
+     * mapped to {@code null}, {@link CompressionOptions#DEFAULT_LZ4_BLOCK_SIZE} is used.
+     *
+     * @param properties codec options
+     * @return a codec using the configured block size
+     * @throws IllegalArgumentException if the configured value is not a {@link LZ4FrameOutputStream.BLOCKSIZE}
+     */
+    public static LZ4FrameCompressionCodec create(Map<String, Object> properties) {
+        Object blockSize = properties.get(CompressionOptions.LZ4_BLOCK_SIZE);
+        if (blockSize == null) {
+            return new LZ4FrameCompressionCodec(CompressionOptions.DEFAULT_LZ4_BLOCK_SIZE);
+        }
+        if (!(blockSize instanceof LZ4FrameOutputStream.BLOCKSIZE)) {
+            // Fail with a message that names the option instead of a bare ClassCastException.
+            throw new IllegalArgumentException(String.format(
+                    "Invalid value for %s: expected %s but got %s",
+                    CompressionOptions.LZ4_BLOCK_SIZE,
+                    LZ4FrameOutputStream.BLOCKSIZE.class.getName(),
+                    blockSize.getClass().getName()));
+        }
+        return new LZ4FrameCompressionCodec((LZ4FrameOutputStream.BLOCKSIZE) blockSize);
+    }
+}
diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java
index 4425e37b..439472b5 100644
--- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java
+++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java
@@ -22,10 +22,14 @@
 
 import com.starrocks.data.load.stream.StreamLoadDataFormat;
 import com.starrocks.data.load.stream.StreamLoadUtils;
+import com.starrocks.data.load.stream.annotation.Evolving;
+import com.starrocks.data.load.stream.compress.CompressionOptions;
+import net.jpountz.lz4.LZ4FrameOutputStream;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class StreamLoadTableProperties implements Serializable {
 
@@ -33,11 +37,14 @@ public class StreamLoadTableProperties implements Serializable {
     private final String database;
     private final String table;
     private final StreamLoadDataFormat dataFormat;
+    private final Map<String, Object> tableProperties;
+    // individual stream load properties
     private final Map<String, String> properties;
     private 
final boolean enableUpsertDelete; private final long chunkLimit; private final int maxBufferRows; private final String columns; + private final Map commonProperties; private StreamLoadTableProperties(Builder builder) { this.database = builder.database; @@ -58,8 +65,10 @@ private StreamLoadTableProperties(Builder builder) { chunkLimit = Math.min(10737418240L, builder.chunkLimit); } this.maxBufferRows = builder.maxBufferRows; - this.properties = builder.properties; + this.tableProperties = new HashMap<>(builder.tableProperties); + this.properties = new HashMap<>(builder.properties); this.columns = builder.columns; + this.commonProperties = new HashMap<>(builder.commonProperties); } public String getColumns() {return columns; } @@ -92,10 +101,28 @@ public int getMaxBufferRows() { return maxBufferRows; } + @Evolving + public Map getTableProperties() { + return tableProperties; + } + public Map getProperties() { return properties; } + public Map getCommonProperties() { + return commonProperties; + } + + public Optional getProperty(String name) { + String value = properties.get(name); + if (value != null) { + return Optional.of(value); + } + + return Optional.ofNullable(commonProperties.get(name)); + } + public static Builder builder() { return new Builder(); } @@ -110,8 +137,13 @@ public static class Builder { private long chunkLimit; private int maxBufferRows = Integer.MAX_VALUE; + private final Map tableProperties = new HashMap<>(); + + // Stream load properties private final Map properties = new HashMap<>(); + private final Map commonProperties = new HashMap<>(); + private Builder() { } @@ -132,6 +164,8 @@ public Builder copyFrom(StreamLoadTableProperties streamLoadTableProperties) { streamLoadDataFormat(streamLoadTableProperties.getDataFormat()); chunkLimit(streamLoadTableProperties.getChunkLimit()); maxBufferRows(streamLoadTableProperties.getMaxBufferRows()); + tableProperties.putAll(streamLoadTableProperties.getTableProperties()); + 
commonProperties.putAll(streamLoadTableProperties.getCommonProperties()); return this; } @@ -185,6 +219,16 @@ public Builder addProperty(String key, String value) { return this; } + public Builder setLZ4BlockSize(LZ4FrameOutputStream.BLOCKSIZE blockSize) { + tableProperties.put(CompressionOptions.LZ4_BLOCK_SIZE, blockSize); + return this; + } + + public Builder addCommonProperties(Map properties) { + this.commonProperties.putAll(properties); + return this; + } + public StreamLoadTableProperties build() { if (database == null || table == null) { throw new IllegalArgumentException(String.format("database `%s` or table `%s` can't be null", database, table)); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java index 3a7acdd0..ee4cb129 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java @@ -35,24 +35,34 @@ public class ChunkHttpEntity extends AbstractHttpEntity { - private static final Logger log = LoggerFactory.getLogger(ChunkHttpEntity.class); + private static final Logger LOG = LoggerFactory.getLogger(ChunkHttpEntity.class); protected static final int OUTPUT_BUFFER_SIZE = 2048; private static final Header CONTENT_TYPE = new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_OCTET_STREAM.toString()); private final String tableUniqueKey; - private final InputStream content; + private final Chunk chunk; private final long contentLength; + private boolean logAfterWrite; public ChunkHttpEntity(String tableUniqueKey, Chunk chunk) { this.tableUniqueKey = tableUniqueKey; - this.content = new ChunkInputStream(chunk); + this.chunk = chunk; this.contentLength = chunk.chunkBytes(); + this.logAfterWrite = true; + } + + public String getTableUniqueKey() { + return 
tableUniqueKey; + } + + public void setLogAfterWrite(boolean logAfterWrite) { + this.logAfterWrite = logAfterWrite; } @Override public boolean isRepeatable() { - return false; + return true; } @Override @@ -77,26 +87,27 @@ public Header getContentEncoding() { @Override public InputStream getContent() { - return content; + return new ChunkInputStream(chunk); } @Override public void writeTo(OutputStream outputStream) throws IOException { - long total = 0; - try (InputStream inputStream = this.content) { + long startTime = System.nanoTime(); + try (InputStream inputStream = new ChunkInputStream(chunk)) { final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE]; - int l; - while ((l = inputStream.read(buffer)) != -1) { - total += l; - outputStream.write(buffer, 0, l); + int len; + while ((len = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, len); } } - log.info("Entity write end, uniqueKey: {}, contentLength : {}, total : {}", - tableUniqueKey, contentLength, total); + if (logAfterWrite || LOG.isDebugEnabled()) { + LOG.info("Write entity for table {}, size:{}, time:{}us", + tableUniqueKey, contentLength, (System.nanoTime() - startTime) / 1000); + } } @Override public boolean isStreaming() { - return true; + return false; } } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java index 9a53cccd..26bfb457 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java @@ -25,6 +25,8 @@ import com.starrocks.data.load.stream.StreamLoadSnapshot; import com.starrocks.data.load.stream.StreamLoader; import com.starrocks.data.load.stream.TableRegion; +import com.starrocks.data.load.stream.compress.CompressionCodec; +import 
com.starrocks.data.load.stream.compress.CompressionHttpEntity; import com.starrocks.data.load.stream.exception.StreamLoadFailException; import com.starrocks.data.load.stream.http.StreamLoadEntityMeta; import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; @@ -32,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,6 +60,7 @@ enum State { private final String database; private final String table; private final StreamLoadTableProperties properties; + private final Optional compressionCodec; private final AtomicLong age = new AtomicLong(0L); private final AtomicLong cacheBytes = new AtomicLong(); private final AtomicLong cacheRows = new AtomicLong(); @@ -91,6 +95,10 @@ public TransactionTableRegion(String uniqueKey, this.properties = properties; this.streamLoader = streamLoader; this.labelGenerator = labelGenerator; + this.compressionCodec = CompressionCodec.createCompressionCodec( + properties.getDataFormat(), + properties.getProperty("compression"), + properties.getTableProperties()); this.state = new AtomicReference<>(State.ACTIVE); this.lastCommitTimeMills = System.currentTimeMillis(); this.activeChunk = new Chunk(properties.getDataFormat()); @@ -366,7 +374,10 @@ protected void streamLoad(int delayMs) { @Override public HttpEntity getHttpEntity() { - return new ChunkHttpEntity(uniqueKey, inactiveChunks.peek()); + ChunkHttpEntity entity = new ChunkHttpEntity(uniqueKey, inactiveChunks.peek()); + return compressionCodec + .map(codec -> (HttpEntity) new CompressionHttpEntity(entity, codec)) + .orElse(entity); } @Override diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkHttpEntityTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkHttpEntityTest.java index 63e2ba64..7c54a9a3 100644 --- 
a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkHttpEntityTest.java +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkHttpEntityTest.java @@ -27,6 +27,7 @@ import static com.starrocks.data.load.stream.ChunkInputStreamTest.genChunk; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; public class ChunkHttpEntityTest { @@ -34,6 +35,7 @@ public class ChunkHttpEntityTest { public void testWrite() throws Exception { ChunkInputStreamTest.ChunkMeta chunkMeta = genChunk(); ChunkHttpEntity entity = new ChunkHttpEntity("test", chunkMeta.chunk); + assertTrue(entity.isRepeatable()); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); entity.writeTo(outputStream); assertArrayEquals(chunkMeta.expectedData, outputStream.toByteArray()); diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkInputStreamTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkInputStreamTest.java index 19dde9f3..aa93f9c5 100644 --- a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkInputStreamTest.java +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkInputStreamTest.java @@ -107,8 +107,8 @@ public static ChunkMeta genChunk() { } public static class ChunkMeta { - Chunk chunk; - byte[] expectedData; + public Chunk chunk; + public byte[] expectedData; public ChunkMeta(Chunk chunk, byte[] expectedData) { this.chunk = chunk; diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionCodecTestBase.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionCodecTestBase.java new file mode 100644 index 00000000..4f63c806 --- /dev/null +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionCodecTestBase.java @@ -0,0 +1,46 @@ +/* + * Copyright 
2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.data.load.stream.compress;
+
+import com.starrocks.data.load.stream.ChunkInputStreamTest;
+import com.starrocks.data.load.stream.v2.ChunkHttpEntity;
+
+import java.io.ByteArrayOutputStream;
+
+import static com.starrocks.data.load.stream.ChunkInputStreamTest.genChunk;
+import static org.junit.Assert.assertArrayEquals;
+
+/** Shared round-trip check for {@link CompressionCodec} implementations; subclasses supply the decompressor. */
+public abstract class CompressionCodecTestBase {
+
+    protected abstract byte[] decompress(byte[] compressedData, int rawSize) throws Exception;
+
+    public void testCompressBase(CompressionCodec compressionCodec) throws Exception {
+        final ChunkInputStreamTest.ChunkMeta meta = genChunk();
+        final ChunkHttpEntity rawEntity = new ChunkHttpEntity("test", meta.chunk);
+        final int rawSize = (int) rawEntity.getContentLength();
+        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
+        // Compress the chunk through the entity wrapper, then undo it with the codec-specific decompressor.
+        new CompressionHttpEntity(rawEntity, compressionCodec).writeTo(sink);
+        final byte[] decompressed = decompress(sink.toByteArray(), rawSize);
+        assertArrayEquals(meta.expectedData, decompressed);
+    }
+}
diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionHttpEntityTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionHttpEntityTest.java
new file mode 100644
index 00000000..05317f4a
--- /dev/null
+++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionHttpEntityTest.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import com.starrocks.data.load.stream.ChunkInputStreamTest; +import com.starrocks.data.load.stream.v2.ChunkHttpEntity; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import static com.starrocks.data.load.stream.ChunkInputStreamTest.genChunk; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** Tests for {@link CompressionHttpEntity}. */ +public class CompressionHttpEntityTest { + + @Test + public void testBasic() throws Exception { + ChunkInputStreamTest.ChunkMeta chunkMeta = genChunk(); + ChunkHttpEntity entity = new ChunkHttpEntity("test", chunkMeta.chunk); + MockCompressionCodec compressionCodec = new MockCompressionCodec(); + CompressionHttpEntity compressionEntity = new CompressionHttpEntity(entity, compressionCodec); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + compressionEntity.writeTo(outputStream); + assertArrayEquals(chunkMeta.expectedData, outputStream.toByteArray()); + assertEquals(1, compressionCodec.getStreams().size()); + assertEquals(entity.getContentLength(), compressionCodec.getStreams().get(0).getCount()); + } + + private static class MockCompressionCodec implements CompressionCodec { + + private final List streams; + + public MockCompressionCodec() { + this.streams = new ArrayList<>(); + } + + public List getStreams() { + return streams; + } + + @Override + public OutputStream createCompressionStream(OutputStream rawOutputStream, long contentSize) throws IOException { + this.streams.add(new CountingOutputStream(rawOutputStream)); + return streams.get(streams.size() - 1); + } + } +} diff --git 
a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CountingOutputStreamTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CountingOutputStreamTest.java new file mode 100644 index 00000000..66f123e3 --- /dev/null +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CountingOutputStreamTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertEquals; + +/** Tests for {@link CountingOutputStream}. 
*/ +public class CountingOutputStreamTest { + + @Test + public void testCount() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream); + + byte[] data = new byte[100]; + ThreadLocalRandom.current().nextBytes(data); + countingOutputStream.write(data[0]); + assertEquals(1, countingOutputStream.getCount()); + countingOutputStream.write(data, 1, data.length - 2); + assertEquals(data.length - 1, countingOutputStream.getCount()); + countingOutputStream.write(data[data.length - 1]); + assertEquals(data.length, countingOutputStream.getCount()); + countingOutputStream.write(data); + assertEquals(data.length * 2, countingOutputStream.getCount()); + countingOutputStream.close(); + + byte[] result = outputStream.toByteArray(); + assertEquals(data.length * 2, result.length); + for (int i = 0; i < result.length;) { + for (int j = 0; j < data.length; j++, i++) { + assertEquals(data[j], result[i]); + } + } + assertEquals(data.length * 2, countingOutputStream.getCount()); + } +} diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodecTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodecTest.java new file mode 100644 index 00000000..ac930224 --- /dev/null +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodecTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import net.jpountz.lz4.LZ4FrameInputStream; +import net.jpountz.lz4.LZ4FrameOutputStream; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class LZ4FrameCompressionCodecTest extends CompressionCodecTestBase { + + @Override + protected byte[] decompress(byte[] compressedData, int rawSize) throws Exception { + byte[] result = new byte[rawSize]; + LZ4FrameInputStream inputStream = new LZ4FrameInputStream(new ByteArrayInputStream(compressedData)); + inputStream.read(result); + inputStream.close(); + return result; + } + + @Test + public void testCreate() { + Map properties = new HashMap<>(); + LZ4FrameCompressionCodec codec1 = LZ4FrameCompressionCodec.create(properties); + assertEquals(LZ4FrameOutputStream.BLOCKSIZE.SIZE_4MB, codec1.getBlockSize()); + + properties.put("compression.lz4.block.size", LZ4FrameOutputStream.BLOCKSIZE.SIZE_1MB); + LZ4FrameCompressionCodec codec2 = LZ4FrameCompressionCodec.create(properties); + assertEquals(LZ4FrameOutputStream.BLOCKSIZE.SIZE_1MB, codec2.getBlockSize()); + } + + @Test + public void testCompress() throws Exception { + LZ4FrameCompressionCodec codec = LZ4FrameCompressionCodec.create(new HashMap<>()); + testCompressBase(codec); + } +}