From 490e2899739d51408c0f5526c68ad22f70900593 Mon Sep 17 00:00:00 2001 From: Ryan Lo Date: Fri, 28 Feb 2025 17:08:57 +0800 Subject: [PATCH 1/3] fix: flyteadmin doesn't shutdown servers gracefully Signed-off-by: Ryan Lo --- flyteadmin/pkg/server/service.go | 47 +++++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/flyteadmin/pkg/server/service.go b/flyteadmin/pkg/server/service.go index 77e8c5803b..60ba0f34b6 100644 --- a/flyteadmin/pkg/server/service.go +++ b/flyteadmin/pkg/server/service.go @@ -6,7 +6,10 @@ import ( "fmt" "net" "net/http" + "os" + "os/signal" "strings" + "syscall" "time" "github.com/gorilla/handlers" @@ -81,8 +84,8 @@ func SetMetricKeys(appConfig *runtimeIfaces.ApplicationConfig) { // Creates a new gRPC Server with all the configuration func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, - storageCfg *storage.Config, authCtx interfaces.AuthenticationContext, - scope promutils.Scope, sm core.SecretManager, opts ...grpc.ServerOption) (*grpc.Server, error) { + storageCfg *storage.Config, authCtx interfaces.AuthenticationContext, + scope promutils.Scope, sm core.SecretManager, opts ...grpc.ServerOption) (*grpc.Server, error) { logger.Infof(ctx, "Registering default middleware with blanket auth validation") pluginRegistry.RegisterDefault(plugins.PluginIDUnaryServiceMiddleware, grpcmiddleware.ChainUnaryServer( @@ -200,8 +203,8 @@ func healthCheckFunc(w http.ResponseWriter, _ *http.Request) { } func newHTTPServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, _ *authConfig.Config, authCtx interfaces.AuthenticationContext, - additionalHandlers map[string]func(http.ResponseWriter, *http.Request), - grpcAddress string, grpcConnectionOpts ...grpc.DialOption) (*http.ServeMux, error) { + additionalHandlers map[string]func(http.ResponseWriter, *http.Request), + grpcAddress string, grpcConnectionOpts ...grpc.DialOption) (*http.ServeMux, error) { // Register the server that will serve HTTP/REST Traffic mux := http.NewServeMux() @@ -329,9 +332,9 @@ func generateRequestID() string { } func serveGatewayInsecure(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, - authCfg *authConfig.Config, storageConfig *storage.Config, - additionalHandlers map[string]func(http.ResponseWriter, *http.Request), - scope promutils.Scope) error { + authCfg *authConfig.Config, storageConfig *storage.Config, + additionalHandlers map[string]func(http.ResponseWriter, *http.Request), + scope promutils.Scope) error { logger.Infof(ctx, "Serving Flyte Admin Insecure") // This will parse configuration and create the necessary objects for dealing with auth @@ -422,11 +425,29 @@ func serveGatewayInsecure(ctx context.Context, pluginRegistry *plugins.Registry, ReadHeaderTimeout: time.Duration(cfg.ReadHeaderTimeoutSeconds) * time.Second, } - err = server.ListenAndServe() - if err != nil { - return errors.Wrapf(err, "failed to Start HTTP Server") + go func() { + err = server.ListenAndServe() + if err != nil && err != http.ErrServerClosed { + logger.Fatalf(ctx, "Failed to Start HTTP Server: %v", err) + } + }() + + // Gracefully shutdown the servers + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + // Create a context with timeout for the shutdown process + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := server.Shutdown(shutdownCtx); err != nil { + logger.Errorf(ctx, "Failed to shut down HTTP server: %v", err) } + grpcServer.GracefulStop() + + logger.Infof(ctx, "Servers gracefully stopped") return nil } @@ -445,9 +466,9 @@ func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Ha } func serveGatewaySecure(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, authCfg *authConfig.Config, - storageCfg *storage.Config, - additionalHandlers map[string]func(http.ResponseWriter, *http.Request), - scope promutils.Scope) error { + storageCfg *storage.Config, + additionalHandlers map[string]func(http.ResponseWriter, *http.Request), + scope promutils.Scope) error { certPool, cert, err := GetSslCredentials(ctx, cfg.Security.Ssl.CertificateFile, cfg.Security.Ssl.KeyFile) sm := secretmanager.NewFileEnvSecretManager(secretmanager.GetConfig()) From 09cdb5ffd432320c2acefe517317f621887d3395 Mon Sep 17 00:00:00 2001 From: Ryan Lo Date: Fri, 28 Feb 2025 17:14:53 +0800 Subject: [PATCH 2/3] chore Signed-off-by: Ryan Lo --- flyteadmin/pkg/server/service.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/flyteadmin/pkg/server/service.go b/flyteadmin/pkg/server/service.go index 60ba0f34b6..775ead36d5 100644 --- a/flyteadmin/pkg/server/service.go +++ b/flyteadmin/pkg/server/service.go @@ -84,8 +84,8 @@ func SetMetricKeys(appConfig *runtimeIfaces.ApplicationConfig) { // Creates a new gRPC Server with all the configuration func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, - storageCfg *storage.Config, authCtx interfaces.AuthenticationContext, - scope promutils.Scope, sm core.SecretManager, opts ...grpc.ServerOption) (*grpc.Server, error) { + storageCfg *storage.Config, authCtx interfaces.AuthenticationContext, + scope promutils.Scope, sm core.SecretManager, opts ...grpc.ServerOption) (*grpc.Server, error) { logger.Infof(ctx, "Registering default middleware with blanket auth validation") pluginRegistry.RegisterDefault(plugins.PluginIDUnaryServiceMiddleware, grpcmiddleware.ChainUnaryServer( @@ -203,8 +203,8 @@ func healthCheckFunc(w http.ResponseWriter, _ *http.Request) { } func newHTTPServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, _ *authConfig.Config, authCtx interfaces.AuthenticationContext, - additionalHandlers map[string]func(http.ResponseWriter, *http.Request), - grpcAddress string, grpcConnectionOpts ...grpc.DialOption) (*http.ServeMux, error) { + additionalHandlers map[string]func(http.ResponseWriter, *http.Request), + grpcAddress string, grpcConnectionOpts ...grpc.DialOption) (*http.ServeMux, error) { // Register the server that will serve HTTP/REST Traffic mux := http.NewServeMux() @@ -332,9 +332,9 @@ func generateRequestID() string { } func serveGatewayInsecure(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, - authCfg *authConfig.Config, storageConfig *storage.Config, - additionalHandlers map[string]func(http.ResponseWriter, *http.Request), - scope promutils.Scope) error { + authCfg *authConfig.Config, storageConfig *storage.Config, + additionalHandlers map[string]func(http.ResponseWriter, *http.Request), + scope promutils.Scope) error { logger.Infof(ctx, "Serving Flyte Admin Insecure") // This will parse configuration and create the necessary objects for dealing with auth @@ -466,9 +466,9 @@ func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Ha } func serveGatewaySecure(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, authCfg *authConfig.Config, - storageCfg *storage.Config, - additionalHandlers map[string]func(http.ResponseWriter, *http.Request), - scope promutils.Scope) error { + storageCfg *storage.Config, + additionalHandlers map[string]func(http.ResponseWriter, *http.Request), + scope promutils.Scope) error { certPool, cert, err := GetSslCredentials(ctx, cfg.Security.Ssl.CertificateFile, cfg.Security.Ssl.KeyFile) sm := secretmanager.NewFileEnvSecretManager(secretmanager.GetConfig()) From a264c158752e8d319117e951b7375c3280a78a3b Mon Sep 17 00:00:00 2001 From: Ryan Lo Date: Fri, 28 Feb 2025 17:23:29 +0800 Subject: [PATCH 3/3] graceful shutdown for serveGatewaySecure Signed-off-by: Ryan Lo --- flyteadmin/pkg/server/service.go | 46 +++++++++++++++++++++----------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/flyteadmin/pkg/server/service.go b/flyteadmin/pkg/server/service.go index 775ead36d5..eba226958e 100644 --- a/flyteadmin/pkg/server/service.go +++ b/flyteadmin/pkg/server/service.go @@ -84,8 +84,8 @@ func SetMetricKeys(appConfig *runtimeIfaces.ApplicationConfig) { // Creates a new gRPC Server with all the configuration func newGRPCServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, - storageCfg *storage.Config, authCtx interfaces.AuthenticationContext, - scope promutils.Scope, sm core.SecretManager, opts ...grpc.ServerOption) (*grpc.Server, error) { + storageCfg *storage.Config, authCtx interfaces.AuthenticationContext, + scope promutils.Scope, sm core.SecretManager, opts ...grpc.ServerOption) (*grpc.Server, error) { logger.Infof(ctx, "Registering default middleware with blanket auth validation") pluginRegistry.RegisterDefault(plugins.PluginIDUnaryServiceMiddleware, grpcmiddleware.ChainUnaryServer( @@ -203,8 +203,8 @@ func healthCheckFunc(w http.ResponseWriter, _ *http.Request) { } func newHTTPServer(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, _ *authConfig.Config, authCtx interfaces.AuthenticationContext, - additionalHandlers map[string]func(http.ResponseWriter, *http.Request), - grpcAddress string, grpcConnectionOpts ...grpc.DialOption) (*http.ServeMux, error) { + additionalHandlers map[string]func(http.ResponseWriter, *http.Request), + grpcAddress string, grpcConnectionOpts ...grpc.DialOption) (*http.ServeMux, error) { // Register the server that will serve HTTP/REST Traffic mux := http.NewServeMux() @@ -332,9 +332,9 @@ func generateRequestID() string { } func serveGatewayInsecure(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, - authCfg *authConfig.Config, storageConfig *storage.Config, - additionalHandlers map[string]func(http.ResponseWriter, *http.Request), - scope promutils.Scope) error { + authCfg *authConfig.Config, storageConfig *storage.Config, + additionalHandlers map[string]func(http.ResponseWriter, *http.Request), + scope promutils.Scope) error { logger.Infof(ctx, "Serving Flyte Admin Insecure") // This will parse configuration and create the necessary objects for dealing with auth @@ -428,7 +428,7 @@ func serveGatewayInsecure(ctx context.Context, pluginRegistry *plugins.Registry, go func() { err = server.ListenAndServe() if err != nil && err != http.ErrServerClosed { - logger.Fatalf(ctx, "Failed to Start HTTP Server: %v", err) + logger.Fatalf(ctx, "Failed to start HTTP Server: %v", err) } }() @@ -442,7 +442,7 @@ func serveGatewayInsecure(ctx context.Context, pluginRegistry *plugins.Registry, defer cancel() if err := server.Shutdown(shutdownCtx); err != nil { - logger.Errorf(ctx, "Failed to shut down HTTP server: %v", err) + logger.Errorf(ctx, "Failed to shutdown HTTP server: %v", err) } grpcServer.GracefulStop() @@ -466,9 +466,9 @@ func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Ha } func serveGatewaySecure(ctx context.Context, pluginRegistry *plugins.Registry, cfg *config.ServerConfig, authCfg *authConfig.Config, - storageCfg *storage.Config, - additionalHandlers map[string]func(http.ResponseWriter, *http.Request), - scope promutils.Scope) error { + storageCfg *storage.Config, + additionalHandlers map[string]func(http.ResponseWriter, *http.Request), + scope promutils.Scope) error { certPool, cert, err := GetSslCredentials(ctx, cfg.Security.Ssl.CertificateFile, cfg.Security.Ssl.KeyFile) sm := secretmanager.NewFileEnvSecretManager(secretmanager.GetConfig()) @@ -555,10 +555,26 @@ func serveGatewaySecure(ctx context.Context, pluginRegistry *plugins.Registry, c ReadHeaderTimeout: time.Duration(cfg.ReadHeaderTimeoutSeconds) * time.Second, } - err = srv.Serve(tls.NewListener(conn, srv.TLSConfig)) + go func() { + err = srv.Serve(tls.NewListener(conn, srv.TLSConfig)) + if err != nil && err != http.ErrServerClosed { + logger.Errorf(ctx, "Failed to start HTTP/2 Server: %v", err) + } + }() - if err != nil { - return errors.Wrapf(err, "failed to Start HTTP/2 Server") + // Gracefully shutdown the servers + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + // Create a context with timeout for the shutdown process + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := srv.Shutdown(shutdownCtx); err != nil { + logger.Errorf(ctx, "Failed to shutdown HTTP server: %v", err) } + + logger.Infof(ctx, "Servers gracefully stopped") return nil }