Skip to content

Commit

Permalink
add toml file parsing
Browse files Browse the repository at this point in the history
Signed-off-by: SammyOina <[email protected]>
  • Loading branch information
SammyOina committed Aug 23, 2023
1 parent 23900ab commit b2ab812
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 31 deletions.
98 changes: 67 additions & 31 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
mp "github.com/mainflux/mproxy/pkg/mqtt"
"github.com/mainflux/mproxy/pkg/session"
"github.com/mainflux/mproxy/pkg/websocket"
"github.com/pelletier/go-toml/v2"
"golang.org/x/sync/errgroup"
)

Expand All @@ -34,22 +35,35 @@ const (
envPrefixES = "MF_MQTT_ADAPTER_ES_"
)

type MQTTAdapterConfig struct {
MQTTPort string `toml:"PORT" env:"MF_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"`
MQTTTargetHost string `toml:"TARGET_HOST" env:"MF_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"`
MQTTTargetPort string `toml:"TARGET_PORT" env:"MF_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"`
MQTTForwarderTimeout time.Duration `toml:"FORWARDER_TIMEOUT" env:"MF_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"`
MQTTTargetHealthCheck string `toml:"HEALTH_CHECK" env:"MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""`
}

type HTTPAdapterConfig struct {
HTTPPort string `toml:"PORT" env:"MF_MQTT_ADAPTER_WS_PORT" envDefault:"8080"`
HTTPTargetHost string `toml:"TARGET_HOST" env:"MF_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"`
HTTPTargetPort string `toml:"TARGET_PORT" env:"MF_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"`
HTTPTargetPath string `toml:"TARGET_PATH" env:"MF_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"`
}

type GeneralConfig struct {
LogLevel string `toml:"LOG_LEVEL" env:"MF_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"`
Instance string `toml:"INSTANCE" env:"MF_MQTT_ADAPTER_INSTANCE" envDefault:""`
JaegerURL string `toml:"JAEGER_URL" env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
BrokerURL string `toml:"BROKER_URL" env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
SendTelemetry bool `toml:"SEND_TELEMETRY" env:"MF_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `toml:"INSTANCE_ID" env:"MF_MQTT_ADAPTER_INSTANCE_ID" envDefault:""`
}

type config struct {
LogLevel string `env:"MF_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"`
MQTTPort string `env:"MF_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"`
MQTTTargetHost string `env:"MF_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"`
MQTTTargetPort string `env:"MF_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"`
MQTTForwarderTimeout time.Duration `env:"MF_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"`
MQTTTargetHealthCheck string `env:"MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""`
HTTPPort string `env:"MF_MQTT_ADAPTER_WS_PORT" envDefault:"8080"`
HTTPTargetHost string `env:"MF_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"`
HTTPTargetPort string `env:"MF_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"`
HTTPTargetPath string `env:"MF_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"`
Instance string `env:"MF_MQTT_ADAPTER_INSTANCE" envDefault:""`
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"`
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
InstanceID string `env:"MF_MQTT_ADAPTER_INSTANCE_ID" envDefault:""`
MQTTAdapter MQTTAdapterConfig `toml:"MQTTAdapter"`
HTTPAdapter HTTPAdapterConfig `toml:"HTTPAdapter"`
General GeneralConfig `toml:"General"`
ConfigFile string `toml:"-" env:"MF_MQTT_ADAPTER_CONFIG_FILE" envDefault:""`
}

func main() {
Expand All @@ -61,23 +75,29 @@ func main() {
log.Fatalf("failed to load %s configuration : %s", svcName, err)
}

logger, err := mflog.New(os.Stdout, cfg.LogLevel)
if cfg.ConfigFile != "" {
if err := parseConfigFile(&cfg); err != nil {
log.Fatalf("failed to load config file : %v", err)
}
}

logger, err := mflog.New(os.Stdout, cfg.General.LogLevel)
if err != nil {
log.Fatalf("failed to init logger: %s", err)
}

var exitCode int
defer mflog.ExitWithError(&exitCode)

if cfg.InstanceID == "" {
if cfg.InstanceID, err = uuid.New().ID(); err != nil {
if cfg.General.InstanceID == "" {
if cfg.General.InstanceID, err = uuid.New().ID(); err != nil {
logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err))
exitCode = 1
return
}
}

if cfg.MQTTTargetHealthCheck != "" {
if cfg.MQTTAdapter.MQTTTargetHealthCheck != "" {
notify := func(e error, next time.Duration) {
logger.Info(fmt.Sprintf("Broker not ready: %s, next try in %s", e.Error(), next))
}
Expand All @@ -90,15 +110,15 @@ func main() {
}
}

nps, err := brokers.NewPubSub(cfg.BrokerURL, "mqtt", logger)
nps, err := brokers.NewPubSub(cfg.General.BrokerURL, "mqtt", logger)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
return
}
defer nps.Close()

mpub, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTForwarderTimeout)
mpub, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.MQTTAdapter.MQTTTargetHost, cfg.MQTTAdapter.MQTTTargetPort), cfg.MQTTAdapter.MQTTForwarderTimeout)
if err != nil {
logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err))
exitCode = 1
Expand All @@ -113,7 +133,7 @@ func main() {
return
}

