From d877417cbb4ad41a734da2eb19cf6fe9eed0720f Mon Sep 17 00:00:00 2001 From: Koen de Laat Date: Fri, 26 Jan 2024 13:36:21 +0000 Subject: [PATCH] Add connect/dbs --- README.md | 30 +-- config/hsdp.json | 3 + connect/dbs/client.go | 225 +++++++++++++++++++++++ {blr => connect/dbs}/client_test.go | 24 +-- connect/dbs/errors.go | 22 +++ connect/dbs/subscriber_service.go | 142 ++++++++++++++ connect/dbs/subscriber_service_test.go | 91 +++++++++ connect/dbs/subscription_service.go | 139 ++++++++++++++ connect/dbs/subscription_service_test.go | 95 ++++++++++ 9 files changed, 748 insertions(+), 23 deletions(-) create mode 100644 connect/dbs/client.go rename {blr => connect/dbs}/client_test.go (89%) create mode 100644 connect/dbs/errors.go create mode 100644 connect/dbs/subscriber_service.go create mode 100644 connect/dbs/subscriber_service_test.go create mode 100644 connect/dbs/subscription_service.go create mode 100644 connect/dbs/subscription_service_test.go diff --git a/README.md b/README.md index 1ffaa932..60ba4294 100644 --- a/README.md +++ b/README.md @@ -9,17 +9,7 @@ A HSDP API client library enabling Go programs to interact with various HSDP API The current implement covers only a subset of HSDP APIs. Basically, we implement functionality as needed. -- [x] Blob Repository - - [x] Blob Metadata - - [x] Access Policy - - [x] Access URL - - [x] Multipart Upload - - [x] BlobStore Policy management - - [ ] Topic management - - [ ] Store Access - - [ ] Bucket management - - [ ] Contract management - - [ ] Subscription management + - [x] Cartel c.q. Container Host management ([examples](cartel/README.md)) - [x] Clinical Data Repository (CDR) - [x] Tenant Onboarding @@ -53,6 +43,24 @@ The current implement covers only a subset of HSDP APIs. Basically, we implement - [x] Subscriber Types - [x] Resources Limits - [x] Authentication Methods + - [x] Data Broker + - [ ] Data Items + - [x] Subscribers + - [x] SQS + - [ ] Kinesis + - [x] Subscriptions + - [ ] Access Details + - [x] Blob Repository + - [x] Blob Metadata + - [x] Access Policy + - [x] Access URL + - [x] Multipart Upload + - [x] BlobStore Policy management + - [ ] Topic management + - [ ] Store Access + - [ ] Bucket management + - [ ] Contract management + - [ ] Subscription management - [x] Secure Transport Layer (STL) / Edge - [x] Device queries - [x] Application Resources management diff --git a/config/hsdp.json b/config/hsdp.json index c4371a3c..70361e2a 100644 --- a/config/hsdp.json +++ b/config/hsdp.json @@ -201,6 +201,9 @@ "discovery": { "url": "https://discovery-client-test.eu01.connect.hsdp.io/client-test/core/discovery" }, + "dbs": { + "url": "https://databroker-client-test.eu01.connect.hsdp.io/client-test/connect/databroker" + }, "has": { "url": "https://has-client-test.eu-west.philips-healthsuite.com" }, diff --git a/connect/dbs/client.go b/connect/dbs/client.go new file mode 100644 index 00000000..73a0b770 --- /dev/null +++ b/connect/dbs/client.go @@ -0,0 +1,225 @@ +// Package dbs provides support the HSDP Data Broker services +package dbs + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "path" + "strings" + + "github.com/go-playground/validator/v10" + "github.com/google/go-querystring/query" + autoconf "github.com/philips-software/go-hsdp-api/config" + "github.com/philips-software/go-hsdp-api/iam" + "github.com/philips-software/go-hsdp-api/internal" +) + +const ( + userAgent = "go-hsdp-api/dbs/" + internal.LibraryVersion + APIVersion = "1" +) + +// OptionFunc is the function signature function for options +type OptionFunc func(*http.Request) error + +// Config contains the configuration of a Client +type Config struct { + Region string + Environment string + BaseURL string + DebugLog io.Writer + Retry int +} + +// A Client manages communication with HSDP Data Broker APIs +type Client struct { + // HTTP Client used to communicate with IAM API + *iam.Client + config *Config + baseURL *url.URL + + // User agent used when communicating with the HSDP Blob Repository API + UserAgent string + + validate *validator.Validate + + Subscribers *SubscribersService + Subscriptions *SubscriptionService +} + +// NewClient returns a new DBS client +func NewClient(iamClient *iam.Client, config *Config) (*Client, error) { + validate := validator.New() + if err := validate.Struct(config); err != nil { + return nil, err + } + if iamClient == nil { + return nil, fmt.Errorf("iamClient cannot be nil") + } + doAutoconf(config) + c := &Client{Client: iamClient, config: config, UserAgent: userAgent, validate: validator.New()} + + if err := c.SetBaseURL(config.BaseURL); err != nil { + return nil, err + } + + c.Subscribers = &SubscribersService{Client: c, validate: validator.New()} + c.Subscriptions = &SubscriptionService{Client: c, validate: validator.New()} + + return c, nil +} + +func doAutoconf(config *Config) { + if config.Region != "" && config.Environment != "" { + c, err := autoconf.New( + autoconf.WithRegion(config.Region), + autoconf.WithEnv(config.Environment)) + if err == nil { + theService := c.Service("dbs") + if theService.URL != "" && config.BaseURL == "" { + config.BaseURL = theService.URL + } + } + } +} + +// Close releases allocated resources of clients +func (c *Client) Close() { +} + +// GetBaseURL returns the base URL as configured +func (c *Client) GetBaseURL() string { + if c.baseURL == nil { + return "" + } + return c.baseURL.String() +} + +// SetBaseURL sets the base URL for API requests +func (c *Client) SetBaseURL(urlStr string) error { + if urlStr == "" { + return ErrBaseURLCannotBeEmpty + } + // Make sure the given URL ends with a slash + if !strings.HasSuffix(urlStr, "/") { + urlStr += "/" + } + var err error + c.baseURL, err = url.Parse(urlStr) + return err +} + +// GetEndpointURL returns the Discovery Endpoint URL as configured +func (c *Client) GetEndpointURL() string { + return c.GetBaseURL() +} + +func (c *Client) NewRequest(method, requestPath string, opt interface{}, options ...OptionFunc) (*http.Request, error) { + u := *c.baseURL + // Set the encoded opaque data + u.Opaque = path.Join(c.baseURL.Path, requestPath) + + req := &http.Request{ + Method: method, + URL: &u, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + Header: make(http.Header), + Host: u.Host, + } + if opt != nil { + q, err := query.Values(opt) + if err != nil { + return nil, err + } + u.RawQuery = strings.Replace(q.Encode(), "+", "%20", -1) // https://github.com/golang/go/issues/4013 + } + + if method == "POST" || method == "PUT" { + bodyBytes, err := json.Marshal(opt) + if err != nil { + return nil, err + } + bodyReader := bytes.NewReader(bodyBytes) + + u.RawQuery = "" + req.Body = io.NopCloser(bodyReader) + req.ContentLength = int64(bodyReader.Len()) + req.Header.Set("Content-Type", "application/json") + } + token, err := c.Token() + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/json") + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("API-Version", APIVersion) + if c.UserAgent != "" { + req.Header.Set("User-Agent", c.UserAgent) + } + for _, fn := range options { + if fn == nil { + continue + } + if err := fn(req); err != nil { + return nil, err + } + } + return req, nil +} + +// Response is a HSDP DBS API response. This wraps the standard http.Response +// returned from HSDP DBS and provides convenient access to things like errors +type Response struct { + *http.Response +} + +func (r *Response) StatusCode() int { + if r.Response != nil { + return r.Response.StatusCode + } + return 0 +} + +// newResponse creates a new Response for the provided http.Response. +func newResponse(r *http.Response) *Response { + response := &Response{Response: r} + return response +} + +// Do performs a http request. If v implements the io.Writer +// interface, the raw response body will be written to v, without attempting to +// first decode it. +func (c *Client) Do(req *http.Request, v interface{}) (*Response, error) { + resp, err := c.HttpClient().Do(req) + if err != nil { + return nil, err + } + + response := newResponse(resp) + + err = internal.CheckResponse(resp) + if err != nil { + // even though there was an error, we still return the response + // in case the caller wants to inspect it further + return response, err + } + + if v != nil { + defer func() { + _ = resp.Body.Close() + }() // Only close if we plan to read it + if w, ok := v.(io.Writer); ok { + _, err = io.Copy(w, resp.Body) + } else { + err = json.NewDecoder(resp.Body).Decode(v) + } + } + + return response, err +} diff --git a/blr/client_test.go b/connect/dbs/client_test.go similarity index 89% rename from blr/client_test.go rename to connect/dbs/client_test.go index 7ad4666e..fea85cdd 100644 --- a/blr/client_test.go +++ b/connect/dbs/client_test.go @@ -1,13 +1,13 @@ -package blr_test +package dbs_test import ( + "github.com/philips-software/go-hsdp-api/connect/dbs" "io" "net/http" "net/http/httptest" _ "os" "testing" - "github.com/philips-software/go-hsdp-api/blr" "github.com/philips-software/go-hsdp-api/iam" "github.com/stretchr/testify/assert" ) @@ -15,18 +15,18 @@ import ( var ( muxIAM *http.ServeMux serverIAM *httptest.Server - muxBLR *http.ServeMux - serverBLR *httptest.Server + muxDBS *http.ServeMux + serverDBS *httptest.Server iamClient *iam.Client - blrClient *blr.Client + dbsClient *dbs.Client ) func setup(t *testing.T) func() { muxIAM = http.NewServeMux() serverIAM = httptest.NewServer(muxIAM) - muxBLR = http.NewServeMux() - serverBLR = httptest.NewServer(muxBLR) + muxDBS = http.NewServeMux() + serverDBS = httptest.NewServer(muxDBS) var err error @@ -36,16 +36,16 @@ func setup(t *testing.T) func() { SharedKey: "SharedKey", SecretKey: "SecretKey", IAMURL: serverIAM.URL, - IDMURL: serverBLR.URL, + IDMURL: serverDBS.URL, }) if err != nil { t.Fatalf("Failed to create iamClient: %v", err) } - blrClient, err = blr.NewClient(iamClient, &blr.Config{ - BaseURL: serverBLR.URL + "/connect/blobrepository", + dbsClient, err = dbs.NewClient(iamClient, &dbs.Config{ + BaseURL: serverDBS.URL + "/client-test/connect/databroker", }) if err != nil { - t.Fatalf("Failed to create mdmClient: %v", err) + t.Fatalf("Failed to create dbsClient: %v", err) } token := "44d20214-7879-4e35-923d-f9d4e01c9746" @@ -131,7 +131,7 @@ func setup(t *testing.T) func() { return func() { serverIAM.Close() - serverBLR.Close() + serverDBS.Close() } } diff --git a/connect/dbs/errors.go b/connect/dbs/errors.go new file mode 100644 index 00000000..312c03b8 --- /dev/null +++ b/connect/dbs/errors.go @@ -0,0 +1,22 @@ +package dbs + +import ( + "errors" +) + +var ( + ErrNotFound = errors.New("entity not found") + ErrBaseURLCannotBeEmpty = errors.New("base URL cannot be empty") + ErrEmptyResult = errors.New("empty result") + ErrInvalidEndpointURL = errors.New("invalid endpoint URL") + ErrMissingName = errors.New("missing name value") + ErrMissingDescription = errors.New("missing description value") + ErrMalformedInputValue = errors.New("malformed input value") + ErrMissingOrganization = errors.New("missing organization") + ErrMissingProposition = errors.New("missing proposition") + ErrMissingGlobalReference = errors.New("missing global reference") + ErrNotImplementedByHSDP = errors.New("method not implemented by HSDP") + ErrEmptyResults = errors.New("empty results") + ErrOperationFailed = errors.New("operation failed") + ErrCouldNoReadResourceAfterCreate = errors.New("could not read resource after create") +) diff --git a/connect/dbs/subscriber_service.go b/connect/dbs/subscriber_service.go new file mode 100644 index 00000000..6ba240eb --- /dev/null +++ b/connect/dbs/subscriber_service.go @@ -0,0 +1,142 @@ +package dbs + +import ( + "fmt" + "net/http" + "time" + + "github.com/go-playground/validator/v10" + "github.com/philips-software/go-hsdp-api/internal" +) + +type SubscribersService struct { + *Client + validate *validator.Validate +} + +var ( + subscriberAPIVersion = "1" +) + +type SQSSubscriberConfig struct { + ResourceType string `json:"resourceType" validate:"required" enum:"SQSSubscriberConfig"` + NameInfix string `json:"nameInfix" validate:"required"` + Description string `json:"description" validate:"required"` + QueueType string `json:"queueType,omitempty" enum:"Standard|FIFO"` + DeliveryDelaySeconds int `json:"deliveryDelaySeconds,omitempty"` + MessageRetentionPeriod int `json:"messageRetentionPeriod,omitempty"` + ReceiveMessageWaitTimeSeconds int `json:"receiveMessageWaitTimeSeconds,omitempty"` + ServerSideEncryption bool `json:"serverSideEncryption,omitempty"` +} + +type SQSSubscriber struct { + ID string `json:"id"` + Meta *Meta `json:"meta"` + Name string `json:"name" validate:"required"` + Description string `json:"description" validate:"required"` + Status string `json:"status" validate:"required" enum:"Creating|Deleting|Active|Updating|InError"` + ErrorMessage string `json:"errorMessage,omitempty"` + ResourceType string `json:"resourceType" validate:"required" enum:"SQSSubscriber"` + QueueName string `json:"queueName" validate:"required"` + QueueType string `json:"queueType" validate:"required" enum:"Standard|FIFO"` + DeliveryDelaySeconds int `json:"deliveryDelaySeconds" validate:"required"` + MessageRetentionPeriod int `json:"messageRetentionPeriod" validate:"required"` + ReceiveMessageWaitTimeSeconds int `json:"receiveMessageWaitTimeSeconds" validate:"required"` + ServerSideEncryption bool `json:"serverSideEncryption" validate:"required"` +} + +type GetSQSSubscriberOptions struct { + ID *string `url:"_id,omitempty"` + Name *string `url:"name,omitempty"` + LastUpdated *string `url:"_lastUpdated,omitempty"` +} + +type Meta struct { + LastUpdated time.Time `json:"lastUpdated,omitempty"` + VersionID string `json:"versionId,omitempty"` +} + +type SQSBundle struct { + Type string `json:"type,omitempty"` + Entry []SQSSubscriber `json:"entry,omitempty"` +} + +func (b *SubscribersService) CreateSQS(sqsConfig SQSSubscriberConfig) (*SQSSubscriber, *Response, error) { + sqsConfig.ResourceType = "SQSSubscriberConfig" + if err := b.validate.Struct(sqsConfig); err != nil { + return nil, nil, err + } + + req, _ := b.NewRequest(http.MethodPost, "/Subscriber/SQS", sqsConfig, nil) + req.Header.Set("api-version", subscriberAPIVersion) + + var created SQSSubscriber + + resp, err := b.Do(req, &created) + + if err != nil { + return nil, resp, err + } + if created.ID == "" { + return nil, resp, fmt.Errorf("the 'ID' field is missing") + } + return &created, resp, nil +} + +func (b *SubscribersService) GetSQSByID(id string) (*SQSSubscriber, *Response, error) { + req, err := b.NewRequest(http.MethodGet, "/Subscriber/SQS/"+id, nil) + if err != nil { + return nil, nil, err + } + req.Header.Set("api-version", subscriberAPIVersion) + req.Header.Set("Content-Type", "application/json") + + var resource SQSSubscriber + + resp, err := b.Do(req, &resource) + if err != nil { + return nil, resp, err + } + err = internal.CheckResponse(resp.Response) + if err != nil { + return nil, resp, fmt.Errorf("GetSQSByID: %w", err) + } + if resource.ID != id { + return nil, nil, fmt.Errorf("returned resource does not match") + } + return &resource, resp, nil +} + +func (b *SubscribersService) FindSQS(opt *GetSQSSubscriberOptions, options ...OptionFunc) (*[]SQSSubscriber, *Response, error) { + req, err := b.NewRequest(http.MethodGet, "/Subscriber/SQS", opt, options...) + if err != nil { + return nil, nil, err + } + req.Header.Set("api-version", subscriberAPIVersion) + req.Header.Set("Content-Type", "application/json") + + var bundleResponse SQSBundle + + resp, err := b.Do(req, &bundleResponse) + if err != nil { + return nil, resp, err + } + + return &bundleResponse.Entry, resp, err +} + +func (b *SubscribersService) DeleteSQS(subscriber SQSSubscriber) (bool, *Response, error) { + req, err := b.NewRequest(http.MethodDelete, "/Subscriber/SQS/"+subscriber.ID, nil, nil) + if err != nil { + return false, nil, err + } + req.Header.Set("api-version", subscriberAPIVersion) + + var deleteResponse interface{} + + resp, err := b.Do(req, &deleteResponse) + if resp == nil || resp.StatusCode() != http.StatusNoContent { + return false, resp, err + } + return true, resp, nil +} diff --git a/connect/dbs/subscriber_service_test.go b/connect/dbs/subscriber_service_test.go new file mode 100644 index 00000000..373c2df0 --- /dev/null +++ b/connect/dbs/subscriber_service_test.go @@ -0,0 +1,91 @@ +package dbs_test + +import ( + "fmt" + "github.com/philips-software/go-hsdp-api/connect/dbs" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func sqsBody(id, queueType string, infix string, status string) string { + return fmt.Sprintf(`{ + "id": "%s", + "meta": { + "lastUpdated": "2022-12-06T10:18:11.947Z", + "versionId": "1" + }, + "name": "dbs-%s-%s", + "description": "MySQSQueue", + "status": "%s", + "resourceType": "SQSSubscriber", + "queueName": "dbs-%s-%s", + "queueType": "%s", + "deliveryDelaySeconds": 0, + "messageRetentionPeriod": 345600, + "receiveMessageWaitTimeSeconds": 0, + "serverSideEncryption": true +}`, id, infix, id, status, infix, id, queueType) +} + +func TestSQSCRUD(t *testing.T) { + teardown := setup(t) + defer teardown() + + sqsID := "9f80f9e0-5cb2-4ebd-8980-03f550cb453f" + queueType := "FIFO" + infix := "my_infix" + muxDBS.HandleFunc("/client-test/connect/databroker/Subscriber/SQS", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.Method { + case "POST": + w.Header().Set("Etag", "1") + w.WriteHeader(http.StatusCreated) + _, _ = io.WriteString(w, sqsBody(sqsID, queueType, infix, "Creating")) + } + }) + muxDBS.HandleFunc("/client-test/connect/databroker/Subscriber/SQS/"+sqsID, func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.Method { + case "GET": + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, sqsBody(sqsID, queueType, infix, "Active")) + case "PUT": + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, sqsBody(sqsID, queueType, infix, "Updating")) + case "DELETE": + w.WriteHeader(http.StatusNoContent) + } + }) + + created, resp, err := dbsClient.Subscribers.CreateSQS(dbs.SQSSubscriberConfig{ + NameInfix: infix, + Description: "MySQSQueue", + QueueType: queueType, + ServerSideEncryption: true, + }) + if !assert.Nil(t, err) { + return + } + if !assert.NotNil(t, resp) { + return + } + if !assert.NotNil(t, created) { + return + } + assert.Equal(t, queueType, created.QueueType) + assert.Equal(t, sqsID, created.ID) + assert.NotNil(t, created.Status) + + res, resp, err := dbsClient.Subscribers.DeleteSQS(*created) + if !assert.Nil(t, err) { + return + } + if !assert.NotNil(t, resp) { + return + } + assert.True(t, res) + assert.Equal(t, http.StatusNoContent, resp.StatusCode()) +} diff --git a/connect/dbs/subscription_service.go b/connect/dbs/subscription_service.go new file mode 100644 index 00000000..43840462 --- /dev/null +++ b/connect/dbs/subscription_service.go @@ -0,0 +1,139 @@ +package dbs + +import ( + "fmt" + "github.com/go-playground/validator/v10" + "github.com/philips-software/go-hsdp-api/internal" + "net/http" +) + +type SubscriptionService struct { + *Client + validate *validator.Validate +} + +var ( + subscriptionAPIVersion = "1" +) + +type TopicSubscriptionConfig struct { + ResourceType string `json:"resourceType" validate:"required" enum:"TopicSubscriptionConfig"` + NameInfix string `json:"nameInfix" validate:"required"` + Description string `json:"description" validate:"required"` + SubscriberId string `json:"subscriberId" validate:"required"` + DeliverDataOnly bool `json:"deliverDataOnly,omitempty"` + KinesisStreamPartitionKey string `json:"kinesisStreamPartitionKey,omitempty"` + DataType string `json:"dataType" validate:"required"` +} + +type Subscriber struct { + ID string `json:"id" validate:"required"` + Type string `json:"type" validate:"required"` + Location string `json:"location" validate:"required"` +} + +type TopicSubscription struct { + ResourceType string `json:"resourceType" validate:"required" enum:"TopicSubscription"` + ID string `json:"id"` + Meta *Meta `json:"meta"` + Name string `json:"name" validate:"required"` + Description string `json:"description" validate:"required"` + Subscriber *Subscriber `json:"subscriber" validate:"required"` + DeliverDataOnly bool `json:"deliverDataOnly,omitempty"` + KinesisStreamPartitionKey string `json:"kinesisStreamPartitionKey,omitempty"` + Status string `json:"status" validate:"required" enum:"Creating|Deleting|Active|Updating|InError"` + ErrorMessage string `json:"errorMessage,omitempty"` + DataType string `json:"dataType" validate:"required"` + RuleName string `json:"ruleName" validate:"required"` +} + +type GetTopicSubscriptionOptions struct { + ID *string `url:"_id,omitempty"` + Name *string `url:"name,omitempty"` + LastUpdated *string `url:"_lastUpdated,omitempty"` +} + +type TopicSubscriptionBundle struct { + Type string `json:"type,omitempty"` + Entry []TopicSubscription `json:"entry,omitempty"` +} + +func (b *SubscriptionService) CreateTopicSubscription(subscriptionConfig TopicSubscriptionConfig) (*TopicSubscription, *Response, error) { + subscriptionConfig.ResourceType = "TopicSubscriptionConfig" + if err := b.validate.Struct(subscriptionConfig); err != nil { + return nil, nil, err + } + + req, _ := b.NewRequest(http.MethodPost, "/Subscription/Topic", subscriptionConfig, nil) + req.Header.Set("api-version", subscriptionAPIVersion) + + var created TopicSubscription + + resp, err := b.Do(req, &created) + + if err != nil { + return nil, resp, err + } + if created.ID == "" { + return nil, resp, fmt.Errorf("the 'ID' field is missing") + } + return &created, resp, nil +} + +func (b *SubscriptionService) GetTopicSubscriptionByID(id string) (*TopicSubscription, *Response, error) { + req, err := b.NewRequest(http.MethodGet, "/Subscription/Topic/"+id, nil) + if err != nil { + return nil, nil, err + } + req.Header.Set("api-version", subscriptionAPIVersion) + req.Header.Set("Content-Type", "application/json") + + var resource TopicSubscription + + resp, err := b.Do(req, &resource) + if err != nil { + return nil, resp, err + } + err = internal.CheckResponse(resp.Response) + if err != nil { + return nil, resp, fmt.Errorf("GetTopicSubscriptionByID: %w", err) + } + if resource.ID != id { + return nil, nil, fmt.Errorf("returned resource does not match") + } + return &resource, resp, nil +} + +func (b *SubscriptionService) FindTopicSubscription(opt *GetTopicSubscriptionOptions, options ...OptionFunc) (*[]TopicSubscription, *Response, error) { + req, err := b.NewRequest(http.MethodGet, "/Subscription/Topic", opt, options...) + if err != nil { + return nil, nil, err + } + req.Header.Set("api-version", subscriptionAPIVersion) + req.Header.Set("Content-Type", "application/json") + + var bundleResponse TopicSubscriptionBundle + + resp, err := b.Do(req, &bundleResponse) + if err != nil { + return nil, resp, err + } + + return &bundleResponse.Entry, resp, err +} + +func (b *SubscriptionService) DeleteTopicSubscription(subscription TopicSubscription) (bool, *Response, error) { + req, err := b.NewRequest(http.MethodDelete, "/Subscription/Topic/"+subscription.ID, nil, nil) + if err != nil { + return false, nil, err + } + req.Header.Set("api-version", subscriberAPIVersion) + + var deleteResponse interface{} + + resp, err := b.Do(req, &deleteResponse) + if resp == nil || resp.StatusCode() != http.StatusNoContent { + return false, resp, err + } + return true, resp, nil +} diff --git a/connect/dbs/subscription_service_test.go b/connect/dbs/subscription_service_test.go new file mode 100644 index 00000000..029a425b --- /dev/null +++ b/connect/dbs/subscription_service_test.go @@ -0,0 +1,95 @@ +package dbs_test + +import ( + "fmt" + "github.com/philips-software/go-hsdp-api/connect/dbs" + "io" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" +) + +func subscriptionBody(id, dataType string, infix string, status string, subscriberId string) string { + return fmt.Sprintf(`{ + "dataType": "%s", + "description": "TestSubscription", + "deliverDataOnly": false, + "id": "%s", + "meta": { + "lastUpdated": "2022-12-06T10:23:20.819Z", + "versionId": "1" + }, + "name": "dbs_d77e1fb8-9e13-48e0-9d20-b6365793a181_%s_%s", + "resourceType": "TopicSubscription", + "ruleName": "dbs_d77e1fb8-9e13-48e0-9d20-b6365793a181_%s_%s", + "status": "%s", + "subscriber": { + "id": "%s", + "type": "SQSSubscriber", + "location": "https://databroker-client-test.eu01.connect.hsdp.io/client-test/connect/databroker/Subscriber/SQS/%s" + } +}`, dataType, id, infix, id, infix, id, status, subscriberId, subscriberId) +} + +func TestSubscriptionCRUD(t *testing.T) { + teardown := setup(t) + defer teardown() + + sqsID := "9f80f9e0-5cb2-4ebd-8980-03f550cb453f" + subscriptionID := "1ca7251b-42a1-4560-99a5-2b359c6f3914" + infix := "my_infix" + dataType := "my-datatype" + muxDBS.HandleFunc("/client-test/connect/databroker/Subscription/Topic", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.Method { + case "POST": + w.Header().Set("Etag", "1") + w.WriteHeader(http.StatusCreated) + _, _ = io.WriteString(w, subscriptionBody(subscriptionID, dataType, infix, "Creating", sqsID)) + } + }) + muxDBS.HandleFunc("/client-test/connect/databroker/Subscription/Topic/"+subscriptionID, func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + switch r.Method { + case "GET": + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, subscriptionBody(subscriptionID, dataType, infix, "Active", sqsID)) + case "PUT": + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, subscriptionBody(subscriptionID, dataType, infix, "Updating", sqsID)) + case "DELETE": + w.WriteHeader(http.StatusNoContent) + } + }) + + created, resp, err := dbsClient.Subscriptions.CreateTopicSubscription(dbs.TopicSubscriptionConfig{ + NameInfix: infix, + Description: "MyTopicSubscription", + SubscriberId: sqsID, + DataType: dataType, + }) + if !assert.Nil(t, err) { + return + } + if !assert.NotNil(t, resp) { + return + } + if !assert.NotNil(t, created) { + return + } + assert.Equal(t, dataType, created.DataType) + assert.Equal(t, sqsID, created.Subscriber.ID) + assert.Equal(t, subscriptionID, created.ID) + assert.NotNil(t, created.Status) + + res, resp, err := dbsClient.Subscriptions.DeleteTopicSubscription(*created) + if !assert.Nil(t, err) { + return + } + if !assert.NotNil(t, resp) { + return + } + assert.True(t, res) + assert.Equal(t, http.StatusNoContent, resp.StatusCode()) +}