Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artf/connect #4473

Merged
merged 1 commit into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions flyte-single-binary-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@
# gatepr: revert the local dir to reflect home.
# paths were changed to personal to ensure settings didn't get lost.
admin:
endpoint: localhost:8089
# This endpoint is used by flytepropeller to talk to admin
# and artifacts to talk to admin,
# and _also_, admin to talk to artifacts
endpoint: localhost:30080
insecure: true
flyteadmin:
featureGates:
enableArtifacts: true

catalog-cache:
endpoint: localhost:8081
Expand Down Expand Up @@ -84,11 +90,6 @@ cloudEvents:
enable: true
cloudEventVersion: v2
type: sandbox
# For admin to find artifacts service
artifacts:
host: localhost
port: 30080
insecure: true
# For artifact service itself
artifactsServer:
artifactBlobStoreConfig:
Expand Down
44 changes: 0 additions & 44 deletions flyteadmin/pkg/artifacts/artifact_client.go

This file was deleted.

12 changes: 0 additions & 12 deletions flyteadmin/pkg/artifacts/artifact_client_test.go

This file was deleted.

14 changes: 12 additions & 2 deletions flyteadmin/pkg/artifacts/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package artifacts
import (
"context"
"fmt"
admin2 "github.com/flyteorg/flyte/flyteidl/clients/go/admin"

"google.golang.org/grpc"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
Expand Down Expand Up @@ -76,8 +78,16 @@ func (a *ArtifactRegistry) GetClient() artifact.ArtifactRegistryClient {
return a.client
}

func NewArtifactRegistry(ctx context.Context, config *Config, opts ...grpc.DialOption) *ArtifactRegistry {
func NewArtifactRegistry(ctx context.Context, _ *Config, _ ...grpc.DialOption) *ArtifactRegistry {
cfg := admin2.GetConfig(ctx)
clients, err := admin2.NewClientsetBuilder().WithConfig(cfg).Build(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to create Artifact client")
// too many calls to this function to update, just panic for now.
panic(err)
}

return &ArtifactRegistry{
client: InitializeArtifactClient(ctx, config, opts...),
client: clients.ArtifactServiceClient(),
}
}
4 changes: 2 additions & 2 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,13 +1011,13 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
// Also send in the inputsForQueryTemplating for two reasons, so we don't run queries for things we don't need to
// and so we can fill in template args.
// ArtifactIDs are also returned for lineage purposes.
resolvedExpectedInputs, usedArtifactIDs, err := m.ResolveParameterMapArtifacts(ctxPD, launchPlan.Closure.ExpectedInputs, inputsForQueryTemplating)
lpExpectedInputs, usedArtifactIDs, err = m.ResolveParameterMapArtifacts(ctxPD, launchPlan.Closure.ExpectedInputs, inputsForQueryTemplating)
if err != nil {
logger.Errorf(ctx, "Error looking up launch plan closure parameter map: %v", err)
return nil, nil, err
}

logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, resolvedExpectedInputs)
logger.Debugf(ctx, "Resolved launch plan closure expected inputs from [%+v] to [%+v]", launchPlan.Closure.ExpectedInputs, lpExpectedInputs)
logger.Debugf(ctx, "Found artifact keys: %v", artifactTrackers)
logger.Debugf(ctx, "Found artifact IDs: %v", usedArtifactIDs)

Expand Down
17 changes: 7 additions & 10 deletions flyteidl/clients/go/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"crypto/x509"
"errors"
"fmt"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"

grpcRetry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
grpcPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand All @@ -30,6 +31,7 @@
identityServiceClient service.IdentityServiceClient
dataProxyServiceClient service.DataProxyServiceClient
signalServiceClient service.SignalServiceClient
artifactServiceClient artifact.ArtifactRegistryClient
}

// AdminClient retrieves the AdminServiceClient
Expand Down Expand Up @@ -59,6 +61,10 @@
return c.signalServiceClient
}

func (c Clientset) ArtifactServiceClient() artifact.ArtifactRegistryClient {
return c.artifactServiceClient

Check warning on line 65 in flyteidl/clients/go/admin/client.go

View check run for this annotation

Codecov / codecov/patch

flyteidl/clients/go/admin/client.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}

func NewAdminClient(ctx context.Context, conn *grpc.ClientConn) service.AdminServiceClient {
logger.Infof(ctx, "Initialized Admin client")
return service.NewAdminServiceClient(conn)
Expand Down Expand Up @@ -199,20 +205,11 @@
cs.healthServiceClient = grpc_health_v1.NewHealthClient(adminConnection)
cs.dataProxyServiceClient = service.NewDataProxyServiceClient(adminConnection)
cs.signalServiceClient = service.NewSignalServiceClient(adminConnection)
cs.artifactServiceClient = artifact.NewArtifactRegistryClient(adminConnection)

return &cs, nil
}

// Deprecated: Please use NewClientsetBuilder() instead.
func InitializeAdminClientFromConfig(ctx context.Context, tokenCache cache.TokenCache, opts ...grpc.DialOption) (service.AdminServiceClient, error) {
clientSet, err := initializeClients(ctx, GetConfig(ctx), tokenCache, opts...)
if err != nil {
return nil, err
}

return clientSet.AdminClient(), nil
}

func InitializeMockAdminClient() service.AdminServiceClient {
logger.Infof(context.TODO(), "Initialized Mock Admin client")
return &mocks.AdminServiceClient{}
Expand Down
Loading