From 19652a2b9d7713a679c1c5d1663884e1b482f29b Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Thu, 23 Nov 2023 17:36:09 +0800 Subject: [PATCH] fix bug in execution_manager with lpExpectedInputs, remove the standalone artifact client and move into idl clientset, update local config file, remove a deprecated function Signed-off-by: Yee Hing Tong --- flyte-single-binary-local.yaml | 13 +++--- flyteadmin/pkg/artifacts/artifact_client.go | 44 ------------------- .../pkg/artifacts/artifact_client_test.go | 12 ----- flyteadmin/pkg/artifacts/registry.go | 14 +++++- .../pkg/manager/impl/execution_manager.go | 4 +- flyteidl/clients/go/admin/client.go | 17 +++---- 6 files changed, 28 insertions(+), 76 deletions(-) delete mode 100644 flyteadmin/pkg/artifacts/artifact_client.go delete mode 100644 flyteadmin/pkg/artifacts/artifact_client_test.go diff --git a/flyte-single-binary-local.yaml b/flyte-single-binary-local.yaml index fe5b76c46c..deac2fe3c0 100644 --- a/flyte-single-binary-local.yaml +++ b/flyte-single-binary-local.yaml @@ -3,8 +3,14 @@ # gatepr: revert the local dir to reflect home. # paths were changed to personal to ensure settings didn't get lost. admin: - endpoint: localhost:8089 + # This endpoint is used by flytepropeller to talk to admin + # and artifacts to talk to admin, + # and _also_, admin to talk to artifacts + endpoint: localhost:30080 insecure: true +flyteadmin: + featureGates: + enableArtifacts: true catalog-cache: endpoint: localhost:8081 @@ -84,11 +90,6 @@ cloudEvents: enable: true cloudEventVersion: v2 type: sandbox -# For admin to find artifacts service -artifacts: - host: localhost - port: 30080 - insecure: true # For artifact service itself artifactsServer: artifactBlobStoreConfig: diff --git a/flyteadmin/pkg/artifacts/artifact_client.go b/flyteadmin/pkg/artifacts/artifact_client.go deleted file mode 100644 index d20b9b51e9..0000000000 --- a/flyteadmin/pkg/artifacts/artifact_client.go +++ /dev/null @@ -1,44 +0,0 @@ -package artifacts - -import ( - "context" - "crypto/tls" - "fmt" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" - "github.com/flyteorg/flyte/flytestdlib/logger" -) - -func NewArtifactConnection(_ context.Context, cfg *Config, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - if opts == nil { - // Initialize opts list to the potential number of options we will add. Initialization optimizes memory - // allocation. - opts = make([]grpc.DialOption, 0, 5) - } - - if cfg.Insecure { - opts = append(opts, grpc.WithInsecure()) - } else { - tlsConfig := &tls.Config{} //nolint - creds := credentials.NewTLS(tlsConfig) - opts = append(opts, grpc.WithTransportCredentials(creds)) - } - - return grpc.Dial(fmt.Sprintf("%s:%d", cfg.Host, cfg.Port), opts...) -} - -func InitializeArtifactClient(ctx context.Context, cfg *Config, opts ...grpc.DialOption) artifact.ArtifactRegistryClient { - if cfg == nil { - logger.Warningf(ctx, "Artifact config is not set, skipping creation of client...") - return nil - } - conn, err := NewArtifactConnection(ctx, cfg, opts...) - if err != nil { - logger.Panicf(ctx, "failed to initialize Artifact connection. Err: %s", err.Error()) - panic(err) - } - return artifact.NewArtifactRegistryClient(conn) -} diff --git a/flyteadmin/pkg/artifacts/artifact_client_test.go b/flyteadmin/pkg/artifacts/artifact_client_test.go deleted file mode 100644 index 64a886c5f5..0000000000 --- a/flyteadmin/pkg/artifacts/artifact_client_test.go +++ /dev/null @@ -1,12 +0,0 @@ -package artifacts - -import ( - "context" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestEmpty(t *testing.T) { - c := InitializeArtifactClient(context.Background(), nil) - assert.Nil(t, c) -} diff --git a/flyteadmin/pkg/artifacts/registry.go b/flyteadmin/pkg/artifacts/registry.go index bcc6f35d1c..9a0555f490 100644 --- a/flyteadmin/pkg/artifacts/registry.go +++ b/flyteadmin/pkg/artifacts/registry.go @@ -3,6 +3,8 @@ package artifacts import ( "context" "fmt" + admin2 "github.com/flyteorg/flyte/flyteidl/clients/go/admin" + "google.golang.org/grpc" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" @@ -76,8 +78,16 @@ func (a *ArtifactRegistry) GetClient() artifact.ArtifactRegistryClient { return a.client } -func NewArtifactRegistry(ctx context.Context, config *Config, opts ...grpc.DialOption) *ArtifactRegistry { +func NewArtifactRegistry(ctx context.Context, _ *Config, _ ...grpc.DialOption) *ArtifactRegistry { + cfg := admin2.GetConfig(ctx) + clients, err := admin2.NewClientsetBuilder().WithConfig(cfg).Build(ctx) + if err != nil { + logger.Errorf(ctx, "Failed to create Artifact client") + // too many calls to this function to update, just panic for now. + panic(err) + } + return &ArtifactRegistry{ - client: InitializeArtifactClient(ctx, config, opts...), + client: clients.ArtifactServiceClient(), } } diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 415fe2c21a..fd7796c7b2 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -1011,13 +1011,13 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel( // Also send in the inputsForQueryTemplating for two reasons, so we don't run queries for things we don't need to // and so we can fill in template args. // ArtifactIDs are also returned for lineage purposes. - resolvedExpectedInputs, usedArtifactIDs, err := m.ResolveParameterMapArtifacts(ctxPD, launchPlan.Closure.ExpectedInputs, inputsForQueryTemplating) + lpExpectedInputs, usedArtifactIDs, err = m.ResolveParameterMapArtifacts(ctxPD, launchPlan.Closure.ExpectedInputs, inputsForQueryTemplating) if err != nil { logger.Errorf(ctx, "Error looking up launch plan closure parameter map: %v", err) return nil, nil, err } - logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, resolvedExpectedInputs) + logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, lpExpectedInputs) logger.Debugf(ctx, "Found artifact keys: %v", artifactTrackers) logger.Debugf(ctx, "Found artifact IDs: %v", usedArtifactIDs) diff --git a/flyteidl/clients/go/admin/client.go b/flyteidl/clients/go/admin/client.go index 69c3542367..85476ab8ba 100644 --- a/flyteidl/clients/go/admin/client.go +++ b/flyteidl/clients/go/admin/client.go @@ -6,6 +6,7 @@ import ( "crypto/x509" "errors" "fmt" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus" @@ -30,6 +31,7 @@ type Clientset struct { identityServiceClient service.IdentityServiceClient dataProxyServiceClient service.DataProxyServiceClient signalServiceClient service.SignalServiceClient + artifactServiceClient artifact.ArtifactRegistryClient } // AdminClient retrieves the AdminServiceClient @@ -59,6 +61,10 @@ func (c Clientset) SignalServiceClient() service.SignalServiceClient { return c.signalServiceClient } +func (c Clientset) ArtifactServiceClient() artifact.ArtifactRegistryClient { + return c.artifactServiceClient +} + func NewAdminClient(ctx context.Context, conn *grpc.ClientConn) service.AdminServiceClient { logger.Infof(ctx, "Initialized Admin client") return service.NewAdminServiceClient(conn) @@ -199,20 +205,11 @@ func initializeClients(ctx context.Context, cfg *Config, tokenCache cache.TokenC cs.healthServiceClient = grpc_health_v1.NewHealthClient(adminConnection) cs.dataProxyServiceClient = service.NewDataProxyServiceClient(adminConnection) cs.signalServiceClient = service.NewSignalServiceClient(adminConnection) + cs.artifactServiceClient = artifact.NewArtifactRegistryClient(adminConnection) return &cs, nil } -// Deprecated: Please use NewClientsetBuilder() instead. -func InitializeAdminClientFromConfig(ctx context.Context, tokenCache cache.TokenCache, opts ...grpc.DialOption) (service.AdminServiceClient, error) { - clientSet, err := initializeClients(ctx, GetConfig(ctx), tokenCache, opts...) - if err != nil { - return nil, err - } - - return clientSet.AdminClient(), nil -} - func InitializeMockAdminClient() service.AdminServiceClient { logger.Infof(context.TODO(), "Initialized Mock Admin client") return &mocks.AdminServiceClient{}