Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drmorr/custom flush times #7

Merged
merged 2 commits into from
Jul 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/FUNDING.yml
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
---
github: acrlabs
9 changes: 4 additions & 5 deletions .github/workflows/verify.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
---
name: code verification
on: [push]
on: [push] # yamllint disable-line rule:truthy

jobs:
build:
Expand Down Expand Up @@ -27,10 +28,8 @@ jobs:
uses: actions/setup-go@v4
with:
go-version-file: go.mod
- name: Lint Go code
uses: golangci/golangci-lint-action@v3
with:
version: v1.55
- name: Run pre-commit
uses: pre-commit/action@v3.0.1
- name: Test
run: make test cover
- name: Upload coverage
Expand Down
3 changes: 2 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
---
run:
timeout: 5m

Expand Down Expand Up @@ -53,7 +54,7 @@ linters-settings:

gosec:
excludes:
- G601 # memory aliasing -- not a problem in 1.22+
- G601 # memory aliasing -- not a problem in 1.22+

importas:
no-unaliased: true
Expand Down
11 changes: 7 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
---
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
- repo: "https://github.com/pre-commit/pre-commit-hooks"
rev: v4.4.0
hooks:
- id: end-of-file-fixer
- id: check-yaml
args: ["--allow-multiple-documents"]
- id: trailing-whitespace
- repo: https://github.com/golangci/golangci-lint
- repo: "https://github.com/adrienverge/yamllint"
rev: v1.35.1
hooks:
- id: yamllint
args: ['--strict']
- repo: "https://github.com/golangci/golangci-lint"
rev: v1.55.2
hooks:
- id: golangci-lint
9 changes: 9 additions & 0 deletions .yamllint
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
extends: default

rules:
line-length:
max: 120
quoted-strings:
required: false
extra-required: [^http://, ^https://]
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ GO_COVER_FILE=$(COVERAGE_DIR)/go-coverage.txt
include build/base.mk
include build/k8s.mk

$(ARTIFACTS)::
CGO_ENABLED=0 go build -trimpath -o $(BUILD_DIR)/$@ ./cmd/.
main:
CGO_ENABLED=0 go build -ldflags "-s -w" -trimpath -o $(BUILD_DIR)/prom2parquet ./cmd/.

lint:
golangci-lint run
Expand Down
2 changes: 1 addition & 1 deletion build
Submodule build updated 4 files
+8 −10 base.mk
+19 −0 docker_tag.sh
+0 −11 get_unclean_sha.sh
+10 −3 k8s.mk
20 changes: 11 additions & 9 deletions cmd/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package main
import (
"sort"
"strings"
"time"

log "github.com/sirupsen/logrus"

"github.com/acrlabs/prom2parquet/pkg/backends"
)

const (
prefixFlag = "prefix"
serverPortFlag = "server-port"
backendFlag = "backend"
backendRootFlag = "backend-root"
verbosityFlag = "verbosity"
prefixFlag = "prefix"
serverPortFlag = "server-port"
flushIntervalFlag = "flush-interval"
backendFlag = "backend"
backendRootFlag = "backend-root"
verbosityFlag = "verbosity"
)

//nolint:gochecknoglobals
Expand All @@ -35,10 +37,10 @@ var logLevelIDs = map[log.Level][]string{
}

type options struct {
port int

backend backends.StorageBackend
backendRoot string
port int
flushInterval time.Duration
backend backends.StorageBackend
backendRoot string

verbosity log.Level
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"fmt"
"os"
"time"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -26,6 +27,13 @@
},
}

root.PersistentFlags().DurationVar(
&opts.flushInterval,
flushIntervalFlag,
10*time.Minute,
"data flush interval",
)

Check warning on line 36 in cmd/root.go

View check run for this annotation

Codecov / codecov/patch

cmd/root.go#L30-L36

Added lines #L30 - L36 were not covered by tests
root.PersistentFlags().IntVarP(
&opts.port,
serverPortFlag,
Expand Down
1 change: 1 addition & 0 deletions cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (self *promserver) spawnWriter(ctx context.Context, channelName string) (ch
self.opts.backendRoot,
channelName,
self.opts.backend,
self.opts.flushInterval,
)
if err != nil {
return nil, fmt.Errorf("could not create writer for %s: %w", channelName, err)
Expand Down
2 changes: 2 additions & 0 deletions k8s/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def __init__(self):
image=image,
args=[
"/prom2parquet",
"--backend", "s3",
"--backend-root", "simkube",
],
).with_env(env).with_ports(SERVER_PORT).with_security_context(Capability.DEBUG)

Expand Down
114 changes: 65 additions & 49 deletions pkg/parquet/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
const pageNum = 4

type Prom2ParquetWriter struct {
backend backends.StorageBackend
root string
prefix string
backend backends.StorageBackend
root string
prefix string
flushInterval time.Duration

currentFile string
nextFlushTime time.Time
pw *writer.ParquetWriter
currentFile string
pw *writer.ParquetWriter

clock clockwork.Clock
}
Expand All @@ -43,30 +43,50 @@
ctx context.Context,
root, prefix string,
backend backends.StorageBackend,
flushInterval time.Duration,
) (*Prom2ParquetWriter, error) {
return &Prom2ParquetWriter{
backend: backend,
root: root,
prefix: prefix,
backend: backend,
root: root,
prefix: prefix,
flushInterval: flushInterval,

Check warning on line 52 in pkg/parquet/writer.go

View check run for this annotation

Codecov / codecov/patch

pkg/parquet/writer.go#L49-L52

Added lines #L49 - L52 were not covered by tests

clock: clockwork.NewRealClock(),
}, nil
}

