Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: telemetry store #6923

Merged
merged 14 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion conf/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,19 @@ apiserver:
- /api/v3/logs/livetail
logging:
excluded_routes:
- /api/v1/health
- /api/v1/health


##################### TelemetryStore #####################
telemetrystore:
# specifies the telemetrystore provider to use.
provider: clickhouse
clickhouse:
# The DSN to use for ClickHouse.
dsn: http://localhost:9000
# Maximum number of idle connections in the connection pool.
max_idle_conns: 50
# Maximum number of open connections to the database.
max_open_conns: 100
# Maximum time to wait for a connection to be established.
dial_timeout: 5s
6 changes: 0 additions & 6 deletions ee/query-service/app/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ type APIHandlerOptions struct {
DataConnector interfaces.DataConnector
SkipConfig *basemodel.SkipConfig
PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
AppDao dao.ModelDao
RulesManager *rules.Manager
UsageManager *usage.Manager
Expand Down Expand Up @@ -57,9 +54,6 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) {
Reader: opts.DataConnector,
SkipConfig: opts.SkipConfig,
PreferSpanMetrics: opts.PreferSpanMetrics,
MaxIdleConns: opts.MaxIdleConns,
MaxOpenConns: opts.MaxOpenConns,
DialTimeout: opts.DialTimeout,
AppDao: opts.AppDao,
RuleManager: opts.RulesManager,
FeatureFlags: opts.FeatureFlags,
Expand Down
10 changes: 4 additions & 6 deletions ee/query-service/app/db/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,20 @@ type ClickhouseReader struct {

func NewDataConnector(
localDB *sqlx.DB,
ch clickhouse.Conn,
promConfigPath string,
lm interfaces.FeatureLookup,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
fluxIntervalForTraceDetail time.Duration,
cache cache.Cache,
) *ClickhouseReader {
ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
chReader := basechr.NewReader(localDB, ch, promConfigPath, lm, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
return &ClickhouseReader{
conn: ch.GetConn(),
conn: ch,
appdb: localDB,
ClickHouseReader: ch,
ClickHouseReader: chReader,
}
}

Expand Down
12 changes: 2 additions & 10 deletions ee/query-service/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ type ServerOptions struct {
DisableRules bool
RuleRepoURL string
PreferSpanMetrics bool
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
CacheConfigPath string
FluxInterval string
FluxIntervalForTraceDetail string
Expand Down Expand Up @@ -157,11 +154,9 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
zap.L().Info("Using ClickHouse as datastore ...")
qb := db.NewDataConnector(
serverOptions.SigNoz.SQLStore.SQLxDB(),
serverOptions.SigNoz.TelemetryStore.ClickHouse(),
serverOptions.PromConfigPath,
lm,
serverOptions.MaxIdleConns,
serverOptions.MaxOpenConns,
serverOptions.DialTimeout,
serverOptions.Cluster,
serverOptions.UseLogsNewSchema,
serverOptions.UseTraceNewSchema,
Expand Down Expand Up @@ -245,7 +240,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}

// start the usagemanager
usageManager, err := usage.New(modelDao, lm.GetRepo(), reader.GetConn())
usageManager, err := usage.New(modelDao, lm.GetRepo(), serverOptions.SigNoz.TelemetryStore.ClickHouse(), serverOptions.Config.TelemetryStore.ClickHouse.DSN)
if err != nil {
return nil, err
}
Expand All @@ -266,9 +261,6 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
DataConnector: reader,
SkipConfig: skipConfig,
PreferSpanMetrics: serverOptions.PreferSpanMetrics,
MaxIdleConns: serverOptions.MaxIdleConns,
MaxOpenConns: serverOptions.MaxOpenConns,
DialTimeout: serverOptions.DialTimeout,
AppDao: modelDao,
RulesManager: rm,
UsageManager: usageManager,
Expand Down
7 changes: 4 additions & 3 deletions ee/query-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ func main() {
envprovider.NewFactory(),
fileprovider.NewFactory(),
},
}, signoz.DepricatedFlags{
nityanandagohain marked this conversation as resolved.
Show resolved Hide resolved
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
})
if err != nil {
zap.L().Fatal("Failed to create config", zap.Error(err))
Expand All @@ -161,9 +165,6 @@ func main() {
PrivateHostPort: baseconst.PrivateHostPort,
DisableRules: disableRules,
RuleRepoURL: ruleRepoURL,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
CacheConfigPath: cacheConfigPath,
FluxInterval: fluxInterval,
FluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
Expand Down
5 changes: 2 additions & 3 deletions ee/query-service/usage/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"regexp"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -46,9 +45,9 @@ type Manager struct {
tenantID string
}

func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn) (*Manager, error) {
func New(modelDao dao.ModelDao, licenseRepo *license.Repo, clickhouseConn clickhouse.Conn, chUrl string) (*Manager, error) {
hostNameRegex := regexp.MustCompile(`tcp://(?P<hostname>.*):`)
hostNameRegexMatches := hostNameRegex.FindStringSubmatch(os.Getenv("ClickHouseUrl"))
hostNameRegexMatches := hostNameRegex.FindStringSubmatch(chUrl)

tenantID := ""
if len(hostNameRegexMatches) == 2 {
Expand Down
53 changes: 0 additions & 53 deletions pkg/query-service/app/clickhouseReader/options.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package clickhouseReader

import (
"context"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"go.uber.org/zap"
)

type Encoding string
Expand All @@ -18,7 +16,6 @@ const (
)

const (
defaultDatasource string = "tcp://localhost:9000"
defaultTraceDB string = "signoz_traces"
defaultOperationsTable string = "distributed_signoz_operations"
defaultIndexTable string = "distributed_signoz_index_v2"
Expand Down Expand Up @@ -58,9 +55,6 @@ type namespaceConfig struct {
namespace string
Enabled bool
Datasource string
MaxIdleConns int
MaxOpenConns int
DialTimeout time.Duration
TraceDB string
OperationsTable string
IndexTable string
Expand Down Expand Up @@ -99,37 +93,6 @@ type namespaceConfig struct {
// Connecto defines how to connect to the database
type Connector func(cfg *namespaceConfig) (clickhouse.Conn, error)

func defaultConnector(cfg *namespaceConfig) (clickhouse.Conn, error) {
ctx := context.Background()
options, err := clickhouse.ParseDSN(cfg.Datasource)
if err != nil {
return nil, err
}

// Check if the DSN contained any of the following options, if not set from configuration
if options.MaxIdleConns == 0 {
options.MaxIdleConns = cfg.MaxIdleConns
}
if options.MaxOpenConns == 0 {
options.MaxOpenConns = cfg.MaxOpenConns
}
if options.DialTimeout == 0 {
options.DialTimeout = cfg.DialTimeout
}

zap.L().Info("Connecting to Clickhouse", zap.String("at", options.Addr[0]), zap.Int("MaxIdleConns", options.MaxIdleConns), zap.Int("MaxOpenConns", options.MaxOpenConns), zap.Duration("DialTimeout", options.DialTimeout))
db, err := clickhouse.Open(options)
if err != nil {
return nil, err
}

if err := db.Ping(ctx); err != nil {
return nil, err
}

return db, nil
}

// Options store storage plugin related configs
type Options struct {
primary *namespaceConfig
Expand All @@ -139,26 +102,13 @@ type Options struct {

// NewOptions creates a new Options struct.
func NewOptions(
datasource string,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
primaryNamespace string,
otherNamespaces ...string,
) *Options {

if datasource == "" {
datasource = defaultDatasource
}

options := &Options{
primary: &namespaceConfig{
namespace: primaryNamespace,
Enabled: true,
Datasource: datasource,
MaxIdleConns: maxIdleConns,
MaxOpenConns: maxOpenConns,
DialTimeout: dialTimeout,
TraceDB: defaultTraceDB,
OperationsTable: defaultOperationsTable,
IndexTable: defaultIndexTable,
Expand All @@ -181,7 +131,6 @@ func NewOptions(
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Connector: defaultConnector,

LogsTableV2: defaultLogsTableV2,
LogsLocalTableV2: defaultLogsLocalTableV2,
Expand All @@ -200,7 +149,6 @@ func NewOptions(
if namespace == archiveNamespace {
options.others[namespace] = &namespaceConfig{
namespace: namespace,
Datasource: datasource,
TraceDB: "",
OperationsTable: "",
IndexTable: "",
Expand All @@ -214,7 +162,6 @@ func NewOptions(
WriteBatchDelay: defaultWriteBatchDelay,
WriteBatchSize: defaultWriteBatchSize,
Encoding: defaultEncoding,
Connector: defaultConnector,
}
} else {
options.others[namespace] = &namespaceConfig{namespace: namespace}
Expand Down
61 changes: 3 additions & 58 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,26 +166,16 @@ type ClickHouseReader struct {
// NewTraceReader returns a TraceReader for the database
func NewReader(
localDB *sqlx.DB,
db driver.Conn,
configFile string,
featureFlag interfaces.FeatureLookup,
maxIdleConns int,
maxOpenConns int,
dialTimeout time.Duration,
cluster string,
useLogsNewSchema bool,
useTraceNewSchema bool,
fluxIntervalForTraceDetail time.Duration,
cache cache.Cache,
) *ClickHouseReader {

datasource := os.Getenv("ClickHouseUrl")
options := NewOptions(datasource, maxIdleConns, maxOpenConns, dialTimeout, primaryNamespace, archiveNamespace)
db, err := initialize(options)

if err != nil {
zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err))
}

options := NewOptions(primaryNamespace, archiveNamespace)
return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema, fluxIntervalForTraceDetail, cache)
}

Expand All @@ -208,29 +198,6 @@ func NewReaderFromClickhouseConnection(
os.Exit(1)
}

regex := os.Getenv("ClickHouseOptimizeReadInOrderRegex")
var regexCompiled *regexp.Regexp
if regex != "" {
regexCompiled, err = regexp.Compile(regex)
if err != nil {
zap.L().Error("Incorrect regex for ClickHouseOptimizeReadInOrderRegex")
os.Exit(1)
}
}

wrap := clickhouseConnWrapper{
conn: db,
settings: ClickhouseQuerySettings{
MaxExecutionTime: os.Getenv("ClickHouseMaxExecutionTime"),
MaxExecutionTimeLeaf: os.Getenv("ClickHouseMaxExecutionTimeLeaf"),
TimeoutBeforeCheckingExecutionSpeed: os.Getenv("ClickHouseTimeoutBeforeCheckingExecutionSpeed"),
MaxBytesToRead: os.Getenv("ClickHouseMaxBytesToRead"),
OptimizeReadInOrderRegex: os.Getenv("ClickHouseOptimizeReadInOrderRegex"),
OptimizeReadInOrderRegexCompiled: regexCompiled,
MaxResultRowsForCHQuery: constants.MaxResultRowsForCHQuery,
},
}

logsTableName := options.primary.LogsTable
logsLocalTableName := options.primary.LogsLocalTable
if useLogsNewSchema {
Expand All @@ -246,7 +213,7 @@ func NewReaderFromClickhouseConnection(
}

return &ClickHouseReader{
db: wrap,
db: db,
localDB: localDB,
TraceDB: options.primary.TraceDB,
alertManager: alertManager,
Expand Down Expand Up @@ -438,28 +405,6 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config
return conf, nil
}

func initialize(options *Options) (clickhouse.Conn, error) {

db, err := connect(options.getPrimary())
if err != nil {
return nil, fmt.Errorf("error connecting to primary db: %v", err)
}

return db, nil
}

func connect(cfg *namespaceConfig) (clickhouse.Conn, error) {
if cfg.Encoding != EncodingJSON && cfg.Encoding != EncodingProto {
return nil, fmt.Errorf("unknown encoding %q, supported: %q, %q", cfg.Encoding, EncodingJSON, EncodingProto)
}

return cfg.Connector(cfg)
}

func (r *ClickHouseReader) GetConn() clickhouse.Conn {
return r.db
}

func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time)
if err != nil {
Expand Down
Loading
Loading