This repository has been archived by the owner on May 10, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 25
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
tools: add zstd compression support (#21)
- Loading branch information
Showing
3 changed files
with
135 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
70 changes: 70 additions & 0 deletions
70
src/main/java/com/xiaomi/infra/pegasus/tools/ZstdWrapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package com.xiaomi.infra.pegasus.tools; | ||
|
||
import com.github.luben.zstd.Zstd; | ||
import com.github.luben.zstd.ZstdInputStream; | ||
import com.xiaomi.infra.pegasus.client.PException; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
|
||
/** | ||
* ZstdWrapper wraps the compress/decompress APIs of ZStd algorithm. | ||
*/ | ||
public class ZstdWrapper { | ||
|
||
private ZstdWrapper() { | ||
} | ||
|
||
/** | ||
* compress the `src` and return the compressed. | ||
* throws: RuntimeException if compression failed. | ||
*/ | ||
public static byte[] compress(byte[] src) { | ||
return Zstd.compress(src); | ||
} | ||
|
||
/** | ||
* decompress the `src` and return the original. | ||
* throws: | ||
* - IllegalArgumentException if given `src` is null or empty | ||
* - PException if decompression failed, maybe your `src` is corrupted. | ||
*/ | ||
public static byte[] decompress(byte[] src) throws PException { | ||
if (src == null || src.length == 0) { | ||
throw new IllegalArgumentException("src is empty"); | ||
} | ||
|
||
byte[] ret; | ||
long originalSize = Zstd.decompressedSize(src); | ||
if (originalSize > 0) { | ||
ret = new byte[(int) originalSize]; | ||
long code = Zstd.decompress(ret, src); | ||
if (Zstd.isError(code)) { | ||
throw new PException("decompression failed: " + Zstd.getErrorName(code)); | ||
} | ||
if (code != originalSize) { | ||
throw new PException("decompression failed"); | ||
} | ||
return ret; | ||
} | ||
|
||
// fallback to decompress in streaming mode | ||
byte[] inBuf = new byte[1024]; | ||
ByteArrayOutputStream decompressOutBuf = new ByteArrayOutputStream(); | ||
try { | ||
ZstdInputStream decompress = new ZstdInputStream(new ByteArrayInputStream(src)); | ||
while (true) { | ||
int n = decompress.read(inBuf); | ||
if (n <= 0) { | ||
break; | ||
} | ||
decompressOutBuf.write(inBuf, 0, n); | ||
} | ||
ret = decompressOutBuf.toByteArray(); | ||
} catch (IOException e) { | ||
throw new PException("decompression failed: " + e.getMessage()); | ||
} | ||
return ret; | ||
} | ||
} |
60 changes: 60 additions & 0 deletions
60
src/test/java/com/xiaomi/infra/pegasus/tools/TestZstdWrapper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package com.xiaomi.infra.pegasus.tools; | ||
|
||
|
||
import com.xiaomi.infra.pegasus.client.PException; | ||
import com.xiaomi.infra.pegasus.client.PegasusClientFactory; | ||
import com.xiaomi.infra.pegasus.client.PegasusClientInterface; | ||
import com.xiaomi.infra.pegasus.client.PegasusTableInterface; | ||
import org.junit.Assert; | ||
import org.junit.Test; | ||
|
||
public class TestZstdWrapper { | ||
@Test | ||
public void testCompression() throws Exception { | ||
PegasusClientInterface client = PegasusClientFactory.getSingletonClient(); | ||
PegasusTableInterface table = client.openTable("temp"); | ||
|
||
for (int t = 0; t < 4; t++) { | ||
// generate a 10KB value | ||
StringBuilder builder = new StringBuilder(); | ||
for (int i = 0; i < 10000; i++) { | ||
builder.append('a' + t); | ||
} | ||
byte[] value = builder.toString().getBytes(); | ||
|
||
// write the record into pegasus | ||
table.set("h".getBytes(), "s".getBytes(), ZstdWrapper.compress(value), 1000); | ||
|
||
// read the record from pegasus | ||
byte[] compressedBuf = table.get("h".getBytes(), "s".getBytes(), 1000); | ||
|
||
// decompress the value | ||
Assert.assertArrayEquals(ZstdWrapper.decompress(compressedBuf), value); | ||
} | ||
|
||
// ensure empty value won't break the program | ||
{ | ||
try { | ||
ZstdWrapper.decompress("".getBytes()); | ||
Assert.fail("expecting a IllegalArgumentException"); | ||
} catch (Exception e) { | ||
Assert.assertTrue(e instanceof IllegalArgumentException); | ||
} | ||
try { | ||
ZstdWrapper.decompress(null); | ||
Assert.fail("expecting a IllegalArgumentException"); | ||
} catch (Exception e) { | ||
Assert.assertTrue(e instanceof IllegalArgumentException); | ||
} | ||
} | ||
|
||
{ // decompress invalid data | ||
try { | ||
ZstdWrapper.decompress("abc123".getBytes()); | ||
Assert.fail("expecting a PException"); | ||
} catch (Exception e) { | ||
Assert.assertTrue(e instanceof PException); | ||
} | ||
} | ||
} | ||
} |