Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(perf): enable config encoding for querying large results #191

Merged
merged 11 commits into from
Nov 7, 2024
Merged
1 change: 1 addition & 0 deletions examples/example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func main() {
Host: "127.0.0.1",
Port: 8086,
}},
Codec: opengemini.CodecMsgPack,
}
client, err := opengemini.NewClient(config)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/libgox/unicodex v0.1.0
github.com/prometheus/client_golang v1.20.5
github.com/stretchr/testify v1.9.0
github.com/vmihailenco/msgpack/v5 v5.4.1
)

require (
Expand All @@ -21,6 +22,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
golang.org/x/sys v0.22.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
Expand Down
10 changes: 10 additions & 0 deletions opengemini/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ const (
AuthTypeToken
)

type Codec string

// Define constants for different encode/decode config

const (
CodecMsgPack Codec = "MsgPack"
)

// Client represents a openGemini client.
type Client interface {
// Ping check that status of cluster.
Expand Down Expand Up @@ -136,6 +144,8 @@ type Config struct {
MaxIdleConnsPerHost int
// GzipEnabled determines whether to use gzip for data transmission.
GzipEnabled bool
// Codec determines the Codec mode used for data transmission.
Codec Codec
// TlsConfig configuration information for tls.
TlsConfig *tls.Config
// CustomMetricsLabels add custom labels to all the metrics reported by this client instance
Expand Down
78 changes: 54 additions & 24 deletions opengemini/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"io"
"net/http"
"time"

"github.com/vmihailenco/msgpack/v5"
xkx9431 marked this conversation as resolved.
Show resolved Hide resolved
)

type Query struct {
Expand All @@ -31,13 +33,12 @@ type Query struct {

// Query sends a command to the server
func (c *client) Query(q Query) (*QueryResult, error) {
req := requestDetails{
queryValues: make(map[string][]string),
}
req.queryValues.Add("db", q.Database)
req.queryValues.Add("q", q.Command)
req.queryValues.Add("rp", q.RetentionPolicy)
req.queryValues.Add("epoch", q.Precision.Epoch())
req := buildRequestDetails(c.config, func(req *requestDetails) {
req.queryValues.Add("db", q.Database)
req.queryValues.Add("q", q.Command)
req.queryValues.Add("rp", q.RetentionPolicy)
req.queryValues.Add("epoch", q.Precision.Epoch())
})

// metric
c.metrics.queryCounter.Add(1)
Expand All @@ -53,34 +54,55 @@ func (c *client) Query(q Query) (*QueryResult, error) {
if err != nil {
return nil, errors.New("query request failed, error: " + err.Error())
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
qr, err := retrieveQueryResFromResp(resp)
if err != nil {
return nil, errors.New("query resp read failed, error: " + err.Error())
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, errors.New("query error resp, code: " + resp.Status + "body: " + string(body))
return qr, nil
}

func (c *client) queryPost(q Query) (*QueryResult, error) {
req := buildRequestDetails(c.config, func(req *requestDetails) {
req.queryValues.Add("db", q.Database)
req.queryValues.Add("q", q.Command)
})

resp, err := c.executeHttpPost(UrlQuery, req)
if err != nil {
return nil, errors.New("request failed, error: " + err.Error())
}
var qr = new(QueryResult)
err = json.Unmarshal(body, qr)
qr, err := retrieveQueryResFromResp(resp)
if err != nil {
return nil, errors.New("query unmarshal resp body failed, error: " + err.Error())
return nil, err
}
return qr, nil
}

func (c *client) queryPost(q Query) (*QueryResult, error) {
func buildRequestDetails(c *Config, requestModifier func(*requestDetails)) requestDetails {
req := requestDetails{
queryValues: make(map[string][]string),
}

req.queryValues.Add("db", q.Database)
req.queryValues.Add("q", q.Command)
resp, err := c.executeHttpPost(UrlQuery, req)
if err != nil {
return nil, errors.New("request failed, error: " + err.Error())
applyCodec(&req, c)

if requestModifier != nil {
requestModifier(&req)
}

return req
}

func applyCodec(req *requestDetails, config *Config) {
if config.Codec == CodecMsgPack {
if req.header == nil {
req.header = make(http.Header)
}
req.header.Set("Accept", "application/x-msgpack")
}
}

// retrieve query result from the response
func retrieveQueryResFromResp(resp *http.Response) (*QueryResult, error) {
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
Expand All @@ -89,10 +111,18 @@ func (c *client) queryPost(q Query) (*QueryResult, error) {
if resp.StatusCode != http.StatusOK {
return nil, errors.New("error resp, code: " + resp.Status + "body: " + string(body))
}
contentType := resp.Header.Get("Content-Type")
var qr = new(QueryResult)
err = json.Unmarshal(body, qr)
if err != nil {
return nil, errors.New("unmarshal resp body failed, error: " + err.Error())
if contentType == "application/x-msgpack" {
err = msgpack.Unmarshal(body, qr)
if err != nil {
return nil, errors.New("unmarshal msgpack body failed, error: " + err.Error())
}
} else {
err = json.Unmarshal(body, qr)
if err != nil {
return nil, errors.New("unmarshal json body failed, error: " + err.Error())
}
}
return qr, nil
}
8 changes: 4 additions & 4 deletions opengemini/query_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ const RpColumnLen = 8

// SeriesResult contains the results of a series query
type SeriesResult struct {
Series []*Series `json:"series,omitempty"`
Error string `json:"error,omitempty"`
Series []*Series `json:"series,omitempty" msgpack:"series,omitempty"`
Error string `json:"error,omitempty" msgpack:"error,omitempty"`
}

// QueryResult is the top-level struct
type QueryResult struct {
Results []*SeriesResult `json:"results,omitempty"`
Error string `json:"error,omitempty"`
Results []*SeriesResult `json:"results,omitempty" msgpack:"results,omitempty"`
Error string `json:"error,omitempty" msgpack:"error,omitempty"`
}

func (result *QueryResult) hasError() error {
Expand Down
72 changes: 72 additions & 0 deletions opengemini/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package opengemini

import (
"fmt"
"testing"
"time"

"github.com/libgox/addr"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -69,6 +71,63 @@ func TestQueryWithEpoch(t *testing.T) {
assert.Equal(t, length, getTimestampLength(v))
}
}
func TestQueryWithMsgPack(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []addr.Address{{
Host: "localhost",
Port: 8086,
}},
Codec: CodecMsgPack,
})

// create a test database with rand suffix
database := randomDatabaseName()
err := c.CreateDatabase(database)
assert.Nil(t, err)

// delete test database before exit test case
defer func() {
err := c.DropDatabase(database)
assert.Nil(t, err)
}()

testMeasurement := randomMeasurement()
p := &Point{}
p.Measurement = testMeasurement
p.AddField("TestField", 123)
p.Time = time.Now()

err = c.WritePoint(database, p, func(err error) {
assert.Nil(t, err)
})
assert.Nil(t, err)

time.Sleep(time.Second * 5)

PrecisionTimestampLength := make(map[Precision]int64)
PrecisionTimestampLength[PrecisionNanosecond] = 19
PrecisionTimestampLength[PrecisionMicrosecond] = 16
PrecisionTimestampLength[PrecisionMillisecond] = 13
PrecisionTimestampLength[PrecisionSecond] = 10
PrecisionTimestampLength[PrecisionMinute] = 8
PrecisionTimestampLength[PrecisionHour] = 6

// check whether write success
for precision, length := range PrecisionTimestampLength {
q := Query{
Database: database,
Command: "select * from " + testMeasurement,
Precision: precision,
}
result, err := c.Query(q)
assert.Nil(t, err)
v, err := convertToInt64(result.Results[0].Series[0].Values[0][0])
if err != nil {
t.Fatalf("conversion error: %v", err)
}
assert.Equal(t, length, getTimestampLength(v))
}
}

func getTimestampLength(timestamp int64) int64 {
var length int64 = 0
Expand All @@ -77,3 +136,16 @@ func getTimestampLength(timestamp int64) int64 {
}
return length
}

func convertToInt64(value interface{}) (int64, error) {
switch val := value.(type) {
case float64:
return int64(val), nil
case int64:
return val, nil
case int32:
return int64(val), nil
default:
return 0, fmt.Errorf("unsupported type: %T", value)
}
}
8 changes: 4 additions & 4 deletions opengemini/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type SeriesValues []SeriesValue

// Series defines the structure for series data
type Series struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values SeriesValues `json:"values,omitempty"`
Name string `json:"name,omitempty" msgpack:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty" msgpack:"tags,omitempty"`
Columns []string `json:"columns,omitempty" msgpack:"columns,omitempty"`
Values SeriesValues `json:"values,omitempty" msgpack:"values,omitempty"`
}