Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Future Outlier <[email protected]>
  • Loading branch information
Future Outlier committed Dec 10, 2023
1 parent d764139 commit ef02107
Show file tree
Hide file tree
Showing 23 changed files with 315 additions and 37 deletions.
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/pluginmachinery/webapi/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type PluginEntry struct {
// is supported.
DefaultForTaskTypes []pluginsCore.TaskType

// Synchronous plugin
// A map of task types to boolean indicating if the plugin should be used synchronously for that task type.
IsSyncTask map[pluginsCore.TaskType]bool
}

Expand Down
63 changes: 27 additions & 36 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,32 +319,15 @@ func getFinalContext(ctx context.Context, operation string, agent *Agent) (conte
if timeout == 0 {
return ctx, func() {}
}
return context.WithTimeout(ctx, timeout)
}

func combineToDistinctTaskTypes(originalTaskTypes, additionalTaskTypes []string) []string {
set := make(map[string]struct{})

for _, task := range originalTaskTypes {
set[task] = struct{}{}
}
for _, task := range additionalTaskTypes {
set[task] = struct{}{}
}

var supportedTaskTypes []string
for task := range set {
supportedTaskTypes = append(supportedTaskTypes, task)
}

return supportedTaskTypes
return context.WithTimeout(ctx, timeout)
}

