
Commit

update controller
Signed-off-by: pixiake <[email protected]>
pixiake committed Nov 5, 2020
1 parent cbc0edd commit 215357a
Showing 11 changed files with 139 additions and 73 deletions.
48 changes: 40 additions & 8 deletions apis/kubekey/v1alpha1/cluster_types.go
@@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"github.com/kubesphere/kubekey/pkg/util"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"regexp"
"strconv"
@@ -202,9 +203,15 @@ func (cfg *ClusterSpec) GenerateCertSANs() []string {
return defaultCertSANs
}

- func (cfg *ClusterSpec) GroupHosts() (*HostGroups, error) {
+ func (cfg *ClusterSpec) GroupHosts(logger *log.Logger) (*HostGroups, error) {
clusterHostsGroups := HostGroups{}
- etcdGroup, masterGroup, workerGroup, err := cfg.ParseRolesList()
+
+ hostList := map[string]string{}
+ for _, host := range cfg.Hosts {
+ hostList[host.Name] = host.Name
+ }
+
+ etcdGroup, masterGroup, workerGroup, err := cfg.ParseRolesList(hostList, logger)
if err != nil {
return nil, err
}
@@ -253,6 +260,13 @@ func (cfg *ClusterSpec) GroupHosts() (*HostGroups, error) {
}

//Check that the parameters under roleGroups are incorrect
+ if len(masterGroup) == 0 {
+ logger.Fatal(errors.New("The number of master cannot be 0."))
+ }
+ if len(etcdGroup) == 0 {
+ logger.Fatal(errors.New("The number of etcd cannot be 0."))
+ }
+
if len(masterGroup) != len(clusterHostsGroups.Master) {
return nil, errors.New("Incorrect nodeName under roleGroups/master in the configuration file, Please check before installing.")
}
@@ -262,7 +276,6 @@ func (cfg *ClusterSpec) GroupHosts() (*HostGroups, error) {
if len(workerGroup) != len(clusterHostsGroups.Worker) {
return nil, errors.New("Incorrect nodeName under roleGroups/work in the configuration file, Please check before installing.")
}
-
clusterHostsGroups.Client = append(clusterHostsGroups.Client, clusterHostsGroups.Master[0])
return &clusterHostsGroups, nil
}
@@ -271,46 +284,65 @@ func (cfg *ClusterSpec) ClusterIP() string {
return util.ParseIp(cfg.Network.KubeServiceCIDR)[2]
}

- func (cfg *ClusterSpec) ParseRolesList() ([]string, []string, []string, error) {
+ func (cfg *ClusterSpec) ParseRolesList(hostList map[string]string, logger *log.Logger) ([]string, []string, []string, error) {
etcdGroupList := []string{}
masterGroupList := []string{}
workerGroupList := []string{}

for _, host := range cfg.RoleGroups.Etcd {
if strings.Contains(host, "[") && strings.Contains(host, "]") && strings.Contains(host, ":") {
- etcdGroupList = append(etcdGroupList, getHostsRange(host)...)
+ etcdGroupList = append(etcdGroupList, getHostsRange(host, hostList, "etcd", logger)...)
} else {
+ if err := hostVerify(hostList, host, "etcd"); err != nil {
+ logger.Fatal(err)
+ }
etcdGroupList = append(etcdGroupList, host)
}
}

for _, host := range cfg.RoleGroups.Master {
if strings.Contains(host, "[") && strings.Contains(host, "]") && strings.Contains(host, ":") {
- masterGroupList = append(masterGroupList, getHostsRange(host)...)
+ masterGroupList = append(masterGroupList, getHostsRange(host, hostList, "master", logger)...)
} else {
+ if err := hostVerify(hostList, host, "master"); err != nil {
+ logger.Fatal(err)
+ }
masterGroupList = append(masterGroupList, host)
}
}

for _, host := range cfg.RoleGroups.Worker {
if strings.Contains(host, "[") && strings.Contains(host, "]") && strings.Contains(host, ":") {
- workerGroupList = append(workerGroupList, getHostsRange(host)...)
+ workerGroupList = append(workerGroupList, getHostsRange(host, hostList, "worker", logger)...)
} else {
+ if err := hostVerify(hostList, host, "worker"); err != nil {
+ logger.Fatal(err)
+ }
workerGroupList = append(workerGroupList, host)
}
}
return etcdGroupList, masterGroupList, workerGroupList, nil
}

- func getHostsRange(rangeStr string) []string {
+ func getHostsRange(rangeStr string, hostList map[string]string, group string, logger *log.Logger) []string {
hostRangeList := []string{}
r := regexp.MustCompile(`\[(\d+)\:(\d+)\]`)
nameSuffix := r.FindStringSubmatch(rangeStr)
namePrefix := strings.Split(rangeStr, nameSuffix[0])[0]
nameSuffixStart, _ := strconv.Atoi(nameSuffix[1])
nameSuffixEnd, _ := strconv.Atoi(nameSuffix[2])
for i := nameSuffixStart; i <= nameSuffixEnd; i++ {
+ if err := hostVerify(hostList, fmt.Sprintf("%s%d", namePrefix, i), group); err != nil {
+ logger.Fatal(err)
+ }
hostRangeList = append(hostRangeList, fmt.Sprintf("%s%d", namePrefix, i))
}
return hostRangeList
}
+
+ func hostVerify(hostList map[string]string, hostName string, group string) error {
+ if _, ok := hostList[hostName]; !ok {
+ return errors.New(fmt.Sprintf("[%s] is in [%s] group, but not in hosts list.", hostName, group))
+ }
+ return nil
+ }
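
The heart of this change is the new hostVerify lookup plus the hostList and logger arguments threaded through GroupHosts, ParseRolesList, and getHostsRange. A self-contained sketch of the same expand-then-verify flow (the hosts map, group name, and expandRange helper are illustrative stand-ins, not KubeKey's API):

    package main

    import (
        "fmt"
        "regexp"
        "strconv"
        "strings"
    )

    // expandRange mirrors getHostsRange: "node[1:3]" -> node1, node2, node3.
    func expandRange(rangeStr string) ([]string, error) {
        r := regexp.MustCompile(`\[(\d+):(\d+)\]`)
        m := r.FindStringSubmatch(rangeStr)
        if m == nil {
            return nil, fmt.Errorf("%q is not a range expression", rangeStr)
        }
        prefix := strings.Split(rangeStr, m[0])[0]
        start, _ := strconv.Atoi(m[1])
        end, _ := strconv.Atoi(m[2])
        var names []string
        for i := start; i <= end; i++ {
            names = append(names, fmt.Sprintf("%s%d", prefix, i))
        }
        return names, nil
    }

    func main() {
        // Hypothetical hosts section; mirrors the map built in GroupHosts.
        hosts := map[string]string{"node1": "node1", "node2": "node2"}
        names, _ := expandRange("node[1:3]")
        for _, n := range names {
            if _, ok := hosts[n]; !ok {
                // The case the new hostVerify catches: node3 is named in a
                // role group but missing from the hosts list.
                fmt.Printf("[%s] is in [master] group, but not in hosts list.\n", n)
                continue
            }
            fmt.Println("verified:", n)
        }
    }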
5 changes: 3 additions & 2 deletions apis/kubekey/v1alpha1/default.go
@@ -19,6 +19,7 @@ package v1alpha1
import (
"fmt"
"github.com/kubesphere/kubekey/pkg/util"
log "github.com/sirupsen/logrus"
"os"
"strings"
)
@@ -59,12 +60,12 @@ const (
DefaultEtcdBackupScriptDir = "/usr/local/bin/kube-scripts"
)

- func (cfg *ClusterSpec) SetDefaultClusterSpec(incluster bool) (*ClusterSpec, *HostGroups, error) {
+ func (cfg *ClusterSpec) SetDefaultClusterSpec(incluster bool, logger *log.Logger) (*ClusterSpec, *HostGroups, error) {
clusterCfg := ClusterSpec{}

clusterCfg.Hosts = SetDefaultHostsCfg(cfg)
clusterCfg.RoleGroups = cfg.RoleGroups
- hostGroups, err := clusterCfg.GroupHosts()
+ hostGroups, err := clusterCfg.GroupHosts(logger)
if err != nil {
return nil, nil, err
}
35 changes: 22 additions & 13 deletions controllers/kubekey/cluster_controller.go
@@ -113,7 +113,7 @@ func (r *ClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, err
}
if err := updateRunJob(r, ctx, cluster, jobFound, log, CreateCluster); err != nil {
- return ctrl.Result{}, err
+ return ctrl.Result{Requeue: true}, err
}
}

@@ -123,7 +123,7 @@ func (r *ClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
return ctrl.Result{}, err
}
if err := updateRunJob(r, ctx, cluster, jobFound, log, AddNodes); err != nil {
- return ctrl.Result{}, err
+ return ctrl.Result{Requeue: true}, err
}
}

@@ -334,34 +334,43 @@ func updateRunJob(r *ClusterReconciler, ctx context.Context, cluster *kubekeyv1a
} else if action == AddNodes {
name = fmt.Sprintf("%s-add-nodes", cluster.Name)
}

// Check if the job already exists, if not create a new one
if err := r.Get(ctx, types.NamespacedName{Name: name, Namespace: "kubekey-system"}, jobFound); err != nil && !kubeErr.IsNotFound(err) {
return nil
- } else if err == nil {
+ } else if err == nil && (jobFound.Status.Failed != 0 || jobFound.Status.Succeeded != 0) {
// delete old pods
podlist := &corev1.PodList{}
listOpts := []client.ListOption{
client.InNamespace("kubekey-system"),
client.MatchingLabels{"job-name": name},
}
- _ = r.List(context.TODO(), podlist, listOpts...)
- for _, pod := range podlist.Items {
- _ = r.Delete(ctx, &pod)
+ if err := r.List(context.TODO(), podlist, listOpts...); err == nil && len(podlist.Items) != 0 {
+ for _, pod := range podlist.Items {
+ _ = r.Delete(ctx, &pod)
+ }
}
if err := r.Delete(ctx, jobFound); err != nil {
log.Error(err, "Failed to delete old Job", "Job.Namespace", jobFound.Namespace, "Job.Name", jobFound.Name)
return err
}
}

- jobCluster := r.jobForCluster(cluster, action)
- log.Info("Creating a new Job to create cluster", "Job.Namespace", jobCluster.Namespace, "Job.Name", jobCluster.Name)
- if err := r.Create(ctx, jobCluster); err != nil {
- log.Error(err, "Failed to create new Job", "Job.Namespace", jobCluster.Namespace, "Job.Name", jobCluster.Name)
- return err
+ jobCluster := r.jobForCluster(cluster, action)
+ log.Info("Creating a new Job to create cluster", "Job.Namespace", jobCluster.Namespace, "Job.Name", jobCluster.Name)
+ if err := r.Create(ctx, jobCluster); err != nil {
+ log.Error(err, "Failed to create new Job", "Job.Namespace", jobCluster.Namespace, "Job.Name", jobCluster.Name)
+ return err
+ }
+ } else if kubeErr.IsNotFound(err) {
+ jobCluster := r.jobForCluster(cluster, action)
+ log.Info("Creating a new Job to create cluster", "Job.Namespace", jobCluster.Namespace, "Job.Name", jobCluster.Name)
+ if err := r.Create(ctx, jobCluster); err != nil {
+ log.Error(err, "Failed to create new Job", "Job.Namespace", jobCluster.Namespace, "Job.Name", jobCluster.Name)
+ return err
+ }
+ }
if err := updateStatusRunner(r, cluster, action); err != nil {
return err
}

return nil
}
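
One note on the Requeue change in Reconcile: with controller-runtime, returning a non-nil error already schedules a retry with backoff, so Result{Requeue: true} matters most when the error is nil. A compilable sketch of the common return shapes, assuming sigs.k8s.io/controller-runtime as a dependency (not part of this commit):

    package controllers

    import (
        "time"

        ctrl "sigs.k8s.io/controller-runtime"
    )

    // reconcileOutcome shows the usual ways a Reconcile schedules a retry.
    func reconcileOutcome(err error) (ctrl.Result, error) {
        if err != nil {
            // A non-nil error is retried by the manager with backoff;
            // Requeue: true, as used above, makes the intent explicit.
            return ctrl.Result{Requeue: true}, err
        }
        // Re-run later without an error, e.g. to poll job status.
        return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
    }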
18 changes: 11 additions & 7 deletions manifests/parameters.yaml
@@ -59,6 +59,11 @@ data:
]
},
"storagePlugins": [
+ {
+ "name": "local",
+ "title": "Local Volume[Use only for testing]",
+ "description": "A local volume represents a mounted local storage device such as a disk, partition or directory."
+ },
{
"name": "nfs",
"title": "NFS",
@@ -81,14 +86,14 @@
{
"name": "nfs.server",
"title": "NFS Server",
"description": "Hostname of the NFS server(ip or hostname).",
"description": "Hostname of the NFS server(ip or hostname). Required.",
"default": "",
"type": "string"
},
{
"name": "nfs.path",
"title": "NFS Path",
"description": "Basepath of the mount point.",
"description": "Basepath of the mount point. For example, '/mnt/kube'. Required",
"default": "",
"type": "string"
}
@@ -117,15 +122,15 @@
{
"name": "redisVolumeSize",
"title": "Redis Volume Size",
"description": "Redis PVC size.",
"description": "Redis Storage size.",
"default": "2Gi",
"unit": "Gi",
"type": "integer"
},
{
"name": "mysqlVolumeSize",
"title": "MySQL Volume Size",
"description": "MySQL PVC size.",
"description": "MySQL Storage size.",
"default": "20Gi",
"unit": "Gi",
"type": "integer"
@@ -141,15 +146,15 @@
{
"name": "etcdVolumeSize",
"title": "etcd Volume Size",
"description": "etcd PVC size.",
"description": "etcd Storage size.",
"default": "20Gi",
"unit": "Gi",
"type": "integer"
},
{
"name": "openldapVolumeSize",
"title": "Openldap Volume Size",
"description": "Openldap PVC size.",
"description": "Openldap Storage size.",
"default": "2Gi",
"unit": "Gi",
"type": "integer"
@@ -331,4 +336,3 @@
kubesphere.io/creator: admin
name: kubekey-parameters
namespace: kubekey-system
-
11 changes: 9 additions & 2 deletions pkg/addons/charts/install.go
@@ -38,6 +38,7 @@ import (
"log"
"os"
"path/filepath"
"strings"
"time"
)

@@ -77,8 +78,14 @@ func InstallChart(mgr *manager.Manager, addon *kubekeyapiv1alpha1.Addon, kubecon
if err != nil {
return err
}
- s, _ := clientset.CoreV1().Secrets("kubesphere-system").Get(context.TODO(), "kubesphere-secret", metav1.GetOptions{})
- valueOpts.Values = append(valueOpts.Values, fmt.Sprintf("authentication.jwtSecret=%s", string(s.Data["secret"])))
+ for index, value := range addon.Sources.Chart.Values {
+ if strings.Contains(value, "registry=") {
+ addon.Sources.Chart.Values[index] = strings.TrimSuffix(value, "/")
+ }
+ }
+ if s, err := clientset.CoreV1().Secrets("kubesphere-system").Get(context.TODO(), "kubesphere-secret", metav1.GetOptions{}); err == nil {
+ valueOpts.Values = append(valueOpts.Values, fmt.Sprintf("authentication.jwtSecret=%s", string(s.Data["secret"])))
+ }
}
}
if len(addon.Sources.Chart.ValuesFile) != 0 {
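
The new loop in InstallChart trims a trailing slash from any chart value carrying a registry address, so rendered image references do not end up with a double slash. A minimal sketch of that normalization, with made-up values:

    package main

    import (
        "fmt"
        "strings"
    )

    // normalizeRegistryValues trims a trailing "/" from every --set style
    // value containing "registry=", as the loop in InstallChart now does.
    func normalizeRegistryValues(values []string) []string {
        for i, v := range values {
            if strings.Contains(v, "registry=") {
                values[i] = strings.TrimSuffix(v, "/")
            }
        }
        return values
    }

    func main() {
        vals := []string{"global.registry=registry.example.com/", "replicaCount=2"}
        fmt.Println(normalizeRegistryValues(vals))
        // [global.registry=registry.example.com replicaCount=2]
    }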
6 changes: 4 additions & 2 deletions pkg/addons/install.go
@@ -56,7 +56,9 @@ func InstallAddons(mgr *manager.Manager) error {
if err := mgr.RunTaskOnMasterNodes(checkKubeSphereStatus, true); err != nil {
return err
}
- kubesphere.ResultNotes(mgr.InCluster)
+ if err := kubesphere.ResultNotes(mgr.InCluster); err != nil {
+ return err
+ }
}
}
}
@@ -107,7 +109,7 @@ func installAddon(mgr *manager.Manager, addon *kubekeyapiv1alpha1.Addon, kubecon
return nil
}

- func checkKubeSphereStatus(mgr *manager.Manager, node *kubekeyapiv1alpha1.HostCfg) error {
+ func checkKubeSphereStatus(mgr *manager.Manager, _ *kubekeyapiv1alpha1.HostCfg) error {
if mgr.Runner.Index == 0 {
go kubesphere.CheckKubeSphereStatus(mgr)
}
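
checkKubeSphereStatus still runs as a task on every master, but the watcher goroutine starts only where Runner.Index == 0, that is, once on the first master. A minimal illustration of the gate (the runner type here is a stand-in, not KubeKey's manager):

    package main

    import (
        "fmt"
        "sync"
    )

    type runner struct{ Index int }

    func main() {
        var wg sync.WaitGroup
        // The task callback fires for every node...
        for _, r := range []runner{{0}, {1}, {2}} {
            if r.Index == 0 { // ...but only the first node starts the watcher.
                wg.Add(1)
                go func() {
                    defer wg.Done()
                    fmt.Println("watching KubeSphere status once")
                }()
            }
        }
        wg.Wait()
    }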