Skip to content

Commit

Permalink
feat: query dependency graph data and display in NodeGraph panel #314 #…
Browse files Browse the repository at this point in the history
  • Loading branch information
sunface committed Oct 27, 2023
1 parent a8d1c94 commit d328b39
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 3 deletions.
5 changes: 3 additions & 2 deletions query/internal/plugins/builtin/datav/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ const (
GetServiceInfoListAPI = "getServiceInfoList"
GetServiceNamesAPI = "getServiceNames"
GetServiceOperationsAPI = "getServiceOperations"

GetLogsAPI = "getLogs"
GetDependencyGraphAPI = "getDependencyGraph"
GetLogsAPI = "getLogs"
)

var APIRoutes = map[string]func(c *gin.Context, ds *models.Datasource, conn ch.Conn, params map[string]interface{}) models.PluginResult{
GetServiceInfoListAPI: GetServiceInfoList,
GetServiceNamesAPI: GetServiceNames,
GetServiceOperationsAPI: GetServiceOperations,
GetLogsAPI: GetLogs,
GetDependencyGraphAPI: GetDependencyGraph,
}
52 changes: 52 additions & 0 deletions query/internal/plugins/builtin/datav/api/serviceGraph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package api

import (
"fmt"
"strconv"

ch "github.com/ClickHouse/clickhouse-go/v2"
datavmodels "github.com/DataObserve/datav/query/internal/plugins/builtin/datav/models"
pluginUtils "github.com/DataObserve/datav/query/internal/plugins/utils"
"github.com/DataObserve/datav/query/pkg/config"
"github.com/DataObserve/datav/query/pkg/models"
"github.com/gin-gonic/gin"
)

func GetDependencyGraph(c *gin.Context, ds *models.Datasource, conn ch.Conn, params map[string]interface{}) models.PluginResult {
start, _ := strconv.ParseInt(c.Query("start"), 10, 64)
end, _ := strconv.ParseInt(c.Query("end"), 10, 64)

query := fmt.Sprintf(`WITH
quantilesMergeState(0.5, 0.75, 0.9, 0.95, 0.99)(duration_quantiles_state) AS duration_quantiles_state,
finalizeAggregation(duration_quantiles_state) AS result
SELECT
src,
dest,
result[1] AS p50,
result[2] AS p75,
result[3] AS p90,
result[4] AS p95,
result[5] AS p99,
sum(total_count) as calls,
sum(error_count) as errors
FROM %s.%s
WHERE toUInt64(toDateTime(timestamp)) >= ? AND toUInt64(toDateTime(timestamp)) <= ? GROUP BY src, dest`,
config.Data.Observability.DefaultTraceDB, datavmodels.DefaultDependencyGraphTable)

rows, err := conn.Query(c.Request.Context(), query, start, end)
if err != nil {
logger.Warn("Error Query dependency graph", "query", query, "error", err)
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
}
defer rows.Close()

logger.Info("Query denendency graph", "query", query)

res, err := pluginUtils.ConvertDbRowsToPluginData(rows)
if err != nil {
logger.Warn("Error conver rows to data", "error", err)
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
}

return models.GenPluginResult(models.PluginStatusSuccess, "", res)
}
66 changes: 66 additions & 0 deletions query/internal/plugins/builtin/datav/sqls/dependencyGraph.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
CREATE TABLE IF NOT EXISTS signoz_traces.dependency_graph_minutes_v2 ON CLUSTER cluster (
src LowCardinality(String) CODEC(ZSTD(1)),
dest LowCardinality(String) CODEC(ZSTD(1)),
duration_quantiles_state AggregateFunction(quantiles(0.5, 0.75, 0.9, 0.95, 0.99), Float64) CODEC(Default),
error_count SimpleAggregateFunction(sum, UInt64) CODEC(T64, ZSTD(1)),
total_count SimpleAggregateFunction(sum, UInt64) CODEC(T64, ZSTD(1)),
timestamp DateTime CODEC(DoubleDelta, LZ4),
environment LowCardinality(String) CODEC(ZSTD(1)),
cluster LowCardinality(String) CODEC(ZSTD(1)),
namespace LowCardinality(String) CODEC(ZSTD(1)),
) ENGINE AggregatingMergeTree
PARTITION BY toDate(timestamp)
ORDER BY (timestamp, src, dest, environment, cluster, namespace)
TTL toDateTime(timestamp) + INTERVAL 1296000 SECOND DELETE;


CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_traces.dependency_graph_minutes_service_calls_mv_v2 ON CLUSTER cluster
TO signoz_traces.dependency_graph_minutes_v2 AS
SELECT
A.serviceName as src,
B.serviceName as dest,
quantilesState(0.5, 0.75, 0.9, 0.95, 0.99)(toFloat64(B.durationNano)) as duration_quantiles_state,
countIf(B.statusCode=2) as error_count,
count(*) as total_count,
toStartOfMinute(B.timestamp) as timestamp,
B.resourceTagsMap['environment'] as environment,
B.resourceTagsMap['cluster'] as cluster,
B.resourceTagsMap['namespace'] as namespace
FROM signoz_traces.signoz_index_v2 AS A, signoz_traces.signoz_index_v2 AS B
WHERE (A.serviceName != B.serviceName) AND (A.spanID = B.parentSpanID)
GROUP BY timestamp, src, dest, environment, cluster, namespace;

CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_traces.dependency_graph_minutes_db_calls_mv_v2 ON CLUSTER cluster
TO signoz_traces.dependency_graph_minutes_v2 AS
SELECT
serviceName as src,
tagMap['db.system'] as dest,
quantilesState(0.5, 0.75, 0.9, 0.95, 0.99)(toFloat64(durationNano)) as duration_quantiles_state,
countIf(statusCode=2) as error_count,
count(*) as total_count,
toStartOfMinute(timestamp) as timestamp,
resourceTagsMap['environment'] as environment,
resourceTagsMap['cluster'] as cluster,
resourceTagsMap['namespace'] as namespace
FROM signoz_traces.signoz_index_v2
WHERE dest != '' and kind != 2
GROUP BY timestamp, src, dest, environment, cluster, namespace;

CREATE MATERIALIZED VIEW IF NOT EXISTS signoz_traces.dependency_graph_minutes_messaging_calls_mv_v2 ON CLUSTER cluster
TO signoz_traces.dependency_graph_minutes_v2 AS
SELECT
serviceName as src,
tagMap['messaging.system'] as dest,
quantilesState(0.5, 0.75, 0.9, 0.95, 0.99)(toFloat64(durationNano)) as duration_quantiles_state,
countIf(statusCode=2) as error_count,
count(*) as total_count,
toStartOfMinute(timestamp) as timestamp,
resourceTagsMap['environment'] as environment,
resourceTagsMap['cluster.name'] as cluster,
resourceTagsMap['namespace.name'] as namespace
FROM signoz_traces.signoz_index_v2
WHERE dest != '' and kind != 2
GROUP BY timestamp, src, dest, environment, cluster, namespace;

CREATE TABLE IF NOT EXISTS signoz_traces.distributed_dependency_graph_minutes_v2 ON CLUSTER cluster AS signoz_traces.dependency_graph_minutes_v2
ENGINE = Distributed("cluster", "signoz_traces", dependency_graph_minutes_v2, cityHash64(rand()));
31 changes: 31 additions & 0 deletions query/internal/plugins/builtin/datav/sqls/traces.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
CREATE TABLE IF NOT EXISTS signoz_traces.signoz_index ON CLUSTER {{.SIGNOZ_CLUSTER}} (
timestamp DateTime64(9) CODEC(Delta, ZSTD(1)),
traceID String CODEC(ZSTD(1)),
spanID String CODEC(ZSTD(1)),
parentSpanID String CODEC(ZSTD(1)),
serviceName LowCardinality(String) CODEC(ZSTD(1)),
name LowCardinality(String) CODEC(ZSTD(1)),
kind Int32 CODEC(ZSTD(1)),
durationNano UInt64 CODEC(ZSTD(1)),
tags Array(String) CODEC(ZSTD(1)),
tagsKeys Array(String) CODEC(ZSTD(1)),
tagsValues Array(String) CODEC(ZSTD(1)),
statusCode Int64 CODEC(ZSTD(1)),
references String CODEC(ZSTD(1)),
externalHttpMethod Nullable(String) CODEC(ZSTD(1)),
externalHttpUrl Nullable(String) CODEC(ZSTD(1)),
component Nullable(String) CODEC(ZSTD(1)),
dbSystem Nullable(String) CODEC(ZSTD(1)),
dbName Nullable(String) CODEC(ZSTD(1)),
dbOperation Nullable(String) CODEC(ZSTD(1)),
peerService Nullable(String) CODEC(ZSTD(1)),
INDEX idx_traceID traceID TYPE bloom_filter GRANULARITY 4,
INDEX idx_service serviceName TYPE bloom_filter GRANULARITY 4,
INDEX idx_name name TYPE bloom_filter GRANULARITY 4,
INDEX idx_kind kind TYPE minmax GRANULARITY 4,
INDEX idx_tagsKeys tagsKeys TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX idx_tagsValues tagsValues TYPE bloom_filter(0.01) GRANULARITY 64,
INDEX idx_duration durationNano TYPE minmax GRANULARITY 1
) ENGINE MergeTree
PARTITION BY toDate(timestamp)
ORDER BY (serviceName, -toUnixTimestamp(timestamp));
23 changes: 23 additions & 0 deletions ui/src/types/panel/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,26 @@ export interface Units {
unitsType: UnitsType,
units: Unit[],
}


