Skip to content

Commit

Permalink
fix: 获取数据源支持 cache,修正编码中的小 bug
Browse files Browse the repository at this point in the history
  • Loading branch information
cobolbaby committed Jun 18, 2024
1 parent 6d3b5fa commit 7a6f057
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 79 deletions.
75 changes: 54 additions & 21 deletions example/grafana/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (
"fmt"
"net/url"
"os"
"strings"
"regexp"
"sync"

"pg_lineage/internal/lineage"
C "pg_lineage/pkg/config"
Expand All @@ -22,7 +23,15 @@ import (
"github.com/neo4j/neo4j-go-driver/v4/neo4j"
)

var config C.Config
// DataSourceCache memoizes Grafana datasources by name so that each
// datasource is fetched from the Grafana API at most once per run.
type DataSourceCache struct {
	// mu guards cache; it is declared first, directly above the field it protects.
	mu sync.Mutex
	// cache maps a datasource name to the datasource returned by Grafana.
	cache map[string]*models.DataSource
}

var (
	// dsMatchRule is compiled in init from config.Grafana.DataSourceMatchRules
	// and is matched against a datasource URL to decide whether panels that
	// use the datasource are processed.
	dsMatchRule *regexp.Regexp
	// config is the package-wide application configuration.
	config C.Config
)

func init() {
configFile := flag.String("c", "./config.yaml", "path to config.yaml")
Expand All @@ -37,6 +46,7 @@ func init() {
fmt.Println(err)
os.Exit(1)
}
dsMatchRule = regexp.MustCompile(config.Grafana.DataSourceMatchRules)
}

func main() {
Expand Down Expand Up @@ -88,6 +98,10 @@ func processDashboards(client *grafanaclient.GrafanaHTTPAPI, neo4jDriver neo4j.D
pageVar := int64(1)
limitVar := int64(100)

dsCache := &DataSourceCache{
cache: make(map[string]*models.DataSource),
}

for {
params := grafanasearch.NewSearchParams().WithType(&typeVar).WithPage(&pageVar).WithLimit(&limitVar)
dashboards, err := client.Search.Search(params)
Expand All @@ -100,7 +114,7 @@ func processDashboards(client *grafanaclient.GrafanaHTTPAPI, neo4jDriver neo4j.D
}

for _, dashboardItem := range dashboards.Payload {
err := processDashboardItem(client, neo4jDriver, db, dashboardItem)
err := processDashboardItem(client, neo4jDriver, db, dashboardItem, dsCache)
if err != nil {
log.Errorf("Error processing dashboard item: %v", err)
}
Expand All @@ -112,73 +126,87 @@ func processDashboards(client *grafanaclient.GrafanaHTTPAPI, neo4jDriver neo4j.D
return nil
}

func processDashboardItem(client *grafanaclient.GrafanaHTTPAPI, neo4jDriver neo4j.Driver, db *sql.DB, dashboardItem *models.Hit) error {
func processDashboardItem(client *grafanaclient.GrafanaHTTPAPI, neo4jDriver neo4j.Driver, db *sql.DB, dashboardItem *models.Hit, cache *DataSourceCache) error {
dashboardFullWithMeta, err := client.Dashboards.GetDashboardByUID(dashboardItem.UID)
if err != nil {
return fmt.Errorf("error getting dashboard by UID: %v", err)
}

var dashboard lineage.DashboardFullWithMeta
var dashboard *lineage.DashboardFullWithMeta
dashboardJSON, err := json.Marshal(dashboardFullWithMeta.Payload)
if err != nil {
return fmt.Errorf("error marshalling dashboard JSON: %v", err)
}

err = json.Unmarshal(dashboardJSON, &dashboard)
if err != nil {
if err = json.Unmarshal(dashboardJSON, &dashboard); err != nil {
return fmt.Errorf("error unmarshalling dashboard JSON: %v", err)
}

log.Debugf("Dashboard Title: %s\n", dashboard.Dashboard.Title)
log.Debugf("Dashboard Title: %s", dashboard.Dashboard.Title)
for _, panel := range dashboard.Dashboard.Panels {
if panel.Datasource == nil {
continue
}

var datasource *models.DataSource

if datasourceName, ok := panel.Datasource.(string); ok {
datasource, err := client.Datasources.GetDataSourceByName(datasourceName)
if err != nil {
log.Errorf("Error getting datasource by name: %v", err)
continue
cache.mu.Lock()
var found bool
if datasource, found = cache.cache[datasourceName]; !found {
ds, err := client.Datasources.GetDataSourceByName(datasourceName)
if err != nil {
cache.mu.Unlock()
log.Errorf("Error getting datasource by name %s: %v", datasourceName, err)
continue
}
datasource = ds.Payload
cache.cache[datasourceName] = ds.Payload
}
log.Debugf("Datasource Name: %s, Datasource Type: %s\n", datasource.Payload.Name, datasource.Payload.Type)
cache.mu.Unlock()

log.Debugf("Datasource Name: %s, Datasource Type: %s, Datasource Database: %s", datasource.Name, datasource.Type, datasource.Database)

if datasource.Payload.Type != "postgres" || !strings.Contains(config.Postgres.DSN, datasource.Payload.URL) {
if datasource.Type != "postgres" || datasource.Database != "bdc" {
continue
}
if !dsMatchRule.MatchString(datasource.URL) {
continue
}

} else {
log.Error("Datasource is not a string")
log.Errorf("Datasource is %v, not a string", panel.Datasource)
continue
}

log.Debugf("Panel ID: %d, Panel Type: %s, Panel Title: %s\n", panel.ID, panel.Type, panel.Title)
log.Debugf("Panel ID: %d, Panel Type: %s, Panel Title: %s", panel.ID, panel.Type, panel.Title)

dependencies, err := getPanelDependencies(db, panel)
dependencies, err := getPanelDependencies(panel, db)
if err != nil {
log.Errorf("Error getting panel dependencies: %v", err)
continue
}

if len(dependencies) > 0 {
lineage.CreatePanelGraph(neo4jDriver.NewSession(neo4j.SessionConfig{}), &panel, &dashboard, dependencies)
lineage.CreatePanelGraph(neo4jDriver.NewSession(neo4j.SessionConfig{}), panel, dashboard, dependencies)
}
}

return nil
}

func getPanelDependencies(db *sql.DB, panel lineage.Panel) ([]*lineage.Table, error) {
func getPanelDependencies(panel *lineage.Panel, db *sql.DB) ([]*lineage.Table, error) {
var dependencies []*lineage.Table

for _, t := range panel.Targets {
var r []*lineage.Table

if t.RawSQL != "" {
log.Debugf("Panel Datasource: %s, Panel RawSQL: %s\n", panel.Datasource, t.RawSQL)
log.Debugf("Panel Datasource: %s, Panel RawSQL: %s", panel.Datasource, t.RawSQL)
r, _ = parseRawSQL(t.RawSQL, db)
}
if t.Query != "" {
log.Debugf("Panel Datasource: %s, Panel Query: %s\n", panel.Datasource, t.Query)
log.Debugf("Panel Datasource: %s, Panel Query: %s", panel.Datasource, t.Query)
r, _ = parseRawSQL(t.Query, db)
}

Expand All @@ -203,9 +231,14 @@ func parseRawSQL(rawsql string, db *sql.DB) ([]*lineage.Table, error) {
return nil, err
}

// TODO:操作得前置
sqlTree.SetNamespace(config.Postgres.Alias)

var depTables []*lineage.Table
for _, v := range sqlTree.ShrinkGraph().GetNodes() {
if r, ok := v.(*lineage.Table); ok {
r.Database = sqlTree.GetNamespace()

depTables = append(depTables, r)
}
}
Expand Down
14 changes: 4 additions & 10 deletions internal/erd/erd.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func CreateGraph(session neo4j.Session, relationShips map[string]*RelationShip)

func CreateNode(tx neo4j.Transaction, r *Column) error {
// CREATE CONSTRAINT ON (cc:ERD:PG) ASSERT cc.id IS UNIQUE
records, err := tx.Run(`
_, err := tx.Run(`
MERGE (n:ERD:PG:`+r.Schema+` {id: $id})
ON CREATE SET n.schemaname = $schemaname, n.relname = $relname, n.udt = timestamp()
ON MATCH SET n.udt = timestamp()
Expand All @@ -56,15 +56,8 @@ func CreateNode(tx neo4j.Transaction, r *Column) error {
"schemaname": r.Schema,
"relname": r.RelName,
})
// In face of driver native errors, make sure to return them directly.
// Depending on the error, the driver may try to execute the function again.
if err != nil {
return err
}
if _, err := records.Single(); err != nil {
return err
}
return nil

return err
}

func CreateEdge(tx neo4j.Transaction, id string, r *RelationShip) error {
Expand Down Expand Up @@ -96,5 +89,6 @@ func CreateEdge(tx neo4j.Transaction, id string, r *RelationShip) error {
"rarg": r.TColumn.Schema + "." + r.TColumn.RelName + "." + r.TColumn.Field,
"type": r.Type,
})

return err
}
46 changes: 20 additions & 26 deletions internal/lineage/lineage_grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Panel struct {

type Dashboard struct {
ID int `json:"id"`
Panels []Panel `json:"panels"`
Panels []*Panel `json:"panels"`
Tags []string `json:"tags"`
Templating struct {
List []struct {
Expand Down Expand Up @@ -69,7 +69,7 @@ func CreatePanelGraph(session neo4j.Session, p *Panel, d *DashboardFullWithMeta,
}
defer tx.Close()

if _, err := CreatePanelNode(tx, p, d); err != nil {
if err := CreatePanelNode(tx, p, d); err != nil {
return fmt.Errorf("failed to insert Panel node: %w", err)
}

Expand All @@ -83,53 +83,47 @@ func CreatePanelGraph(session neo4j.Session, p *Panel, d *DashboardFullWithMeta,
}

// 提交事务
if err = tx.Commit(); err != nil {
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

return nil
}

// 创建图中节点
func CreatePanelNode(tx neo4j.Transaction, p *Panel, d *DashboardFullWithMeta) (*Panel, error) {
func CreatePanelNode(tx neo4j.Transaction, p *Panel, d *DashboardFullWithMeta) error {
// 需要将 ID 作为唯一主键
// CREATE CONSTRAINT ON (cc:Lineage:Grafana) ASSERT cc.id IS UNIQUE
records, err := tx.Run(`
_, err := tx.Run(`
MERGE (n:Lineage:Grafana:`+d.Meta.FolderTitle+` {id: $id})
ON CREATE SET n.dashboard = $dashboard, n.panel = $panel, n.rawsql = $rawsql, n.udt = timestamp()
ON CREATE SET n.dashboard_title = $dashboard_title, n.dashboard_uid = $dashboard_uid,
n.panel_type = $panel_type, n.panel_title = $panel_title, n.panel_description = $panel_description,
n.rawsql = $rawsql, n.udt = timestamp(),
ON MATCH SET n.udt = timestamp()
RETURN n.id
`,
map[string]interface{}{
"id": fmt.Sprintf("%d:%d", d.Dashboard.ID, p.ID),
"dashboard": d.Dashboard.Title,
"panel": p.Title,
"rawsql": "",
"id": fmt.Sprintf("%d:%d", d.Dashboard.ID, p.ID),
"dashboard_title": d.Dashboard.Title,
"dashboard_uid": d.Dashboard.UID,
"panel_type": p.Type,
"panel_title": p.Title,
"panel_description": p.Description,
"rawsql": "",
})
// In face of driver native errors, make sure to return them directly.
// Depending on the error, the driver may try to execute the function again.
if err != nil {
return nil, err
}
record, err := records.Single()
if err != nil {
return nil, err
}
// You can also retrieve values by name, with e.g. `id, found := record.Get("n.id")`
p.ID = record.Values[0].(int)
return p, nil

return err
}

// 创建图中边
func CreatePanelEdge(tx neo4j.Transaction, p *Panel, d *DashboardFullWithMeta, t *Table) error {
_, err := tx.Run(`
MATCH (pnode:Lineage:PG:$schmea {id: $pid}), (cnode:Lineage:Grafana {id: $cid})
MATCH (pnode:Lineage:PG:`+t.SchemaName+` {id: $pid}), (cnode:Lineage:Grafana {id: $cid})
CREATE (pnode)-[e:DownStream {udt: timestamp()}]->(cnode)
RETURN e
`, map[string]interface{}{
"schema": t.SchemaName,
"pid": fmt.Sprintf("%s.%s.%s", t.Database, t.SchemaName, t.RelName),
"cid": fmt.Sprintf("%d:%d", d.Dashboard.ID, p.ID),
"pid": fmt.Sprintf("%s.%s.%s", t.Database, t.SchemaName, t.RelName),
"cid": fmt.Sprintf("%d:%d", d.Dashboard.ID, p.ID),
})

return err
Expand Down
27 changes: 9 additions & 18 deletions internal/lineage/lineage_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func CreateGraph(session neo4j.Session, graph *depgraph.Graph, udf *Udf) error {
r.Database = graph.GetNamespace()
r.Calls = udf.Calls

if _, err := CreateNode(tx, r); err != nil {
if err := CreateNode(tx, r); err != nil {
return nil, err
}
}
Expand All @@ -51,7 +51,7 @@ func CreateGraph(session neo4j.Session, graph *depgraph.Graph, udf *Udf) error {
udf.DestID = kk
udf.Database = graph.GetNamespace()

if _, err := CreateEdge(tx, udf); err != nil {
if err := CreateEdge(tx, udf); err != nil {
return nil, err
}
}
Expand All @@ -69,10 +69,10 @@ func CreateGraph(session neo4j.Session, graph *depgraph.Graph, udf *Udf) error {
// 3. 对现有服务的改造,就是重写下面两个方法

// 创建图中节点
func CreateNode(tx neo4j.Transaction, r *Table) (*Table, error) {
func CreateNode(tx neo4j.Transaction, r *Table) error {
// 需要将 ID 作为唯一主键
// CREATE CONSTRAINT ON (cc:Lineage:PG) ASSERT cc.id IS UNIQUE
records, err := tx.Run(`
_, err := tx.Run(`
MERGE (n:Lineage:PG:`+r.SchemaName+` {id: $id})
ON CREATE SET n.database = $database, n.schemaname = $schemaname, n.relname = $relname, n.udt = timestamp(),
n.relpersistence = $relpersistence, n.calls = $calls
Expand All @@ -87,22 +87,12 @@ func CreateNode(tx neo4j.Transaction, r *Table) (*Table, error) {
"relpersistence": r.RelPersistence,
"calls": r.Calls,
})
// In face of driver native errors, make sure to return them directly.
// Depending on the error, the driver may try to execute the function again.
if err != nil {
return nil, err
}
record, err := records.Single()
if err != nil {
return nil, err
}
// You can also retrieve values by name, with e.g. `id, found := record.Get("n.id")`
r.ID = record.Values[0].(string)
return r, nil

return err
}

// 创建图中边
func CreateEdge(tx neo4j.Transaction, r *Udf) (*Udf, error) {
func CreateEdge(tx neo4j.Transaction, r *Udf) error {
_, err := tx.Run(`
MATCH (pnode {id: $pid}), (cnode {id: $cid})
CREATE (pnode)-[e:DownStream {id: $id, database: $database, schemaname: $schemaname, procname: $procname, calls: $calls, udt: timestamp()}]->(cnode)
Expand All @@ -117,7 +107,7 @@ func CreateEdge(tx neo4j.Transaction, r *Udf) (*Udf, error) {
"calls": r.Calls,
})

return nil, err
return err
}

func CompleteLineageGraphInfo(session neo4j.Session, r *Table) error {
Expand Down Expand Up @@ -149,5 +139,6 @@ func CompleteLineageGraphInfo(session neo4j.Session, r *Table) error {
}
return result.Consume()
})

return err
}
9 changes: 5 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ type Config struct {
} `mapstructure:"neo4j"`
Log log.LoggerConfig
Grafana struct {
Host string `mapstructure:"host"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
OrgID int64 `mapstructure:"org_id"`
Host string `mapstructure:"host"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
OrgID int64 `mapstructure:"org_id"`
DataSourceMatchRules string `mapstructure:"ds_match_rules"`
} `mapstructure:"grafana"`
}

Expand Down

0 comments on commit 7a6f057

Please sign in to comment.