diff --git a/examples/quickstart/consumer/polaris.yaml b/examples/quickstart/consumer/polaris.yaml index d926e60..3f0b683 100644 --- a/examples/quickstart/consumer/polaris.yaml +++ b/examples/quickstart/consumer/polaris.yaml @@ -1,4 +1,4 @@ global: serverConnector: addresses: - - 9.134.15.118:8091 \ No newline at end of file + - 127.0.0.1:8091 \ No newline at end of file diff --git a/examples/quickstart/provider/polaris.yaml b/examples/quickstart/provider/polaris.yaml index d926e60..3f0b683 100644 --- a/examples/quickstart/provider/polaris.yaml +++ b/examples/quickstart/provider/polaris.yaml @@ -1,4 +1,4 @@ global: serverConnector: addresses: - - 9.134.15.118:8091 \ No newline at end of file + - 127.0.0.1:8091 \ No newline at end of file diff --git a/resolver.go b/resolver.go index 4d406dd..230daf4 100644 --- a/resolver.go +++ b/resolver.go @@ -24,6 +24,10 @@ import ( "fmt" "sync" + "google.golang.org/grpc/grpclog" + + "github.com/polarismesh/polaris-go/pkg/model" + "github.com/polarismesh/polaris-go/api" "google.golang.org/grpc/attributes" "google.golang.org/grpc/resolver" @@ -106,10 +110,10 @@ func getNamespace(options *dialOptions) string { const keyDialOptions = "options" -func (pr *polarisNamingResolver) lookup() (*resolver.State, error) { +func (pr *polarisNamingResolver) lookup() (*resolver.State, api.ConsumerAPI, error) { sdkCtx, err := PolarisContext() if nil != err { - return nil, err + return nil, nil, err } consumerAPI := api.NewConsumerAPIByContext(sdkCtx) instancesRequest := &api.GetInstancesRequest{} @@ -125,7 +129,7 @@ func (pr *polarisNamingResolver) lookup() (*resolver.State, error) { } resp, err := consumerAPI.GetInstances(instancesRequest) if nil != err { - return nil, err + return nil, consumerAPI, err } state := &resolver.State{} for _, instance := range resp.Instances { @@ -134,23 +138,46 @@ func (pr *polarisNamingResolver) lookup() (*resolver.State, error) { Attributes: attributes.New(keyDialOptions, pr.options), }) } - return state, nil + return state, consumerAPI, nil +} + +func (pr *polarisNamingResolver) doWatch( + consumerAPI api.ConsumerAPI) (model.ServiceKey, <-chan model.SubScribeEvent, error) { + watchRequest := &api.WatchServiceRequest{} + watchRequest.Key = model.ServiceKey{ + Namespace: getNamespace(pr.options), + Service: pr.target.Authority, + } + resp, err := consumerAPI.WatchService(watchRequest) + if nil != err { + return watchRequest.Key, nil, err + } + return watchRequest.Key, resp.EventChannel, nil } func (pr *polarisNamingResolver) watcher() { defer pr.wg.Done() + var consumerAPI api.ConsumerAPI + var eventChan <-chan model.SubScribeEvent for { select { case <-pr.ctx.Done(): return case <-pr.rn: + case <-eventChan: } - - state, err := pr.lookup() + var state *resolver.State + var err error + state, consumerAPI, err = pr.lookup() if err != nil { pr.cc.ReportError(err) } else { pr.cc.UpdateState(*state) + var svcKey model.ServiceKey + svcKey, eventChan, err = pr.doWatch(consumerAPI) + if nil != err { + grpclog.Errorf("fail to do watch for service %s: %v", svcKey, err) + } } } } diff --git a/server.go b/server.go index ec33c31..0bf9ee5 100644 --- a/server.go +++ b/server.go @@ -21,9 +21,12 @@ import ( "context" "fmt" "net" + "os" + "os/signal" "strconv" "strings" "sync" + "syscall" "time" "github.com/golang/protobuf/proto" @@ -166,6 +169,7 @@ func parsePort(addr string) (int, error) { } func deregisterServices(registerContext *RegisterContext) { + fmt.Printf("invoke deregisterServices\n") registerContext.cancel() if nil != registerContext.healthCheckWait { grpclog.Infof("[Polaris]start to wait heartbeat finish") @@ -217,7 +221,7 @@ func (s *Server) startHeartbeat(ctx context.Context, for { select { case <-ctx.Done(): - grpclog.Infof("[Polaris]heartbeat ticker %d has stopped") + grpclog.Infof("[Polaris]heartbeat ticker has stopped") wg.Done() return case <-ticker.C: @@ -228,8 +232,8 @@ func (s *Server) startHeartbeat(ctx context.Context, hbRequest.Port = registerRequest.Port err := providerAPI.Heartbeat(hbRequest) if nil != err { - grpclog.Errorf("[Polaris]fail to heartbeat %s:%d to service %s(%s)", - hbRequest.Host, hbRequest.Port, hbRequest.Service, hbRequest.Namespace) + grpclog.Errorf("[Polaris]fail to heartbeat %s:%d to service %s(%s): %v", + hbRequest.Host, hbRequest.Port, hbRequest.Service, hbRequest.Namespace, err) } } } @@ -247,7 +251,6 @@ func (s *Server) Serve(lis net.Listener) error { registerContext := &RegisterContext{ cancel: cancel, } - defer deregisterServices(registerContext) if len(svcInfos) > 0 { polarisCtx, err := PolarisContext() if nil != err { @@ -286,6 +289,7 @@ func (s *Server) Serve(lis net.Listener) error { registerContext.registerRequests = append(registerContext.registerRequests, registerRequest) resp, err := registerContext.providerAPI.Register(registerRequest) if nil != err { + deregisterServices(registerContext) return fmt.Errorf("fail to register service %s: %v", name, err) } grpclog.Infof("[Polaris]success to register %s:%d to service %s(%s), id %s", @@ -294,5 +298,15 @@ func (s *Server) Serve(lis net.Listener) error { registerContext.healthCheckWait = s.startHeartbeat(ctx, registerContext.providerAPI, registerContext.registerRequests) } + go s.scheduleDeregister(registerContext) return s.gRPCServer.Serve(lis) } + +func (s *Server) scheduleDeregister(registerContext *RegisterContext) { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + sig := <-c + grpclog.Infof("[Polaris]receive quit signal %v", sig) + deregisterServices(registerContext) + s.gRPCServer.GracefulStop() +}