func getAgentMetadata(agent *Agent, connectionCache map[*Agent]*grpc.ClientConn, isSyncTask map[string]bool) []string {
func updateAgentTaskTypes(agent *Agent, connectionCache map[*Agent]*grpc.ClientConn, isSyncTask map[string]bool) {
client, err := getAgentMetadataClientFunc(context.Background(), agent, connectionCache)
if err != nil {
logger.Errorf(context.Background(), "failed to connect to agent [%v] with error: [%v]", agent, err)
return []string{}
return
}

finalCtx, cancel := getFinalContext(context.Background(), "ListAgent", agent)
Expand All @@ -353,51 +336,59 @@ func getAgentMetadata(agent *Agent, connectionCache map[*Agent]*grpc.ClientConn,
res, err := client.ListAgent(finalCtx, &admin.ListAgentsRequest{})
if err != nil {
logger.Errorf(context.Background(), "failed to send list agent request with error: [%v]", err)
return []string{}
return
}

agents := res.GetAgents()
logger.Infof(context.Background(), "here are all agents [%v] in [%v] agent server", agents, agent)

var supportedTaskTypes []string
for _, agent := range agents {
supportedTaskTypes = append(supportedTaskTypes, agent.SupportedTaskType)
isSyncTask[agent.SupportedTaskType] = agent.IsSync
}

return supportedTaskTypes
}

func setAgentMetadata(cfg *Config, connectionCache map[*Agent]*grpc.ClientConn, isSyncTask map[string]bool) {
// Combine default agent server's task types with config's existing task types.
func getAgentMetadata(cfg *Config, connectionCache map[*Agent]*grpc.ClientConn) ([]string, map[string]bool) {
// Assign supported task types from the config to prevent a panic when no task type is supported.
// https://github.com/flyteorg/flyte/blob/master/flyteplugins/go/tasks/pluginmachinery/registry.go#L27
supportedTaskTypes := cfg.SupportedTaskTypes
isSyncTask := make(map[string]bool)

// Combine the default agent server's task types with the config's existing task types.
// Use empty string as key to return default agent server
defaultAgent, err := getFinalAgent("", cfg)
if err != nil {
logger.Errorf(context.Background(), "failed to get default agent [%v] with error: [%v]", err)
} else {
cfg.SupportedTaskTypes = combineToDistinctTaskTypes(cfg.SupportedTaskTypes, getAgentMetadata(defaultAgent, connectionCache, isSyncTask))
updateAgentTaskTypes(defaultAgent, connectionCache, isSyncTask)
}

// For each agent server, update config's SupportedTaskTypes by aggregating their unique supported tasks.
// For example, 1 agent server support bigquery task only, another agent server support spark task only
// We can get both of them by combining the supported task types from 2 agent servers
// For each agent server, use a map to store its supported task types and whether it is a sync task.
// This combines unique task types across all agents.
// We need to iterate all agent servers to get all supported task types.
// For example, one agent server supports only bigquery tasks, while another supports only spark tasks.
// We can get both by combining the supported task types from two agent servers.
for _, agent := range cfg.Agents {
cfg.SupportedTaskTypes = combineToDistinctTaskTypes(cfg.SupportedTaskTypes, getAgentMetadata(agent, connectionCache, isSyncTask))
updateAgentTaskTypes(agent, connectionCache, isSyncTask)
}

for task := range isSyncTask {
supportedTaskTypes = append(supportedTaskTypes, task)
}

return supportedTaskTypes, isSyncTask
}

func newAgentPlugin() webapi.PluginEntry {
cfg := GetConfig()
connectionCache := make(map[*Agent]*grpc.ClientConn)
isSyncTask := make(map[string]bool)

agentMetadata := setAgentMetadata(cfg, connectionCache, isSyncTask)
logger.Infof(context.Background(), "supported task types: %v", cfg.SupportedTaskTypes)
supportedTaskTypes, isSyncTask := getAgentMetadata(cfg, connectionCache)
logger.Infof(context.Background(), "supported task types: %v", supportedTaskTypes)
logger.Infof(context.Background(), "is sync task: %v", isSyncTask)

return webapi.PluginEntry{
ID: "agent-service",
SupportedTaskTypes: cfg.SupportedTaskTypes,
SupportedTaskTypes: supportedTaskTypes,
PluginLoader: func(ctx context.Context, iCtx webapi.PluginSetupContext) (webapi.AsyncPlugin, error) {
return &Plugin{
metricScope: iCtx.MetricsScope(),
Expand Down
10 changes: 10 additions & 0 deletions rsts/_tags/AWS.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Tag: AWS
########

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/configuration/cloud_event.rst
../deployment/plugins/aws/index.rst
../deployment/plugins/k8s/index.rst
29 changes: 29 additions & 0 deletions rsts/_tags/Advanced.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
Tag: Advanced
#############

.. toctree::
:maxdepth: 1
:caption: With this tag

../concepts/admin.rst
../concepts/architecture.rst
../concepts/catalog.rst
../concepts/component_architecture/flytepropeller_architecture.rst
../concepts/component_architecture/native_scheduler_architecture.rst
../deployment/agents/index.rst
../deployment/configuration/auth_appendix.rst
../deployment/configuration/auth_migration.rst
../deployment/configuration/auth_setup.rst
../deployment/configuration/cloud_event.rst
../deployment/configuration/customizable_resources.rst
../deployment/configuration/eventing.rst
../deployment/configuration/monitoring.rst
../deployment/configuration/notifications.rst
../deployment/configuration/performance.rst
../deployment/deployment/cloud_production.rst
../deployment/deployment/multicluster.rst
../deployment/plugins/aws/index.rst
../deployment/plugins/gcp/index.rst
../deployment/plugins/k8s/index.rst
../deployment/plugins/webapi/index.rst
../deployment/security/index.rst
8 changes: 8 additions & 0 deletions rsts/_tags/Agent.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Tag: Agent
##########

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/agents/index.rst
10 changes: 10 additions & 0 deletions rsts/_tags/Authentication.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Tag: Authentication
###################

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/configuration/auth_appendix.rst
../deployment/configuration/auth_migration.rst
../deployment/configuration/auth_setup.rst
27 changes: 27 additions & 0 deletions rsts/_tags/Basic.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
Tag: Basic
##########

.. toctree::
:maxdepth: 1
:caption: With this tag

../community/contribute.rst
../community/troubleshoot.rst
../concepts/data_management.rst
../concepts/domains.rst
../concepts/dynamic_spec.rst
../concepts/executions.rst
../concepts/flyte_console.rst
../concepts/launchplans.rst
../concepts/nodes.rst
../concepts/projects.rst
../concepts/registration.rst
../concepts/schedules.rst
../concepts/state_machine.rst
../concepts/tasks.rst
../concepts/versioning.rst
../concepts/workflow_lifecycle.rst
../concepts/workflows.rst
../deployment/deployment/cloud_simple.rst
../deployment/deployment/sandbox.rst
../reference/swagger.rst
8 changes: 8 additions & 0 deletions rsts/_tags/Configuration.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Tag: Configuration
##################

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/configuration/eventing.rst
9 changes: 9 additions & 0 deletions rsts/_tags/Contribute.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Tag: Contribute
###############

.. toctree::
:maxdepth: 1
:caption: With this tag

../community/contribute.rst
../concepts/console.rst
11 changes: 11 additions & 0 deletions rsts/_tags/Data.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Tag: Data
#########

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/agents/index.rst
../deployment/plugins/aws/index.rst
../deployment/plugins/gcp/index.rst
../deployment/plugins/webapi/index.rst
19 changes: 19 additions & 0 deletions rsts/_tags/Design.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Tag: Design
###########

.. toctree::
:maxdepth: 1
:caption: With this tag

../concepts/admin.rst
../concepts/architecture.rst
../concepts/catalog.rst
../concepts/component_architecture/flytepropeller_architecture.rst
../concepts/component_architecture/native_scheduler_architecture.rst
../concepts/data_management.rst
../concepts/dynamic_spec.rst
../concepts/launchplans.rst
../concepts/registration.rst
../concepts/state_machine.rst
../concepts/workflow_lifecycle.rst
../deployment/configuration/auth_appendix.rst
10 changes: 10 additions & 0 deletions rsts/_tags/GCP.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Tag: GCP
########

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/configuration/cloud_event.rst
../deployment/plugins/gcp/index.rst
../deployment/plugins/k8s/index.rst
20 changes: 20 additions & 0 deletions rsts/_tags/Glossary.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Tag: Glossary
#############

.. toctree::
:maxdepth: 1
:caption: With this tag

../concepts/architecture.rst
../concepts/data_management.rst
../concepts/domains.rst
../concepts/execution_timeline.rst
../concepts/executions.rst
../concepts/launchplans.rst
../concepts/nodes.rst
../concepts/projects.rst
../concepts/registration.rst
../concepts/schedules.rst
../concepts/tasks.rst
../concepts/versioning.rst
../concepts/workflows.rst
20 changes: 20 additions & 0 deletions rsts/_tags/Infrastructure.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Tag: Infrastructure
###################

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/configuration/auth_migration.rst
../deployment/configuration/auth_setup.rst
../deployment/configuration/cloud_event.rst
../deployment/configuration/customizable_resources.rst
../deployment/configuration/eventing.rst
../deployment/configuration/monitoring.rst
../deployment/configuration/notifications.rst
../deployment/configuration/performance.rst
../deployment/deployment/cloud_production.rst
../deployment/deployment/cloud_simple.rst
../deployment/deployment/multicluster.rst
../deployment/deployment/sandbox.rst
../deployment/security/index.rst
12 changes: 12 additions & 0 deletions rsts/_tags/Integration.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Tag: Integration
################

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/agents/index.rst
../deployment/plugins/aws/index.rst
../deployment/plugins/gcp/index.rst
../deployment/plugins/k8s/index.rst
../deployment/plugins/webapi/index.rst
9 changes: 9 additions & 0 deletions rsts/_tags/Intermediate.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Tag: Intermediate
#################

.. toctree::
:maxdepth: 1
:caption: With this tag

../concepts/console.rst
../concepts/execution_timeline.rst
14 changes: 14 additions & 0 deletions rsts/_tags/Kubernetes.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Tag: Kubernetes
###############

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/configuration/performance.rst
../deployment/deployment/cloud_production.rst
../deployment/deployment/cloud_simple.rst
../deployment/deployment/multicluster.rst
../deployment/deployment/sandbox.rst
../deployment/plugins/k8s/index.rst
../deployment/security/index.rst
8 changes: 8 additions & 0 deletions rsts/_tags/MachineLearning.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Tag: MachineLearning
####################

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/plugins/aws/index.rst
8 changes: 8 additions & 0 deletions rsts/_tags/Spark.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Tag: Spark
##########

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/plugins/k8s/index.rst
8 changes: 8 additions & 0 deletions rsts/_tags/Troubleshoot.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Tag: Troubleshoot
#################

.. toctree::
:maxdepth: 1
:caption: With this tag

../community/troubleshoot.rst
8 changes: 8 additions & 0 deletions rsts/_tags/UI.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Tag: UI
#######

.. toctree::
:maxdepth: 1
:caption: With this tag

../concepts/flyte_console.rst
8 changes: 8 additions & 0 deletions rsts/_tags/WebAPI.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Tag: WebAPI
###########

.. toctree::
:maxdepth: 1
:caption: With this tag

../deployment/plugins/webapi/index.rst
Loading

0 comments on commit ef02107

Please sign in to comment.