Skip to content

Commit

Permalink
feat: api for managing databases and retention policies
Browse files Browse the repository at this point in the history
Signed-off-by: L-UOjin <[email protected]>
  • Loading branch information
L-uoJin committed Nov 26, 2023
1 parent 03251f9 commit da4d1e8
Show file tree
Hide file tree
Showing 8 changed files with 485 additions and 0 deletions.
29 changes: 29 additions & 0 deletions opengemini/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ type Client interface {
// Ping check that status of cluster.
Ping(idx int) error
Query(query Query) (*QueryResult, error)
// Create database
CreateDatabase(database string) error
// Create database with retention policy
// rpConfig configuration information for retention policy
CreateDatabaseWithRp(database string, rpConfig RpConfig) error
// Show database
ShowDatabase() ([]string, error)
// Drop Database
DropDatabase(database string) error
// Create retention policy
// rpConfig configuration information for retention policy
// isDefault can set the new retention policy as the default retention policy for the database
CreateRetentionPolicy(database string, rpConfig RpConfig, isDefault bool) error
// Show retention policy
ShowRetentionPolicy(database string) ([]RetentionPolicy, error)
// Drop retention policy
DropRetentionPolicy(rp string,database string) error
}

// Config is used to construct a openGemini Client instance.
Expand Down Expand Up @@ -66,6 +83,18 @@ type BatchConfig struct {
BatchSize int
}

// RpConfig represents the configuration information for retention policy
type RpConfig struct {
// Name retention policy name
Name string
// Duration indicates how long the data will be retained
Duration string
// ShardGroupDuration determine the time range for sharding groups
ShardGroupDuration string
// IndexDuration determines the time range of the index group
IndexDuration string
}

// NewClient Creates a openGemini client instance
func NewClient(config *Config) (Client, error) {
return newClient(config)
Expand Down
99 changes: 99 additions & 0 deletions opengemini/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package opengemini

import (
"errors"
"fmt"
"strings"
)

func (c *client) CreateDatabase(database string) error {
if len(database) == 0 {
return errors.New("empty database name")
}

cmd := fmt.Sprintf("CREATE DATABASE %s", database)
queryResult, err := c.queryPost(Query{Command: cmd})
if err != nil {
return err
}

err = queryResult.ResultError()
if err != nil {
return fmt.Errorf("create database err: %s", err)
}

return nil
}

func (c *client) CreateDatabaseWithRp(database string, rpConfig RpConfig) error {
if len(database) == 0 {
return errors.New("empty database name")
}

var buf strings.Builder
buf.WriteString(fmt.Sprintf("CREATE DATABASE %s WITH DURATION %s REPLICATION 1", database, rpConfig.Duration))
if len(rpConfig.ShardGroupDuration) > 0 {
buf.WriteString(fmt.Sprintf(" SHARD DURATION %s", rpConfig.ShardGroupDuration))
}
if len(rpConfig.IndexDuration) > 0 {
buf.WriteString(fmt.Sprintf(" INDEX DURATION %s", rpConfig.IndexDuration))
}
buf.WriteString(fmt.Sprintf(" NAME %s", rpConfig.Name))
queryResult, err := c.queryPost(Query{Command: buf.String()})
if err != nil {
return err
}

err = queryResult.ResultError()
if err != nil {
return fmt.Errorf("create database err: %s", err)
}

return nil
}

func (c *client) ShowDatabase() ([]string, error) {
var (
ShowDatabases = "SHOW DATABASES"
dbResult = make([]string, 0)
)
queryResult, err := c.Query(Query{Command: ShowDatabases})
if err != nil {
return nil, err
}
if len(queryResult.Error) > 0 {
return nil, fmt.Errorf("show datababse err: %s", queryResult.Error)
}
if len(queryResult.Results) == 0 || len(queryResult.Results[0].Series) == 0 {
return dbResult, nil
}

for _, v := range queryResult.Results[0].Series[0].Values {
if len(v) == 0 {
continue
}
val, ok := v[0].(string)
if !ok {
continue
}
dbResult = append(dbResult, val)
}
return dbResult, nil
}

// Drop Database
func (c *client) DropDatabase(database string) error {
if len(database) == 0 {
return errors.New("empty database name")
}
cmd := fmt.Sprintf("DROP DATABASE %s", database)
queryResult, err := c.queryPost(Query{Command: cmd})
if err != nil {
return err
}
err = queryResult.ResultError()
if err != nil {
return fmt.Errorf("drop database err: %s", err)
}
return nil
}
94 changes: 94 additions & 0 deletions opengemini/database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package opengemini

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestClientCreateDatabase(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []*Address{{
Host: "localhost",
Port: 8086,
}},
})
err := c.CreateDatabase("test4_database")
assert.Nil(t, err)
}
func TestClientCreateDatabaseEmptyDatabase(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []*Address{{
Host: "localhost",
Port: 8086,
}},
})
err := c.CreateDatabase("")
assert.NotNil(t, err)
}

