Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: cloud integrations: agent check-in api #7004

Merged
merged 14 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 41 additions & 26 deletions pkg/query-service/app/cloudintegrations/availableServices.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,25 @@ func readAllServiceDefinitions() error {
continue
}

cloudProviderDirPath := path.Join(rootDirName, d.Name())
cloudServices, err := readServiceDefinitionsFromDir(cloudProviderDirPath)
cloudProvider := d.Name()

cloudProviderDirPath := path.Join(rootDirName, cloudProvider)
cloudServices, err := readServiceDefinitionsFromDir(cloudProvider, cloudProviderDirPath)
if err != nil {
return fmt.Errorf("couldn't read %s service definitions", d.Name())
return fmt.Errorf("couldn't read %s service definitions: %w", cloudProvider, err)
}

if len(cloudServices) < 1 {
return fmt.Errorf("no %s services could be read", d.Name())
return fmt.Errorf("no %s services could be read", cloudProvider)
}

availableServices[d.Name()] = cloudServices
availableServices[cloudProvider] = cloudServices
}

return nil
}

func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
func readServiceDefinitionsFromDir(cloudProvider string, cloudProviderDirPath string) (
map[string]CloudServiceDetails, error,
) {
svcDefDirs, err := fs.ReadDir(serviceDefinitionFiles, cloudProviderDirPath)
Expand All @@ -118,7 +120,7 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
}

svcDirPath := path.Join(cloudProviderDirPath, d.Name())
s, err := readServiceDefinition(svcDirPath)
s, err := readServiceDefinition(cloudProvider, svcDirPath)
if err != nil {
return nil, fmt.Errorf("couldn't read svc definition for %s: %w", d.Name(), err)
}
Expand All @@ -135,14 +137,14 @@ func readServiceDefinitionsFromDir(cloudProviderDirPath string) (
return svcDefs, nil
}

func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
integrationJsonPath := path.Join(dirpath, "integration.json")
func readServiceDefinition(cloudProvider string, svcDirpath string) (*CloudServiceDetails, error) {
integrationJsonPath := path.Join(svcDirpath, "integration.json")

serializedSpec, err := serviceDefinitionFiles.ReadFile(integrationJsonPath)
if err != nil {
return nil, fmt.Errorf(
"couldn't find integration.json in %s: %w",
dirpath, err,
svcDirpath, err,
)
}

Expand All @@ -155,28 +157,17 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
}

hydrated, err := integrations.HydrateFileUris(
integrationSpec, serviceDefinitionFiles, dirpath,
integrationSpec, serviceDefinitionFiles, svcDirpath,
)
if err != nil {
return nil, fmt.Errorf(
"couldn't hydrate files referenced in service definition %s: %w",
integrationJsonPath, err,
)
}
hydratedSpec := hydrated.(map[string]any)

hydratedSpec := hydrated.(map[string]interface{})
hydratedSpecJson, err := koanfJson.Parser().Marshal(hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't serialize hydrated integration spec back to JSON %s: %w",
integrationJsonPath, err,
)
}

