diff --git a/opengemini/client.go b/opengemini/client.go index a0cd057..68bce4e 100644 --- a/opengemini/client.go +++ b/opengemini/client.go @@ -18,6 +18,7 @@ import ( "context" "crypto/tls" "log/slog" + "strconv" "time" "github.com/prometheus/client_golang/prometheus" @@ -69,6 +70,10 @@ type Client interface { WriteBatchPoints(ctx context.Context, database string, bp []*Point) error // WriteBatchPointsWithRp write batch points with retention policy WriteBatchPointsWithRp(ctx context.Context, database string, rp string, bp []*Point) error + // WriteByGRPC write batch record to assigned database.retention_policy by gRPC. + // rbs RecordBuilderImpl, it will generate Record and write to the database. + // You'd better use NewRecordBuilder to build RecordBuilderImpl. + WriteByGRPC(ctx context.Context, rbs ...*RecordBuilderImpl) error // CreateDatabase Create database CreateDatabase(database string) error @@ -166,6 +171,8 @@ type Config struct { CustomMetricsLabels map[string]string // Logger structured logger for logging operations Logger *slog.Logger + // RPCConfig configuration information for write service by gRPC + RPCConfig *RPCConfig } // Address configuration for providing service. @@ -176,6 +183,10 @@ type Address struct { Port int } +func (a *Address) String() string { + return a.Host + ":" + strconv.Itoa(a.Port) +} + // AuthType type of identity authentication. type AuthType int @@ -212,6 +223,23 @@ type RpConfig struct { IndexDuration string } +// RPCConfig represents the configuration information for write service by gRPC. +type RPCConfig struct { + // Addresses Configure the service endpoints for the openGemini grpc write service. + // This parameter is required. + Addresses []Address + // AuthConfig configuration information for authentication. + AuthConfig *AuthConfig + // BatchConfig configuration information for batch processing. + BatchConfig *BatchConfig + // TlsConfig configuration information for tls. + TlsConfig *tls.Config + // CompressMethod determines the compress method used for data transmission. + CompressMethod CompressMethod + // Timeout default 30s + Timeout time.Duration +} + // NewClient Creates a openGemini client instance func NewClient(config *Config) (Client, error) { return newClient(config) diff --git a/opengemini/client_impl.go b/opengemini/client_impl.go index d92d1f2..ed5f988 100644 --- a/opengemini/client_impl.go +++ b/opengemini/client_impl.go @@ -39,6 +39,7 @@ type client struct { prevIdx atomic.Int32 dataChanMap syncx.Map[dbRp, chan *sendBatchWithCB] metrics *metrics + rpcClient *recordWriterClient batchContext context.Context batchContextCancel context.CancelFunc @@ -48,7 +49,7 @@ type client struct { func newClient(c *Config) (Client, error) { if len(c.Addresses) == 0 { - return nil, errors.New("must have at least one address") + return nil, ErrEmptyAddress } if c.AuthConfig != nil { if c.AuthConfig.AuthType == AuthTypeToken && len(c.AuthConfig.Token) == 0 { @@ -91,6 +92,13 @@ func newClient(c *Config) (Client, error) { } else { dbClient.logger = slog.Default() } + if c.RPCConfig != nil { + rc, err := newRecordWriterClient(c.RPCConfig) + if err != nil { + return nil, errors.New("failed to create rpc client: " + err.Error()) + } + dbClient.rpcClient = rc + } dbClient.prevIdx.Store(-1) if len(c.Addresses) > 1 { // if there are multiple addresses, start the health check diff --git a/opengemini/error.go b/opengemini/error.go index d148b3a..a13387c 100644 --- a/opengemini/error.go +++ b/opengemini/error.go @@ -27,6 +27,8 @@ var ( ErrEmptyTagOrField = errors.New("empty tag or field") ErrEmptyTagKey = errors.New("empty tag key") ErrRetentionPolicy = errors.New("empty retention policy") + ErrEmptyRecord = errors.New("empty record") + ErrEmptyAddress = errors.New("empty address, must have at least one address") ) // checkDatabaseName checks if the database name is empty and returns an error if it is. diff --git a/opengemini/query_test.go b/opengemini/query_test.go index 84feac1..48b550b 100644 --- a/opengemini/query_test.go +++ b/opengemini/query_test.go @@ -70,6 +70,7 @@ func TestQueryWithEpoch(t *testing.T) { assert.Equal(t, length, getTimestampLength(v)) } } + func TestQueryWithMsgPack(t *testing.T) { c := testNewClient(t, &Config{ Addresses: []Address{{ diff --git a/opengemini/record_builder.go b/opengemini/record_builder.go new file mode 100644 index 0000000..e900152 --- /dev/null +++ b/opengemini/record_builder.go @@ -0,0 +1,149 @@ +package opengemini + +import ( + "errors" + "fmt" + "github.com/openGemini/opengemini-client-go/lib/record" + "time" +) + +var _ RecordBuilder = (*RecordBuilderImpl)(nil) + +type RecordBuilder interface { + // Database specifies the name of the database, required + Database(database string) RecordBuilder + // RetentionPolicy specifies the retention policy, required + RetentionPolicy(policy string) RecordBuilder + // Measurement specifies the name of the measurement, required + Measurement(measurement string) RecordBuilder + // AddTag add a tag to the record. + // If the key exists, it will be overwritten. + // If the key is `time`, it will cause an error. + // If the key is empty or the value is empty, it will be ignored. + AddTag(key string, value string) RecordBuilder + // AddTags add multiple tags to the record. + // Each entry in the map represents a tag where the key is the tag name and the value is the tag value. + AddTags(tags map[string]string) RecordBuilder + // AddField add a field to the record. + // If the key is empty, it will be ignored. + // If the key is `time`, it will cause an error. + // If the key already exists, its value will be overwritten. + AddField(key string, value interface{}) RecordBuilder + // AddFields add multiple fields to the record. + // Each entry in the map represents a field where the key is the field name and the value is the field value. + AddFields(fields map[string]interface{}) RecordBuilder + // Time specifies the time of the record. + // If the time is not specified or zero value, the current time will be used. + Time(tt time.Time) RecordBuilder + // Build returns the built RecordBuilderImpl and an error if any. + // The returned RecordBuilderImpl can be used to build a record.Record + // by calling record.Record.FromBuilder. + Build() (*RecordBuilderImpl, error) +} + +func NewRecordBuilder() RecordBuilder { + return &RecordBuilderImpl{} +} + +type FieldTuple struct { + record.Field + Value interface{} +} + +type RecordBuilderImpl struct { + database string + retentionPolicy string + measurement string + tags []*FieldTuple + fields []*FieldTuple + tt time.Time + + err error +} + +func (r *RecordBuilderImpl) Database(database string) RecordBuilder { + r.database = database + return r +} + +func (r *RecordBuilderImpl) RetentionPolicy(policy string) RecordBuilder { + r.retentionPolicy = policy + return r +} + +func (r *RecordBuilderImpl) Measurement(measurement string) RecordBuilder { + r.measurement = measurement + return r +} + +func (r *RecordBuilderImpl) Build() (*RecordBuilderImpl, error) { + return r, nil +} + +func (r *RecordBuilderImpl) AddTag(key string, value string) RecordBuilder { + if key == "" { + r.err = errors.Join(r.err, fmt.Errorf("miss tag name: %w", ErrEmptyName)) + return r + } + if key == record.TimeField { + r.err = errors.Join(r.err, fmt.Errorf("tag name %s invalid: %w", key, ErrInvalidTimeColumn)) + return r + } + r.tags = append(r.tags, &FieldTuple{ + Field: record.Field{ + Name: key, + Type: record.FieldTypeTag, + }, + Value: value, + }) + return r +} + +func (r *RecordBuilderImpl) AddTags(tags map[string]string) RecordBuilder { + for key, value := range tags { + r.AddTag(key, value) + } + return r +} + +func (r *RecordBuilderImpl) AddField(key string, value interface{}) RecordBuilder { + if key == "" { + r.err = errors.Join(r.err, fmt.Errorf("miss field name: %w", ErrEmptyName)) + return r + } + if key == record.TimeField { + r.err = errors.Join(r.err, fmt.Errorf("field name %s invalid: %w", key, ErrInvalidTimeColumn)) + return r + } + typ := record.FieldTypeUnknown + switch value.(type) { + case string: + typ = record.FieldTypeString + case float32, float64: + typ = record.FieldTypeFloat + case bool: + typ = record.FieldTypeBoolean + case int8, int16, int32, int64, uint8, uint16, uint32, uint64, int: + typ = record.FieldTypeInt + } + r.fields = append(r.fields, &FieldTuple{ + Field: record.Field{ + Name: key, + Type: typ, + }, + Value: value, + }) + return r +} + +func (r *RecordBuilderImpl) AddFields(fields map[string]interface{}) RecordBuilder { + for key, value := range fields { + r.AddField(key, value) + } + return r +} + +func (r *RecordBuilderImpl) Time(tt time.Time) RecordBuilder { + r.tt = tt + return r +} diff --git a/opengemini/record_impl.go b/opengemini/record_impl.go new file mode 100644 index 0000000..e3b9329 --- /dev/null +++ b/opengemini/record_impl.go @@ -0,0 +1,413 @@ +package opengemini + +import ( + "context" + "errors" + "fmt" + "sort" + "sync" + "time" + + "github.com/openGemini/opengemini-client-go/lib/record" + "github.com/openGemini/opengemini-client-go/proto" +) + +func (c *client) WriteByGRPC(ctx context.Context, rbs ...*RecordBuilderImpl) error { + if len(rbs) == 0 { + return ErrEmptyRecord + } + return c.rpcClient.writeRecords(ctx, rbs...) +} + +type recordWriterClient struct { + cfg *RPCConfig + mux sync.RWMutex + lb *grpcLoadBalance + transforms map[string]transform +} + +func newRecordWriterClient(cfg *RPCConfig) (*recordWriterClient, error) { + if len(cfg.Addresses) == 0 { + return nil, fmt.Errorf("no rpc addresses provided: %w", ErrEmptyAddress) + } + + balance, err := newRPCLoadBalance(cfg) + if err != nil { + return nil, errors.New("create rpc load balance failed: " + err.Error()) + } + + rw := &recordWriterClient{transforms: make(map[string]transform), lb: balance, cfg: cfg} + return rw, nil +} + +func (r *recordWriterClient) writeRecord(ctx context.Context, rb *RecordBuilderImpl) error { + if err := checkDatabaseAndPolicy(rb.database, rb.retentionPolicy); err != nil { + return err + } + r.mux.Lock() + defer r.mux.Unlock() + name := rb.database + rb.retentionPolicy + transform, ok := r.transforms[name] + if !ok { + transform = newTransform() + } + + if err := transform.AppendRecord(rb); err != nil { + return err + } + + r.transforms[name] = transform + + if r.cfg.BatchConfig == nil { + return r.flush(ctx, rb.database, rb.retentionPolicy) + } + + if transform[rb.measurement].RowCount == 2 { + return r.flush(ctx, rb.database, rb.retentionPolicy) + } + return nil +} + +func (r *recordWriterClient) writeRecords(ctx context.Context, rbs ...*RecordBuilderImpl) error { + for _, rb := range rbs { + if err := r.writeRecord(ctx, rb); err != nil { + return err + } + } + return nil +} + +func (r *recordWriterClient) flush(ctx context.Context, database, retentionPolicy string) (err error) { + if err := checkDatabaseAndPolicy(database, retentionPolicy); err != nil { + return err + } + + name := database + retentionPolicy + t, ok := r.transforms[name] + if !ok { + return ErrEmptyRecord + } + + if len(t) == 0 { + return ErrEmptyRecord + } + + fmt.Println("record: ", len(t)) + + var req = &proto.WriteRequest{ + Database: database, + RetentionPolicy: retentionPolicy, + } + + if r.cfg.AuthConfig != nil { + req.Username = r.cfg.AuthConfig.Username + req.Password = r.cfg.AuthConfig.Password + } + + // 使用带超时的上下文 + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + for mst, rawRecord := range t { + rec, err := rawRecord.ToSrvRecords() + if err != nil { + return fmt.Errorf("failed to convert records: %v", err) + } + fmt.Println(rec.String()) + var buff []byte + buff, err = rec.Marshal(buff) + if err != nil { + return fmt.Errorf("failed to marshal record: %v", err) + } + + req.Records = append(req.Records, &proto.Record{ + Measurement: mst, + MinTime: rawRecord.MinTime, + MaxTime: rawRecord.MaxTime, + Block: buff, + }) + } + + response, err := r.lb.getClient().Write(ctx, req) + if err != nil { + return fmt.Errorf("failed to write rows: %v", err) + } + + t.reset() + + if response.Code != proto.ResponseCode_Success { + return fmt.Errorf("failed to write rows: %s", response.String()) + } + + return nil +} + +func (r *recordWriterClient) Close() error { + return nil +} + +var ( + ErrInvalidTimeColumn = errors.New("key can't be time") + ErrEmptyName = errors.New("empty name not allowed") + ErrInvalidFieldType = errors.New("invalid field type") + ErrUnknownFieldType = errors.New("unknown field type") +) + +type Column struct { + schema record.Field + cv record.ColVal +} + +type columner struct { + RowCount int + MinTime int64 + MaxTime int64 + Columns map[string]*Column + fillChecker map[string]bool +} + +type transform map[string]*columner + +// newTransform creates a new transform instance with configuration +func newTransform() transform { + return make(transform) +} + +// AppendRecord writes data by row with improved error handling +func (t *transform) AppendRecord(rbi *RecordBuilderImpl) error { + if err := checkMeasurementName(rbi.measurement); err != nil { + return err + } + + c, ok := (*t)[rbi.measurement] + if !ok { + c = &columner{ + Columns: make(map[string]*Column), + fillChecker: make(map[string]bool), + } + } + + // process tags + if err := c.processTagColumns(rbi.tags); err != nil { + return err + } + + // process fields + if err := c.processFieldColumns(rbi.fields); err != nil { + return err + } + + // process timestamp + if err := c.processTimestamp(rbi.tt); err != nil { + return err + } + + c.RowCount++ + + // fill another field or tag + if err := c.processMissValueColumns(); err != nil { + return err + } + + (*t)[rbi.measurement] = c + + return nil +} + +func (t *transform) reset() { + for k := range *t { + delete(*t, k) + } +} + +func (c *columner) createColumn(name string, fieldType int) (*Column, error) { + column := &Column{ + schema: record.Field{ + Type: fieldType, + Name: name, + }, + cv: record.ColVal{}, + } + column.cv.Init() + if err := c.appendFieldNulls(column, c.RowCount); err != nil { + return nil, err + } + + return column, nil +} + +func (c *columner) appendFieldNulls(column *Column, count int) error { + switch column.schema.Type { + case record.FieldTypeTag, record.FieldTypeString: + column.cv.AppendStringNulls(count) + return nil + case record.FieldTypeInt, record.FieldTypeUInt: + column.cv.AppendIntegerNulls(count) + return nil + case record.FieldTypeBoolean: + column.cv.AppendBooleanNulls(count) + return nil + case record.FieldTypeFloat: + column.cv.AppendFloatNulls(count) + return nil + default: + return ErrInvalidFieldType + } +} + +// appendFieldValue appends field value to the column +func (c *columner) appendFieldValue(column *Column, value interface{}) error { + switch v := value.(type) { + case string: + column.cv.AppendString(v) + return nil + case bool: + column.cv.AppendBoolean(v) + return nil + case float64: + column.cv.AppendFloat(v) + return nil + case float32: + column.cv.AppendFloat(float64(v)) + return nil + case int: + column.cv.AppendInteger(int64(v)) + return nil + case int64: + column.cv.AppendInteger(v) + return nil + case int32: + column.cv.AppendInteger(int64(v)) + return nil + case uint: + column.cv.AppendInteger(int64(v)) + return nil + case uint32: + column.cv.AppendInteger(int64(v)) + return nil + case uint64: + column.cv.AppendInteger(int64(v)) + return nil + } + // For unknown types, try to throw error + return ErrUnknownFieldType +} + +func (c *columner) processTagColumns(tags []*FieldTuple) (err error) { + for _, tag := range tags { + tagColumn, ok := c.Columns[tag.Name] + if !ok { + tagColumn, err = c.createColumn(tag.Name, record.FieldTypeTag) + if err != nil { + return err + } + } + // write the tag value to column, Value must be string + tagColumn.cv.AppendString(tag.Value.(string)) + c.fillChecker[tag.Name] = true + c.Columns[tag.Name] = tagColumn + } + return nil +} + +func (c *columner) processFieldColumns(fields []*FieldTuple) (err error) { + for _, field := range fields { + fieldColumn, ok := c.Columns[field.Name] + if !ok { + fieldColumn, err = c.createColumn(field.Name, field.Type) + if err != nil { + return err + } + } + + if err := c.appendFieldValue(fieldColumn, field.Value); err != nil { + return err + } + + c.fillChecker[field.Name] = true + c.Columns[field.Name] = fieldColumn + } + return nil +} + +// processTimestamp handles timestamp processing with validation +func (c *columner) processTimestamp(tt time.Time) (err error) { + var timestamp = time.Now().UnixNano() + if !tt.IsZero() { + timestamp = tt.UnixNano() + } + + timeCol, exists := c.Columns[record.TimeField] + if !exists { + timeCol, err = c.createColumn(record.TimeField, record.FieldTypeInt) + if err != nil { + return err + } + } + + timeCol.cv.AppendInteger(timestamp) + c.Columns[record.TimeField] = timeCol + + // Update min/max time + if timestamp < c.MinTime { + c.MinTime = timestamp + } + if timestamp > c.MaxTime { + c.MaxTime = timestamp + } + return nil +} + +func (c *columner) processMissValueColumns() error { + for fieldName, ok := range c.fillChecker { + if ok { + continue + } + column, ok := c.Columns[fieldName] + if !ok { + continue + } + offset := c.RowCount - column.cv.Len + if offset == 0 { + continue + } + if err := c.appendFieldNulls(column, offset); err != nil { + return err + } + } + c.resetFillChecker() + return nil +} + +// ToSrvRecords converts to record.Record with improved sorting and validation +func (c *columner) ToSrvRecords() (*record.Record, error) { + if len(c.Columns) == 0 { + return nil, ErrEmptyRecord + } + + rec := &record.Record{} + rec.Schema = make([]record.Field, 0, len(c.Columns)) + rec.ColVals = make([]record.ColVal, 0, len(c.Columns)) + + for _, column := range c.Columns { + rec.Schema = append(rec.Schema, column.schema) + rec.ColVals = append(rec.ColVals, column.cv) + } + + // Sort and validate the record + sort.Sort(rec) + if err := record.CheckRecord(rec); err != nil { + return nil, err + } + + rec = record.NewColumnSortHelper().Sort(rec) + + return rec, nil +} + +// resetFillChecker clears fill checker map +func (c *columner) resetFillChecker() { + for key := range c.fillChecker { + c.fillChecker[key] = false + } +} diff --git a/opengemini/record_impl_test.go b/opengemini/record_impl_test.go new file mode 100644 index 0000000..dcb9e2e --- /dev/null +++ b/opengemini/record_impl_test.go @@ -0,0 +1,80 @@ +package opengemini + +import ( + "context" + "github.com/stretchr/testify/assert" + "testing" +) + +func testDefaultRPCClient(t *testing.T) Client { + return testNewClient(t, &Config{ + Addresses: []Address{{ + Host: "localhost", + Port: 8086, + }}, + RPCConfig: &RPCConfig{ + Addresses: []Address{{ + Host: "localhost", + Port: 8305, + }}, + BatchConfig: &BatchConfig{}, + }, + }) +} + +func TestNewRPCClient(t *testing.T) { + c := testDefaultRPCClient(t) + ctx := context.Background() + db := "db0" + rp := "autogen" + mst := "m0" + // create a test database with rand suffix + //err := c.CreateDatabase(db) + //assert.Nil(t, err) + + // delete test database before exit test case + //defer func() { + // err := c.DropDatabase(db) + // assert.Nil(t, err) + //}() + + //time.Sleep(time.Second * 3) + + rec, err := NewRecordBuilder().Database(db).RetentionPolicy(rp).Measurement(mst). + AddTag("t1", "t1").AddTag("t2", "t2").AddField("i", 100). + AddField("b", true).AddField("f", 3.14).AddField("s1", "pi1").Build() + assert.Nil(t, err) + err = c.WriteByGRPC(ctx, rec) + assert.Nil(t, err) +} + +func TestNewRPCClient1(t *testing.T) { + c := testDefaultRPCClient(t) + ctx := context.Background() + db := "db0" + rp := "autogen" + mst := "m0" + // create a test database with rand suffix + //err := c.CreateDatabase(db) + //assert.Nil(t, err) + + // delete test database before exit test case + //defer func() { + // err := c.DropDatabase(db) + // assert.Nil(t, err) + //}() + + //time.Sleep(time.Second * 3) + + rec, err := NewRecordBuilder().Database(db).RetentionPolicy(rp).Measurement(mst). + AddTag("t1", "t1").AddTag("t2", "t2").AddField("i", 100). + AddField("b", true).AddField("f", 3.14).AddField("s1", "pi1").Build() + assert.Nil(t, err) + err = c.WriteByGRPC(ctx, rec) + + rec1, err := NewRecordBuilder().Database(db).RetentionPolicy(rp).Measurement(mst). + AddTag("t1", "t1").AddTag("t3", "t3").AddField("i", 100). + AddField("b", true).AddField("f", 3.14).AddField("s1", "pi1").Build() + assert.Nil(t, err) + err = c.WriteByGRPC(ctx, rec1) +} diff --git a/opengemini/record_loadbalance.go b/opengemini/record_loadbalance.go new file mode 100644 index 0000000..540cddc --- /dev/null +++ b/opengemini/record_loadbalance.go @@ -0,0 +1,73 @@ +package opengemini + +import ( + "fmt" + "sync/atomic" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" + + "github.com/openGemini/opengemini-client-go/proto" +) + +type grpcEndpoint struct { + address string + available atomic.Bool + conn *grpc.ClientConn + client proto.WriteServiceClient +} + +type grpcLoadBalance struct { + endpoints []*grpcEndpoint +} + +func newRPCLoadBalance(cfg *RPCConfig) (*grpcLoadBalance, error) { + var eps []*grpcEndpoint + var dialOptions = []grpc.DialOption{ + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 3 * time.Second, + PermitWithoutStream: true, + }), + grpc.WithInitialWindowSize(1 << 24), // 16MB + grpc.WithInitialConnWindowSize(1 << 24), // 16MB + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(64 * 1024 * 1024)), // 64MB + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(64 * 1024 * 1024)), // 64MB + } + + if cfg.TlsConfig == nil { + dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials())) + } else { + creds := credentials.NewTLS(cfg.TlsConfig) + dialOptions = append(dialOptions, grpc.WithTransportCredentials(creds)) + } + for _, address := range cfg.Addresses { + addr := address.String() + conn, err := grpc.NewClient(addr, dialOptions...) + if err != nil { + return nil, fmt.Errorf("connect to %s failed: %v", addr, err) + } + eps = append(eps, &grpcEndpoint{ + address: addr, + conn: conn, + client: proto.NewWriteServiceClient(conn)}) + } + return &grpcLoadBalance{endpoints: eps}, nil +} + +// getClient returns the next available WriteService client. +func (r *grpcLoadBalance) getClient() proto.WriteServiceClient { + return r.endpoints[0].client +} + +func (r *grpcLoadBalance) checkEndpoints() error { + panic("implement me") +} + +// Close closes the RPC load balancer and any underlying resources. +func (r *grpcLoadBalance) Close() error { + panic("implement me") +}