Skip to content

Commit

Permalink
fix: telemetry store (#6923)
Browse files Browse the repository at this point in the history
* fix: inital changes for telemetry store

* fix: add tests and use proper config for conn

* fix: add telemetry store test

* fix: add backward compatibility for old variables and update example conf

* fix: move wrapper to telemetry store

* fix: no need to pass query for settings

* fix: remove redundant config for ch conn

* fix: use clickhouse dsn instead

* fix: update example config

* fix: update backward compatibility code

* fix: use hooks in telemetrystore

* fix: address minor comments

---------

Co-authored-by: Vibhu Pandey <[email protected]>
  • Loading branch information
nityanandagohain and grandwizard28 authored Jan 30, 2025
1 parent ffd72cf commit d1e7cc1
Show file tree
Hide file tree
Showing 27 changed files with 578 additions and 305 deletions.
17 changes: 16 additions & 1 deletion conf/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,19 @@ apiserver:
- /api/v3/logs/livetail
logging:
excluded_routes:
- /api/v1/health
- /api/v1/health


##################### TelemetryStore #####################
telemetrystore:
# specifies the telemetrystore provider to use.
provider: clickhouse
clickhouse:
# The DSN to use for ClickHouse.
dsn: http://localhost:9000
# Maximum number of idle connections in the connection pool.
max_idle_conns: 50
# Maximum number of open connections to the database.
max_open_conns: 100
# Maximum time to wait for a connection to be established.
dial_timeout: 5s
6 changes: 0 additions & 6 deletions ee/query-service/app/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ type APIHandlerOptions struct {
DataConnector interfaces.DataConnector
SkipConfig *basemodel.SkipConfig
PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
AppDao dao.ModelDao
RulesManager *rules.Manager
UsageManager *usage.Manager
Expand Down Expand Up @@ -57,9 +54,6 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
Reader: opts.DataConnector,
SkipConfig: opts.SkipConfig,
PreferSpanMetrics: opts.PreferSpanMetrics,
MaxIdleConns: opts.MaxIdleConns,
MaxOpenConns: opts.MaxOpenConns,
DialTimeout: opts.DialTimeout,
AppDao: opts.AppDao,
RuleManager: opts.RulesManager,
FeatureFlags: opts.FeatureFlags,
Expand Down
10 changes: 4 additions & 6 deletions ee/query-service/app/db/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,20 @@ type ClickhouseReader struct {

func NewDataConnector(
localDB *sqlx.DB,
ch clickhouse.Conn,
promConfigPath string,
lm interfaces.FeatureLookup,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
fluxIntervalForTraceDetail time.Duration,
cache cache.Cache,
) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
chReader := basechr.NewReader(localDB, ch, promConfigPath, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
return &ClickhouseReader{
conn: ch.GetConn(),
conn: ch,
appdb: localDB,
ClickHouseReader: ch,
ClickHouseReader: chReader,
}
}

Expand Down
12 changes: 2 additions & 10 deletions ee/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ type ServerOptions struct {
DisableRules bool
RuleRepoURL string
PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
CacheConfigPath string
FluxInterval string
FluxIntervalForTraceDetail string
Expand Down Expand Up @@ -157,11 +154,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
zap.L().Info("Using ClickHouse as datastore ...")
qb := db.NewDataConnector(
serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.SigNoz.TelemetryStore.ClickHouseDB(),
serverOptions.PromConfigPath,
lm,
serverOptions.MaxIdleConns,
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
Expand Down Expand Up @@ -245,7 +240,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}

// start the usagemanager
usageManager, err := usage.New(modelDao, lm.GetRepo(), reader.GetConn())
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickHouseDB(), serverOptions.Config.TelemetryStore.ClickHouse.DSN)
if err != nil {
return nil, err
}
Expand All @@ -266,9 +261,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
DataConnector: reader,
SkipConfig: skipConfig,
PreferSpanMetrics: serverOptions.PreferSpanMetrics,
MaxIdleConns: serverOptions.MaxIdleConns,
MaxOpenConns: serverOptions.MaxOpenConns,
DialTimeout: serverOptions.DialTimeout,
AppDao: modelDao,
RulesManager: rm,
UsageManager: usageManager,
Expand Down
7 changes: 4 additions & 3 deletions ee/query-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ func main() {
envprovider.NewFactory(),
fileprovider.NewFactory(),
},
}, signoz.DeprecatedFlags{
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
})
if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err))
Expand All @@ -161,9 +165,6 @@ func main() {
PrivateHostPort: baseconst.PrivateHostPort,
DisableRules: disableRules,
RuleRepoURL: ruleRepoURL,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
Expand Down
5 changes: 2 additions & 3 deletions ee/query-service/usage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"regexp"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -46,9 +45,9 @@ type Manager struct {
tenantID string
}

func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) {
func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn, chUrl string) (*Manager, error) {
hostNameRegex := regexp.MustCompile(`tcp://(?P<hostname>.*):`)
hostNameRegexMatches := hostNameRegex.FindStringSubmatch(os.Getenv("ClickHouseUrl"))
hostNameRegexMatches := hostNameRegex.FindStringSubmatch(chUrl)

tenantID := ""
if len(hostNameRegexMatches) == 2 {
Expand Down
53 changes: 0 additions & 53 deletions pkg/query-service/app/clickhouseReader/options.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package clickhouseReader

import (
"context"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"go.uber.org/zap"
)

type Encoding string
Expand All @@ -18,7 +16,6 @@ const (
)

const (
defaultDatasource string = "tcp://localhost:9000"
defaultTraceDB string = "signoz_traces"
defaultOperationsTable string = "distributed_signoz_operations"
defaultIndexTable string = "distributed_signoz_index_v2"
Expand Down Expand Up @@ -58,9 +55,6 @@ type namespaceConfig struct {
namespace string
Enabled bool
Datasource string
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
TraceDB string
OperationsTable string
IndexTable string
Expand Down Expand Up @@ -99,37 +93,6 @@ type namespaceConfig struct {
// Connecto defines how to connect to the database
type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error)

func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) {
ctx := context.Background()
options, err := clickhouse.ParseDSN(cfg.Datasource)
if err != nil {
return nil, err
}

// Check if the DSN contained any of the following options, if not set from configuration
if options.MaxIdleConns == 0 {
options.MaxIdleConns = cfg.MaxIdleConns
}
if options.MaxOpenConns == 0 {
options.MaxOpenConns = cfg.MaxOpenConns
}
if options.DialTimeout == 0 {
options.DialTimeout = cfg.DialTimeout
}

zap.L().Info("Connecting to Clickhouse", zap.String("at", options.Addr[0]), zap.Int("MaxIdleConns", options.MaxIdleConns), zap.Int("MaxOpenConns", options.MaxOpenConns), zap.Duration("DialTimeout", options.DialTimeout))
db, err := clickhouse.Open(options)
if err != nil {
return nil, err
}

if err := db.Ping(ctx); err != nil {
return nil, err
}

return db, nil
}

// Options store storage plugin related configs
type Options struct {
primary *namespaceConfig
Expand All @@ -139,26 +102,13 @@ type Options struct {

// NewOptions creates a new Options struct.
func NewOptions(
datasource string,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
primaryNamespace string,
otherNamespaces ...string,
) *Options {

if datasource == "" {
datasource = defaultDatasource
}

options := &Options{
primary: &namespaceConfig{
namespace: primaryNamespace,
Enabled: true,
Datasource: datasource,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
TraceDB: defaultTraceDB,
OperationsTable: defaultOperationsTable,
IndexTable: defaultIndexTable,
Expand All @@ -181,7 +131,6 @@ func NewOptions(
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Connector: defaultConnector,

LogsTableV2: defaultLogsTableV2,
LogsLocalTableV2: defaultLogsLocalTableV2,
Expand All @@ -200,7 +149,6 @@ func NewOptions(
if namespace == archiveNamespace {
options.others[namespace] = &namespaceConfig{
namespace: namespace,
Datasource: datasource,
TraceDB: "",
OperationsTable: "",
IndexTable: "",
Expand All @@ -214,7 +162,6 @@ func NewOptions(
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Connector: defaultConnector,
}
} else {
options.others[namespace] = &namespaceConfig{namespace: namespace}
Expand Down
61 changes: 3 additions & 58 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,26 +166,16 @@ type ClickHouseReader struct {
// NewTraceReader returns a TraceReader for the database
func NewReader(
localDB *sqlx.DB,
db driver.Conn,
configFile string,
featureFlag interfaces.FeatureLookup,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
fluxIntervalForTraceDetail time.Duration,
cache cache.Cache,
) *ClickHouseReader {

datasource := os.Getenv("ClickHouseUrl")
options := NewOptions(datasource, maxIdleConns, maxOpenConns, dialTimeout, primaryNamespace, archiveNamespace)
db, err := initialize(options)

if err != nil {
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
}

options := NewOptions(primaryNamespace, archiveNamespace)
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
}

Expand All @@ -208,29 +198,6 @@ func NewReaderFromClickhouseConnection(
os.Exit(1)
}

regex := os.Getenv("ClickHouseOptimizeReadInOrderRegex")
var regexCompiled *regexp.Regexp
if regex != "" {
regexCompiled, err = regexp.Compile(regex)
if err != nil {
zap.L().Error("Incorrect regex for ClickHouseOptimizeReadInOrderRegex")
os.Exit(1)
}
}

wrap := clickhouseConnWrapper{
conn: db,
settings: ClickhouseQuerySettings{
MaxExecutionTime: os.Getenv("ClickHouseMaxExecutionTime"),
MaxExecutionTimeLeaf: os.Getenv("ClickHouseMaxExecutionTimeLeaf"),
TimeoutBeforeCheckingExecutionSpeed: os.Getenv("ClickHouseTimeoutBeforeCheckingExecutionSpeed"),
MaxBytesToRead: os.Getenv("ClickHouseMaxBytesToRead"),
OptimizeReadInOrderRegex: os.Getenv("ClickHouseOptimizeReadInOrderRegex"),
OptimizeReadInOrderRegexCompiled: regexCompiled,
MaxResultRowsForCHQuery: constants.MaxResultRowsForCHQuery,
},
}

logsTableName := options.primary.LogsTable
logsLocalTableName := options.primary.LogsLocalTable
if useLogsNewSchema {
Expand All @@ -246,7 +213,7 @@ func NewReaderFromClickhouseConnection(
}

return &ClickHouseReader{
db: wrap,
db: db,
localDB: localDB,
TraceDB: options.primary.TraceDB,
alertManager: alertManager,
Expand Down Expand Up @@ -438,28 +405,6 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
return conf, nil
}

func initialize(options *Options) (clickhouse.Conn, error) {

db, err := connect(options.getPrimary())
if err != nil {
return nil, fmt.Errorf("error connecting to primary db: %v", err)
}

return db, nil
}

func connect(cfg *namespaceConfig) (clickhouse.Conn, error) {
if cfg.Encoding != EncodingJSON && cfg.Encoding != EncodingProto {
return nil, fmt.Errorf("unknown encoding %q, supported: %q, %q", cfg.Encoding, EncodingJSON, EncodingProto)
}

return cfg.Connector(cfg)
}

func (r *ClickHouseReader) GetConn() clickhouse.Conn {
return r.db
}

func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time)
if err != nil {
Expand Down
Loading

0 comments on commit d1e7cc1

Please sign in to comment.