func TestClientCreateDatabaseWithRp(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []*Address{{
Host: "localhost",
Port: 8086,
}},
})
err := c.CreateDatabaseWithRp("test4_database", RpConfig{Name: "test4", Duration: "1d", ShardGroupDuration: "1h", IndexDuration: "7h"})
assert.Nil(t, err)
}

func TestClientCreateDatabaseWithRpInvalid(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []*Address{{
Host: "localhost",
Port: 8086,
}},
})
err := c.CreateDatabaseWithRp("test4_database", RpConfig{Name: "test4", Duration: "1", ShardGroupDuration: "1h", IndexDuration: "7h"})
assert.NotNil(t, err)
}

func TestClientCreateDatabaseWithRpEmptyDatabase(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []*Address{{
Host: "localhost",
Port: 8086,
}},
})
err := c.CreateDatabaseWithRp("", RpConfig{Name: "test4", Duration: "1h", ShardGroupDuration: "1h", IndexDuration: "7h"})
assert.NotNil(t, err)
}


func TestClientShowDatabase(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []*Address{{
Host: "localhost",
Port: 8086,
}},
})
_, err := c.ShowDatabase()
assert.Nil(t,err)
}

func TestClientDropDatabase(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []*Address{{
Host: "localhost",
Port: 8086,
}},
})
err := c.DropDatabase("vvv_database")
assert.Nil(t,err)
}

func TestClientDropDatabaseEmptyDatabase(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []*Address{{
Host: "localhost",
Port: 8086,
}},
})
err := c.DropDatabase("")
assert.NotNil(t,err)
}
4 changes: 4 additions & 0 deletions opengemini/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func (c *client) executeHttpGet(urlPath string, details requestDetails) (*http.R
return c.executeHttpRequest(http.MethodGet, urlPath, details)
}

func (c *client) executeHttpPost(urlPath string, details requestDetails) (*http.Response, error) {
return c.executeHttpRequest(http.MethodPost, urlPath, details)
}

func (c *client) executeHttpRequest(method, urlPath string, details requestDetails) (*http.Response, error) {
idx := int(c.currentIdx.Add(1)) % len(c.serverUrls)
return c.executeHttpRequestInner(method, c.serverUrls[idx], urlPath, details)
Expand Down
25 changes: 25 additions & 0 deletions opengemini/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,28 @@ func (c *client) Query(q Query) (*QueryResult, error) {
}
return qr, nil
}

func (c *client) queryPost(q Query) (*QueryResult, error) {
req := requestDetails{
queryValues: make(map[string][]string),
}

req.queryValues.Add("db", q.Database)
req.queryValues.Add("q", q.Command)
resp, err := c.executeHttpPost(UrlQuery, req)
if err != nil {
return nil, err
}

defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var qr = new(QueryResult)
err = json.Unmarshal(body, qr)
if err != nil {
return nil, err
}
return qr, nil
}
14 changes: 14 additions & 0 deletions opengemini/query_result.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package opengemini

import "errors"

// SeriesResult contains the results of a series query
type SeriesResult struct {
Series []Series `json:"series,omitempty"`
Expand All @@ -11,3 +13,15 @@ type QueryResult struct {
Results []SeriesResult `json:"results,omitempty"`
Error string `json:"error,omitempty"`
}

func (result *QueryResult) ResultError() error {
if len(result.Error) > 0 {
return errors.New(result.Error)
}
for _,res := range result.Results{
if len(res.Error)>0 {
return errors.New(res.Error)
}
}
return nil
}
Loading

0 comments on commit da4d1e8

Please sign in to comment.