From ffc1fe517c36ab05a76a08437b4a8c657089473b Mon Sep 17 00:00:00 2001 From: SammyOina Date: Thu, 24 Aug 2023 15:46:09 +0300 Subject: [PATCH 1/3] move config reorganize internal add health remove call home Signed-off-by: SammyOina --- cmd/main.go | 103 ++----------- configs/config.toml | 1 - docker/docker-compose.yml | 1 - go.mod | 8 +- go.sum | 5 +- health.go | 75 ++++++++++ internal/config/config.go | 79 ++++++++++ internal/{clients => }/grpc/connect.go | 0 internal/{clients => }/grpc/things/client.go | 2 +- .../mainflux/callhome/pkg/client/client.go | 140 ------------------ vendor/modules.txt | 7 +- 11 files changed, 183 insertions(+), 238 deletions(-) create mode 100644 health.go create mode 100644 internal/config/config.go rename internal/{clients => }/grpc/connect.go (100%) rename internal/{clients => }/grpc/things/client.go (93%) delete mode 100644 vendor/github.com/mainflux/callhome/pkg/client/client.go diff --git a/cmd/main.go b/cmd/main.go index b830d69..9e69645 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -10,12 +10,10 @@ import ( "time" "github.com/absmach/aproxy/auth" - thingsclient "github.com/absmach/aproxy/internal/clients/grpc/things" + "github.com/absmach/aproxy/internal/config" + thingsclient "github.com/absmach/aproxy/internal/grpc/things" mproxy "github.com/absmach/aproxy/mqtt" - "github.com/caarlos0/env/v9" "github.com/cenkalti/backoff/v4" - chclient "github.com/mainflux/callhome/pkg/client" - "github.com/mainflux/mainflux" mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" mqttpub "github.com/mainflux/mainflux/pkg/messaging/mqtt" @@ -23,68 +21,20 @@ import ( mp "github.com/mainflux/mproxy/pkg/mqtt" "github.com/mainflux/mproxy/pkg/session" "github.com/mainflux/mproxy/pkg/websocket" - "github.com/pelletier/go-toml/v2" "golang.org/x/sync/errgroup" ) const svcName = "mqtt" -type MQTTAdapterConfig struct { - MQTTPort string `toml:"PORT" env:"APROXY_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"` - MQTTTargetHost string `toml:"TARGET_HOST" env:"APROXY_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"` - MQTTTargetPort string `toml:"TARGET_PORT" env:"APROXY_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"` - MQTTForwarderTimeout Duration `toml:"FORWARDER_TIMEOUT" env:"APROXY_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"` - MQTTTargetHealthCheck string `toml:"HEALTH_CHECK" env:"APROXY_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""` -} - -type HTTPAdapterConfig struct { - HTTPPort string `toml:"PORT" env:"APROXY_MQTT_ADAPTER_WS_PORT" envDefault:"8080"` - HTTPTargetHost string `toml:"TARGET_HOST" env:"APROXY_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"` - HTTPTargetPort string `toml:"TARGET_PORT" env:"APROXY_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"` - HTTPTargetPath string `toml:"TARGET_PATH" env:"APROXY_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"` -} - -type GeneralConfig struct { - LogLevel string `toml:"LOG_LEVEL" env:"APROXY_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"` - Instance string `toml:"INSTANCE" env:"APROXY_MQTT_ADAPTER_INSTANCE" envDefault:""` - JaegerURL string `toml:"JAEGER_URL" env:"APROXY_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` - SendTelemetry bool `toml:"SEND_TELEMETRY" env:"APROXY_SEND_TELEMETRY" envDefault:"true"` - InstanceID string `toml:"INSTANCE_ID" env:"APROXY_MQTT_ADAPTER_INSTANCE_ID" envDefault:""` -} - -type config struct { - MQTTAdapter MQTTAdapterConfig `toml:"MQTTAdapter"` - HTTPAdapter HTTPAdapterConfig `toml:"HTTPAdapter"` - General GeneralConfig `toml:"General"` - ConfigFile string `toml:"-" env:"APROXY_MQTT_ADAPTER_CONFIG_FILE" envDefault:"config.toml"` -} - -type Duration time.Duration - -func (d *Duration) UnmarshalText(b []byte) error { - x, err := time.ParseDuration(string(b)) - if err != nil { - return err - } - *d = Duration(x) - return nil -} - func main() { ctx, cancel := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) - cfg := config{} - if err := env.Parse(&cfg); err != nil { + cfg, err := config.NewConfig() + if err != nil { log.Fatalf("failed to load %s configuration : %s", svcName, err) } - if cfg.ConfigFile != "" { - if err := parseConfigFile(&cfg); err != nil { - log.Fatalf("failed to load config file : %v", err) - } - } - logger, err := mflog.New(os.Stdout, cfg.General.LogLevel) if err != nil { log.Fatalf("failed to init logger: %s", err) @@ -106,7 +56,7 @@ func main() { logger.Info(fmt.Sprintf("Broker not ready: %s, next try in %s", e.Error(), next)) } - err := backoff.RetryNotify(healthcheck(cfg), backoff.NewExponentialBackOff(), notify) + err := backoff.RetryNotify(healthcheck(cfg.MQTTAdapter), backoff.NewExponentialBackOff(), notify) if err != nil { logger.Error(fmt.Sprintf("MQTT healthcheck limit exceeded, exiting. %s ", err)) exitCode = 1 @@ -144,19 +94,14 @@ func main() { h := mproxy.NewHandler(logger, authClient) - if cfg.General.SendTelemetry { - chc := chclient.New(svcName, mainflux.Version, logger, cancel) - go chc.CallHome(ctx) - } - logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s", cfg.MQTTAdapter.MQTTPort)) g.Go(func() error { - return proxyMQTT(ctx, cfg, logger, h) + return proxyMQTT(ctx, cfg.MQTTAdapter, logger, h) }) logger.Info(fmt.Sprintf("Starting MQTT over WS proxy on port %s", cfg.HTTPAdapter.HTTPPort)) g.Go(func() error { - return proxyWS(ctx, cfg, logger, h) + return proxyWS(ctx, cfg.HTTPAdapter, logger, h) }) g.Go(func() error { @@ -172,9 +117,9 @@ func main() { } } -func proxyMQTT(ctx context.Context, cfg config, logger mflog.Logger, handler session.Handler) error { - address := fmt.Sprintf(":%s", cfg.MQTTAdapter.MQTTPort) - target := fmt.Sprintf("%s:%s", cfg.MQTTAdapter.MQTTTargetHost, cfg.MQTTAdapter.MQTTTargetPort) +func proxyMQTT(ctx context.Context, cfg config.MQTTAdapterConfig, logger mflog.Logger, handler session.Handler) error { + address := fmt.Sprintf(":%s", cfg.MQTTPort) + target := fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort) mp := mp.New(address, target, handler, logger) errCh := make(chan error) @@ -191,15 +136,15 @@ func proxyMQTT(ctx context.Context, cfg config, logger mflog.Logger, handler ses } } -func proxyWS(ctx context.Context, cfg config, logger mflog.Logger, handler session.Handler) error { - target := fmt.Sprintf("%s:%s", cfg.HTTPAdapter.HTTPTargetHost, cfg.HTTPAdapter.HTTPTargetPort) - wp := websocket.New(target, cfg.HTTPAdapter.HTTPTargetPath, "ws", handler, logger) +func proxyWS(ctx context.Context, cfg config.HTTPAdapterConfig, logger mflog.Logger, handler session.Handler) error { + target := fmt.Sprintf("%s:%s", cfg.HTTPTargetHost, cfg.HTTPTargetPort) + wp := websocket.New(target, cfg.HTTPTargetPath, "ws", handler, logger) http.Handle("/mqtt", wp.Handler()) errCh := make(chan error) go func() { - errCh <- wp.Listen(cfg.HTTPAdapter.HTTPPort) + errCh <- wp.Listen(cfg.HTTPPort) }() select { @@ -211,9 +156,9 @@ func proxyWS(ctx context.Context, cfg config, logger mflog.Logger, handler sessi } } -func healthcheck(cfg config) func() error { +func healthcheck(cfg config.MQTTAdapterConfig) func() error { return func() error { - res, err := http.Get(cfg.MQTTAdapter.MQTTTargetHealthCheck) + res, err := http.Get(cfg.MQTTTargetHealthCheck) if err != nil { return err } @@ -228,19 +173,3 @@ func healthcheck(cfg config) func() error { return nil } } - -func parseConfigFile(cfg *config) error { - file, err := os.Open(cfg.ConfigFile) - if err != nil { - return err - } - fileData, err := io.ReadAll(file) - if err != nil { - return err - } - if err := toml.Unmarshal(fileData, cfg); err != nil { - return err - } - - return nil -} diff --git a/configs/config.toml b/configs/config.toml index de1e224..2276022 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -14,6 +14,5 @@ [General] INSTANCE = "" JAEGER_URL = "http://jaeger:14268/api/traces" - SEND_TELEMETRY = false INSTANCE_ID = "" LOG_LEVEL = "debug" diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4c9ec05..3fd67c4 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -29,7 +29,6 @@ services: APROXY_THINGS_AUTH_GRPC_CLIENT_TLS: ${APROXY_THINGS_AUTH_GRPC_CLIENT_TLS} APROXY_THINGS_AUTH_GRPC_CA_CERTS: ${APROXY_THINGS_AUTH_GRPC_CA_CERTS} APROXY_JAEGER_URL: ${APROXY_JAEGER_URL} - APROXY_SEND_TELEMETRY: ${APROXY_SEND_TELEMETRY} networks: - mainflux-base-net volumes: diff --git a/go.mod b/go.mod index cf047f5..987ea7c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21.0 require ( github.com/caarlos0/env/v9 v9.0.0 github.com/cenkalti/backoff/v4 v4.2.1 - github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2 github.com/mainflux/mainflux v0.12.0 github.com/mainflux/mproxy v0.3.1-0.20230822124450-4b4dfe600cc2 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0 @@ -14,7 +13,12 @@ require ( ) require ( - github.com/caarlos0/env/v7 v7.1.0 // indirect + github.com/jackc/pgconn v1.14.0 // indirect + github.com/jackc/pgproto3/v2 v2.3.2 // indirect +) + +require ( + github.com/caarlos0/env/v7 v7.1.0 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/eclipse/paho.mqtt.golang v1.4.2 // indirect github.com/go-gorp/gorp/v3 v3.1.0 // indirect diff --git a/go.sum b/go.sum index 2c70595..0f0f7ed 100644 --- a/go.sum +++ b/go.sum @@ -109,7 +109,6 @@ github.com/jackc/pgmock v0.0.0-20201204152224-4fe30f7445fd/go.mod h1:hrBW0Enj2AZ github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= @@ -172,8 +171,6 @@ github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw= github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2 h1:QN+yhU6Twwwwz8Mu9u12f2TbPsmM/zIvndAhH1dIdWU= -github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2/go.mod h1:q4cTH8I3Y6kDyocJh5dBppuv4dY9drb/2kVdB6FP124= github.com/mainflux/mainflux v0.0.0-20230823124803-822a607e31fe h1:galNZWS6B/O0TM8ByATkpXgTTO18lpOmjmxZ9y0kNXg= github.com/mainflux/mainflux v0.0.0-20230823124803-822a607e31fe/go.mod h1:cHi+VUm+VST3OaROF0W34pqtr7DHOhrjJ3PDNBTI5W4= github.com/mainflux/mproxy v0.3.1-0.20230822124450-4b4dfe600cc2 h1:D5Ofrffx/4FWehczvJbmzD8lfcOkxcIS4XZE/fwl4mo= @@ -244,6 +241,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8= @@ -285,6 +283,7 @@ golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWP golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= diff --git a/health.go b/health.go new file mode 100644 index 0000000..b686c0d --- /dev/null +++ b/health.go @@ -0,0 +1,75 @@ +package mainflux + +import ( + "encoding/json" + "net/http" +) + +const ( + contentType = "Content-Type" + contentTypeJSON = "application/health+json" + svcStatus = "pass" + description = " service" +) + +var ( + // Version represents the last service git tag in git history. + // It's meant to be set using go build ldflags: + // -ldflags "-X 'github.com/absmach/aproxy.Version=0.0.0'". + Version = "0.0.0" + // Commit represents the service git commit hash. + // It's meant to be set using go build ldflags: + // -ldflags "-X 'github.com/absmach/aproxy.Commit=ffffffff'". + Commit = "ffffffff" + // BuildTime represetns the service build time. + // It's meant to be set using go build ldflags: + // -ldflags "-X 'github.com/absmach/aproxy.BuildTime=1970-01-01_00:00:00'". + BuildTime = "1970-01-01_00:00:00" +) + +// HealthInfo contains version endpoint response. +type HealthInfo struct { + // Status contains service status. + Status string `json:"status"` + + // Version contains current service version. + Version string `json:"version"` + + // Commit represents the git hash commit. + Commit string `json:"commit"` + + // Description contains service description. + Description string `json:"description"` + + // BuildTime contains service build time. + BuildTime string `json:"build_time"` + + // InstanceID contains the ID of the current service instance + InstanceID string `json:"instance_id"` +} + +// Health exposes an HTTP handler for retrieving service health. +func Health(service, instanceID string) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add(contentType, contentTypeJSON) + if r.Method != http.MethodGet && r.Method != http.MethodHead { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + res := HealthInfo{ + Status: svcStatus, + Version: Version, + Commit: Commit, + Description: service + description, + BuildTime: BuildTime, + InstanceID: instanceID, + } + + w.WriteHeader(http.StatusOK) + + if err := json.NewEncoder(w).Encode(res); err != nil { + w.WriteHeader(http.StatusInternalServerError) + } + }) +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..08f4d62 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,79 @@ +package config + +import ( + "io" + "os" + "time" + + "github.com/caarlos0/env/v7" + "github.com/pelletier/go-toml/v2" +) + +type MQTTAdapterConfig struct { + MQTTPort string `toml:"PORT" env:"APROXY_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"` + MQTTTargetHost string `toml:"TARGET_HOST" env:"APROXY_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"` + MQTTTargetPort string `toml:"TARGET_PORT" env:"APROXY_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"` + MQTTForwarderTimeout Duration `toml:"FORWARDER_TIMEOUT" env:"APROXY_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"` + MQTTTargetHealthCheck string `toml:"HEALTH_CHECK" env:"APROXY_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""` +} + +type HTTPAdapterConfig struct { + HTTPPort string `toml:"PORT" env:"APROXY_MQTT_ADAPTER_WS_PORT" envDefault:"8080"` + HTTPTargetHost string `toml:"TARGET_HOST" env:"APROXY_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"` + HTTPTargetPort string `toml:"TARGET_PORT" env:"APROXY_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"` + HTTPTargetPath string `toml:"TARGET_PATH" env:"APROXY_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"` +} + +type GeneralConfig struct { + LogLevel string `toml:"LOG_LEVEL" env:"APROXY_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"` + Instance string `toml:"INSTANCE" env:"APROXY_MQTT_ADAPTER_INSTANCE" envDefault:""` + JaegerURL string `toml:"JAEGER_URL" env:"APROXY_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` + InstanceID string `toml:"INSTANCE_ID" env:"APROXY_MQTT_ADAPTER_INSTANCE_ID" envDefault:""` +} + +type Config struct { + MQTTAdapter MQTTAdapterConfig `toml:"MQTTAdapter"` + HTTPAdapter HTTPAdapterConfig `toml:"HTTPAdapter"` + General GeneralConfig `toml:"General"` + ConfigFile string `toml:"-" env:"APROXY_MQTT_ADAPTER_CONFIG_FILE" envDefault:"config.toml"` +} + +type Duration time.Duration + +func (d *Duration) UnmarshalText(b []byte) error { + x, err := time.ParseDuration(string(b)) + if err != nil { + return err + } + *d = Duration(x) + return nil +} + +func parseConfigFile(cfg *Config) error { + file, err := os.Open(cfg.ConfigFile) + if err != nil { + return err + } + fileData, err := io.ReadAll(file) + if err != nil { + return err + } + if err := toml.Unmarshal(fileData, cfg); err != nil { + return err + } + + return nil +} + +func NewConfig() (Config, error) { + cfg := Config{} + if err := env.Parse(&cfg); err != nil { + return Config{}, err + } + if cfg.ConfigFile != "" { + if err := parseConfigFile(&cfg); err != nil { + return cfg, err + } + } + return cfg, nil +} diff --git a/internal/clients/grpc/connect.go b/internal/grpc/connect.go similarity index 100% rename from internal/clients/grpc/connect.go rename to internal/grpc/connect.go diff --git a/internal/clients/grpc/things/client.go b/internal/grpc/things/client.go similarity index 93% rename from internal/clients/grpc/things/client.go rename to internal/grpc/things/client.go index 5afa959..ae14df6 100644 --- a/internal/clients/grpc/things/client.go +++ b/internal/grpc/things/client.go @@ -4,7 +4,7 @@ package things import ( - grpcclient "github.com/absmach/aproxy/internal/clients/grpc" + grpcclient "github.com/absmach/aproxy/internal/grpc" "github.com/caarlos0/env/v9" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/things/policies" diff --git a/vendor/github.com/mainflux/callhome/pkg/client/client.go b/vendor/github.com/mainflux/callhome/pkg/client/client.go deleted file mode 100644 index 681f038..0000000 --- a/vendor/github.com/mainflux/callhome/pkg/client/client.go +++ /dev/null @@ -1,140 +0,0 @@ -package client - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "net/netip" - "strings" - "time" - - mflog "github.com/mainflux/mainflux/logger" -) - -const ( - HomeUrl = "https://deployments.mainflux.io/telemetry" - stopWaitTime = 5 * time.Second - callHomeSleepTime = 30 * time.Minute - backOff = 10 * time.Second - apiKey = "77e04a7c-f207-40dd-8950-c344871fd516" -) - -var ipEndpoints = []string{ - "https://checkip.amazonaws.com/", - "https://ipinfo.io/ip", - "https://api.ipify.org/", -} - -type homingService struct { - serviceName string - version string - logger mflog.Logger - cancel context.CancelFunc - httpClient http.Client -} - -func New(svc, version string, homingLogger mflog.Logger, cancel context.CancelFunc) *homingService { - return &homingService{ - serviceName: svc, - version: version, - logger: homingLogger, - cancel: cancel, - httpClient: *http.DefaultClient, - } -} - -func (hs *homingService) CallHome(ctx context.Context) { - for { - select { - case <-ctx.Done(): - hs.Stop() - default: - data := telemetryData{ - Service: hs.serviceName, - Version: hs.version, - LastSeen: time.Now(), - } - for _, endpoint := range ipEndpoints { - ip, err := hs.getIP(endpoint) - if err != nil { - hs.logger.Warn(fmt.Sprintf("failed to obtain service public IP address for sending Mainflux usage telemetry with error: %v", err)) - continue - } - ip = strings.ReplaceAll(ip, "\n", "") - ip = strings.ReplaceAll(ip, "\\", "") - parsedIP, err := netip.ParseAddr(ip) - if err != nil { - hs.logger.Warn(fmt.Sprintf("failed to parse ip address with error: %v", err)) - continue - } - data.IpAddress = parsedIP.String() - break - } - if err := hs.send(&data); err != nil && data.IpAddress != "" { - hs.logger.Warn(fmt.Sprintf("failed to send Mainflux telemetry data with error: %v", err)) - time.Sleep(backOff) - continue - } - } - time.Sleep(callHomeSleepTime) - } -} - -func (hs *homingService) Stop() { - defer hs.cancel() - c := make(chan bool) - defer close(c) - select { - case <-c: - case <-time.After(stopWaitTime): - } - hs.logger.Info("call home service shutdown") -} - -type telemetryData struct { - Service string `json:"service"` - IpAddress string `json:"ip_address"` - Version string `json:"mainflux_version"` - LastSeen time.Time `json:"last_seen"` -} - -func (hs *homingService) getIP(endpoint string) (string, error) { - req, err := http.NewRequest(http.MethodGet, endpoint, nil) - if err != nil { - return "", err - } - res, err := hs.httpClient.Do(req) - if err != nil { - return "", err - } - b, err := io.ReadAll(res.Body) - defer res.Body.Close() - if err != nil { - return "", err - } - return string(b), nil -} - -func (hs *homingService) send(telDat *telemetryData) error { - b, err := json.Marshal(telDat) - if err != nil { - return err - } - req, err := http.NewRequest(http.MethodPost, HomeUrl, bytes.NewReader(b)) - if err != nil { - return err - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("apikey", apiKey) - res, err := hs.httpClient.Do(req) - if err != nil || res.StatusCode != http.StatusCreated { - if res != nil { - return fmt.Errorf("unsuccessful sending telemetry data with code %d and error: %v", res.StatusCode, err) - } - return err - } - return nil -} diff --git a/vendor/modules.txt b/vendor/modules.txt index a0061a2..232226b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -64,12 +64,16 @@ github.com/golang/protobuf/ptypes/timestamp # github.com/gorilla/websocket v1.5.0 ## explicit; go 1.12 github.com/gorilla/websocket +# github.com/jackc/pgconn v1.14.0 +## explicit; go 1.12 # github.com/jackc/pgio v1.0.0 ## explicit; go 1.12 github.com/jackc/pgio # github.com/jackc/pgpassfile v1.0.0 ## explicit; go 1.12 github.com/jackc/pgpassfile +# github.com/jackc/pgproto3/v2 v2.3.2 +## explicit; go 1.12 # github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a ## explicit; go 1.14 github.com/jackc/pgservicefile @@ -133,9 +137,6 @@ github.com/lestrrat-go/jwx/v2/x25519 # github.com/lestrrat-go/option v1.0.1 ## explicit; go 1.16 github.com/lestrrat-go/option -# github.com/mainflux/callhome v0.0.0-20230626140149-b03b1f4c46f2 -## explicit; go 1.20 -github.com/mainflux/callhome/pkg/client # github.com/mainflux/mainflux v0.12.0 => github.com/mainflux/mainflux v0.0.0-20230823124803-822a607e31fe ## explicit; go 1.20 github.com/mainflux/mainflux From e59f8aeee813e5a04016d1d0682be30cb83128cd Mon Sep 17 00:00:00 2001 From: SammyOina Date: Thu, 24 Aug 2023 17:53:09 +0300 Subject: [PATCH 2/3] update docs and add health endpoint Signed-off-by: SammyOina --- Makefile | 6 ++--- README.md | 54 ++++++++++++++++++++++----------------- cmd/main.go | 22 ++++++---------- docker/.env | 3 --- docker/docker-compose.yml | 6 ++--- health.go | 2 +- internal/config/config.go | 7 +++++ 7 files changed, 52 insertions(+), 48 deletions(-) diff --git a/Makefile b/Makefile index a73a0e0..0dbb1c2 100644 --- a/Makefile +++ b/Makefile @@ -38,9 +38,9 @@ endef $(PROGRAM): $(SOURCES) CGO_ENABLED=$(CGO_ENABLED) GOOS=$(GOOS) GOARCH=$(GOARCH) GOARM=$(GOARM) \ go build -mod=vendor -ldflags "-s -w \ - -X 'github.com/mainflux/mainflux.BuildTime=$(TIME)' \ - -X 'github.com/mainflux/mainflux.Version=$(VERSION)' \ - -X 'github.com/mainflux/mainflux.Commit=$(COMMIT)'" \ + -X 'github.com/absmach/aproxy.BuildTime=$(TIME)' \ + -X 'github.com/absmach/aproxy.Version=$(VERSION)' \ + -X 'github.com/absmach/aproxy.Commit=$(COMMIT)'" \ -o ./build/$(APROXY_DOCKER_IMAGE_NAME_PREFIX)-$(PROGRAM) cmd/main.go clean: diff --git a/README.md b/README.md index 17893eb..6f9bf0b 100644 --- a/README.md +++ b/README.md @@ -11,37 +11,43 @@ aProxy is typically deployed on-premise, in the enterprise cloud, in front of Io ## Usage ```bash -go get github.com/absmach/aproxy -cd $(GOPATH)/github.com/absmach/aproxy +git clone https://github.com/absmach/aproxy.git +cd aproxy make -./aproxy +make docker-image +make run ``` ## Configuration The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -|-------------------------|------------------------------------------------|-----------| -| APROXY_WS_HOST | WebSocket inbound (IN) connection host | 0.0.0.0 | -| APROXY_WS_PORT | WebSocket inbound (IN) connection port | 8080 | -| APROXY_WS_PATH | WebSocket inbound (IN) connection path | /mqtt | -| APROXY_WSS_PORT | WebSocket Secure inbound (IN) connection port | 8080 | -| APROXY_WSS_PATH | WebSocket Secure inbound (IN) connection path | /mqtt | -| APROXY_WS_TARGET_SCHEME | WebSocket Target schema | ws | -| APROXY_WS_TARGET_HOST | WebSocket Target host | localhost | -| APROXY_WS_TARGET_PORT | WebSocket Target port | 8888 | -| APROXY_WS_TARGET_PATH | WebSocket Target path | /mqtt | -| APROXY_MQTT_HOST | MQTT inbound connection host | 0.0.0.0 | -| APROXY_MQTT_PORT | MQTT inbound connection port | 1883 | -| APROXY_MQTTS_PORT | MQTTS inbound connection port | 8883 | -| APROXY_MQTT_TARGET_HOST | MQTT broker host | 0.0.0.0 | -| APROXY_MQTT_TARGET_PORT | MQTT broker port | 1884 | -| APROXY_CLIENT_TLS | Flag that indicates if TLS should be turned on | false | -| APROXY_CA_CERTS | Path to trusted CAs in PEM format | | -| APROXY_SERVER_CERT | Path to server certificate in pem format | | -| APROXY_SERVER_KEY | Path to server key in pem format | | -| APROXY_LOG_LEVEL | Log level | debug | +| Variable | Description | Default | +|---------------------------------|------------------------------------------------|-----------| +| APROXY_WS_HOST | WebSocket inbound (IN) connection host | 0.0.0.0 | +| APROXY_WS_PORT | WebSocket inbound (IN) connection port | 8080 | +| APROXY_WS_PATH | WebSocket inbound (IN) connection path | /mqtt | +| APROXY_WSS_PORT | WebSocket Secure inbound (IN) connection port | 8080 | +| APROXY_WSS_PATH | WebSocket Secure inbound (IN) connection path | /mqtt | +| APROXY_WS_TARGET_SCHEME | WebSocket Target schema | ws | +| APROXY_WS_TARGET_HOST | WebSocket Target host | localhost | +| APROXY_WS_TARGET_PORT | WebSocket Target port | 8888 | +| APROXY_WS_TARGET_PATH | WebSocket Target path | /mqtt | +| APROXY_MQTT_HOST | MQTT inbound connection host | 0.0.0.0 | +| APROXY_MQTT_PORT | MQTT inbound connection port | 1883 | +| APROXY_MQTTS_PORT | MQTTS inbound connection port | 8883 | +| APROXY_MQTT_TARGET_HOST | MQTT broker host | 0.0.0.0 | +| APROXY_MQTT_TARGET_PORT | MQTT broker port | 1884 | +| APROXY_CLIENT_TLS | Flag that indicates if TLS should be turned on | false | +| APROXY_CA_CERTS | Path to trusted CAs in PEM format | | +| APROXY_SERVER_CERT | Path to server certificate in pem format | | +| APROXY_SERVER_KEY | Path to server key in pem format | | +| APROXY_LOG_LEVEL | Log level | debug | +| APROXY_MQTT_ADAPTER_CONFIG_FILE | Config file path. This overites env if set. | | +| APROXY_RELEASE_TAG | Docker release tag. | latest | +| APROXY_THINGS_URL | Things url. | | +| APROXY_THINGS_AUTH_GRPC_URL | Things GRPC URL for authentication. | | +| APROXY_THINGS_AUTH_GRPC_TIMEOUT | Things GRPC timeout duration | 1s | ## License [Apache-2.0](LICENSE) diff --git a/cmd/main.go b/cmd/main.go index 9e69645..b232529 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -9,6 +9,7 @@ import ( "os" "time" + "github.com/absmach/aproxy" "github.com/absmach/aproxy/auth" "github.com/absmach/aproxy/internal/config" thingsclient "github.com/absmach/aproxy/internal/grpc/things" @@ -24,7 +25,7 @@ import ( "golang.org/x/sync/errgroup" ) -const svcName = "mqtt" +const svcName = "aproxy" func main() { ctx, cancel := context.WithCancel(context.Background()) @@ -64,14 +65,6 @@ func main() { } } - //nps, err := brokers.NewPubSub(cfg.General.BrokerURL, "mqtt", logger) - if err != nil { - logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err)) - exitCode = 1 - return - } - //defer nps.Close() - mpub, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.MQTTAdapter.MQTTTargetHost, cfg.MQTTAdapter.MQTTTargetPort), time.Duration(cfg.MQTTAdapter.MQTTForwarderTimeout)) if err != nil { logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err)) @@ -101,7 +94,7 @@ func main() { logger.Info(fmt.Sprintf("Starting MQTT over WS proxy on port %s", cfg.HTTPAdapter.HTTPPort)) g.Go(func() error { - return proxyWS(ctx, cfg.HTTPAdapter, logger, h) + return proxyWS(ctx, cfg, logger, h) }) g.Go(func() error { @@ -136,15 +129,16 @@ func proxyMQTT(ctx context.Context, cfg config.MQTTAdapterConfig, logger mflog.L } } -func proxyWS(ctx context.Context, cfg config.HTTPAdapterConfig, logger mflog.Logger, handler session.Handler) error { - target := fmt.Sprintf("%s:%s", cfg.HTTPTargetHost, cfg.HTTPTargetPort) - wp := websocket.New(target, cfg.HTTPTargetPath, "ws", handler, logger) +func proxyWS(ctx context.Context, cfg config.Config, logger mflog.Logger, handler session.Handler) error { + target := fmt.Sprintf("%s:%s", cfg.HTTPAdapter.HTTPTargetHost, cfg.HTTPAdapter.HTTPTargetPort) + wp := websocket.New(target, cfg.HTTPAdapter.HTTPTargetPath, "ws", handler, logger) http.Handle("/mqtt", wp.Handler()) + http.Handle("/health", aproxy.Health(svcName, cfg.General.InstanceID)) errCh := make(chan error) go func() { - errCh <- wp.Listen(cfg.HTTPPort) + errCh <- wp.Listen(cfg.HTTPAdapter.HTTPPort) }() select { diff --git a/docker/.env b/docker/.env index 7655b96..f9c52f9 100644 --- a/docker/.env +++ b/docker/.env @@ -10,10 +10,7 @@ APROXY_MQTT_ADAPTER_WS_TARGET_HOST=vernemq APROXY_MQTT_ADAPTER_WS_TARGET_PORT=8080 APROXY_MQTT_ADAPTER_WS_TARGET_PATH=/mqtt APROXY_MQTT_ADAPTER_INSTANCE= -APROXY_MQTT_ADAPTER_ES_URL=es-redis:${APROXY_REDIS_TCP_PORT} -APROXY_MQTT_ADAPTER_ES_PASS= APROXY_MQTT_ADAPTER_INSTANCE_ID= -APROXY_MQTT_ADAPTER_ES_DB=0 APROXY_MQTT_ADAPTER_CONFIG_FILE="config.toml" # Docker image tag diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 3fd67c4..235d6a3 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -21,9 +21,6 @@ services: APROXY_MQTT_ADAPTER_WS_TARGET_PORT: ${APROXY_MQTT_ADAPTER_WS_TARGET_PORT} APROXY_MQTT_ADAPTER_WS_TARGET_PATH: ${APROXY_MQTT_ADAPTER_WS_TARGET_PATH} APROXY_MQTT_ADAPTER_INSTANCE: ${APROXY_MQTT_ADAPTER_INSTANCE} - APROXY_MQTT_ADAPTER_ES_URL: ${APROXY_MQTT_ADAPTER_ES_URL} - APROXY_MQTT_ADAPTER_ES_PASS: ${APROXY_MQTT_ADAPTER_ES_PASS} - APROXY_MQTT_ADAPTER_ES_DB: ${APROXY_MQTT_ADAPTER_ES_DB} APROXY_THINGS_AUTH_GRPC_URL: ${APROXY_THINGS_AUTH_GRPC_URL} APROXY_THINGS_AUTH_GRPC_TIMEOUT: ${APROXY_THINGS_AUTH_GRPC_TIMEOUT} APROXY_THINGS_AUTH_GRPC_CLIENT_TLS: ${APROXY_THINGS_AUTH_GRPC_CLIENT_TLS} @@ -33,3 +30,6 @@ services: - mainflux-base-net volumes: - ../configs/config.toml:/config.toml + ports: + - ${APROXY_MQTT_ADAPTER_WS_TARGET_PORT}:${APROXY_MQTT_ADAPTER_WS_TARGET_PORT} + diff --git a/health.go b/health.go index b686c0d..a412772 100644 --- a/health.go +++ b/health.go @@ -1,4 +1,4 @@ -package mainflux +package aproxy import ( "encoding/json" diff --git a/internal/config/config.go b/internal/config/config.go index 08f4d62..20e3a49 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,6 +9,7 @@ import ( "github.com/pelletier/go-toml/v2" ) +// MQTTAdapterConfig configuration for mqtt proxy. type MQTTAdapterConfig struct { MQTTPort string `toml:"PORT" env:"APROXY_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"` MQTTTargetHost string `toml:"TARGET_HOST" env:"APROXY_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"` @@ -17,6 +18,7 @@ type MQTTAdapterConfig struct { MQTTTargetHealthCheck string `toml:"HEALTH_CHECK" env:"APROXY_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""` } +// HTTPAdapterConfig configuration for ws proxy. type HTTPAdapterConfig struct { HTTPPort string `toml:"PORT" env:"APROXY_MQTT_ADAPTER_WS_PORT" envDefault:"8080"` HTTPTargetHost string `toml:"TARGET_HOST" env:"APROXY_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"` @@ -24,6 +26,7 @@ type HTTPAdapterConfig struct { HTTPTargetPath string `toml:"TARGET_PATH" env:"APROXY_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"` } +// GeneralConfig general service configuration. type GeneralConfig struct { LogLevel string `toml:"LOG_LEVEL" env:"APROXY_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"` Instance string `toml:"INSTANCE" env:"APROXY_MQTT_ADAPTER_INSTANCE" envDefault:""` @@ -31,6 +34,7 @@ type GeneralConfig struct { InstanceID string `toml:"INSTANCE_ID" env:"APROXY_MQTT_ADAPTER_INSTANCE_ID" envDefault:""` } +// Config all configuration params for service. type Config struct { MQTTAdapter MQTTAdapterConfig `toml:"MQTTAdapter"` HTTPAdapter HTTPAdapterConfig `toml:"HTTPAdapter"` @@ -38,8 +42,10 @@ type Config struct { ConfigFile string `toml:"-" env:"APROXY_MQTT_ADAPTER_CONFIG_FILE" envDefault:"config.toml"` } +// Duration time duration. type Duration time.Duration +// UnmarshalText custom unmarsher for Duration. func (d *Duration) UnmarshalText(b []byte) error { x, err := time.ParseDuration(string(b)) if err != nil { @@ -65,6 +71,7 @@ func parseConfigFile(cfg *Config) error { return nil } +// NewConfig creates new configuration from env and config file if provided respectively. func NewConfig() (Config, error) { cfg := Config{} if err := env.Parse(&cfg); err != nil { From ed3f08c4bfd5b67b4c04e1d8384829bfafcf5a22 Mon Sep 17 00:00:00 2001 From: SammyOina Date: Thu, 24 Aug 2023 18:09:54 +0300 Subject: [PATCH 3/3] move config file to root Signed-off-by: SammyOina --- configs/config.toml => config.toml | 0 docker/docker-compose.yml | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename configs/config.toml => config.toml (100%) diff --git a/configs/config.toml b/config.toml similarity index 100% rename from configs/config.toml rename to config.toml diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 235d6a3..fde8f2c 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -29,7 +29,7 @@ services: networks: - mainflux-base-net volumes: - - ../configs/config.toml:/config.toml + - ../config.toml:/config.toml ports: - ${APROXY_MQTT_ADAPTER_WS_TARGET_PORT}:${APROXY_MQTT_ADAPTER_WS_TARGET_PORT}