Skip to content

Commit

Permalink
fix:修复balancer任务协程hang死问题
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Jul 17, 2024
1 parent 893169e commit babd816
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 18 deletions.
6 changes: 3 additions & 3 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (p *polarisNamingBalancer) createSubConnection(key string, addr resolver.Ad
}
// is a new address (not existing in b.subConns).
sc, err := p.cc.NewSubConn(
[]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
[]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: false})
if err != nil {
GetLogger().Error("[Polaris][Balancer] failed to create new SubConn: %v", err)
return
Expand Down Expand Up @@ -245,6 +245,8 @@ func (p *polarisNamingBalancer) ResolverError(err error) {
// report an error.
return
}
p.rwMutex.RLock()
defer p.rwMutex.RUnlock()
p.regeneratePicker(nil)
p.cc.UpdateState(balancer.State{
ConnectivityState: p.state,
Expand Down Expand Up @@ -308,8 +310,6 @@ func (p *polarisNamingBalancer) regeneratePicker(options *dialOptions) {
return
}
readySCs := make(map[string]balancer.SubConn)
p.rwMutex.RLock()
defer p.rwMutex.RUnlock()
// Filter out all ready SCs from full subConn map.
for addr, sc := range p.subConns {
if st, ok := p.scStates[sc]; ok && st == connectivity.Ready {
Expand Down
30 changes: 27 additions & 3 deletions examples/quickstart/consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -41,11 +44,16 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

polaris.GetLogger().SetLevel(polaris.LogDebug)

conn, err := polaris.DialContext(ctx, "polaris://QuickStartEchoServerGRPC",
polaris.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())),
polaris.WithDisableRouter(),
polaris.WithDisableCircuitBreaker(),
)

conn.Close()

if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -84,8 +92,24 @@ func main() {
_, _ = w.Write([]byte(resp.GetValue()))
}
http.HandleFunc("/echo", indexHandler)
if err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), nil); nil != err {
log.Fatal(err)
}
go func() {
if err := http.ListenAndServe(fmt.Sprintf(":%d", listenPort), nil); nil != err {
log.Fatal(err)
}
}()
runMainLoop()
}

func runMainLoop() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, []os.Signal{
syscall.SIGINT, syscall.SIGTERM,
syscall.SIGSEGV,
}...)

for s := range ch {
log.Printf("catch signal(%+v), stop servers", s)
polaris.ClosePolarisContext()
return
}
}
16 changes: 15 additions & 1 deletion examples/quickstart/consumer/polaris.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
global:
serverConnector:
addresses:
- 127.0.0.1:8091
- 127.0.0.1:8091
statReporter:
#描述:是否将统计信息上报至monitor
#类型:bool
enable: true
#描述:启用的统计上报插件类型
#类型:list
#范围:已经注册的统计上报插件的名字
chain:
- prometheus
plugin:
prometheus:
type: push
address: 127.0.0.1:9091
interval: 10s
17 changes: 16 additions & 1 deletion global.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (
// DefaultNamespace default namespace when namespace is not set
DefaultNamespace = "default"
// DefaultTTL default ttl value when ttl is not set
DefaultTTL = 20
DefaultTTL = 5
// DefaultGracefulStopMaxWaitDuration default stop max wait duration when not set
DefaultGracefulStopMaxWaitDuration = 30 * time.Second
// DefaultDelayStopWaitDuration default delay time before stop
Expand All @@ -57,16 +57,31 @@ const (
)

var (
ctxRef = 0
polarisContext api.SDKContext
polarisConfig config.Configuration
mutexPolarisContext sync.Mutex
oncePolarisConfig sync.Once
)

func ClosePolarisContext() {
mutexPolarisContext.Lock()
defer mutexPolarisContext.Unlock()
if nil == polarisContext {
return
}
ctxRef--
if ctxRef == 0 {
polarisContext.Destroy()
polarisContext = nil
}
}

// PolarisContext get or init the global polaris context
func PolarisContext() (api.SDKContext, error) {
mutexPolarisContext.Lock()
defer mutexPolarisContext.Unlock()
ctxRef++
if nil != polarisContext {
return polarisContext, nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/polarismesh/polaris-go v1.6.0-beta.5
github.com/polarismesh/polaris-go v1.6.0-alpha.7
github.com/polarismesh/specification v1.5.1
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/common v0.54.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1695,8 +1695,8 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ
github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/polarismesh/polaris-go v1.6.0-beta.5 h1:llucvfydWlFWTNeABHbbuVL2ijR7AITx8UG02tx0c/Y=
github.com/polarismesh/polaris-go v1.6.0-beta.5/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c=
github.com/polarismesh/polaris-go v1.6.0-alpha.7 h1:cWyx5BC6+JurBNCV3EaeCjd6U8edY6KrG6HkkBhWMR0=
github.com/polarismesh/polaris-go v1.6.0-alpha.7/go.mod h1:CuXO9bhHGjSoOIMWr4NXf3bJAkRBp5YoM7ibBzENC+c=
github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
github.com/polarismesh/specification v1.5.1 h1:cJ2m0RBepdopGo/e3UpKdsab3NpDZnw5IsVTB1sFc5I=
github.com/polarismesh/specification v1.5.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU=
Expand Down
24 changes: 17 additions & 7 deletions resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ func RegisterResolverInterceptor(i ResolverInterceptor) {
resolverInterceptors = append(resolverInterceptors, i)
}

func NewResolver(ctx api.SDKContext) *resolverBuilder {
return &resolverBuilder{
sdkCtx: ctx,
}
}

type resolverBuilder struct {
sdkCtx api.SDKContext
}

// Scheme polaris scheme
Expand Down Expand Up @@ -104,12 +111,14 @@ func (rb *resolverBuilder) Build(
return nil, err
}

sdkCtx, err := PolarisContext()
if nil != err {
return nil, err
if rb.sdkCtx == nil {
sdkCtx, err := PolarisContext()
if nil != err {
return nil, err
}
rb.sdkCtx = sdkCtx
}

options.SDKContext = sdkCtx
options.SDKContext = rb.sdkCtx

ctx, cancel := context.WithCancel(context.Background())
d := &polarisNamingResolver{
Expand All @@ -120,7 +129,7 @@ func (rb *resolverBuilder) Build(
options: options,
host: host,
port: port,
consumer: api.NewConsumerAPIByContext(sdkCtx),
consumer: api.NewConsumerAPIByContext(rb.sdkCtx),
eventCh: make(chan struct{}, 1),
}
go d.watcher()
Expand Down Expand Up @@ -251,9 +260,10 @@ func (pr *polarisNamingResolver) watcher() {
pr.options.Namespace, pr.host)
return
case <-pr.eventCh:
pr.doRefresh()
case <-ticker.C:
pr.doRefresh()
}
pr.doRefresh()
}
}

Expand Down

0 comments on commit babd816

Please sign in to comment.