Skip to content

Commit

Permalink
feat(compress): add the compress method for large content
Browse files Browse the repository at this point in the history
Signed-off-by: kaixuan xu <[email protected]>
  • Loading branch information
xkx9431 committed Jan 11, 2025
1 parent bca013d commit 55cc423
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class Configuration {

CompressMethod compressMethod;

// deprecated
// deprecated, will use compressMethod and contentType
boolean gzipEnabled;

HttpClientConfig httpConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public BaseClient(Configuration conf) {
contentEncodingHeader.add("gzip");
headers.put("Content-Encoding", contentEncodingHeader);
}

applyCodec(conf);

String httpPrefix;
if (conf.getHttpConfig().tlsConfig() != null) {
httpPrefix = "https://";
Expand All @@ -77,6 +80,37 @@ public BaseClient(Configuration conf) {
scheduler.ifPresent(this::startHealthCheck);
}

private void applyCodec(Configuration config) {
if (headers == null) {
throw new IllegalStateException("Headers map is not initialized");
}

List<String> acceptHeader = new ArrayList<>();
switch (config.getContentType()) {
case MSGPACK:
acceptHeader.add("application/msgpack");
break;
case JSON:
acceptHeader.add("application/json");
break;
}
headers.put("Accept", acceptHeader);

List<String> acceptEncodingHeader = new ArrayList<>();
switch (config.getCompressMethod()) {
case GZIP:
acceptEncodingHeader.add("gzip");
break;
case ZSTD:
acceptEncodingHeader.add("zstd");
break;
case SNAPPY:
acceptEncodingHeader.add("snappy");
break;
}
headers.put("Accept-Encoding", acceptEncodingHeader);
}

/**
* Health Check
* Start schedule task(period 10s) to ping all server url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@

package io.opengemini.client.impl;

import com.sun.org.apache.bcel.internal.generic.RETURN;
import io.github.openfacade.http.BasicAuthRequestFilter;
import io.github.openfacade.http.HttpClient;
import io.github.openfacade.http.HttpClientConfig;
import io.github.openfacade.http.HttpClientFactory;
import io.github.openfacade.http.HttpResponse;
import io.opengemini.client.api.AuthConfig;
import io.opengemini.client.api.AuthType;
import io.opengemini.client.api.Configuration;
import io.opengemini.client.api.OpenGeminiException;
import io.opengemini.client.api.Pong;
import io.opengemini.client.api.Query;
import io.opengemini.client.api.QueryResult;
import io.opengemini.client.api.*;
import io.opengemini.client.common.BaseAsyncClient;
import io.opengemini.client.common.HeaderConst;
import io.opengemini.client.common.JacksonService;
import io.opengemini.client.common.compress.GzipCompressor;
import io.opengemini.client.common.compress.SnappyCompressor;
import io.opengemini.client.common.compress.ZstdCompressor;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
Expand Down Expand Up @@ -122,22 +120,37 @@ protected CompletableFuture<Pong> executePing() {
}

private <T> T processResponseBody(HttpResponse response, Class<T> type) throws IOException {
String contentType = response.headers().get("Content-Type").get(0);
byte[] body = response.body();
if (contentType.contains("application/x-gzip")) {
// Handle gzip content type
// body = GzipService.decompress(body);
String contentType = response.headers().get("Content-Type") != null ? response.headers().get("Content-Type").get(0) : null;
String contentEncoding = response.headers().get("Content-Encoding") != null ? response.headers().get("Content-Encoding").get(0) : null;
byte[] body = processCompression(contentEncoding, response.body(), type);

return processContentType(contentType, body, type);
}

private <T> byte[] processCompression(String compressMethod, byte[] body, Class<T> type ) throws IOException {
byte[] decompressedBody = null;
if (CompressMethod.GZIP.getValue().equals(compressMethod)) {
GzipCompressor compressor = new GzipCompressor();
decompressedBody = compressor.decompress(body);
} else if (CompressMethod.SNAPPY.getValue().equals(compressMethod)) {
SnappyCompressor compressor = new SnappyCompressor();
decompressedBody = compressor.decompress(body);

} else if (CompressMethod.ZSTD.getValue().equals(compressMethod)) {
ZstdCompressor compressor = new ZstdCompressor();
decompressedBody = compressor.decompress(body);
}

if (contentType.contains("application/msgpack")) {
// Handle msgpack content type
//return MsgPackService.toObject(body, type);
} else if (contentType.contains("application/json")) {
// Handle JSON content type
return decompressedBody != null ? decompressedBody : body;
}

private <T> T processContentType(String contentType, byte[] body, Class<T> type) throws IOException {
if (ContentType.JSON.getValue().equals(contentType)) {
return JacksonService.toObject(body, type);
} else if (ContentType.MSGPACK.getValue().equals(contentType)) {
throw new IOException("Unsupported content type: " + contentType);
}
// Default handling
return JacksonService.toObject(body, type);
return JacksonService.toObject(body, type);
}

public CompletableFuture<HttpResponse> get(String url) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,7 @@

import io.github.openfacade.http.HttpClientConfig;
import io.github.openfacade.http.HttpClientEngine;
import io.opengemini.client.api.Address;
import io.opengemini.client.api.Configuration;
import io.opengemini.client.api.OpenGeminiException;
import io.opengemini.client.api.Point;
import io.opengemini.client.api.Query;
import io.opengemini.client.api.QueryResult;
import io.opengemini.client.api.RpConfig;
import io.opengemini.client.api.Series;
import io.opengemini.client.api.*;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestInstance;
Expand Down Expand Up @@ -64,6 +57,21 @@ protected List<OpenGeminiClient> clientList() throws OpenGeminiException {
.build();
clients.add(OpenGeminiClientFactory.create(configuration));
}

List<CompressMethod> compressMethods = Arrays.asList(CompressMethod.GZIP, CompressMethod.ZSTD, CompressMethod.SNAPPY);
for (CompressMethod compressMethod : compressMethods) {
HttpClientConfig httpConfig = new HttpClientConfig.Builder()
.engine(HttpClientEngine.AsyncHttpClient)
.connectTimeout(Duration.ofSeconds(3))
.timeout(Duration.ofSeconds(3))
.build();
Configuration configuration = Configuration.builder()
.addresses(Collections.singletonList(new Address("127.0.0.1", 8086)))
.httpConfig(httpConfig)
.compressMethod(compressMethod)
.build();
clients.add(OpenGeminiClientFactory.create(configuration));
}
return clients;
}

Expand Down

0 comments on commit 55cc423

Please sign in to comment.