Skip to content

Commit

Permalink
Multi region support (#35)
Browse files Browse the repository at this point in the history
Multi-region support
  • Loading branch information
kamalrajmadhurakasan authored Jul 24, 2024
1 parent 24923ca commit 80b01d6
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 98 deletions.
49 changes: 41 additions & 8 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var (

const (
X_API_PROXY_COMMUNICATION_SYTLE = "X-Api-Proxy-Communication-Style"
clientRegionError = "Failed to find the client for region"
)

// Query for pxGrid, ERS or other API
Expand Down Expand Up @@ -82,7 +83,13 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) {
createEnv := createResponse{}
createMultipartEnv := createMultipartResponse{}
queryPath := fmt.Sprintf(directModePath, url.PathEscape(d.ID()), "/query/object/multipart")
resp, err := d.tenant.regionalHttpClient.R().

regionalClient := d.tenant.regionalHttpClients[d.Fqdn()]
if regionalClient == nil {
return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn())
}

resp, err := regionalClient.R().
SetHeader(X_API_PROXY_COMMUNICATION_SYTLE, "sync").
SetResult(&createMultipartEnv).
Post(queryPath)
Expand All @@ -92,7 +99,13 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) {
if resp.StatusCode() == http.StatusNotFound {
// Multipart Large API payload is not supported by this device, try single part
queryPath := fmt.Sprintf(directModePath, url.PathEscape(d.ID()), "/query/object")
resp, err = d.tenant.regionalHttpClient.R().

regionalClient := d.tenant.regionalHttpClients[d.Fqdn()]
if regionalClient == nil {
return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn())
}

resp, err = regionalClient.R().
SetHeader(X_API_PROXY_COMMUNICATION_SYTLE, "sync").
SetResult(&createEnv).
Post(queryPath)
Expand Down Expand Up @@ -186,7 +199,13 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) {
// Trigger query
respEnv := queryResponse{}
queryPath := fmt.Sprintf(directModePath, url.PathEscape(d.ID()), "/query")
resp, err := d.tenant.regionalHttpClient.R().

regionalClient := d.tenant.regionalHttpClients[d.Fqdn()]
if regionalClient == nil {
return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn())
}

resp, err := regionalClient.R().
SetHeader(X_API_PROXY_COMMUNICATION_SYTLE, "sync").
SetBody(reqEnv).
SetResult(&respEnv).
Expand Down Expand Up @@ -216,7 +235,13 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) {

// Poll
queryPath := fmt.Sprintf(directModePath, url.PathEscape(d.ID()), "/query/"+queryId)
resp, err = d.tenant.regionalHttpClient.R().

regionalClient := d.tenant.regionalHttpClients[d.Fqdn()]
if regionalClient == nil {
return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn())
}