var serviceDef CloudServiceDetails
decoder := json.NewDecoder(bytes.NewReader(hydratedSpecJson))
decoder.DisallowUnknownFields()
err = decoder.Decode(&serviceDef)
serviceDef, err := ParseStructWithJsonTagsFromMap[CloudServiceDetails](hydratedSpec)
if err != nil {
return nil, fmt.Errorf(
"couldn't parse hydrated JSON spec read from %s: %w",
Expand All @@ -189,11 +180,13 @@ func readServiceDefinition(dirpath string) (*CloudServiceDetails, error) {
return nil, fmt.Errorf("invalid service definition %s: %w", serviceDef.Id, err)
}

return &serviceDef, nil
serviceDef.TelemetryCollectionStrategy.Provider = cloudProvider

return serviceDef, nil

}

func validateServiceDefinition(s CloudServiceDetails) error {
func validateServiceDefinition(s *CloudServiceDetails) error {
// Validate dashboard data
seenDashboardIds := map[string]interface{}{}
for _, dd := range s.Assets.Dashboards {
Expand All @@ -211,7 +204,29 @@ func validateServiceDefinition(s CloudServiceDetails) error {
seenDashboardIds[dashboardId] = nil
}

if s.TelemetryCollectionStrategy == nil {
return fmt.Errorf("telemetry_collection_strategy is required")
}

// potentially more to follow

return nil
}

// ParseStructWithJsonTagsFromMap converts a generic map into a value of
// StructType by round-tripping it through JSON, so that the struct's `json`
// tags decide which map key lands in which field.
//
// Decoding is strict: a key in data that has no matching field in StructType
// is reported as an error rather than being dropped.
//
// It returns a pointer to the populated struct, or a nil pointer and a wrapped
// error if data can't be marshalled or the resulting JSON can't be decoded.
func ParseStructWithJsonTagsFromMap[StructType any](data map[string]any) (
	*StructType, error,
) {
	serialized, marshalErr := json.Marshal(data)
	if marshalErr != nil {
		return nil, fmt.Errorf("couldn't marshal map to json: %w", marshalErr)
	}

	// Reject keys the target struct doesn't declare.
	strictDecoder := json.NewDecoder(bytes.NewReader(serialized))
	strictDecoder.DisallowUnknownFields()

	parsed := new(StructType)
	if decodeErr := strictDecoder.Decode(parsed); decodeErr != nil {
		return nil, fmt.Errorf("couldn't unmarshal json back to struct: %w", decodeErr)
	}

	return parsed, nil
}
82 changes: 80 additions & 2 deletions pkg/query-service/app/cloudintegrations/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/jmoiron/sqlx"
"go.signoz.io/signoz/pkg/query-service/model"
"golang.org/x/exp/maps"
)

var SupportedCloudProviders = []string{
Expand Down Expand Up @@ -92,6 +93,11 @@ type GenerateConnectionUrlRequest struct {
// SigNozAgentConfig carries the settings for a SigNoz agent. Every field is
// serialized under the snake_case key given in its `json` tag.
type SigNozAgentConfig struct {
	// The region in which SigNoz agent should be installed.
	Region string `json:"region"`

	// NOTE(review): presumably the endpoint and credential the agent uses to
	// send telemetry to SigNoz — confirm against the agent implementation.
	IngestionUrl string `json:"ingestion_url"`
	IngestionKey string `json:"ingestion_key"`

	// NOTE(review): presumably the endpoint and credential the agent uses to
	// call SigNoz APIs (e.g. check-in) — confirm against the agent implementation.
	SigNozAPIUrl string `json:"signoz_api_url"`
	SigNozAPIKey string `json:"signoz_api_key"`
}

type GenerateConnectionUrlResponse struct {
Expand Down Expand Up @@ -163,7 +169,17 @@ type AgentCheckInRequest struct {
}

type AgentCheckInResponse struct {
Account AccountRecord `json:"account"`
AccountId string `json:"account_id"`
CloudAccountId string `json:"cloud_account_id"`
RemovedAt *time.Time `json:"removed_at"`

IntegrationConfig IntegrationConfigForAgent `json:"integration_config"`
}

// IntegrationConfigForAgent is the integration configuration handed back to an
// agent in AgentCheckInResponse.IntegrationConfig.
type IntegrationConfigForAgent struct {
// CheckInAsAgent initializes this to an empty slice and replaces it with the
// account config's enabled regions when those are set.
EnabledRegions []string `json:"enabled_regions"`

// Accumulated collection strategy for the account's services.
// Serialized under the "telemetry" key and omitted entirely when nil.
TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry,omitempty"`
}

func (c *Controller) CheckInAsAgent(
Expand Down Expand Up @@ -201,8 +217,70 @@ func (c *Controller) CheckInAsAgent(
return nil, model.WrapApiError(apiErr, "couldn't upsert cloud account")
}

// prepare and return integration config to be consumed by agent
telemetryCollectionStrategy, err := NewCloudTelemetryCollectionStrategy(cloudProvider)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't init telemetry collection strategy: %w", err,
))
}

agentConfig := IntegrationConfigForAgent{
EnabledRegions: []string{},
TelemetryCollectionStrategy: telemetryCollectionStrategy,
}

if account.Config != nil && account.Config.EnabledRegions != nil {
agentConfig.EnabledRegions = account.Config.EnabledRegions
}

services, apiErr := listCloudProviderServices(cloudProvider)
if apiErr != nil {
return nil, model.WrapApiError(apiErr, "couldn't list cloud services")
}
svcDetailsById := map[string]*CloudServiceDetails{}
for _, svcDetails := range services {
svcDetailsById[svcDetails.Id] = &svcDetails
}

svcConfigs, apiErr := c.serviceConfigRepo.getAllForAccount(
ctx, cloudProvider, *account.CloudAccountId,
)
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "couldn't get service configs for cloud account",
)
}

// accumulate config in a fixed order to ensure same config generated across runs
configuredSvcIds := maps.Keys(svcConfigs)
slices.Sort(configuredSvcIds)

for _, svcId := range configuredSvcIds {
svcDetails := svcDetailsById[svcId]
svcConfig := svcConfigs[svcId]

if svcDetails != nil {
metricsEnabled := svcConfig.Metrics != nil && svcConfig.Metrics.Enabled
logsEnabled := svcConfig.Logs != nil && svcConfig.Logs.Enabled
if logsEnabled || metricsEnabled {
err := agentConfig.TelemetryCollectionStrategy.AddServiceStrategy(
svcDetails.TelemetryCollectionStrategy, logsEnabled, metricsEnabled,
)
if err != nil {
return nil, model.InternalError(fmt.Errorf(
"couldn't add service telemetry collection strategy: %w", err,
))
}
}
}
}

return &AgentCheckInResponse{
Account: *account,
AccountId: account.Id,
CloudAccountId: *account.CloudAccountId,
RemovedAt: account.RemovedAt,
IntegrationConfig: agentConfig,
}, nil
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/query-service/app/cloudintegrations/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func TestAgentCheckIns(t *testing.T) {
},
)
require.Nil(apiErr)
require.Equal(testAccountId1, resp1.Account.Id)
require.Equal(testCloudAccountId1, *resp1.Account.CloudAccountId)
require.Equal(testAccountId1, resp1.AccountId)
require.Equal(testCloudAccountId1, resp1.CloudAccountId)

