diff --git a/flyteadmin/pkg/artifacts/artifact_client.go b/flyteadmin/pkg/artifacts/artifact_client.go index e71536a9b5..d20b9b51e9 100644 --- a/flyteadmin/pkg/artifacts/artifact_client.go +++ b/flyteadmin/pkg/artifacts/artifact_client.go @@ -4,10 +4,12 @@ import ( "context" "crypto/tls" "fmt" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" - "github.com/flyteorg/flyte/flytestdlib/logger" + "google.golang.org/grpc" "google.golang.org/grpc/credentials" + + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/flyteorg/flyte/flytestdlib/logger" ) func NewArtifactConnection(_ context.Context, cfg *Config, opts ...grpc.DialOption) (*grpc.ClientConn, error) { diff --git a/flyteadmin/pkg/artifacts/registry.go b/flyteadmin/pkg/artifacts/registry.go index b58030ff75..cee436c1d6 100644 --- a/flyteadmin/pkg/artifacts/registry.go +++ b/flyteadmin/pkg/artifacts/registry.go @@ -1,14 +1,15 @@ package artifacts import ( + "context" "fmt" + + "google.golang.org/grpc" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/logger" - "google.golang.org/grpc" - - "context" ) // ArtifactRegistry contains a client to talk to an Artifact service and has helper methods diff --git a/flyteadmin/pkg/async/cloudevent/factory.go b/flyteadmin/pkg/async/cloudevent/factory.go index d130735c60..65cd48de93 100644 --- a/flyteadmin/pkg/async/cloudevent/factory.go +++ b/flyteadmin/pkg/async/cloudevent/factory.go @@ -2,13 +2,8 @@ package cloudevent import ( "context" - "github.com/flyteorg/flyte/flytestdlib/sandbox_utils" "time" - dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces" - repositoryInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces" - "github.com/flyteorg/flyte/flytestdlib/storage" - "github.com/NYTimes/gizmo/pubsub" gizmoAWS "github.com/NYTimes/gizmo/pubsub/aws" gizmoGCP "github.com/NYTimes/gizmo/pubsub/gcp" @@ -21,9 +16,13 @@ import ( "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations" "github.com/flyteorg/flyte/flyteadmin/pkg/common" + dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces" + repositoryInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/flyteorg/flyte/flytestdlib/sandboxutils" + "github.com/flyteorg/flyte/flytestdlib/storage" ) func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Repository, storageClient *storage.DataStore, urlData dataInterfaces.RemoteURLInterface, cloudEventsConfig runtimeInterfaces.CloudEventsConfig, remoteDataConfig runtimeInterfaces.RemoteDataConfig, scope promutils.Scope) interfaces.Publisher { @@ -93,7 +92,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi case common.Sandbox: var publisher pubsub.Publisher - publisher = sandbox_utils.NewCloudEventsPublisher() + publisher = sandboxutils.NewCloudEventsPublisher() sender = &cloudEventImplementations.PubSubSender{ Pub: publisher, } @@ -108,7 +107,8 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi if cloudEventsConfig.CloudEventVersion == runtimeInterfaces.CloudEventVersionv2 { return cloudEventImplementations.NewCloudEventsWrappedPublisher(db, sender, scope, storageClient, urlData, remoteDataConfig) - } else { - return cloudEventImplementations.NewCloudEventsPublisher(sender, scope, cloudEventsConfig.EventsPublisherConfig.EventTypes) } + + return cloudEventImplementations.NewCloudEventsPublisher(sender, scope, cloudEventsConfig.EventsPublisherConfig.EventTypes) + } diff --git a/flyteadmin/pkg/async/cloudevent/factory_test.go b/flyteadmin/pkg/async/cloudevent/factory_test.go index 705405078b..5902dfb6ce 100644 --- a/flyteadmin/pkg/async/cloudevent/factory_test.go +++ b/flyteadmin/pkg/async/cloudevent/factory_test.go @@ -2,19 +2,19 @@ package cloudevent import ( "context" - dataMocks "github.com/flyteorg/flyte/flyteadmin/pkg/data/mocks" - "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/mocks" - "github.com/flyteorg/flyte/flytestdlib/storage" - storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks" - "github.com/stretchr/testify/mock" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/implementations" "github.com/flyteorg/flyte/flyteadmin/pkg/common" + dataMocks "github.com/flyteorg/flyte/flyteadmin/pkg/data/mocks" + "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/mocks" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/flyteorg/flyte/flytestdlib/storage" + storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks" ) func getMockStore() *storage.DataStore { diff --git a/flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go b/flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go index 6c8a1f2d9f..c7d967cc1b 100644 --- a/flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go +++ b/flyteadmin/pkg/async/cloudevent/implementations/cloudevent_publisher.go @@ -4,35 +4,30 @@ import ( "bytes" "context" "fmt" - "github.com/flyteorg/flyte/flyteadmin/pkg/common" - "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models" - "github.com/flyteorg/flyte/flytestdlib/contextutils" - "reflect" "time" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces" + "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations" + "github.com/flyteorg/flyte/flyteadmin/pkg/common" dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/util" repositoryInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces" + "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models" "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/transformers" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event" - "github.com/flyteorg/flyte/flytestdlib/storage" - - "github.com/golang/protobuf/jsonpb" - - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" - - "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations" - - cloudevents "github.com/cloudevents/sdk-go/v2" - "github.com/golang/protobuf/proto" - "k8s.io/apimachinery/pkg/util/sets" - - "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces" + "github.com/flyteorg/flyte/flytestdlib/contextutils" "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/flyteorg/flyte/flytestdlib/storage" ) const ( @@ -153,7 +148,15 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context Domain: rawEvent.ExecutionId.Domain, Name: rawEvent.ExecutionId.Name, }) + if err != nil { + logger.Warningf(ctx, "couldn't find execution [%+v] for cloud event processing", rawEvent.ExecutionId) + return nil, err + } ex, err := transformers.FromExecutionModel(ctx, executionModel, transformers.DefaultExecutionTransformerOptions) + if err != nil { + logger.Warningf(ctx, "couldn't transform execution [%+v] for cloud event processing", rawEvent.ExecutionId) + return nil, err + } if ex.Closure.WorkflowId == nil { logger.Warningf(ctx, "workflow id is nil for execution [%+v]", ex) return nil, fmt.Errorf("workflow id is nil for execution [%+v]", ex) @@ -164,6 +167,10 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context Name: ex.Closure.WorkflowId.Name, Version: ex.Closure.WorkflowId.Version, }) + if err != nil { + logger.Warningf(ctx, "couldn't find workflow [%+v] for cloud event processing", ex.Closure.WorkflowId) + return nil, err + } var workflowInterface core.TypedInterface if workflowModel.TypedInterface != nil && len(workflowModel.TypedInterface) > 0 { err = proto.Unmarshal(workflowModel.TypedInterface, &workflowInterface) @@ -349,6 +356,10 @@ func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Con return nil, err } task, err := transformers.FromTaskModel(taskModel) + if err != nil { + logger.Debugf(ctx, "Failed to transform task model with err %v", err) + return nil, err + } typedInterface = task.Closure.CompiledTask.Template.Interface taskExecID = lte.Id } @@ -425,6 +436,10 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy eventSource = common.FlyteURLKeyFromNodeExecutionIDRetry(*e.ParentNodeExecutionId, int(e.RetryAttempt)) finalMsg, err = c.TransformTaskExecutionEvent(ctx, e) + if err != nil { + logger.Errorf(ctx, "Failed to transform task execution event with error: %v", err) + return err + } case *admin.NodeExecutionEventRequest: topic = "cloudevents.NodeExecution" e := msgType.Event @@ -434,6 +449,10 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy eventID = fmt.Sprintf("%v.%v", executionID, phase) eventSource = common.FlyteURLKeyFromNodeExecutionID(*msgType.Event.Id) finalMsg, err = c.TransformNodeExecutionEvent(ctx, e) + if err != nil { + logger.Errorf(ctx, "Failed to transform node execution event with error: %v", err) + return err + } case *event.CloudEventExecutionStart: topic = "cloudevents.ExecutionStart" executionID = msgType.ExecutionId.String() diff --git a/flyteadmin/pkg/async/cloudevent/implementations/sender.go b/flyteadmin/pkg/async/cloudevent/implementations/sender.go index cfc950e9ad..4e7ba23d8a 100644 --- a/flyteadmin/pkg/async/cloudevent/implementations/sender.go +++ b/flyteadmin/pkg/async/cloudevent/implementations/sender.go @@ -3,10 +3,10 @@ package implementations import ( "context" "fmt" - pbcloudevents "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" "github.com/NYTimes/gizmo/pubsub" "github.com/Shopify/sarama" + pbcloudevents "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" cloudevents "github.com/cloudevents/sdk-go/v2" diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index 35fb9f6fc7..1247f83085 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -3,22 +3,10 @@ package impl import ( "context" "fmt" - "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event" "strconv" "time" "github.com/benbjohnson/clock" - "github.com/flyteorg/flyte/flyteadmin/pkg/common" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s" - "github.com/flyteorg/flyte/flytestdlib/contextutils" - "github.com/flyteorg/flyte/flytestdlib/logger" - "github.com/flyteorg/flyte/flytestdlib/promutils" - "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" - "github.com/flyteorg/flyte/flytestdlib/storage" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" @@ -26,10 +14,12 @@ import ( "google.golang.org/grpc/codes" "github.com/flyteorg/flyte/flyteadmin/auth" + "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" cloudeventInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces" eventWriter "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications" notificationInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/interfaces" + "github.com/flyteorg/flyte/flyteadmin/pkg/common" dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/executions" @@ -44,6 +34,16 @@ import ( runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" workflowengineInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces" "github.com/flyteorg/flyte/flyteadmin/plugins" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s" + "github.com/flyteorg/flyte/flytestdlib/contextutils" + "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" + "github.com/flyteorg/flyte/flytestdlib/storage" ) const childContainerQueueKey = "child_queue" @@ -739,16 +739,16 @@ func (m *ExecutionManager) getStringFromInput(ctx context.Context, inputBinding if inputVal.GetScalar() == nil || inputVal.GetScalar().GetPrimitive() == nil { return "", errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid input value [%+v]", inputVal) } - var strVal = "" + var strVal string p := inputVal.GetScalar().GetPrimitive() switch p.GetValue().(type) { case *core.Primitive_Integer: - strVal = fmt.Sprintf("%s", p.GetStringValue()) + strVal = p.GetStringValue() case *core.Primitive_Datetime: t := time.Unix(p.GetDatetime().Seconds, int64(p.GetDatetime().Nanos)) strVal = t.Format("2006-01-02") case *core.Primitive_StringValue: - strVal = fmt.Sprintf("%s", p.GetStringValue()) + strVal = p.GetStringValue() case *core.Primitive_FloatValue: strVal = fmt.Sprintf("%.2f", p.GetFloatValue()) case *core.Primitive_Boolean: diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index c09e51db2b..f8ff2356da 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "strings" "testing" "time" @@ -23,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "github.com/flyteorg/flyte/flyteadmin/auth" + "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" eventWriterMocks "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/mocks" notificationMocks "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/mocks" "github.com/flyteorg/flyte/flyteadmin/pkg/common" diff --git a/flyteadmin/pkg/manager/impl/launch_plan_manager.go b/flyteadmin/pkg/manager/impl/launch_plan_manager.go index bf21c96176..dcb8484559 100644 --- a/flyteadmin/pkg/manager/impl/launch_plan_manager.go +++ b/flyteadmin/pkg/manager/impl/launch_plan_manager.go @@ -3,13 +3,13 @@ package impl import ( "bytes" "context" - "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "strconv" "github.com/golang/protobuf/proto" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc/codes" + "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" scheduleInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/schedule/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/common" "github.com/flyteorg/flyte/flyteadmin/pkg/errors" diff --git a/flyteadmin/pkg/manager/impl/launch_plan_manager_test.go b/flyteadmin/pkg/manager/impl/launch_plan_manager_test.go index 1cb99b8038..64e069f26d 100644 --- a/flyteadmin/pkg/manager/impl/launch_plan_manager_test.go +++ b/flyteadmin/pkg/manager/impl/launch_plan_manager_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "testing" "time" @@ -13,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" + "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" scheduleInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/schedule/interfaces" "github.com/flyteorg/flyte/flyteadmin/pkg/async/schedule/mocks" "github.com/flyteorg/flyte/flyteadmin/pkg/common" diff --git a/flyteadmin/pkg/manager/impl/task_manager.go b/flyteadmin/pkg/manager/impl/task_manager.go index 67a17e123d..87107d4eb4 100644 --- a/flyteadmin/pkg/manager/impl/task_manager.go +++ b/flyteadmin/pkg/manager/impl/task_manager.go @@ -3,21 +3,15 @@ package impl import ( "bytes" "context" - "github.com/golang/protobuf/proto" "strconv" "time" - "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flyte/flytestdlib/contextutils" - "github.com/flyteorg/flyte/flytestdlib/logger" - "github.com/flyteorg/flyte/flytestdlib/promutils" - "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc/codes" + "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyte/flyteadmin/pkg/common" "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/resources" @@ -29,6 +23,12 @@ import ( "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/transformers" runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" workflowengine "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyte/flytestdlib/contextutils" + "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/promutils" + "github.com/flyteorg/flyte/flytestdlib/promutils/labeled" ) type taskMetrics struct { diff --git a/flyteadmin/pkg/manager/impl/task_manager_test.go b/flyteadmin/pkg/manager/impl/task_manager_test.go index 84a43368ec..bc19311bb3 100644 --- a/flyteadmin/pkg/manager/impl/task_manager_test.go +++ b/flyteadmin/pkg/manager/impl/task_manager_test.go @@ -4,13 +4,13 @@ import ( "context" "errors" "fmt" - "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "testing" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" + "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyte/flyteadmin/pkg/common" adminErrors "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/testutils" diff --git a/flyteadmin/pkg/manager/impl/workflow_manager.go b/flyteadmin/pkg/manager/impl/workflow_manager.go index 648e722aeb..aeb2e82fe7 100644 --- a/flyteadmin/pkg/manager/impl/workflow_manager.go +++ b/flyteadmin/pkg/manager/impl/workflow_manager.go @@ -3,15 +3,15 @@ package impl import ( "bytes" "context" - "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" - "github.com/golang/protobuf/proto" "strconv" "time" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc/codes" + "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyte/flyteadmin/pkg/common" "github.com/flyteorg/flyte/flyteadmin/pkg/errors" "github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/util" diff --git a/flyteadmin/pkg/manager/impl/workflow_manager_test.go b/flyteadmin/pkg/manager/impl/workflow_manager_test.go index affb2f750f..3c2837f0e4 100644 --- a/flyteadmin/pkg/manager/impl/workflow_manager_test.go +++ b/flyteadmin/pkg/manager/impl/workflow_manager_test.go @@ -4,13 +4,13 @@ import ( "context" "errors" "fmt" - "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "testing" "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "google.golang.org/grpc/codes" + "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyte/flyteadmin/pkg/common" commonMocks "github.com/flyteorg/flyte/flyteadmin/pkg/common/mocks" adminErrors "github.com/flyteorg/flyte/flyteadmin/pkg/errors" diff --git a/flyteadmin/pkg/rpc/adminservice/base.go b/flyteadmin/pkg/rpc/adminservice/base.go index 53a8809f80..e578ab464f 100644 --- a/flyteadmin/pkg/rpc/adminservice/base.go +++ b/flyteadmin/pkg/rpc/adminservice/base.go @@ -3,11 +3,11 @@ package adminservice import ( "context" "fmt" - "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "runtime/debug" "github.com/golang/protobuf/proto" + "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent" eventWriter "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/implementations" "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications" diff --git a/flyteadmin/pkg/runtime/interfaces/application_configuration.go b/flyteadmin/pkg/runtime/interfaces/application_configuration.go index 4a20b5237f..68227ce224 100644 --- a/flyteadmin/pkg/runtime/interfaces/application_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/application_configuration.go @@ -1,10 +1,10 @@ package interfaces import ( - artifactsClient "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "github.com/golang/protobuf/ptypes/wrappers" "golang.org/x/time/rate" + artifactsClient "github.com/flyteorg/flyte/flyteadmin/pkg/artifacts" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flytestdlib/config" diff --git a/flyteartifacts/pkg/server/processor/channel_processor.go b/flyteartifacts/pkg/server/processor/channel_processor.go index d32175ef1c..741c5801f9 100644 --- a/flyteartifacts/pkg/server/processor/channel_processor.go +++ b/flyteartifacts/pkg/server/processor/channel_processor.go @@ -7,14 +7,14 @@ import ( "github.com/cloudevents/sdk-go/v2/event" flyteEvents "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyte/flytestdlib/logger" - "github.com/flyteorg/flyte/flytestdlib/sandbox_utils" + "github.com/flyteorg/flyte/flytestdlib/sandboxutils" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "time" ) type SandboxCloudEventsReceiver struct { - subChan <-chan sandbox_utils.SandboxMessage + subChan <-chan sandboxutils.SandboxMessage Handler EventsHandlerInterface } @@ -33,7 +33,7 @@ func (p *SandboxCloudEventsReceiver) StartProcessing(ctx context.Context) { logger.Warning(context.Background(), "Sandbox cloud event processor has stopped because context cancelled") } -func (p *SandboxCloudEventsReceiver) handleMessage(ctx context.Context, sandboxMsg sandbox_utils.SandboxMessage) error { +func (p *SandboxCloudEventsReceiver) handleMessage(ctx context.Context, sandboxMsg sandboxutils.SandboxMessage) error { ce := &event.Event{} err := pbcloudevents.Protobuf.Unmarshal(sandboxMsg.Raw, ce) if err != nil { @@ -112,6 +112,6 @@ func (p *SandboxCloudEventsReceiver) StopProcessing() error { func NewSandboxCloudEventProcessor(eventsHandler EventsHandlerInterface) *SandboxCloudEventsReceiver { return &SandboxCloudEventsReceiver{ Handler: eventsHandler, - subChan: sandbox_utils.MsgChan, + subChan: sandboxutils.MsgChan, } } diff --git a/flytestdlib/database/db.go b/flytestdlib/database/db.go index 1a8f07760e..fe884c75f4 100644 --- a/flytestdlib/database/db.go +++ b/flytestdlib/database/db.go @@ -4,10 +4,11 @@ import ( "context" "fmt" - "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/go-gormigrate/gormigrate/v2" "gorm.io/driver/sqlite" "gorm.io/gorm" + + "github.com/flyteorg/flyte/flytestdlib/logger" ) // GetDB uses the dbConfig to create gorm DB object. If the db doesn't exist for the dbConfig then a new one is created diff --git a/flytestdlib/database/postgres.go b/flytestdlib/database/postgres.go index 877b9a2ad6..1254dd5d14 100644 --- a/flytestdlib/database/postgres.go +++ b/flytestdlib/database/postgres.go @@ -8,11 +8,12 @@ import ( "os" "strings" - "github.com/flyteorg/flyte/flytestdlib/logger" oldPgConn "github.com/jackc/pgconn" "github.com/jackc/pgx/v5/pgconn" "gorm.io/driver/postgres" "gorm.io/gorm" + + "github.com/flyteorg/flyte/flytestdlib/logger" ) const pqInvalidDBCode = "3D000" diff --git a/flytestdlib/sandbox_utils/processor.go b/flytestdlib/sandboxutils/processor.go similarity index 98% rename from flytestdlib/sandbox_utils/processor.go rename to flytestdlib/sandboxutils/processor.go index 79b1eb14ab..9b6731691b 100644 --- a/flytestdlib/sandbox_utils/processor.go +++ b/flytestdlib/sandboxutils/processor.go @@ -1,12 +1,14 @@ -package sandbox_utils +package sandboxutils import ( "context" "fmt" - "github.com/flyteorg/flyte/flytestdlib/logger" - "github.com/golang/protobuf/proto" "sync" "time" + + "github.com/golang/protobuf/proto" + + "github.com/flyteorg/flyte/flytestdlib/logger" ) var MsgChan chan SandboxMessage