Skip to content

Commit

Permalink
Merge branch 'main' into feat/aws-integration-changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmadshaheer authored Feb 6, 2025
2 parents 1d67fd1 + 94c2398 commit 9c3ad77
Show file tree
Hide file tree
Showing 16 changed files with 494 additions and 33 deletions.
4 changes: 2 additions & 2 deletions ee/query-service/model/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func NewLicenseV3(data map[string]interface{}) (*LicenseV3, error) {
if err != nil {
return nil, err
}
// if license status is inactive then default it to basic
if status == LicenseStatusInactive {
// if license status is invalid then default it to basic
if status == LicenseStatusInvalid {
planName = PlanNameBasic
}

Expand Down
2 changes: 1 addition & 1 deletion ee/query-service/model/plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var (
)

var (
LicenseStatusInactive = "INACTIVE"
LicenseStatusInvalid = "INVALID"
)

const DisableUpsell = "DISABLE_UPSELL"
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/AppRoutes/Private.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ function PrivateRoute({ children }: PrivateRouteProps): JSX.Element {
const currentRoute = mapRoutes.get('current');
const shouldSuspendWorkspace =
activeLicenseV3.status === LicenseStatus.SUSPENDED &&
activeLicenseV3.state === LicenseState.PAYMENT_FAILED;
activeLicenseV3.state === LicenseState.DEFAULTED;

if (shouldSuspendWorkspace && currentRoute) {
navigateToWorkSpaceSuspended(currentRoute);
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/container/AppLayout/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ function AppLayout(props: AppLayoutProps): JSX.Element {
if (
!isFetchingActiveLicenseV3 &&
!isNull(activeLicenseV3) &&
activeLicenseV3?.event_queue?.event === LicenseEvent.FAILED_PAYMENT
activeLicenseV3?.event_queue?.event === LicenseEvent.DEFAULT
) {
setShowPaymentFailedWarning(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ function WorkspaceSuspended(): JSX.Element {
if (!isFetchingActiveLicenseV3 && activeLicenseV3) {
const shouldSuspendWorkspace =
activeLicenseV3.status === LicenseStatus.SUSPENDED &&
activeLicenseV3.state === LicenseState.PAYMENT_FAILED;
activeLicenseV3.state === LicenseState.DEFAULTED;

if (!shouldSuspendWorkspace) {
history.push(ROUTES.APPLICATION);
Expand Down
4 changes: 2 additions & 2 deletions frontend/src/types/api/licensesV3/getActive.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export enum LicenseEvent {
NO_EVENT = '',
FAILED_PAYMENT = 'FAILED_PAYMENT',
DEFAULT = 'DEFAULT',
}

export enum LicenseStatus {
Expand All @@ -9,7 +9,7 @@ export enum LicenseStatus {
}

export enum LicenseState {
PAYMENT_FAILED = 'PAYMENT_FAILED',
DEFAULTED = 'DEFAULTED',
ACTIVE = 'ACTIVE',
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/query-service/app/clickhouseReader/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ const (
defaultTraceLocalTableName string = "signoz_index_v3"
defaultTraceResourceTableV3 string = "distributed_traces_v3_resource"
defaultTraceSummaryTable string = "distributed_trace_summary"

defaultMetadataDB string = "signoz_metadata"
defaultMetadataTable string = "distributed_attributes_metadata"
)

// NamespaceConfig is Clickhouse's internal configuration data
Expand Down Expand Up @@ -88,6 +91,9 @@ type namespaceConfig struct {
TraceLocalTableNameV3 string
TraceResourceTableV3 string
TraceSummaryTable string

MetadataDB string
MetadataTable string
}

// Connecto defines how to connect to the database
Expand Down Expand Up @@ -141,6 +147,9 @@ func NewOptions(
TraceLocalTableNameV3: defaultTraceLocalTableName,
TraceResourceTableV3: defaultTraceResourceTableV3,
TraceSummaryTable: defaultTraceSummaryTable,

MetadataDB: defaultMetadataDB,
MetadataTable: defaultMetadataTable,
},
others: make(map[string]*namespaceConfig, len(otherNamespaces)),
}
Expand Down
156 changes: 140 additions & 16 deletions pkg/query-service/app/clickhouseReader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ type ClickHouseReader struct {

fluxIntervalForTraceDetail time.Duration
cache cache.Cache
metadataDB string
metadataTable string
}

// NewTraceReader returns a TraceReader for the database
Expand Down Expand Up @@ -256,6 +258,8 @@ func NewReaderFromClickhouseConnection(

fluxIntervalForTraceDetail: fluxIntervalForTraceDetail,
cache: cache,
metadataDB: options.primary.MetadataDB,
metadataTable: options.primary.MetadataTable,
}
}

Expand Down Expand Up @@ -1454,7 +1458,7 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con
var serviceNameIntervalMap = map[string][]tracedetail.Interval{}
var hasMissingSpans bool

userEmail , emailErr := auth.GetEmailFromJwt(ctx)
userEmail, emailErr := auth.GetEmailFromJwt(ctx)
cachedTraceData, err := r.GetWaterfallSpansForTraceWithMetadataCache(ctx, traceID)
if err == nil {
startTime = cachedTraceData.StartTime
Expand Down Expand Up @@ -1530,8 +1534,8 @@ func (r *ClickHouseReader) GetWaterfallSpansForTraceWithMetadata(ctx context.Con
if startTime == 0 || startTimeUnixNano < startTime {
startTime = startTimeUnixNano
}
if endTime == 0 || (startTimeUnixNano + jsonItem.DurationNano ) > endTime {
endTime = (startTimeUnixNano + jsonItem.DurationNano )
if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
endTime = (startTimeUnixNano + jsonItem.DurationNano)
}
if durationNano == 0 || jsonItem.DurationNano > durationNano {
durationNano = jsonItem.DurationNano
Expand Down Expand Up @@ -1708,12 +1712,12 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace
}

// metadata calculation
startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano())
startTimeUnixNano := uint64(item.TimeUnixNano.UnixNano())
if startTime == 0 || startTimeUnixNano < startTime {
startTime = startTimeUnixNano
}
if endTime == 0 || ( startTimeUnixNano + jsonItem.DurationNano ) > endTime {
endTime = (startTimeUnixNano + jsonItem.DurationNano )
if endTime == 0 || (startTimeUnixNano+jsonItem.DurationNano) > endTime {
endTime = (startTimeUnixNano + jsonItem.DurationNano)
}
if durationNano == 0 || jsonItem.DurationNano > durationNano {
durationNano = jsonItem.DurationNano
Expand Down Expand Up @@ -1777,7 +1781,7 @@ func (r *ClickHouseReader) GetFlamegraphSpansForTrace(ctx context.Context, trace

trace.Spans = selectedSpansForRequest
trace.StartTimestampMillis = startTime / 1000000
trace.EndTimestampMillis = endTime / 1000000
trace.EndTimestampMillis = endTime / 1000000
return trace, nil
}

Expand Down Expand Up @@ -3896,27 +3900,46 @@ func (r *ClickHouseReader) GetCountOfThings(ctx context.Context, query string) (
}

func (r *ClickHouseReader) GetLatestReceivedMetric(
ctx context.Context, metricNames []string,
ctx context.Context, metricNames []string, labelValues map[string]string,
) (*model.MetricStatus, *model.ApiError) {
// at least 1 metric name must be specified.
// this query can be too slow otherwise.
if len(metricNames) < 1 {
return nil, nil
return nil, model.BadRequest(fmt.Errorf("atleast 1 metric name must be specified"))
}

quotedMetricNames := []string{}
for _, m := range metricNames {
quotedMetricNames = append(quotedMetricNames, fmt.Sprintf(`'%s'`, m))
quotedMetricNames = append(quotedMetricNames, utils.ClickHouseFormattedValue(m))
}
commaSeparatedMetricNames := strings.Join(quotedMetricNames, ", ")

whereClauseParts := []string{
fmt.Sprintf(`metric_name in (%s)`, commaSeparatedMetricNames),
}

if labelValues != nil {
for label, val := range labelValues {
whereClauseParts = append(
whereClauseParts,
fmt.Sprintf(`JSONExtractString(labels, '%s') = '%s'`, label, val),
)
}
}

if len(whereClauseParts) < 1 {
return nil, nil
}

whereClause := strings.Join(whereClauseParts, " AND ")

query := fmt.Sprintf(`
SELECT metric_name, labels, unix_milli
SELECT metric_name, anyLast(labels), max(unix_milli)
from %s.%s
where metric_name in (
%s
)
order by unix_milli desc
where %s
group by metric_name
limit 1
`, signozMetricDBName, signozTSTableNameV4, commaSeparatedMetricNames,
`, signozMetricDBName, signozTSTableNameV4, whereClause,
)

rows, err := r.db.Query(ctx, query)
Expand Down Expand Up @@ -4111,6 +4134,97 @@ func (r *ClickHouseReader) GetLogAttributeKeys(ctx context.Context, req *v3.Filt
return &response, nil
}

func (r *ClickHouseReader) FetchRelatedValues(ctx context.Context, req *v3.FilterAttributeValueRequest) ([]string, error) {
var andConditions []string

andConditions = append(andConditions, fmt.Sprintf("unix_milli >= %d", req.StartTimeMillis))
andConditions = append(andConditions, fmt.Sprintf("unix_milli <= %d", req.EndTimeMillis))

if len(req.ExistingFilterItems) != 0 {
for _, item := range req.ExistingFilterItems {
// we only support string for related values
if item.Key.DataType != v3.AttributeKeyDataTypeString {
continue
}

var colName string
switch item.Key.Type {
case v3.AttributeKeyTypeResource:
colName = "resource_attributes"
case v3.AttributeKeyTypeTag:
colName = "attributes"
default:
// we only support resource and tag for related values as of now
continue
}
// IN doesn't make use of map value index, we convert it to = or !=
operator := item.Operator
if v3.FilterOperator(strings.ToLower(string(item.Operator))) == v3.FilterOperatorIn {
operator = "="
} else if v3.FilterOperator(strings.ToLower(string(item.Operator))) == v3.FilterOperatorNotIn {
operator = "!="
}
addCondition := func(val string) {
andConditions = append(andConditions, fmt.Sprintf("mapContains(%s, '%s') AND %s['%s'] %s %s", colName, item.Key.Key, colName, item.Key.Key, operator, val))
}
switch v := item.Value.(type) {
case string:
fmtVal := utils.ClickHouseFormattedValue(v)
addCondition(fmtVal)
case []string:
for _, val := range v {
fmtVal := utils.ClickHouseFormattedValue(val)
addCondition(fmtVal)
}
case []interface{}:
for _, val := range v {
fmtVal := utils.ClickHouseFormattedValue(val)
addCondition(fmtVal)
}
}
}
}
whereClause := strings.Join(andConditions, " AND ")

var selectColumn string
switch req.TagType {
case v3.TagTypeResource:
selectColumn = "resource_attributes" + "['" + req.FilterAttributeKey + "']"
case v3.TagTypeTag:
selectColumn = "attributes" + "['" + req.FilterAttributeKey + "']"
default:
selectColumn = "attributes" + "['" + req.FilterAttributeKey + "']"
}

filterSubQuery := fmt.Sprintf(
"SELECT DISTINCT %s FROM %s.%s WHERE %s LIMIT 100",
selectColumn,
r.metadataDB,
r.metadataTable,
whereClause,
)
zap.L().Debug("filterSubQuery for related values", zap.String("query", filterSubQuery))

rows, err := r.db.Query(ctx, filterSubQuery)
if err != nil {
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()

var attributeValues []string
for rows.Next() {
var value string
if err := rows.Scan(&value); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
if value != "" {
attributeValues = append(attributeValues, value)
}
}

return attributeValues, nil
}

func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) {
var err error
var filterValueColumn string
Expand Down Expand Up @@ -4212,6 +4326,11 @@ func (r *ClickHouseReader) GetLogAttributeValues(ctx context.Context, req *v3.Fi
}
}

relatedValues, _ := r.FetchRelatedValues(ctx, req)
attributeValues.RelatedValues = &v3.FilterAttributeValueResponse{
StringAttributeValues: relatedValues,
}

return &attributeValues, nil

}
Expand Down Expand Up @@ -4892,6 +5011,11 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.
}
}

relatedValues, _ := r.FetchRelatedValues(ctx, req)
attributeValues.RelatedValues = &v3.FilterAttributeValueResponse{
StringAttributeValues: relatedValues,
}

return &attributeValues, nil
}

Expand Down
Loading

0 comments on commit 9c3ad77

Please sign in to comment.