Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Artifact/event processing #4277

Merged
merged 8 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion cmd/single/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package single

import (
"context"
sharedCmd "github.com/flyteorg/flyte/flyteartifacts/cmd/shared"
"github.com/flyteorg/flyte/flyteartifacts/pkg/configuration"
artifactsServer "github.com/flyteorg/flyte/flyteartifacts/pkg/server"
"github.com/flyteorg/flyte/flytestdlib/database"
"net/http"

datacatalogConfig "github.com/flyteorg/flyte/datacatalog/pkg/config"
Expand Down Expand Up @@ -60,7 +64,37 @@ func startClusterResourceController(ctx context.Context) error {
}

func startArtifact(ctx context.Context, cfg Artifacts) error {
return nil
if cfg.Disabled {
logger.Infof(ctx, "Artifacts server is disabled. Skipping...")
return nil
}
// Roughly copies main/NewMigrateCmd
logger.Infof(ctx, "Artifacts: running database migrations if any...")
migs := artifactsServer.GetMigrations(ctx)
initializationSql := "create extension if not exists hstore;"
dbConfig := artifactsServer.GetDbConfig()
err := database.Migrate(context.Background(), dbConfig, migs, initializationSql)
if err != nil {
logger.Errorf(ctx, "Failed to run Artifacts database migrations. Error: %v", err)
return err
}

g, childCtx := errgroup.WithContext(ctx)

// Rough copy of NewServeCmd
g.Go(func() error {
cfg := configuration.GetApplicationConfig()
serverCfg := &cfg.ArtifactServerConfig
err := sharedCmd.ServeGateway(childCtx, "artifacts", serverCfg, artifactsServer.GrpcRegistrationHook,
artifactsServer.HttpRegistrationHook)
if err != nil {
logger.Errorf(childCtx, "Failed to start Artifacts server. Error: %v", err)
return err
}
return nil
})

return g.Wait()
}

func startAdmin(ctx context.Context, cfg Admin) error {
Expand Down
26 changes: 21 additions & 5 deletions flyte-single-binary-local.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# This is a sample configuration file for running single-binary Flyte locally against
# a sandbox.
# gatepr: revert the local dir to reflect home.
# paths were changed to personal to ensure settings didn't get lost.
admin:
endpoint: localhost:8089
insecure: true
Expand All @@ -15,7 +17,7 @@ cluster_resources:

logger:
show-source: true
level: 6
level: 3

propeller:
create-flyteworkflow-crd: true
Expand Down Expand Up @@ -81,13 +83,27 @@ database:
cloudEvents:
enable: true
cloudEventVersion: v2
type: redis
redis:
addr: "localhost:30004"
type: sandbox
# For admin to find artifacts service
artifacts:
host: localhost
port: 50051
port: 30080
insecure: true
# For artifact service itself
artifactsServer:
artifactBlobStoreConfig:
type: stow
stow:
kind: s3
config:
disable_ssl: true
v2_signing: true
endpoint: http://localhost:30002
auth_type: accesskey
access_key_id: minio
secret_key: miniostorage
artifactsProcessor:
cloudProvider: Sandbox
storage:
type: stow
stow:
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/pkg/artifacts/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package artifacts

// gatepr: add proper config bits for this
// eduardo to consider moving to idl clients.
type Config struct {
Host string `json:"host"`
Port int `json:"port"`
Expand Down
18 changes: 5 additions & 13 deletions flyteadmin/pkg/async/cloudevent/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cloudevent

import (
"context"
"github.com/flyteorg/flyte/flytestdlib/sandbox_utils"
"time"

dataInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/data/interfaces"
Expand All @@ -17,7 +18,6 @@ import (
"github.com/flyteorg/flyte/flyteadmin/pkg/async"
cloudEventImplementations "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/interfaces"
redisPublisher "github.com/flyteorg/flyte/flyteadmin/pkg/async/cloudevent/redis"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/implementations"
"github.com/flyteorg/flyte/flyteadmin/pkg/common"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
Expand Down Expand Up @@ -90,20 +90,12 @@ func NewCloudEventsPublisher(ctx context.Context, db repositoryInterfaces.Reposi
}
sender = &cloudEventImplementations.KafkaSender{Client: client}

case common.Redis:
case common.Sandbox:
var publisher pubsub.Publisher
var err error
err = async.Retry(reconnectAttempts, reconnectDelay, func() error {
publisher, err = redisPublisher.NewPublisher(cloudEventsConfig.RedisConfig)
return err
})
logger.Infof(ctx, "Using Redis cloud events publisher [%v] [%+v]", publisher, cloudEventsConfig.RedisConfig)

// Persistent errors should hard fail
if err != nil {
panic(err)
publisher = sandbox_utils.NewCloudEventsPublisher()
sender = &cloudEventImplementations.PubSubSender{
Pub: publisher,
}
sender = &cloudEventImplementations.PubSubSender{Pub: publisher}

case common.Local:
fallthrough
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,8 @@ func (c *CloudEventWrappedPublisher) TransformWorkflowExecutionEvent(ctx context
// Get outputs from the workflow execution
var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
fmt.Printf("remove this - Got output data")
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
fmt.Printf("remove this - Got output URI")
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig, c.storageClient, rawEvent.GetOutputUri())
if err != nil {
Expand Down Expand Up @@ -282,10 +280,8 @@ func (c *CloudEventWrappedPublisher) TransformTaskExecutionEvent(ctx context.Con

var outputs *core.LiteralMap
if rawEvent.GetOutputData() != nil {
fmt.Printf("remove this - task Got output data")
outputs = rawEvent.GetOutputData()
} else if len(rawEvent.GetOutputUri()) > 0 {
fmt.Printf("remove this - task Got output URI")
// GetInputs actually fetches the data, even though this is an output
outputs, _, err = util.GetInputs(ctx, c.urlData, &c.remoteDataConfig,
c.storageClient, rawEvent.GetOutputUri())
Expand Down Expand Up @@ -375,6 +371,7 @@ func (c *CloudEventWrappedPublisher) Publish(ctx context.Context, notificationTy
executionID = msgType.ExecutionId.String()
eventID = fmt.Sprintf("%v", executionID)
eventTime = time.Now()
// CloudEventExecutionStart don't have a nested event
finalMsg = msgType
default:
return fmt.Errorf("unsupported event types [%+v]", reflect.TypeOf(msg))
Expand Down
7 changes: 4 additions & 3 deletions flyteadmin/pkg/async/cloudevent/implementations/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package implementations

import (
"context"
"encoding/json"
"fmt"
pbcloudevents "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"

"github.com/NYTimes/gizmo/pubsub"
"github.com/Shopify/sarama"
Expand All @@ -25,8 +25,9 @@ type PubSubSender struct {

func (s *PubSubSender) Send(ctx context.Context, notificationType string, event cloudevents.Event) error {
// gatepr: investigate why the previous statement didn't work.
// eventByte, err := pbcloudevents.Protobuf.Marshal(&event)
eventByte, err := json.Marshal(&event)
// perhaps only because of redis.
eventByte, err := pbcloudevents.Protobuf.Marshal(&event)
//eventByte, err := json.Marshal(&event)
if err != nil {
logger.Errorf(ctx, "Failed to marshal cloudevent with error: %v", err)
return err
Expand Down
1 change: 0 additions & 1 deletion flyteadmin/pkg/common/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@ const (
GCP CloudProvider = "gcp"
Sandbox CloudProvider = "sandbox"
Local CloudProvider = "local"
Redis CloudProvider = "redis"
None CloudProvider = "none"
)
11 changes: 9 additions & 2 deletions flyteadmin/pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,18 @@ func (m *TaskExecutionManager) updateTaskExecutionModelState(

func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, request admin.TaskExecutionEventRequest) (
*admin.TaskExecutionEventResponse, error) {

logger.Warningf(ctx, "HERE!!!123")

if err := validation.ValidateTaskExecutionRequest(request, m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes); err != nil {
return nil, err
}
logger.Warningf(ctx, "HERE!!!123-1")

if err := validation.ValidateClusterForExecutionID(ctx, m.db, request.Event.ParentNodeExecutionId.ExecutionId, request.Event.ProducerId); err != nil {
return nil, err
}
logger.Warningf(ctx, "HERE!!!123-2")

// Get the parent node execution, if none found a MissingEntityError will be returned
nodeExecutionID := request.Event.ParentNodeExecutionId
Expand Down Expand Up @@ -204,10 +209,12 @@ func (m *TaskExecutionManager) CreateTaskExecutionEvent(ctx context.Context, req
logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err)
}

logger.Warningf(ctx, "HERE!!!123-3")
go func() {
ceCtx := context.TODO()
ceCtx := context.Background()
logger.Warningf(ctx, "HERE!!!123-4")
if err := m.cloudEventsPublisher.Publish(ceCtx, proto.MessageName(&request), &request); err != nil {
logger.Infof(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err)
logger.Errorf(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err)
}
}()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,8 @@ type ExternalEventsConfig struct {
ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
}

//go:generate enumer -type=CloudEventVersion -trimprefix=CloudEventVersion
type CloudEventVersion int
//go:generate enumer -type=CloudEventVersion -json -yaml -trimprefix=CloudEventVersion
type CloudEventVersion uint8

const (
// This is the initial version of the cloud events
Expand Down
39 changes: 37 additions & 2 deletions flyteadmin/pkg/runtime/interfaces/cloudeventversion_enumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions flyteartifacts/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"context"
sharedCmd "github.com/flyteorg/flyte/flyteartifacts/cmd/shared"
"github.com/flyteorg/flyte/flyteartifacts/pkg/server"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyte/flytestdlib/storage"

_ "net/http/pprof" // Required to serve application.
)
Expand All @@ -22,3 +25,9 @@ func main() {
panic(err)
}
}

func init() {
// Set Keys
labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey,
contextutils.DomainKey, storage.FailureTypeLabel)
}
18 changes: 5 additions & 13 deletions flyteartifacts/cmd/shared/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,11 @@ import (
"context"
"flag"
"fmt"
sharedCfg "github.com/flyteorg/flyte/flyteartifacts/pkg/configuration/shared"
"github.com/flyteorg/flyte/flyteartifacts/pkg/configuration"
"github.com/flyteorg/flyte/flytestdlib/config"
"github.com/flyteorg/flyte/flytestdlib/config/viper"
"github.com/flyteorg/flyte/flytestdlib/contextutils"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/profutils"
"github.com/flyteorg/flyte/flytestdlib/promutils/labeled"
"github.com/flyteorg/flyte/flytestdlib/storage"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"os"
Expand All @@ -36,9 +33,9 @@ func NewRootCmd(rootUse string, grpcHook GrpcRegistrationHook, httpHook HttpRegi

go func() {
ctx := context.Background()
sharedConfig := sharedCfg.SharedServerConfig.GetConfig().(*sharedCfg.ServerConfiguration)
metricsCfg := configuration.GetApplicationConfig().ArtifactServerConfig.Metrics
err := profutils.StartProfilingServerWithDefaultHandlers(ctx,
sharedConfig.Metrics.Port.Port, nil)
metricsCfg.Port.Port, nil)
if err != nil {
logger.Panicf(ctx, "Failed to Start profiling and metrics server. Error: %v", err)
}
Expand All @@ -52,12 +49,6 @@ func NewRootCmd(rootUse string, grpcHook GrpcRegistrationHook, httpHook HttpRegi
return rootCmd
}

func init() {
// Set Keys
labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey,
contextutils.DomainKey, storage.FailureTypeLabel)
}

func initConfig(cmd *cobra.Command, _ []string) error {
configAccessor = viper.NewAccessor(config.Options{
SearchPaths: []string{cfgFile, ".", "/etc/flyte/config", "$GOPATH/src/github.com/flyteorg/flyte/flyteartifacts"},
Expand Down Expand Up @@ -86,7 +77,8 @@ func initSubCommands(rootCmd *cobra.Command, grpcHook GrpcRegistrationHook, http
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "artifact_config.yaml", "config file (default is ./artifact_config.yaml)")

rootCmd.AddCommand(viper.GetConfigCommand())
rootCmd.AddCommand(NewServeCmd(rootCmd.Use, grpcHook, httpHook))
cfg := configuration.GetApplicationConfig()
rootCmd.AddCommand(NewServeCmd(rootCmd.Use, cfg.ArtifactServerConfig, grpcHook, httpHook))

// Allow viper to read the value of the flags
configAccessor.InitializePflags(rootCmd.PersistentFlags())
Expand Down
Loading