Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support batch points write method #26

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions opengemini/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Client interface {
Ping(idx int) error
Query(query Query) (*QueryResult, error)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep this blank line

// WriteBatchPoints batch points to assigned database
WriteBatchPoints(database string, bp *BatchPoints) error
// CreateDatabase Create database
CreateDatabase(database string) error
// CreateDatabaseWithRp Create database with retention policy
Expand Down
1 change: 1 addition & 0 deletions opengemini/url_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
UrlPing = "/ping"
UrlQuery = "/query"
UrlStatus = "/status"
UrlWrite = "/write"
)

var noAuthRequired = map[string]map[string]struct{}{
Expand Down
58 changes: 58 additions & 0 deletions opengemini/write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package opengemini

import (
"bytes"
"compress/gzip"
"errors"
"io"
"net/http"
"net/url"
)

func (c *client) WriteBatchPoints(database string, bp *BatchPoints) error {
var buffer bytes.Buffer

var writer io.Writer

if c.config.GzipEnabled {
writer = gzip.NewWriter(&buffer)
} else {
writer = &buffer
}
for _, p := range bp.Points {
if p == nil {
continue
}
if _, err := io.WriteString(writer, p.String()); err != nil {
return err
}
if _, err := writer.Write([]byte{'\n'}); err != nil {
return err
}
}
if closer, ok := writer.(io.Closer); ok {
if err := closer.Close(); err != nil {
return err
}
}

req := requestDetails{
queryValues: make(url.Values),
body: &buffer,
}
req.queryValues.Add("db", database)
resp, err := c.executeHttpPost(UrlWrite, req)
if err != nil {
return err
}

defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent {
reason, err := io.ReadAll(resp.Body)
if err != nil {
return errors.New("write failed and couldn't get the error for " + err.Error())
}
return errors.New("write failed for " + string(reason))
}
return nil
}
62 changes: 62 additions & 0 deletions opengemini/write_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package opengemini

import (
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestClient_Write(t *testing.T) {
c := testDefaultClient(t)

// create a test database with rand suffix
database := randomDatabaseName()
err := c.CreateDatabase(database)
assert.Nil(t, err)

// delete test database before exit test case
defer func() {
err := c.DropDatabase(database)
assert.Nil(t, err)
}()

bp := &BatchPoints{}
testMeasurement := randomMeasurement()
// point1 will write success with four kinds variant type field
point1 := &Point{}
point1.SetMeasurement(testMeasurement)
point1.AddTag("Tag", "Test1")
point1.AddField("stringField", "test1")
point1.AddField("intField", 897870)
point1.AddField("doubleField", 834.5433)
point1.AddField("boolField", true)
bp.AddPoint(point1)

// point2 will parse fail for having no field
point2 := &Point{}
point2.SetMeasurement(testMeasurement)
point2.AddTag("Tag", "Test2")
bp.AddPoint(point2)

// point3 will write success with timestamp
point3 := &Point{}
point3.SetMeasurement(testMeasurement)
point3.AddTag("Tag", "Test3")
point3.AddField("stringField", "test3")
point3.AddField("boolField", false)
point3.Time = time.Now()
bp.AddPoint(point3)

err = c.WriteBatchPoints(database, bp)
assert.Nil(t, err)

// check whether write success
q := Query{
Database: database,
Command: "select * from " + testMeasurement,
}
time.Sleep(time.Second * 5)
result, err := c.Query(q)
assert.Nil(t, err)
assert.Equal(t, 2, len(result.Results[0].Series[0].Values))
}
Loading