func (self *Prom2ParquetWriter) Listen(stream <-chan prompb.TimeSeries) {
if err := self.flush(); err != nil {
log.Errorf("could not flush writer: %v", err)
self.listen(stream, self.getFlushTimer(), nil)

Check warning on line 59 in pkg/parquet/writer.go

View check run for this annotation

Codecov / codecov/patch

pkg/parquet/writer.go#L59

Added line #L59 was not covered by tests
}

func (self *Prom2ParquetWriter) listen(
stream <-chan prompb.TimeSeries,
flushTimer <-chan time.Time,
running chan<- bool, // used for testing
) {
if err := self.createBackendWriter(); err != nil {
log.Errorf("could not create backend writer: %v", err)

Check warning on line 68 in pkg/parquet/writer.go

View check run for this annotation

Codecov / codecov/patch

pkg/parquet/writer.go#L68

Added line #L68 was not covered by tests
return
}
defer self.closeFile()

flushTicker := time.NewTicker(time.Minute)
// self.pw is a pointer to the writer instance, but it can get switched
// out from under us whenever we flush; go defer evaluates the function
// args when the defer call happens, not when the deferred function actually
// executes, so here we need to use a double pointer so that we can make
// sure we're closing the actual correct writer instance
defer func(pw **writer.ParquetWriter) {
closeFile(*pw)
close(running)
}(&self.pw)

if running != nil {
running <- true
}

for {
select {
case ts, ok := <-stream:
if !ok {
self.closeFile()
return
}

Expand All @@ -79,36 +99,29 @@
log.Errorf("could not write datapoint: %v", err)
}
}
case <-flushTicker.C:
if time.Now().After(self.nextFlushTime) {
log.Infof("Flush triggered: %v >= %v", time.Now(), self.nextFlushTime)
if err := self.flush(); err != nil {
log.Errorf("could not flush data: %v", err)
return
}
case <-flushTimer:
flushTimer = self.getFlushTimer()
log.Infof("flush triggered for %v", self.currentFile)

// Run this in a separate goroutine so that writing the data
// to S3 (with throttling or whatever) doesn't block the new incoming
// datapoints
go closeFile(self.pw)
if err := self.createBackendWriter(); err != nil {
log.Errorf("could not create backend writer: %v", err)
return

Check warning on line 112 in pkg/parquet/writer.go

View check run for this annotation

Codecov / codecov/patch

pkg/parquet/writer.go#L111-L112

Added lines #L111 - L112 were not covered by tests
}
}
}
}

func (self *Prom2ParquetWriter) closeFile() {
if self.pw != nil {
if err := self.pw.WriteStop(); err != nil {
log.Errorf("can't close parquet writer: %v", err)
}
self.pw = nil
}
}

func (self *Prom2ParquetWriter) flush() error {
now := self.clock.Now().UTC()

self.closeFile()
func (self *Prom2ParquetWriter) createBackendWriter() error {
basename := self.now().Truncate(self.flushInterval).Format("20060102150405")
self.currentFile = fmt.Sprintf("%s/%s.parquet", self.prefix, basename)

self.currentFile = fmt.Sprintf("%s/%s.parquet", self.prefix, now.Format("2006010215"))
fw, err := backends.ConstructBackendForFile(self.root, self.currentFile, self.backend)
if err != nil {
return fmt.Errorf("can't create storage backend writer: %w", err)
return fmt.Errorf("can't create storage backend: %w", err)

Check warning on line 124 in pkg/parquet/writer.go

View check run for this annotation

Codecov / codecov/patch

pkg/parquet/writer.go#L124

Added line #L124 was not covered by tests
}

pw, err := writer.NewParquetWriter(fw, new(DataPoint), pageNum)
Expand All @@ -118,21 +131,24 @@
pw.CompressionType = parquet.CompressionCodec_SNAPPY

self.pw = pw
self.advanceFlushTime(&now)

return nil
}

func (self *Prom2ParquetWriter) advanceFlushTime(now *time.Time) {
nextHour := now.Add(time.Hour)
self.nextFlushTime = time.Date(
nextHour.Year(),
nextHour.Month(),
nextHour.Day(),
nextHour.Hour(),
0,
0,
0,
nextHour.Location(),
)
// getFlushTimer returns a channel that receives a single value once the
// next flush boundary is reached. The boundary is the writer's current UTC
// time rounded down to a multiple of flushInterval, plus one flushInterval.
// The wait itself is scheduled with time.After.
func (self *Prom2ParquetWriter) getFlushTimer() <-chan time.Time {
	current := self.now()
	intervalStart := current.Truncate(self.flushInterval)
	boundary := intervalStart.Add(self.flushInterval)
	untilBoundary := boundary.Sub(current)
	return time.After(untilBoundary)
}

// now reads the current time from the writer's clock and converts it to UTC.
func (self *Prom2ParquetWriter) now() time.Time {
	current := self.clock.Now()
	return current.UTC()
}

func closeFile(pw *writer.ParquetWriter) {
if pw != nil {
if err := pw.WriteStop(); err != nil {
log.Errorf("can't close parquet writer: %v", err)
}

Check warning on line 152 in pkg/parquet/writer.go

View check run for this annotation

Codecov / codecov/patch

pkg/parquet/writer.go#L151-L152

Added lines #L151 - L152 were not covered by tests
}
}
Loading
Loading