diff --git a/examples/example/example.go b/examples/example/example.go index acbb802..832414c 100644 --- a/examples/example/example.go +++ b/examples/example/example.go @@ -36,6 +36,7 @@ func main() { Host: "127.0.0.1", Port: 8086, }}, + Codec: opengemini.CodecMsgPack, } client, err := opengemini.NewClient(config) if err != nil { diff --git a/go.mod b/go.mod index 08da835..c7feae7 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/libgox/unicodex v0.1.0 github.com/prometheus/client_golang v1.20.5 github.com/stretchr/testify v1.9.0 + github.com/vmihailenco/msgpack/v5 v5.4.1 ) require ( @@ -21,6 +22,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect golang.org/x/sys v0.22.0 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 0f730af..c07f4a9 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,10 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= diff --git a/opengemini/client.go b/opengemini/client.go index 117dc12..fa77ec6 100644 --- a/opengemini/client.go +++ b/opengemini/client.go @@ -31,6 +31,14 @@ const ( AuthTypeToken ) +type Codec string + +// Define constants for different encode/decode config + +const ( + CodecMsgPack Codec = "MsgPack" +) + // Client represents a openGemini client. type Client interface { // Ping check that status of cluster. @@ -136,6 +144,8 @@ type Config struct { MaxIdleConnsPerHost int // GzipEnabled determines whether to use gzip for data transmission. GzipEnabled bool + // Codec determines the Codec mode used for data transmission. + Codec Codec // TlsConfig configuration information for tls. TlsConfig *tls.Config // CustomMetricsLabels add custom labels to all the metrics reported by this client instance diff --git a/opengemini/query.go b/opengemini/query.go index 727f712..762747b 100644 --- a/opengemini/query.go +++ b/opengemini/query.go @@ -20,6 +20,8 @@ import ( "io" "net/http" "time" + + "github.com/vmihailenco/msgpack/v5" ) type Query struct { @@ -31,13 +33,12 @@ type Query struct { // Query sends a command to the server func (c *client) Query(q Query) (*QueryResult, error) { - req := requestDetails{ - queryValues: make(map[string][]string), - } - req.queryValues.Add("db", q.Database) - req.queryValues.Add("q", q.Command) - req.queryValues.Add("rp", q.RetentionPolicy) - req.queryValues.Add("epoch", q.Precision.Epoch()) + req := buildRequestDetails(c.config, func(req *requestDetails) { + req.queryValues.Add("db", q.Database) + req.queryValues.Add("q", q.Command) + req.queryValues.Add("rp", q.RetentionPolicy) + req.queryValues.Add("epoch", q.Precision.Epoch()) + }) // metric c.metrics.queryCounter.Add(1) @@ -53,34 +54,55 @@ func (c *client) Query(q Query) (*QueryResult, error) { if err != nil { return nil, errors.New("query request failed, error: " + err.Error()) } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) + qr, err := retrieveQueryResFromResp(resp) if err != nil { - return nil, errors.New("query resp read failed, error: " + err.Error()) + return nil, err } - if resp.StatusCode != http.StatusOK { - return nil, errors.New("query error resp, code: " + resp.Status + "body: " + string(body)) + return qr, nil +} + +func (c *client) queryPost(q Query) (*QueryResult, error) { + req := buildRequestDetails(c.config, func(req *requestDetails) { + req.queryValues.Add("db", q.Database) + req.queryValues.Add("q", q.Command) + }) + + resp, err := c.executeHttpPost(UrlQuery, req) + if err != nil { + return nil, errors.New("request failed, error: " + err.Error()) } - var qr = new(QueryResult) - err = json.Unmarshal(body, qr) + qr, err := retrieveQueryResFromResp(resp) if err != nil { - return nil, errors.New("query unmarshal resp body failed, error: " + err.Error()) + return nil, err } return qr, nil } -func (c *client) queryPost(q Query) (*QueryResult, error) { +func buildRequestDetails(c *Config, requestModifier func(*requestDetails)) requestDetails { req := requestDetails{ queryValues: make(map[string][]string), } - req.queryValues.Add("db", q.Database) - req.queryValues.Add("q", q.Command) - resp, err := c.executeHttpPost(UrlQuery, req) - if err != nil { - return nil, errors.New("request failed, error: " + err.Error()) + applyCodec(&req, c) + + if requestModifier != nil { + requestModifier(&req) } + return req +} + +func applyCodec(req *requestDetails, config *Config) { + if config.Codec == CodecMsgPack { + if req.header == nil { + req.header = make(http.Header) + } + req.header.Set("Accept", "application/x-msgpack") + } +} + +// retrieve query result from the response +func retrieveQueryResFromResp(resp *http.Response) (*QueryResult, error) { defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { @@ -89,10 +111,18 @@ func (c *client) queryPost(q Query) (*QueryResult, error) { if resp.StatusCode != http.StatusOK { return nil, errors.New("error resp, code: " + resp.Status + "body: " + string(body)) } + contentType := resp.Header.Get("Content-Type") var qr = new(QueryResult) - err = json.Unmarshal(body, qr) - if err != nil { - return nil, errors.New("unmarshal resp body failed, error: " + err.Error()) + if contentType == "application/x-msgpack" { + err = msgpack.Unmarshal(body, qr) + if err != nil { + return nil, errors.New("unmarshal msgpack body failed, error: " + err.Error()) + } + } else { + err = json.Unmarshal(body, qr) + if err != nil { + return nil, errors.New("unmarshal json body failed, error: " + err.Error()) + } } return qr, nil } diff --git a/opengemini/query_result.go b/opengemini/query_result.go index fd1df5c..4649f8d 100644 --- a/opengemini/query_result.go +++ b/opengemini/query_result.go @@ -20,14 +20,14 @@ const RpColumnLen = 8 // SeriesResult contains the results of a series query type SeriesResult struct { - Series []*Series `json:"series,omitempty"` - Error string `json:"error,omitempty"` + Series []*Series `json:"series,omitempty" msgpack:"series,omitempty"` + Error string `json:"error,omitempty" msgpack:"error,omitempty"` } // QueryResult is the top-level struct type QueryResult struct { - Results []*SeriesResult `json:"results,omitempty"` - Error string `json:"error,omitempty"` + Results []*SeriesResult `json:"results,omitempty" msgpack:"results,omitempty"` + Error string `json:"error,omitempty" msgpack:"error,omitempty"` } func (result *QueryResult) hasError() error { diff --git a/opengemini/query_test.go b/opengemini/query_test.go index ef84d79..20c7c20 100644 --- a/opengemini/query_test.go +++ b/opengemini/query_test.go @@ -15,9 +15,11 @@ package opengemini import ( + "fmt" "testing" "time" + "github.com/libgox/addr" "github.com/stretchr/testify/assert" ) @@ -69,6 +71,63 @@ func TestQueryWithEpoch(t *testing.T) { assert.Equal(t, length, getTimestampLength(v)) } } +func TestQueryWithMsgPack(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []addr.Address{{ + Host: "localhost", + Port: 8086, + }}, + Codec: CodecMsgPack, + }) + + // create a test database with rand suffix + database := randomDatabaseName() + err := c.CreateDatabase(database) + assert.Nil(t, err) + + // delete test database before exit test case + defer func() { + err := c.DropDatabase(database) + assert.Nil(t, err) + }() + + testMeasurement := randomMeasurement() + p := &Point{} + p.Measurement = testMeasurement + p.AddField("TestField", 123) + p.Time = time.Now() + + err = c.WritePoint(database, p, func(err error) { + assert.Nil(t, err) + }) + assert.Nil(t, err) + + time.Sleep(time.Second * 5) + + PrecisionTimestampLength := make(map[Precision]int64) + PrecisionTimestampLength[PrecisionNanosecond] = 19 + PrecisionTimestampLength[PrecisionMicrosecond] = 16 + PrecisionTimestampLength[PrecisionMillisecond] = 13 + PrecisionTimestampLength[PrecisionSecond] = 10 + PrecisionTimestampLength[PrecisionMinute] = 8 + PrecisionTimestampLength[PrecisionHour] = 6 + + // check whether write success + for precision, length := range PrecisionTimestampLength { + q := Query{ + Database: database, + Command: "select * from " + testMeasurement, + Precision: precision, + } + result, err := c.Query(q) + assert.Nil(t, err) + v, err := convertToInt64(result.Results[0].Series[0].Values[0][0]) + if err != nil { + t.Fatalf("conversion error: %v", err) + } + assert.Equal(t, length, getTimestampLength(v)) + } +} func getTimestampLength(timestamp int64) int64 { var length int64 = 0 @@ -77,3 +136,16 @@ func getTimestampLength(timestamp int64) int64 { } return length } + +func convertToInt64(value interface{}) (int64, error) { + switch val := value.(type) { + case float64: + return int64(val), nil + case int64: + return val, nil + case int32: + return int64(val), nil + default: + return 0, fmt.Errorf("unsupported type: %T", value) + } +} diff --git a/opengemini/series.go b/opengemini/series.go index 5afdbd2..b7f6bc2 100644 --- a/opengemini/series.go +++ b/opengemini/series.go @@ -20,8 +20,8 @@ type SeriesValues []SeriesValue // Series defines the structure for series data type Series struct { - Name string `json:"name,omitempty"` - Tags map[string]string `json:"tags,omitempty"` - Columns []string `json:"columns,omitempty"` - Values SeriesValues `json:"values,omitempty"` + Name string `json:"name,omitempty" msgpack:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty" msgpack:"tags,omitempty"` + Columns []string `json:"columns,omitempty" msgpack:"columns,omitempty"` + Values SeriesValues `json:"values,omitempty" msgpack:"values,omitempty"` }