From babd816cc1ef60be1fe00d87f7e730199d6c3999 Mon Sep 17 00:00:00 2001 From: chuntaojun Date: Wed, 17 Jul 2024 23:37:18 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E5=A4=8Dbalancer=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=8D=8F=E7=A8=8Bhang=E6=AD=BB=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- balancer.go | 6 ++--- examples/quickstart/consumer/main.go | 30 ++++++++++++++++++++--- examples/quickstart/consumer/polaris.yaml | 16 +++++++++++- global.go | 17 ++++++++++++- go.mod | 2 +- go.sum | 4 +-- resolver.go | 24 ++++++++++++------ 7 files changed, 81 insertions(+), 18 deletions(-) diff --git a/balancer.go b/balancer.go index 2b1ae20..65a0a63 100644 --- a/balancer.go +++ b/balancer.go @@ -171,7 +171,7 @@ func (p *polarisNamingBalancer) createSubConnection(key string, addr resolver.Ad } // is a new address (not existing in b.subConns). sc, err := p.cc.NewSubConn( - []resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true}) + []resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: false}) if err != nil { GetLogger().Error("[Polaris][Balancer] failed to create new SubConn: %v", err) return @@ -245,6 +245,8 @@ func (p *polarisNamingBalancer) ResolverError(err error) { // report an error. return } + p.rwMutex.RLock() + defer p.rwMutex.RUnlock() p.regeneratePicker(nil) p.cc.UpdateState(balancer.State{ ConnectivityState: p.state, @@ -308,8 +310,6 @@ func (p *polarisNamingBalancer) regeneratePicker(options *dialOptions) { return } readySCs := make(map[string]balancer.SubConn) - p.rwMutex.RLock() - defer p.rwMutex.RUnlock() // Filter out all ready SCs from full subConn map. for addr, sc := range p.subConns { if st, ok := p.scStates[sc]; ok && st == connectivity.Ready { diff --git a/examples/quickstart/consumer/main.go b/examples/quickstart/consumer/main.go index 3f430c6..e65ad7e 100644 --- a/examples/quickstart/consumer/main.go +++ b/examples/quickstart/consumer/main.go @@ -22,6 +22,9 @@ import ( "fmt" "log" "net/http" + "os" + "os/signal" + "syscall" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -41,11 +44,16 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + polaris.GetLogger().SetLevel(polaris.LogDebug) + conn, err := polaris.DialContext(ctx, "polaris://QuickStartEchoServerGRPC", polaris.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())), polaris.WithDisableRouter(), polaris.WithDisableCircuitBreaker(), ) + + conn.Close() + if err != nil { log.Fatal(err) } @@ -84,8 +92,24 @@ func main() { _, _ = w.Write([]byte(resp.GetValue())) } http.HandleFunc("/echo", indexHandler) - if err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), nil); nil != err { - log.Fatal(err) - } + go func() { + if err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), nil); nil != err { + log.Fatal(err) + } + }() + runMainLoop() +} +func runMainLoop() { + ch := make(chan os.Signal, 1) + signal.Notify(ch, []os.Signal{ + syscall.SIGINT, syscall.SIGTERM, + syscall.SIGSEGV, + }...) + + for s := range ch { + log.Printf("catch signal(%+v), stop servers", s) + polaris.ClosePolarisContext() + return + } } diff --git a/examples/quickstart/consumer/polaris.yaml b/examples/quickstart/consumer/polaris.yaml index 3f0b683..0ed88e3 100644 --- a/examples/quickstart/consumer/polaris.yaml +++ b/examples/quickstart/consumer/polaris.yaml @@ -1,4 +1,18 @@ global: serverConnector: addresses: - - 127.0.0.1:8091 \ No newline at end of file + - 127.0.0.1:8091 + statReporter: + #描述:是否将统计信息上报至monitor + #类型:bool + enable: true + #描述:启用的统计上报插件类型 + #类型:list + #范围:已经注册的统计上报插件的名字 + chain: + - prometheus + plugin: + prometheus: + type: push + address: 127.0.0.1:9091 + interval: 10s diff --git a/global.go b/global.go index 6dee589..66c2229 100644 --- a/global.go +++ b/global.go @@ -42,7 +42,7 @@ var ( // DefaultNamespace default namespace when namespace is not set DefaultNamespace = "default" // DefaultTTL default ttl value when ttl is not set - DefaultTTL = 20 + DefaultTTL = 5 // DefaultGracefulStopMaxWaitDuration default stop max wait duration when not set DefaultGracefulStopMaxWaitDuration = 30 * time.Second // DefaultDelayStopWaitDuration default delay time before stop @@ -57,16 +57,31 @@ const ( ) var ( + ctxRef = 0 polarisContext api.SDKContext polarisConfig config.Configuration mutexPolarisContext sync.Mutex oncePolarisConfig sync.Once ) +func ClosePolarisContext() { + mutexPolarisContext.Lock() + defer mutexPolarisContext.Unlock() + if nil == polarisContext { + return + } + ctxRef-- + if ctxRef == 0 { + polarisContext.Destroy() + polarisContext = nil + } +} + // PolarisContext get or init the global polaris context func PolarisContext() (api.SDKContext, error) { mutexPolarisContext.Lock() defer mutexPolarisContext.Unlock() + ctxRef++ if nil != polarisContext { return polarisContext, nil } diff --git a/go.mod b/go.mod index 429125a..9d0ce15 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/errwrap v1.1.0 // indirect github.com/natefinch/lumberjack v2.0.0+incompatible - github.com/polarismesh/polaris-go v1.6.0-beta.5 + github.com/polarismesh/polaris-go v1.6.0-alpha.7 github.com/polarismesh/specification v1.5.1 github.com/prometheus/client_golang v1.19.1 // indirect github.com/prometheus/common v0.54.0 // indirect diff --git a/go.sum b/go.sum index 9b3bd80..9700eb1 100644 --- a/go.sum +++ b/go.sum @@ -1695,8 +1695,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/polarismesh/polaris-go v1.6.0-beta.5 h1:llucvfydWlFWTNeABHbbuVL2ijR7AITx8UG02tx0c/Y= -github.com/polarismesh/polaris-go v1.6.0-beta.5/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c= +github.com/polarismesh/polaris-go v1.6.0-alpha.7 h1:cWyx5BC6+JurBNCV3EaeCjd6U8edY6KrG6HkkBhWMR0= +github.com/polarismesh/polaris-go v1.6.0-alpha.7/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c= github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= github.com/polarismesh/specification v1.5.1 h1:cJ2m0RBepdopGo/e3UpKdsab3NpDZnw5IsVTB1sFc5I= github.com/polarismesh/specification v1.5.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= diff --git a/resolver.go b/resolver.go index c6fa8cc..be512ae 100644 --- a/resolver.go +++ b/resolver.go @@ -55,7 +55,14 @@ func RegisterResolverInterceptor(i ResolverInterceptor) { resolverInterceptors = append(resolverInterceptors, i) } +func NewResolver(ctx api.SDKContext) *resolverBuilder { + return &resolverBuilder{ + sdkCtx: ctx, + } +} + type resolverBuilder struct { + sdkCtx api.SDKContext } // Scheme polaris scheme @@ -104,12 +111,14 @@ func (rb *resolverBuilder) Build( return nil, err } - sdkCtx, err := PolarisContext() - if nil != err { - return nil, err + if rb.sdkCtx == nil { + sdkCtx, err := PolarisContext() + if nil != err { + return nil, err + } + rb.sdkCtx = sdkCtx } - - options.SDKContext = sdkCtx + options.SDKContext = rb.sdkCtx ctx, cancel := context.WithCancel(context.Background()) d := &polarisNamingResolver{ @@ -120,7 +129,7 @@ func (rb *resolverBuilder) Build( options: options, host: host, port: port, - consumer: api.NewConsumerAPIByContext(sdkCtx), + consumer: api.NewConsumerAPIByContext(rb.sdkCtx), eventCh: make(chan struct{}, 1), } go d.watcher() @@ -251,9 +260,10 @@ func (pr *polarisNamingResolver) watcher() { pr.options.Namespace, pr.host) return case <-pr.eventCh: + pr.doRefresh() case <-ticker.C: + pr.doRefresh() } - pr.doRefresh() } }