Skip to content

Commit

Permalink
Feat: cloud integrations: agent check-in api (#7004)
Browse files Browse the repository at this point in the history
* chore: cloudintegrations: rename rds def

* feat: cloudintegrations: shape of agent check in response for v0 release

* chore: cloudintegrations: add validation for response expected after agent check in

* chore: cloudintegrations: accumulate telemetry collection strategies for enabled services

* chore: cloudintegrations: use map struct to parse from map to struct with json tags

* chore: cloudintegrations: telemetry collection strategy for services

* chore: cloudintegrations: wrap up test for agent check in resp

* chore: some cleanup

* chore: some cleanup

* chore: some minor renaming

* chore: address review comment

* chore: some cleanup

---------

Co-authored-by: Srikanth Chekuri <[email protected]>
  • Loading branch information
raj-k-singh and srikanthccv authored Feb 3, 2025
1 parent 784dccf commit 3b550c4
Show file tree
Hide file tree
Showing 9 changed files with 416 additions and 50 deletions.
67 changes: 41 additions & 26 deletions pkg/query-service/app/cloudintegrations/availableServices.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,25 @@ func readAllServiceDefinitions() error {
continue
}

cloudProviderDirPath := path.Join(rootDirName, d.Name())
cloudServices, err := readServiceDefinitionsFromDir(cloudProviderDirPath)
cloudProvider := d.Name()

cloudProviderDirPath := path.Join(rootDirName, cloudProvider)
cloudServices, err := readServiceDefinitionsFromDir(cloudProvider, cloudProviderDirPath)
if err != nil {
return fmt.Errorf("couldn't read %s service definitions", d.Name())
return fmt.Errorf("couldn't read %s service definitions: %w", cloudProvider, err)
}

if len(cloudServices) < 1 {
return fmt.Errorf("no %s services could be read", d.Name())
return fmt.Errorf("no %s services could be read", cloudProvider)
}

availableServices[d.Name()] = cloudServices
availableServices[cloudProvider] = cloudServices
}

return nil
}

func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) (
map[string]CloudServiceDetails, error,
) {
svcDefDirs, err := fs.ReadDir(serviceDefinitionFiles, cloudProviderDirPath)
Expand All @@ -118,7 +120,7 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
}

svcDirPath := path.Join(cloudProviderDirPath, d.Name())
s, err := readServiceDefinition(svcDirPath)
s, err := readServiceDefinition(cloudProvider, svcDirPath)
if err != nil {
return nil, fmt.Errorf("couldn't read svc definition for %s: %w", d.Name(), err)
}
Expand All @@ -135,14 +137,14 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
return svcDefs, nil
}

func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
integrationJsonPath := path.Join(dirpath, "integration.json")
func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServiceDetails, error) {
integrationJsonPath := path.Join(svcDirpath, "integration.json")

serializedSpec, err := serviceDefinitionFiles.ReadFile(integrationJsonPath)
if err != nil {
return nil, fmt.Errorf(
"couldn't find integration.json in %s: %w",
dirpath, err,
svcDirpath, err,
)
}

Expand All @@ -155,28 +157,17 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
}

hydrated, err := integrations.HydrateFileUris(
integrationSpec, serviceDefinitionFiles, dirpath,
integrationSpec, serviceDefinitionFiles, svcDirpath,
)
if err != nil {
return nil, fmt.Errorf(
"couldn't hydrate files referenced in service definition %s: %w",
integrationJsonPath, err,
)
}
hydratedSpec := hydrated.(map[string]any)

hydratedSpec := hydrated.(map[string]interface{})
hydratedSpecJson, err := koanfJson.Parser().Marshal(hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't serialize hydrated integration spec back to JSON %s: %w",
integrationJsonPath, err,
)
}

var serviceDef CloudServiceDetails
decoder := json.NewDecoder(bytes.NewReader(hydratedSpecJson))
decoder.DisallowUnknownFields()
err = decoder.Decode(&serviceDef)
serviceDef, err := ParseStructWithJsonTagsFromMap[CloudServiceDetails](hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't parse hydrated JSON spec read from %s: %w",
Expand All @@ -189,11 +180,13 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
}

return &serviceDef, nil
serviceDef.TelemetryCollectionStrategy.Provider = cloudProvider

return serviceDef, nil

}

