
Drmorr/testing #3

Merged (2 commits) on Feb 23, 2024
4 changes: 4 additions & 0 deletions .golangci.yml
@@ -51,6 +51,10 @@ linters-settings:
disabled-checks:
- appendAssign

gosec:
excludes:
- G601 # memory aliasing -- not a problem in 1.22+

importas:
no-unaliased: true
alias:
2 changes: 1 addition & 1 deletion Makefile
@@ -13,7 +13,7 @@ lint:

test:
mkdir -p $(COVERAGE_DIR)
go test -coverprofile=$(GO_COVER_FILE) ./...
go test -v -coverprofile=$(GO_COVER_FILE) ./...

cover:
go tool cover -func=$(GO_COVER_FILE)
37 changes: 26 additions & 11 deletions README.md
@@ -1,8 +1,11 @@
![build status](https://github.com/acrlabs/prom2parquet/actions/workflows/verify.yml/badge.svg)

# prom2parquet

Remote write target for Prometheus that saves metrics to parquet files

**This should be considered an alpha project**
**⚠️ This should be considered an alpha project. ⚠️**
In particular, the schema for the saved Parquet files is likely to change in the future.

## Overview

@@ -25,30 +28,31 @@ Usage:
prom2parquet [flags]

Flags:
--clean-local-storage delete pod-local parquet files upon flush
--backend backend supported remote backends for saving parquet files
(valid options: none, s3/aws) (default local)
--backend-root string root path/location for the specified backend (e.g. bucket name for AWS S3)
(default "/data")
-h, --help help for prom2parquet
--prefix string directory prefix for saving parquet files
--remote remote supported remote endpoints for saving parquet files
(valid options: none, s3/aws) (default none)
-p, --server-port int port for the remote write endpoint to listen on (default 1234)
-v, --verbosity verbosity log level (valid options: debug, error, fatal, info, panic, trace, warning/warn)
(default info)
```

Here is a brief overview of the options:

### clean-local-storage
### backend

To reduce pod-local storage, you can configure prom2parquet to remove all parquet files after they've been written
(currently once per hour). This is generally not very useful unless you've also configured a remote storage option.
Where to store the Parquet files; currently supports pod-local storage and AWS S3.

### prefix
### backend-root

This option provides a prefix that can be used to differentiate between metrics collections.
"Root" location for the backend storage. For pod-local storage this is the base directory, for AWS S3 this is the
bucket name.

### remote
### prefix

Whether to save the parquet files to some remote storage; currently the only supported remote storage option is AWS S3.
This option provides a prefix that can be used to differentiate between metrics collections.

### server-port

@@ -89,6 +93,17 @@ the executable, create and push the Docker images, and deploy to the configured
All build artifacts are placed in the `.build/` subdirectory. You can remove this directory or run `make clean` to
clean up.

### Testing

Run `make test` to run all the unit/integration tests. If you want to test using pod-local storage, and you want to
flush the Parquet files to disk without terminating the pod (e.g., so you can copy them elsewhere), you can send the
process a SIGUSR1:

```
> kubectl exec prom2parquet-pod -- kill -s SIGUSR1 <pid>
> kubectl cp prom2parquet-pod:/path/to/files ./
```

### Code of Conduct

Applied Computing Research Labs has a strict code of conduct we expect all contributors to adhere to. Please read the
22 changes: 11 additions & 11 deletions cmd/args.go
@@ -6,21 +6,21 @@ import (

log "github.com/sirupsen/logrus"

"github.com/acrlabs/prom2parquet/pkg/remotes"
"github.com/acrlabs/prom2parquet/pkg/backends"
)

const (
prefixFlag = "prefix"
serverPortFlag = "server-port"
cleanLocalStorageFlag = "clean-local-storage"
remoteFlag = "remote"
verbosityFlag = "verbosity"
prefixFlag = "prefix"
serverPortFlag = "server-port"
backendFlag = "backend"
backendRootFlag = "backend-root"
verbosityFlag = "verbosity"
)

//nolint:gochecknoglobals
var supportedRemoteIDs = map[remotes.Endpoint][]string{
remotes.None: {"none"},
remotes.S3: {"s3", "aws"},
var supportedBackendIDs = map[backends.StorageBackend][]string{
backends.Local: {"local"},
backends.S3: {"s3", "aws"},
}

//nolint:gochecknoglobals
@@ -38,8 +38,8 @@ type options struct {
prefix string
port int

cleanLocalStorage bool
remote remotes.Endpoint
backend backends.StorageBackend
backendRoot string

verbosity log.Level
}
22 changes: 12 additions & 10 deletions cmd/root.go
@@ -40,21 +40,22 @@
"port for the remote write endpoint to listen on",
)

root.PersistentFlags().BoolVar(
&opts.cleanLocalStorage,
cleanLocalStorageFlag,
false,
"delete pod-local parquet files upon flush",
)
root.PersistentFlags().Var(
enumflag.New(&opts.remote, remoteFlag, supportedRemoteIDs, enumflag.EnumCaseInsensitive),
remoteFlag,
enumflag.New(&opts.backend, backendFlag, supportedBackendIDs, enumflag.EnumCaseInsensitive),
backendFlag,
fmt.Sprintf(
"supported remote endpoints for saving parquet files\n(valid options: %s)",
validArgs(supportedRemoteIDs),
"supported remote backends for saving parquet files\n(valid options: %s)",
validArgs(supportedBackendIDs),
),
)

root.PersistentFlags().StringVar(
&opts.backendRoot,
backendRootFlag,
"/data",
"root path/location for the specified backend (e.g. bucket name for AWS S3)",
)

root.PersistentFlags().VarP(
enumflag.New(&opts.verbosity, verbosityFlag, logLevelIDs, enumflag.EnumCaseInsensitive),
verbosityFlag,
@@ -66,6 +67,7 @@

func start(opts *options) {
util.SetupLogging(opts.verbosity)
log.Infof("running with options: %v", opts)

server := newServer(opts)
server.run()
57 changes: 31 additions & 26 deletions cmd/server.go
@@ -26,7 +26,9 @@
opts *options
channels map[string]chan prompb.TimeSeries

m sync.RWMutex
m sync.RWMutex
flushChannel chan os.Signal
killChannel chan os.Signal
}

func newServer(opts *options) *promserver {
@@ -37,18 +39,18 @@
httpserv: &http.Server{Addr: fulladdr, Handler: mux, ReadHeaderTimeout: 10 * time.Second},
opts: opts,
channels: map[string]chan prompb.TimeSeries{},

flushChannel: make(chan os.Signal, 1),
killChannel: make(chan os.Signal, 1),
}
mux.HandleFunc("/receive", s.metricsReceive)

return s
}

func (self *promserver) run() {
flushChannel := make(chan os.Signal, 1)
signal.Notify(flushChannel, syscall.SIGUSR1)

killChannel := make(chan os.Signal, 1)
signal.Notify(killChannel, syscall.SIGTERM)
signal.Notify(self.flushChannel, syscall.SIGUSR1)
signal.Notify(self.killChannel, syscall.SIGTERM)

endChannel := make(chan struct{}, 1)

@@ -59,15 +61,15 @@
}()

go func() {
<-killChannel
<-self.killChannel
self.handleShutdown()
close(endChannel)
}()

go func() {
<-flushChannel
log.Infof("SIGUSR1 received")
self.stopServer(true)
<-self.flushChannel
log.Infof("SIGUSR1 received; sleeping indefinitely")
self.stopServer()
}()

log.Infof("server listening on %s", self.httpserv.Addr)
@@ -82,15 +84,15 @@
}
}()

self.stopServer(false)
self.stopServer()
timer := time.AfterFunc(shutdownTime, func() {
os.Exit(0)
})

<-timer.C
}

func (self *promserver) stopServer(stayAlive bool) {
func (self *promserver) stopServer() {
log.Infof("flushing all data files")
for _, ch := range self.channels {
close(ch)
@@ -101,11 +103,6 @@
if err := self.httpserv.Shutdown(ctxTimeout); err != nil {
log.Errorf("failed shutting server down: %v", err)
}

if stayAlive {
log.Infof("sleeping indefinitely")
select {}
}
}

func (self *promserver) metricsReceive(w http.ResponseWriter, req *http.Request) {
@@ -115,10 +112,18 @@
return
}

for _, ts := range body.Timeseries {
if err := self.sendTimeseries(req.Context(), body.Timeseries); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}

func (self *promserver) sendTimeseries(ctx context.Context, timeserieses []prompb.TimeSeries) (err error) {
for _, ts := range timeserieses {
// I'm not 100% sure which of these things would be recreated/shadowed below, so to be safe
// I'm just declaring everything upfront
var ch chan prompb.TimeSeries
var ok bool
var err error

nameLabel, _ := lo.Find(ts.Labels, func(i prompb.Label) bool { return i.Name == model.MetricNameLabel })
metricName := nameLabel.Value
@@ -130,15 +135,16 @@
self.m.RUnlock()

if !ok {
ch, err = self.spawnWriter(req.Context(), metricName)
ch, err = self.spawnWriter(ctx, metricName)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
return fmt.Errorf("could not spawn timeseries writer for %s: %w", metricName, err)
}
}

ch <- ts
}

return nil
}

func (self *promserver) spawnWriter(ctx context.Context, metricName string) (chan prompb.TimeSeries, error) {
@@ -148,19 +154,18 @@
log.Infof("new metric name seen, creating writer %s", metricName)
writer, err := parquet.NewProm2ParquetWriter(
ctx,
"/data",
self.opts.backendRoot,
self.opts.prefix,
metricName,
self.opts.cleanLocalStorage,
self.opts.remote,
self.opts.backend,
)
if err != nil {
return nil, fmt.Errorf("could not create writer for %s: %w", metricName, err)
}
ch := make(chan prompb.TimeSeries)
self.channels[metricName] = ch

go writer.Listen(ch)
go writer.Listen(ch) //nolint:contextcheck // the req context and the backend creation context should be separate

return ch, nil
}