Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artf/lints #4429

Merged
merged 2 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions flyteadmin/pkg/artifacts/artifact_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"crypto/tls"
"fmt"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flytestdlib/logger"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

func NewArtifactConnection(_ context.Context, cfg *Config, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
Expand Down
7 changes: 4 additions & 3 deletions flyteadmin/pkg/artifacts/registry.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package artifacts

import (
"context"
"fmt"

"google.golang.org/grpc"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"google.golang.org/grpc"

"context"
)

// ArtifactRegistry contains a client to talk to an Artifact service and has helper methods
Expand Down
16 changes: 8 additions & 8 deletions flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,8 @@ package cloudevent

import (
"context"
"github.com/flyteorg/flyte/flytestdlib/sandbox_utils"
"time"

dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
repositoryInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flyte/flytestdlib/storage"

"github.com/NYTimes/gizmo/pubsub"
gizmoAWS "github.com/NYTimes/gizmo/pubsub/aws"
gizmoGCP "github.com/NYTimes/gizmo/pubsub/gcp"
Expand All @@ -21,9 +16,13 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
repositoryInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/sandboxutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Repository, storageClient *storage.DataStore, urlData dataInterfaces.RemoteURLInterface, cloudEventsConfig runtimeInterfaces.CloudEventsConfig, remoteDataConfig runtimeInterfaces.RemoteDataConfig, scope promutils.Scope) interfaces.Publisher {
Expand Down Expand Up @@ -93,7 +92,7 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi

case common.Sandbox:
var publisher pubsub.Publisher
publisher = sandbox_utils.NewCloudEventsPublisher()
publisher = sandboxutils.NewCloudEventsPublisher()
sender = &cloudEventImplementations.PubSubSender{
Pub: publisher,
}
Expand All @@ -108,7 +107,8 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi

if cloudEventsConfig.CloudEventVersion == runtimeInterfaces.CloudEventVersionv2 {
return cloudEventImplementations.NewCloudEventsWrappedPublisher(db, sender, scope, storageClient, urlData, remoteDataConfig)
} else {
return cloudEventImplementations.NewCloudEventsPublisher(sender, scope, cloudEventsConfig.EventsPublisherConfig.EventTypes)
}

return cloudEventImplementations.NewCloudEventsPublisher(sender, scope, cloudEventsConfig.EventsPublisherConfig.EventTypes)

}
10 changes: 5 additions & 5 deletions flyteadmin/pkg/async/cloudevent/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ package cloudevent

import (
"context"
dataMocks "github.com/flyteorg/flyte/flyteadmin/pkg/data/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/mocks"
"github.com/flyteorg/flyte/flytestdlib/storage"
storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks"
"github.com/stretchr/testify/mock"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
dataMocks "github.com/flyteorg/flyte/flyteadmin/pkg/data/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/mocks"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
storageMocks "github.com/flyteorg/flyte/flytestdlib/storage/mocks"
)

func getMockStore() *storage.DataStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,30 @@ import (
"bytes"
"context"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flytestdlib/contextutils"

"reflect"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/util"
repositoryInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/models"
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flytestdlib/storage"

"github.com/golang/protobuf/jsonpb"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/golang/protobuf/proto"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

const (
Expand Down Expand Up @@ -153,7 +148,15 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
Domain: rawEvent.ExecutionId.Domain,
Name: rawEvent.ExecutionId.Name,
})
if err != nil {
logger.Warningf(ctx, "couldn't find execution [%+v] for cloud event processing", rawEvent.ExecutionId)
return nil, err
}
ex, err := transformers.FromExecutionModel(ctx, executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Warningf(ctx, "couldn't transform execution [%+v] for cloud event processing", rawEvent.ExecutionId)
return nil, err
}
if ex.Closure.WorkflowId == nil {
logger.Warningf(ctx, "workflow id is nil for execution [%+v]", ex)
return nil, fmt.Errorf("workflow id is nil for execution [%+v]", ex)
Expand All @@ -164,6 +167,10 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
Name: ex.Closure.WorkflowId.Name,
Version: ex.Closure.WorkflowId.Version,
})
if err != nil {
logger.Warningf(ctx, "couldn't find workflow [%+v] for cloud event processing", ex.Closure.WorkflowId)
return nil, err
}
var workflowInterface core.TypedInterface
if workflowModel.TypedInterface != nil && len(workflowModel.TypedInterface) > 0 {
err = proto.Unmarshal(workflowModel.TypedInterface, &workflowInterface)
Expand Down Expand Up @@ -349,6 +356,10 @@ func (c *CloudEventWrappedPublisher) TransformNodeExecutionEvent(ctx context.Con
return nil, err
}
task, err := transformers.FromTaskModel(taskModel)
if err != nil {
logger.Debugf(ctx, "Failed to transform task model with err %v", err)
return nil, err
}
typedInterface = task.Closure.CompiledTask.Template.Interface
taskExecID = lte.Id
}
Expand Down Expand Up @@ -425,6 +436,10 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy
eventSource = common.FlyteURLKeyFromNodeExecutionIDRetry(*e.ParentNodeExecutionId,
int(e.RetryAttempt))
finalMsg, err = c.TransformTaskExecutionEvent(ctx, e)
if err != nil {
logger.Errorf(ctx, "Failed to transform task execution event with error: %v", err)
return err
}
case *admin.NodeExecutionEventRequest:
topic = "cloudevents.NodeExecution"
e := msgType.Event
Expand All @@ -434,6 +449,10 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy
eventID = fmt.Sprintf("%v.%v", executionID, phase)
eventSource = common.FlyteURLKeyFromNodeExecutionID(*msgType.Event.Id)
finalMsg, err = c.TransformNodeExecutionEvent(ctx, e)
if err != nil {
logger.Errorf(ctx, "Failed to transform node execution event with error: %v", err)
return err
}
case *event.CloudEventExecutionStart:
topic = "cloudevents.ExecutionStart"
executionID = msgType.ExecutionId.String()
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/async/cloudevent/implementations/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package implementations
import (
"context"
"fmt"
pbcloudevents "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"

"github.com/NYTimes/gizmo/pubsub"
"github.com/Shopify/sarama"
pbcloudevents "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"

Expand Down
30 changes: 15 additions & 15 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,23 @@ package impl
import (
"context"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"strconv"
"time"

"github.com/benbjohnson/clock"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyte/flytestdlib/storage"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/auth"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
cloudeventInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces"
eventWriter "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications"
notificationInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/executions"
Expand All @@ -44,6 +34,16 @@ import (
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
workflowengineInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteadmin/plugins"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/artifact"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

const childContainerQueueKey = "child_queue"
Expand Down Expand Up @@ -739,16 +739,16 @@ func (m *ExecutionManager) getStringFromInput(ctx context.Context, inputBinding
if inputVal.GetScalar() == nil || inputVal.GetScalar().GetPrimitive() == nil {
return "", errors.NewFlyteAdminErrorf(codes.InvalidArgument, "invalid input value [%+v]", inputVal)
}
var strVal = ""
var strVal string
p := inputVal.GetScalar().GetPrimitive()
switch p.GetValue().(type) {
case *core.Primitive_Integer:
strVal = fmt.Sprintf("%s", p.GetStringValue())
strVal = p.GetStringValue()
case *core.Primitive_Datetime:
t := time.Unix(p.GetDatetime().Seconds, int64(p.GetDatetime().Nanos))
strVal = t.Format("2006-01-02")
case *core.Primitive_StringValue:
strVal = fmt.Sprintf("%s", p.GetStringValue())
strVal = p.GetStringValue()
case *core.Primitive_FloatValue:
strVal = fmt.Sprintf("%.2f", p.GetFloatValue())
case *core.Primitive_Boolean:
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"strings"
"testing"
"time"
Expand All @@ -23,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

"github.com/flyteorg/flyte/flyteadmin/auth"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
eventWriterMocks "github.com/flyteorg/flyte/flyteadmin/pkg/async/events/mocks"
notificationMocks "github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/launch_plan_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package impl
import (
"bytes"
"context"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"strconv"

"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
scheduleInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/schedule/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/launch_plan_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"testing"
"time"

Expand All @@ -13,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
scheduleInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/async/schedule/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/schedule/mocks"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
Expand Down
16 changes: 8 additions & 8 deletions flyteadmin/pkg/manager/impl/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,15 @@ package impl
import (
"bytes"
"context"
"github.com/golang/protobuf/proto"
"strconv"
"time"

"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/resources"
Expand All @@ -29,6 +23,12 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
workflowengine "github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
)

type taskMetrics struct {
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"context"
"errors"
"fmt"
"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"testing"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"

"github.com/flyteorg/flyte/flyteadmin/pkg/artifacts"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
adminErrors "github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/manager/impl/testutils"
Expand Down
Loading
Loading