-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
179 lines (147 loc) · 4.56 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package main
import (
"context"
"fmt"
"os"
"os/signal"
"time"
"github.com/philips-software/loki-cf-logdrain/handlers"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/spf13/viper"
"github.com/labstack/echo/v4"
"net/http"
_ "net/http/pprof"
"go.opentelemetry.io/contrib/instrumentation/github.com/labstack/echo/otelecho"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)
var commit = "deadbeaf"
var release = "v1.2.2"
var buildVersion = release + "-" + commit
// Initializes an OTLP exporter, and configures the corresponding trace and
// metric providers.
func initProvider() (func(context.Context) error, error) {
ctx := context.Background()
res, err := resource.New(ctx,
resource.WithAttributes(
// the service name used to display traces in backends
semconv.ServiceName("go-hello-world"),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create resource: %w", err)
}
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
conn, err := grpc.NewClient(os.Getenv("OTLP_ADDRESS"),
// Note the use of insecure transport here. TLS is recommended in production.
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
}
// Set up a trace exporter
traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}
// Register the trace exporter with a TracerProvider, using a batch
// span processor to aggregate spans before export.
bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(bsp),
)
otel.SetTracerProvider(tracerProvider)
// set global propagator to tracecontext (the default is no-op).
otel.SetTextMapPropagator(propagation.TraceContext{})
// Shutdown will flush any remaining spans and shut down the exporter.
return tracerProvider.Shutdown, nil
}
func main() {
e := make(chan *echo.Echo, 1)
os.Exit(realMain(e))
}
func realMain(echoChan chan<- *echo.Echo) int {
ctx := context.Background()
shutdown, err := initProvider()
if err == nil {
defer func() {
if err := shutdown(ctx); err != nil {
fmt.Printf("failed to shutdown TracerProvider: %v\n", err)
}
}()
}
viper.SetEnvPrefix("loki-cf-logdrain")
viper.SetDefault("transport_url", "")
viper.SetDefault("promtail_endpoint", "localhost:1514")
viper.AutomaticEnv()
token := os.Getenv("TOKEN")
// Echo framework
e := echo.New()
// Tracing
tracer := otel.Tracer("loki-cf-logdrain")
e.Use(otelecho.Middleware("loki-cf-logdrain"))
healthHandler := handlers.HealthHandler{}
e.GET("/health", healthHandler.Handler(ctx, tracer))
e.GET("/api/version", handlers.VersionHandler(buildVersion))
promtailEndpoint := viper.GetString("promtail_endpoint")
syslogHandler, err := handlers.NewSyslogHandler(token, promtailEndpoint)
if err != nil {
fmt.Printf("syslogHandler: %v\n", err)
return 8
}
e.POST("/syslog/drain/:token", syslogHandler.Handler(ctx, tracer))
setupPprof()
setupInterrupts()
// RabbitMQ
fmt.Printf("Looking for RabbitMQ server..\n")
rabbitMQHandler, err := handlers.NewRabbitMQHandler(promtailEndpoint)
if err == nil {
fmt.Printf("Creating RabbitMQ worker..\n")
_, err = rabbitMQHandler.CreateWorker("log_drainer_exchange", "direct", "log_drainer_rk", "loki_queue", "loki")
if err != nil {
fmt.Printf("Error creating RabbitMQ worker: %v\n", err)
}
} else {
fmt.Printf("No RabbitMQ server found: %v\n", err)
}
echoChan <- e
exitCode := 0
if err := e.Start(listenString()); err != nil {
fmt.Printf("error: %v\n", err)
exitCode = 6
}
return exitCode
}
func setupInterrupts() {
// Setup a channel to receive a signal
done := make(chan os.Signal, 1)
// Notify this channel when a SIGINT is received
signal.Notify(done, os.Interrupt)
// Fire off a goroutine to loop until that channel receives a signal.
// When a signal is received simply exit the program
go func() {
for range done {
os.Exit(0)
}
}()
}
func setupPprof() {
go func() {
_ = http.ListenAndServe("localhost:6060", nil)
}()
}
func listenString() string {
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
return ":" + port
}