resp, err = regionalClient.R().
SetHeader(X_API_PROXY_COMMUNICATION_SYTLE, "sync").
SetBody(respEnv).
SetResult(&respEnv).
Expand All @@ -240,7 +265,12 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) {
var reader io.ReadCloser
if respEnv.ObjectUrl != "" {
// Download object
hresp, err := d.tenant.regionalHttpClient.R().
regionalClient := d.tenant.regionalHttpClients[d.Fqdn()]
if regionalClient == nil {
return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn())
}

hresp, err := regionalClient.R().
SetDoNotParseResponse(true).
Get(respEnv.ObjectUrl)
if err != nil {
Expand Down Expand Up @@ -270,8 +300,11 @@ func (d *Device) Query(request *http.Request) (*http.Response, error) {

func (d *Device) fallbackQuery(request *http.Request, payload []byte) (*http.Response, error) {
queryPath := fmt.Sprintf(directModePath, url.PathEscape(d.ID()), request.URL)

req := d.tenant.regionalHttpClient.R()
regionalClient := d.tenant.regionalHttpClients[d.Fqdn()]
if regionalClient == nil {
return nil, fmt.Errorf("%s - %s", clientRegionError, d.Fqdn())
}
req := regionalClient.R()

if request.Header != nil {
for name, values := range request.Header {
Expand Down Expand Up @@ -307,7 +340,7 @@ func (q *queryCloser) Close() error {

// Delete query
queryPath := fmt.Sprintf(directModePath, url.PathEscape(q.device.ID()), "/query/"+q.queryId)
req := q.device.tenant.regionalHttpClient.R()
req := q.device.tenant.regionalHttpClients[q.device.Fqdn()].R()
req.SetHeader(X_API_PROXY_COMMUNICATION_SYTLE, "sync")
_, err := req.Execute(http.MethodDelete, queryPath)
return err
Expand Down
142 changes: 93 additions & 49 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net/http"
"net/url"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -61,6 +62,9 @@ type Config struct {
// Hostname of the regional cloud environment
RegionalFQDN string

// Hostnames of the regional cloud environments
RegionalFQDNs []string

// Hostname of the global cloud environment
GlobalFQDN string

Expand Down Expand Up @@ -111,8 +115,8 @@ type Config struct {
// App struct is the entry point for the pxGrid Cloud Go SDK
type App struct {
config Config
httpClient *resty.Client // global HTTP client
conn *pubsub.Connection // pubsub WebSocket connection
httpClient *resty.Client // global HTTP client
conn []*pubsub.Connection // pubsub WebSocket connection

tenantMap sync.Map
deviceMap sync.Map
Expand All @@ -130,7 +134,7 @@ var (
)

func (app *App) String() string {
return fmt.Sprintf("App[ID: %s, RegionalFQDN: %s]", app.config.ID, app.config.RegionalFQDN)
return fmt.Sprintf("App[ID: %s, RegionalFQDNs: %v]", app.config.ID, app.config.RegionalFQDNs)
}

// New creates and returns a new instance of App
Expand Down Expand Up @@ -186,7 +190,15 @@ func New(config Config) (*App, error) {
func validateConfig(config *Config) error {
// sanitize all the input
config.ID = strings.TrimSpace(config.ID)
config.RegionalFQDN = strings.TrimSpace(config.RegionalFQDN)
if len(config.RegionalFQDNs) == 0 {
config.RegionalFQDNs = make([]string, 1)
config.RegionalFQDNs[0] = strings.TrimSpace(config.RegionalFQDN)
} else {
for i, regionalFQDN := range config.RegionalFQDNs {
config.RegionalFQDNs[i] = strings.TrimSpace(regionalFQDN)
}
}
log.Logger.Infof("RegionalFQDNs: %v", config.RegionalFQDNs)
config.GlobalFQDN = strings.TrimSpace(config.GlobalFQDN)
config.ReadStreamID = strings.TrimSpace(config.ReadStreamID)
config.WriteStreamID = strings.TrimSpace(config.WriteStreamID)
Expand All @@ -195,9 +207,14 @@ func validateConfig(config *Config) error {
if config.ID == "" {
return errors.New("ID must not be empty")
}
for _, regionalFQDN := range config.RegionalFQDNs {
if regionalFQDN == "" {
return errors.New("RegionalFQDN must not be empty")
}
}

if config.RegionalFQDN == "" || config.GlobalFQDN == "" {
return errors.New("RegionalFQDN and GlobalFQDN must not be empty")
if config.GlobalFQDN == "" {
return errors.New("GlobalFQDN must not be empty")
}

if config.ReadStreamID == "" || config.WriteStreamID == "" {
Expand All @@ -214,7 +231,10 @@ func validateConfig(config *Config) error {
// Close shuts down the App instance and releases all the resources
func (app *App) Close() error {
if app.conn != nil {
app.conn.Disconnect()
for _, connection := range app.conn {
connection.Disconnect()
}
app.conn = nil
}
app.ctxCancel()
app.wg.Wait()
Expand Down Expand Up @@ -244,7 +264,10 @@ func (app *App) reportError(err error) {
// Close shuts down the App instance and releases all the resources
func (app *App) close() error {
if app.conn != nil {
app.conn.Disconnect()
for _, connection := range app.conn {
connection.Disconnect()
}
app.conn = nil
}

app.tenantMap.Range(func(key interface{}, _ interface{}) bool {
Expand All @@ -263,37 +286,44 @@ func (app *App) close() error {
// pubsubConnect opens a websocket connection to pxGrid Cloud
func (app *App) pubsubConnect() error {
var err error
app.conn, err = pubsub.NewConnection(pubsub.Config{
GroupID: app.config.GroupID,
Domain: url.PathEscape(app.config.RegionalFQDN),
APIKeyProvider: func() ([]byte, error) {
if app.config.GetCredentials != nil {
credentials, e := app.config.GetCredentials()
if e != nil {
return nil, e
for _, fqdn := range app.config.RegionalFQDNs {

connection, connectionErr := pubsub.NewConnection(pubsub.Config{
GroupID: app.config.GroupID,
Domain: url.PathEscape(fqdn),
APIKeyProvider: func() ([]byte, error) {
if app.config.GetCredentials != nil {
credentials, e := app.config.GetCredentials()
if e != nil {
return nil, e
}
return credentials.ApiKey, e
} else {
return []byte(app.config.ApiKey), nil
}
return credentials.ApiKey, e
} else {
return []byte(app.config.ApiKey), nil
}
},
Transport: app.config.Transport,
})
if err != nil {
return fmt.Errorf("failed to create pubsub connection: %v", err)
},
Transport: app.config.Transport,
})
if connectionErr != nil {
return fmt.Errorf("failed to create pubsub connection: %v", connectionErr)
}
app.conn = append(app.conn, connection)
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

err = app.conn.Connect(ctx)
if err != nil {
return fmt.Errorf("failed to connect pubsub connection: %v", err)
}
for _, connection := range app.conn {
err = connection.Connect(ctx)
if err != nil {
return fmt.Errorf("failed to connect pubsub connection: %v", err)
}

err = app.conn.Subscribe(app.config.ReadStreamID, app.readStreamHandler())
if err != nil {
return fmt.Errorf("failed to subscribe: %v", err)
err = connection.Subscribe(app.config.ReadStreamID, app.readStreamHandler())
if err != nil {
return fmt.Errorf("failed to subscribe: %v", err)
}
}

return nil
}

Expand Down Expand Up @@ -540,18 +570,22 @@ func (app *App) SetTenant(id, name, apiToken string) (*Tenant, error) {

func (app *App) setTenant(tenant *Tenant) error {
app.tenantMap.Store(tenant.id, tenant)

httpClient := resty.NewWithClient(app.httpClient.GetClient()).
SetBaseURL(app.httpClient.HostURL)
tenant.setHttpClient(httpClient)

regionalHostURL := url.URL{
Scheme: defaultHTTPScheme,
Path: url.PathEscape(app.config.RegionalFQDN),
regionalHttpClients := make(map[string]*resty.Client)
for _, regionalFQDN := range app.config.RegionalFQDNs {
httpClient := resty.NewWithClient(app.httpClient.GetClient()).
SetBaseURL(app.httpClient.HostURL)
tenant.setHttpClient(httpClient)

regionalHostURL := url.URL{
Scheme: defaultHTTPScheme,
Path: url.PathEscape(regionalFQDN),
}
regionalHttpClient := resty.NewWithClient(app.httpClient.GetClient()).
SetBaseURL(regionalHostURL.String())
regionalHttpClients[regionalFQDN] = regionalHttpClient
}
regionalHttpClient := resty.NewWithClient(app.httpClient.GetClient()).
SetBaseURL(regionalHostURL.String())
tenant.setRegionalHttpClient(regionalHttpClient)
tenant.setRegionalHttpClients(regionalHttpClients)

tenant.app = app

devices, err := tenant.getDevices()
Expand Down Expand Up @@ -618,12 +652,22 @@ func (app *App) startPubsubConnect() {
//reset backoff factor for successful connection
backoffFactor = 0

select {
case err = <-app.conn.Error:
app.close()
app.reportError(err)
case <-app.ctx.Done():
return
cases := make([]reflect.SelectCase, len(app.conn))
for i, connection := range app.conn {
cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(connection.Error)}
}
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(app.ctx.Done())})
for {
chosen, value, ok := reflect.Select(cases)
if !ok {
// The chosen channel has been closed, so zero out the channel to disable the case
cases[chosen].Chan = reflect.ValueOf(nil)
return
} else {
err = fmt.Errorf(value.String())
app.close()
app.reportError(err)
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions device.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Device struct {
region string
status string
tenant *Tenant
fqdn string
}

func (d *Device) String() string {
Expand Down Expand Up @@ -48,6 +49,11 @@ func (d *Device) Tenant() *Tenant {
return d.tenant
}

// Region returns device's fqdn
func (d *Device) Fqdn() string {
return d.fqdn
}

// DeviceStatus represents the status of a device
type DeviceStatus struct {
Status string
Expand All @@ -61,6 +67,7 @@ type getDeviceResponse struct {
} `json:"deviceInfo"`
MgtInfo struct {
Region string `json:"region"`
Fqdn string `json:"fqdn"`
} `json:"mgtInfo"`
Meta struct {
EnrollmentStatus string `json:"enrollmentStatus"`
Expand Down
14 changes: 8 additions & 6 deletions examples/basic-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
var logger *log.DefaultLogger = &log.DefaultLogger{Level: log.LogLevelInfo}

type appConfig struct {
Id string `yaml:"id"`
ApiKey string `yaml:"apiKey"`
GlobalFQDN string `yaml:"globalFQDN"`
RegionalFQDN string `yaml:"regionalFQDN"`
ReadStream string `yaml:"readStream"`
WriteStream string `yaml:"writeStream"`
Id string `yaml:"id"`
ApiKey string `yaml:"apiKey"`
GlobalFQDN string `yaml:"globalFQDN"`
RegionalFQDN string `yaml:"regionalFQDN"`
RegionalFQDNs []string `yaml:"regionalFQDNs"`
ReadStream string `yaml:"readStream"`
WriteStream string `yaml:"writeStream"`
}

type tenantConfig struct {
Expand Down Expand Up @@ -106,6 +107,7 @@ func main() {
GetCredentials: getCredentials,
GlobalFQDN: config.App.GlobalFQDN,
RegionalFQDN: config.App.RegionalFQDN,
RegionalFQDNs: config.App.RegionalFQDNs,
DeviceActivationHandler: activationHandler,
DeviceDeactivationHandler: deactivationHandler,
TenantUnlinkedHandler: tenantUnlinkedHandler,
Expand Down
3 changes: 2 additions & 1 deletion examples/echo-query/config_sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ app:
id: replace_with_app_id
apiKey: replace_with_api_key
globalFQDN: dnaservices.cisco.com
regionalFQDN: neoffers.cisco.com
regionalFQDNs:
- neoffers.cisco.com
readStream: replace_with_read_stream
writeStream: replace_with_write_stream

Expand Down
Loading

0 comments on commit 80b01d6

Please sign in to comment.