forked from alistairking/bgpfinder
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdb.go
170 lines (147 loc) · 4.75 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package bgpfinder
import (
"context"
"strings"
"time"
"github.com/alistairking/bgpfinder/internal/logging"
"github.com/jackc/pgx/v5/pgxpool"
)
// UpsertCollectors inserts or updates collector records.
//
// All collectors are written inside a single transaction: either every
// row is upserted or none are. Conflicts on the collector name update
// the stored project name instead of inserting a duplicate.
func UpsertCollectors(ctx context.Context, logger *logging.Logger, db *pgxpool.Pool, collectors []Collector) error {
	const stmt = `
INSERT INTO collectors (name, project_name)
VALUES ($1, $2)
ON CONFLICT (name) DO UPDATE
SET project_name = EXCLUDED.project_name
`
	tx, err := db.Begin(ctx)
	if err != nil {
		logger.Error().Err(err).Msg("Failed to begin transaction for UpsertCollectors")
		return err
	}
	// Rollback after a successful Commit is a harmless no-op; the defer
	// guarantees the transaction is released on every error path.
	defer tx.Rollback(ctx)

	logger.Info().Int("collector_count", len(collectors)).Msg("Upserting collectors into DB")
	for _, col := range collectors {
		logger.Debug().Str("collector", col.Name).Str("project", col.Project.Name).Msg("Executing upsert for collector")
		tag, execErr := tx.Exec(ctx, stmt, col.Name, col.Project.Name)
		if execErr != nil {
			logger.Error().Err(execErr).Str("collector", col.Name).Msg("Failed to execute upsert")
			return execErr
		}
		logger.Debug().Str("collector", col.Name).Str("command_tag", tag.String()).Msg("Executed upsert for collector")
	}

	if err = tx.Commit(ctx); err != nil {
		logger.Error().Err(err).Msg("Failed to commit transaction for UpsertCollectors")
		return err
	}
	logger.Info().Msg("Transaction committed successfully for UpsertCollectors")
	return nil
}
// UpsertBGPDumps inserts or updates BGP dump records in batches.
//
// Dumps are written in batches of up to batchSize rows, one transaction
// per batch, so a very large dump list does not hold a single huge
// transaction open. A failure aborts the whole operation; batches that
// already committed stay committed (the upsert is idempotent, so a retry
// is safe).
func UpsertBGPDumps(ctx context.Context, logger *logging.Logger, db *pgxpool.Pool, dumps []BGPDump) error {
	const batchSize = 10000 // Define an appropriate batch size
	total := len(dumps)
	for start := 0; start < total; start += batchSize {
		end := start + batchSize
		if end > total {
			end = total
		}
		batch := dumps[start:end]
		logger.Info().Int("batch_start", start).Int("batch_end", end).Int("current_batch_size", len(batch)).Msg("Upserting BGP dumps batch into DB")
		// Each batch runs in its own helper so its deferred rollback fires
		// at the end of the batch, not at function return (defer-in-loop).
		if err := upsertBGPDumpBatch(ctx, logger, db, batch); err != nil {
			return err
		}
		logger.Info().Int("batch_start", start).Int("batch_end", end).Msg("Transaction committed successfully for UpsertBGPDumps batch")
	}
	return nil
}

// upsertBGPDumpBatch upserts one batch of dumps inside a single transaction.
// The deferred Rollback guarantees the transaction is released on every
// error (and panic) path; after a successful Commit it is a no-op.
func upsertBGPDumpBatch(ctx context.Context, logger *logging.Logger, db *pgxpool.Pool, batch []BGPDump) error {
	tx, err := db.Begin(ctx)
	if err != nil {
		logger.Error().Err(err).Msg("Failed to begin transaction for UpsertBGPDumps batch")
		return err
	}
	defer tx.Rollback(ctx)

	const stmt = `
INSERT INTO bgp_dumps (collector_name, url, dump_type, duration, timestamp)
VALUES ($1, $2, $3, $4, to_timestamp($5))
ON CONFLICT (collector_name, url) DO UPDATE
SET dump_type = EXCLUDED.dump_type,
    duration = EXCLUDED.duration,
    timestamp = EXCLUDED.timestamp
`
	for _, d := range batch {
		// dump_type is stored as a smallint; Timestamp is epoch seconds.
		if _, err := tx.Exec(ctx, stmt, d.Collector.Name, d.URL, int16(d.DumpType), d.Duration, d.Timestamp); err != nil {
			logger.Error().Err(err).Str("collector", d.Collector.Name).Str("url", d.URL).Msg("Failed to execute upsert for BGP dump")
			return err
		}
	}

	if err := tx.Commit(ctx); err != nil {
		logger.Error().Err(err).Msg("Failed to commit transaction for UpsertBGPDumps batch")
		return err
	}
	return nil
}
// FetchDataFromDB retrieves BGP dump data filtered by collector names and dump types.
func FetchDataFromDB(ctx context.Context, db *pgxpool.Pool, query Query) ([]BGPDump, error) {
sqlQuery := `
SELECT url, dump_type, duration, collector_name, EXTRACT(EPOCH FROM timestamp)::bigint
FROM bgp_dumps
WHERE collector_name = ANY($1)
AND timestamp >= to_timestamp($2)
AND timestamp < to_timestamp($3)
`
if query.DumpType != DumpTypeAny {
sqlQuery += " AND dump_type = $4"
}
// Extract collector names from the query
collectorNames := make([]string, len(query.Collectors))
for i, c := range query.Collectors {
collectorNames[i] = c.Name
}
var args []interface{}
args = append(args, collectorNames, query.From.Unix(), query.Until.Unix())
if query.DumpType != DumpTypeAny {
args = append(args, int16(query.DumpType))
}
rows, err := db.Query(ctx, sqlQuery, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var results []BGPDump
for rows.Next() {
var (
url string
dumpTypeInt int16
dur interface{}
collectorName string
timestamp int64
)
err := rows.Scan(&url, &dumpTypeInt, &dur, &collectorName, ×tamp)
if err != nil {
return nil, err
}
durationVal := parseInterval(dur)
results = append(results, BGPDump{
URL: url,
DumpType: DumpType(dumpTypeInt),
Duration: durationVal,
Collector: Collector{Name: collectorName},
Timestamp: timestamp,
})
}
return results, nil
}
func parseInterval(val interface{}) time.Duration {
if val == nil {
return 0
}
if s, ok := val.(string); ok {
// Minimal parsing, improve as needed
if strings.Contains(s, "hour") {
return time.Hour
}
if strings.Contains(s, "minute") {
return time.Minute
}
}
return 0
}