Skip to content

Commit

Permalink
feat: support configuring multiple hosts when creating a cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
silenceqi committed Jan 17, 2025
1 parent 7db9344 commit f222920
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 89 deletions.
11 changes: 10 additions & 1 deletion modules/elastic/api/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,18 @@ func (h *APIHandler) HandleCreateClusterAction(w http.ResponseWriter, req *http.
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
// TODO validate data format
conf.Enabled = true
if len(conf.Hosts) > 0 && conf.Host == "" {
conf.Host = conf.Hosts[0]
}
conf.Host = strings.TrimSpace(conf.Host)
if conf.Host == "" {
h.WriteError(w, "host is required", http.StatusBadRequest)
return
}
if conf.Schema == "" {
conf.Schema = "http"
}
conf.Endpoint = fmt.Sprintf("%s://%s", conf.Schema, conf.Host)
conf.ID = util.GetUUID()
ctx := &orm.Context{
Expand Down
147 changes: 90 additions & 57 deletions modules/elastic/api/test_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,29 @@ func (h TestAPI) HandleTestConnectionAction(w http.ResponseWriter, req *http.Req
} else if config.Host != "" && config.Schema != "" {
url = fmt.Sprintf("%s://%s", config.Schema, config.Host)
config.Endpoint = url
} else {
resBody["error"] = fmt.Sprintf("invalid config: %v", util.MustToJSON(config))
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}

if url == "" {
panic(errors.Error("invalid url: " + util.MustToJSON(config)))
if url != "" && !util.StringInArray(config.Endpoints, url) {
config.Endpoints = append(config.Endpoints, url)
}

if !util.SuffixStr(url, "/") {
url = fmt.Sprintf("%s/", url)
if config.Schema != "" && len(config.Hosts) > 0 {
for _, host := range config.Hosts {
host = strings.TrimSpace(host)
if host == "" {
continue
}
url = fmt.Sprintf("%s://%s", config.Schema, host)
if !util.StringInArray(config.Endpoints, url) {
config.Endpoints = append(config.Endpoints, url)
}
}
}
if len(config.Endpoints) == 0 {
panic(errors.Error(fmt.Sprintf("invalid config: %v", util.MustToJSON(config))))
}
// limit the number of endpoints to a maximum of 10 to prevent excessive processing
if len(config.Endpoints) > 10 {
config.Endpoints = config.Endpoints[0:10]
}

freq.SetRequestURI(url)
freq.Header.SetMethod("GET")

if (config.BasicAuth == nil || (config.BasicAuth != nil && config.BasicAuth.Username == "")) &&
config.CredentialID != "" && config.CredentialID != "manual" {
credential, err := common.GetCredential(config.CredentialID)
Expand All @@ -112,58 +118,85 @@ func (h TestAPI) HandleTestConnectionAction(w http.ResponseWriter, req *http.Req
config.BasicAuth = &auth
}
}
var (
i int
clusterUUID string
)
for i, url = range config.Endpoints {
if !util.SuffixStr(url, "/") {
url = fmt.Sprintf("%s/", url)
}

if config.BasicAuth != nil && strings.TrimSpace(config.BasicAuth.Username) != "" {
freq.SetBasicAuth(config.BasicAuth.Username, config.BasicAuth.Password.Get())
}
freq.SetRequestURI(url)
freq.Header.SetMethod("GET")

const testClientName = "elasticsearch_test_connection"
err = api.GetFastHttpClient(testClientName).DoTimeout(freq, fres, 10*time.Second)
if config.BasicAuth != nil && strings.TrimSpace(config.BasicAuth.Username) != "" {
freq.SetBasicAuth(config.BasicAuth.Username, config.BasicAuth.Password.Get())
}

if err != nil {
panic(err)
}
const testClientName = "elasticsearch_test_connection"
err = api.GetFastHttpClient(testClientName).DoTimeout(freq, fres, 10*time.Second)

var statusCode = fres.StatusCode()
if statusCode > 300 || statusCode == 0 {
resBody["error"] = fmt.Sprintf("invalid status code: %d", statusCode)
h.WriteJSON(w, resBody, 500)
return
}
if err != nil {
panic(err)
}

b := fres.Body()
clusterInfo := &elastic.ClusterInformation{}
err = json.Unmarshal(b, clusterInfo)
if err != nil {
panic(err)
}
var statusCode = fres.StatusCode()
if statusCode > 300 || statusCode == 0 {
resBody["error"] = fmt.Sprintf("invalid status code: %d", statusCode)
h.WriteJSON(w, resBody, 500)
return
}

resBody["version"] = clusterInfo.Version.Number
resBody["cluster_uuid"] = clusterInfo.ClusterUUID
resBody["cluster_name"] = clusterInfo.ClusterName
resBody["distribution"] = clusterInfo.Version.Distribution
b := fres.Body()
clusterInfo := &elastic.ClusterInformation{}
err = json.Unmarshal(b, clusterInfo)
if err != nil {
panic(err)
}

//fetch cluster health info
freq.SetRequestURI(fmt.Sprintf("%s/_cluster/health", config.Endpoint))
fres.Reset()
err = api.GetFastHttpClient(testClientName).Do(freq, fres)
if err != nil {
resBody["error"] = fmt.Sprintf("error on get cluster health: %v", err)
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}
resBody["version"] = clusterInfo.Version.Number
resBody["cluster_uuid"] = clusterInfo.ClusterUUID
resBody["cluster_name"] = clusterInfo.ClusterName
resBody["distribution"] = clusterInfo.Version.Distribution

if i == 0 {
clusterUUID = clusterInfo.ClusterUUID
} else {
//validate whether two endpoints point to the same cluster
if clusterUUID != clusterInfo.ClusterUUID {
resBody["error"] = fmt.Sprintf("invalid multiple cluster endpoints: %v", config.Endpoints)
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}
//skip fetch cluster health info if it's not the first endpoint
break
}
//fetch cluster health info
freq.SetRequestURI(fmt.Sprintf("%s/_cluster/health", url))
fres.Reset()
err = api.GetFastHttpClient(testClientName).Do(freq, fres)
if err != nil {
resBody["error"] = fmt.Sprintf("error on get cluster health: %v", err)
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}

healthInfo := &elastic.ClusterHealth{}
err = json.Unmarshal(fres.Body(), &healthInfo)
if err != nil {
resBody["error"] = fmt.Sprintf("error on decode cluster health info : %v", err)
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
healthInfo := &elastic.ClusterHealth{}
err = json.Unmarshal(fres.Body(), &healthInfo)
if err != nil {
resBody["error"] = fmt.Sprintf("error on decode cluster health info : %v", err)
h.WriteJSON(w, resBody, http.StatusInternalServerError)
return
}
resBody["status"] = healthInfo.Status
resBody["number_of_nodes"] = healthInfo.NumberOfNodes
resBody["number_of_data_nodes"] = healthInfo.NumberOf_data_nodes
resBody["active_shards"] = healthInfo.ActiveShards

freq.Reset()
fres.Reset()
}
resBody["status"] = healthInfo.Status
resBody["number_of_nodes"] = healthInfo.NumberOfNodes
resBody["number_of_data_nodes"] = healthInfo.NumberOf_data_nodes
resBody["active_shards"] = healthInfo.ActiveShards

h.WriteJSON(w, resBody, http.StatusOK)

Expand Down
44 changes: 31 additions & 13 deletions web/src/pages/System/Cluster/Form.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
Button,
Switch,
message,
Spin,
Spin, Select,
} from "antd";
import router from "umi/router";

Expand Down Expand Up @@ -158,6 +158,7 @@ class ClusterForm extends React.Component {
let newVals = {
name: values.name,
host: values.host,
hosts: values.hosts,
credential_id:
values.credential_id !== MANUAL_VALUE
? values.credential_id
Expand Down Expand Up @@ -237,7 +238,7 @@ class ClusterForm extends React.Component {
tryConnect = async (type) => {
const { dispatch, form } = this.props;
if (this.state.needAuth) {
if (type == "agent") {
if (type === "agent") {
this.setState({
...this.state,
credentialRequired: false,
Expand All @@ -252,7 +253,7 @@ class ClusterForm extends React.Component {
}
}
let fieldNames = this.validateFieldNames;
if (type == "agent") {
if (type === "agent") {
fieldNames = this.agentValidateFieldNames;
}
setTimeout(() => {
Expand All @@ -272,7 +273,7 @@ class ClusterForm extends React.Component {

schema: values.isTLS === true ? "https" : "http",
};
if (type == "agent") {
if (type === "agent") {
newVals = {
...newVals,
...{
Expand Down Expand Up @@ -319,7 +320,7 @@ class ClusterForm extends React.Component {
});
this.clusterUUID = res.cluster_uuid;
}
if (type == "agent") {
if (type === "agent") {
this.setState({ btnLoadingAgent: false });
} else {
this.setState({ btnLoading: false });
Expand All @@ -329,6 +330,17 @@ class ClusterForm extends React.Component {
}, 200);
};

validateHostsRule = (rule, value, callback) => {
let vals = value || [];
for(let i = 0; i < vals.length; i++) {
if (!/^[\w\.\-_~%]+(\:\d+)?$/.test(vals[i])) {
return callback(formatMessage({ id: "cluster.regist.form.verify.valid.endpoint" }));
}
}
// validation passed
callback();
};

render() {
const { getFieldDecorator } = this.props.form;
const formItemLayout = {
Expand All @@ -354,6 +366,16 @@ class ClusterForm extends React.Component {
},
};
const { editValue, editMode } = this.props.clusterConfig;
//add host value to hosts field if it's empty
if(editValue.host){
if(!editValue.hosts){
editValue.hosts = [editValue.host];
}else{
if (!editValue.hosts.includes(editValue.host)) {
editValue.hosts.push(editValue.host);
}
}
}
return (
<PageHeaderWrapper>
<Card
Expand Down Expand Up @@ -427,15 +449,11 @@ class ClusterForm extends React.Component {
id: "cluster.manage.label.cluster_host",
})}
>
{getFieldDecorator("host", {
initialValue: editValue.host,
{getFieldDecorator("hosts", {
initialValue: editValue.hosts,
rules: [
{
type: "string",
pattern: /^[\w\.\-_~%]+(\:\d+)?$/,
message: formatMessage({
id: "cluster.regist.form.verify.valid.endpoint",
}),
validator: this.validateHostsRule,
},
{
required: true,
Expand All @@ -444,7 +462,7 @@ class ClusterForm extends React.Component {
}),
},
],
})(<TrimSpaceInput placeholder="127.0.0.1:9200" />)}
})(<Select placeholder="127.0.0.1:9200" mode="tags" />)}
</Form.Item>
<Form.Item style={{ marginBottom: 0 }}>
{getFieldDecorator("version", {
Expand Down
3 changes: 2 additions & 1 deletion web/src/pages/System/Cluster/Step.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ const ClusterStep = ({ dispatch, history, query }) => {
username: values.username,
password: values.password,
},
host: values.host,
hosts: values.hosts,
credential_id:
values.credential_id !== MANUAL_VALUE
? values.credential_id
Expand Down Expand Up @@ -142,6 +142,7 @@ const ClusterStep = ({ dispatch, history, query }) => {
version: clusterConfig.version,
distribution: clusterConfig.distribution,
host: clusterConfig.host,
hosts: clusterConfig.hosts,
location: clusterConfig.location,
credential_id:
clusterConfig.credential_id !== MANUAL_VALUE
Expand Down
2 changes: 1 addition & 1 deletion web/src/pages/System/Cluster/steps/extra_step.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class ExtraStep extends React.Component {
return;
}
let newVals = {
host: initialValue.host,
hosts: initialValue?.hosts || [],
schema: initialValue.isTLS === true ? "https" : "http",
};
newVals = {
Expand Down
Loading

0 comments on commit f222920

Please sign in to comment.