func validateServiceDefinition(s CloudServiceDetails) error {
func validateServiceDefinition(s *CloudServiceDetails) error {
// Validate dashboard data
seenDashboardIds := map[string]interface{}{}
for _, dd := range s.Assets.Dashboards {
Expand All @@ -211,7 +204,29 @@ func validateServiceDefinition(s CloudServiceDetails) error {
seenDashboardIds[dashboardId] = nil
}

if s.TelemetryCollectionStrategy == nil {
return fmt.Errorf("telemetry_collection_strategy is required")
}

// potentially more to follow

return nil
}

// ParseStructWithJsonTagsFromMap converts a generic map into a StructType
// value by round-tripping it through JSON, so that the `json` tags on
// StructType decide which map key populates which field.
//
// Decoding is strict: a key in data that has no matching field in StructType
// is reported as an error. On any error the returned pointer is nil.
func ParseStructWithJsonTagsFromMap[StructType any](data map[string]any) (
	*StructType, error,
) {
	serialized, marshalErr := json.Marshal(data)
	if marshalErr != nil {
		return nil, fmt.Errorf("couldn't marshal map to json: %w", marshalErr)
	}

	// A Decoder is used rather than json.Unmarshal because only the Decoder
	// can be configured to reject unknown fields.
	strictDecoder := json.NewDecoder(bytes.NewReader(serialized))
	strictDecoder.DisallowUnknownFields()

	parsed := new(StructType)
	if decodeErr := strictDecoder.Decode(parsed); decodeErr != nil {
		return nil, fmt.Errorf("couldn't unmarshal json back to struct: %w", decodeErr)
	}
	return parsed, nil
}
82 changes: 80 additions & 2 deletions pkg/query-service/app/cloudintegrations/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/model"
"golang.org/x/exp/maps"
)

var SupportedCloudProviders = []string{
Expand Down Expand Up @@ -92,6 +93,11 @@ type GenerateConnectionUrlRequest struct {
// SigNozAgentConfig carries settings for a SigNoz agent. It is serialized to
// JSON under the snake_case keys given in the field tags.
type SigNozAgentConfig struct {
// The region in which SigNoz agent should be installed.
Region string `json:"region"`

// NOTE(review): presumably the ingestion endpoint and key the agent uses
// when sending telemetry - confirm against the agent implementation.
IngestionUrl string `json:"ingestion_url"`
IngestionKey string `json:"ingestion_key"`
// NOTE(review): presumably the SigNoz API endpoint and key the agent uses
// for API calls such as check-ins - confirm against the agent implementation.
SigNozAPIUrl string `json:"signoz_api_url"`
SigNozAPIKey string `json:"signoz_api_key"`
}

type GenerateConnectionUrlResponse struct {
Expand Down Expand Up @@ -163,7 +169,17 @@ type AgentCheckInRequest struct {
}

type AgentCheckInResponse struct {
Account AccountRecord `json:"account"`
AccountId string `json:"account_id"`
CloudAccountId string `json:"cloud_account_id"`
RemovedAt *time.Time `json:"removed_at"`

IntegrationConfig IntegrationConfigForAgent `json:"integration_config"`
}

// IntegrationConfigForAgent is the integration config returned to an agent in
// AgentCheckInResponse under the "integration_config" key.
type IntegrationConfigForAgent struct {
// Regions enabled in the account's config. CheckInAsAgent initializes this
// to an empty (non-nil) slice and replaces it only when the account config
// has a non-nil list of enabled regions.
EnabledRegions []string `json:"enabled_regions"`

// Collection strategy accumulated by CheckInAsAgent from the services that
// have logs or metrics enabled for the account. Serialized under the
// "telemetry" key and omitted from the JSON when nil.
TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry,omitempty"`
}

func (c *Controller) CheckInAsAgent(
Expand Down Expand Up @@ -201,8 +217,70 @@ func (c *Controller) CheckInAsAgent(
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}

// prepare and return integration config to be consumed by agent
telemetryCollectionStrategy, err := NewCloudTelemetryCollectionStrategy(cloudProvider)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't init telemetry collection strategy: %w", err,
))
}

agentConfig := IntegrationConfigForAgent{
EnabledRegions: []string{},
TelemetryCollectionStrategy: telemetryCollectionStrategy,
}

if account.Config != nil && account.Config.EnabledRegions != nil {
agentConfig.EnabledRegions = account.Config.EnabledRegions
}

services, apiErr := listCloudProviderServices(cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
}
svcDetailsById := map[string]*CloudServiceDetails{}
for _, svcDetails := range services {
svcDetailsById[svcDetails.Id] = &svcDetails
}

svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
ctx, cloudProvider, *account.CloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "couldn't get service configs for cloud account",
)
}

