From b9b6c6524f7f9256edf3a74a8a252e0cb33a7783 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Thu, 4 Apr 2024 22:52:52 +0800 Subject: [PATCH 1/6] Make ChunkHttpEntity repeatable Signed-off-by: PengFei Li --- .../starrocks/data/load/stream/v2/ChunkHttpEntity.java | 10 +++++----- .../data/load/stream/ChunkHttpEntityTest.java | 2 ++ 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java index 3a7acdd0..aded0fac 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java @@ -41,18 +41,18 @@ public class ChunkHttpEntity extends AbstractHttpEntity { private static final Header CONTENT_TYPE = new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_OCTET_STREAM.toString()); private final String tableUniqueKey; - private final InputStream content; + private final Chunk chunk; private final long contentLength; public ChunkHttpEntity(String tableUniqueKey, Chunk chunk) { this.tableUniqueKey = tableUniqueKey; - this.content = new ChunkInputStream(chunk); + this.chunk = chunk; this.contentLength = chunk.chunkBytes(); } @Override public boolean isRepeatable() { - return false; + return true; } @Override @@ -77,13 +77,13 @@ public Header getContentEncoding() { @Override public InputStream getContent() { - return content; + return new ChunkInputStream(chunk); } @Override public void writeTo(OutputStream outputStream) throws IOException { long total = 0; - try (InputStream inputStream = this.content) { + try (InputStream inputStream = new ChunkInputStream(chunk)) { final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE]; int l; while ((l = inputStream.read(buffer)) != -1) { diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkHttpEntityTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkHttpEntityTest.java index 63e2ba64..7c54a9a3 100644 --- a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkHttpEntityTest.java +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkHttpEntityTest.java @@ -27,6 +27,7 @@ import static com.starrocks.data.load.stream.ChunkInputStreamTest.genChunk; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; public class ChunkHttpEntityTest { @@ -34,6 +35,7 @@ public class ChunkHttpEntityTest { public void testWrite() throws Exception { ChunkInputStreamTest.ChunkMeta chunkMeta = genChunk(); ChunkHttpEntity entity = new ChunkHttpEntity("test", chunkMeta.chunk); + assertTrue(entity.isRepeatable()); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); entity.writeTo(outputStream); assertArrayEquals(chunkMeta.expectedData, outputStream.toByteArray()); From 5956eb8a1da255500c95857da557d6c058464bf9 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Fri, 5 Apr 2024 03:06:11 +0800 Subject: [PATCH 2/6] Add lz4_frame compression for json format Signed-off-by: PengFei Li --- starrocks-stream-load-sdk/pom.xml | 7 ++ .../load/stream/StreamLoadDataFormat.java | 14 ++++ .../data/load/stream/annotation/Evolving.java | 32 ++++++++ .../stream/compress/CompressionCodec.java | 59 ++++++++++++++ .../compress/CompressionHttpEntity.java | 80 +++++++++++++++++++ .../stream/compress/CompressionOptions.java | 32 ++++++++ .../stream/compress/CountingOutputStream.java | 50 ++++++++++++ .../compress/LZ4FrameCompressionCodec.java | 74 +++++++++++++++++ .../properties/StreamLoadTableProperties.java | 27 ++++++- .../data/load/stream/v2/ChunkHttpEntity.java | 28 ++++--- .../stream/v2/TransactionTableRegion.java | 13 ++- 11 files changed, 405 insertions(+), 11 deletions(-) create mode 100644 starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/annotation/Evolving.java create mode 100644 starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionCodec.java create mode 100644 starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionHttpEntity.java create mode 100644 starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionOptions.java create mode 100644 starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CountingOutputStream.java create mode 100644 starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodec.java diff --git a/starrocks-stream-load-sdk/pom.xml b/starrocks-stream-load-sdk/pom.xml index 5640bd81..583b9051 100644 --- a/starrocks-stream-load-sdk/pom.xml +++ b/starrocks-stream-load-sdk/pom.xml @@ -18,6 +18,7 @@ 4.4.15 1.2.83 2.12.4 + 1.8.0 @@ -58,6 +59,12 @@ ${fasterxml.version} + + org.lz4 + lz4-java + ${lz4.version} + + org.mockito mockito-core diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadDataFormat.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadDataFormat.java index 8b774bc3..7418eee2 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadDataFormat.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/StreamLoadDataFormat.java @@ -29,6 +29,10 @@ public interface StreamLoadDataFormat { StreamLoadDataFormat JSON = new JSONFormat(); StreamLoadDataFormat CSV = new CSVFormat(); + default String name() { + return ""; + } + byte[] first(); byte[] delimiter(); byte[] end(); @@ -51,6 +55,11 @@ public CSVFormat(String rowDelimiter) { this.delimiter = rowDelimiter.getBytes(StandardCharsets.UTF_8); } + @Override + public String name() { + return "csv"; + } + @Override public byte[] first() { return EMPTY_DELIMITER; @@ -89,6 +98,11 @@ class JSONFormat implements StreamLoadDataFormat, Serializable { private static final byte[] delimiter = ",".getBytes(StandardCharsets.UTF_8); private static final byte[] end = "]".getBytes(StandardCharsets.UTF_8); + @Override + public String name() { + return "json"; + } + @Override public byte[] first() { return first; diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/annotation/Evolving.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/annotation/Evolving.java new file mode 100644 index 00000000..d41f7407 --- /dev/null +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/annotation/Evolving.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Target; + +/** + * APIs that are not stable yet. + */ +@Documented +@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD}) +public @interface Evolving {} \ No newline at end of file diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionCodec.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionCodec.java new file mode 100644 index 00000000..a6050bd1 --- /dev/null +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionCodec.java @@ -0,0 +1,59 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import com.starrocks.data.load.stream.StreamLoadDataFormat; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.Optional; + +/** Compress data with the specified compression algorithm. */ +public interface CompressionCodec { + + /** + * Create an output stream to compress the raw output stream. + * + * @param rawOutputStream the output stream to compress + * @param contentSize the content size of the raw stream. -1 if the size is unknown + * @return an output stream with compressed data + */ + OutputStream createCompressionStream(final OutputStream rawOutputStream, long contentSize) throws IOException; + + static Optional createCompressionCodec(StreamLoadDataFormat dataFormat, + Optional compressionType, + Map properties) { + if (!compressionType.isPresent()) { + return Optional.empty(); + } + + if (LZ4FrameCompressionCodec.NAME.equalsIgnoreCase(compressionType.get())) { + if (dataFormat instanceof StreamLoadDataFormat.JSONFormat) { + return Optional.of(LZ4FrameCompressionCodec.create(properties)); + } + } + + throw new UnsupportedOperationException( + String.format("Not support to compress format %s with compression type %s", + dataFormat.name(), compressionType.get())); + } +} diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionHttpEntity.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionHttpEntity.java new file mode 100644 index 00000000..605b5fc3 --- /dev/null +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionHttpEntity.java @@ -0,0 +1,80 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import com.starrocks.data.load.stream.v2.ChunkHttpEntity; +import org.apache.http.entity.HttpEntityWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** Wrapping entity that compresses content when writing. */ +public class CompressionHttpEntity extends HttpEntityWrapper { + + private static final Logger LOG = LoggerFactory.getLogger(CompressionHttpEntity.class); + + private final CompressionCodec compressionCodec; + + public CompressionHttpEntity(ChunkHttpEntity entity, CompressionCodec compressionCodec) { + super(entity); + this.compressionCodec = compressionCodec; + entity.setLogAfterWrite(false); + } + + @Override + public long getContentLength() { + return -1; + } + + @Override + public boolean isChunked() { + // force content chunking + return true; + } + + @Override + public InputStream getContent() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void writeTo(final OutputStream outStream) throws IOException { + ChunkHttpEntity entity = (ChunkHttpEntity) wrappedEntity; + final CountingOutputStream countingOutputStream = new CountingOutputStream(outStream); + final OutputStream compressOutputStream = + compressionCodec.createCompressionStream(countingOutputStream, entity.getContentLength()); + entity.writeTo(compressOutputStream); + compressOutputStream.close(); + long rawSize = entity.getContentLength(); + long compressSize = countingOutputStream.getCount(); + float compressRatio = compressSize == 0 ? 1 : (float) rawSize / compressSize; + LOG.info("Finish to write entity for table {}, raw size : {}, compressed size: {}, compress ratio: {}", + entity.getTableUniqueKey(), rawSize, compressSize, compressRatio) ; + } + + @Override + public boolean isStreaming() { + return false; + } +} diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionOptions.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionOptions.java new file mode 100644 index 00000000..3348e643 --- /dev/null +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionOptions.java @@ -0,0 +1,32 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import net.jpountz.lz4.LZ4FrameOutputStream; + +public class CompressionOptions { + + public static final String LZ4_BLOCK_SIZE = "compression.lz4.block.size"; + public static final LZ4FrameOutputStream.BLOCKSIZE DEFAULT_LZ4_BLOCK_SIZE = LZ4FrameOutputStream.BLOCKSIZE.SIZE_1MB; + + public static final String LZ4_BLOCK_INDEPENDENCE = "compression.lz4.block.independence"; + public static final boolean DEFAULT_LZ4_BLOCK_INDEPENDENCE = false; +} diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CountingOutputStream.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CountingOutputStream.java new file mode 100644 index 00000000..f1e024d1 --- /dev/null +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CountingOutputStream.java @@ -0,0 +1,50 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +/** Count how many bytes written. */ +public class CountingOutputStream extends FilterOutputStream { + + private long count; + + public CountingOutputStream(OutputStream out) { + super(out); + } + + public long getCount() { + return this.count; + } + + public void write(byte[] b, int off, int len) throws IOException { + this.out.write(b, off, len); + this.count += len; + } + + public void write(int b) throws IOException { + this.out.write(b); + ++this.count; + } + +} diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodec.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodec.java new file mode 100644 index 00000000..e199c36e --- /dev/null +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodec.java @@ -0,0 +1,74 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FrameOutputStream; +import net.jpountz.xxhash.XXHash32; +import net.jpountz.xxhash.XXHashFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; + +/** Compress data using LZ4_FRAME. */ +public class LZ4FrameCompressionCodec implements CompressionCodec { + + public static final String NAME = "LZ4_FRAME"; + + private final LZ4FrameOutputStream.BLOCKSIZE blockSize; + private final boolean blockIndependence; + private final LZ4Compressor compressor; + private final XXHash32 hash; + + public LZ4FrameCompressionCodec(LZ4FrameOutputStream.BLOCKSIZE blockSize, boolean blockIndependence) { + this.blockSize = blockSize; + this.blockIndependence = blockIndependence; + this.compressor = LZ4Factory.fastestInstance().fastCompressor(); + this.hash = XXHashFactory.fastestInstance().hash32(); + } + + @Override + public OutputStream createCompressionStream(OutputStream rawOutputStream, long contentSize) throws IOException { + if (contentSize < 0) { + return blockIndependence ? + new LZ4FrameOutputStream(rawOutputStream, blockSize, -1, compressor, hash, + LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE) : + new LZ4FrameOutputStream(rawOutputStream, blockSize, -1, compressor, hash); + } else { + return blockIndependence ? + new LZ4FrameOutputStream(rawOutputStream, blockSize, contentSize, compressor, hash, + LZ4FrameOutputStream.FLG.Bits.CONTENT_SIZE, + LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE) : + new LZ4FrameOutputStream(rawOutputStream, blockSize, contentSize, compressor, hash, + LZ4FrameOutputStream.FLG.Bits.CONTENT_SIZE); + } + } + + public static LZ4FrameCompressionCodec create(Map properties) { + LZ4FrameOutputStream.BLOCKSIZE blockSize = (LZ4FrameOutputStream.BLOCKSIZE) properties.getOrDefault( + CompressionOptions.LZ4_BLOCK_SIZE, CompressionOptions.DEFAULT_LZ4_BLOCK_SIZE); + boolean blockIndependence = (boolean) properties.getOrDefault( + CompressionOptions.LZ4_BLOCK_INDEPENDENCE, CompressionOptions.DEFAULT_LZ4_BLOCK_INDEPENDENCE); + return new LZ4FrameCompressionCodec(blockSize, blockIndependence); + } +} diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java index 4425e37b..e683b9a9 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java @@ -22,6 +22,9 @@ import com.starrocks.data.load.stream.StreamLoadDataFormat; import com.starrocks.data.load.stream.StreamLoadUtils; +import com.starrocks.data.load.stream.annotation.Evolving; +import com.starrocks.data.load.stream.compress.CompressionOptions; +import net.jpountz.lz4.LZ4FrameOutputStream; import java.io.Serializable; import java.util.HashMap; @@ -33,6 +36,8 @@ public class StreamLoadTableProperties implements Serializable { private final String database; private final String table; private final StreamLoadDataFormat dataFormat; + private final Map tableProperties; + // stream load properties private final Map properties; private final boolean enableUpsertDelete; private final long chunkLimit; @@ -58,7 +63,8 @@ private StreamLoadTableProperties(Builder builder) { chunkLimit = Math.min(10737418240L, builder.chunkLimit); } this.maxBufferRows = builder.maxBufferRows; - this.properties = builder.properties; + this.tableProperties = new HashMap<>(builder.tableProperties); + this.properties = new HashMap<>(builder.properties); this.columns = builder.columns; } @@ -92,6 +98,11 @@ public int getMaxBufferRows() { return maxBufferRows; } + @Evolving + public Map getTableProperties() { + return tableProperties; + } + public Map getProperties() { return properties; } @@ -110,6 +121,9 @@ public static class Builder { private long chunkLimit; private int maxBufferRows = Integer.MAX_VALUE; + private final Map tableProperties = new HashMap<>(); + + // Stream load properties private final Map properties = new HashMap<>(); private Builder() { @@ -132,6 +146,7 @@ public Builder copyFrom(StreamLoadTableProperties streamLoadTableProperties) { streamLoadDataFormat(streamLoadTableProperties.getDataFormat()); chunkLimit(streamLoadTableProperties.getChunkLimit()); maxBufferRows(streamLoadTableProperties.getMaxBufferRows()); + tableProperties.putAll(streamLoadTableProperties.getTableProperties()); return this; } @@ -185,6 +200,16 @@ public Builder addProperty(String key, String value) { return this; } + public Builder setLZ4BlockSize(LZ4FrameOutputStream.BLOCKSIZE blockSize) { + tableProperties.put(CompressionOptions.LZ4_BLOCK_SIZE, blockSize); + return this; + } + + public Builder enableLZ4BlockIndependence() { + tableProperties.put(CompressionOptions.LZ4_BLOCK_INDEPENDENCE, true); + return this; + } + public StreamLoadTableProperties build() { if (database == null || table == null) { throw new IllegalArgumentException(String.format("database `%s` or table `%s` can't be null", database, table)); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java index aded0fac..68f436dd 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java @@ -35,7 +35,7 @@ public class ChunkHttpEntity extends AbstractHttpEntity { - private static final Logger log = LoggerFactory.getLogger(ChunkHttpEntity.class); + private static final Logger LOG = LoggerFactory.getLogger(ChunkHttpEntity.class); protected static final int OUTPUT_BUFFER_SIZE = 2048; private static final Header CONTENT_TYPE = @@ -43,11 +43,21 @@ public class ChunkHttpEntity extends AbstractHttpEntity { private final String tableUniqueKey; private final Chunk chunk; private final long contentLength; + private boolean logAfterWrite; public ChunkHttpEntity(String tableUniqueKey, Chunk chunk) { this.tableUniqueKey = tableUniqueKey; this.chunk = chunk; this.contentLength = chunk.chunkBytes(); + this.logAfterWrite = true; + } + + public String getTableUniqueKey() { + return tableUniqueKey; + } + + public void setLogAfterWrite(boolean logAfterWrite) { + this.logAfterWrite = logAfterWrite; } @Override @@ -82,21 +92,21 @@ public InputStream getContent() { @Override public void writeTo(OutputStream outputStream) throws IOException { - long total = 0; try (InputStream inputStream = new ChunkInputStream(chunk)) { final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE]; - int l; - while ((l = inputStream.read(buffer)) != -1) { - total += l; - outputStream.write(buffer, 0, l); + int len; + while ((len = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, len); } } - log.info("Entity write end, uniqueKey: {}, contentLength : {}, total : {}", - tableUniqueKey, contentLength, total); + if (logAfterWrite || LOG.isDebugEnabled()) { + LOG.info("Finish to write entity for table {}, contentLength : {}", + tableUniqueKey, contentLength); + } } @Override public boolean isStreaming() { - return true; + return false; } } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java index 9a53cccd..51a197e1 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java @@ -25,6 +25,8 @@ import com.starrocks.data.load.stream.StreamLoadSnapshot; import com.starrocks.data.load.stream.StreamLoader; import com.starrocks.data.load.stream.TableRegion; +import com.starrocks.data.load.stream.compress.CompressionCodec; +import com.starrocks.data.load.stream.compress.CompressionHttpEntity; import com.starrocks.data.load.stream.exception.StreamLoadFailException; import com.starrocks.data.load.stream.http.StreamLoadEntityMeta; import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; @@ -32,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,6 +60,7 @@ enum State { private final String database; private final String table; private final StreamLoadTableProperties properties; + private final Optional compressionCodec; private final AtomicLong age = new AtomicLong(0L); private final AtomicLong cacheBytes = new AtomicLong(); private final AtomicLong cacheRows = new AtomicLong(); @@ -91,6 +95,10 @@ public TransactionTableRegion(String uniqueKey, this.properties = properties; this.streamLoader = streamLoader; this.labelGenerator = labelGenerator; + this.compressionCodec = CompressionCodec.createCompressionCodec( + properties.getDataFormat(), + Optional.ofNullable(properties.getProperties().get("compression")), + properties.getTableProperties()); this.state = new AtomicReference<>(State.ACTIVE); this.lastCommitTimeMills = System.currentTimeMillis(); this.activeChunk = new Chunk(properties.getDataFormat()); @@ -366,7 +374,10 @@ protected void streamLoad(int delayMs) { @Override public HttpEntity getHttpEntity() { - return new ChunkHttpEntity(uniqueKey, inactiveChunks.peek()); + ChunkHttpEntity entity = new ChunkHttpEntity(uniqueKey, inactiveChunks.peek()); + return compressionCodec + .map(codec -> (HttpEntity) new CompressionHttpEntity(entity, codec)) + .orElse(entity); } @Override From 8f15957edb590ad195138594d2bbbef33964ebd8 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Thu, 11 Apr 2024 12:00:25 +0800 Subject: [PATCH 3/6] Add tests Signed-off-by: PengFei Li --- .../stream/compress/CompressionOptions.java | 5 +- .../compress/LZ4FrameCompressionCodec.java | 31 ++++---- .../properties/StreamLoadTableProperties.java | 5 -- .../load/stream/ChunkInputStreamTest.java | 4 +- .../compress/CompressionCodecTestBase.java | 46 ++++++++++++ .../compress/CompressionHttpEntityTest.java | 71 +++++++++++++++++++ .../compress/CountingOutputStreamTest.java | 59 +++++++++++++++ .../LZ4FrameCompressionCodecTest.java | 60 ++++++++++++++++ 8 files changed, 251 insertions(+), 30 deletions(-) create mode 100644 starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionCodecTestBase.java create mode 100644 starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionHttpEntityTest.java create mode 100644 starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CountingOutputStreamTest.java create mode 100644 starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodecTest.java diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionOptions.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionOptions.java index 3348e643..44db6a49 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionOptions.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionOptions.java @@ -25,8 +25,5 @@ public class CompressionOptions { public static final String LZ4_BLOCK_SIZE = "compression.lz4.block.size"; - public static final LZ4FrameOutputStream.BLOCKSIZE DEFAULT_LZ4_BLOCK_SIZE = LZ4FrameOutputStream.BLOCKSIZE.SIZE_1MB; - - public static final String LZ4_BLOCK_INDEPENDENCE = "compression.lz4.block.independence"; - public static final boolean DEFAULT_LZ4_BLOCK_INDEPENDENCE = false; + public static final LZ4FrameOutputStream.BLOCKSIZE DEFAULT_LZ4_BLOCK_SIZE = LZ4FrameOutputStream.BLOCKSIZE.SIZE_4MB; } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodec.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodec.java index e199c36e..42e55d1c 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodec.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodec.java @@ -36,39 +36,32 @@ public class LZ4FrameCompressionCodec implements CompressionCodec { public static final String NAME = "LZ4_FRAME"; private final LZ4FrameOutputStream.BLOCKSIZE blockSize; - private final boolean blockIndependence; private final LZ4Compressor compressor; private final XXHash32 hash; - public LZ4FrameCompressionCodec(LZ4FrameOutputStream.BLOCKSIZE blockSize, boolean blockIndependence) { + public LZ4FrameCompressionCodec(LZ4FrameOutputStream.BLOCKSIZE blockSize) { this.blockSize = blockSize; - this.blockIndependence = blockIndependence; this.compressor = LZ4Factory.fastestInstance().fastCompressor(); this.hash = XXHashFactory.fastestInstance().hash32(); } @Override public OutputStream createCompressionStream(OutputStream rawOutputStream, long contentSize) throws IOException { - if (contentSize < 0) { - return blockIndependence ? - new LZ4FrameOutputStream(rawOutputStream, blockSize, -1, compressor, hash, - LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE) : - new LZ4FrameOutputStream(rawOutputStream, blockSize, -1, compressor, hash); - } else { - return blockIndependence ? - new LZ4FrameOutputStream(rawOutputStream, blockSize, contentSize, compressor, hash, - LZ4FrameOutputStream.FLG.Bits.CONTENT_SIZE, - LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE) : - new LZ4FrameOutputStream(rawOutputStream, blockSize, contentSize, compressor, hash, - LZ4FrameOutputStream.FLG.Bits.CONTENT_SIZE); - } + return contentSize < 0 ? + new LZ4FrameOutputStream(rawOutputStream, blockSize, -1, compressor, hash, + LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE) : + new LZ4FrameOutputStream(rawOutputStream, blockSize, contentSize, compressor, hash, + LZ4FrameOutputStream.FLG.Bits.CONTENT_SIZE, + LZ4FrameOutputStream.FLG.Bits.BLOCK_INDEPENDENCE); + } + + LZ4FrameOutputStream.BLOCKSIZE getBlockSize() { + return blockSize; } public static LZ4FrameCompressionCodec create(Map properties) { LZ4FrameOutputStream.BLOCKSIZE blockSize = (LZ4FrameOutputStream.BLOCKSIZE) properties.getOrDefault( CompressionOptions.LZ4_BLOCK_SIZE, CompressionOptions.DEFAULT_LZ4_BLOCK_SIZE); - boolean blockIndependence = (boolean) properties.getOrDefault( - CompressionOptions.LZ4_BLOCK_INDEPENDENCE, CompressionOptions.DEFAULT_LZ4_BLOCK_INDEPENDENCE); - return new LZ4FrameCompressionCodec(blockSize, blockIndependence); + return new LZ4FrameCompressionCodec(blockSize); } } diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java index e683b9a9..6baed3eb 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java @@ -205,11 +205,6 @@ public Builder setLZ4BlockSize(LZ4FrameOutputStream.BLOCKSIZE blockSize) { return this; } - public Builder enableLZ4BlockIndependence() { - tableProperties.put(CompressionOptions.LZ4_BLOCK_INDEPENDENCE, true); - return this; - } - public StreamLoadTableProperties build() { if (database == null || table == null) { throw new IllegalArgumentException(String.format("database `%s` or table `%s` can't be null", database, table)); diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkInputStreamTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkInputStreamTest.java index 19dde9f3..aa93f9c5 100644 --- a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkInputStreamTest.java +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/ChunkInputStreamTest.java @@ -107,8 +107,8 @@ public static ChunkMeta genChunk() { } public static class ChunkMeta { - Chunk chunk; - byte[] expectedData; + public Chunk chunk; + public byte[] expectedData; public ChunkMeta(Chunk chunk, byte[] expectedData) { this.chunk = chunk; diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionCodecTestBase.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionCodecTestBase.java new file mode 100644 index 00000000..4f63c806 --- /dev/null +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionCodecTestBase.java @@ -0,0 +1,46 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import com.starrocks.data.load.stream.ChunkInputStreamTest; +import com.starrocks.data.load.stream.v2.ChunkHttpEntity; + +import java.io.ByteArrayOutputStream; + +import static com.starrocks.data.load.stream.ChunkInputStreamTest.genChunk; +import static org.junit.Assert.assertArrayEquals; + +/** Test base for {@link CompressionCodec}. */ +public abstract class CompressionCodecTestBase { + + protected abstract byte[] decompress(byte[] compressedData, int rawSize) throws Exception; + + public void testCompressBase(CompressionCodec compressionCodec) throws Exception { + ChunkInputStreamTest.ChunkMeta chunkMeta = genChunk(); + ChunkHttpEntity entity = new ChunkHttpEntity("test", chunkMeta.chunk); + CompressionHttpEntity compressionEntity = new CompressionHttpEntity(entity, compressionCodec); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + compressionEntity.writeTo(outputStream); + byte[] result = outputStream.toByteArray(); + byte[] descompressData = decompress(result, (int) entity.getContentLength()); + assertArrayEquals(chunkMeta.expectedData, descompressData); + } +} diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionHttpEntityTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionHttpEntityTest.java new file mode 100644 index 00000000..05317f4a --- /dev/null +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CompressionHttpEntityTest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import com.starrocks.data.load.stream.ChunkInputStreamTest; +import com.starrocks.data.load.stream.v2.ChunkHttpEntity; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import static com.starrocks.data.load.stream.ChunkInputStreamTest.genChunk; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** Tests for {@link CompressionHttpEntity}. */ +public class CompressionHttpEntityTest { + + @Test + public void testBasic() throws Exception { + ChunkInputStreamTest.ChunkMeta chunkMeta = genChunk(); + ChunkHttpEntity entity = new ChunkHttpEntity("test", chunkMeta.chunk); + MockCompressionCodec compressionCodec = new MockCompressionCodec(); + CompressionHttpEntity compressionEntity = new CompressionHttpEntity(entity, compressionCodec); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + compressionEntity.writeTo(outputStream); + assertArrayEquals(chunkMeta.expectedData, outputStream.toByteArray()); + assertEquals(1, compressionCodec.getStreams().size()); + assertEquals(entity.getContentLength(), compressionCodec.getStreams().get(0).getCount()); + } + + private static class MockCompressionCodec implements CompressionCodec { + + private final List streams; + + public MockCompressionCodec() { + this.streams = new ArrayList<>(); + } + + public List getStreams() { + return streams; + } + + @Override + public OutputStream createCompressionStream(OutputStream rawOutputStream, long contentSize) throws IOException { + this.streams.add(new CountingOutputStream(rawOutputStream)); + return streams.get(streams.size() - 1); + } + } +} diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CountingOutputStreamTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CountingOutputStreamTest.java new file mode 100644 index 00000000..66f123e3 --- /dev/null +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/CountingOutputStreamTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.util.concurrent.ThreadLocalRandom; + +import static org.junit.Assert.assertEquals; + +/** Tests for {@link CountingOutputStream}. */ +public class CountingOutputStreamTest { + + @Test + public void testCount() throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream); + + byte[] data = new byte[100]; + ThreadLocalRandom.current().nextBytes(data); + countingOutputStream.write(data[0]); + assertEquals(1, countingOutputStream.getCount()); + countingOutputStream.write(data, 1, data.length - 2); + assertEquals(data.length - 1, countingOutputStream.getCount()); + countingOutputStream.write(data[data.length - 1]); + assertEquals(data.length, countingOutputStream.getCount()); + countingOutputStream.write(data); + assertEquals(data.length * 2, countingOutputStream.getCount()); + countingOutputStream.close(); + + byte[] result = outputStream.toByteArray(); + assertEquals(data.length * 2, result.length); + for (int i = 0; i < result.length;) { + for (int j = 0; j < data.length; j++, i++) { + assertEquals(data[j], result[i]); + } + } + assertEquals(data.length * 2, countingOutputStream.getCount()); + } +} diff --git a/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodecTest.java b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodecTest.java new file mode 100644 index 00000000..ac930224 --- /dev/null +++ b/starrocks-stream-load-sdk/src/test/java/com/starrocks/data/load/stream/compress/LZ4FrameCompressionCodecTest.java @@ -0,0 +1,60 @@ +/* + * Copyright 2021-present StarRocks, Inc. All rights reserved. + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.starrocks.data.load.stream.compress; + +import net.jpountz.lz4.LZ4FrameInputStream; +import net.jpountz.lz4.LZ4FrameOutputStream; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class LZ4FrameCompressionCodecTest extends CompressionCodecTestBase { + + @Override + protected byte[] decompress(byte[] compressedData, int rawSize) throws Exception { + byte[] result = new byte[rawSize]; + LZ4FrameInputStream inputStream = new LZ4FrameInputStream(new ByteArrayInputStream(compressedData)); + inputStream.read(result); + inputStream.close(); + return result; + } + + @Test + public void testCreate() { + Map properties = new HashMap<>(); + LZ4FrameCompressionCodec codec1 = LZ4FrameCompressionCodec.create(properties); + assertEquals(LZ4FrameOutputStream.BLOCKSIZE.SIZE_4MB, codec1.getBlockSize()); + + properties.put("compression.lz4.block.size", LZ4FrameOutputStream.BLOCKSIZE.SIZE_1MB); + LZ4FrameCompressionCodec codec2 = LZ4FrameCompressionCodec.create(properties); + assertEquals(LZ4FrameOutputStream.BLOCKSIZE.SIZE_1MB, codec2.getBlockSize()); + } + + @Test + public void testCompress() throws Exception { + LZ4FrameCompressionCodec codec = LZ4FrameCompressionCodec.create(new HashMap<>()); + testCompressBase(codec); + } +} From 527a2acb75bc75dbe67e56a226c6cbc4c6593219 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Thu, 11 Apr 2024 12:43:29 +0800 Subject: [PATCH 4/6] Fix properties Signed-off-by: PengFei Li --- .../properties/StreamLoadTableProperties.java | 26 ++++++++++++++++++- .../stream/v2/TransactionTableRegion.java | 2 +- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java index 6baed3eb..439472b5 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java @@ -29,6 +29,7 @@ import java.io.Serializable; import java.util.HashMap; import java.util.Map; +import java.util.Optional; public class StreamLoadTableProperties implements Serializable { @@ -37,12 +38,13 @@ public class StreamLoadTableProperties implements Serializable { private final String table; private final StreamLoadDataFormat dataFormat; private final Map tableProperties; - // stream load properties + // individual stream load properties private final Map properties; private final boolean enableUpsertDelete; private final long chunkLimit; private final int maxBufferRows; private final String columns; + private final Map commonProperties; private StreamLoadTableProperties(Builder builder) { this.database = builder.database; @@ -66,6 +68,7 @@ private StreamLoadTableProperties(Builder builder) { this.tableProperties = new HashMap<>(builder.tableProperties); this.properties = new HashMap<>(builder.properties); this.columns = builder.columns; + this.commonProperties = new HashMap<>(builder.commonProperties); } public String getColumns() {return columns; } @@ -107,6 +110,19 @@ public Map getProperties() { return properties; } + public Map getCommonProperties() { + return commonProperties; + } + + public Optional getProperty(String name) { + String value = properties.get(name); + if (value != null) { + return Optional.of(value); + } + + return Optional.ofNullable(commonProperties.get(name)); + } + public static Builder builder() { return new Builder(); } @@ -126,6 +142,8 @@ public static class Builder { // Stream load properties private final Map properties = new HashMap<>(); + private final Map commonProperties = new HashMap<>(); + private Builder() { } @@ -147,6 +165,7 @@ public Builder copyFrom(StreamLoadTableProperties streamLoadTableProperties) { chunkLimit(streamLoadTableProperties.getChunkLimit()); maxBufferRows(streamLoadTableProperties.getMaxBufferRows()); tableProperties.putAll(streamLoadTableProperties.getTableProperties()); + commonProperties.putAll(streamLoadTableProperties.getCommonProperties()); return this; } @@ -205,6 +224,11 @@ public Builder setLZ4BlockSize(LZ4FrameOutputStream.BLOCKSIZE blockSize) { return this; } + public Builder addCommonProperties(Map properties) { + this.commonProperties.putAll(properties); + return this; + } + public StreamLoadTableProperties build() { if (database == null || table == null) { throw new IllegalArgumentException(String.format("database `%s` or table `%s` can't be null", database, table)); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java index 51a197e1..26bfb457 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/TransactionTableRegion.java @@ -97,7 +97,7 @@ public TransactionTableRegion(String uniqueKey, this.labelGenerator = labelGenerator; this.compressionCodec = CompressionCodec.createCompressionCodec( properties.getDataFormat(), - Optional.ofNullable(properties.getProperties().get("compression")), + properties.getProperty("compression"), properties.getTableProperties()); this.state = new AtomicReference<>(State.ACTIVE); this.lastCommitTimeMills = System.currentTimeMillis(); From e287f8c4090cf2affcbfe777fb12ff1f87ca6489 Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Thu, 11 Apr 2024 12:48:12 +0800 Subject: [PATCH 5/6] Support lz4 compress for json Signed-off-by: PengFei Li --- .../flink/table/sink/StarRocksSinkOptions.java | 3 ++- .../flink/it/sink/StarRocksSinkITTest.java | 13 +++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index 7e4a5337..749f4176 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -511,7 +511,8 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) { .table(getTableName()) .streamLoadDataFormat(dataFormat) .chunkLimit(getChunkLimit()) - .enableUpsertDelete(supportUpsertDelete()); + .enableUpsertDelete(supportUpsertDelete()) + .addCommonProperties(getSinkStreamLoadProperties()); if (hasColumnMappingProperty()) { defaultTablePropertiesBuilder.columns(streamLoadProps.get("columns")); diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java index 31852a0b..13253a21 100644 --- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java @@ -724,4 +724,17 @@ private String createJsonTable(String tablePrefix) throws Exception { executeSrSQL(createStarRocksTable); return tableName; } + + @Test + public void testJsonLz4Compression() throws Exception { + assumeTrue(isSinkV2); + Map map = new HashMap<>(); + map.put("sink.properties.format", "json"); + map.put("sink.properties.strip_outer_array", "true"); + map.put("sink.properties.compression", "lz4_frame"); + testConfigurationBase(map, env -> null); + + map.put("sink.at-least-once.use-transaction-stream-load", "false"); + testConfigurationBase(map, env -> null); + } } From d406f494cbe9c04cb04fcb73d1d7de4aa8780b7f Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Fri, 12 Apr 2024 12:20:11 +0800 Subject: [PATCH 6/6] Add time stat Signed-off-by: PengFei Li --- .../data/load/stream/compress/CompressionHttpEntity.java | 5 +++-- .../com/starrocks/data/load/stream/v2/ChunkHttpEntity.java | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionHttpEntity.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionHttpEntity.java index 605b5fc3..021851fb 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionHttpEntity.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/compress/CompressionHttpEntity.java @@ -60,6 +60,7 @@ public InputStream getContent() throws IOException { @Override public void writeTo(final OutputStream outStream) throws IOException { + long startTime = System.nanoTime(); ChunkHttpEntity entity = (ChunkHttpEntity) wrappedEntity; final CountingOutputStream countingOutputStream = new CountingOutputStream(outStream); final OutputStream compressOutputStream = @@ -69,8 +70,8 @@ public void writeTo(final OutputStream outStream) throws IOException { long rawSize = entity.getContentLength(); long compressSize = countingOutputStream.getCount(); float compressRatio = compressSize == 0 ? 1 : (float) rawSize / compressSize; - LOG.info("Finish to write entity for table {}, raw size : {}, compressed size: {}, compress ratio: {}", - entity.getTableUniqueKey(), rawSize, compressSize, compressRatio) ; + LOG.info("Write entity for table {}, raw/compressed size:{}/{}, compress ratio:{}, time:{}us", + entity.getTableUniqueKey(), rawSize, compressSize, compressRatio, (System.nanoTime() - startTime) / 1000) ; } @Override diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java index 68f436dd..ee4cb129 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/ChunkHttpEntity.java @@ -92,6 +92,7 @@ public InputStream getContent() { @Override public void writeTo(OutputStream outputStream) throws IOException { + long startTime = System.nanoTime(); try (InputStream inputStream = new ChunkInputStream(chunk)) { final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE]; int len; @@ -100,8 +101,8 @@ public void writeTo(OutputStream outputStream) throws IOException { } } if (logAfterWrite || LOG.isDebugEnabled()) { - LOG.info("Finish to write entity for table {}, contentLength : {}", - tableUniqueKey, contentLength); + LOG.info("Write entity for table {}, size:{}, time:{}us", + tableUniqueKey, contentLength, (System.nanoTime() - startTime) / 1000); } }