From 3b550c485d4bbe5365464bd31190fff5e0b4d64e Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+raj-k-singh@users.noreply.github.com> Date: Mon, 3 Feb 2025 20:52:15 +0530 Subject: [PATCH] Feat: cloud integrations: agent check-in api (#7004) * chore: cloudintegrations: rename rds def * feat: cloudintegrations: shape of agent check in response for v0 release * chore: cloudintegrations: add validation for response expected after agent check in * chore: cloudintegrations: accumulate teletry collection strategies for enabled services * chore: cloudintegrations: use map struct to parse from map to struct with json tags * chore: cloudintegrations: telemetry collection strategy for services * chore: cloudintegrations: wrap up test for agent check in resp * chore: some cleanup * chore: some cleanup * chore: some minor renaming * chore: address review comment * chore: some cleanup --------- Co-authored-by: Srikanth Chekuri --- .../cloudintegrations/availableServices.go | 67 ++++--- .../app/cloudintegrations/controller.go | 82 ++++++++- .../app/cloudintegrations/controller_test.go | 13 +- .../app/cloudintegrations/model.go | 106 +++++++++++ .../aws/ec2/integration.json | 9 + .../aws/{rdsPostgres => rds}/icon.svg | 0 .../aws/{rdsPostgres => rds}/integration.json | 21 ++- .../aws/{rdsPostgres => rds}/overview.md | 0 .../signoz_cloud_integrations_test.go | 168 ++++++++++++++++-- 9 files changed, 416 insertions(+), 50 deletions(-) rename pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/{rdsPostgres => rds}/icon.svg (100%) rename pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/{rdsPostgres => rds}/integration.json (60%) rename pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/{rdsPostgres => rds}/overview.md (100%) diff --git a/pkg/query-service/app/cloudintegrations/availableServices.go b/pkg/query-service/app/cloudintegrations/availableServices.go index 474077e7e4f..f2cf7c80675 100644 --- a/pkg/query-service/app/cloudintegrations/availableServices.go +++ b/pkg/query-service/app/cloudintegrations/availableServices.go @@ -86,23 +86,25 @@ func readAllServiceDefinitions() error { continue } - cloudProviderDirPath := path.Join(rootDirName, d.Name()) - cloudServices, err := readServiceDefinitionsFromDir(cloudProviderDirPath) + cloudProvider := d.Name() + + cloudProviderDirPath := path.Join(rootDirName, cloudProvider) + cloudServices, err := readServiceDefinitionsFromDir(cloudProvider, cloudProviderDirPath) if err != nil { - return fmt.Errorf("couldn't read %s service definitions", d.Name()) + return fmt.Errorf("couldn't read %s service definitions: %w", cloudProvider, err) } if len(cloudServices) < 1 { - return fmt.Errorf("no %s services could be read", d.Name()) + return fmt.Errorf("no %s services could be read", cloudProvider) } - availableServices[d.Name()] = cloudServices + availableServices[cloudProvider] = cloudServices } return nil } -func readServiceDefinitionsFromDir(cloudProviderDirPath string) ( +func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) ( map[string]CloudServiceDetails, error, ) { svcDefDirs, err := fs.ReadDir(serviceDefinitionFiles, cloudProviderDirPath) @@ -118,7 +120,7 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) ( } svcDirPath := path.Join(cloudProviderDirPath, d.Name()) - s, err := readServiceDefinition(svcDirPath) + s, err := readServiceDefinition(cloudProvider, svcDirPath) if err != nil { return nil, fmt.Errorf("couldn't read svc definition for %s: %w", d.Name(), err) } @@ -135,14 +137,14 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) ( return svcDefs, nil } -func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) { - integrationJsonPath := path.Join(dirpath, "integration.json") +func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServiceDetails, error) { + integrationJsonPath := path.Join(svcDirpath, "integration.json") serializedSpec, err := serviceDefinitionFiles.ReadFile(integrationJsonPath) if err != nil { return nil, fmt.Errorf( "couldn't find integration.json in %s: %w", - dirpath, err, + svcDirpath, err, ) } @@ -155,7 +157,7 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) { } hydrated, err := integrations.HydrateFileUris( - integrationSpec, serviceDefinitionFiles, dirpath, + integrationSpec, serviceDefinitionFiles, svcDirpath, ) if err != nil { return nil, fmt.Errorf( @@ -163,20 +165,9 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) { integrationJsonPath, err, ) } + hydratedSpec := hydrated.(map[string]any) - hydratedSpec := hydrated.(map[string]interface{}) - hydratedSpecJson, err := koanfJson.Parser().Marshal(hydratedSpec) - if err != nil { - return nil, fmt.Errorf( - "couldn't serialize hydrated integration spec back to JSON %s: %w", - integrationJsonPath, err, - ) - } - - var serviceDef CloudServiceDetails - decoder := json.NewDecoder(bytes.NewReader(hydratedSpecJson)) - decoder.DisallowUnknownFields() - err = decoder.Decode(&serviceDef) + serviceDef, err := ParseStructWithJsonTagsFromMap[CloudServiceDetails](hydratedSpec) if err != nil { return nil, fmt.Errorf( "couldn't parse hydrated JSON spec read from %s: %w", @@ -189,11 +180,13 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) { return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err) } - return &serviceDef, nil + serviceDef.TelemetryCollectionStrategy.Provider = cloudProvider + + return serviceDef, nil } -func validateServiceDefinition(s CloudServiceDetails) error { +func validateServiceDefinition(s *CloudServiceDetails) error { // Validate dashboard data seenDashboardIds := map[string]interface{}{} for _, dd := range s.Assets.Dashboards { @@ -211,7 +204,29 @@ func validateServiceDefinition(s CloudServiceDetails) error { seenDashboardIds[dashboardId] = nil } + if s.TelemetryCollectionStrategy == nil { + return fmt.Errorf("telemetry_collection_strategy is required") + } + // potentially more to follow return nil } + +func ParseStructWithJsonTagsFromMap[StructType any](data map[string]any) ( + *StructType, error, +) { + mapJson, err := json.Marshal(data) + if err != nil { + return nil, fmt.Errorf("couldn't marshal map to json: %w", err) + } + + var res StructType + decoder := json.NewDecoder(bytes.NewReader(mapJson)) + decoder.DisallowUnknownFields() + err = decoder.Decode(&res) + if err != nil { + return nil, fmt.Errorf("couldn't unmarshal json back to struct: %w", err) + } + return &res, nil +} diff --git a/pkg/query-service/app/cloudintegrations/controller.go b/pkg/query-service/app/cloudintegrations/controller.go index 04b0407847a..7d7bab77bff 100644 --- a/pkg/query-service/app/cloudintegrations/controller.go +++ b/pkg/query-service/app/cloudintegrations/controller.go @@ -8,6 +8,7 @@ import ( "github.com/jmoiron/sqlx" "go.signoz.io/signoz/pkg/query-service/model" + "golang.org/x/exp/maps" ) var SupportedCloudProviders = []string{ @@ -92,6 +93,11 @@ type GenerateConnectionUrlRequest struct { type SigNozAgentConfig struct { // The region in which SigNoz agent should be installed. Region string `json:"region"` + + IngestionUrl string `json:"ingestion_url"` + IngestionKey string `json:"ingestion_key"` + SigNozAPIUrl string `json:"signoz_api_url"` + SigNozAPIKey string `json:"signoz_api_key"` } type GenerateConnectionUrlResponse struct { @@ -163,7 +169,17 @@ type AgentCheckInRequest struct { } type AgentCheckInResponse struct { - Account AccountRecord `json:"account"` + AccountId string `json:"account_id"` + CloudAccountId string `json:"cloud_account_id"` + RemovedAt *time.Time `json:"removed_at"` + + IntegrationConfig IntegrationConfigForAgent `json:"integration_config"` +} + +type IntegrationConfigForAgent struct { + EnabledRegions []string `json:"enabled_regions"` + + TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry,omitempty"` } func (c *Controller) CheckInAsAgent( @@ -201,8 +217,70 @@ func (c *Controller) CheckInAsAgent( return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account") } + // prepare and return integration config to be consumed by agent + telemetryCollectionStrategy, err := NewCloudTelemetryCollectionStrategy(cloudProvider) + if err != nil { + return nil, model.InternalError(fmt.Errorf( + "couldn't init telemetry collection strategy: %w", err, + )) + } + + agentConfig := IntegrationConfigForAgent{ + EnabledRegions: []string{}, + TelemetryCollectionStrategy: telemetryCollectionStrategy, + } + + if account.Config != nil && account.Config.EnabledRegions != nil { + agentConfig.EnabledRegions = account.Config.EnabledRegions + } + + services, apiErr := listCloudProviderServices(cloudProvider) + if apiErr != nil { + return nil, model.WrapApiError(apiErr, "couldn't list cloud services") + } + svcDetailsById := map[string]*CloudServiceDetails{} + for _, svcDetails := range services { + svcDetailsById[svcDetails.Id] = &svcDetails + } + + svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount( + ctx, cloudProvider, *account.CloudAccountId, + ) + if apiErr != nil { + return nil, model.WrapApiError( + apiErr, "couldn't get service configs for cloud account", + ) + } + + // accumulate config in a fixed order to ensure same config generated across runs + configuredSvcIds := maps.Keys(svcConfigs) + slices.Sort(configuredSvcIds) + + for _, svcId := range configuredSvcIds { + svcDetails := svcDetailsById[svcId] + svcConfig := svcConfigs[svcId] + + if svcDetails != nil { + metricsEnabled := svcConfig.Metrics != nil && svcConfig.Metrics.Enabled + logsEnabled := svcConfig.Logs != nil && svcConfig.Logs.Enabled + if logsEnabled || metricsEnabled { + err := agentConfig.TelemetryCollectionStrategy.AddServiceStrategy( + svcDetails.TelemetryCollectionStrategy, logsEnabled, metricsEnabled, + ) + if err != nil { + return nil, model.InternalError(fmt.Errorf( + "couldn't add service telemetry collection strategy: %w", err, + )) + } + } + } + } + return &AgentCheckInResponse{ - Account: *account, + AccountId: account.Id, + CloudAccountId: *account.CloudAccountId, + RemovedAt: account.RemovedAt, + IntegrationConfig: agentConfig, }, nil } diff --git a/pkg/query-service/app/cloudintegrations/controller_test.go b/pkg/query-service/app/cloudintegrations/controller_test.go index 7b361615d08..70673f457d6 100644 --- a/pkg/query-service/app/cloudintegrations/controller_test.go +++ b/pkg/query-service/app/cloudintegrations/controller_test.go @@ -71,8 +71,8 @@ func TestAgentCheckIns(t *testing.T) { }, ) require.Nil(apiErr) - require.Equal(testAccountId1, resp1.Account.Id) - require.Equal(testCloudAccountId1, *resp1.Account.CloudAccountId) + require.Equal(testAccountId1, resp1.AccountId) + require.Equal(testCloudAccountId1, resp1.CloudAccountId) // The agent should not be able to check in with a different // cloud account id for the same account. @@ -262,9 +262,10 @@ func makeTestConnectedAccount(t *testing.T, controller *Controller, cloudAccount }, ) require.Nil(apiErr) - require.Equal(testAccountId, resp.Account.Id) - require.Equal(cloudAccountId, *resp.Account.CloudAccountId) - - return &resp.Account + require.Equal(testAccountId, resp.AccountId) + require.Equal(cloudAccountId, resp.CloudAccountId) + acc, err := controller.accountsRepo.get(context.TODO(), "aws", resp.AccountId) + require.Nil(err) + return acc } diff --git a/pkg/query-service/app/cloudintegrations/model.go b/pkg/query-service/app/cloudintegrations/model.go index 5f35336b9fe..f64def30bbc 100644 --- a/pkg/query-service/app/cloudintegrations/model.go +++ b/pkg/query-service/app/cloudintegrations/model.go @@ -140,6 +140,8 @@ type CloudServiceDetails struct { DataCollected DataCollectedForService `json:"data_collected"` ConnectionStatus *CloudServiceConnectionStatus `json:"status,omitempty"` + + TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry_collection_strategy"` } type CloudServiceConfig struct { @@ -216,3 +218,107 @@ type SignalConnectionStatus struct { LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds LastReceivedFrom string `json:"last_received_from"` // resource identifier } + +type CloudTelemetryCollectionStrategy struct { + Provider string `json:"provider"` + + AWSMetrics *AWSMetricsCollectionStrategy `json:"aws_metrics,omitempty"` + AWSLogs *AWSLogsCollectionStrategy `json:"aws_logs,omitempty"` +} + +func NewCloudTelemetryCollectionStrategy(provider string) (*CloudTelemetryCollectionStrategy, error) { + if provider == "aws" { + return &CloudTelemetryCollectionStrategy{ + Provider: "aws", + AWSMetrics: &AWSMetricsCollectionStrategy{ + CloudwatchMetricsStreamFilters: []CloudwatchMetricStreamFilter{}, + }, + AWSLogs: &AWSLogsCollectionStrategy{ + CloudwatchLogsSubscriptions: []CloudwatchLogsSubscriptionConfig{}, + }, + }, nil + } + + return nil, fmt.Errorf("unsupported cloud provider: %s", provider) +} + +// Helper for accumulating strategies for enabled services. +func (cs *CloudTelemetryCollectionStrategy) AddServiceStrategy( + svcStrategy *CloudTelemetryCollectionStrategy, + logsEnabled bool, + metricsEnabled bool, +) error { + if svcStrategy.Provider != cs.Provider { + return fmt.Errorf( + "can't add %s service strategy to strategy for %s", + svcStrategy.Provider, cs.Provider, + ) + } + + if cs.Provider == "aws" { + if logsEnabled { + cs.AWSLogs.AddServiceStrategy(svcStrategy.AWSLogs) + } + if metricsEnabled { + cs.AWSMetrics.AddServiceStrategy(svcStrategy.AWSMetrics) + } + return nil + } + + return fmt.Errorf("unsupported cloud provider: %s", cs.Provider) + +} + +type AWSMetricsCollectionStrategy struct { + // to be used as https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters + CloudwatchMetricsStreamFilters []CloudwatchMetricStreamFilter `json:"cloudwatch_metric_stream_filters"` +} + +type CloudwatchMetricStreamFilter struct { + // json tags here are in the shape expected by AWS API as detailed at + // https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html + Namespace string `json:"Namespace"` + MetricNames []string `json:"MetricNames,omitempty"` +} + +func (amc *AWSMetricsCollectionStrategy) AddServiceStrategy( + svcStrategy *AWSMetricsCollectionStrategy, +) error { + if svcStrategy == nil { + return nil + } + + amc.CloudwatchMetricsStreamFilters = append( + amc.CloudwatchMetricsStreamFilters, + svcStrategy.CloudwatchMetricsStreamFilters..., + ) + return nil +} + +type AWSLogsCollectionStrategy struct { + CloudwatchLogsSubscriptions []CloudwatchLogsSubscriptionConfig `json:"cloudwatch_logs_subscriptions"` +} + +type CloudwatchLogsSubscriptionConfig struct { + // subscribe to all logs groups with specified prefix. + // eg: `/aws/rds/` + LogGroupNamePrefix string `json:"log_group_name_prefix"` + + // https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html + // "" implies no filtering is required. + FilterPattern string `json:"filter_pattern"` +} + +func (alc *AWSLogsCollectionStrategy) AddServiceStrategy( + svcStrategy *AWSLogsCollectionStrategy, +) error { + if svcStrategy == nil { + return nil + } + + alc.CloudwatchLogsSubscriptions = append( + alc.CloudwatchLogsSubscriptions, + svcStrategy.CloudwatchLogsSubscriptions..., + ) + return nil +} diff --git a/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/ec2/integration.json b/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/ec2/integration.json index 55f85ca5bd4..e9ef4944d84 100644 --- a/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/ec2/integration.json +++ b/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/ec2/integration.json @@ -26,5 +26,14 @@ } ], "logs": [] + }, + "telemetry_collection_strategy": { + "aws_metrics": { + "cloudwatch_metric_stream_filters": [ + { + "Namespace": "AWS/EC2" + } + ] + } } } \ No newline at end of file diff --git a/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rdsPostgres/icon.svg b/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rds/icon.svg similarity index 100% rename from pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rdsPostgres/icon.svg rename to pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rds/icon.svg diff --git a/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rdsPostgres/integration.json b/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rds/integration.json similarity index 60% rename from pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rdsPostgres/integration.json rename to pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rds/integration.json index 969cbf56b45..396b4dfb62a 100644 --- a/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rdsPostgres/integration.json +++ b/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rds/integration.json @@ -1,6 +1,6 @@ { - "id": "rds-postgres", - "title": "RDS Postgres", + "id": "rds", + "title": "Amazon RDS", "icon": "file://icon.svg", "overview": "file://overview.md", "assets": { @@ -26,5 +26,22 @@ } ], "logs": [] + }, + "telemetry_collection_strategy": { + "aws_metrics": { + "cloudwatch_metric_stream_filters": [ + { + "Namespace": "AWS/RDS" + } + ] + }, + "aws_logs": { + "cloudwatch_logs_subscriptions": [ + { + "log_group_name_prefix": "/aws/rds", + "filter_pattern": "" + } + ] + } } } \ No newline at end of file diff --git a/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rdsPostgres/overview.md b/pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rds/overview.md similarity index 100% rename from pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rdsPostgres/overview.md rename to pkg/query-service/app/cloudintegrations/serviceDefinitions/aws/rds/overview.md diff --git a/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go b/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go index 07424b18160..f3a55645f0d 100644 --- a/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go +++ b/pkg/query-service/tests/integration/signoz_cloud_integrations_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net/http" + "strings" "testing" "time" @@ -23,7 +24,6 @@ import ( func TestAWSIntegrationAccountLifecycle(t *testing.T) { // Test for happy path of connecting and managing AWS integration accounts - t0 := time.Now() require := require.New(t) testbed := NewCloudIntegrationsTestBed(t, nil) @@ -67,11 +67,9 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) { CloudAccountId: testAWSAccountId, }, ) - require.Equal(testAccountId, agentCheckInResp.Account.Id) - require.Equal(testAccountConfig, *agentCheckInResp.Account.Config) - require.Equal(testAWSAccountId, *agentCheckInResp.Account.CloudAccountId) - require.LessOrEqual(t0.Unix(), agentCheckInResp.Account.CreatedAt.Unix()) - require.Nil(agentCheckInResp.Account.RemovedAt) + require.Equal(testAccountId, agentCheckInResp.AccountId) + require.Equal(testAWSAccountId, agentCheckInResp.CloudAccountId) + require.Nil(agentCheckInResp.RemovedAt) // Polling for connection status from UI should now return latest status accountStatusResp1 := testbed.GetAccountStatusFromQS("aws", testAccountId) @@ -107,10 +105,9 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) { CloudAccountId: testAWSAccountId, }, ) - require.Equal(testAccountId, agentCheckInResp1.Account.Id) - require.Equal(testAccountConfig2, *agentCheckInResp1.Account.Config) - require.Equal(testAWSAccountId, *agentCheckInResp1.Account.CloudAccountId) - require.Nil(agentCheckInResp1.Account.RemovedAt) + require.Equal(testAccountId, agentCheckInResp1.AccountId) + require.Equal(testAWSAccountId, agentCheckInResp1.CloudAccountId) + require.Nil(agentCheckInResp1.RemovedAt) // Should be able to disconnect/remove account from UI. tsBeforeDisconnect := time.Now() @@ -125,9 +122,9 @@ func TestAWSIntegrationAccountLifecycle(t *testing.T) { CloudAccountId: testAWSAccountId, }, ) - require.Equal(testAccountId, agentCheckInResp2.Account.Id) - require.Equal(testAWSAccountId, *agentCheckInResp2.Account.CloudAccountId) - require.LessOrEqual(tsBeforeDisconnect, *agentCheckInResp2.Account.RemovedAt) + require.Equal(testAccountId, agentCheckInResp2.AccountId) + require.Equal(testAWSAccountId, agentCheckInResp2.CloudAccountId) + require.LessOrEqual(tsBeforeDisconnect, *agentCheckInResp2.RemovedAt) } func TestAWSIntegrationServices(t *testing.T) { @@ -194,6 +191,149 @@ func TestAWSIntegrationServices(t *testing.T) { } +func TestConfigReturnedWhenAgentChecksIn(t *testing.T) { + require := require.New(t) + + testbed := NewCloudIntegrationsTestBed(t, nil) + + // configure a connected account + testAccountConfig := cloudintegrations.AccountConfig{ + EnabledRegions: []string{"us-east-1", "us-east-2"}, + } + connectionUrlResp := testbed.GenerateConnectionUrlFromQS( + "aws", cloudintegrations.GenerateConnectionUrlRequest{ + AgentConfig: cloudintegrations.SigNozAgentConfig{ + Region: "us-east-1", + SigNozAPIKey: "test-api-key", + }, + AccountConfig: testAccountConfig, + }, + ) + testAccountId := connectionUrlResp.AccountId + require.NotEmpty(testAccountId) + require.NotEmpty(connectionUrlResp.ConnectionUrl) + + testAWSAccountId := "389389489489" + checkinResp := testbed.CheckInAsAgentWithQS( + "aws", cloudintegrations.AgentCheckInRequest{ + AccountId: testAccountId, + CloudAccountId: testAWSAccountId, + }, + ) + + require.Equal(testAccountId, checkinResp.AccountId) + require.Equal(testAWSAccountId, checkinResp.CloudAccountId) + require.Nil(checkinResp.RemovedAt) + require.Equal(testAccountConfig.EnabledRegions, checkinResp.IntegrationConfig.EnabledRegions) + + telemetryCollectionStrategy := checkinResp.IntegrationConfig.TelemetryCollectionStrategy + require.Equal("aws", telemetryCollectionStrategy.Provider) + require.NotNil(telemetryCollectionStrategy.AWSMetrics) + require.Empty(telemetryCollectionStrategy.AWSMetrics.CloudwatchMetricsStreamFilters) + require.NotNil(telemetryCollectionStrategy.AWSLogs) + require.Empty(telemetryCollectionStrategy.AWSLogs.CloudwatchLogsSubscriptions) + + // helper + setServiceConfig := func(svcId string, metricsEnabled bool, logsEnabled bool) { + testSvcConfig := cloudintegrations.CloudServiceConfig{} + if metricsEnabled { + testSvcConfig.Metrics = &cloudintegrations.CloudServiceMetricsConfig{ + Enabled: metricsEnabled, + } + } + if logsEnabled { + testSvcConfig.Logs = &cloudintegrations.CloudServiceLogsConfig{ + Enabled: logsEnabled, + } + } + + updateSvcConfigResp := testbed.UpdateServiceConfigWithQS("aws", svcId, cloudintegrations.UpdateServiceConfigRequest{ + CloudAccountId: testAWSAccountId, + Config: testSvcConfig, + }) + require.Equal(svcId, updateSvcConfigResp.Id) + require.Equal(testSvcConfig, updateSvcConfigResp.Config) + } + + setServiceConfig("ec2", true, false) + setServiceConfig("rds", true, true) + + checkinResp = testbed.CheckInAsAgentWithQS( + "aws", cloudintegrations.AgentCheckInRequest{ + AccountId: testAccountId, + CloudAccountId: testAWSAccountId, + }, + ) + + require.Equal(testAccountId, checkinResp.AccountId) + require.Equal(testAWSAccountId, checkinResp.CloudAccountId) + require.Nil(checkinResp.RemovedAt) + + integrationConf := checkinResp.IntegrationConfig + require.Equal(testAccountConfig.EnabledRegions, integrationConf.EnabledRegions) + + telemetryCollectionStrategy = integrationConf.TelemetryCollectionStrategy + require.Equal("aws", telemetryCollectionStrategy.Provider) + require.NotNil(telemetryCollectionStrategy.AWSMetrics) + metricStreamNamespaces := []string{} + for _, f := range telemetryCollectionStrategy.AWSMetrics.CloudwatchMetricsStreamFilters { + metricStreamNamespaces = append(metricStreamNamespaces, f.Namespace) + } + require.Equal([]string{"AWS/EC2", "AWS/RDS"}, metricStreamNamespaces) + + require.NotNil(telemetryCollectionStrategy.AWSLogs) + logGroupPrefixes := []string{} + for _, f := range telemetryCollectionStrategy.AWSLogs.CloudwatchLogsSubscriptions { + logGroupPrefixes = append(logGroupPrefixes, f.LogGroupNamePrefix) + } + require.Equal(1, len(logGroupPrefixes)) + require.True(strings.HasPrefix(logGroupPrefixes[0], "/aws/rds")) + + // change regions and update service configs and validate config changes for agent + testAccountConfig2 := cloudintegrations.AccountConfig{ + EnabledRegions: []string{"us-east-2", "us-west-1"}, + } + latestAccount := testbed.UpdateAccountConfigWithQS( + "aws", testAccountId, testAccountConfig2, + ) + require.Equal(testAccountId, latestAccount.Id) + require.Equal(testAccountConfig2, *latestAccount.Config) + + // disable metrics for one and logs for the other. + // config should be as expected. + setServiceConfig("ec2", false, false) + setServiceConfig("rds", true, false) + + checkinResp = testbed.CheckInAsAgentWithQS( + "aws", cloudintegrations.AgentCheckInRequest{ + AccountId: testAccountId, + CloudAccountId: testAWSAccountId, + }, + ) + require.Equal(testAccountId, checkinResp.AccountId) + require.Equal(testAWSAccountId, checkinResp.CloudAccountId) + require.Nil(checkinResp.RemovedAt) + integrationConf = checkinResp.IntegrationConfig + require.Equal(testAccountConfig2.EnabledRegions, integrationConf.EnabledRegions) + + telemetryCollectionStrategy = integrationConf.TelemetryCollectionStrategy + require.Equal("aws", telemetryCollectionStrategy.Provider) + require.NotNil(telemetryCollectionStrategy.AWSMetrics) + metricStreamNamespaces = []string{} + for _, f := range telemetryCollectionStrategy.AWSMetrics.CloudwatchMetricsStreamFilters { + metricStreamNamespaces = append(metricStreamNamespaces, f.Namespace) + } + require.Equal([]string{"AWS/RDS"}, metricStreamNamespaces) + + require.NotNil(telemetryCollectionStrategy.AWSLogs) + logGroupPrefixes = []string{} + for _, f := range telemetryCollectionStrategy.AWSLogs.CloudwatchLogsSubscriptions { + logGroupPrefixes = append(logGroupPrefixes, f.LogGroupNamePrefix) + } + require.Equal(0, len(logGroupPrefixes)) + +} + type CloudIntegrationsTestBed struct { t *testing.T testUser *model.User @@ -412,7 +552,7 @@ func RequestQSAndParseResp[ResponseType any]( err := json.Unmarshal(respDataJson, &resp) if err != nil { - tb.t.Fatalf("could not unmarshal apiResponse.Data json into %T", resp) + tb.t.Fatalf("could not unmarshal apiResponse.Data json into %T: %v", resp, err) } return &resp