Skip to content

Commit

Permalink
build job
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyelei committed Nov 27, 2023
1 parent b377342 commit 5ff87fe
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 13 deletions.
10 changes: 10 additions & 0 deletions apis/apps/v1alpha1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,16 @@ func (r ClusterSpec) GetComponentDefRefName(componentName string) string {
return ""
}

// GetComponentDefName gets the name of referenced component definition.
func (r ClusterSpec) GetComponentDefName(componentName string) string {
for _, component := range r.ComponentSpecs {
if componentName == component.Name {
return component.ComponentDef
}
}
return ""
}

// ValidateEnabledLogs validates enabledLogs config in cluster.yaml, and returns metav1.Condition when detecting invalid values.
func (r ClusterSpec) ValidateEnabledLogs(cd *ClusterDefinition) error {
message := make([]string, 0)
Expand Down
12 changes: 12 additions & 0 deletions apis/apps/v1alpha1/opsdefinition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,15 @@ type OpsDefinitionList struct {
func init() {
SchemeBuilder.Register(&OpsDefinition{}, &OpsDefinitionList{})
}

func (o *OpsDefinition) GetComponentDefRef(serviceKind string) *ComponentDefinitionRef {
if o == nil {
return nil
}
for _, v := range o.Spec.ComponentDefinitionRefs {
if serviceKind == v.ServiceKind {
return &v
}
}
return nil
}
1 change: 0 additions & 1 deletion controllers/apps/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ const (
// name of our custom finalizer
dbClusterDefFinalizerName = "clusterdefinition.kubeblocks.io/finalizer"
clusterVersionFinalizerName = "clusterversion.kubeblocks.io/finalizer"
opsRequestFinalizerName = "opsrequest.kubeblocks.io/finalizer"
opsDefinitionFinalizerName = "opsdefinition.kubeblocks.io/finalizer"

// annotations keys
Expand Down
170 changes: 167 additions & 3 deletions controllers/apps/operations/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,33 @@ package operations

import (
"fmt"
"strconv"
"strings"
"text/template"
"time"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/common"
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/dataprotection/utils"
)

const (
kbEnvCompSVCName = "KB_COMP_SVC_NAME"
kbEnvCompSVCPortPrefix = "KB_COMP_SVC_PORT_"
kbEnvConnectEndpoint = "KB_CONNECT_ENDPOINT"
kbEnvConnectUserName = "KB_CONNECT_USERNAME"
kbEnvConnectPassword = "KB_CONNECT_PASSWORD"
kbEnvConnectHost = "KB_CONNECT_HOST"
kbEnvConnectPort = "KB_CONNECT_PORT"
)

type CustomOpsHandler struct{}
Expand Down Expand Up @@ -63,7 +79,7 @@ func (c CustomOpsHandler) ActionStartedCondition(reqCtx intctrlutil.RequestCtx,
func (c CustomOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error {
preChecks := opsRes.OpsDef.Spec.PreChecks
customSpec := opsRes.OpsRequest.Spec.CustomSpec
// 1. do preChecks (if is job, can return needWaiting)
// 1. do preChecks
for _, v := range preChecks {
if v.Expression != nil {
if err := c.checkExpression(reqCtx, cli, opsRes, v.Expression, customSpec.ComponentName); err != nil {
Expand All @@ -76,7 +92,11 @@ func (c CustomOpsHandler) Action(reqCtx intctrlutil.RequestCtx, cli client.Clien
}
}
// 2. do job action
return nil
job, err := c.buildJob(reqCtx, cli, opsRes)
if err != nil {
return err
}
return client.IgnoreAlreadyExists(cli.Create(reqCtx.Ctx, job))
}

func (c CustomOpsHandler) checkExpression(reqCtx intctrlutil.RequestCtx,
Expand Down Expand Up @@ -110,9 +130,119 @@ func (c CustomOpsHandler) checkExpression(reqCtx intctrlutil.RequestCtx,
}

func (c CustomOpsHandler) checkExecAction(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource, exec *appsv1alpha1.PreCheckExec) error {
// TODO: implement it
// TODO: return needWaitingError to wait for job successfully.
return nil
}

func (c CustomOpsHandler) getJobName(opsName, compName string) string {
jobName := fmt.Sprintf("%s-%s", opsName, compName)
if len(jobName) > 63 {
jobName = jobName[:63]
}
return jobName
}

func (c CustomOpsHandler) buildJob(reqCtx intctrlutil.RequestCtx,
cli client.Client,
opsRes *OpsResource) (*batchv1.Job, error) {
var (
customSpec = opsRes.OpsRequest.Spec.CustomSpec
compName = customSpec.ComponentName
clusterName = opsRes.Cluster.Name
opsName = opsRes.OpsRequest.Name
)

buildSecretKeyRef := func(secretName, key string) *corev1.EnvVarSource {
return &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{
Name: secretName,
},
Key: clusterName,
},
}
}

buildJobSpec := func() (*batchv1.JobSpec, error) {
jobSpec := opsRes.OpsDef.Spec.JobSpec
if jobSpec.BackoffLimit == nil {
jobSpec.BackoffLimit = pointer.Int32(3)
}

comp := opsRes.Cluster.Spec.GetComponentByName(compName)
if comp == nil {
return nil, nil
}
// get component definition
compDef, err := getCompDefinition(reqCtx, cli, opsRes.Cluster, customSpec.ComponentName)
if err != nil {
return nil, err
}
compDefRef := opsRes.OpsDef.GetComponentDefRef(compDef.Spec.ServiceKind)
if compDefRef == nil {
return nil, nil
}
// inject built-in env
fullCompName := constant.GenerateClusterComponentName(clusterName, compName)
var env = []corev1.EnvVar{
{Name: constant.KBEnvClusterName, Value: opsRes.Cluster.Name},
{Name: constant.KBEnvComponentName, Value: compName},
{Name: constant.KBEnvClusterCompName, Value: fullCompName},
{Name: constant.KBEnvCompReplicas, Value: strconv.Itoa(int(comp.Replicas))},
{Name: constant.KBEnvCompServiceVersion, Value: compDef.Spec.ServiceVersion},
}

// inject connect envs
if compDefRef.ConnectionCredentialName != "" {
connectCredentialSecretName := constant.GenerateComponentConnCredential(clusterName, compName, compDefRef.ConnectionCredentialName)
env = append(env, corev1.EnvVar{Name: kbEnvConnectEndpoint, ValueFrom: buildSecretKeyRef(connectCredentialSecretName, "endpoint")})
env = append(env, corev1.EnvVar{Name: kbEnvConnectUserName, ValueFrom: buildSecretKeyRef(connectCredentialSecretName, "username")})
env = append(env, corev1.EnvVar{Name: kbEnvConnectPassword, ValueFrom: buildSecretKeyRef(connectCredentialSecretName, "password")})
env = append(env, corev1.EnvVar{Name: kbEnvConnectHost, ValueFrom: buildSecretKeyRef(connectCredentialSecretName, "host")})
env = append(env, corev1.EnvVar{Name: kbEnvConnectPort, ValueFrom: buildSecretKeyRef(connectCredentialSecretName, "port")})
}

// inject SVC and SVC ports
if compDefRef.ServiceName != "" {
env = append(env, corev1.EnvVar{Name: kbEnvCompSVCName, Value: fmt.Sprintf("%s-%s", fullCompName, compDefRef.ServiceName)})
//
}

// inject params env
for k, v := range customSpec.Params {
env = append(env, corev1.EnvVar{Name: strings.ToUpper(k), Value: v})
}
for i := range jobSpec.Template.Spec.Containers {
jobSpec.Template.Spec.Containers[i].Env = append(jobSpec.Template.Spec.Containers[i].Env, env...)
}
return &jobSpec, nil
}

jobSpec, err := buildJobSpec()
if err != nil {
return nil, err
}
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
constant.OpsRequestNameLabelKey: opsName,
constant.AppInstanceLabelKey: clusterName,
constant.KBAppComponentLabelKey: compName,
},
Name: c.getJobName(opsName, compName),
Namespace: opsRes.OpsRequest.Namespace,
},
Spec: *jobSpec,
}
controllerutil.AddFinalizer(job, constant.OpsRequestFinalizerName)
scheme, _ := appsv1alpha1.SchemeBuilder.Build()
if err = utils.SetControllerReference(opsRes.OpsRequest, job, scheme); err != nil {
return nil, err
}
return job, nil
}

// ReconcileAction will be performed when action is done and loops till OpsRequest.status.phase is Succeed/Failed.
// the Reconcile function for stop opsRequest.
func (c CustomOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) (appsv1alpha1.OpsPhase, time.Duration, error) {
Expand Down Expand Up @@ -149,7 +279,7 @@ func initOpsDefAndValidate(reqCtx intctrlutil.RequestCtx, cli client.Client, ops
return false, err
}
opsRes.OpsDef = opsDef
// validate schema
// 1. validate OpenApV3Schema
parametersSchema := opsDef.Spec.ParametersSchema
// covert to type map[string]interface{}
params := map[string]interface{}{}
Expand All @@ -161,5 +291,39 @@ func initOpsDefAndValidate(reqCtx intctrlutil.RequestCtx, cli client.Client, ops
return false, err
}
}
// 2. validate component and serviceKind
comp := opsRes.Cluster.Spec.GetComponentByName(customSpec.ComponentName)
if comp == nil {
return false, intctrlutil.NewNotFound(`can not found component in cluster "%s"`, opsRes.Cluster.Name)
}
compDef, err := getCompDefinition(reqCtx, cli, opsRes.Cluster, customSpec.ComponentName)
if err != nil {
return false, err
}
var serviceKindMatched bool
for _, v := range opsDef.Spec.ComponentDefinitionRefs {
if v.ServiceKind == compDef.Spec.ServiceKind {
serviceKindMatched = true
break
}
}
if !serviceKindMatched {
return false, intctrlutil.NewFatalError(fmt.Sprintf(`not supported serviceKind "%s`, compDef.Spec.ServiceKind))
}
return opsDef.Spec.TriggerPhaseChange, nil
}

func getCompDefinition(reqCtx intctrlutil.RequestCtx,
cli client.Client,
cluster *appsv1alpha1.Cluster,
compName string) (*appsv1alpha1.ComponentDefinition, error) {
compDefName := cluster.Spec.GetComponentDefName(compName)
if len(compDefName) == 0 {
return nil, intctrlutil.NewNotFound(`can not found component definition by the component name "%s"`, compName)
}
compDef := &appsv1alpha1.ComponentDefinition{}
if err := cli.Get(reqCtx.Ctx, client.ObjectKey{Name: compDefName}, compDef); err != nil {
return nil, err
}
return compDef, nil
}
3 changes: 3 additions & 0 deletions controllers/apps/operations/ops_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (opsMgr *OpsManager) Do(reqCtx intctrlutil.RequestCtx, cli client.Client, o
}

if opsRequest.Status.Phase == appsv1alpha1.OpsPendingPhase {
if opsRequest.Spec.Cancel {
return &ctrl.Result{}, PatchOpsStatus(reqCtx.Ctx, cli, opsRes, appsv1alpha1.OpsCancelledPhase)
}
// validate entry condition for OpsRequest, check if the cluster is in the right phase
if err = validateOpsWaitingPhase(opsRes.Cluster, opsRequest, opsBehaviour); err != nil {
// check if the error is caused by WaitForClusterPhaseErr error
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/opsrequest_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *OpsRequestReconciler) handleDeletion(reqCtx intctrlutil.RequestCtx, ops
if opsRes.OpsRequest.Status.Phase == appsv1alpha1.OpsRunningPhase {
return nil, nil
}
return intctrlutil.HandleCRDeletion(reqCtx, r, opsRes.OpsRequest, opsRequestFinalizerName, func() (*ctrl.Result, error) {
return intctrlutil.HandleCRDeletion(reqCtx, r, opsRes.OpsRequest, constant.OpsRequestFinalizerName, func() (*ctrl.Result, error) {
return nil, operations.DequeueOpsRequestInClusterAnnotation(reqCtx.Ctx, r.Client, opsRes)
})
}
Expand Down
4 changes: 4 additions & 0 deletions controllers/apps/transformer_component_account_provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (t *componentAccountProvisionTransformer) Transform(ctx graph.TransformCont
return nil
}

lifecycleActions := transCtx.CompDef.Spec.LifecycleActions
if lifecycleActions == nil || lifecycleActions.AccountProvision == nil {
return nil
}
// TODO: support custom handler for account
// TODO: build lorry client if accountProvision is built-in
lorryCli, err := t.buildLorryClient(transCtx)
Expand Down
1 change: 1 addition & 0 deletions pkg/constant/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ const (
DBComponentDefinitionFinalizerName = "componentdefinition.kubeblocks.io/finalizer"
ConfigurationTemplateFinalizerName = "config.kubeblocks.io/finalizer"
ServiceDescriptorFinalizerName = "servicedescriptor.kubeblocks.io/finalizer"
OpsRequestFinalizerName = "opsrequest.kubeblocks.io/finalizer"

// ConfigurationTplLabelPrefixKey clusterVersion or clusterdefinition using tpl
ConfigurationTplLabelPrefixKey = "config.kubeblocks.io/tpl"
Expand Down
18 changes: 10 additions & 8 deletions pkg/constant/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ const (

// Lorry env names
const (
KBEnvClusterName = "KB_CLUSTER_NAME"
KBEnvComponentName = "KB_COMP_NAME"
KBEnvClusterCompName = "KB_CLUSTER_COMP_NAME"
KBEnvWorkloadType = "KB_WORKLOAD_TYPE"
KBEnvBuiltinHandler = "KB_BUILTIN_HANDLER"
KBEnvCharacterType = "KB_SERVICE_CHARACTER_TYPE"
KBEnvServiceUser = "KB_SERVICE_USER"
KBEnvServicePassword = "KB_SERVICE_PASSWORD"
KBEnvClusterName = "KB_CLUSTER_NAME"
KBEnvComponentName = "KB_COMP_NAME"
KBEnvClusterCompName = "KB_CLUSTER_COMP_NAME"
KBEnvCompReplicas = "KB_COMP_REPLICAS"
KBEnvWorkloadType = "KB_WORKLOAD_TYPE"
KBEnvBuiltinHandler = "KB_BUILTIN_HANDLER"
KBEnvCharacterType = "KB_SERVICE_CHARACTER_TYPE"
KBEnvServiceUser = "KB_SERVICE_USER"
KBEnvServicePassword = "KB_SERVICE_PASSWORD"
KBEnvCompServiceVersion = "KB_COMP_SERVICE_VERSION"

// KBEnvServiceRoles defines the Roles configured in the cluster definition that are visible to users.
KBEnvServiceRoles = "KB_SERVICE_ROLES"
Expand Down

0 comments on commit 5ff87fe

Please sign in to comment.