diff --git a/opengemini/client.go b/opengemini/client.go index a0cd057..9492712 100644 --- a/opengemini/client.go +++ b/opengemini/client.go @@ -18,9 +18,12 @@ import ( "context" "crypto/tls" "log/slog" + "strconv" "time" "github.com/prometheus/client_golang/prometheus" + + "github.com/openGemini/opengemini-client-go/proto" ) const ( @@ -33,6 +36,7 @@ const ( type Codec string type ContentType string + type CompressMethod string const ( @@ -69,6 +73,9 @@ type Client interface { WriteBatchPoints(ctx context.Context, database string, bp []*Point) error // WriteBatchPointsWithRp write batch points with retention policy WriteBatchPointsWithRp(ctx context.Context, database string, rp string, bp []*Point) error + // WriteByGRPC write batch record to assigned database.retention_policy by gRPC. + // You'd better use NewRecordBuilder to build req. + WriteByGRPC(ctx context.Context, req *proto.WriteRequest) error // CreateDatabase Create database CreateDatabase(database string) error @@ -166,6 +173,8 @@ type Config struct { CustomMetricsLabels map[string]string // Logger structured logger for logging operations Logger *slog.Logger + // RPCConfig configuration information for write service by gRPC + RPCConfig *GRPCConfig } // Address configuration for providing service. @@ -176,6 +185,10 @@ type Address struct { Port int } +func (a *Address) String() string { + return a.Host + ":" + strconv.Itoa(a.Port) +} + // AuthType type of identity authentication. type AuthType int @@ -212,6 +225,21 @@ type RpConfig struct { IndexDuration string } +// GRPCConfig represents the configuration information for write service by gRPC. +type GRPCConfig struct { + // Addresses Configure the service endpoints for the openGemini grpc write service. + // This parameter is required. + Addresses []Address + // AuthConfig configuration information for authentication. + AuthConfig *AuthConfig + // TlsConfig configuration information for tls. + TlsConfig *tls.Config + // CompressMethod determines the compress method used for data transmission. + CompressMethod CompressMethod + // Timeout default 30s + Timeout time.Duration +} + // NewClient Creates a openGemini client instance func NewClient(config *Config) (Client, error) { return newClient(config) diff --git a/opengemini/client_impl.go b/opengemini/client_impl.go index d92d1f2..ed5f988 100644 --- a/opengemini/client_impl.go +++ b/opengemini/client_impl.go @@ -39,6 +39,7 @@ type client struct { prevIdx atomic.Int32 dataChanMap syncx.Map[dbRp, chan *sendBatchWithCB] metrics *metrics + rpcClient *recordWriterClient batchContext context.Context batchContextCancel context.CancelFunc @@ -48,7 +49,7 @@ type client struct { func newClient(c *Config) (Client, error) { if len(c.Addresses) == 0 { - return nil, errors.New("must have at least one address") + return nil, ErrEmptyAddress } if c.AuthConfig != nil { if c.AuthConfig.AuthType == AuthTypeToken && len(c.AuthConfig.Token) == 0 { @@ -91,6 +92,13 @@ func newClient(c *Config) (Client, error) { } else { dbClient.logger = slog.Default() } + if c.RPCConfig != nil { + rc, err := newRecordWriterClient(c.RPCConfig) + if err != nil { + return nil, errors.New("failed to create rpc client: " + err.Error()) + } + dbClient.rpcClient = rc + } dbClient.prevIdx.Store(-1) if len(c.Addresses) > 1 { // if there are multiple addresses, start the health check diff --git a/opengemini/error.go b/opengemini/error.go index d148b3a..a13387c 100644 --- a/opengemini/error.go +++ b/opengemini/error.go @@ -27,6 +27,8 @@ var ( ErrEmptyTagOrField = errors.New("empty tag or field") ErrEmptyTagKey = errors.New("empty tag key") ErrRetentionPolicy = errors.New("empty retention policy") + ErrEmptyRecord = errors.New("empty record") + ErrEmptyAddress = errors.New("empty address, must have at least one address") ) // checkDatabaseName checks if the database name is empty and returns an error if it is. diff --git a/opengemini/query_test.go b/opengemini/query_test.go index 84feac1..48b550b 100644 --- a/opengemini/query_test.go +++ b/opengemini/query_test.go @@ -70,6 +70,7 @@ func TestQueryWithEpoch(t *testing.T) { assert.Equal(t, length, getTimestampLength(v)) } } + func TestQueryWithMsgPack(t *testing.T) { c := testNewClient(t, &Config{ Addresses: []Address{{ diff --git a/opengemini/record_builder.go b/opengemini/record_builder.go new file mode 100644 index 0000000..1238c5c --- /dev/null +++ b/opengemini/record_builder.go @@ -0,0 +1,269 @@ +// Copyright 2024 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opengemini + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/openGemini/opengemini-client-go/lib/record" + "github.com/openGemini/opengemini-client-go/proto" +) + +var ( + _ RecordBuilder = (*RecordBuilderImpl)(nil) + recordLinePool = &sync.Pool{New: func() any { + return &RecordLineBuilderImpl{} + }} +) + +type RecordLine interface{} + +type RecordLineBuilder interface { + // AddTag add a tag to the record. + // If the key exists, it will be overwritten. + // If the key is `time`, it will cause an error. + // If the key is empty or the value is empty, it will be ignored. + AddTag(key string, value string) RecordLineBuilder + // AddTags add multiple tags to the record. + // Each entry in the map represents a tag where the key is the tag name and the value is the tag value. + AddTags(tags map[string]string) RecordLineBuilder + // AddField add a field to the record. + // If the key is empty, it will be ignored. + // If the key is `time`, it will cause an error. + // If the key already exists, its value will be overwritten. + AddField(key string, value interface{}) RecordLineBuilder + // AddFields add multiple fields to the record. + // Each entry in the map represents a field where the key is the field name and the value is the field value. + AddFields(fields map[string]interface{}) RecordLineBuilder + CompressMethod(method CompressMethod) RecordLineBuilder + Error() error + // Build specifies the time of the record. + // If the time is not specified or zero value, the current time will be used. + Build(tt time.Time) RecordLine +} + +type RecordBuilder interface { + Authenticate(username, password string) RecordBuilder + AddRecord(rlb ...RecordLine) RecordBuilder + Build() (*proto.WriteRequest, error) +} + +type FieldTuple struct { + record.Field + Value interface{} +} + +type RecordBuilderImpl struct { + database string + retentionPolicy string + username string + password string + transform transform + err error +} + +func (r *RecordBuilderImpl) reset() { + r.transform.reset() +} + +func (r *RecordBuilderImpl) Authenticate(username, password string) RecordBuilder { + r.username = username + r.password = password + return r +} + +func NewRecordBuilder(database, retentionPolicy string) RecordBuilder { + return &RecordBuilderImpl{database: database, retentionPolicy: retentionPolicy, transform: make(transform)} +} + +func (r *RecordBuilderImpl) AddRecord(rlb ...RecordLine) RecordBuilder { + for _, lineBuilder := range rlb { + lb, ok := lineBuilder.(*RecordLineBuilderImpl) + if !ok { + continue + } + err := r.transform.AppendRecord(lb) + recordLinePool.Put(lb) + if err != nil { + r.err = errors.Join(r.err, err) + continue + } + } + return r +} + +func (r *RecordBuilderImpl) Build() (*proto.WriteRequest, error) { + defer r.reset() + + if r.err != nil { + return nil, r.err + } + + var req = &proto.WriteRequest{ + Database: r.database, + RetentionPolicy: r.retentionPolicy, + Username: r.username, + Password: r.password, + } + + for mst, rawRecord := range r.transform { + rec, err := rawRecord.ToSrvRecords() + if err != nil { + return nil, fmt.Errorf("failed to convert records: %v", err) + } + var buff []byte + buff, err = rec.Marshal(buff) + if err != nil { + return nil, fmt.Errorf("failed to marshal record: %v", err) + } + + req.Records = append(req.Records, &proto.Record{ + Measurement: mst, + MinTime: rawRecord.MinTime, + MaxTime: rawRecord.MaxTime, + Block: buff, + }) + } + + return req, nil +} + +type RecordLineBuilderImpl struct { + measurement string + tags []*FieldTuple + fields []*FieldTuple + tt time.Time + compressMethod CompressMethod + built bool + + err error +} + +func (r *RecordLineBuilderImpl) CompressMethod(method CompressMethod) RecordLineBuilder { + r.compressMethod = method + return r +} + +func newRecordLineBuilder(measurement string) *RecordLineBuilderImpl { + r := recordLinePool.Get().(*RecordLineBuilderImpl) + r.measurement = measurement + if len(r.tags) != 0 { + r.tags = r.tags[:0] + } + if len(r.fields) != 0 { + r.fields = r.fields[:0] + } + if !r.tt.IsZero() { + r.tt = time.Time{} + } + r.built = false + r.err = nil + return r +} + +func NewRecordLineBuilder(measurement string) RecordLineBuilder { + return newRecordLineBuilder(measurement) +} + +func (r *RecordLineBuilderImpl) Error() error { + return r.err +} + +func (r *RecordLineBuilderImpl) AddTag(key string, value string) RecordLineBuilder { + if r.built { + r = newRecordLineBuilder(r.measurement) + } + if key == "" { + r.err = errors.Join(r.err, fmt.Errorf("miss tag name: %w", ErrEmptyName)) + return r + } + if key == record.TimeField { + r.err = errors.Join(r.err, fmt.Errorf("tag name %s invalid: %w", key, ErrInvalidTimeColumn)) + return r + } + r.tags = append(r.tags, &FieldTuple{ + Field: record.Field{ + Name: key, + Type: record.FieldTypeTag, + }, + Value: value, + }) + return r +} + +func (r *RecordLineBuilderImpl) AddTags(tags map[string]string) RecordLineBuilder { + if r.built { + r = newRecordLineBuilder(r.measurement) + } + for key, value := range tags { + r.AddTag(key, value) + } + return r +} + +func (r *RecordLineBuilderImpl) AddField(key string, value interface{}) RecordLineBuilder { + if r.built { + r = newRecordLineBuilder(r.measurement) + } + if key == "" { + r.err = errors.Join(r.err, fmt.Errorf("miss field name: %w", ErrEmptyName)) + return r + } + if key == record.TimeField { + r.err = errors.Join(r.err, fmt.Errorf("field name %s invalid: %w", key, ErrInvalidTimeColumn)) + return r + } + typ := record.FieldTypeUnknown + switch value.(type) { + case string: + typ = record.FieldTypeString + case float32, float64: + typ = record.FieldTypeFloat + case bool: + typ = record.FieldTypeBoolean + case int8, int16, int32, int64, uint8, uint16, uint32, uint64, int: + typ = record.FieldTypeInt + } + r.fields = append(r.fields, &FieldTuple{ + Field: record.Field{ + Name: key, + Type: typ, + }, + Value: value, + }) + return r +} + +func (r *RecordLineBuilderImpl) AddFields(fields map[string]interface{}) RecordLineBuilder { + if r.built { + r = newRecordLineBuilder(r.measurement) + } + for key, value := range fields { + r.AddField(key, value) + } + return r +} + +func (r *RecordLineBuilderImpl) Build(tt time.Time) RecordLine { + r.built = true + if err := checkMeasurementName(r.measurement); err != nil { + r.err = errors.Join(err, r.err) + } + r.tt = tt + return r +} diff --git a/opengemini/record_impl.go b/opengemini/record_impl.go new file mode 100644 index 0000000..21d0568 --- /dev/null +++ b/opengemini/record_impl.go @@ -0,0 +1,332 @@ +// Copyright 2024 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opengemini + +import ( + "context" + "errors" + "fmt" + "sort" + "sync" + "time" + + "github.com/openGemini/opengemini-client-go/lib/record" + "github.com/openGemini/opengemini-client-go/proto" +) + +func (c *client) WriteByGRPC(ctx context.Context, req *proto.WriteRequest) error { + if req == nil { + return ErrEmptyRecord + } + + // 使用带超时的上下文 + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + response, err := c.rpcClient.lb.getClient().Write(ctx, req) + if err != nil { + return fmt.Errorf("failed to write rows: %v", err) + } + + if response.Code != proto.ResponseCode_Success { + return fmt.Errorf("failed to write rows: %s", response.String()) + } + + return nil +} + +type recordWriterClient struct { + cfg *GRPCConfig + mux sync.RWMutex + lb *grpcLoadBalance +} + +func newRecordWriterClient(cfg *GRPCConfig) (*recordWriterClient, error) { + if len(cfg.Addresses) == 0 { + return nil, fmt.Errorf("no grpc addresses provided: %w", ErrEmptyAddress) + } + + balance, err := newRPCLoadBalance(cfg) + if err != nil { + return nil, errors.New("create grpc load balance failed: " + err.Error()) + } + + rw := &recordWriterClient{lb: balance, cfg: cfg} + return rw, nil +} + +func (r *recordWriterClient) Close() error { + return nil +} + +var ( + ErrInvalidTimeColumn = errors.New("key can't be time") + ErrEmptyName = errors.New("empty name not allowed") + ErrInvalidFieldType = errors.New("invalid field type") + ErrUnknownFieldType = errors.New("unknown field type") +) + +type Column struct { + schema record.Field + cv record.ColVal +} + +type columner struct { + RowCount int + MinTime int64 + MaxTime int64 + Columns map[string]*Column + fillChecker map[string]bool +} + +type transform map[string]*columner + +// AppendRecord writes data by row with improved error handling +func (t *transform) AppendRecord(rbi *RecordLineBuilderImpl) error { + c, ok := (*t)[rbi.measurement] + if !ok { + c = &columner{ + Columns: make(map[string]*Column), + fillChecker: make(map[string]bool), + } + } + + // process tags + if err := c.processTagColumns(rbi.tags); err != nil { + return err + } + + // process fields + if err := c.processFieldColumns(rbi.fields); err != nil { + return err + } + + // process timestamp + if err := c.processTimestamp(rbi.tt); err != nil { + return err + } + + c.RowCount++ + + // fill another field or tag + if err := c.processMissValueColumns(); err != nil { + return err + } + + (*t)[rbi.measurement] = c + + return nil +} + +func (t *transform) reset() { + for k := range *t { + delete(*t, k) + } +} + +func (c *columner) createColumn(name string, fieldType int) (*Column, error) { + column := &Column{ + schema: record.Field{ + Type: fieldType, + Name: name, + }, + cv: record.ColVal{}, + } + column.cv.Init() + if err := c.appendFieldNulls(column, c.RowCount); err != nil { + return nil, err + } + + return column, nil +} + +func (c *columner) appendFieldNulls(column *Column, count int) error { + switch column.schema.Type { + case record.FieldTypeTag, record.FieldTypeString: + column.cv.AppendStringNulls(count) + return nil + case record.FieldTypeInt, record.FieldTypeUInt: + column.cv.AppendIntegerNulls(count) + return nil + case record.FieldTypeBoolean: + column.cv.AppendBooleanNulls(count) + return nil + case record.FieldTypeFloat: + column.cv.AppendFloatNulls(count) + return nil + default: + return ErrInvalidFieldType + } +} + +// appendFieldValue appends field value to the column +func (c *columner) appendFieldValue(column *Column, value interface{}) error { + switch v := value.(type) { + case string: + column.cv.AppendString(v) + return nil + case bool: + column.cv.AppendBoolean(v) + return nil + case float64: + column.cv.AppendFloat(v) + return nil + case float32: + column.cv.AppendFloat(float64(v)) + return nil + case int: + column.cv.AppendInteger(int64(v)) + return nil + case int64: + column.cv.AppendInteger(v) + return nil + case int32: + column.cv.AppendInteger(int64(v)) + return nil + case uint: + column.cv.AppendInteger(int64(v)) + return nil + case uint32: + column.cv.AppendInteger(int64(v)) + return nil + case uint64: + column.cv.AppendInteger(int64(v)) + return nil + } + // For unknown types, try to throw error + return ErrUnknownFieldType +} + +func (c *columner) processTagColumns(tags []*FieldTuple) (err error) { + for _, tag := range tags { + tagColumn, ok := c.Columns[tag.Name] + if !ok { + tagColumn, err = c.createColumn(tag.Name, record.FieldTypeTag) + if err != nil { + return err + } + } + // write the tag value to column, Value must be string + tagColumn.cv.AppendString(tag.Value.(string)) + c.fillChecker[tag.Name] = true + c.Columns[tag.Name] = tagColumn + } + return nil +} + +func (c *columner) processFieldColumns(fields []*FieldTuple) (err error) { + for _, field := range fields { + fieldColumn, ok := c.Columns[field.Name] + if !ok { + fieldColumn, err = c.createColumn(field.Name, field.Type) + if err != nil { + return err + } + } + + if err := c.appendFieldValue(fieldColumn, field.Value); err != nil { + return err + } + + c.fillChecker[field.Name] = true + c.Columns[field.Name] = fieldColumn + } + return nil +} + +// processTimestamp handles timestamp processing with validation +func (c *columner) processTimestamp(tt time.Time) (err error) { + var timestamp = time.Now().UnixNano() + if !tt.IsZero() { + timestamp = tt.UnixNano() + } + + timeCol, exists := c.Columns[record.TimeField] + if !exists { + timeCol, err = c.createColumn(record.TimeField, record.FieldTypeInt) + if err != nil { + return err + } + } + + timeCol.cv.AppendInteger(timestamp) + c.Columns[record.TimeField] = timeCol + + // Update min/max time + if timestamp < c.MinTime { + c.MinTime = timestamp + } + if timestamp > c.MaxTime { + c.MaxTime = timestamp + } + return nil +} + +func (c *columner) processMissValueColumns() error { + for fieldName, ok := range c.fillChecker { + if ok { + continue + } + column, ok := c.Columns[fieldName] + if !ok { + continue + } + offset := c.RowCount - column.cv.Len + if offset == 0 { + continue + } + if err := c.appendFieldNulls(column, offset); err != nil { + return err + } + } + c.resetFillChecker() + return nil +} + +// ToSrvRecords converts to record.Record with improved sorting and validation +func (c *columner) ToSrvRecords() (*record.Record, error) { + if len(c.Columns) == 0 { + return nil, ErrEmptyRecord + } + + rec := &record.Record{} + rec.Schema = make([]record.Field, 0, len(c.Columns)) + rec.ColVals = make([]record.ColVal, 0, len(c.Columns)) + + for _, column := range c.Columns { + rec.Schema = append(rec.Schema, column.schema) + rec.ColVals = append(rec.ColVals, column.cv) + } + + // Sort and validate the record + sort.Sort(rec) + + fmt.Println(rec.String()) + + if err := record.CheckRecord(rec); err != nil { + return nil, err + } + + rec = record.NewColumnSortHelper().Sort(rec) + + return rec, nil +} + +// resetFillChecker clears fill checker map +func (c *columner) resetFillChecker() { + for key := range c.fillChecker { + c.fillChecker[key] = false + } +} diff --git a/opengemini/record_impl_test.go b/opengemini/record_impl_test.go new file mode 100644 index 0000000..d514c93 --- /dev/null +++ b/opengemini/record_impl_test.go @@ -0,0 +1,77 @@ +// Copyright 2024 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opengemini + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func testDefaultRPCClient(t *testing.T) Client { + return testNewClient(t, &Config{ + Addresses: []Address{{ + Host: "localhost", + Port: 8086, + }}, + RPCConfig: &GRPCConfig{ + Addresses: []Address{{ + Host: "localhost", + Port: 8305, + }}, + }, + }) +} + +func TestNewRPCClient1(t *testing.T) { + c := testDefaultRPCClient(t) + ctx := context.Background() + db := "db0" + rp := "autogen" + mst := "m3" + // create a test database with rand suffix + //err := c.CreateDatabase(db) + //assert.Nil(t, err) + + // delete test database before exit test case + //defer func() { + // err := c.DropDatabase(db) + // assert.Nil(t, err) + //}() + + //time.Sleep(time.Second * 3) + + var builder = NewRecordBuilder(db, rp) + + var recordBuilder = NewRecordLineBuilder(mst) + + writeRequest, err := builder.AddRecord( + recordBuilder.AddTag("t1", "t1").AddTag("t2", "t2"). + AddField("i", 100).AddField("b", true).AddField("f", 3.14). + AddField("s1", "pi1").Build(time.Now().Add(-time.Second*10)), + recordBuilder.AddTag("a1", "a1").AddTag("a2", "a2"). + AddField("i", 100).AddField("b", true).AddField("f", 3.14). + AddField("s1", "pi1").Build(time.Now().Add(-time.Second*5)), + recordBuilder.AddTag("b1", "b1").AddTag("b2", "b2"). + AddField("i", 100).AddField("b", true).AddField("f", 3.14). + AddField("s1", "pi1").Build(time.Now()), + ).Build() + + assert.Nil(t, err) + err = c.WriteByGRPC(ctx, writeRequest) + assert.Nil(t, err) +} diff --git a/opengemini/record_loadbalance.go b/opengemini/record_loadbalance.go new file mode 100644 index 0000000..29cb1ab --- /dev/null +++ b/opengemini/record_loadbalance.go @@ -0,0 +1,87 @@ +// Copyright 2024 openGemini Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opengemini + +import ( + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" + + "github.com/openGemini/opengemini-client-go/proto" +) + +type grpcEndpoint struct { + address string + conn *grpc.ClientConn + client proto.WriteServiceClient +} + +type grpcLoadBalance struct { + endpoints []*grpcEndpoint +} + +func newRPCLoadBalance(cfg *GRPCConfig) (*grpcLoadBalance, error) { + var eps []*grpcEndpoint + var dialOptions = []grpc.DialOption{ + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 10 * time.Second, + Timeout: 3 * time.Second, + PermitWithoutStream: true, + }), + // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.6, + Jitter: 0.2, + MaxDelay: time.Second * 30, // Configurable + }, + MinConnectTimeout: time.Second * 20, + }), + grpc.WithInitialWindowSize(1 << 24), // 16MB + grpc.WithInitialConnWindowSize(1 << 24), // 16MB + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(64 * 1024 * 1024)), // 64MB + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(64 * 1024 * 1024)), // 64MB + } + + if cfg.TlsConfig == nil { + dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials())) + } else { + creds := credentials.NewTLS(cfg.TlsConfig) + dialOptions = append(dialOptions, grpc.WithTransportCredentials(creds)) + } + for _, address := range cfg.Addresses { + addr := address.String() + conn, err := grpc.NewClient(addr, dialOptions...) + if err != nil { + return nil, fmt.Errorf("connect to %s failed: %v", addr, err) + } + eps = append(eps, &grpcEndpoint{ + address: addr, + conn: conn, + client: proto.NewWriteServiceClient(conn)}) + } + return &grpcLoadBalance{endpoints: eps}, nil +} + +// getClient returns the next available WriteService client. +func (r *grpcLoadBalance) getClient() proto.WriteServiceClient { + return r.endpoints[0].client +}