This repository has been archived by the owner on Dec 2, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
126 lines (112 loc) · 3.39 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"context"
"database/sql"
"errors"
"log"
"os"
"path/filepath"
_ "github.com/glebarez/go-sqlite"
"github.com/joho/godotenv"
_ "github.com/lib/pq"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/redis/go-redis/v9"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
var ctx = context.TODO()
var rdb *redis.Client // Redis DB (rdb)
var mdb *mongo.Database // Main DB (mdb)
var udb *sql.DB // Uploads DB (udb)
var s3 *minio.Client // MinIO client (s3)
var ErrBucketDoesNotExist = errors.New("data-exports S3 bucket does not exist")
func main() {
// Load dotenv
godotenv.Load()
// Delete existing data exports in the output directory
err := filepath.Walk(os.Getenv("OUTPUT_DIR"), func(path string, info os.FileInfo, err error) error {
if path == os.Getenv("OUTPUT_DIR") || info.Name() == ".gitkeep" {
return nil
}
return os.Remove(path)
})
if err != nil {
log.Fatalln(err)
}
// Connect to Redis
opt, err := redis.ParseURL(os.Getenv("REDIS_URI"))
if err != nil {
log.Fatalln(err)
}
rdb = redis.NewClient(opt)
_, err = rdb.Ping(ctx).Result()
if err != nil {
log.Fatalln(err)
}
// Connect to the main MongoDB database
serverAPI := options.ServerAPI(options.ServerAPIVersion1)
opts := options.Client().ApplyURI(os.Getenv("MAIN_DB_URI")).SetServerAPIOptions(serverAPI)
client, err := mongo.Connect(ctx, opts)
if err != nil {
log.Fatalln(err)
}
defer client.Disconnect(ctx)
mdb = client.Database(os.Getenv("MAIN_DB_NAME"))
// Test the main database connection
var result bson.M
if err := mdb.RunCommand(ctx, bson.D{{Key: "ping", Value: 1}}).Decode(&result); err != nil {
log.Fatalln(err)
}
// Connect to the SQL uploads database
udb, err = sql.Open(os.Getenv("UPLOADS_DB_DRIVER"), os.Getenv("UPLOADS_DB_URI"))
if err != nil {
log.Fatalln(err)
}
if err := udb.Ping(); err != nil {
log.Fatalln(err)
}
// Connect to MinIO
s3, err = minio.New(os.Getenv("MINIO_ENDPOINT"), &minio.Options{
Creds: credentials.NewStaticV4(os.Getenv("MINIO_ACCESS_KEY"), os.Getenv("MINIO_SECRET_KEY"), ""),
Secure: os.Getenv("MINIO_SSL") == "1",
})
if err != nil {
log.Fatalln(err)
}
bucketExists, err := s3.BucketExists(ctx, "data-exports")
if err != nil {
log.Fatalln(err)
} else if !bucketExists {
log.Fatalln(ErrBucketDoesNotExist)
}
// Tell other running instances (if there are any) to quit
err = rdb.Publish(ctx, "data_exports", "1").Err()
if err != nil {
log.Fatalln(err)
}
// Start listening for data export requests
pubsub := rdb.Subscribe(ctx, "data_exports")
defer pubsub.Close()
rdb.Publish(ctx, "data_exports", "0")
for msg := range pubsub.Channel() {
if msg.Payload == "0" { // Look for new data export requests
// Get data export requests
var dataExports []DataExport
filter := bson.D{{Key: "status", Value: "pending"}}
opts := options.Find().SetProjection(bson.D{{Key: "_id", Value: 1}, {Key: "user", Value: 1}})
cur, err := mdb.Collection("data_exports").Find(ctx, filter, opts)
if err != nil {
log.Fatalln(err)
}
cur.All(ctx, &dataExports)
// Execute all data export requests
for _, dataExport := range dataExports {
dataExport.execute()
}
} else if msg.Payload == "1" { // Quit if another instance has started running
panic("Another instance has started running. Exiting...")
}
}
}