diff --git a/opengemini/client.go b/opengemini/client.go index ecae91e..f9ae54d 100644 --- a/opengemini/client.go +++ b/opengemini/client.go @@ -16,6 +16,23 @@ type Client interface { // Ping check that status of cluster. Ping(idx int) error Query(query Query) (*QueryResult, error) + // Create database + CreateDatabase(database string) error + // Create database with retention policy + // rpConfig configuration information for retention policy + CreateDatabaseWithRp(database string, rpConfig RpConfig) error + // Show database + ShowDatabase() ([]string, error) + // Drop Database + DropDatabase(database string) error + // Create retention policy + // rpConfig configuration information for retention policy + // isDefault can set the new retention policy as the default retention policy for the database + CreateRetentionPolicy(database string, rpConfig RpConfig, isDefault bool) error + // Show retention policy + ShowRetentionPolicy(database string) ([]RetentionPolicy, error) + // Drop retention policy + DropRetentionPolicy(rp string,database string) error } // Config is used to construct a openGemini Client instance. @@ -66,6 +83,18 @@ type BatchConfig struct { BatchSize int } +// RpConfig represents the configuration information for retention policy +type RpConfig struct { + // Name retention policy name + Name string + // Duration indicates how long the data will be retained + Duration string + // ShardGroupDuration determine the time range for sharding groups + ShardGroupDuration string + // IndexDuration determines the time range of the index group + IndexDuration string +} + // NewClient Creates a openGemini client instance func NewClient(config *Config) (Client, error) { return newClient(config) diff --git a/opengemini/database.go b/opengemini/database.go new file mode 100644 index 0000000..aa7bd3a --- /dev/null +++ b/opengemini/database.go @@ -0,0 +1,99 @@ +package opengemini + +import ( + "errors" + "fmt" + "strings" +) + +func (c *client) CreateDatabase(database string) error { + if len(database) == 0 { + return errors.New("empty database name") + } + + cmd := fmt.Sprintf("CREATE DATABASE %s", database) + queryResult, err := c.queryPost(Query{Command: cmd}) + if err != nil { + return err + } + + err = queryResult.ResultError() + if err != nil { + return fmt.Errorf("create database err: %s", err) + } + + return nil +} + +func (c *client) CreateDatabaseWithRp(database string, rpConfig RpConfig) error { + if len(database) == 0 { + return errors.New("empty database name") + } + + var buf strings.Builder + buf.WriteString(fmt.Sprintf("CREATE DATABASE %s WITH DURATION %s REPLICATION 1", database, rpConfig.Duration)) + if len(rpConfig.ShardGroupDuration) > 0 { + buf.WriteString(fmt.Sprintf(" SHARD DURATION %s", rpConfig.ShardGroupDuration)) + } + if len(rpConfig.IndexDuration) > 0 { + buf.WriteString(fmt.Sprintf(" INDEX DURATION %s", rpConfig.IndexDuration)) + } + buf.WriteString(fmt.Sprintf(" NAME %s", rpConfig.Name)) + queryResult, err := c.queryPost(Query{Command: buf.String()}) + if err != nil { + return err + } + + err = queryResult.ResultError() + if err != nil { + return fmt.Errorf("create database err: %s", err) + } + + return nil +} + +func (c *client) ShowDatabase() ([]string, error) { + var ( + ShowDatabases = "SHOW DATABASES" + dbResult = make([]string, 0) + ) + queryResult, err := c.Query(Query{Command: ShowDatabases}) + if err != nil { + return nil, err + } + if len(queryResult.Error) > 0 { + return nil, fmt.Errorf("show datababse err: %s", queryResult.Error) + } + if len(queryResult.Results) == 0 || len(queryResult.Results[0].Series) == 0 { + return dbResult, nil + } + + for _, v := range queryResult.Results[0].Series[0].Values { + if len(v) == 0 { + continue + } + val, ok := v[0].(string) + if !ok { + continue + } + dbResult = append(dbResult, val) + } + return dbResult, nil +} + +// Drop Database +func (c *client) DropDatabase(database string) error { + if len(database) == 0 { + return errors.New("empty database name") + } + cmd := fmt.Sprintf("DROP DATABASE %s", database) + queryResult, err := c.queryPost(Query{Command: cmd}) + if err != nil { + return err + } + err = queryResult.ResultError() + if err != nil { + return fmt.Errorf("drop database err: %s", err) + } + return nil +} diff --git a/opengemini/database_test.go b/opengemini/database_test.go new file mode 100644 index 0000000..c661505 --- /dev/null +++ b/opengemini/database_test.go @@ -0,0 +1,94 @@ +package opengemini + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestClientCreateDatabase(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + err := c.CreateDatabase("test4_database") + assert.Nil(t, err) +} +func TestClientCreateDatabaseEmptyDatabase(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + err := c.CreateDatabase("") + assert.NotNil(t, err) +} + +func TestClientCreateDatabaseWithRp(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + err := c.CreateDatabaseWithRp("test4_database", RpConfig{Name: "test4", Duration: "1d", ShardGroupDuration: "1h", IndexDuration: "7h"}) + assert.Nil(t, err) +} + +func TestClientCreateDatabaseWithRpInvalid(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + err := c.CreateDatabaseWithRp("test4_database", RpConfig{Name: "test4", Duration: "1", ShardGroupDuration: "1h", IndexDuration: "7h"}) + assert.NotNil(t, err) +} + +func TestClientCreateDatabaseWithRpEmptyDatabase(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + err := c.CreateDatabaseWithRp("", RpConfig{Name: "test4", Duration: "1h", ShardGroupDuration: "1h", IndexDuration: "7h"}) + assert.NotNil(t, err) +} + + +func TestClientShowDatabase(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + _, err := c.ShowDatabase() + assert.Nil(t,err) +} + +func TestClientDropDatabase(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + err := c.DropDatabase("vvv_database") + assert.Nil(t,err) +} + +func TestClientDropDatabaseEmptyDatabase(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + err := c.DropDatabase("") + assert.NotNil(t,err) +} diff --git a/opengemini/http.go b/opengemini/http.go index 523c9b5..a3ceac5 100644 --- a/opengemini/http.go +++ b/opengemini/http.go @@ -54,6 +54,10 @@ func (c *client) executeHttpGet(urlPath string, details requestDetails) (*http.R return c.executeHttpRequest(http.MethodGet, urlPath, details) } +func (c *client) executeHttpPost(urlPath string, details requestDetails) (*http.Response, error) { + return c.executeHttpRequest(http.MethodPost, urlPath, details) +} + func (c *client) executeHttpRequest(method, urlPath string, details requestDetails) (*http.Response, error) { idx := int(c.currentIdx.Add(1)) % len(c.serverUrls) return c.executeHttpRequestInner(method, c.serverUrls[idx], urlPath, details) diff --git a/opengemini/query.go b/opengemini/query.go index 94d6280..b368a6d 100644 --- a/opengemini/query.go +++ b/opengemini/query.go @@ -35,3 +35,28 @@ func (c *client) Query(q Query) (*QueryResult, error) { } return qr, nil } + +func (c *client) queryPost(q Query) (*QueryResult, error) { + req := requestDetails{ + queryValues: make(map[string][]string), + } + + req.queryValues.Add("db", q.Database) + req.queryValues.Add("q", q.Command) + resp, err := c.executeHttpPost(UrlQuery, req) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + var qr = new(QueryResult) + err = json.Unmarshal(body, qr) + if err != nil { + return nil, err + } + return qr, nil +} diff --git a/opengemini/query_result.go b/opengemini/query_result.go index 0349243..1a57573 100644 --- a/opengemini/query_result.go +++ b/opengemini/query_result.go @@ -1,5 +1,7 @@ package opengemini +import "errors" + // SeriesResult contains the results of a series query type SeriesResult struct { Series []Series `json:"series,omitempty"` @@ -11,3 +13,15 @@ type QueryResult struct { Results []SeriesResult `json:"results,omitempty"` Error string `json:"error,omitempty"` } + +func (result *QueryResult) ResultError() error { + if len(result.Error) > 0 { + return errors.New(result.Error) + } + for _,res := range result.Results{ + if len(res.Error)>0 { + return errors.New(res.Error) + } + } + return nil +} diff --git a/opengemini/retention_policy.go b/opengemini/retention_policy.go new file mode 100644 index 0000000..8a4d445 --- /dev/null +++ b/opengemini/retention_policy.go @@ -0,0 +1,149 @@ +package opengemini + +import ( + "errors" + "fmt" + "strings" +) + +// RetentionPolicy defines the structure for retention policy info +type RetentionPolicy struct { + Name string + Duration string + ShardGroupDuration string + HotDuration string + WarmDuration string + IndexDuration string + ReplicaNum int64 + IsDefault bool +} + +// CreateRp Create retention policy +func (c *client) CreateRetentionPolicy(database string, rpConfig RpConfig, isDefault bool) error { + if len(database) == 0 { + return errors.New("empty database name") + } + var buf strings.Builder + buf.WriteString(fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION %s REPLICATION 1", rpConfig.Name, database, rpConfig.Duration)) + if len(rpConfig.ShardGroupDuration) > 0 { + buf.WriteString(fmt.Sprintf(" SHARD DURATION %s", rpConfig.ShardGroupDuration)) + } + if len(rpConfig.IndexDuration) > 0 { + buf.WriteString(fmt.Sprintf(" INDEX DURATION %s", rpConfig.IndexDuration)) + } + if isDefault { + buf.WriteString(" DEFAULT") + } + + queryResult, err := c.queryPost(Query{Command: buf.String()}) + if err != nil { + return err + } + + err = queryResult.ResultError() + if err != nil { + return fmt.Errorf("create retention policy err: %s", err) + } + + return nil +} + +// ShowRp Show retention policy +func (c *client) ShowRetentionPolicy(database string) ([]RetentionPolicy, error) { + var ( + ShowRetentionPolicy = "SHOW RETENTION POLICIES" + rpResult = make([]RetentionPolicy, 0) + ) + if len(database) == 0 { + return nil, errors.New("empty database name") + } + + queryResult, err := c.Query(Query{Database: database, Command: ShowRetentionPolicy}) + if err != nil { + return nil, err + } + + err = queryResult.ResultError() + if err != nil { + return rpResult, fmt.Errorf("show retention policy err: %s", err) + } + + if len(queryResult.Results) == 0 { + return rpResult, nil + } + if len(queryResult.Results[0].Series) == 0 { + return rpResult, nil + } + rpResult = convertRetentionPolicy(queryResult) + return rpResult, nil +} + +func convertRetentionPolicy(queryResult *QueryResult) []RetentionPolicy { + var ( + retentionPolicy = make([]RetentionPolicy, 0) + rpColumnLen = 8 + ) + if len(queryResult.Results) == 0 || len(queryResult.Results[0].Series) == 0 { + return retentionPolicy + } + + for _, v := range queryResult.Results[0].Series[0].Values { + if len(v) < rpColumnLen { + break + } + var ( + ok bool + replicaNum float64 + ) + rp := new(RetentionPolicy) + if rp.Name, ok = v[0].(string); !ok { + break + } + + if rp.Duration, ok = v[1].(string); !ok { + break + } + if rp.ShardGroupDuration, ok = v[2].(string); !ok { + break + } + if rp.HotDuration, ok = v[3].(string); !ok { + break + } + if rp.WarmDuration, ok = v[4].(string); !ok { + break + } + if rp.IndexDuration, ok = v[5].(string); !ok { + break + } + if replicaNum, ok = v[6].(float64); !ok { + break + } + rp.ReplicaNum = int64(replicaNum) + if rp.IsDefault, ok = v[7].(bool); !ok { + break + } + retentionPolicy = append(retentionPolicy, *rp) + } + return retentionPolicy +} + +// DropRp Drop retention policy +func (c *client) DropRetentionPolicy(rp string, database string) error { + if len(rp) == 0 { + return errors.New("empty retention policy") + } + if len(database) == 0 { + return errors.New("empty database name") + } + + cmd := fmt.Sprintf("DROP RETENTION POLICY %s ON %s", rp, database) + queryResult, err := c.queryPost(Query{Command: cmd}) + if err != nil { + return err + } + err = queryResult.ResultError() + if err != nil { + return fmt.Errorf("drop retention policy err: %s", err) + } + return nil +} diff --git a/opengemini/retention_policy_test.go b/opengemini/retention_policy_test.go new file mode 100644 index 0000000..c9397c0 --- /dev/null +++ b/opengemini/retention_policy_test.go @@ -0,0 +1,71 @@ +package opengemini + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestClientCreateRetentionPolicy(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + err := c.CreateDatabase("test_database") + assert.Nil(t, err) + err = c.CreateRetentionPolicy("test_database",RpConfig{Name: "test_rp1",Duration: "3d"},false) + assert.Nil(t, err) + err = c.CreateRetentionPolicy("test_database",RpConfig{Name: "test_rp2",Duration: "3d",ShardGroupDuration: "1h"},false) + assert.Nil(t, err) + err = c.CreateRetentionPolicy("test_database",RpConfig{Name: "test_rp3",Duration: "3d",ShardGroupDuration: "1h",IndexDuration: "7h"},false) + assert.Nil(t, err) + err = c.CreateRetentionPolicy("test_database",RpConfig{Name: "test_rp4",Duration: "3d"},true) + assert.Nil(t, err) +} + +func TestClientCreateRetentionPolicyNotExistDatabase(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + err := c.CreateRetentionPolicy("test_db",RpConfig{Name: "test_rp1",Duration: "3d"},false) + assert.NotNil(t, err) +} + +func TestClientCreateRetentionPolicyEmptyDatabase(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + err := c.CreateRetentionPolicy("",RpConfig{Name: "test_rp1",Duration: "3d"},false) + assert.NotNil(t, err) +} + + +func TestClientDropRetentionPolicy(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + err := c.DropRetentionPolicy("test_rp1","test_database") + assert.Nil(t,err) +} + +func TestClientShowRetentionPolicy(t *testing.T) { + c := testNewClient(t, &Config{ + Addresses: []*Address{{ + Host: "localhost", + Port: 8086, + }}, + }) + rpResult,err := c.ShowRetentionPolicy("test_database") + assert.Nil(t,err) + assert.NotEqual(t,len(rpResult),0) +} \ No newline at end of file