From a69dda5e0e454aafd0c97534c5261424dba1f921 Mon Sep 17 00:00:00 2001 From: lavkesh Date: Wed, 27 Sep 2023 14:39:00 +0800 Subject: [PATCH] feat: add global logger --- cli/migrate.go | 12 +++++------ cli/serve.go | 24 +++++++++++----------- core/core.go | 6 +----- core/core_test.go | 2 +- core/read_test.go | 10 ++++----- core/sync.go | 4 ++-- core/write_test.go | 36 ++++++++++++++++----------------- internal/server/middlewares.go | 10 ++++----- internal/server/server.go | 9 ++++----- modules/firehose/driver.go | 14 ++++++------- modules/firehose/driver_plan.go | 6 ++++-- pkg/logger/logger.go | 9 +++++++-- pkg/telemetry/telemetry.go | 6 +++--- 13 files changed, 74 insertions(+), 74 deletions(-) diff --git a/cli/migrate.go b/cli/migrate.go index 350c60d0..6ffefa1e 100644 --- a/cli/migrate.go +++ b/cli/migrate.go @@ -3,10 +3,8 @@ package cli import ( "context" - "github.com/spf13/cobra" - "go.uber.org/zap" - "github.com/goto/entropy/pkg/logger" + "github.com/spf13/cobra" ) func cmdMigrate() *cobra.Command { @@ -24,18 +22,18 @@ func cmdMigrate() *cobra.Command { return err } - zapLog, err := logger.New(&cfg.Log) + err = logger.Setup(&cfg.Log) if err != nil { return err } - return runMigrations(cmd.Context(), zapLog, cfg) + return runMigrations(cmd.Context(), cfg) }) return cmd } -func runMigrations(ctx context.Context, zapLog *zap.Logger, cfg Config) error { - store := setupStorage(zapLog, cfg.PGConnStr, cfg.Syncer) +func runMigrations(ctx context.Context, cfg Config) error { + store := setupStorage(cfg.PGConnStr, cfg.Syncer) return store.Migrate(ctx) } diff --git a/cli/serve.go b/cli/serve.go index 5e057bbd..1e00058c 100644 --- a/cli/serve.go +++ b/cli/serve.go @@ -38,23 +38,23 @@ func cmdServe() *cobra.Command { return err } - zapLog, err := logger.New(&cfg.Log) + err = logger.Setup(&cfg.Log) if err != nil { return err } - telemetry.Init(cmd.Context(), cfg.Telemetry, zapLog) + telemetry.Init(cmd.Context(), cfg.Telemetry) nrApp, err := newrelic.NewApplication( newrelic.ConfigAppName(cfg.Telemetry.ServiceName), newrelic.ConfigLicense(cfg.Telemetry.NewRelicAPIKey), ) - store := setupStorage(zapLog, cfg.PGConnStr, cfg.Syncer) - moduleService := module.NewService(setupRegistry(zapLog), store) - resourceService := core.New(store, moduleService, time.Now, zapLog) + store := setupStorage(cfg.PGConnStr, cfg.Syncer) + moduleService := module.NewService(setupRegistry(), store) + resourceService := core.New(store, moduleService, time.Now) if migrate { - if migrateErr := runMigrations(cmd.Context(), zapLog, cfg); migrateErr != nil { + if migrateErr := runMigrations(cmd.Context(), cfg); migrateErr != nil { return migrateErr } } @@ -62,21 +62,21 @@ func cmdServe() *cobra.Command { if spawnWorker { go func() { if runErr := resourceService.RunSyncer(cmd.Context(), cfg.Syncer.SyncInterval); runErr != nil { - zapLog.Error("syncer exited with error", zap.Error(err)) + zap.L().Error("syncer exited with error", zap.Error(err)) } }() } return entropyserver.Serve(cmd.Context(), cfg.Service.httpAddr(), cfg.Service.grpcAddr(), - nrApp, zapLog, resourceService, moduleService, + nrApp, resourceService, moduleService, ) }) return cmd } -func setupRegistry(logger *zap.Logger) module.Registry { +func setupRegistry() module.Registry { supported := []module.Descriptor{ kubernetes.Module, firehose.Module, @@ -85,7 +85,7 @@ func setupRegistry(logger *zap.Logger) module.Registry { registry := &modules.Registry{} for _, desc := range supported { if err := registry.Register(desc); err != nil { - logger.Fatal("failed to register module", + zap.L().Fatal("failed to register module", zap.String("module_kind", desc.Kind), zap.Error(err), ) @@ -94,10 +94,10 @@ func setupRegistry(logger *zap.Logger) module.Registry { return registry } -func setupStorage(logger *zap.Logger, pgConStr string, syncCfg syncerConf) *postgres.Store { +func setupStorage(pgConStr string, syncCfg syncerConf) *postgres.Store { store, err := postgres.Open(pgConStr, syncCfg.RefreshInterval, syncCfg.ExtendLockBy) if err != nil { - logger.Fatal("failed to connect to Postgres database", + zap.L().Fatal("failed to connect to Postgres database", zap.Error(err), zap.String("conn_str", pgConStr)) } return store diff --git a/core/core.go b/core/core.go index ef9ea293..cbffd67d 100644 --- a/core/core.go +++ b/core/core.go @@ -7,15 +7,12 @@ import ( "encoding/json" "time" - "go.uber.org/zap" - "github.com/goto/entropy/core/module" "github.com/goto/entropy/core/resource" "github.com/goto/entropy/pkg/errors" ) type Service struct { - logger *zap.Logger clock func() time.Time store resource.Store moduleSvc ModuleService @@ -30,7 +27,7 @@ type ModuleService interface { GetOutput(ctx context.Context, res module.ExpandedResource) (json.RawMessage, error) } -func New(repo resource.Store, moduleSvc ModuleService, clockFn func() time.Time, lg *zap.Logger) *Service { +func New(repo resource.Store, moduleSvc ModuleService, clockFn func() time.Time) *Service { const ( defaultMaxRetries = 10 defaultSyncBackoff = 5 * time.Second @@ -41,7 +38,6 @@ func New(repo resource.Store, moduleSvc ModuleService, clockFn func() time.Time, } return &Service{ - logger: lg, clock: clockFn, store: repo, syncBackoff: defaultSyncBackoff, diff --git a/core/core_test.go b/core/core_test.go index 54628532..bee41da8 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -25,6 +25,6 @@ var ( func TestNew(t *testing.T) { t.Parallel() - s := core.New(&mocks.ResourceStore{}, &mocks.ModuleService{}, deadClock, nil) + s := core.New(&mocks.ResourceStore{}, &mocks.ModuleService{}, deadClock) assert.NotNil(t, s) } diff --git a/core/read_test.go b/core/read_test.go index d8160e70..e33e6991 100644 --- a/core/read_test.go +++ b/core/read_test.go @@ -32,7 +32,7 @@ func TestService_GetResource(t *testing.T) { GetByURN(mock.Anything, mock.Anything). Return(nil, errors.ErrNotFound). Once() - return core.New(repo, nil, nil, nil) + return core.New(repo, nil, nil) }, urn: "foo:bar:baz", wantErr: errors.ErrNotFound, @@ -52,7 +52,7 @@ func TestService_GetResource(t *testing.T) { Return(nil, nil). Once() - return core.New(repo, mod, deadClock, nil) + return core.New(repo, mod, deadClock) }, urn: "foo:bar:baz", want: &sampleResource, @@ -99,7 +99,7 @@ func TestService_ListResources(t *testing.T) { List(mock.Anything, mock.Anything). Return(nil, nil). Once() - return core.New(repo, nil, deadClock, nil) + return core.New(repo, nil, deadClock) }, want: nil, wantErr: nil, @@ -113,7 +113,7 @@ func TestService_ListResources(t *testing.T) { List(mock.Anything, mock.Anything). Return(nil, errStoreFailure). Once() - return core.New(repo, nil, deadClock, nil) + return core.New(repo, nil, deadClock) }, want: nil, wantErr: errors.ErrInternal, @@ -127,7 +127,7 @@ func TestService_ListResources(t *testing.T) { List(mock.Anything, mock.Anything). Return([]resource.Resource{sampleResource}, nil). Once() - return core.New(repo, nil, deadClock, nil) + return core.New(repo, nil, deadClock) }, want: []resource.Resource{sampleResource}, wantErr: nil, diff --git a/core/sync.go b/core/sync.go index 25134c6b..d4d17795 100644 --- a/core/sync.go +++ b/core/sync.go @@ -26,14 +26,14 @@ func (svc *Service) RunSyncer(ctx context.Context, interval time.Duration) error err := svc.store.SyncOne(ctx, svc.handleSync) if err != nil { - svc.logger.Warn("SyncOne() failed", zap.Error(err)) + zap.L().Warn("SyncOne() failed", zap.Error(err)) } } } } func (svc *Service) handleSync(ctx context.Context, res resource.Resource) (*resource.Resource, error) { - logEntry := svc.logger.With( + logEntry := zap.L().With( zap.String("resource_urn", res.URN), zap.String("resource_status", res.State.Status), zap.Int("retries", res.State.SyncResult.Retries), diff --git a/core/write_test.go b/core/write_test.go index 29ddd3fc..4fd64826 100644 --- a/core/write_test.go +++ b/core/write_test.go @@ -37,7 +37,7 @@ func TestService_CreateResource(t *testing.T) { PlanAction(mock.Anything, mock.Anything, mock.Anything). Return(nil, errSample).Once() - return core.New(nil, mod, deadClock, nil) + return core.New(nil, mod, deadClock) }, res: resource.Resource{ Kind: "mock", @@ -59,7 +59,7 @@ func TestService_CreateResource(t *testing.T) { Return(nil, errors.ErrNotFound). Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, res: resource.Resource{ Kind: "mock", @@ -98,7 +98,7 @@ func TestService_CreateResource(t *testing.T) { }, nil). Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, res: resource.Resource{ Kind: "mock", @@ -136,7 +136,7 @@ func TestService_CreateResource(t *testing.T) { }, nil). Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, res: resource.Resource{ Kind: "mock", @@ -170,7 +170,7 @@ func TestService_CreateResource(t *testing.T) { Return(errSample). Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, res: resource.Resource{ Kind: "mock", @@ -198,7 +198,7 @@ func TestService_CreateResource(t *testing.T) { Create(mock.Anything, mock.Anything, mock.Anything). Return(errors.ErrConflict).Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, res: resource.Resource{ Kind: "mock", @@ -255,7 +255,7 @@ func TestService_CreateResource(t *testing.T) { }). Return(nil) - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, res: resource.Resource{ Kind: "mock", @@ -328,7 +328,7 @@ func TestService_UpdateResource(t *testing.T) { Return(nil, errors.ErrNotFound). Once() - return core.New(resourceRepo, nil, deadClock, nil) + return core.New(resourceRepo, nil, deadClock) }, urn: "orn:entropy:mock:project:child", update: resource.UpdateRequest{ @@ -357,7 +357,7 @@ func TestService_UpdateResource(t *testing.T) { Return(&testResource, nil). Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, urn: "orn:entropy:mock:project:child", update: resource.UpdateRequest{ @@ -404,7 +404,7 @@ func TestService_UpdateResource(t *testing.T) { Return(nil). Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, urn: "orn:entropy:mock:project:child", update: resource.UpdateRequest{ @@ -450,7 +450,7 @@ func TestService_UpdateResource(t *testing.T) { }). Twice() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, urn: "orn:entropy:mock:project:child", update: resource.UpdateRequest{ @@ -513,7 +513,7 @@ func TestService_DeleteResource(t *testing.T) { Return(nil, testErr). Once() - return core.New(resourceRepo, nil, deadClock, nil) + return core.New(resourceRepo, nil, deadClock) }, urn: "orn:entropy:mock:foo:bar", wantErr: testErr, @@ -558,7 +558,7 @@ func TestService_DeleteResource(t *testing.T) { Return(testErr). Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, urn: "orn:entropy:mock:foo:bar", wantErr: errors.ErrInternal, @@ -603,7 +603,7 @@ func TestService_DeleteResource(t *testing.T) { Return(nil). Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, urn: "orn:entropy:mock:foo:bar", wantErr: nil, @@ -653,7 +653,7 @@ func TestService_ApplyAction(t *testing.T) { Return(nil, errors.ErrNotFound). Once() - return core.New(resourceRepo, nil, deadClock, nil) + return core.New(resourceRepo, nil, deadClock) }, urn: "orn:entropy:mock:foo:bar", action: sampleAction, @@ -680,7 +680,7 @@ func TestService_ApplyAction(t *testing.T) { }, nil). Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, urn: "orn:entropy:mock:foo:bar", action: sampleAction, @@ -713,7 +713,7 @@ func TestService_ApplyAction(t *testing.T) { }, nil). Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, urn: "orn:entropy:mock:foo:bar", action: sampleAction, @@ -756,7 +756,7 @@ func TestService_ApplyAction(t *testing.T) { Return(nil). Once() - return core.New(resourceRepo, mod, deadClock, nil) + return core.New(resourceRepo, mod, deadClock) }, urn: "orn:entropy:mock:foo:bar", action: sampleAction, diff --git a/internal/server/middlewares.go b/internal/server/middlewares.go index ab0cc35c..9f4f8970 100644 --- a/internal/server/middlewares.go +++ b/internal/server/middlewares.go @@ -81,7 +81,7 @@ func requestID() gorillamux.MiddlewareFunc { } } -func requestLogger(lg *zap.Logger) gorillamux.MiddlewareFunc { +func requestLogger() gorillamux.MiddlewareFunc { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(wr http.ResponseWriter, req *http.Request) { t := time.Now() @@ -96,7 +96,7 @@ func requestLogger(lg *zap.Logger) gorillamux.MiddlewareFunc { bodyBytes, err := io.ReadAll(req.Body) if err != nil { - lg.Error("error reading request body: %v", zap.String("error", err.Error())) + zap.L().Error("error reading request body: %v", zap.String("error", err.Error())) return } reader := io.NopCloser(bytes.NewBuffer(bodyBytes)) @@ -122,16 +122,16 @@ func requestLogger(lg *zap.Logger) gorillamux.MiddlewareFunc { dst := bytes.NewBuffer(nil) err = json.Compact(dst, bodyBytes) if err != nil { - lg.Error("error json compacting request body: %v", zap.String("error", err.Error())) + zap.L().Error("error json compacting request body: %v", zap.String("error", err.Error())) } else { fields = append(fields, zap.String("request_body", dst.String())) } } if !is2xx(wrapped.Status) { - lg.Warn("request handled with non-2xx response", fields...) + zap.L().Warn("request handled with non-2xx response", fields...) } else { - lg.Info("request handled", fields...) + zap.L().Info("request handled", fields...) } }) } diff --git a/internal/server/server.go b/internal/server/server.go index 3ebc1194..857da037 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -40,14 +40,14 @@ const ( // Serve initialises all the gRPC+HTTP API routes, starts listening for requests at addr, and blocks until server exits. // Server exits gracefully when context is cancelled. -func Serve(ctx context.Context, httpAddr, grpcAddr string, nrApp *newrelic.Application, logger *zap.Logger, +func Serve(ctx context.Context, httpAddr, grpcAddr string, nrApp *newrelic.Application, resourceSvc resourcesv1.ResourceService, moduleSvc modulesv1.ModuleService, ) error { grpcOpts := []grpc.ServerOption{ grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_recovery.UnaryServerInterceptor(), grpc_ctxtags.UnaryServerInterceptor(), - grpc_zap.UnaryServerInterceptor(logger), + grpc_zap.UnaryServerInterceptor(zap.L()), nrgrpc.UnaryServerInterceptor(nrApp), )), grpc.StatsHandler(&ocgrpc.ServerHandler{}), @@ -75,7 +75,6 @@ func Serve(ctx context.Context, httpAddr, grpcAddr string, nrApp *newrelic.Appli } resourceServiceRPC := &resourcesv1.LogWrapper{ - Logger: logger, ResourceServiceServer: resourcesv1.NewAPIServer(resourceSvc), } grpcServer.RegisterService(&entropyv1beta1.ResourceService_ServiceDesc, resourceServiceRPC) @@ -99,10 +98,10 @@ func Serve(ctx context.Context, httpAddr, grpcAddr string, nrApp *newrelic.Appli httpRouter.Use( requestID(), withOpenCensus(), - requestLogger(logger), // nolint + requestLogger(), // nolint ) - logger.Info("starting http & grpc servers", + zap.L().Info("starting http & grpc servers", zap.String("http_addr", httpAddr), zap.String("grpc_addr", grpcAddr), ) diff --git a/modules/firehose/driver.go b/modules/firehose/driver.go index 773aef9f..f1db3317 100644 --- a/modules/firehose/driver.go +++ b/modules/firehose/driver.go @@ -215,7 +215,7 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config, } tolerationKey := fmt.Sprintf("firehose_%s", conf.EnvVariables["SINK_TYPE"]) - var tolerations = []map[string]any{} + tolerations := []map[string]any{} for _, t := range kubeOut.Tolerations[tolerationKey] { tolerations = append(tolerations, map[string]any{ "key": t.Key, @@ -225,9 +225,9 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config, }) } - var mountSecrets = []map[string]any{} - var requiredDuringSchedulingIgnoredDuringExecution = []Preference{} - var preferredDuringSchedulingIgnoredDuringExecution = []WeightedPreference{} + mountSecrets := []map[string]any{} + requiredDuringSchedulingIgnoredDuringExecution := []Preference{} + preferredDuringSchedulingIgnoredDuringExecution := []WeightedPreference{} if fd.conf.NodeAffinityMatchExpressions.RequiredDuringSchedulingIgnoredDuringExecution != nil { requiredDuringSchedulingIgnoredDuringExecution = fd.conf.NodeAffinityMatchExpressions.RequiredDuringSchedulingIgnoredDuringExecution @@ -238,7 +238,7 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config, if fd.conf.GCSSinkCredential != "" { const mountFile = "gcs_auth.json" - var credPath = fmt.Sprintf("/etc/secret/%s", mountFile) + credPath := fmt.Sprintf("/etc/secret/%s", mountFile) mountSecrets = append(mountSecrets, map[string]any{ "value": fd.conf.GCSSinkCredential, @@ -251,7 +251,7 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config, if fd.conf.DLQGCSSinkCredential != "" { const mountFile = "dlq_gcs_auth.json" - var credPath = fmt.Sprintf("/etc/secret/%s", mountFile) + credPath := fmt.Sprintf("/etc/secret/%s", mountFile) mountSecrets = append(mountSecrets, map[string]any{ "value": fd.conf.DLQGCSSinkCredential, @@ -263,7 +263,7 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config, if fd.conf.BigQuerySinkCredential != "" { const mountFile = "bigquery_auth.json" - var credPath = fmt.Sprintf("/etc/secret/%s", mountFile) + credPath := fmt.Sprintf("/etc/secret/%s", mountFile) mountSecrets = append(mountSecrets, map[string]any{ "value": fd.conf.BigQuerySinkCredential, diff --git a/modules/firehose/driver_plan.go b/modules/firehose/driver_plan.go index 5f49975e..c85c914d 100644 --- a/modules/firehose/driver_plan.go +++ b/modules/firehose/driver_plan.go @@ -15,8 +15,10 @@ import ( const SourceKafkaConsumerAutoOffsetReset = "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET" -var suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`) -var errGroupIDFormat = fmt.Errorf("group id must match the format '%s'", suffixRegex) +var ( + suffixRegex = regexp.MustCompile(`^([A-Za-z0-9-]+)-([0-9]+)$`) + errGroupIDFormat = fmt.Errorf("group id must match the format '%s'", suffixRegex) +) func (fd *firehoseDriver) Plan(_ context.Context, exr module.ExpandedResource, act module.ActionRequest) (*resource.Resource, error) { switch act.Name { diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index ccc6861c..19454d35 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -9,11 +9,16 @@ type LogConfig struct { Level string `mapstructure:"level" default:"info"` } -func New(config *LogConfig) (*zap.Logger, error) { +func Setup(config *LogConfig) error { defaultConfig := zap.NewProductionConfig() defaultConfig.Level = zap.NewAtomicLevelAt(getZapLogLevelFromString(config.Level)) logger, err := zap.NewProductionConfig().Build() - return logger, err + if err != nil { + return err + } + // Setting up global Logger. This can be accessed by zap.L() + zap.ReplaceGlobals(logger) + return nil } func getZapLogLevelFromString(level string) zapcore.Level { diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 9a76827c..77c49ff8 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -31,7 +31,7 @@ type Config struct { // Init initialises OpenCensus based async-telemetry processes and // returns (i.e., it does not block). -func Init(ctx context.Context, cfg Config, lg *zap.Logger) { +func Init(ctx context.Context, cfg Config) { mux := http.NewServeMux() mux.Handle("/debug/pprof/goroutine", pprof.Handler("goroutine")) mux.Handle("/debug/pprof/heap", pprof.Handler("heap")) @@ -39,14 +39,14 @@ func Init(ctx context.Context, cfg Config, lg *zap.Logger) { mux.Handle("/debug/pprof/block", pprof.Handler("block")) if err := setupOpenCensus(ctx, mux, cfg); err != nil { - lg.Error("failed to setup OpenCensus", zap.Error(err)) + zap.L().Error("failed to setup OpenCensus", zap.Error(err)) } if cfg.Debug != "" { go func() { // nolint if err := http.ListenAndServe(cfg.Debug, mux); err != nil { - lg.Error("debug server exited due to error", zap.Error(err)) + zap.L().Error("debug server exited due to error", zap.Error(err)) } }() }