Skip to content

Commit

Permalink
feat: new write protocol implement
Browse files Browse the repository at this point in the history
Signed-off-by: Young Xu <[email protected]>
  • Loading branch information
xuthus5 committed Dec 8, 2024
1 parent ebbbf52 commit 02abab6
Show file tree
Hide file tree
Showing 8 changed files with 755 additions and 1 deletion.
28 changes: 28 additions & 0 deletions opengemini/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"crypto/tls"
"log/slog"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -69,6 +70,10 @@ type Client interface {
WriteBatchPoints(ctx context.Context, database string, bp []*Point) error
// WriteBatchPointsWithRp write batch points with retention policy
WriteBatchPointsWithRp(ctx context.Context, database string, rp string, bp []*Point) error
// WriteByGRPC write batch record to assigned database.retention_policy by gRPC.
// rbs RecordBuilderImpl, it will generate Record and write to the database.
// You'd better use NewRecordBuilder to build RecordBuilderImpl.
WriteByGRPC(ctx context.Context, rbs ...*RecordBuilderImpl) error

// CreateDatabase Create database
CreateDatabase(database string) error
Expand Down Expand Up @@ -166,6 +171,8 @@ type Config struct {
CustomMetricsLabels map[string]string
// Logger structured logger for logging operations
Logger *slog.Logger
// RPCConfig configuration information for write service by gRPC
RPCConfig *RPCConfig
}

// Address configuration for providing service.
Expand All @@ -176,6 +183,10 @@ type Address struct {
Port int
}

func (a *Address) String() string {
return a.Host + ":" + strconv.Itoa(a.Port)
}

// AuthType type of identity authentication.
type AuthType int

Expand Down Expand Up @@ -212,6 +223,23 @@ type RpConfig struct {
IndexDuration string
}

// RPCConfig represents the configuration information for write service by gRPC.
type RPCConfig struct {
// Addresses Configure the service endpoints for the openGemini grpc write service.
// This parameter is required.
Addresses []Address
// AuthConfig configuration information for authentication.
AuthConfig *AuthConfig
// BatchConfig configuration information for batch processing.
BatchConfig *BatchConfig
// TlsConfig configuration information for tls.
TlsConfig *tls.Config
// CompressMethod determines the compress method used for data transmission.
CompressMethod CompressMethod
// Timeout default 30s
Timeout time.Duration
}

// NewClient Creates a openGemini client instance
func NewClient(config *Config) (Client, error) {
return newClient(config)
Expand Down
10 changes: 9 additions & 1 deletion opengemini/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type client struct {
prevIdx atomic.Int32
dataChanMap syncx.Map[dbRp, chan *sendBatchWithCB]
metrics *metrics
rpcClient *recordWriterClient

batchContext context.Context
batchContextCancel context.CancelFunc
Expand All @@ -48,7 +49,7 @@ type client struct {

func newClient(c *Config) (Client, error) {
if len(c.Addresses) == 0 {
return nil, errors.New("must have at least one address")
return nil, ErrEmptyAddress
}
if c.AuthConfig != nil {
if c.AuthConfig.AuthType == AuthTypeToken && len(c.AuthConfig.Token) == 0 {
Expand Down Expand Up @@ -91,6 +92,13 @@ func newClient(c *Config) (Client, error) {
} else {
dbClient.logger = slog.Default()
}
if c.RPCConfig != nil {
rc, err := newRecordWriterClient(c.RPCConfig)
if err != nil {
return nil, errors.New("failed to create rpc client: " + err.Error())
}
dbClient.rpcClient = rc
}
dbClient.prevIdx.Store(-1)
if len(c.Addresses) > 1 {
// if there are multiple addresses, start the health check
Expand Down
2 changes: 2 additions & 0 deletions opengemini/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
ErrEmptyTagOrField = errors.New("empty tag or field")
ErrEmptyTagKey = errors.New("empty tag key")
ErrRetentionPolicy = errors.New("empty retention policy")
ErrEmptyRecord = errors.New("empty record")
ErrEmptyAddress = errors.New("empty address, must have at least one address")
)

// checkDatabaseName checks if the database name is empty and returns an error if it is.
Expand Down
1 change: 1 addition & 0 deletions opengemini/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestQueryWithEpoch(t *testing.T) {
assert.Equal(t, length, getTimestampLength(v))
}
}

func TestQueryWithMsgPack(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []Address{{
Expand Down
149 changes: 149 additions & 0 deletions opengemini/record_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package opengemini

import (
"errors"
"fmt"
"github.com/openGemini/opengemini-client-go/lib/record"
"time"
)

var _ RecordBuilder = (*RecordBuilderImpl)(nil)

type RecordBuilder interface {
// Database specifies the name of the database, required
Database(database string) RecordBuilder
// RetentionPolicy specifies the retention policy, required
RetentionPolicy(policy string) RecordBuilder
// Measurement specifies the name of the measurement, required
Measurement(measurement string) RecordBuilder
// AddTag add a tag to the record.
// If the key exists, it will be overwritten.
// If the key is `time`, it will cause an error.
// If the key is empty or the value is empty, it will be ignored.
AddTag(key string, value string) RecordBuilder
// AddTags add multiple tags to the record.
// Each entry in the map represents a tag where the key is the tag name and the value is the tag value.
AddTags(tags map[string]string) RecordBuilder
// AddField add a field to the record.
// If the key is empty, it will be ignored.
// If the key is `time`, it will cause an error.
// If the key already exists, its value will be overwritten.
AddField(key string, value interface{}) RecordBuilder
// AddFields add multiple fields to the record.
// Each entry in the map represents a field where the key is the field name and the value is the field value.
AddFields(fields map[string]interface{}) RecordBuilder
// Time specifies the time of the record.
// If the time is not specified or zero value, the current time will be used.
Time(tt time.Time) RecordBuilder
// Build returns the built RecordBuilderImpl and an error if any.
// The returned RecordBuilderImpl can be used to build a record.Record
// by calling record.Record.FromBuilder.
Build() (*RecordBuilderImpl, error)
}

func NewRecordBuilder() RecordBuilder {
return &RecordBuilderImpl{}
}

type FieldTuple struct {
record.Field
Value interface{}
}

type RecordBuilderImpl struct {
database string
retentionPolicy string
measurement string
tags []*FieldTuple
fields []*FieldTuple
tt time.Time

err error
}

func (r *RecordBuilderImpl) Database(database string) RecordBuilder {
r.database = database
return r
}

func (r *RecordBuilderImpl) RetentionPolicy(policy string) RecordBuilder {
r.retentionPolicy = policy
return r
}

func (r *RecordBuilderImpl) Measurement(measurement string) RecordBuilder {
r.measurement = measurement
return r
}

func (r *RecordBuilderImpl) Build() (*RecordBuilderImpl, error) {
return r, nil
}

func (r *RecordBuilderImpl) AddTag(key string, value string) RecordBuilder {
if key == "" {
r.err = errors.Join(r.err, fmt.Errorf("miss tag name: %w", ErrEmptyName))
return r
}
if key == record.TimeField {
r.err = errors.Join(r.err, fmt.Errorf("tag name %s invalid: %w", key, ErrInvalidTimeColumn))
return r
}
r.tags = append(r.tags, &FieldTuple{
Field: record.Field{
Name: key,
Type: record.FieldTypeTag,
},
Value: value,
})
return r
}

func (r *RecordBuilderImpl) AddTags(tags map[string]string) RecordBuilder {
for key, value := range tags {
r.AddTag(key, value)
}
return r
}

func (r *RecordBuilderImpl) AddField(key string, value interface{}) RecordBuilder {
if key == "" {
r.err = errors.Join(r.err, fmt.Errorf("miss field name: %w", ErrEmptyName))
return r
}
if key == record.TimeField {
r.err = errors.Join(r.err, fmt.Errorf("field name %s invalid: %w", key, ErrInvalidTimeColumn))
return r
}
typ := record.FieldTypeUnknown
switch value.(type) {
case string:
typ = record.FieldTypeString
case float32, float64:
typ = record.FieldTypeFloat
case bool:
typ = record.FieldTypeBoolean
case int8, int16, int32, int64, uint8, uint16, uint32, uint64, int:
typ = record.FieldTypeInt
}
r.fields = append(r.fields, &FieldTuple{
Field: record.Field{
Name: key,
Type: typ,
},
Value: value,
})
return r
}

func (r *RecordBuilderImpl) AddFields(fields map[string]interface{}) RecordBuilder {
for key, value := range fields {
r.AddField(key, value)
}
return r
}

func (r *RecordBuilderImpl) Time(tt time.Time) RecordBuilder {
r.tt = tt
return r
}
Loading

0 comments on commit 02abab6

Please sign in to comment.