Skip to content

Commit

Permalink
Merge pull request kubesphere#4651 from iawia002/validation
Browse files Browse the repository at this point in the history
Use the kube-system UID to identify if the member cluster already exists
  • Loading branch information
ks-ci-bot authored Feb 21, 2022
2 parents fe1d988 + 7deb9c2 commit f50de9a
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 96 deletions.
10 changes: 5 additions & 5 deletions api/ks-openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -21081,10 +21081,10 @@
},
"v1alpha2.Node": {
"required": [
"rank",
"labelMinor",
"id",
"label",
"labelMinor",
"rank",
"controls"
],
"properties": {
Expand Down Expand Up @@ -21192,10 +21192,10 @@
},
"v1alpha2.NodeSummary": {
"required": [
"labelMinor",
"rank",
"id",
"label"
"label",
"labelMinor",
"rank"
],
"properties": {
"adjacency": {
Expand Down
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -10770,6 +10770,10 @@
"description": "Region is the name of the region in which all of the nodes in the cluster exist. e.g. 'us-east1'.",
"type": "string"
},
"uid": {
"description": "UID is the kube-system namespace UID of the cluster, which represents the unique ID of the cluster.",
"type": "string"
},
"zones": {
"description": "Zones are the names of availability zones in which the nodes of the cluster exist, e.g. 'us-east1-a'.",
"type": "array",
Expand Down
4 changes: 4 additions & 0 deletions config/crds/cluster.kubesphere.io_clusters.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions pkg/controller/cluster/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,15 +576,13 @@ func (c *clusterController) syncCluster(key string) error {
klog.Errorf("Failed to get kubernetes version, %#v", err)
return err
}

cluster.Status.KubernetesVersion = version.GitVersion

nodes, err := clusterDt.client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Failed to get cluster nodes, %#v", err)
return err
}

cluster.Status.NodeCount = len(nodes.Items)

configz, err := c.tryToFetchKubeSphereComponents(clusterDt.config.Host, clusterDt.transport)
Expand All @@ -599,6 +597,13 @@ func (c *clusterController) syncCluster(key string) error {
cluster.Status.KubeSphereVersion = v
}

// Use kube-system namespace UID as cluster ID
kubeSystem, err := clusterDt.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceSystem, metav1.GetOptions{})
if err != nil {
return err
}
cluster.Status.UID = kubeSystem.UID

// label cluster host cluster if configz["multicluster"]==true
if mc, ok := configz[configzMultiCluster]; ok && mc && c.checkIfClusterIsHostCluster(nodes) {
if cluster.Labels == nil {
Expand Down
97 changes: 32 additions & 65 deletions pkg/kapis/cluster/v1alpha1/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,13 @@ func (h *handler) updateKubeConfig(request *restful.Request, response *restful.R
return
}

_, err = validateKubeSphereAPIServer(cluster.Spec.Connection.KubeSphereAPIEndpoint, cluster.Spec.Connection.KubeConfig)
_, err = validateKubeSphereAPIServer(config)
if err != nil {
api.HandleBadRequest(response, request, fmt.Errorf("unable validate kubesphere endpoint, %v", err))
return
}

err = h.validateMemberClusterConfiguration(cluster.Spec.Connection.KubeConfig)
err = h.validateMemberClusterConfiguration(clientSet)
if err != nil {
api.HandleBadRequest(response, request, fmt.Errorf("failed to validate member cluster configuration, err: %v", err))
}
Expand Down Expand Up @@ -340,29 +340,38 @@ func (h *handler) validateCluster(request *restful.Request, response *restful.Re
return
}

err = h.validateKubeConfig(cluster.Spec.Connection.KubeConfig)
config, err := k8sutil.LoadKubeConfigFromBytes(cluster.Spec.Connection.KubeConfig)
if err != nil {
api.HandleBadRequest(response, request, err)
return
}

_, err = validateKubeSphereAPIServer(cluster.Spec.Connection.KubeSphereAPIEndpoint, cluster.Spec.Connection.KubeConfig)
config.Timeout = defaultTimeout
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
api.HandleBadRequest(response, request, err)
return
}

if err = h.validateKubeConfig(cluster.Name, clientSet); err != nil {
api.HandleBadRequest(response, request, err)
return
}

if _, err = validateKubeSphereAPIServer(config); err != nil {
api.HandleBadRequest(response, request, fmt.Errorf("unable validate kubesphere endpoint, %v", err))
return
}

err = h.validateMemberClusterConfiguration(cluster.Spec.Connection.KubeConfig)
if err != nil {
if err = h.validateMemberClusterConfiguration(clientSet); err != nil {
api.HandleBadRequest(response, request, fmt.Errorf("failed to validate member cluster configuration, err: %v", err))
}

response.WriteHeader(http.StatusOK)
}

// validateKubeConfig takes base64 encoded kubeconfig and check its validity
func (h *handler) validateKubeConfig(kubeconfig []byte) error {
config, err := k8sutil.LoadKubeConfigFromBytes(kubeconfig)
func (h *handler) validateKubeConfig(clusterName string, clientSet kubernetes.Interface) error {
kubeSystem, err := clientSet.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceSystem, metav1.GetOptions{})
if err != nil {
return err
}
Expand All @@ -372,60 +381,30 @@ func (h *handler) validateKubeConfig(kubeconfig []byte) error {
return err
}

// clusters with the exactly same KubernetesAPIEndpoint considered to be one
// clusters with the exactly same kube-system namespace UID considered to be one
// MUST not import the same cluster twice
for _, cluster := range clusters {
if len(cluster.Spec.Connection.KubernetesAPIEndpoint) != 0 && cluster.Spec.Connection.KubernetesAPIEndpoint == config.Host {
return fmt.Errorf("existing cluster %s with the exacty same server address, MUST not import the same cluster twice", cluster.Name)
for _, existedCluster := range clusters {
if existedCluster.Status.UID == kubeSystem.UID {
return fmt.Errorf("cluster %s already exists (%s), MUST not import the same cluster twice", clusterName, existedCluster.Name)
}
}

config.Timeout = defaultTimeout

clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return err
}

_, err = clientSet.Discovery().ServerVersion()

return err
}

// validateKubeSphereAPIServer uses version api to check the accessibility
// If kubesphere apiserver endpoint is not provided, use kube-apiserver proxy instead
func validateKubeSphereAPIServer(ksEndpoint string, kubeconfig []byte) (*version.Info, error) {
if len(ksEndpoint) == 0 && len(kubeconfig) == 0 {
return nil, fmt.Errorf("neither kubesphere api endpoint nor kubeconfig was provided")
func validateKubeSphereAPIServer(config *rest.Config) (*version.Info, error) {
transport, err := rest.TransportFor(config)
if err != nil {
return nil, err
}

client := http.Client{
Timeout: defaultTimeout,
Timeout: defaultTimeout,
Transport: transport,
}

path := fmt.Sprintf("%s/kapis/version", ksEndpoint)

if len(ksEndpoint) != 0 {
_, err := url.Parse(ksEndpoint)
if err != nil {
return nil, err
}
} else {
config, err := k8sutil.LoadKubeConfigFromBytes(kubeconfig)
if err != nil {
return nil, err
}

transport, err := rest.TransportFor(config)
if err != nil {
return nil, err
}

client.Transport = transport
path = fmt.Sprintf("%s/api/v1/namespaces/%s/services/:%s:/proxy/kapis/version", config.Host, KubesphereNamespace, KubeSphereApiServer)
}

response, err := client.Get(path)
response, err := client.Get(fmt.Sprintf("%s/api/v1/namespaces/%s/services/:%s:/proxy/kapis/version", config.Host, KubesphereNamespace, KubeSphereApiServer))
if err != nil {
return nil, err
}
Expand All @@ -450,13 +429,13 @@ func validateKubeSphereAPIServer(ksEndpoint string, kubeconfig []byte) (*version

// validateMemberClusterConfiguration compares host and member cluster jwt, if they are not same, it changes member
// cluster jwt to host's, then restart member cluster ks-apiserver.
func (h *handler) validateMemberClusterConfiguration(memberKubeconfig []byte) error {
func (h *handler) validateMemberClusterConfiguration(clientSet kubernetes.Interface) error {
hConfig, err := h.getHostClusterConfig()
if err != nil {
return err
}

mConfig, err := h.getMemberClusterConfig(memberKubeconfig)
mConfig, err := h.getMemberClusterConfig(clientSet)
if err != nil {
return err
}
Expand All @@ -469,19 +448,7 @@ func (h *handler) validateMemberClusterConfiguration(memberKubeconfig []byte) er
}

// getMemberClusterConfig returns KubeSphere running config by the given member cluster kubeconfig
func (h *handler) getMemberClusterConfig(kubeconfig []byte) (*config.Config, error) {
config, err := k8sutil.LoadKubeConfigFromBytes(kubeconfig)
if err != nil {
return nil, err
}

config.Timeout = defaultTimeout

clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

func (h *handler) getMemberClusterConfig(clientSet kubernetes.Interface) (*config.Config, error) {
memberCm, err := clientSet.CoreV1().ConfigMaps(KubesphereNamespace).Get(context.Background(), KubeSphereConfigName, metav1.GetOptions{})
if err != nil {
return nil, err
Expand Down
34 changes: 10 additions & 24 deletions pkg/kapis/cluster/v1alpha1/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"

Expand All @@ -32,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/cli-runtime/pkg/printers"
"k8s.io/client-go/kubernetes"
k8s "k8s.io/client-go/kubernetes"
k8sfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/clientcmd"
Expand All @@ -42,7 +41,6 @@ import (
"kubesphere.io/kubesphere/pkg/client/clientset/versioned/fake"
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/utils/k8sutil"
"kubesphere.io/kubesphere/pkg/version"
)

const (
Expand Down Expand Up @@ -372,32 +370,15 @@ func TestValidateKubeConfig(t *testing.T) {
_ = env.Stop()
}()

err = h.validateKubeConfig([]byte(base64EncodedKubeConfig))
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
t.Fatal(err)
}
}

var ver = version.Get()

func endpoint(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(ver)
}

func TestValidateKubeSphereEndpoint(t *testing.T) {
svr := httptest.NewServer(http.HandlerFunc(endpoint))
defer svr.Close()

got, err := validateKubeSphereAPIServer(svr.URL, nil)
err = h.validateKubeConfig("test", clientSet)
if err != nil {
t.Fatal(err)
}

if diff := cmp.Diff(&ver, got); len(diff) != 0 {
t.Errorf("%T +got, -expected %v", ver, diff)
}

}

func TestValidateMemberClusterConfiguration(t *testing.T) {
Expand Down Expand Up @@ -448,15 +429,20 @@ func TestValidateMemberClusterConfiguration(t *testing.T) {
_ = env.Stop()
}()

clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
t.Fatal(err)
}

addMemberClusterResource(hostCm, t)

err = h.validateMemberClusterConfiguration([]byte(base64EncodedKubeConfig))
err = h.validateMemberClusterConfiguration(clientSet)
if err != nil {
t.Fatal(err)
}

addMemberClusterResource(memberCm, t)
err = h.validateMemberClusterConfiguration([]byte(base64EncodedKubeConfig))
err = h.validateMemberClusterConfiguration(clientSet)
if err == nil {
t.Fatal()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1alpha1
import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

const (
Expand Down Expand Up @@ -168,6 +169,9 @@ type ClusterStatus struct {
// every amount of time, like 5 minutes.
// +optional
Configz map[string]bool `json:"configz,omitempty"`

// UID is the kube-system namespace UID of the cluster, which represents the unique ID of the cluster.
UID types.UID `json:"uid,omitempty"`
}

// +genclient
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f50de9a

Please sign in to comment.