Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Permalink
Merge pull request #20 from danicaceres1998/upgrade-packages-and-refa…
Browse files Browse the repository at this point in the history
…ctor-code

pgx/v5 & rds/csv integration & dockerization
  • Loading branch information
chill authored Nov 24, 2023
2 parents 48dd38c + fb59947 commit d448e96
Show file tree
Hide file tree
Showing 16 changed files with 707 additions and 225 deletions.
20 changes: 20 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM golang:1.21.4-alpine AS build-stage

WORKDIR /app

COPY go.mod go.sum ./
RUN go mod download

COPY ./ ./

RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./pgreplay ./cmd/pgreplay/main.go

# Deploy the application binary into a lean image
FROM alpine:latest
RUN adduser -D pgreplay-user

COPY --from=build-stage /app/pgreplay /bin/pgreplay

USER pgreplay-user

CMD [ "sh" ]
17 changes: 10 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ PROG=bin/pgreplay
PROJECT=github.com/gocardless/pgreplay-go
VERSION=$(shell git rev-parse --short HEAD)-dev
BUILD_COMMAND=go build -ldflags "-X main.Version=$(VERSION)"
DB_CONN_CONFIG=-h localhost -p 5432 -U postgres
DOCKER_CONN_CONFIG=-h postgres -p 5432 -U postgres

.PHONY: all darwin linux test clean

Expand All @@ -16,20 +18,21 @@ bin/%:
$(BUILD_COMMAND) -o $@ cmd/$*/main.go

createdb:
psql postgres -c "DROP GROUP IF EXISTS pgreplay_test_users; CREATE GROUP pgreplay_test_users WITH LOGIN CREATEDB;"
psql postgres -U pgreplay_test_users -c "CREATE DATABASE pgreplay_test;"
psql pgreplay_test -c "DROP ROLE IF EXISTS alice; CREATE ROLE alice LOGIN;"
psql pgreplay_test -c "DROP ROLE IF EXISTS bob; CREATE ROLE bob LOGIN;"
psql pgreplay_test -c "ALTER GROUP pgreplay_test_users ADD USER alice, bob;"
psql $(DB_CONN_CONFIG) -d postgres -c "CREATE DATABASE pgreplay_test;"

dropdb:
psql postgres -c "DROP DATABASE IF EXISTS pgreplay_test;"
psql $(DB_CONN_CONFIG) -c "DROP DATABASE IF EXISTS pgreplay_test;"

structure:
psql pgreplay_test -U pgreplay_test_users -f pkg/pgreplay/integration/testdata/structure.sql
psql $(DB_CONN_CONFIG) -d pgreplay_test -f pkg/pgreplay/integration/testdata/structure.sql

recreatedb: dropdb createdb structure

createdbdocker:
psql $(DOCKER_CONN_CONFIG) -c "DROP DATABASE IF EXISTS pgreplay_test;"
psql $(DOCKER_CONN_CONFIG) -d postgres -c "CREATE DATABASE pgreplay_test;"
psql $(DOCKER_CONN_CONFIG) -d pgreplay_test -f pkg/pgreplay/integration/testdata/structure.sql

