diff --git a/modules/elastic/api/manage.go b/modules/elastic/api/manage.go index 48d31915..d3d68c56 100644 --- a/modules/elastic/api/manage.go +++ b/modules/elastic/api/manage.go @@ -65,9 +65,18 @@ func (h *APIHandler) HandleCreateClusterAction(w http.ResponseWriter, req *http. h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - // TODO validate data format conf.Enabled = true + if len(conf.Hosts) > 0 && conf.Host == "" { + conf.Host = conf.Hosts[0] + } conf.Host = strings.TrimSpace(conf.Host) + if conf.Host == "" { + h.WriteError(w, "host is required", http.StatusBadRequest) + return + } + if conf.Schema == "" { + conf.Schema = "http" + } conf.Endpoint = fmt.Sprintf("%s://%s", conf.Schema, conf.Host) conf.ID = util.GetUUID() ctx := &orm.Context{ diff --git a/modules/elastic/api/test_connection.go b/modules/elastic/api/test_connection.go index 5d79f9ee..c8f2ac18 100644 --- a/modules/elastic/api/test_connection.go +++ b/modules/elastic/api/test_connection.go @@ -80,23 +80,29 @@ func (h TestAPI) HandleTestConnectionAction(w http.ResponseWriter, req *http.Req } else if config.Host != "" && config.Schema != "" { url = fmt.Sprintf("%s://%s", config.Schema, config.Host) config.Endpoint = url - } else { - resBody["error"] = fmt.Sprintf("invalid config: %v", util.MustToJSON(config)) - h.WriteJSON(w, resBody, http.StatusInternalServerError) - return } - - if url == "" { - panic(errors.Error("invalid url: " + util.MustToJSON(config))) + if url != "" && !util.StringInArray(config.Endpoints, url) { + config.Endpoints = append(config.Endpoints, url) } - - if !util.SuffixStr(url, "/") { - url = fmt.Sprintf("%s/", url) + if config.Schema != "" && len(config.Hosts) > 0 { + for _, host := range config.Hosts { + host = strings.TrimSpace(host) + if host == "" { + continue + } + url = fmt.Sprintf("%s://%s", config.Schema, host) + if !util.StringInArray(config.Endpoints, url) { + config.Endpoints = append(config.Endpoints, url) + } + } + } + if len(config.Endpoints) == 0 { + panic(errors.Error(fmt.Sprintf("invalid config: %v", util.MustToJSON(config)))) + } + // limit the number of endpoints to a maximum of 10 to prevent excessive processing + if len(config.Endpoints) > 10 { + config.Endpoints = config.Endpoints[0:10] } - - freq.SetRequestURI(url) - freq.Header.SetMethod("GET") - if (config.BasicAuth == nil || (config.BasicAuth != nil && config.BasicAuth.Username == "")) && config.CredentialID != "" && config.CredentialID != "manual" { credential, err := common.GetCredential(config.CredentialID) @@ -112,58 +118,85 @@ func (h TestAPI) HandleTestConnectionAction(w http.ResponseWriter, req *http.Req config.BasicAuth = &auth } } + var ( + i int + clusterUUID string + ) + for i, url = range config.Endpoints { + if !util.SuffixStr(url, "/") { + url = fmt.Sprintf("%s/", url) + } - if config.BasicAuth != nil && strings.TrimSpace(config.BasicAuth.Username) != "" { - freq.SetBasicAuth(config.BasicAuth.Username, config.BasicAuth.Password.Get()) - } + freq.SetRequestURI(url) + freq.Header.SetMethod("GET") - const testClientName = "elasticsearch_test_connection" - err = api.GetFastHttpClient(testClientName).DoTimeout(freq, fres, 10*time.Second) + if config.BasicAuth != nil && strings.TrimSpace(config.BasicAuth.Username) != "" { + freq.SetBasicAuth(config.BasicAuth.Username, config.BasicAuth.Password.Get()) + } - if err != nil { - panic(err) - } + const testClientName = "elasticsearch_test_connection" + err = api.GetFastHttpClient(testClientName).DoTimeout(freq, fres, 10*time.Second) - var statusCode = fres.StatusCode() - if statusCode > 300 || statusCode == 0 { - resBody["error"] = fmt.Sprintf("invalid status code: %d", statusCode) - h.WriteJSON(w, resBody, 500) - return - } + if err != nil { + panic(err) + } - b := fres.Body() - clusterInfo := &elastic.ClusterInformation{} - err = json.Unmarshal(b, clusterInfo) - if err != nil { - panic(err) - } + var statusCode = fres.StatusCode() + if statusCode > 300 || statusCode == 0 { + resBody["error"] = fmt.Sprintf("invalid status code: %d", statusCode) + h.WriteJSON(w, resBody, 500) + return + } - resBody["version"] = clusterInfo.Version.Number - resBody["cluster_uuid"] = clusterInfo.ClusterUUID - resBody["cluster_name"] = clusterInfo.ClusterName - resBody["distribution"] = clusterInfo.Version.Distribution + b := fres.Body() + clusterInfo := &elastic.ClusterInformation{} + err = json.Unmarshal(b, clusterInfo) + if err != nil { + panic(err) + } - //fetch cluster health info - freq.SetRequestURI(fmt.Sprintf("%s/_cluster/health", config.Endpoint)) - fres.Reset() - err = api.GetFastHttpClient(testClientName).Do(freq, fres) - if err != nil { - resBody["error"] = fmt.Sprintf("error on get cluster health: %v", err) - h.WriteJSON(w, resBody, http.StatusInternalServerError) - return - } + resBody["version"] = clusterInfo.Version.Number + resBody["cluster_uuid"] = clusterInfo.ClusterUUID + resBody["cluster_name"] = clusterInfo.ClusterName + resBody["distribution"] = clusterInfo.Version.Distribution + + if i == 0 { + clusterUUID = clusterInfo.ClusterUUID + } else { + //validate whether two endpoints point to the same cluster + if clusterUUID != clusterInfo.ClusterUUID { + resBody["error"] = fmt.Sprintf("invalid multiple cluster endpoints: %v", config.Endpoints) + h.WriteJSON(w, resBody, http.StatusInternalServerError) + return + } + //skip fetch cluster health info if it's not the first endpoint + break + } + //fetch cluster health info + freq.SetRequestURI(fmt.Sprintf("%s/_cluster/health", url)) + fres.Reset() + err = api.GetFastHttpClient(testClientName).Do(freq, fres) + if err != nil { + resBody["error"] = fmt.Sprintf("error on get cluster health: %v", err) + h.WriteJSON(w, resBody, http.StatusInternalServerError) + return + } - healthInfo := &elastic.ClusterHealth{} - err = json.Unmarshal(fres.Body(), &healthInfo) - if err != nil { - resBody["error"] = fmt.Sprintf("error on decode cluster health info : %v", err) - h.WriteJSON(w, resBody, http.StatusInternalServerError) - return + healthInfo := &elastic.ClusterHealth{} + err = json.Unmarshal(fres.Body(), &healthInfo) + if err != nil { + resBody["error"] = fmt.Sprintf("error on decode cluster health info : %v", err) + h.WriteJSON(w, resBody, http.StatusInternalServerError) + return + } + resBody["status"] = healthInfo.Status + resBody["number_of_nodes"] = healthInfo.NumberOfNodes + resBody["number_of_data_nodes"] = healthInfo.NumberOf_data_nodes + resBody["active_shards"] = healthInfo.ActiveShards + + freq.Reset() + fres.Reset() } - resBody["status"] = healthInfo.Status - resBody["number_of_nodes"] = healthInfo.NumberOfNodes - resBody["number_of_data_nodes"] = healthInfo.NumberOf_data_nodes - resBody["active_shards"] = healthInfo.ActiveShards h.WriteJSON(w, resBody, http.StatusOK) diff --git a/web/src/pages/System/Cluster/Form.js b/web/src/pages/System/Cluster/Form.js index 544babca..d735b3ec 100644 --- a/web/src/pages/System/Cluster/Form.js +++ b/web/src/pages/System/Cluster/Form.js @@ -8,7 +8,7 @@ import { Button, Switch, message, - Spin, + Spin, Select, } from "antd"; import router from "umi/router"; @@ -158,6 +158,7 @@ class ClusterForm extends React.Component { let newVals = { name: values.name, host: values.host, + hosts: values.hosts, credential_id: values.credential_id !== MANUAL_VALUE ? values.credential_id @@ -237,7 +238,7 @@ class ClusterForm extends React.Component { tryConnect = async (type) => { const { dispatch, form } = this.props; if (this.state.needAuth) { - if (type == "agent") { + if (type === "agent") { this.setState({ ...this.state, credentialRequired: false, @@ -252,7 +253,7 @@ class ClusterForm extends React.Component { } } let fieldNames = this.validateFieldNames; - if (type == "agent") { + if (type === "agent") { fieldNames = this.agentValidateFieldNames; } setTimeout(() => { @@ -272,7 +273,7 @@ class ClusterForm extends React.Component { schema: values.isTLS === true ? "https" : "http", }; - if (type == "agent") { + if (type === "agent") { newVals = { ...newVals, ...{ @@ -319,7 +320,7 @@ class ClusterForm extends React.Component { }); this.clusterUUID = res.cluster_uuid; } - if (type == "agent") { + if (type === "agent") { this.setState({ btnLoadingAgent: false }); } else { this.setState({ btnLoading: false }); @@ -329,6 +330,17 @@ class ClusterForm extends React.Component { }, 200); }; + validateHostsRule = (rule, value, callback) => { + let vals = value || []; + for(let i = 0; i < vals.length; i++) { + if (!/^[\w\.\-_~%]+(\:\d+)?$/.test(vals[i])) { + return callback(formatMessage({ id: "cluster.regist.form.verify.valid.endpoint" })); + } + } + // validation passed + callback(); + }; + render() { const { getFieldDecorator } = this.props.form; const formItemLayout = { @@ -354,6 +366,16 @@ class ClusterForm extends React.Component { }, }; const { editValue, editMode } = this.props.clusterConfig; + //add host value to hosts field if it's empty + if(editValue.host){ + if(!editValue.hosts){ + editValue.hosts = [editValue.host]; + }else{ + if (!editValue.hosts.includes(editValue.host)) { + editValue.hosts.push(editValue.host); + } + } + } return ( - {getFieldDecorator("host", { - initialValue: editValue.host, + {getFieldDecorator("hosts", { + initialValue: editValue.hosts, rules: [ { - type: "string", - pattern: /^[\w\.\-_~%]+(\:\d+)?$/, - message: formatMessage({ - id: "cluster.regist.form.verify.valid.endpoint", - }), + validator: this.validateHostsRule, }, { required: true, @@ -444,7 +462,7 @@ class ClusterForm extends React.Component { }), }, ], - })()} + })()} + })(