From 212709a4c45b36933abafa49cd24ef70c6e7693c Mon Sep 17 00:00:00 2001 From: Sanjay Ghemawat Date: Wed, 14 Feb 2024 12:22:37 -0800 Subject: [PATCH] Update to newest version of weaver. (#119) The protocol between the envelope and the weavelet has changed recently. This change updates weaver-gke to a new version of weaver and therefore the new protocol. --- examples/echo/weaver_gen.go | 2 +- gen.go | 1 - go.mod | 2 +- go.sum | 2 + internal/babysitter/babysitter.go | 62 ++----- internal/babysitter/logger.go | 45 ----- internal/babysitter/weaver_gen.go | 234 ------------------------ internal/local/starter.go | 2 +- internal/tool/testprogram/weaver_gen.go | 2 +- 9 files changed, 23 insertions(+), 329 deletions(-) delete mode 100644 internal/babysitter/logger.go delete mode 100644 internal/babysitter/weaver_gen.go diff --git a/examples/echo/weaver_gen.go b/examples/echo/weaver_gen.go index f12d264..56c8bca 100644 --- a/examples/echo/weaver_gen.go +++ b/examples/echo/weaver_gen.go @@ -176,7 +176,7 @@ var _ weaver.Main = (*main_client_stub)(nil) // you run "go build" or "go run". var _ codegen.LatestVersion = codegen.Version[[0][20]struct{}](` -ERROR: You generated this file with 'weaver generate' v0.22.1-0.20231019162801-c2294d1ae0e8 (codegen +ERROR: You generated this file with 'weaver generate' v0.22.1-0.20240208183719-2eb6066c2f85 (codegen version v0.20.0). The generated code is incompatible with the version of the github.com/ServiceWeaver/weaver module that you're using. The weaver module version can be found in your go.mod file or by running the following command. diff --git a/gen.go b/gen.go index 925fe46..c74d52a 100644 --- a/gen.go +++ b/gen.go @@ -24,4 +24,3 @@ package gen //go:generate ./dev/protoc.sh internal/store/store.proto //go:generate ./dev/protoc.sh internal/local/local.proto //go:generate ./dev/protoc.sh internal/local/proxy/proxy.proto -//go:generate weaver generate ./internal/babysitter diff --git a/go.mod b/go.mod index 71d6eef..7d6e390 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( cloud.google.com/go/monitoring v1.15.1 cloud.google.com/go/security v1.15.1 github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.11.0 - github.com/ServiceWeaver/weaver v0.22.1-0.20231019162801-c2294d1ae0e8 + github.com/ServiceWeaver/weaver v0.22.1-0.20240208183719-2eb6066c2f85 github.com/golang/protobuf v1.5.3 github.com/google/cel-go v0.17.1 github.com/google/go-cmp v0.5.9 diff --git a/go.sum b/go.sum index 7713579..15898c8 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapp github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.35.0/go.mod h1:H785fvlgotVZqht+1rHhXSs8EJ8uPVmpBYkTYO3ccpc= github.com/ServiceWeaver/weaver v0.22.1-0.20231019162801-c2294d1ae0e8 h1:smtruzdiiELIMDNHrXD+fY8/I69p0rQPqMlYERptwA4= github.com/ServiceWeaver/weaver v0.22.1-0.20231019162801-c2294d1ae0e8/go.mod h1:j27YowX7vVpIrYcEPZ9e1FR+fvVrlH9DweyO3uyOqkg= +github.com/ServiceWeaver/weaver v0.22.1-0.20240208183719-2eb6066c2f85 h1:nvYGb2V6IdwCSfDXpR4ruo1GD9919+2pnsPLcui0QbU= +github.com/ServiceWeaver/weaver v0.22.1-0.20240208183719-2eb6066c2f85/go.mod h1:j27YowX7vVpIrYcEPZ9e1FR+fvVrlH9DweyO3uyOqkg= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= diff --git a/internal/babysitter/babysitter.go b/internal/babysitter/babysitter.go index 9957804..b5af82a 100644 --- a/internal/babysitter/babysitter.go +++ b/internal/babysitter/babysitter.go @@ -19,9 +19,7 @@ import ( "crypto/x509" "fmt" "log/slog" - "net" "net/http" - "path/filepath" "sync" "time" @@ -30,7 +28,6 @@ import ( "github.com/ServiceWeaver/weaver-gke/internal/mtls" "github.com/ServiceWeaver/weaver-gke/internal/nanny" "github.com/ServiceWeaver/weaver/runtime" - "github.com/ServiceWeaver/weaver/runtime/deployers" "github.com/ServiceWeaver/weaver/runtime/envelope" "github.com/ServiceWeaver/weaver/runtime/logging" "github.com/ServiceWeaver/weaver/runtime/metrics" @@ -88,21 +85,6 @@ func Start( traceSaver func(spans *protos.TraceSpans) error, metricExporter func(metrics []*metrics.MetricSnapshot) error, ) (*Babysitter, error) { - // Custom logging component to receive log messages. - loggerComponent := &loggerImpl{dst: logSaver} - - // Make Unix domain socket listener for serving hosted system components. - tmpDir, err := runtime.NewTempDir() - if err != nil { - return nil, err - } - // TODO(sanjay): Use runtime.OnExitSignal to cleanup when available. - udsPath := filepath.Join(tmpDir, "socket") - uds, err := net.Listen("unix", udsPath) - if err != nil { - return nil, err - } - // Create the envelope. // // We use the PodName as a unique weavelet id for the following reasons: @@ -112,24 +94,18 @@ func Start( // * It allows the manager to quickly check if the weavelet is still // active by asking the Kubernetes API if the Pod with a given name // exists. - info := &protos.EnvelopeInfo{ + args := &protos.WeaveletArgs{ App: cfg.Deployment.App.Name, DeploymentId: cfg.Deployment.Id, Id: podName, - Sections: cfg.Deployment.App.Sections, RunMain: replicaSet == runtime.Main, Mtls: cfg.Mtls, InternalAddress: internalAddress, - Redirects: []*protos.EnvelopeInfo_Redirect{ - // Override the builtin logger. - { - Component: weaverLoggerName, - Target: funcLoggerName, - Address: "unix://" + udsPath, - }, - }, + // ControlSocket, Redirects filled by envelope.NewEnvelope } - e, err := envelope.NewEnvelope(ctx, info, cfg.Deployment.App) + e, err := envelope.NewEnvelope(ctx, args, cfg.Deployment.App, envelope.Options{ + Logger: logger, + }) if err != nil { return nil, err } @@ -168,24 +144,12 @@ func Start( // Watch for components to start. go b.watchComponentsToStart() - // Serve calls to system components. - go func() { - sys := map[string]any{ - funcLoggerName: loggerComponent, - } - if err := deployers.ServeComponents(ctx, uds, logger, sys); err != nil { - panic(err) - } - }() - return b, nil } -// WeaveletInfo returns information about the weavelet managed by the -// babysitter. -func (b *Babysitter) WeaveletInfo() *protos.WeaveletInfo { - return b.envelope.WeaveletInfo() -} +// WeaveletAddress returns the address that other components should dial to communicate with the +// weavelet. +func (b *Babysitter) WeaveletAddress() string { return b.envelope.WeaveletAddress() } // SelfAddr returns the address on which the babysitter is listening on // for incoming requests. @@ -231,7 +195,7 @@ func (b *Babysitter) GetLoad(_ context.Context, req *endpoints.GetLoadRequest) ( } return &endpoints.GetLoadReply{ Load: load, - WeaveletAddr: b.WeaveletInfo().DialAddr, + WeaveletAddr: b.WeaveletAddress(), }, nil } @@ -245,6 +209,14 @@ func (b *Babysitter) RunProfiling(_ context.Context, req *protos.GetProfileReque return &protos.GetProfileReply{Data: prof}, nil } +// LogBatch implements the control.DeployerControl interface. +func (b *Babysitter) LogBatch(ctx context.Context, batch *protos.LogEntryBatch) error { + for _, entry := range batch.Entries { + b.logSaver(entry) + } + return nil +} + // ActivateComponent implements the envelope.EnvelopeHandler interface. func (b *Babysitter) ActivateComponent(ctx context.Context, req *protos.ActivateComponentRequest) (*protos.ActivateComponentReply, error) { targetReplicaSet := config.ReplicaSetForComponent(req.Component, b.cfg) diff --git a/internal/babysitter/logger.go b/internal/babysitter/logger.go deleted file mode 100644 index 39a069f..0000000 --- a/internal/babysitter/logger.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2023 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package babysitter - -import ( - "context" - - "github.com/ServiceWeaver/weaver" - "github.com/ServiceWeaver/weaver/runtime/protos" -) - -const ( - weaverLoggerName = "github.com/ServiceWeaver/weaver/Logger" - funcLoggerName = "github.com/ServiceWeaver/weaver-gke/internal/babysitter/funcLogger" -) - -// funcLogger overrides weaver.Logger component when running under GKE. -// It redirects log entries to a supplied function. -type funcLogger weaver.Logger - -type loggerImpl struct { - weaver.Implements[funcLogger] - dst func(*protos.LogEntry) -} - -var _ funcLogger = &loggerImpl{} - -func (l *loggerImpl) LogBatch(ctx context.Context, batch *protos.LogEntryBatch) error { - for _, e := range batch.Entries { - l.dst(e) - } - return nil -} diff --git a/internal/babysitter/weaver_gen.go b/internal/babysitter/weaver_gen.go deleted file mode 100644 index 05f97ce..0000000 --- a/internal/babysitter/weaver_gen.go +++ /dev/null @@ -1,234 +0,0 @@ -// Code generated by "weaver generate". DO NOT EDIT. -//go:build !ignoreWeaverGen - -package babysitter - -import ( - "context" - "errors" - "github.com/ServiceWeaver/weaver" - "github.com/ServiceWeaver/weaver/runtime/codegen" - "github.com/ServiceWeaver/weaver/runtime/protos" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" - "reflect" -) - -func init() { - codegen.Register(codegen.Registration{ - Name: "github.com/ServiceWeaver/weaver-gke/internal/babysitter/funcLogger", - Iface: reflect.TypeOf((*funcLogger)(nil)).Elem(), - Impl: reflect.TypeOf(loggerImpl{}), - LocalStubFn: func(impl any, caller string, tracer trace.Tracer) any { - return funcLogger_local_stub{impl: impl.(funcLogger), tracer: tracer, logBatchMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver-gke/internal/babysitter/funcLogger", Method: "LogBatch", Remote: false})} - }, - ClientStubFn: func(stub codegen.Stub, caller string) any { - return funcLogger_client_stub{stub: stub, logBatchMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver-gke/internal/babysitter/funcLogger", Method: "LogBatch", Remote: true})} - }, - ServerStubFn: func(impl any, addLoad func(uint64, float64)) codegen.Server { - return funcLogger_server_stub{impl: impl.(funcLogger), addLoad: addLoad} - }, - ReflectStubFn: func(caller func(string, context.Context, []any, []any) error) any { - return funcLogger_reflect_stub{caller: caller} - }, - RefData: "", - }) -} - -// weaver.InstanceOf checks. -var _ weaver.InstanceOf[funcLogger] = (*loggerImpl)(nil) - -// weaver.Router checks. -var _ weaver.Unrouted = (*loggerImpl)(nil) - -// Local stub implementations. - -type funcLogger_local_stub struct { - impl funcLogger - tracer trace.Tracer - logBatchMetrics *codegen.MethodMetrics -} - -// Check that funcLogger_local_stub implements the funcLogger interface. -var _ funcLogger = (*funcLogger_local_stub)(nil) - -func (s funcLogger_local_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { - // Update metrics. - begin := s.logBatchMetrics.Begin() - defer func() { s.logBatchMetrics.End(begin, err != nil, 0, 0) }() - span := trace.SpanFromContext(ctx) - if span.SpanContext().IsValid() { - // Create a child span for this method. - ctx, span = s.tracer.Start(ctx, "babysitter.funcLogger.LogBatch", trace.WithSpanKind(trace.SpanKindInternal)) - defer func() { - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } - span.End() - }() - } - - return s.impl.LogBatch(ctx, a0) -} - -// Client stub implementations. - -type funcLogger_client_stub struct { - stub codegen.Stub - logBatchMetrics *codegen.MethodMetrics -} - -// Check that funcLogger_client_stub implements the funcLogger interface. -var _ funcLogger = (*funcLogger_client_stub)(nil) - -func (s funcLogger_client_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { - // Update metrics. - var requestBytes, replyBytes int - begin := s.logBatchMetrics.Begin() - defer func() { s.logBatchMetrics.End(begin, err != nil, requestBytes, replyBytes) }() - - span := trace.SpanFromContext(ctx) - if span.SpanContext().IsValid() { - // Create a child span for this method. - ctx, span = s.stub.Tracer().Start(ctx, "babysitter.funcLogger.LogBatch", trace.WithSpanKind(trace.SpanKindClient)) - } - - defer func() { - // Catch and return any panics detected during encoding/decoding/rpc. - if err == nil { - err = codegen.CatchPanics(recover()) - if err != nil { - err = errors.Join(weaver.RemoteCallError, err) - } - } - - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } - span.End() - - }() - - // Encode arguments. - enc := codegen.NewEncoder() - serviceweaver_enc_ptr_LogEntryBatch_fec9a5d4(enc, a0) - var shardKey uint64 - - // Call the remote method. - requestBytes = len(enc.Data()) - var results []byte - results, err = s.stub.Run(ctx, 0, enc.Data(), shardKey) - replyBytes = len(results) - if err != nil { - err = errors.Join(weaver.RemoteCallError, err) - return - } - - // Decode the results. - dec := codegen.NewDecoder(results) - err = dec.Error() - return -} - -// Note that "weaver generate" will always generate the error message below. -// Everything is okay. The error message is only relevant if you see it when -// you run "go build" or "go run". -var _ codegen.LatestVersion = codegen.Version[[0][20]struct{}](` - -ERROR: You generated this file with 'weaver generate' v0.22.1-0.20231019162801-c2294d1ae0e8 (codegen -version v0.20.0). The generated code is incompatible with the version of the -github.com/ServiceWeaver/weaver module that you're using. The weaver module -version can be found in your go.mod file or by running the following command. - - go list -m github.com/ServiceWeaver/weaver - -We recommend updating the weaver module and the 'weaver generate' command by -running the following. - - go get github.com/ServiceWeaver/weaver@latest - go install github.com/ServiceWeaver/weaver/cmd/weaver@latest - -Then, re-run 'weaver generate' and re-build your code. If the problem persists, -please file an issue at https://github.com/ServiceWeaver/weaver/issues. - -`) - -// Server stub implementations. - -type funcLogger_server_stub struct { - impl funcLogger - addLoad func(key uint64, load float64) -} - -// Check that funcLogger_server_stub implements the codegen.Server interface. -var _ codegen.Server = (*funcLogger_server_stub)(nil) - -// GetStubFn implements the codegen.Server interface. -func (s funcLogger_server_stub) GetStubFn(method string) func(ctx context.Context, args []byte) ([]byte, error) { - switch method { - case "LogBatch": - return s.logBatch - default: - return nil - } -} - -func (s funcLogger_server_stub) logBatch(ctx context.Context, args []byte) (res []byte, err error) { - // Catch and return any panics detected during encoding/decoding/rpc. - defer func() { - if err == nil { - err = codegen.CatchPanics(recover()) - } - }() - - // Decode arguments. - dec := codegen.NewDecoder(args) - var a0 *protos.LogEntryBatch - a0 = serviceweaver_dec_ptr_LogEntryBatch_fec9a5d4(dec) - - // TODO(rgrandl): The deferred function above will recover from panics in the - // user code: fix this. - // Call the local method. - appErr := s.impl.LogBatch(ctx, a0) - - // Encode the results. - enc := codegen.NewEncoder() - enc.Error(appErr) - return enc.Data(), nil -} - -// Reflect stub implementations. - -type funcLogger_reflect_stub struct { - caller func(string, context.Context, []any, []any) error -} - -// Check that funcLogger_reflect_stub implements the funcLogger interface. -var _ funcLogger = (*funcLogger_reflect_stub)(nil) - -func (s funcLogger_reflect_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { - err = s.caller("LogBatch", ctx, []any{a0}, []any{}) - return -} - -// Encoding/decoding implementations. - -func serviceweaver_enc_ptr_LogEntryBatch_fec9a5d4(enc *codegen.Encoder, arg *protos.LogEntryBatch) { - if arg == nil { - enc.Bool(false) - } else { - enc.Bool(true) - enc.EncodeProto(arg) - } -} - -func serviceweaver_dec_ptr_LogEntryBatch_fec9a5d4(dec *codegen.Decoder) *protos.LogEntryBatch { - if !dec.Bool() { - return nil - } - var res protos.LogEntryBatch - dec.DecodeProto(&res) - return &res -} diff --git a/internal/local/starter.go b/internal/local/starter.go index 67adfef..12c3004 100644 --- a/internal/local/starter.go +++ b/internal/local/starter.go @@ -128,7 +128,7 @@ func (s *Starter) getHealthyPods(ctx context.Context, cfg *config.GKEConfig, rep var healthyPods []*nanny.Pod for _, r := range rs.pods { babysitterAddr := r.b.SelfAddr() - weaveletAddr := r.b.WeaveletInfo().DialAddr + weaveletAddr := r.b.WeaveletAddress() // NOTE: we could call r.b.GetLoad() directly here, but are instead // choosing to exercise the same call path as GKE proper. b := newBabysitter(babysitterAddr) diff --git a/internal/tool/testprogram/weaver_gen.go b/internal/tool/testprogram/weaver_gen.go index 3922205..d7a0ad8 100644 --- a/internal/tool/testprogram/weaver_gen.go +++ b/internal/tool/testprogram/weaver_gen.go @@ -61,7 +61,7 @@ var _ weaver.Main = (*main_client_stub)(nil) // you run "go build" or "go run". var _ codegen.LatestVersion = codegen.Version[[0][20]struct{}](` -ERROR: You generated this file with 'weaver generate' v0.22.1-0.20231019162801-c2294d1ae0e8 (codegen +ERROR: You generated this file with 'weaver generate' v0.22.1-0.20240208183719-2eb6066c2f85 (codegen version v0.20.0). The generated code is incompatible with the version of the github.com/ServiceWeaver/weaver module that you're using. The weaver module version can be found in your go.mod file or by running the following command.