Skip to content

Commit

Permalink
[Feature] Sink supports LZ4 compression with json format (#354)
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy authored Apr 19, 2024
1 parent ba84f11 commit 5c2a334
Show file tree
Hide file tree
Showing 19 changed files with 676 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,8 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) {
.table(getTableName())
.streamLoadDataFormat(dataFormat)
.chunkLimit(getChunkLimit())
.enableUpsertDelete(supportUpsertDelete());
.enableUpsertDelete(supportUpsertDelete())
.addCommonProperties(getSinkStreamLoadProperties());

if (hasColumnMappingProperty()) {
defaultTablePropertiesBuilder.columns(streamLoadProps.get("columns"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,4 +724,17 @@ private String createJsonTable(String tablePrefix) throws Exception {
executeSrSQL(createStarRocksTable);
return tableName;
}

@Test
public void testJsonLz4Compression() throws Exception {
assumeTrue(isSinkV2);
Map<String, String> map = new HashMap<>();
map.put("sink.properties.format", "json");
map.put("sink.properties.strip_outer_array", "true");
map.put("sink.properties.compression", "lz4_frame");
testConfigurationBase(map, env -> null);

map.put("sink.at-least-once.use-transaction-stream-load", "false");
testConfigurationBase(map, env -> null);
}
}
7 changes: 7 additions & 0 deletions starrocks-stream-load-sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<httpcore.version>4.4.15</httpcore.version>
<fastjson.version>1.2.83</fastjson.version>
<fasterxml.version>2.12.4</fasterxml.version>
<lz4.version>1.8.0</lz4.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -58,6 +59,12 @@
<version>${fasterxml.version}</version>
</dependency>

<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>${lz4.version}</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public interface StreamLoadDataFormat {
StreamLoadDataFormat JSON = new JSONFormat();
StreamLoadDataFormat CSV = new CSVFormat();

default String name() {
return "";
}

byte[] first();
byte[] delimiter();
byte[] end();
Expand All @@ -51,6 +55,11 @@ public CSVFormat(String rowDelimiter) {
this.delimiter = rowDelimiter.getBytes(StandardCharsets.UTF_8);
}

@Override
public String name() {
return "csv";
}

@Override
public byte[] first() {
return EMPTY_DELIMITER;
Expand Down Expand Up @@ -89,6 +98,11 @@ class JSONFormat implements StreamLoadDataFormat, Serializable {
private static final byte[] delimiter = ",".getBytes(StandardCharsets.UTF_8);
private static final byte[] end = "]".getBytes(StandardCharsets.UTF_8);

@Override
public String name() {
return "json";
}

@Override
public byte[] first() {
return first;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.data.load.stream.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/**
* APIs that are not stable yet.
*/
@Documented
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD})
public @interface Evolving {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.data.load.stream.compress;

import com.starrocks.data.load.stream.StreamLoadDataFormat;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Optional;

/** Compress data with the specified compression algorithm. */
public interface CompressionCodec {

/**
* Create an output stream to compress the raw output stream.
*
* @param rawOutputStream the output stream to compress
* @param contentSize the content size of the raw stream. -1 if the size is unknown
* @return an output stream with compressed data
*/
OutputStream createCompressionStream(final OutputStream rawOutputStream, long contentSize) throws IOException;

static Optional<CompressionCodec> createCompressionCodec(StreamLoadDataFormat dataFormat,
Optional<String> compressionType,
Map<String, Object> properties) {
if (!compressionType.isPresent()) {
return Optional.empty();
}

if (LZ4FrameCompressionCodec.NAME.equalsIgnoreCase(compressionType.get())) {
if (dataFormat instanceof StreamLoadDataFormat.JSONFormat) {
return Optional.of(LZ4FrameCompressionCodec.create(properties));
}
}

throw new UnsupportedOperationException(
String.format("Not support to compress format %s with compression type %s",
dataFormat.name(), compressionType.get()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.data.load.stream.compress;

import com.starrocks.data.load.stream.v2.ChunkHttpEntity;
import org.apache.http.entity.HttpEntityWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Wrapping entity that compresses content when writing. */
public class CompressionHttpEntity extends HttpEntityWrapper {

private static final Logger LOG = LoggerFactory.getLogger(CompressionHttpEntity.class);

private final CompressionCodec compressionCodec;

public CompressionHttpEntity(ChunkHttpEntity entity, CompressionCodec compressionCodec) {
super(entity);
this.compressionCodec = compressionCodec;
entity.setLogAfterWrite(false);
}

@Override
public long getContentLength() {
return -1;
}

@Override
public boolean isChunked() {
// force content chunking
return true;
}

@Override
public InputStream getContent() throws IOException {
throw new UnsupportedOperationException();
}

@Override
public void writeTo(final OutputStream outStream) throws IOException {
long startTime = System.nanoTime();
ChunkHttpEntity entity = (ChunkHttpEntity) wrappedEntity;
final CountingOutputStream countingOutputStream = new CountingOutputStream(outStream);
final OutputStream compressOutputStream =
compressionCodec.createCompressionStream(countingOutputStream, entity.getContentLength());
entity.writeTo(compressOutputStream);
compressOutputStream.close();
long rawSize = entity.getContentLength();
long compressSize = countingOutputStream.getCount();
float compressRatio = compressSize == 0 ? 1 : (float) rawSize / compressSize;
LOG.info("Write entity for table {}, raw/compressed size:{}/{}, compress ratio:{}, time:{}us",
entity.getTableUniqueKey(), rawSize, compressSize, compressRatio, (System.nanoTime() - startTime) / 1000) ;
}

@Override
public boolean isStreaming() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.data.load.stream.compress;

import net.jpountz.lz4.LZ4FrameOutputStream;

public class CompressionOptions {

public static final String LZ4_BLOCK_SIZE = "compression.lz4.block.size";
public static final LZ4FrameOutputStream.BLOCKSIZE DEFAULT_LZ4_BLOCK_SIZE = LZ4FrameOutputStream.BLOCKSIZE.SIZE_4MB;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.data.load.stream.compress;

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Count how many bytes written. */
public class CountingOutputStream extends FilterOutputStream {

private long count;

public CountingOutputStream(OutputStream out) {
super(out);
}

public long getCount() {
return this.count;
}

public void write(byte[] b, int off, int len) throws IOException {
this.out.write(b, off, len);
this.count += len;
}

public void write(int b) throws IOException {
this.out.write(b);
++this.count;
}

}
Loading

0 comments on commit 5c2a334

Please sign in to comment.