# go get -u github.com/onsi/ginkgo/ginkgo
test:
ginkgo -v -r
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ ALTER SYSTEM SET log_min_duration_statement=0;
SELECT pg_reload_conf();
```

Or, if you need to capture logs for an RDS instance, you can use these parameters in your
instances parameter group:

```
log_destination = csvlog
log_connections = 1
log_disconnections = 1
log_min_error_statement = log
log_min_messages = error
log_statement = all
log_min_duration_statement = 0
```

### 2. Take snapshot

Now we're emitting logs we need to snapshot the database so that we can later
Expand Down
83 changes: 61 additions & 22 deletions cmd/pgreplay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@ package main

import (
"bufio"
"context"
"fmt"
stdlog "log"
"net/http"
"os"
"runtime"
"time"

"github.com/alecthomas/kingpin"
kitlog "github.com/go-kit/kit/log"
level "github.com/go-kit/kit/log/level"
kingpin "github.com/alecthomas/kingpin/v2"
kitlog "github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gocardless/pgreplay-go/pkg/pgreplay"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"

"github.com/jackc/pgx"
)

var logger kitlog.Logger
Expand All @@ -28,12 +25,13 @@ var (
debug = app.Flag("debug", "Enable debug logging").Default("false").Bool()
startFlag = app.Flag("start", "Play logs from this time onward ("+pgreplay.PostgresTimestampFormat+")").String()
finishFlag = app.Flag("finish", "Stop playing logs at this time ("+pgreplay.PostgresTimestampFormat+")").String()
metricsAddress = app.Flag("metrics-address", "Address to bind HTTP metrics listener").Default("127.0.0.1").String()
metricsAddress = app.Flag("metrics-address", "Address to bind HTTP metrics listener").Default("0.0.0.0").String()
metricsPort = app.Flag("metrics-port", "Port to bind HTTP metrics listener").Default("9445").Uint16()

filter = app.Command("filter", "Process an errlog file into a pgreplay preprocessed JSON log")
filterJsonInput = filter.Flag("json-input", "JSON input file").ExistingFile()
filterErrlogInput = filter.Flag("errlog-input", "Postgres errlog input file").ExistingFile()
filterCsvLogInput = filter.Flag("csvlog-input", "Postgres CSV log input file").ExistingFile()
filterOutput = filter.Flag("output", "JSON output file").String()
filterNullOutput = filter.Flag("null-output", "Don't output anything, for testing parsing only").Bool()

Expand All @@ -42,9 +40,10 @@ var (
runPort = run.Flag("port", "PostgreSQL database port").Default("5432").Uint16()
runDatname = run.Flag("database", "PostgreSQL root database").Default("postgres").String()
runUser = run.Flag("user", "PostgreSQL root user").Default("postgres").String()
runPassword = run.Flag("password", "PostgreSQL root user password").Default("postgres").String()
runPassword = run.Flag("password", "PostgreSQl password user (the default value is obtained from the DB_PASSWORD env var)").Default(os.Getenv("DB_PASSWORD")).String()
runReplayRate = run.Flag("replay-rate", "Rate of playback, will execute queries at Nx speed").Default("1").Float()
runErrlogInput = run.Flag("errlog-input", "Path to PostgreSQL errlog").ExistingFile()
runCsvLogInput = run.Flag("csvlog-input", "Path to PostgreSQL CSV log").ExistingFile()
runJsonInput = run.Flag("json-input", "Path to preprocessed pgreplay JSON log file").ExistingFile()
)

Expand All @@ -61,11 +60,8 @@ func main() {
logger = level.NewFilter(logger, level.AllowInfo())
}

go func() {
logger.Log("event", "metrics.listen", "address", *metricsAddress, "port", *metricsPort)
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(fmt.Sprintf("%s:%v", *metricsAddress, *metricsPort), nil)
}()
// Starting the Prometheus Server
server := pgreplay.StartPrometheusServer(logger, *metricsAddress, *metricsPort)

var err error
var start, finish *time.Time
Expand All @@ -82,19 +78,24 @@ func main() {
case filter.FullCommand():
var items chan pgreplay.Item

switch checkSingleFormat(filterJsonInput, filterErrlogInput) {
switch checkSingleFormat(filterJsonInput, filterErrlogInput, filterCsvLogInput) {
case filterJsonInput:
items = parseLog(*filterJsonInput, pgreplay.ParseJSON)
case filterErrlogInput:
items = parseLog(*filterErrlogInput, pgreplay.ParseErrlog)
case filterCsvLogInput:
items = parseLog(*filterCsvLogInput, pgreplay.ParseCsvLog)
default:
logger.Log("event", "postgres.error", "error", "you must provide an input")
os.Exit(255)
}

// Apply the start and end filters
items = pgreplay.NewStreamer(start, finish).Filter(items)
items = pgreplay.NewStreamer(start, finish, logger).Filter(items)

if *filterNullOutput {
logger.Log("event", "filter.null_output", "msg", "Null output enabled, logs won't be serialized")
for _ = range items {
for range items {
// no-op
}

Expand Down Expand Up @@ -128,8 +129,10 @@ func main() {
outputFile.Close()

case run.FullCommand():
ctx := context.Background()
database, err := pgreplay.NewDatabase(
pgx.ConnConfig{
ctx,
pgreplay.DatabaseConnConfig{
Host: *runHost,
Port: *runPort,
Database: *runDatname,
Expand All @@ -145,19 +148,25 @@ func main() {

var items chan pgreplay.Item

switch checkSingleFormat(runJsonInput, runErrlogInput) {
switch checkSingleFormat(runJsonInput, runErrlogInput, runCsvLogInput) {
case runJsonInput:
items = parseLog(*runJsonInput, pgreplay.ParseJSON)
case runErrlogInput:
items = parseLog(*runErrlogInput, pgreplay.ParseErrlog)
case runCsvLogInput:
items = parseLog(*runCsvLogInput, pgreplay.ParseCsvLog)
default:
logger.Log("event", "postgres.error", "error", "you must provide an input")
os.Exit(255)
}

stream, err := pgreplay.NewStreamer(start, finish).Stream(items, *runReplayRate)
replay_started := time.Now()
stream, err := pgreplay.NewStreamer(start, finish, logger).Stream(items, *runReplayRate)
if err != nil {
kingpin.Fatalf("failed to start streamer: %s", err)
}

errs, consumeDone := database.Consume(stream)
errs, done := database.Consume(ctx, stream)

var status int

Expand All @@ -167,12 +176,19 @@ func main() {
if err != nil {
logger.Log("event", "consume.error", "error", err)
}
case err := <-consumeDone:
case err := <-done:
if err != nil {
status = 255
}

logger.Log("event", "consume.finished", "error", err, "status", status)
logger.Log("event", "time.elapsed", "total", buildTimeElapsed(replay_started))
logger.Log("event", "server.status", "message", "shutting down the server!")
err = pgreplay.ShutdownServer(ctx, server)
if err != nil {
logger.Log("error", "server.shutdown", "message", err.Error())
}

os.Exit(status)
}
}
Expand Down Expand Up @@ -242,3 +258,26 @@ func parseTimestamp(in string) (*time.Time, error) {
err, "must be a valid timestamp (%s)", pgreplay.PostgresTimestampFormat,
)
}

func buildTimeElapsed(start time.Time) string {
const day = time.Minute * 60 * 24

duration := time.Since(start)

if duration < 0 {
duration *= -1
}

if duration < day {
return duration.String()
}

n := duration / day
duration -= n * day

if duration == 0 {
return fmt.Sprintf("%dd", n)
}

return fmt.Sprintf("%dd%s", n, duration)
}
28 changes: 28 additions & 0 deletions compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
version: "3"
services:
pgreplay:
command: bash -c "while true; do sleep 10; done"
build:
context: ./
dockerfile: ./dev.dockerfile
environment:
PGHOST: postgres
PGUSER: postgres
volumes:
- ./:/app
expose:
- 9445

postgres:
image: postgres:15-alpine
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password
POSTGRES_DB: users
volumes:
- postgres-db-volume:/var/lib/postgresql/data
expose:
- 5432

volumes:
postgres-db-volume:
11 changes: 11 additions & 0 deletions dev.dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM golang:1.21.4

RUN apt update && apt -y upgrade
RUN apt install -y make postgresql-client

WORKDIR /app

COPY ./ ./
RUN make bin/pgreplay.linux_amd64

EXPOSE 9445
51 changes: 25 additions & 26 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,47 +1,46 @@
module github.com/gocardless/pgreplay-go

go 1.21.1
go 1.21.4

require (
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/alecthomas/kingpin/v2 v2.3.2
github.com/eapache/channels v1.1.0
github.com/go-kit/kit v0.13.0
github.com/jackc/pgx v3.6.2+incompatible
github.com/go-kit/log v0.2.1
github.com/jackc/pgx/v5 v5.4.3
github.com/json-iterator/go v1.1.12
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.29.0
github.com/onsi/gomega v1.27.10
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/client_golang v1.16.0
)

require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/apd v1.1.0 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/kr/text v0.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nxadm/tail v1.4.11 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit d448e96

Please sign in to comment.