diff --git a/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java b/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java index 75de6632..8720250c 100644 --- a/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java +++ b/opengemini-client-api/src/main/java/io/opengemini/client/api/Configuration.java @@ -41,7 +41,7 @@ public class Configuration { CompressMethod compressMethod; - // deprecated + // deprecated, will use compressMethod and contentType boolean gzipEnabled; HttpClientConfig httpConfig; diff --git a/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseClient.java b/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseClient.java index 4945721d..e9e9f9ee 100644 --- a/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseClient.java +++ b/opengemini-client-common/src/main/java/io/opengemini/client/common/BaseClient.java @@ -55,6 +55,9 @@ public BaseClient(Configuration conf) { contentEncodingHeader.add("gzip"); headers.put("Content-Encoding", contentEncodingHeader); } + + applyCodec(conf); + String httpPrefix; if (conf.getHttpConfig().tlsConfig() != null) { httpPrefix = "https://"; @@ -77,6 +80,37 @@ public BaseClient(Configuration conf) { scheduler.ifPresent(this::startHealthCheck); } + private void applyCodec(Configuration config) { + if (headers == null) { + throw new IllegalStateException("Headers map is not initialized"); + } + + List acceptHeader = new ArrayList<>(); + switch (config.getContentType()) { + case MSGPACK: + acceptHeader.add("application/msgpack"); + break; + case JSON: + acceptHeader.add("application/json"); + break; + } + headers.put("Accept", acceptHeader); + + List acceptEncodingHeader = new ArrayList<>(); + switch (config.getCompressMethod()) { + case GZIP: + acceptEncodingHeader.add("gzip"); + break; + case ZSTD: + acceptEncodingHeader.add("zstd"); + break; + case SNAPPY: + acceptEncodingHeader.add("snappy"); + break; + } + headers.put("Accept-Encoding", acceptEncodingHeader); + } + /** * Health Check * Start schedule task(period 10s) to ping all server url diff --git a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java index 32523c47..c8bba286 100644 --- a/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java +++ b/opengemini-client/src/main/java/io/opengemini/client/impl/OpenGeminiClient.java @@ -16,21 +16,19 @@ package io.opengemini.client.impl; +import com.sun.org.apache.bcel.internal.generic.RETURN; import io.github.openfacade.http.BasicAuthRequestFilter; import io.github.openfacade.http.HttpClient; import io.github.openfacade.http.HttpClientConfig; import io.github.openfacade.http.HttpClientFactory; import io.github.openfacade.http.HttpResponse; -import io.opengemini.client.api.AuthConfig; -import io.opengemini.client.api.AuthType; -import io.opengemini.client.api.Configuration; -import io.opengemini.client.api.OpenGeminiException; -import io.opengemini.client.api.Pong; -import io.opengemini.client.api.Query; -import io.opengemini.client.api.QueryResult; +import io.opengemini.client.api.*; import io.opengemini.client.common.BaseAsyncClient; import io.opengemini.client.common.HeaderConst; import io.opengemini.client.common.JacksonService; +import io.opengemini.client.common.compress.GzipCompressor; +import io.opengemini.client.common.compress.SnappyCompressor; +import io.opengemini.client.common.compress.ZstdCompressor; import org.jetbrains.annotations.NotNull; import java.io.IOException; @@ -122,22 +120,37 @@ protected CompletableFuture executePing() { } private T processResponseBody(HttpResponse response, Class type) throws IOException { - String contentType = response.headers().get("Content-Type").get(0); - byte[] body = response.body(); - if (contentType.contains("application/x-gzip")) { - // Handle gzip content type -// body = GzipService.decompress(body); + String contentType = response.headers().get("Content-Type") != null ? response.headers().get("Content-Type").get(0) : null; + String contentEncoding = response.headers().get("Content-Encoding") != null ? response.headers().get("Content-Encoding").get(0) : null; + byte[] body = processCompression(contentEncoding, response.body(), type); + + return processContentType(contentType, body, type); + } + + private byte[] processCompression(String compressMethod, byte[] body, Class type ) throws IOException { + byte[] decompressedBody = null; + if (CompressMethod.GZIP.getValue().equals(compressMethod)) { + GzipCompressor compressor = new GzipCompressor(); + decompressedBody = compressor.decompress(body); + } else if (CompressMethod.SNAPPY.getValue().equals(compressMethod)) { + SnappyCompressor compressor = new SnappyCompressor(); + decompressedBody = compressor.decompress(body); + + } else if (CompressMethod.ZSTD.getValue().equals(compressMethod)) { + ZstdCompressor compressor = new ZstdCompressor(); + decompressedBody = compressor.decompress(body); } - if (contentType.contains("application/msgpack")) { - // Handle msgpack content type - //return MsgPackService.toObject(body, type); - } else if (contentType.contains("application/json")) { - // Handle JSON content type + return decompressedBody != null ? decompressedBody : body; + } + + private T processContentType(String contentType, byte[] body, Class type) throws IOException { + if (ContentType.JSON.getValue().equals(contentType)) { return JacksonService.toObject(body, type); + } else if (ContentType.MSGPACK.getValue().equals(contentType)) { + throw new IOException("Unsupported content type: " + contentType); } - // Default handling - return JacksonService.toObject(body, type); + return JacksonService.toObject(body, type); } public CompletableFuture get(String url) { diff --git a/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientWriteTest.java b/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientWriteTest.java index f37594d4..9396608e 100644 --- a/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientWriteTest.java +++ b/opengemini-client/src/test/java/io/opengemini/client/impl/OpenGeminiClientWriteTest.java @@ -18,14 +18,7 @@ import io.github.openfacade.http.HttpClientConfig; import io.github.openfacade.http.HttpClientEngine; -import io.opengemini.client.api.Address; -import io.opengemini.client.api.Configuration; -import io.opengemini.client.api.OpenGeminiException; -import io.opengemini.client.api.Point; -import io.opengemini.client.api.Query; -import io.opengemini.client.api.QueryResult; -import io.opengemini.client.api.RpConfig; -import io.opengemini.client.api.Series; +import io.opengemini.client.api.*; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestInstance; @@ -64,6 +57,21 @@ protected List clientList() throws OpenGeminiException { .build(); clients.add(OpenGeminiClientFactory.create(configuration)); } + + List compressMethods = Arrays.asList(CompressMethod.GZIP, CompressMethod.ZSTD, CompressMethod.SNAPPY); + for (CompressMethod compressMethod : compressMethods) { + HttpClientConfig httpConfig = new HttpClientConfig.Builder() + .engine(HttpClientEngine.AsyncHttpClient) + .connectTimeout(Duration.ofSeconds(3)) + .timeout(Duration.ofSeconds(3)) + .build(); + Configuration configuration = Configuration.builder() + .addresses(Collections.singletonList(new Address("127.0.0.1", 8086))) + .httpConfig(httpConfig) + .compressMethod(compressMethod) + .build(); + clients.add(OpenGeminiClientFactory.create(configuration)); + } return clients; }