export interface NodeGraphData {
nodes: Node[]
edges: Edge[]
}

export interface Node {
id: string
label: string
data: {
[key:string]:number
}
}

export interface Edge {
source: string // source node id
target: string // target node id
label: string
data: {
[key:string]:number
}
}
69 changes: 69 additions & 0 deletions ui/src/utils/plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.


import { round } from "lodash"
import { PanelQuery } from "types/dashboard"
import { Edge, NodeGraphData, Node } from "types/panel/plugins"
import { QueryPluginData } from "types/plugin"
import { FieldType, SeriesData } from "types/seriesData"
import { jsonToEqualPairs1, parseLegendFormat } from "./format"
Expand Down Expand Up @@ -196,4 +199,70 @@ export const queryPluginDataToLogs = (data: {
data: chartData
}
}]
}

export const queryPluginDataToNodeGraph = (data: QueryPluginData, query: PanelQuery): NodeGraphData => {
const sourceIndex = data.columns.indexOf("src")
const targetIndex = data.columns.indexOf("dest")
const callsIndex = data.columns.indexOf("calls")
const errorsIndex = data.columns.indexOf("errors")
const p50Index = data.columns.indexOf("p50")
const p75Index = data.columns.indexOf("p75")
const p90Index = data.columns.indexOf("p90")
const p95Index = data.columns.indexOf("p95")
const p99Index = data.columns.indexOf("p99")

const nodesMap: Record<string,Node> = {}
const edges: Edge[] = []
for (const row of data.data) {
const callsCount = row[callsIndex]
const errorsCount = row[errorsIndex]
const successCount = callsCount- errorsCount
const latency = round(row[p99Index] / 1000000,1)
const source = row[sourceIndex]
const target = row[targetIndex]
const edge: Edge = {
source: source,
target: target,
label: `${callsCount} / ${errorsCount} / ${latency}ms`,
data: {
success: successCount,
error: errorsCount,
p50: row[p50Index] / 1000000,
p75: row[p75Index] / 1000000,
p90: row[p90Index] / 1000000,
p95: row[p95Index] / 1000000,
p99: row[p99Index] / 1000000,
}
}
edges.push(edge)

const node = nodesMap[target]
if (!node) {
nodesMap[target] = {
id: target,
label: target,
data: {
success: successCount,
error: errorsCount,
}
}
} else {
node.data.success += successCount
node.data.error += errorsCount
}

const srcNode = nodesMap[source]
if (!srcNode) nodesMap[source] = {
id: source,
label: source,
data: {}
}
}

const nodes = Object.values(nodesMap)
return {
nodes,
edges
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { isEmpty } from "utils/validate"
import { roundDsTime } from "utils/datasource"
import { $variables } from "src/views/variables/store"
import { QueryPluginResult } from "types/plugin"
import { queryPluginDataToLogs, queryPluginDataToTable, queryPluginDataToTimeSeries } from "utils/plugins"
import { queryPluginDataToLogs, queryPluginDataToNodeGraph, queryPluginDataToTable, queryPluginDataToTimeSeries } from "utils/plugins"
import { DataFormat } from "types/format"
import { $datavQueryParams } from "./store"
import { parseVariableFormat } from "utils/format"
Expand Down Expand Up @@ -102,6 +102,9 @@ export const runQuery = async (panel: Panel, q: PanelQuery, range: TimeRange, ds
case DataFormat.Logs:
data = queryPluginDataToLogs(res.data.data as any, q)
break
case DataFormat.NodeGraph:
data = queryPluginDataToNodeGraph(res.data.data as any, q)
break
default:
data = queryPluginDataToTable(res.data.data, q)
}
Expand Down

0 comments on commit d328b39

Please sign in to comment.