// accumulate config in a fixed order to ensure same config generated across runs
configuredSvcIds := maps.Keys(svcConfigs)
slices.Sort(configuredSvcIds)

for _, svcId := range configuredSvcIds {
svcDetails := svcDetailsById[svcId]
svcConfig := svcConfigs[svcId]

if svcDetails != nil {
metricsEnabled := svcConfig.Metrics != nil && svcConfig.Metrics.Enabled
logsEnabled := svcConfig.Logs != nil && svcConfig.Logs.Enabled
if logsEnabled || metricsEnabled {
err := agentConfig.TelemetryCollectionStrategy.AddServiceStrategy(
svcDetails.TelemetryCollectionStrategy, logsEnabled, metricsEnabled,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't add service telemetry collection strategy: %w", err,
))
}
}
}
}

return &AgentCheckInResponse{
Account: *account,
AccountId: account.Id,
CloudAccountId: *account.CloudAccountId,
RemovedAt: account.RemovedAt,
IntegrationConfig: agentConfig,
}, nil
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/query-service/app/cloudintegrations/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func TestAgentCheckIns(t *testing.T) {
},
)
require.Nil(apiErr)
require.Equal(testAccountId1, resp1.Account.Id)
require.Equal(testCloudAccountId1, *resp1.Account.CloudAccountId)
require.Equal(testAccountId1, resp1.AccountId)
require.Equal(testCloudAccountId1, resp1.CloudAccountId)

// The agent should not be able to check in with a different
// cloud account id for the same account.
Expand Down Expand Up @@ -262,9 +262,10 @@ func makeTestConnectedAccount(t *testing.T, controller *Controller, cloudAccount
},
)
require.Nil(apiErr)
require.Equal(testAccountId, resp.Account.Id)
require.Equal(cloudAccountId, *resp.Account.CloudAccountId)

return &resp.Account
require.Equal(testAccountId, resp.AccountId)
require.Equal(cloudAccountId, resp.CloudAccountId)

acc, err := controller.accountsRepo.get(context.TODO(), "aws", resp.AccountId)
require.Nil(err)
return acc
}
106 changes: 106 additions & 0 deletions pkg/query-service/app/cloudintegrations/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ type CloudServiceDetails struct {
DataCollected DataCollectedForService `json:"data_collected"`

ConnectionStatus *CloudServiceConnectionStatus `json:"status,omitempty"`

TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry_collection_strategy"`
}

