diff --git a/cmd/main.go b/cmd/main.go index ac7a9ec..aa0cc3d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -26,6 +26,7 @@ import ( mp "github.com/mainflux/mproxy/pkg/mqtt" "github.com/mainflux/mproxy/pkg/session" "github.com/mainflux/mproxy/pkg/websocket" + "github.com/pelletier/go-toml/v2" "golang.org/x/sync/errgroup" ) @@ -34,22 +35,35 @@ const ( envPrefixES = "MF_MQTT_ADAPTER_ES_" ) +type MQTTAdapterConfig struct { + MQTTPort string `toml:"PORT" env:"MF_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"` + MQTTTargetHost string `toml:"TARGET_HOST" env:"MF_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"` + MQTTTargetPort string `toml:"TARGET_PORT" env:"MF_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"` + MQTTForwarderTimeout time.Duration `toml:"FORWARDER_TIMEOUT" env:"MF_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"` + MQTTTargetHealthCheck string `toml:"HEALTH_CHECK" env:"MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""` +} + +type HTTPAdapterConfig struct { + HTTPPort string `toml:"PORT" env:"MF_MQTT_ADAPTER_WS_PORT" envDefault:"8080"` + HTTPTargetHost string `toml:"TARGET_HOST" env:"MF_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"` + HTTPTargetPort string `toml:"TARGET_PORT" env:"MF_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"` + HTTPTargetPath string `toml:"TARGET_PATH" env:"MF_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"` +} + +type GeneralConfig struct { + LogLevel string `toml:"LOG_LEVEL" env:"MF_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"` + Instance string `toml:"INSTANCE" env:"MF_MQTT_ADAPTER_INSTANCE" envDefault:""` + JaegerURL string `toml:"JAEGER_URL" env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` + BrokerURL string `toml:"BROKER_URL" env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"` + SendTelemetry bool `toml:"SEND_TELEMETRY" env:"MF_SEND_TELEMETRY" envDefault:"true"` + InstanceID string `toml:"INSTANCE_ID" env:"MF_MQTT_ADAPTER_INSTANCE_ID" envDefault:""` +} + type config struct { - LogLevel string `env:"MF_MQTT_ADAPTER_LOG_LEVEL" envDefault:"info"` - MQTTPort string `env:"MF_MQTT_ADAPTER_MQTT_PORT" envDefault:"1883"` - MQTTTargetHost string `env:"MF_MQTT_ADAPTER_MQTT_TARGET_HOST" envDefault:"localhost"` - MQTTTargetPort string `env:"MF_MQTT_ADAPTER_MQTT_TARGET_PORT" envDefault:"1883"` - MQTTForwarderTimeout time.Duration `env:"MF_MQTT_ADAPTER_FORWARDER_TIMEOUT" envDefault:"30s"` - MQTTTargetHealthCheck string `env:"MF_MQTT_ADAPTER_MQTT_TARGET_HEALTH_CHECK" envDefault:""` - HTTPPort string `env:"MF_MQTT_ADAPTER_WS_PORT" envDefault:"8080"` - HTTPTargetHost string `env:"MF_MQTT_ADAPTER_WS_TARGET_HOST" envDefault:"localhost"` - HTTPTargetPort string `env:"MF_MQTT_ADAPTER_WS_TARGET_PORT" envDefault:"8080"` - HTTPTargetPath string `env:"MF_MQTT_ADAPTER_WS_TARGET_PATH" envDefault:"/mqtt"` - Instance string `env:"MF_MQTT_ADAPTER_INSTANCE" envDefault:""` - JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"` - BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"` - SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"` - InstanceID string `env:"MF_MQTT_ADAPTER_INSTANCE_ID" envDefault:""` + MQTTAdapter MQTTAdapterConfig `toml:"MQTTAdapter"` + HTTPAdapter HTTPAdapterConfig `toml:"HTTPAdapter"` + General GeneralConfig `toml:"General"` + ConfigFile string `toml:"-" env:"MF_MQTT_ADAPTER_CONFIG_FILE" envDefault:""` } func main() { @@ -61,7 +75,13 @@ func main() { log.Fatalf("failed to load %s configuration : %s", svcName, err) } - logger, err := mflog.New(os.Stdout, cfg.LogLevel) + if cfg.ConfigFile != "" { + if err := parseConfigFile(&cfg); err != nil { + log.Fatalf("failed to load config file : %v", err) + } + } + + logger, err := mflog.New(os.Stdout, cfg.General.LogLevel) if err != nil { log.Fatalf("failed to init logger: %s", err) } @@ -69,15 +89,15 @@ func main() { var exitCode int defer mflog.ExitWithError(&exitCode) - if cfg.InstanceID == "" { - if cfg.InstanceID, err = uuid.New().ID(); err != nil { + if cfg.General.InstanceID == "" { + if cfg.General.InstanceID, err = uuid.New().ID(); err != nil { logger.Error(fmt.Sprintf("failed to generate instanceID: %s", err)) exitCode = 1 return } } - if cfg.MQTTTargetHealthCheck != "" { + if cfg.MQTTAdapter.MQTTTargetHealthCheck != "" { notify := func(e error, next time.Duration) { logger.Info(fmt.Sprintf("Broker not ready: %s, next try in %s", e.Error(), next)) } @@ -90,7 +110,7 @@ func main() { } } - nps, err := brokers.NewPubSub(cfg.BrokerURL, "mqtt", logger) + nps, err := brokers.NewPubSub(cfg.General.BrokerURL, "mqtt", logger) if err != nil { logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err)) exitCode = 1 @@ -98,7 +118,7 @@ func main() { } defer nps.Close() - mpub, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort), cfg.MQTTForwarderTimeout) + mpub, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.MQTTAdapter.MQTTTargetHost, cfg.MQTTAdapter.MQTTTargetPort), cfg.MQTTAdapter.MQTTForwarderTimeout) if err != nil { logger.Error(fmt.Sprintf("failed to create MQTT publisher: %s", err)) exitCode = 1 @@ -113,7 +133,7 @@ func main() { return } - np, err := brokers.NewPublisher(cfg.BrokerURL) + np, err := brokers.NewPublisher(cfg.General.BrokerURL) if err != nil { logger.Error(fmt.Sprintf("failed to connect to message broker: %s", err)) exitCode = 1 @@ -135,17 +155,17 @@ func main() { h := mproxy.NewHandler([]messaging.Publisher{np}, logger, authClient) - if cfg.SendTelemetry { + if cfg.General.SendTelemetry { chc := chclient.New(svcName, mainflux.Version, logger, cancel) go chc.CallHome(ctx) } - logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s", cfg.MQTTPort)) + logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s", cfg.MQTTAdapter.MQTTPort)) g.Go(func() error { return proxyMQTT(ctx, cfg, logger, h) }) - logger.Info(fmt.Sprintf("Starting MQTT over WS proxy on port %s", cfg.HTTPPort)) + logger.Info(fmt.Sprintf("Starting MQTT over WS proxy on port %s", cfg.HTTPAdapter.HTTPPort)) g.Go(func() error { return proxyWS(ctx, cfg, logger, h) }) @@ -164,8 +184,8 @@ func main() { } func proxyMQTT(ctx context.Context, cfg config, logger mflog.Logger, handler session.Handler) error { - address := fmt.Sprintf(":%s", cfg.MQTTPort) - target := fmt.Sprintf("%s:%s", cfg.MQTTTargetHost, cfg.MQTTTargetPort) + address := fmt.Sprintf(":%s", cfg.MQTTAdapter.MQTTPort) + target := fmt.Sprintf("%s:%s", cfg.MQTTAdapter.MQTTTargetHost, cfg.MQTTAdapter.MQTTTargetPort) mp := mp.New(address, target, handler, logger) errCh := make(chan error) @@ -183,14 +203,14 @@ func proxyMQTT(ctx context.Context, cfg config, logger mflog.Logger, handler ses } func proxyWS(ctx context.Context, cfg config, logger mflog.Logger, handler session.Handler) error { - target := fmt.Sprintf("%s:%s", cfg.HTTPTargetHost, cfg.HTTPTargetPort) - wp := websocket.New(target, cfg.HTTPTargetPath, "ws", handler, logger) + target := fmt.Sprintf("%s:%s", cfg.HTTPAdapter.HTTPTargetHost, cfg.HTTPAdapter.HTTPTargetPort) + wp := websocket.New(target, cfg.HTTPAdapter.HTTPTargetPath, "ws", handler, logger) http.Handle("/mqtt", wp.Handler()) errCh := make(chan error) go func() { - errCh <- wp.Listen(cfg.HTTPPort) + errCh <- wp.Listen(cfg.HTTPAdapter.HTTPPort) }() select { @@ -204,7 +224,7 @@ func proxyWS(ctx context.Context, cfg config, logger mflog.Logger, handler sessi func healthcheck(cfg config) func() error { return func() error { - res, err := http.Get(cfg.MQTTTargetHealthCheck) + res, err := http.Get(cfg.MQTTAdapter.MQTTTargetHealthCheck) if err != nil { return err } @@ -219,3 +239,19 @@ func healthcheck(cfg config) func() error { return nil } } + +func parseConfigFile(cfg *config) error { + file, err := os.Open(cfg.ConfigFile) + if err != nil { + return err + } + fileData, err := io.ReadAll(file) + if err != nil { + return err + } + if err := toml.Unmarshal(fileData, cfg); err != nil { + return err + } + + return nil +} diff --git a/configs/config.toml b/configs/config.toml new file mode 100644 index 0000000..823f901 --- /dev/null +++ b/configs/config.toml @@ -0,0 +1,20 @@ +[MQTTAdapter] + PORT = "1883" + TARGET_HOST = "mqtt.example.com" + TARGET_PORT = "1883" + FORWARDER_TIMEOUT = "45s" + HEALTH_CHECK = "/health" + +[HTTPAdapter] + PORT = "8080" + TARGET_HOST = "ws.example.com" + TARGET_PORT = "8080" + TARGET_PATH = "/mqtt" + +[General] + INSTANCE = "example-instance" + JAEGER_URL = "http://jaeger.example.com:14268/api/traces" + BROKER_URL = "nats://nats.example.com:4222" + SEND_TELEMETRY = true + INSTANCE_ID = "unique-instance-id" + LOG_LEVEL = "debug" diff --git a/docker/.env b/docker/.env index 8fa36ac..b6f4ea2 100644 --- a/docker/.env +++ b/docker/.env @@ -31,3 +31,4 @@ MF_MQTT_ADAPTER_ES_URL=es-redis:${MF_REDIS_TCP_PORT} MF_MQTT_ADAPTER_ES_PASS= MF_MQTT_ADAPTER_INSTANCE_ID= MF_MQTT_ADAPTER_ES_DB=0 +MF_MQTT_ADAPTER_CONFIG_FILE="config.toml" diff --git a/go.mod b/go.mod index 2ee2538..9099394 100644 --- a/go.mod +++ b/go.mod @@ -47,6 +47,7 @@ require ( github.com/nats-io/nats.go v1.27.1 // indirect github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/pelletier/go-toml/v2 v2.0.9 github.com/rabbitmq/amqp091-go v1.8.1 // indirect github.com/rubenv/sql-migrate v1.5.1 // indirect github.com/segmentio/asm v1.2.0 // indirect diff --git a/go.sum b/go.sum index ba5de98..91cfc69 100644 --- a/go.sum +++ b/go.sum @@ -230,6 +230,9 @@ github.com/opencontainers/runc v1.1.7 h1:y2EZDS8sNng4Ksf0GUYNhKbTShZJPJg1FiXJNH/ github.com/opencontainers/runc v1.1.7/go.mod h1:CbUumNnWCuTGFukNXahoo/RFBZvDAgRh/smNYNOhA50= github.com/ory/dockertest/v3 v3.10.0 h1:4K3z2VMe8Woe++invjaTB7VRyQXQy5UY+loujO4aNE4= github.com/ory/dockertest/v3 v3.10.0/go.mod h1:nr57ZbRWMqfsdGdFNLHz5jjNdDb7VVFnzAeW1n5N1Lg= +github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= +github.com/pelletier/go-toml/v2 v2.0.9 h1:uH2qQXheeefCCkuBBSLi7jCiSmj3VRh2+Goq2N7Xxu0= +github.com/pelletier/go-toml/v2 v2.0.9/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=