diff --git a/api.go b/api.go index 0f81b82..56e2bb4 100644 --- a/api.go +++ b/api.go @@ -54,6 +54,7 @@ var ( const ( X_API_PROXY_COMMUNICATION_SYTLE = "X-Api-Proxy-Communication-Style" + clientRegionError = "Failed to find the client for region" ) // Query for pxGrid, ERS or other API @@ -82,7 +83,13 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) { createEnv := createResponse{} createMultipartEnv := createMultipartResponse{} queryPath := fmt.Sprintf(directModePath, url.PathEscape(d.ID()), "/query/object/multipart") - resp, err := d.tenant.regionalHttpClient.R(). + + regionalClient := d.tenant.regionalHttpClients[d.Fqdn()] + if regionalClient == nil { + return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn()) + } + + resp, err := regionalClient.R(). SetHeader(X_API_PROXY_COMMUNICATION_SYTLE, "sync"). SetResult(&createMultipartEnv). Post(queryPath) @@ -92,7 +99,13 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) { if resp.StatusCode() == http.StatusNotFound { // Multipart Large API payload is not supported by this device, try single part queryPath := fmt.Sprintf(directModePath, url.PathEscape(d.ID()), "/query/object") - resp, err = d.tenant.regionalHttpClient.R(). + + regionalClient := d.tenant.regionalHttpClients[d.Fqdn()] + if regionalClient == nil { + return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn()) + } + + resp, err = regionalClient.R(). SetHeader(X_API_PROXY_COMMUNICATION_SYTLE, "sync"). SetResult(&createEnv). Post(queryPath) @@ -186,7 +199,13 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) { // Trigger query respEnv := queryResponse{} queryPath := fmt.Sprintf(directModePath, url.PathEscape(d.ID()), "/query") - resp, err := d.tenant.regionalHttpClient.R(). + + regionalClient := d.tenant.regionalHttpClients[d.Fqdn()] + if regionalClient == nil { + return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn()) + } + + resp, err := regionalClient.R(). SetHeader(X_API_PROXY_COMMUNICATION_SYTLE, "sync"). SetBody(reqEnv). SetResult(&respEnv). @@ -216,7 +235,13 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) { // Poll queryPath := fmt.Sprintf(directModePath, url.PathEscape(d.ID()), "/query/"+queryId) - resp, err = d.tenant.regionalHttpClient.R(). + + regionalClient := d.tenant.regionalHttpClients[d.Fqdn()] + if regionalClient == nil { + return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn()) + } + + resp, err = regionalClient.R(). SetHeader(X_API_PROXY_COMMUNICATION_SYTLE, "sync"). SetBody(respEnv). SetResult(&respEnv). @@ -240,7 +265,12 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) { var reader io.ReadCloser if respEnv.ObjectUrl != "" { // Download object - hresp, err := d.tenant.regionalHttpClient.R(). + regionalClient := d.tenant.regionalHttpClients[d.Fqdn()] + if regionalClient == nil { + return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn()) + } + + hresp, err := regionalClient.R(). SetDoNotParseResponse(true). Get(respEnv.ObjectUrl) if err != nil { @@ -270,8 +300,11 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) { func (d *Device) fallbackQuery(request *http.Request, payload []byte) (*http.Response, error) { queryPath := fmt.Sprintf(directModePath, url.PathEscape(d.ID()), request.URL) - - req := d.tenant.regionalHttpClient.R() + regionalClient := d.tenant.regionalHttpClients[d.Fqdn()] + if regionalClient == nil { + return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn()) + } + req := regionalClient.R() if request.Header != nil { for name, values := range request.Header { @@ -307,7 +340,7 @@ func (q *queryCloser) Close() error { // Delete query queryPath := fmt.Sprintf(directModePath, url.PathEscape(q.device.ID()), "/query/"+q.queryId) - req := q.device.tenant.regionalHttpClient.R() + req := q.device.tenant.regionalHttpClients[q.device.Fqdn()].R() req.SetHeader(X_API_PROXY_COMMUNICATION_SYTLE, "sync") _, err := req.Execute(http.MethodDelete, queryPath) return err diff --git a/app.go b/app.go index 3ba446c..901e5b5 100644 --- a/app.go +++ b/app.go @@ -8,6 +8,7 @@ import ( "fmt" "net/http" "net/url" + "reflect" "strings" "sync" "time" @@ -61,6 +62,9 @@ type Config struct { // Hostname of the regional cloud environment RegionalFQDN string + // Hostnames of the regional cloud environments + RegionalFQDNs []string + // Hostname of the global cloud environment GlobalFQDN string @@ -111,8 +115,8 @@ type Config struct { // App struct is the entry point for the pxGrid Cloud Go SDK type App struct { config Config - httpClient *resty.Client // global HTTP client - conn *pubsub.Connection // pubsub WebSocket connection + httpClient *resty.Client // global HTTP client + conn []*pubsub.Connection // pubsub WebSocket connection tenantMap sync.Map deviceMap sync.Map @@ -130,7 +134,7 @@ var ( ) func (app *App) String() string { - return fmt.Sprintf("App[ID: %s, RegionalFQDN: %s]", app.config.ID, app.config.RegionalFQDN) + return fmt.Sprintf("App[ID: %s, RegionalFQDNs: %v]", app.config.ID, app.config.RegionalFQDNs) } // New creates and returns a new instance of App @@ -186,7 +190,15 @@ func New(config Config) (*App, error) { func validateConfig(config *Config) error { // sanitize all the input config.ID = strings.TrimSpace(config.ID) - config.RegionalFQDN = strings.TrimSpace(config.RegionalFQDN) + if len(config.RegionalFQDNs) == 0 { + config.RegionalFQDNs = make([]string, 1) + config.RegionalFQDNs[0] = strings.TrimSpace(config.RegionalFQDN) + } else { + for i, regionalFQDN := range config.RegionalFQDNs { + config.RegionalFQDNs[i] = strings.TrimSpace(regionalFQDN) + } + } + log.Logger.Infof("RegionalFQDNs: %v", config.RegionalFQDNs) config.GlobalFQDN = strings.TrimSpace(config.GlobalFQDN) config.ReadStreamID = strings.TrimSpace(config.ReadStreamID) config.WriteStreamID = strings.TrimSpace(config.WriteStreamID) @@ -195,9 +207,14 @@ func validateConfig(config *Config) error { if config.ID == "" { return errors.New("ID must not be empty") } + for _, regionalFQDN := range config.RegionalFQDNs { + if regionalFQDN == "" { + return errors.New("RegionalFQDN must not be empty") + } + } - if config.RegionalFQDN == "" || config.GlobalFQDN == "" { - return errors.New("RegionalFQDN and GlobalFQDN must not be empty") + if config.GlobalFQDN == "" { + return errors.New("GlobalFQDN must not be empty") } if config.ReadStreamID == "" || config.WriteStreamID == "" { @@ -214,7 +231,10 @@ func validateConfig(config *Config) error { // Close shuts down the App instance and releases all the resources func (app *App) Close() error { if app.conn != nil { - app.conn.Disconnect() + for _, connection := range app.conn { + connection.Disconnect() + } + app.conn = nil } app.ctxCancel() app.wg.Wait() @@ -244,7 +264,10 @@ func (app *App) reportError(err error) { // Close shuts down the App instance and releases all the resources func (app *App) close() error { if app.conn != nil { - app.conn.Disconnect() + for _, connection := range app.conn { + connection.Disconnect() + } + app.conn = nil } app.tenantMap.Range(func(key interface{}, _ interface{}) bool { @@ -263,37 +286,44 @@ func (app *App) close() error { // pubsubConnect opens a websocket connection to pxGrid Cloud func (app *App) pubsubConnect() error { var err error - app.conn, err = pubsub.NewConnection(pubsub.Config{ - GroupID: app.config.GroupID, - Domain: url.PathEscape(app.config.RegionalFQDN), - APIKeyProvider: func() ([]byte, error) { - if app.config.GetCredentials != nil { - credentials, e := app.config.GetCredentials() - if e != nil { - return nil, e + for _, fqdn := range app.config.RegionalFQDNs { + + connection, connectionErr := pubsub.NewConnection(pubsub.Config{ + GroupID: app.config.GroupID, + Domain: url.PathEscape(fqdn), + APIKeyProvider: func() ([]byte, error) { + if app.config.GetCredentials != nil { + credentials, e := app.config.GetCredentials() + if e != nil { + return nil, e + } + return credentials.ApiKey, e + } else { + return []byte(app.config.ApiKey), nil } - return credentials.ApiKey, e - } else { - return []byte(app.config.ApiKey), nil - } - }, - Transport: app.config.Transport, - }) - if err != nil { - return fmt.Errorf("failed to create pubsub connection: %v", err) + }, + Transport: app.config.Transport, + }) + if connectionErr != nil { + return fmt.Errorf("failed to create pubsub connection: %v", connectionErr) + } + app.conn = append(app.conn, connection) } ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - err = app.conn.Connect(ctx) - if err != nil { - return fmt.Errorf("failed to connect pubsub connection: %v", err) - } + for _, connection := range app.conn { + err = connection.Connect(ctx) + if err != nil { + return fmt.Errorf("failed to connect pubsub connection: %v", err) + } - err = app.conn.Subscribe(app.config.ReadStreamID, app.readStreamHandler()) - if err != nil { - return fmt.Errorf("failed to subscribe: %v", err) + err = connection.Subscribe(app.config.ReadStreamID, app.readStreamHandler()) + if err != nil { + return fmt.Errorf("failed to subscribe: %v", err) + } } + return nil } @@ -540,18 +570,22 @@ func (app *App) SetTenant(id, name, apiToken string) (*Tenant, error) { func (app *App) setTenant(tenant *Tenant) error { app.tenantMap.Store(tenant.id, tenant) - - httpClient := resty.NewWithClient(app.httpClient.GetClient()). - SetBaseURL(app.httpClient.HostURL) - tenant.setHttpClient(httpClient) - - regionalHostURL := url.URL{ - Scheme: defaultHTTPScheme, - Path: url.PathEscape(app.config.RegionalFQDN), + regionalHttpClients := make(map[string]*resty.Client) + for _, regionalFQDN := range app.config.RegionalFQDNs { + httpClient := resty.NewWithClient(app.httpClient.GetClient()). + SetBaseURL(app.httpClient.HostURL) + tenant.setHttpClient(httpClient) + + regionalHostURL := url.URL{ + Scheme: defaultHTTPScheme, + Path: url.PathEscape(regionalFQDN), + } + regionalHttpClient := resty.NewWithClient(app.httpClient.GetClient()). + SetBaseURL(regionalHostURL.String()) + regionalHttpClients[regionalFQDN] = regionalHttpClient } - regionalHttpClient := resty.NewWithClient(app.httpClient.GetClient()). - SetBaseURL(regionalHostURL.String()) - tenant.setRegionalHttpClient(regionalHttpClient) + tenant.setRegionalHttpClients(regionalHttpClients) + tenant.app = app devices, err := tenant.getDevices() @@ -618,12 +652,22 @@ func (app *App) startPubsubConnect() { //reset backoff factor for successful connection backoffFactor = 0 - select { - case err = <-app.conn.Error: - app.close() - app.reportError(err) - case <-app.ctx.Done(): - return + cases := make([]reflect.SelectCase, len(app.conn)) + for i, connection := range app.conn { + cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(connection.Error)} + } + cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(app.ctx.Done())}) + for { + chosen, value, ok := reflect.Select(cases) + if !ok { + // The chosen channel has been closed, so zero out the channel to disable the case + cases[chosen].Chan = reflect.ValueOf(nil) + return + } else { + err = fmt.Errorf(value.String()) + app.close() + app.reportError(err) + } } } diff --git a/device.go b/device.go index 49140c2..8a95f65 100644 --- a/device.go +++ b/device.go @@ -16,6 +16,7 @@ type Device struct { region string status string tenant *Tenant + fqdn string } func (d *Device) String() string { @@ -48,6 +49,11 @@ func (d *Device) Tenant() *Tenant { return d.tenant } +// Region returns device's fqdn +func (d *Device) Fqdn() string { + return d.fqdn +} + // DeviceStatus represents the status of a device type DeviceStatus struct { Status string @@ -61,6 +67,7 @@ type getDeviceResponse struct { } `json:"deviceInfo"` MgtInfo struct { Region string `json:"region"` + Fqdn string `json:"fqdn"` } `json:"mgtInfo"` Meta struct { EnrollmentStatus string `json:"enrollmentStatus"` diff --git a/examples/basic-consumer/main.go b/examples/basic-consumer/main.go index bb3dc74..013118e 100644 --- a/examples/basic-consumer/main.go +++ b/examples/basic-consumer/main.go @@ -18,12 +18,13 @@ import ( var logger *log.DefaultLogger = &log.DefaultLogger{Level: log.LogLevelInfo} type appConfig struct { - Id string `yaml:"id"` - ApiKey string `yaml:"apiKey"` - GlobalFQDN string `yaml:"globalFQDN"` - RegionalFQDN string `yaml:"regionalFQDN"` - ReadStream string `yaml:"readStream"` - WriteStream string `yaml:"writeStream"` + Id string `yaml:"id"` + ApiKey string `yaml:"apiKey"` + GlobalFQDN string `yaml:"globalFQDN"` + RegionalFQDN string `yaml:"regionalFQDN"` + RegionalFQDNs []string `yaml:"regionalFQDNs"` + ReadStream string `yaml:"readStream"` + WriteStream string `yaml:"writeStream"` } type tenantConfig struct { @@ -106,6 +107,7 @@ func main() { GetCredentials: getCredentials, GlobalFQDN: config.App.GlobalFQDN, RegionalFQDN: config.App.RegionalFQDN, + RegionalFQDNs: config.App.RegionalFQDNs, DeviceActivationHandler: activationHandler, DeviceDeactivationHandler: deactivationHandler, TenantUnlinkedHandler: tenantUnlinkedHandler, diff --git a/examples/echo-query/config_sample.yaml b/examples/echo-query/config_sample.yaml index 4a7dcf7..4afe123 100644 --- a/examples/echo-query/config_sample.yaml +++ b/examples/echo-query/config_sample.yaml @@ -2,7 +2,8 @@ app: id: replace_with_app_id apiKey: replace_with_api_key globalFQDN: dnaservices.cisco.com - regionalFQDN: neoffers.cisco.com + regionalFQDNs: + - neoffers.cisco.com readStream: replace_with_read_stream writeStream: replace_with_write_stream diff --git a/examples/echo-query/main.go b/examples/echo-query/main.go index eeaced5..ee64427 100644 --- a/examples/echo-query/main.go +++ b/examples/echo-query/main.go @@ -17,12 +17,12 @@ import ( var logger *log.DefaultLogger = &log.DefaultLogger{Level: log.LogLevelInfo} type appConfig struct { - Id string `yaml:"id"` - ApiKey string `yaml:"apiKey"` - GlobalFQDN string `yaml:"globalFQDN"` - RegionalFQDN string `yaml:"regionalFQDN"` - ReadStream string `yaml:"readStream"` - WriteStream string `yaml:"writeStream"` + Id string `yaml:"id"` + ApiKey string `yaml:"apiKey"` + GlobalFQDN string `yaml:"globalFQDN"` + RegionalFQDNs []string `yaml:"regionalFQDNs"` + ReadStream string `yaml:"readStream"` + WriteStream string `yaml:"writeStream"` } type tenantConfig struct { @@ -81,6 +81,8 @@ func main() { insecure := flag.Bool("insecure", false, "Insecure TLS") file := flag.String("in", "", "File for input for echo (optional). stdin if not specified") out := flag.String("out", "", "File for output for echo (optional). stdout if not specified") + url := flag.String("url", "", "request url") + method := flag.String("method", "", "request type") flag.Parse() config, err := loadConfig(*configFile) if err != nil { @@ -106,7 +108,7 @@ func main() { ID: config.App.Id, GetCredentials: getCredentials, GlobalFQDN: config.App.GlobalFQDN, - RegionalFQDN: config.App.RegionalFQDN, + RegionalFQDNs: config.App.RegionalFQDNs, DeviceActivationHandler: activationHandler, DeviceDeactivationHandler: deactivationHandler, TenantUnlinkedHandler: tenantUnlinkedHandler, @@ -160,9 +162,41 @@ func main() { logger.Errorf("No device found. tenant=%s", tenant.Name()) os.Exit(-1) } - // Select first device - device := devices[0] - logger.Infof("Selected first device name=%s tenant=%s id=%s", device.Name(), device.Tenant().Name(), device.ID()) + + var filteredDevices []sdk.Device + //Filter the devices based on the configured regions + for _, device := range devices { + if len(appConfig.RegionalFQDNs) != 0 { + for _, configuredRegionalFQDN := range appConfig.RegionalFQDNs { + if device.Fqdn() == configuredRegionalFQDN { + filteredDevices = append(filteredDevices, device) + } + } + } else { + if device.Fqdn() == appConfig.RegionalFQDN { + filteredDevices = append(filteredDevices, device) + } + } + } + + logger.Infof("List of devices for the configured regions are %v", filteredDevices) + + // Setup output + var writer io.Writer + if *out != "" { + f, err := os.Create(*out) + if err != nil { + panic(err) + } + defer f.Close() + writer = f + } else { + writer = os.Stdout + } + + // Loop through all the devices of the configured regions + device := filteredDevices[0] + logger.Infof("Selected device name=%s tenant=%s id=%s region=%s", device.Name(), device.Tenant().Name(), device.ID(), device.Region()) // Setup input var reader io.Reader @@ -177,27 +211,21 @@ func main() { reader = os.Stdin } - // Perform echo-query - req, _ := http.NewRequest(http.MethodPost, "/pxgrid/echo/query", reader) + if *method == "" { + *method = http.MethodPost + } + if *url == "" { + *url = "/pxgrid/echo/query" + } + + // Perform api request + req, _ := http.NewRequest(*method, *url, reader) resp, err := device.Query(req) if err != nil { panic(err) } defer resp.Body.Close() - // Setup output - var writer io.Writer - if *out != "" { - f, err := os.Create(*out) - if err != nil { - panic(err) - } - defer f.Close() - writer = f - } else { - writer = os.Stdout - } - // Write body to output n, err := io.Copy(writer, resp.Body) if err != nil { diff --git a/tenant.go b/tenant.go index 999181d..a0f6791 100644 --- a/tenant.go +++ b/tenant.go @@ -19,9 +19,9 @@ type Tenant struct { name string apiToken string - app *App - httpClient *resty.Client - regionalHttpClient *resty.Client + app *App + httpClient *resty.Client + regionalHttpClients map[string]*resty.Client } func (t *Tenant) String() string { @@ -68,6 +68,7 @@ func (t *Tenant) getDevices() ([]Device, error) { region: d.MgtInfo.Region, status: d.Meta.EnrollmentStatus, tenant: t, + fqdn: d.MgtInfo.Fqdn, }) } @@ -115,6 +116,7 @@ func (t *Tenant) getDeviceByID(deviceId string) (*Device, error) { region: gdr.MgtInfo.Region, status: gdr.Meta.EnrollmentStatus, tenant: t, + fqdn: gdr.MgtInfo.Fqdn, }, nil } @@ -141,12 +143,15 @@ func (t *Tenant) setHttpClient(httpClient *resty.Client) { }) } -func (t *Tenant) setRegionalHttpClient(regionalHttpClient *resty.Client) { - t.regionalHttpClient = regionalHttpClient - t.regionalHttpClient.OnBeforeRequest(func(_ *resty.Client, request *resty.Request) error { - request.SetHeader("X-API-KEY", t.ApiToken()) - return nil - }) +func (t *Tenant) setRegionalHttpClients(regionalHttpClients map[string]*resty.Client) { + + t.regionalHttpClients = regionalHttpClients + for _, regionalHttpClient := range t.regionalHttpClients { + regionalHttpClient.OnBeforeRequest(func(_ *resty.Client, request *resty.Request) error { + request.SetHeader("X-API-KEY", t.ApiToken()) + return nil + }) + } } func (t *Tenant) MarshalJSON() ([]byte, error) {