diff --git a/conf/example.yaml b/conf/example.yaml index cee9ee2f8ac..f0f081c2eaf 100644 --- a/conf/example.yaml +++ b/conf/example.yaml @@ -77,4 +77,19 @@ apiserver: - /api/v3/logs/livetail logging: excluded_routes: - - /api/v1/health \ No newline at end of file + - /api/v1/health + + +##################### TelemetryStore ##################### +telemetrystore: + # specifies the telemetrystore provider to use. + provider: clickhouse + clickhouse: + # The DSN to use for ClickHouse. + dsn: http://localhost:9000 + # Maximum number of idle connections in the connection pool. + max_idle_conns: 50 + # Maximum number of open connections to the database. + max_open_conns: 100 + # Maximum time to wait for a connection to be established. + dial_timeout: 5s \ No newline at end of file diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index 8accbab5036..12d54a575d8 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -26,9 +26,6 @@ type APIHandlerOptions struct { DataConnector interfaces.DataConnector SkipConfig *basemodel.SkipConfig PreferSpanMetrics bool - MaxIdleConns int - MaxOpenConns int - DialTimeout time.Duration AppDao dao.ModelDao RulesManager *rules.Manager UsageManager *usage.Manager @@ -57,9 +54,6 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { Reader: opts.DataConnector, SkipConfig: opts.SkipConfig, PreferSpanMetrics: opts.PreferSpanMetrics, - MaxIdleConns: opts.MaxIdleConns, - MaxOpenConns: opts.MaxOpenConns, - DialTimeout: opts.DialTimeout, AppDao: opts.AppDao, RuleManager: opts.RulesManager, FeatureFlags: opts.FeatureFlags, diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go index afc6f3af69a..121d89d5b36 100644 --- a/ee/query-service/app/db/reader.go +++ b/ee/query-service/app/db/reader.go @@ -20,22 +20,20 @@ type ClickhouseReader struct { func NewDataConnector( localDB *sqlx.DB, + ch clickhouse.Conn, promConfigPath string, lm interfaces.FeatureLookup, - maxIdleConns int, - maxOpenConns int, - dialTimeout time.Duration, cluster string, useLogsNewSchema bool, useTraceNewSchema bool, fluxIntervalForTraceDetail time.Duration, cache cache.Cache, ) *ClickhouseReader { - ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) + chReader := basechr.NewReader(localDB, ch, promConfigPath, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) return &ClickhouseReader{ - conn: ch.GetConn(), + conn: ch, appdb: localDB, - ClickHouseReader: ch, + ClickHouseReader: chReader, } } diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index 61c081cb79e..ad5e0ee59db 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -74,9 +74,6 @@ type ServerOptions struct { DisableRules bool RuleRepoURL string PreferSpanMetrics bool - MaxIdleConns int - MaxOpenConns int - DialTimeout time.Duration CacheConfigPath string FluxInterval string FluxIntervalForTraceDetail string @@ -157,11 +154,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { zap.L().Info("Using ClickHouse as datastore ...") qb := db.NewDataConnector( serverOptions.SigNoz.SQLStore.SQLxDB(), + serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.PromConfigPath, lm, - serverOptions.MaxIdleConns, - serverOptions.MaxOpenConns, - serverOptions.DialTimeout, serverOptions.Cluster, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema, @@ -245,7 +240,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { } // start the usagemanager - usageManager, err := usage.New(modelDao, lm.GetRepo(), reader.GetConn()) + usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.Config.TelemetryStore.ClickHouse.DSN) if err != nil { return nil, err } @@ -266,9 +261,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { DataConnector: reader, SkipConfig: skipConfig, PreferSpanMetrics: serverOptions.PreferSpanMetrics, - MaxIdleConns: serverOptions.MaxIdleConns, - MaxOpenConns: serverOptions.MaxOpenConns, - DialTimeout: serverOptions.DialTimeout, AppDao: modelDao, RulesManager: rm, UsageManager: usageManager, diff --git a/ee/query-service/main.go b/ee/query-service/main.go index 52a34c1b739..b19194cfb4f 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -141,6 +141,10 @@ func main() { envprovider.NewFactory(), fileprovider.NewFactory(), }, + }, signoz.DeprecatedFlags{ + MaxIdleConns: maxIdleConns, + MaxOpenConns: maxOpenConns, + DialTimeout: dialTimeout, }) if err != nil { zap.L().Fatal("Failed to create config", zap.Error(err)) @@ -161,9 +165,6 @@ func main() { PrivateHostPort: baseconst.PrivateHostPort, DisableRules: disableRules, RuleRepoURL: ruleRepoURL, - MaxIdleConns: maxIdleConns, - MaxOpenConns: maxOpenConns, - DialTimeout: dialTimeout, CacheConfigPath: cacheConfigPath, FluxInterval: fluxInterval, FluxIntervalForTraceDetail: fluxIntervalForTraceDetail, diff --git a/ee/query-service/usage/manager.go b/ee/query-service/usage/manager.go index 1014506faa3..61f49c0b15c 100644 --- a/ee/query-service/usage/manager.go +++ b/ee/query-service/usage/manager.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "os" "regexp" "strings" "sync/atomic" @@ -46,9 +45,9 @@ type Manager struct { tenantID string } -func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) { +func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn, chUrl string) (*Manager, error) { hostNameRegex := regexp.MustCompile(`tcp://(?P.*):`) - hostNameRegexMatches := hostNameRegex.FindStringSubmatch(os.Getenv("ClickHouseUrl")) + hostNameRegexMatches := hostNameRegex.FindStringSubmatch(chUrl) tenantID := "" if len(hostNameRegexMatches) == 2 { diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index b9de1db054b..5022088f5af 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -1,11 +1,9 @@ package clickhouseReader import ( - "context" "time" "github.com/ClickHouse/clickhouse-go/v2" - "go.uber.org/zap" ) type Encoding string @@ -18,7 +16,6 @@ const ( ) const ( - defaultDatasource string = "tcp://localhost:9000" defaultTraceDB string = "signoz_traces" defaultOperationsTable string = "distributed_signoz_operations" defaultIndexTable string = "distributed_signoz_index_v2" @@ -58,9 +55,6 @@ type namespaceConfig struct { namespace string Enabled bool Datasource string - MaxIdleConns int - MaxOpenConns int - DialTimeout time.Duration TraceDB string OperationsTable string IndexTable string @@ -99,37 +93,6 @@ type namespaceConfig struct { // Connecto defines how to connect to the database type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error) -func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) { - ctx := context.Background() - options, err := clickhouse.ParseDSN(cfg.Datasource) - if err != nil { - return nil, err - } - - // Check if the DSN contained any of the following options, if not set from configuration - if options.MaxIdleConns == 0 { - options.MaxIdleConns = cfg.MaxIdleConns - } - if options.MaxOpenConns == 0 { - options.MaxOpenConns = cfg.MaxOpenConns - } - if options.DialTimeout == 0 { - options.DialTimeout = cfg.DialTimeout - } - - zap.L().Info("Connecting to Clickhouse", zap.String("at", options.Addr[0]), zap.Int("MaxIdleConns", options.MaxIdleConns), zap.Int("MaxOpenConns", options.MaxOpenConns), zap.Duration("DialTimeout", options.DialTimeout)) - db, err := clickhouse.Open(options) - if err != nil { - return nil, err - } - - if err := db.Ping(ctx); err != nil { - return nil, err - } - - return db, nil -} - // Options store storage plugin related configs type Options struct { primary *namespaceConfig @@ -139,26 +102,13 @@ type Options struct { // NewOptions creates a new Options struct. func NewOptions( - datasource string, - maxIdleConns int, - maxOpenConns int, - dialTimeout time.Duration, primaryNamespace string, otherNamespaces ...string, ) *Options { - - if datasource == "" { - datasource = defaultDatasource - } - options := &Options{ primary: &namespaceConfig{ namespace: primaryNamespace, Enabled: true, - Datasource: datasource, - MaxIdleConns: maxIdleConns, - MaxOpenConns: maxOpenConns, - DialTimeout: dialTimeout, TraceDB: defaultTraceDB, OperationsTable: defaultOperationsTable, IndexTable: defaultIndexTable, @@ -181,7 +131,6 @@ func NewOptions( WriteBatchDelay: defaultWriteBatchDelay, WriteBatchSize: defaultWriteBatchSize, Encoding: defaultEncoding, - Connector: defaultConnector, LogsTableV2: defaultLogsTableV2, LogsLocalTableV2: defaultLogsLocalTableV2, @@ -200,7 +149,6 @@ func NewOptions( if namespace == archiveNamespace { options.others[namespace] = &namespaceConfig{ namespace: namespace, - Datasource: datasource, TraceDB: "", OperationsTable: "", IndexTable: "", @@ -214,7 +162,6 @@ func NewOptions( WriteBatchDelay: defaultWriteBatchDelay, WriteBatchSize: defaultWriteBatchSize, Encoding: defaultEncoding, - Connector: defaultConnector, } } else { options.others[namespace] = &namespaceConfig{namespace: namespace} diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index ca0d1aaf2bf..f3769061ef4 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -166,26 +166,16 @@ type ClickHouseReader struct { // NewTraceReader returns a TraceReader for the database func NewReader( localDB *sqlx.DB, + db driver.Conn, configFile string, featureFlag interfaces.FeatureLookup, - maxIdleConns int, - maxOpenConns int, - dialTimeout time.Duration, cluster string, useLogsNewSchema bool, useTraceNewSchema bool, fluxIntervalForTraceDetail time.Duration, cache cache.Cache, ) *ClickHouseReader { - - datasource := os.Getenv("ClickHouseUrl") - options := NewOptions(datasource, maxIdleConns, maxOpenConns, dialTimeout, primaryNamespace, archiveNamespace) - db, err := initialize(options) - - if err != nil { - zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err)) - } - + options := NewOptions(primaryNamespace, archiveNamespace) return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache) } @@ -208,29 +198,6 @@ func NewReaderFromClickhouseConnection( os.Exit(1) } - regex := os.Getenv("ClickHouseOptimizeReadInOrderRegex") - var regexCompiled *regexp.Regexp - if regex != "" { - regexCompiled, err = regexp.Compile(regex) - if err != nil { - zap.L().Error("Incorrect regex for ClickHouseOptimizeReadInOrderRegex") - os.Exit(1) - } - } - - wrap := clickhouseConnWrapper{ - conn: db, - settings: ClickhouseQuerySettings{ - MaxExecutionTime: os.Getenv("ClickHouseMaxExecutionTime"), - MaxExecutionTimeLeaf: os.Getenv("ClickHouseMaxExecutionTimeLeaf"), - TimeoutBeforeCheckingExecutionSpeed: os.Getenv("ClickHouseTimeoutBeforeCheckingExecutionSpeed"), - MaxBytesToRead: os.Getenv("ClickHouseMaxBytesToRead"), - OptimizeReadInOrderRegex: os.Getenv("ClickHouseOptimizeReadInOrderRegex"), - OptimizeReadInOrderRegexCompiled: regexCompiled, - MaxResultRowsForCHQuery: constants.MaxResultRowsForCHQuery, - }, - } - logsTableName := options.primary.LogsTable logsLocalTableName := options.primary.LogsLocalTable if useLogsNewSchema { @@ -246,7 +213,7 @@ func NewReaderFromClickhouseConnection( } return &ClickHouseReader{ - db: wrap, + db: db, localDB: localDB, TraceDB: options.primary.TraceDB, alertManager: alertManager, @@ -438,28 +405,6 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config return conf, nil } -func initialize(options *Options) (clickhouse.Conn, error) { - - db, err := connect(options.getPrimary()) - if err != nil { - return nil, fmt.Errorf("error connecting to primary db: %v", err) - } - - return db, nil -} - -func connect(cfg *namespaceConfig) (clickhouse.Conn, error) { - if cfg.Encoding != EncodingJSON && cfg.Encoding != EncodingProto { - return nil, fmt.Errorf("unknown encoding %q, supported: %q, %q", cfg.Encoding, EncodingJSON, EncodingProto) - } - - return cfg.Connector(cfg) -} - -func (r *ClickHouseReader) GetConn() clickhouse.Conn { - return r.db -} - func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) { qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time) if err != nil { diff --git a/pkg/query-service/app/clickhouseReader/wrapper.go b/pkg/query-service/app/clickhouseReader/wrapper.go deleted file mode 100644 index b56ebd15894..00000000000 --- a/pkg/query-service/app/clickhouseReader/wrapper.go +++ /dev/null @@ -1,124 +0,0 @@ -package clickhouseReader - -import ( - "context" - "encoding/json" - "regexp" - - "github.com/ClickHouse/clickhouse-go/v2" - "github.com/ClickHouse/clickhouse-go/v2/lib/driver" - "go.signoz.io/signoz/pkg/query-service/common" -) - -type ClickhouseQuerySettings struct { - MaxExecutionTime string - MaxExecutionTimeLeaf string - TimeoutBeforeCheckingExecutionSpeed string - MaxBytesToRead string - OptimizeReadInOrderRegex string - OptimizeReadInOrderRegexCompiled *regexp.Regexp - MaxResultRowsForCHQuery int -} - -type clickhouseConnWrapper struct { - conn clickhouse.Conn - settings ClickhouseQuerySettings -} - -func (c clickhouseConnWrapper) Close() error { - return c.conn.Close() -} - -func (c clickhouseConnWrapper) Ping(ctx context.Context) error { - return c.conn.Ping(ctx) -} - -func (c clickhouseConnWrapper) Stats() driver.Stats { - return c.conn.Stats() -} - -func (c clickhouseConnWrapper) addClickHouseSettings(ctx context.Context, query string) context.Context { - settings := clickhouse.Settings{} - - logComment := c.getLogComment(ctx) - if logComment != "" { - settings["log_comment"] = logComment - } - - if ctx.Value("enforce_max_result_rows") != nil { - settings["max_result_rows"] = c.settings.MaxResultRowsForCHQuery - } - - if c.settings.MaxBytesToRead != "" { - settings["max_bytes_to_read"] = c.settings.MaxBytesToRead - } - - if c.settings.MaxExecutionTime != "" { - settings["max_execution_time"] = c.settings.MaxExecutionTime - } - - if c.settings.MaxExecutionTimeLeaf != "" { - settings["max_execution_time_leaf"] = c.settings.MaxExecutionTimeLeaf - } - - if c.settings.TimeoutBeforeCheckingExecutionSpeed != "" { - settings["timeout_before_checking_execution_speed"] = c.settings.TimeoutBeforeCheckingExecutionSpeed - } - - // only list queries of - if c.settings.OptimizeReadInOrderRegex != "" && c.settings.OptimizeReadInOrderRegexCompiled.Match([]byte(query)) { - settings["optimize_read_in_order"] = 0 - } - - ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings)) - return ctx -} - -func (c clickhouseConnWrapper) getLogComment(ctx context.Context) string { - // Get the key-value pairs from context for log comment - kv := ctx.Value(common.LogCommentKey) - if kv == nil { - return "" - } - - logCommentKVs, ok := kv.(map[string]string) - if !ok { - return "" - } - - logComment, _ := json.Marshal(logCommentKVs) - - return string(logComment) -} - -func (c clickhouseConnWrapper) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) { - return c.conn.Query(c.addClickHouseSettings(ctx, query), query, args...) -} - -func (c clickhouseConnWrapper) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row { - return c.conn.QueryRow(c.addClickHouseSettings(ctx, query), query, args...) -} - -func (c clickhouseConnWrapper) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error { - return c.conn.Select(c.addClickHouseSettings(ctx, query), dest, query, args...) -} - -func (c clickhouseConnWrapper) Exec(ctx context.Context, query string, args ...interface{}) error { - return c.conn.Exec(c.addClickHouseSettings(ctx, query), query, args...) -} - -func (c clickhouseConnWrapper) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error { - return c.conn.AsyncInsert(c.addClickHouseSettings(ctx, query), query, wait, args...) -} - -func (c clickhouseConnWrapper) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) { - return c.conn.PrepareBatch(c.addClickHouseSettings(ctx, query), query, opts...) -} - -func (c clickhouseConnWrapper) ServerVersion() (*driver.ServerVersion, error) { - return c.conn.ServerVersion() -} - -func (c clickhouseConnWrapper) Contributors() []string { - return c.conn.Contributors() -} diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index e0551440b78..acaa658daee 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -97,10 +97,6 @@ type APIHandler struct { temporalityMap map[string]map[v3.Temporality]bool temporalityMux sync.Mutex - maxIdleConns int - maxOpenConns int - dialTimeout time.Duration - IntegrationsController *integrations.Controller CloudIntegrationsController *cloudintegrations.Controller @@ -142,10 +138,6 @@ type APIHandlerOpts struct { PreferSpanMetrics bool - MaxIdleConns int - MaxOpenConns int - DialTimeout time.Duration - // dao layer to perform crud on app objects like dashboard, alerts etc AppDao dao.ModelDao @@ -225,9 +217,6 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { skipConfig: opts.SkipConfig, preferSpanMetrics: opts.PreferSpanMetrics, temporalityMap: make(map[string]map[v3.Temporality]bool), - maxIdleConns: opts.MaxIdleConns, - maxOpenConns: opts.MaxOpenConns, - dialTimeout: opts.DialTimeout, alertManager: alertManager, ruleManager: opts.RuleManager, featureFlags: opts.FeatureFlags, diff --git a/pkg/query-service/app/querier/querier_test.go b/pkg/query-service/app/querier/querier_test.go index e0caf6d8c15..2a491362fcb 100644 --- a/pkg/query-service/app/querier/querier_test.go +++ b/pkg/query-service/app/querier/querier_test.go @@ -1352,7 +1352,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { } testName := "name" - options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") + options := clickhouseReader.NewOptions("", "", "archiveNamespace") // iterate over test data, create reader and run test for _, tc := range testCases { diff --git a/pkg/query-service/app/querier/v2/querier_test.go b/pkg/query-service/app/querier/v2/querier_test.go index 800a684e6c0..15a7ca713aa 100644 --- a/pkg/query-service/app/querier/v2/querier_test.go +++ b/pkg/query-service/app/querier/v2/querier_test.go @@ -1406,7 +1406,7 @@ func Test_querier_runWindowBasedListQuery(t *testing.T) { } testName := "name" - options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") + options := clickhouseReader.NewOptions("", "", "archiveNamespace") // iterate over test data, create reader and run test for _, tc := range testCases { diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index 8a1cfb0b741..cda0e141428 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -62,9 +62,6 @@ type ServerOptions struct { DisableRules bool RuleRepoURL string PreferSpanMetrics bool - MaxIdleConns int - MaxOpenConns int - DialTimeout time.Duration CacheConfigPath string FluxInterval string FluxIntervalForTraceDetail string @@ -132,11 +129,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { zap.L().Info("Using ClickHouse as datastore ...") clickhouseReader := clickhouseReader.NewReader( serverOptions.SigNoz.SQLStore.SQLxDB(), + serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.PromConfigPath, fm, - serverOptions.MaxIdleConns, - serverOptions.MaxOpenConns, - serverOptions.DialTimeout, serverOptions.Cluster, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema, @@ -202,9 +197,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { Reader: reader, SkipConfig: skipConfig, PreferSpanMetrics: serverOptions.PreferSpanMetrics, - MaxIdleConns: serverOptions.MaxIdleConns, - MaxOpenConns: serverOptions.MaxOpenConns, - DialTimeout: serverOptions.DialTimeout, AppDao: dao.DB(), RuleManager: rm, FeatureFlags: fm, diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 0199113a74e..830f6ab759b 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/ClickHouse/clickhouse-go/v2" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/stats" @@ -85,7 +84,6 @@ type Reader interface { ) (*v3.QBFilterSuggestionsResponse, *model.ApiError) // Connection needed for rules, not ideal but required - GetConn() clickhouse.Conn GetQueryEngine() *promql.Engine GetFanoutStorage() *storage.Storage diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index 7fd7c357924..ae1ccea8f17 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -85,6 +85,10 @@ func main() { envprovider.NewFactory(), fileprovider.NewFactory(), }, + }, signoz.DeprecatedFlags{ + MaxIdleConns: maxIdleConns, + MaxOpenConns: maxOpenConns, + DialTimeout: dialTimeout, }) if err != nil { zap.L().Fatal("Failed to create config", zap.Error(err)) @@ -104,9 +108,6 @@ func main() { PrivateHostPort: constants.PrivateHostPort, DisableRules: disableRules, RuleRepoURL: ruleRepoURL, - MaxIdleConns: maxIdleConns, - MaxOpenConns: maxOpenConns, - DialTimeout: dialTimeout, CacheConfigPath: cacheConfigPath, FluxInterval: fluxInterval, FluxIntervalForTraceDetail: fluxIntervalForTraceDetail, diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index 7218ca2167a..6203085e7c6 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -1240,7 +1240,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { "summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}", } - options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") + options := clickhouseReader.NewOptions("", "", "archiveNamespace") reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) @@ -1339,7 +1339,7 @@ func TestThresholdRuleNoData(t *testing.T) { "summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}", } - options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") + options := clickhouseReader.NewOptions("", "", "archiveNamespace") reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) @@ -1447,7 +1447,7 @@ func TestThresholdRuleTracesLink(t *testing.T) { "summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}", } - options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") + options := clickhouseReader.NewOptions("", "", "archiveNamespace") reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) @@ -1572,7 +1572,7 @@ func TestThresholdRuleLogsLink(t *testing.T) { "summary": "The rule threshold is set to {{$threshold}}, and the observed metric value is {{$value}}", } - options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") + options := clickhouseReader.NewOptions("", "", "archiveNamespace") reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true, time.Duration(time.Second), nil) rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) diff --git a/pkg/query-service/tests/integration/test_utils.go b/pkg/query-service/tests/integration/test_utils.go index e2db99d0190..464b7ad81a2 100644 --- a/pkg/query-service/tests/integration/test_utils.go +++ b/pkg/query-service/tests/integration/test_utils.go @@ -40,7 +40,7 @@ func NewMockClickhouseReader( require.Nil(t, err, "could not init mock clickhouse") reader := clickhouseReader.NewReaderFromClickhouseConnection( mockDB, - clickhouseReader.NewOptions("", 10, 10, 10*time.Second, ""), + clickhouseReader.NewOptions("", ""), testDB, "", featureFlags, diff --git a/pkg/signoz/config.go b/pkg/signoz/config.go index 7173a86e52d..c9b8f131095 100644 --- a/pkg/signoz/config.go +++ b/pkg/signoz/config.go @@ -13,6 +13,7 @@ import ( "go.signoz.io/signoz/pkg/instrumentation" "go.signoz.io/signoz/pkg/sqlmigrator" "go.signoz.io/signoz/pkg/sqlstore" + "go.signoz.io/signoz/pkg/telemetrystore" "go.signoz.io/signoz/pkg/web" ) @@ -35,9 +36,20 @@ type Config struct { // API Server config APIServer apiserver.Config `mapstructure:"apiserver"` + + // TelemetryStore config + TelemetryStore telemetrystore.Config `mapstructure:"telemetrystore"` +} + +// DeprecatedFlags are the flags that are deprecated and scheduled for removal. +// These flags are used to ensure backward compatibility with the old flags. +type DeprecatedFlags struct { + MaxIdleConns int + MaxOpenConns int + DialTimeout time.Duration } -func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig) (Config, error) { +func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig, deprecatedFlags DeprecatedFlags) (Config, error) { configFactories := []factory.ConfigFactory{ instrumentation.NewConfigFactory(), web.NewConfigFactory(), @@ -45,6 +57,7 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig) (Confi sqlstore.NewConfigFactory(), sqlmigrator.NewConfigFactory(), apiserver.NewConfigFactory(), + telemetrystore.NewConfigFactory(), } conf, err := config.New(ctx, resolverConfig, configFactories) @@ -57,12 +70,12 @@ func NewConfig(ctx context.Context, resolverConfig config.ResolverConfig) (Confi return Config{}, err } - mergeAndEnsureBackwardCompatibility(&config) + mergeAndEnsureBackwardCompatibility(&config, deprecatedFlags) return config, nil } -func mergeAndEnsureBackwardCompatibility(config *Config) { +func mergeAndEnsureBackwardCompatibility(config *Config, deprecatedFlags DeprecatedFlags) { // SIGNOZ_LOCAL_DB_PATH if os.Getenv("SIGNOZ_LOCAL_DB_PATH") != "" { fmt.Println("[Deprecated] env SIGNOZ_LOCAL_DB_PATH is deprecated and scheduled for removal. Please use SIGNOZ_SQLSTORE_SQLITE_PATH instead.") @@ -87,4 +100,21 @@ func mergeAndEnsureBackwardCompatibility(config *Config) { fmt.Println("Error parsing CONTEXT_TIMEOUT_MAX_ALLOWED, using default value of 600s") } } + if os.Getenv("ClickHouseUrl") != "" { + fmt.Println("[Deprecated] env ClickHouseUrl is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN instead.") + config.TelemetryStore.ClickHouse.DSN = os.Getenv("ClickHouseUrl") + } + + if deprecatedFlags.MaxIdleConns != 50 { + fmt.Println("[Deprecated] flag --max-idle-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS env variable instead.") + config.TelemetryStore.Connection.MaxIdleConns = deprecatedFlags.MaxIdleConns + } + if deprecatedFlags.MaxOpenConns != 100 { + fmt.Println("[Deprecated] flag --max-open-conns is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS env variable instead.") + config.TelemetryStore.Connection.MaxOpenConns = deprecatedFlags.MaxOpenConns + } + if deprecatedFlags.DialTimeout != 5*time.Second { + fmt.Println("[Deprecated] flag --dial-timeout is deprecated and scheduled for removal. Please use SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT environment variable instead.") + config.TelemetryStore.Connection.DialTimeout = deprecatedFlags.DialTimeout + } } diff --git a/pkg/signoz/provider.go b/pkg/signoz/provider.go index d54d1d73bc4..6e690b090e1 100644 --- a/pkg/signoz/provider.go +++ b/pkg/signoz/provider.go @@ -8,6 +8,9 @@ import ( "go.signoz.io/signoz/pkg/sqlmigration" "go.signoz.io/signoz/pkg/sqlstore" "go.signoz.io/signoz/pkg/sqlstore/sqlitesqlstore" + "go.signoz.io/signoz/pkg/telemetrystore" + "go.signoz.io/signoz/pkg/telemetrystore/clickhousetelemetrystore" + "go.signoz.io/signoz/pkg/telemetrystore/telemetrystorehook" "go.signoz.io/signoz/pkg/web" "go.signoz.io/signoz/pkg/web/noopweb" "go.signoz.io/signoz/pkg/web/routerweb" @@ -25,9 +28,13 @@ type ProviderConfig struct { // Map of all sql migration provider factories SQLMigrationProviderFactories factory.NamedMap[factory.ProviderFactory[sqlmigration.SQLMigration, sqlmigration.Config]] + + // Map of all telemetrystore provider factories + TelemetryStoreProviderFactories factory.NamedMap[factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config]] } func NewProviderConfig() ProviderConfig { + hook := telemetrystorehook.NewFactory() return ProviderConfig{ CacheProviderFactories: factory.MustNewNamedMap( memorycache.NewFactory(), @@ -50,5 +57,8 @@ func NewProviderConfig() ProviderConfig { sqlmigration.NewAddPipelinesFactory(), sqlmigration.NewAddIntegrationsFactory(), ), + TelemetryStoreProviderFactories: factory.MustNewNamedMap( + clickhousetelemetrystore.NewFactory(hook), + ), } } diff --git a/pkg/signoz/signoz.go b/pkg/signoz/signoz.go index f01475b2132..ea1483de42f 100644 --- a/pkg/signoz/signoz.go +++ b/pkg/signoz/signoz.go @@ -7,15 +7,17 @@ import ( "go.signoz.io/signoz/pkg/factory" "go.signoz.io/signoz/pkg/instrumentation" "go.signoz.io/signoz/pkg/sqlstore" + "go.signoz.io/signoz/pkg/telemetrystore" "go.signoz.io/signoz/pkg/version" "go.signoz.io/signoz/pkg/web" ) type SigNoz struct { - Cache cache.Cache - Web web.Web - SQLStore sqlstore.SQLStore + Cache cache.Cache + Web web.Web + SQLStore sqlstore.SQLStore + TelemetryStore telemetrystore.TelemetryStore } func New( @@ -68,9 +70,21 @@ func New( return nil, err } + telemetrystore, err := factory.NewProviderFromNamedMap( + ctx, + providerSettings, + config.TelemetryStore, + providerConfig.TelemetryStoreProviderFactories, + config.TelemetryStore.Provider, + ) + if err != nil { + return nil, err + } + return &SigNoz{ - Cache: cache, - Web: web, - SQLStore: sqlstore, + Cache: cache, + Web: web, + SQLStore: sqlstore, + TelemetryStore: telemetrystore, }, nil } diff --git a/pkg/telemetrystore/clickhousetelemetrystore/provider.go b/pkg/telemetrystore/clickhousetelemetrystore/provider.go new file mode 100644 index 00000000000..b126e7bd2d5 --- /dev/null +++ b/pkg/telemetrystore/clickhousetelemetrystore/provider.go @@ -0,0 +1,120 @@ +package clickhousetelemetrystore + +import ( + "context" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/telemetrystore" +) + +type provider struct { + settings factory.ScopedProviderSettings + clickHouseConn clickhouse.Conn + hooks []telemetrystore.TelemetryStoreHook +} + +func NewFactory(hookFactories ...factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config]) factory.ProviderFactory[telemetrystore.TelemetryStore, telemetrystore.Config] { + return factory.NewProviderFactory(factory.MustNewName("clickhouse"), func(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStore, error) { + // we want to fail fast so we have hook registration errors before creating the telemetry store + hooks := make([]telemetrystore.TelemetryStoreHook, len(hookFactories)) + for i, hookFactory := range hookFactories { + hook, err := hookFactory.New(ctx, providerSettings, config) + if err != nil { + return nil, err + } + hooks[i] = hook + } + return New(ctx, providerSettings, config, hooks...) + }) +} + +func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config, hooks ...telemetrystore.TelemetryStoreHook) (telemetrystore.TelemetryStore, error) { + settings := factory.NewScopedProviderSettings(providerSettings, "go.signoz.io/signoz/pkg/telemetrystore/clickhousetelemetrystore") + + options, err := clickhouse.ParseDSN(config.ClickHouse.DSN) + if err != nil { + return nil, err + } + options.MaxIdleConns = config.Connection.MaxIdleConns + options.MaxOpenConns = config.Connection.MaxOpenConns + options.DialTimeout = config.Connection.DialTimeout + + chConn, err := clickhouse.Open(options) + if err != nil { + return nil, err + } + + return &provider{ + settings: settings, + clickHouseConn: chConn, + hooks: hooks, + }, nil +} + +func (p *provider) ClickHouseDB() clickhouse.Conn { + return p +} + +func (p provider) Close() error { + return p.clickHouseConn.Close() +} + +func (p provider) Ping(ctx context.Context) error { + return p.clickHouseConn.Ping(ctx) +} + +func (p provider) Stats() driver.Stats { + return p.clickHouseConn.Stats() +} + +func (p provider) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) { + ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...) + rows, err := p.clickHouseConn.Query(ctx, query, args...) + telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, rows, err) + return rows, err +} + +func (p provider) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row { + ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...) + row := p.clickHouseConn.QueryRow(ctx, query, args...) + telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, nil) + return row +} + +func (p provider) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error { + ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...) + err := p.clickHouseConn.Select(ctx, dest, query, args...) + telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err) + return err +} + +func (p provider) Exec(ctx context.Context, query string, args ...interface{}) error { + ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...) + err := p.clickHouseConn.Exec(ctx, query, args...) + telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err) + return err +} + +func (p provider) AsyncInsert(ctx context.Context, query string, wait bool, args ...interface{}) error { + ctx, query, args = telemetrystore.WrapBeforeQuery(p.hooks, ctx, query, args...) + err := p.clickHouseConn.AsyncInsert(ctx, query, wait, args...) + telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err) + return err +} + +func (p provider) PrepareBatch(ctx context.Context, query string, opts ...driver.PrepareBatchOption) (driver.Batch, error) { + ctx, query, args := telemetrystore.WrapBeforeQuery(p.hooks, ctx, query) + batch, err := p.clickHouseConn.PrepareBatch(ctx, query, opts...) + telemetrystore.WrapAfterQuery(p.hooks, ctx, query, args, nil, err) + return batch, err +} + +func (p provider) ServerVersion() (*driver.ServerVersion, error) { + return p.clickHouseConn.ServerVersion() +} + +func (p provider) Contributors() []string { + return p.clickHouseConn.Contributors() +} diff --git a/pkg/telemetrystore/config.go b/pkg/telemetrystore/config.go new file mode 100644 index 00000000000..86c68a5a239 --- /dev/null +++ b/pkg/telemetrystore/config.go @@ -0,0 +1,62 @@ +package telemetrystore + +import ( + "time" + + "go.signoz.io/signoz/pkg/factory" +) + +type Config struct { + // Provider is the provider to use + Provider string `mapstructure:"provider"` + // Connection is the connection configuration + Connection ConnectionConfig `mapstructure:",squash"` + // Clickhouse is the clickhouse configuration + ClickHouse ClickHouseConfig `mapstructure:"clickhouse"` +} + +type ConnectionConfig struct { + // MaxOpenConns is the maximum number of open connections to the database. + MaxOpenConns int `mapstructure:"max_open_conns"` + MaxIdleConns int `mapstructure:"max_idle_conns"` + DialTimeout time.Duration `mapstructure:"dial_timeout"` +} + +type ClickHouseQuerySettings struct { + MaxExecutionTime int `mapstructure:"max_execution_time"` + MaxExecutionTimeLeaf int `mapstructure:"max_execution_time_leaf"` + TimeoutBeforeCheckingExecutionSpeed int `mapstructure:"timeout_before_checking_execution_speed"` + MaxBytesToRead int `mapstructure:"max_bytes_to_read"` + MaxResultRowsForCHQuery int `mapstructure:"max_result_rows_for_ch_query"` +} + +type ClickHouseConfig struct { + DSN string `mapstructure:"dsn"` + + QuerySettings ClickHouseQuerySettings `mapstructure:"settings"` +} + +func NewConfigFactory() factory.ConfigFactory { + return factory.NewConfigFactory(factory.MustNewName("telemetrystore"), newConfig) + +} + +func newConfig() factory.Config { + return Config{ + Provider: "clickhouse", + Connection: ConnectionConfig{ + MaxOpenConns: 100, + MaxIdleConns: 50, + DialTimeout: 5 * time.Second, + }, + ClickHouse: ClickHouseConfig{ + DSN: "http://localhost:9000", + + // No default query settings, as default's are set in ch config + }, + } +} + +func (c Config) Validate() error { + return nil +} diff --git a/pkg/telemetrystore/config_test.go b/pkg/telemetrystore/config_test.go new file mode 100644 index 00000000000..68eec0ff99b --- /dev/null +++ b/pkg/telemetrystore/config_test.go @@ -0,0 +1,95 @@ +package telemetrystore + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.signoz.io/signoz/pkg/config" + "go.signoz.io/signoz/pkg/config/envprovider" + "go.signoz.io/signoz/pkg/factory" +) + +func TestNewWithEnvProvider(t *testing.T) { + t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DSN", "http://localhost:9000") + t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__IDLE__CONNS", "60") + t.Setenv("SIGNOZ_TELEMETRYSTORE_MAX__OPEN__CONNS", "150") + t.Setenv("SIGNOZ_TELEMETRYSTORE_DIAL__TIMEOUT", "5s") + t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_DEBUG", "true") + + conf, err := config.New( + context.Background(), + config.ResolverConfig{ + Uris: []string{"env:"}, + ProviderFactories: []config.ProviderFactory{ + envprovider.NewFactory(), + }, + }, + []factory.ConfigFactory{ + NewConfigFactory(), + }, + ) + require.NoError(t, err) + + actual := &Config{} + err = conf.Unmarshal("telemetrystore", actual) + + require.NoError(t, err) + + expected := &Config{ + Provider: "clickhouse", + Connection: ConnectionConfig{ + MaxOpenConns: 150, + MaxIdleConns: 60, + DialTimeout: 5 * time.Second, + }, + ClickHouse: ClickHouseConfig{ + DSN: "http://localhost:9000", + }, + } + + assert.Equal(t, expected, actual) +} + +func TestNewWithEnvProviderWithQuerySettings(t *testing.T) { + t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__EXECUTION__TIME", "10") + t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__EXECUTION__TIME__LEAF", "10") + t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_TIMEOUT__BEFORE__CHECKING__EXECUTION__SPEED", "10") + t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__BYTES__TO__READ", "1000000") + t.Setenv("SIGNOZ_TELEMETRYSTORE_CLICKHOUSE_SETTINGS_MAX__RESULT__ROWS__FOR__CH__QUERY", "10000") + + conf, err := config.New( + context.Background(), + config.ResolverConfig{ + Uris: []string{"env:"}, + ProviderFactories: []config.ProviderFactory{ + envprovider.NewFactory(), + }, + }, + []factory.ConfigFactory{ + NewConfigFactory(), + }, + ) + require.NoError(t, err) + + actual := &Config{} + err = conf.Unmarshal("telemetrystore", actual) + + require.NoError(t, err) + + expected := &Config{ + ClickHouse: ClickHouseConfig{ + QuerySettings: ClickHouseQuerySettings{ + MaxExecutionTime: 10, + MaxExecutionTimeLeaf: 10, + TimeoutBeforeCheckingExecutionSpeed: 10, + MaxBytesToRead: 1000000, + MaxResultRowsForCHQuery: 10000, + }, + }, + } + + assert.Equal(t, expected.ClickHouse.QuerySettings, actual.ClickHouse.QuerySettings) +} diff --git a/pkg/telemetrystore/telemetrystore.go b/pkg/telemetrystore/telemetrystore.go new file mode 100644 index 00000000000..db845982cc7 --- /dev/null +++ b/pkg/telemetrystore/telemetrystore.go @@ -0,0 +1,32 @@ +package telemetrystore + +import ( + "context" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" +) + +type TelemetryStore interface { + // Returns the SigNoz Wrapper for Clickhouse + ClickHouseDB() clickhouse.Conn +} + +type TelemetryStoreHook interface { + BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) + AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) +} + +func WrapBeforeQuery(hooks []TelemetryStoreHook, ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) { + for _, hook := range hooks { + ctx, query, args = hook.BeforeQuery(ctx, query, args...) + } + return ctx, query, args +} + +// runAfterHooks executes all after hooks in order +func WrapAfterQuery(hooks []TelemetryStoreHook, ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) { + for _, hook := range hooks { + hook.AfterQuery(ctx, query, args, rows, err) + } +} diff --git a/pkg/telemetrystore/telemetrystorehook/settings.go b/pkg/telemetrystore/telemetrystorehook/settings.go new file mode 100644 index 00000000000..834baf56a2b --- /dev/null +++ b/pkg/telemetrystore/telemetrystorehook/settings.go @@ -0,0 +1,85 @@ +package telemetrystorehook + +import ( + "context" + "encoding/json" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "go.signoz.io/signoz/pkg/factory" + "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/telemetrystore" +) + +type provider struct { + settings telemetrystore.ClickHouseQuerySettings +} + +func NewFactory() factory.ProviderFactory[telemetrystore.TelemetryStoreHook, telemetrystore.Config] { + return factory.NewProviderFactory(factory.MustNewName("clickhousesettings"), New) +} + +func New(ctx context.Context, providerSettings factory.ProviderSettings, config telemetrystore.Config) (telemetrystore.TelemetryStoreHook, error) { + return &provider{ + settings: config.ClickHouse.QuerySettings, + }, nil +} + +func (h *provider) BeforeQuery(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) { + return h.clickHouseSettings(ctx, query, args...) +} + +func (h *provider) AfterQuery(ctx context.Context, query string, args []interface{}, rows driver.Rows, err error) { + return +} + +// clickHouseSettings adds clickhouse settings to queries +func (h *provider) clickHouseSettings(ctx context.Context, query string, args ...interface{}) (context.Context, string, []interface{}) { + settings := clickhouse.Settings{} + + // Apply default settings + logComment := h.getLogComment(ctx) + if logComment != "" { + settings["log_comment"] = logComment + } + + if ctx.Value("enforce_max_result_rows") != nil { + settings["max_result_rows"] = h.settings.MaxResultRowsForCHQuery + } + + if h.settings.MaxBytesToRead != 0 { + settings["max_bytes_to_read"] = h.settings.MaxBytesToRead + } + + if h.settings.MaxExecutionTime != 0 { + settings["max_execution_time"] = h.settings.MaxExecutionTime + } + + if h.settings.MaxExecutionTimeLeaf != 0 { + settings["max_execution_time_leaf"] = h.settings.MaxExecutionTimeLeaf + } + + if h.settings.TimeoutBeforeCheckingExecutionSpeed != 0 { + settings["timeout_before_checking_execution_speed"] = h.settings.TimeoutBeforeCheckingExecutionSpeed + } + + ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings)) + return ctx, query, args +} + +func (h *provider) getLogComment(ctx context.Context) string { + // Get the key-value pairs from context for log comment + kv := ctx.Value(common.LogCommentKey) + if kv == nil { + return "" + } + + logCommentKVs, ok := kv.(map[string]string) + if !ok { + return "" + } + + logComment, _ := json.Marshal(logCommentKVs) + + return string(logComment) +} diff --git a/pkg/telemetrystore/telemetrystoretest/provider.go b/pkg/telemetrystore/telemetrystoretest/provider.go new file mode 100644 index 00000000000..dbcf0c54b9f --- /dev/null +++ b/pkg/telemetrystore/telemetrystoretest/provider.go @@ -0,0 +1,34 @@ +package telemetrystoretest + +import ( + "github.com/ClickHouse/clickhouse-go/v2" + cmock "github.com/srikanthccv/ClickHouse-go-mock" +) + +// Provider represents a mock telemetry store provider for testing +type Provider struct { + mock cmock.ClickConnMockCommon +} + +// New creates a new mock telemetry store provider +func New() (*Provider, error) { + options := &clickhouse.Options{} // Default options + mock, err := cmock.NewClickHouseNative(options) + if err != nil { + return nil, err + } + + return &Provider{ + mock: mock, + }, nil +} + +// Clickhouse returns the mock Clickhouse connection +func (p *Provider) Clickhouse() clickhouse.Conn { + return p.mock.(clickhouse.Conn) +} + +// Mock returns the underlying Clickhouse mock instance for setting expectations +func (p *Provider) Mock() cmock.ClickConnMockCommon { + return p.mock +} diff --git a/pkg/telemetrystore/telemetrystoretest/provider_test.go b/pkg/telemetrystore/telemetrystoretest/provider_test.go new file mode 100644 index 00000000000..7350c21e8ec --- /dev/null +++ b/pkg/telemetrystore/telemetrystoretest/provider_test.go @@ -0,0 +1,44 @@ +package telemetrystoretest + +import ( + "testing" + + "github.com/ClickHouse/clickhouse-go/v2" + cmock "github.com/srikanthccv/ClickHouse-go-mock" + "github.com/stretchr/testify/assert" +) + +func TestNew(t *testing.T) { + tests := []struct { + name string + wantErr bool + }{ + { + name: "should create new provider successfully", + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider, err := New() + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, provider) + return + } + + assert.NoError(t, err) + assert.NotNil(t, provider) + assert.NotNil(t, provider.Mock()) + assert.NotNil(t, provider.Clickhouse()) + + // Verify the returned interfaces implement the expected types + _, ok := provider.Mock().(cmock.ClickConnMockCommon) + assert.True(t, ok, "Mock() should return cmock.ClickConnMockCommon") + + _, ok = provider.Clickhouse().(clickhouse.Conn) + assert.True(t, ok, "Clickhouse() should return clickhouse.Conn") + }) + } +}