Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(db): upgrade pgx.Conn to pgxpool.New() for Concurrency Safety #206

Merged
merged 4 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cmd/quantm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package config

import (
"fmt"
"log/slog"
"os"

"github.com/knadh/koanf/providers/env"
Expand Down Expand Up @@ -69,8 +68,6 @@ func (c *Config) Load() {
if err := k.Unmarshal("", c); err != nil {
panic(err)
}

slog.Info("config loaded", "config.db", c.DB)
}

func New() *Config {
Expand Down
122 changes: 79 additions & 43 deletions cmd/quantm/config/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"go.breu.io/quantm/internal/pulse"
)

type (
ModeFn func(*graceful.Graceful) error
)

const (
ServiceGithub = "github"
ServiceSlack = "slack"
Expand All @@ -33,49 +37,16 @@ const (

// Setup configures the application based on the provided config.
func (c *Config) Setup(app *graceful.Graceful) error {
switch c.Mode {
case ModeMigrate:
c.SetupLogger()

if err := c.SetupDB(); err != nil {
return err
}

case ModeWebhook:
if err := c.SetupServices(app); err != nil {
return err
}

app.Add(ServiceWebhook, NewWebhookServer(), ServiceDurable)
case ModeGRPC:
if err := c.SetupServices(app); err != nil {
return err
}

app.Add(ServiceNomad, nomad.New(nomad.WithConfig(c.Nomad)), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)
case ModeWorkers:
if err := c.SetupServices(app); err != nil {
return err
}

workers.Core()
workers.Hooks()

app.Add(ServiceCoreQueue, durable.OnCore(), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)
app.Add(ServiceHooksQueue, durable.OnHooks(), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)
case ModeDefault:
if err := c.SetupServices(app); err != nil {
return err
}

workers.Core()
workers.Hooks()

app.Add(ServiceWebhook, NewWebhookServer(), ServiceDurable)
app.Add(ServiceNomad, nomad.New(nomad.WithConfig(c.Nomad)), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)
app.Add(ServiceCoreQueue, durable.OnCore(), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)
app.Add(ServiceHooksQueue, durable.OnHooks(), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)
default:
modes := map[Mode]ModeFn{
ModeMigrate: c.migrate,
ModeWebhook: c.webhook,
ModeGRPC: c.grpc,
ModeWorkers: c.workers,
ModeDefault: c.all,
}

if fn, ok := modes[c.Mode]; ok {
return fn(app)
}

return nil
Expand Down Expand Up @@ -176,3 +147,68 @@ func (c *Config) SetupPulse() error {

return nil
}

// migrate configures the application for database migrations.
func (c *Config) migrate(app *graceful.Graceful) error {
c.SetupLogger()

if err := c.SetupDB(); err != nil {
return err
}

return nil
}

// webhook configures the application for webhook mode.
func (c *Config) webhook(app *graceful.Graceful) error {
if err := c.SetupServices(app); err != nil {
return err
}

app.Add(ServiceWebhook, NewWebhookServer(), ServiceDurable)

return nil
}

// grpc configures the application for gRPC mode.
func (c *Config) grpc(app *graceful.Graceful) error {
if err := c.SetupServices(app); err != nil {
return err
}

app.Add(ServiceNomad, nomad.New(nomad.WithConfig(c.Nomad)), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)

return nil
}

// workers configures the application for worker mode.
func (c *Config) workers(app *graceful.Graceful) error {
if err := c.SetupServices(app); err != nil {
return err
}

workers.Core()
workers.Hooks()

app.Add(ServiceCoreQueue, durable.OnCore(), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)
app.Add(ServiceHooksQueue, durable.OnHooks(), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)

return nil
}

// all configures the application with all services and modes.
func (c *Config) all(app *graceful.Graceful) error {
if err := c.SetupServices(app); err != nil {
return err
}

workers.Core()
workers.Hooks()

app.Add(ServiceWebhook, NewWebhookServer(), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)
app.Add(ServiceNomad, nomad.New(nomad.WithConfig(c.Nomad)), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)
app.Add(ServiceCoreQueue, durable.OnCore(), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)
app.Add(ServiceHooksQueue, durable.OnHooks(), ServiceKernel, ServiceDB, ServiceDurable, ServicePulse)

return nil
}
4 changes: 3 additions & 1 deletion cmd/quantm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ func main() {
conf.Load()
conf.Parse()

// - run migrations and exit if mode is migrate
if conf.Mode == config.ModeMigrate {
if err := migrations.Run(ctx, db.Get(db.WithConfig(conf.DB))); err != nil {
slog.Error("unable to run migrations", "error", err.Error())

os.Exit(1)
}

os.Exit(0)
}

// - run the app based on mode, exit 1 on error else wait for signal

quit := make(chan os.Signal, 1)
app := graceful.New()

Expand Down
6 changes: 6 additions & 0 deletions cmd/quantm/workers/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,11 @@ func Core() {
// Register branch workflows and activities
q.RegisterWorkflow(repos.BranchWorkflow)
q.RegisterActivity(repos.NewBranchActivities())

// Register trunk workflows and activities
q.RegisterWorkflow(repos.TrunkWorkflow)

// Register notify activities
q.RegisterActivity(repos.NewNotifyActivities())
}
}
14 changes: 7 additions & 7 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ services:
#
# - temporal: for workflow management
# - zitadel: for identity and access management (maybe)
# - questdb: for time series data (evalutation)
# - clickhouse: for time series data (evaluation)
# - clickhouse: for analytics
################################

# temporal.io
Expand Down Expand Up @@ -64,16 +63,17 @@ services:
container_name: clickhouse
image: clickhouse/clickhouse-server:24.8
environment:
CLICKHOUSE_DB: ${PULSE__CH__NAME}
CLICKHOUSE_DB: ${PULSE__NAME}
CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: 1
CLICKHOUSE_USER: ${PULSE__CH__USER}
CLICKHOUSE_PASSWORD: ${PULSE__CH__PASS}
CLICKHOUSE_USER: ${PULSE__USER}
CLICKHOUSE_PASSWORD: ${PULSE__PASS}
ports:
- "6665:8123"
- "6666:9000"
- "8123:8123" # why do we need http port?
- "9000:9000"
volumes:
- clickhouse-data:/var/lib/clickhouse
- clickhouse-logs:/var/log/clickhouse-server/
# command: ["--", "-L", "-", "-E", "-"] # docker run -it clickhouse/clickhouse-server:head -- -L - -E -
networks:
- ctrlplane

Expand Down
55 changes: 36 additions & 19 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
description = "quantm.io";

inputs = {
nixpkgs.url = "github:nixos/nixpkgs/nixos-24.05";
nixpkgs.url = "github:nixos/nixpkgs/nixos-24.05"; # TODO: upgrade to 24.11 after pinning libgit2 to 1.7.2
flake-utils.url = "github:numtide/flake-utils";
gomod2nix.url = "github:nix-community/gomod2nix";
gomod2nix.inputs.nixpkgs.follows = "nixpkgs";
Expand All @@ -14,12 +14,17 @@
outputs = {
nixpkgs,
flake-utils,
gomod2nix,
breu,
...
}:
flake-utils.lib.eachDefaultSystem (
system: let
pkgs = import nixpkgs {inherit system;};
pkgs = import nixpkgs {
inherit system;
overlays = [gomod2nix.overlays.default];
};

buildGoModule = pkgs.buildGo123Module;

setup = breu.setup.${system};
Expand All @@ -30,37 +35,49 @@
pkgs.http-parser
pkgs.zlib
pkgs.python3 # required for http-parser in libgit2
# use pkgs hash https://lazamar.co.uk/nix-versions/?package=libgit2&version=1.7.2&fullName=libgit2-1.7.2&keyName=libgit2&revision=05bbf675397d5366259409139039af8077d695ce&channel=nixpkgs-unstable#instructions
pkgs.libgit2
];

# Development packages for use in the dev shell
dev = [
pkgs.gomod2nix
pkgs.libpg_query # FIXME: probably not required anymore.
(pkgs.callPackage ./tools/nix/pkgs/sqlc.nix {inherit buildGoModule;})
];

# Set up the development shell with our base and dev packages
shell = setup.shell base dev {};

# Build the quantm binary
quantm = pkgs.stdenv.mkDerivation {
name = "quantm";
# FIXME: cannot build, see https://github.com/nix-community/gomod2nix/pull/168
quantm = pkgs.buildGoApplication {
pname = "quantm";
version = "0.1";
src = ./.;

nativeBuildInputs = base;

buildPhase = ''
export GOROOT=${pkgs.go_1_23}/share/go
export GOCACHE="$TEMPDIR/go-cache"
export GOMODCACHE="$TEMPDIR/go-mod-cache"
go build -x -tags static,system_libgit2 -o ./tmp/quantm ./cmd/quantm
'';

installPhase = ''
mkdir -p $out/bin
cp ./tmp/quantm $out/bin/quantm
'';
modules = ./gomod2nix.toml;
nativeBuildInputs = [pkgs.pkgconf];
subPackages = ["cmd/quantm"];
# go = pkgs.go_1_23;
buildInputs = base;
tags = ["static" "system_libgit2"];
};
# Build the quantm binary
# quantm = pkgs.stdenv.mkDerivation {
# name = "quantm";
# src = ./.;
# nativeBuildInputs = [pkgs.pkgconf];
# buildInputs = base;
# buildPhase = ''
# export GOROOT=${pkgs.go_1_23}/share/go
# export HOME=$(pwd)
# echo go env
# go build -tags static,system_libgit2 -o ./tmp/quantm ./cmd/quantm
# '';
# installPhase = ''
# mkdir -p $out/bin
# cp ./tmp/quantm $out/bin/quantm
# '';
# };
in {
devShells.default = shell;
packages.quantm = quantm;
Expand Down
Loading
Loading