// The agent should not be able to check in with a different
// cloud account id for the same account.
Expand Down Expand Up @@ -262,9 +262,10 @@ func makeTestConnectedAccount(t *testing.T, controller *Controller, cloudAccount
},
)
require.Nil(apiErr)
require.Equal(testAccountId, resp.Account.Id)
require.Equal(cloudAccountId, *resp.Account.CloudAccountId)

return &resp.Account
require.Equal(testAccountId, resp.AccountId)
require.Equal(cloudAccountId, resp.CloudAccountId)

acc, err := controller.accountsRepo.get(context.TODO(), "aws", resp.AccountId)
require.Nil(err)
return acc
}
106 changes: 106 additions & 0 deletions pkg/query-service/app/cloudintegrations/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ type CloudServiceDetails struct {
DataCollected DataCollectedForService `json:"data_collected"`

ConnectionStatus *CloudServiceConnectionStatus `json:"status,omitempty"`

TelemetryCollectionStrategy *CloudTelemetryCollectionStrategy `json:"telemetry_collection_strategy"`
}

type CloudServiceConfig struct {
Expand Down Expand Up @@ -216,3 +218,107 @@ type SignalConnectionStatus struct {
LastReceivedTsMillis int64 `json:"last_received_ts_ms"` // epoch milliseconds
LastReceivedFrom string `json:"last_received_from"` // resource identifier
}

// CloudTelemetryCollectionStrategy describes what telemetry should be
// collected from a cloud provider. It is used both for a single service
// (CloudServiceDetails.TelemetryCollectionStrategy) and as the accumulated
// strategy built up through AddServiceStrategy.
type CloudTelemetryCollectionStrategy struct {
// Cloud provider this strategy applies to, e.g. "aws".
Provider string `json:"provider"`

// Provider specific parts of the strategy; omitted from JSON when nil.
AWSMetrics *AWSMetricsCollectionStrategy `json:"aws_metrics,omitempty"`
AWSLogs *AWSLogsCollectionStrategy `json:"aws_logs,omitempty"`
}

func NewCloudTelemetryCollectionStrategy(provider string) (*CloudTelemetryCollectionStrategy, error) {
if provider == "aws" {
raj-k-singh marked this conversation as resolved.
Show resolved Hide resolved
return &CloudTelemetryCollectionStrategy{
Provider: "aws",
AWSMetrics: &AWSMetricsCollectionStrategy{
CloudwatchMetricsStreamFilters: []CloudwatchMetricStreamFilter{},
},
AWSLogs: &AWSLogsCollectionStrategy{
CloudwatchLogsSubscriptions: []CloudwatchLogsSubscriptionConfig{},
},
}, nil
}

return nil, fmt.Errorf("unsupported cloud provider: %s", provider)
}

// Helper for accumulating strategies for enabled services.
func (cs *CloudTelemetryCollectionStrategy) AddServiceStrategy(
svcStrategy *CloudTelemetryCollectionStrategy,
logsEnabled bool,
metricsEnabled bool,
) error {
if svcStrategy.Provider != cs.Provider {
return fmt.Errorf(
"can't add %s service strategy to strategy for %s",
svcStrategy.Provider, cs.Provider,
)
}

if cs.Provider == "aws" {
if logsEnabled {
cs.AWSLogs.AddServiceStrategy(svcStrategy.AWSLogs)
}
if metricsEnabled {
cs.AWSMetrics.AddServiceStrategy(svcStrategy.AWSMetrics)
}
return nil
}

return fmt.Errorf("unsupported cloud provider: %s", cs.Provider)
raj-k-singh marked this conversation as resolved.
Show resolved Hide resolved

}

// AWSMetricsCollectionStrategy is the AWS specific metrics part of a
// CloudTelemetryCollectionStrategy.
type AWSMetricsCollectionStrategy struct {
// Filters selecting which Cloudwatch metrics are streamed.
// To be used as the metric stream's include filters, see
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cloudwatch-metricstream.html#cfn-cloudwatch-metricstream-includefilters
CloudwatchMetricsStreamFilters []CloudwatchMetricStreamFilter `json:"cloudwatch_metric_stream_filters"`
}

// CloudwatchMetricStreamFilter is a single entry of
// AWSMetricsCollectionStrategy.CloudwatchMetricsStreamFilters.
type CloudwatchMetricStreamFilter struct {
// The json tags here are deliberately PascalCase: they are in the shape
// expected by the AWS API, as detailed at
// https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-cloudwatch-metricstream-metricstreamfilter.html
Namespace string `json:"Namespace"`
// Omitted from JSON when empty.
MetricNames []string `json:"MetricNames,omitempty"`
}

// AddServiceStrategy appends the Cloudwatch metric stream filters of
// svcStrategy to those already held by amc. A nil svcStrategy is a no-op.
// The returned error is always nil.
func (amc *AWSMetricsCollectionStrategy) AddServiceStrategy(
	svcStrategy *AWSMetricsCollectionStrategy,
) error {
	if svcStrategy != nil {
		merged := append(
			amc.CloudwatchMetricsStreamFilters,
			svcStrategy.CloudwatchMetricsStreamFilters...,
		)
		amc.CloudwatchMetricsStreamFilters = merged
	}

	return nil
}

// AWSLogsCollectionStrategy is the AWS specific logs part of a
// CloudTelemetryCollectionStrategy.
type AWSLogsCollectionStrategy struct {
// Cloudwatch log group subscriptions to set up, one entry per log group prefix.
CloudwatchLogsSubscriptions []CloudwatchLogsSubscriptionConfig `json:"cloudwatch_logs_subscriptions"`
}

// CloudwatchLogsSubscriptionConfig is a single entry of
// AWSLogsCollectionStrategy.CloudwatchLogsSubscriptions.
type CloudwatchLogsSubscriptionConfig struct {
// Subscribe to all log groups whose name starts with the specified prefix.
// eg: `/aws/rds/`
LogGroupNamePrefix string `json:"log_group_name_prefix"`

// Filter pattern for the subscription, in the syntax described at
// https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html
// "" implies no filtering is required.
FilterPattern string `json:"filter_pattern"`
}

// AddServiceStrategy appends the Cloudwatch logs subscriptions of svcStrategy
// to those already held by alc. A nil svcStrategy is a no-op.
// The returned error is always nil.
func (alc *AWSLogsCollectionStrategy) AddServiceStrategy(
	svcStrategy *AWSLogsCollectionStrategy,
) error {
	if svcStrategy != nil {
		merged := append(
			alc.CloudwatchLogsSubscriptions,
			svcStrategy.CloudwatchLogsSubscriptions...,
		)
		alc.CloudwatchLogsSubscriptions = merged
	}

	return nil
}
Loading
Loading