np, err := brokers.NewPublisher(cfg.BrokerURL)
np, err := brokers.NewPublisher(cfg.General.BrokerURL)
if err != nil {
logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err))
exitCode = 1
Expand All @@ -135,17 +155,17 @@ func main() {

h := mproxy.NewHandler([]messaging.Publisher{np}, logger, authClient)

if cfg.SendTelemetry {
if cfg.General.SendTelemetry {
chc := chclient.New(svcName, mainflux.Version, logger, cancel)
go chc.CallHome(ctx)
}

logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s", cfg.MQTTPort))
logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s", cfg.MQTTAdapter.MQTTPort))
g.Go(func() error {
return proxyMQTT(ctx, cfg, logger, h)
})

logger.Info(fmt.Sprintf("Starting MQTT over WS proxy on port %s", cfg.HTTPPort))
logger.Info(fmt.Sprintf("Starting MQTT over WS proxy on port %s", cfg.HTTPAdapter.HTTPPort))
g.Go(func() error {
return proxyWS(ctx, cfg, logger, h)
})
Expand All @@ -164,8 +184,8 @@ func main() {
}

func proxyMQTT(ctx context.Context, cfg config, logger mflog.Logger, handler session.Handler) error {
address := fmt.Sprintf(":%s", cfg.MQTTPort)
target := fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort)
address := fmt.Sprintf(":%s", cfg.MQTTAdapter.MQTTPort)
target := fmt.Sprintf("%s:%s", cfg.MQTTAdapter.MQTTTargetHost, cfg.MQTTAdapter.MQTTTargetPort)
mp := mp.New(address, target, handler, logger)

errCh := make(chan error)
Expand All @@ -183,14 +203,14 @@ func proxyMQTT(ctx context.Context, cfg config, logger mflog.Logger, handler ses
}

func proxyWS(ctx context.Context, cfg config, logger mflog.Logger, handler session.Handler) error {
target := fmt.Sprintf("%s:%s", cfg.HTTPTargetHost, cfg.HTTPTargetPort)
wp := websocket.New(target, cfg.HTTPTargetPath, "ws", handler, logger)
target := fmt.Sprintf("%s:%s", cfg.HTTPAdapter.HTTPTargetHost, cfg.HTTPAdapter.HTTPTargetPort)
wp := websocket.New(target, cfg.HTTPAdapter.HTTPTargetPath, "ws", handler, logger)
http.Handle("/mqtt", wp.Handler())

errCh := make(chan error)

go func() {
errCh <- wp.Listen(cfg.HTTPPort)
errCh <- wp.Listen(cfg.HTTPAdapter.HTTPPort)
}()

select {
Expand All @@ -204,7 +224,7 @@ func proxyWS(ctx context.Context, cfg config, logger mflog.Logger, handler sessi

func healthcheck(cfg config) func() error {
return func() error {
res, err := http.Get(cfg.MQTTTargetHealthCheck)
res, err := http.Get(cfg.MQTTAdapter.MQTTTargetHealthCheck)
if err != nil {
return err
}
Expand All @@ -219,3 +239,19 @@ func healthcheck(cfg config) func() error {
return nil
}
}

func parseConfigFile(cfg *config) error {
file, err := os.Open(cfg.ConfigFile)
if err != nil {
return err
}
fileData, err := io.ReadAll(file)
if err != nil {
return err
}
if err := toml.Unmarshal(fileData, cfg); err != nil {
return err
}

return nil
}
20 changes: 20 additions & 0 deletions configs/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[MQTTAdapter]
PORT = "1883"
TARGET_HOST = "mqtt.example.com"
TARGET_PORT = "1883"
FORWARDER_TIMEOUT = "45s"
HEALTH_CHECK = "/health"

[HTTPAdapter]
PORT = "8080"
TARGET_HOST = "ws.example.com"
TARGET_PORT = "8080"
TARGET_PATH = "/mqtt"

[General]
INSTANCE = "example-instance"
JAEGER_URL = "http://jaeger.example.com:14268/api/traces"
BROKER_URL = "nats://nats.example.com:4222"
SEND_TELEMETRY = true
INSTANCE_ID = "unique-instance-id"
LOG_LEVEL = "debug"
1 change: 1 addition & 0 deletions docker/.env
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ MF_MQTT_ADAPTER_ES_URL=es-redis:${MF_REDIS_TCP_PORT}
MF_MQTT_ADAPTER_ES_PASS=
MF_MQTT_ADAPTER_INSTANCE_ID=
MF_MQTT_ADAPTER_ES_DB=0
MF_MQTT_ADAPTER_CONFIG_FILE="config.toml"
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/nats-io/nats.go v1.27.1 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.9
github.com/rabbitmq/amqp091-go v1.8.1 // indirect
github.com/rubenv/sql-migrate v1.5.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ github.com/opencontainers/runc v1.1.7 h1:y2EZDS8sNng4Ksf0GUYNhKbTShZJPJg1FiXJNH/
github.com/opencontainers/runc v1.1.7/go.mod h1:CbUumNnWCuTGFukNXahoo/RFBZvDAgRh/smNYNOhA50=
github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4=
github.com/ory/dockertest/v3 v3.10.0/go.mod h1:nr57ZbRWMqfsdGdFNLHz5jjNdDb7VVFnzAeW1n5N1Lg=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml/v2 v2.0.9 h1:uH2qQXheeefCCkuBBSLi7jCiSmj3VRh2+Goq2N7Xxu0=
github.com/pelletier/go-toml/v2 v2.0.9/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down

0 comments on commit b2ab812

Please sign in to comment.