type CloudServiceConfig struct {
Expand Down Expand Up @@ -216,3 +218,107 @@ type SignalConnectionStatus struct {
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
LastReceivedFrom string `json:"last_received_from"` // resource identifier
}

// CloudTelemetryCollectionStrategy describes how telemetry should be
// collected from a cloud provider. It is used both as the strategy of a single
// service (CloudServiceDetails.TelemetryCollectionStrategy) and as the
// accumulated strategy sent to an agent (see AddServiceStrategy).
type CloudTelemetryCollectionStrategy struct {
// Cloud provider the strategy applies to, e.g. "aws". AddServiceStrategy
// refuses to combine strategies whose providers differ.
Provider string `json:"provider"`

// AWS specific parts of the strategy; each is omitted from the JSON when nil.
AWSMetrics *AWSMetricsCollectionStrategy `json:"aws_metrics,omitempty"`
AWSLogs *AWSLogsCollectionStrategy `json:"aws_logs,omitempty"`
}

// NewCloudTelemetryCollectionStrategy returns an empty strategy for provider,
// ready to accumulate per-service strategies via AddServiceStrategy.
//
// Only "aws" is supported. For it, both the metrics and the logs parts are
// created with empty, non-nil slices, so they encode to JSON as `[]` rather
// than `null`. Any other provider yields a nil strategy and an error.
func NewCloudTelemetryCollectionStrategy(provider string) (*CloudTelemetryCollectionStrategy, error) {
	if provider != "aws" {
		return nil, fmt.Errorf("unsupported cloud provider: %s", provider)
	}

	emptyMetrics := &AWSMetricsCollectionStrategy{
		CloudwatchMetricsStreamFilters: []CloudwatchMetricStreamFilter{},
	}
	emptyLogs := &AWSLogsCollectionStrategy{
		CloudwatchLogsSubscriptions: []CloudwatchLogsSubscriptionConfig{},
	}

	return &CloudTelemetryCollectionStrategy{
		Provider:   "aws",
		AWSMetrics: emptyMetrics,
		AWSLogs:    emptyLogs,
	}, nil
}

// AddServiceStrategy merges svcStrategy, the telemetry collection strategy of
// a single service, into cs. It is a helper for accumulating the strategies
// of all enabled services into one strategy.
//
// Only the parts for enabled signals are merged: the logs part when
// logsEnabled is true and the metrics part when metricsEnabled is true.
//
// An error is returned when svcStrategy is nil, when svcStrategy and cs are
// for different providers, when the provider is not supported, or when merging
// one of the parts fails. If merging metrics fails after logs were merged, cs
// keeps the already merged logs part.
func (cs *CloudTelemetryCollectionStrategy) AddServiceStrategy(
	svcStrategy *CloudTelemetryCollectionStrategy,
	logsEnabled bool,
	metricsEnabled bool,
) error {
	// Guard against a nil service strategy instead of panicking on the
	// Provider comparison below.
	if svcStrategy == nil {
		return fmt.Errorf(
			"can't add nil service strategy to strategy for %s", cs.Provider,
		)
	}

	if svcStrategy.Provider != cs.Provider {
		return fmt.Errorf(
			"can't add %s service strategy to strategy for %s",
			svcStrategy.Provider, cs.Provider,
		)
	}

	switch cs.Provider {
	case "aws":
		// Errors from the provider specific parts are propagated rather than
		// silently dropped.
		if logsEnabled {
			if err := cs.AWSLogs.AddServiceStrategy(svcStrategy.AWSLogs); err != nil {
				return fmt.Errorf("couldn't add aws logs strategy: %w", err)
			}
		}
		if metricsEnabled {
			if err := cs.AWSMetrics.AddServiceStrategy(svcStrategy.AWSMetrics); err != nil {
				return fmt.Errorf("couldn't add aws metrics strategy: %w", err)
			}
		}
		return nil

	default:
		return fmt.Errorf("unsupported cloud provider: %s", cs.Provider)
	}
}

// AWSMetricsCollectionStrategy describes which AWS metrics should be
// collected, expressed as CloudWatch metric stream filters.
type AWSMetricsCollectionStrategy struct {
	// Meant to be used as the include filters of a CloudWatch metric stream:
	// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
	CloudwatchMetricsStreamFilters []CloudwatchMetricStreamFilter `json:"cloudwatch_metric_stream_filters"`
}

// CloudwatchMetricStreamFilter is a single CloudWatch metric stream filter.
//
// The JSON tags deliberately use the casing expected by the AWS API, as
// detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
type CloudwatchMetricStreamFilter struct {
	Namespace   string   `json:"Namespace"`
	MetricNames []string `json:"MetricNames,omitempty"`
}

// AddServiceStrategy appends the metric stream filters of svcStrategy to
// those already held by amc, preserving their order. A nil svcStrategy adds
// nothing. The returned error is always nil.
func (amc *AWSMetricsCollectionStrategy) AddServiceStrategy(
	svcStrategy *AWSMetricsCollectionStrategy,
) error {
	if svcStrategy != nil {
		extraFilters := svcStrategy.CloudwatchMetricsStreamFilters
		amc.CloudwatchMetricsStreamFilters = append(
			amc.CloudwatchMetricsStreamFilters, extraFilters...,
		)
	}
	return nil
}

// AWSLogsCollectionStrategy describes which AWS logs should be collected,
// expressed as CloudWatch Logs subscriptions.
type AWSLogsCollectionStrategy struct {
	CloudwatchLogsSubscriptions []CloudwatchLogsSubscriptionConfig `json:"cloudwatch_logs_subscriptions"`
}

// CloudwatchLogsSubscriptionConfig configures a single CloudWatch Logs
// subscription.
type CloudwatchLogsSubscriptionConfig struct {
	// Subscribe to all log groups whose name starts with this prefix,
	// eg: `/aws/rds/`
	LogGroupNamePrefix string `json:"log_group_name_prefix"`

	// Filter pattern for the subscription; "" implies no filtering is required.
	// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
	FilterPattern string `json:"filter_pattern"`
}

// AddServiceStrategy appends the log subscriptions of svcStrategy to those
// already held by alc, preserving their order. A nil svcStrategy adds
// nothing. The returned error is always nil.
func (alc *AWSLogsCollectionStrategy) AddServiceStrategy(
	svcStrategy *AWSLogsCollectionStrategy,
) error {
	if svcStrategy != nil {
		extraSubscriptions := svcStrategy.CloudwatchLogsSubscriptions
		alc.CloudwatchLogsSubscriptions = append(
			alc.CloudwatchLogsSubscriptions, extraSubscriptions...,
		)
	}
	return nil
}
Loading

0 comments on commit 3b550c4

Please sign in to comment.