diff --git a/docs/services/mqtt.md b/docs/services/mqtt.md new file mode 100644 index 00000000..c3eafd59 --- /dev/null +++ b/docs/services/mqtt.md @@ -0,0 +1,50 @@ +# MQTT + +## URL Format + +_mqtt://**`host`**:**`port`**?topic=**`topic`**_ + +## Optional parameters + +You can optionally specify the **`disableTLS`**, **`clientID`**, **`username`** and **`password`** parameters in the URL: +_mqtt://**`host`**:**`port`**?topic=**`topic`**&disableTLS=true&clientID=**`clientID`**&username=**`username`**&password:**`password`**_ + +## Parameters Description + +- **Host** - MQTT broker server hostname or IP address (**Required**) + Default: _empty_ + Aliases: `host` + +- **Port** - MQTT server port, common ones are 8883 for TCP/TLS and 1883 for TCP (**Required**) + Default: `8883` + +- **Topic** - Topic where the message is sent (**Required**) + Default: _empty_ + Aliases: `Topic` + +- **DisableTLS** - disable TLS/SSL Configurations + Default: `false` + +- **ClientID** - The client identifier (ClientID) identifies each MQTT client that connects to an MQTT + Default: _empty_ + Aliases: `clientID` + +- **Username** - name of the sender to auth + Default: _empty_ + Aliases: `clientID` + +- **Password** - authentication password or hash + Default: _empty_ + Aliases: `password` + +## Certificates to use TCP/TLS + +To use TCP/TLS connection, it is necessary the files: + +- Cerficate Authority: ca.crt +- Client Certificate: client.crt +- Client Key: client.key + +## Configure TLS in mosquitto + +Generate the certificates [mosquitto-tls](https://mosquitto.org/man/mosquitto-tls-7.html). diff --git a/docs/services/overview.md b/docs/services/overview.md index 46c85ae7..6f355185 100644 --- a/docs/services/overview.md +++ b/docs/services/overview.md @@ -11,6 +11,7 @@ Click on the service for a more thorough explanation. | [IFTTT](./ifttt.md) | *ifttt://__`key`__/?events=__`event1`__[,__`event2`__,...]&value1=__`value1`__&value2=__`value2`__&value3=__`value3`__* | | [Join](./join.md) | *join://shoutrrr:__`api-key`__@join/?devices=__`device1`__[,__`device2`__, ...][&icon=__`icon`__][&title=__`title`__]* | | [Mattermost](./mattermost.md) | *mattermost://[__`username`__@]__`mattermost-host`__/__`token`__[/__`channel`__]* | +| [MQTT](./mqtt.md) | *mqtt://__`host`__:__`port`__?topic=__`topic`__* | [OpsGenie](./opsgenie.md) | *opsgenie://__`host`__/token?responders=__`responder1`__[,__`responder2`__]* | | [Pushbullet](./pushbullet.md) | *pushbullet://__`api-token`__[/__`device`__/#__`channel`__/__`email`__]* | | [Pushover](./pushover.md) | *pushover://shoutrrr:__`apiToken`__@__`userKey`__/?devices=__`device1`__[,__`device2`__, ...]* | diff --git a/go.mod b/go.mod index a85a4782..883f45ab 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/containrrr/shoutrrr go 1.12 require ( + github.com/eclipse/paho.mqtt.golang v1.3.2 // indirect github.com/fatih/color v1.10.0 github.com/google/uuid v1.1.5 // indirect github.com/jarcoal/httpmock v1.0.4 diff --git a/go.sum b/go.sum index e1969ea2..cc836c33 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= +github.com/eclipse/paho.mqtt.golang v1.3.2 h1:ICzfxSyrR8bOsh9l8JBBOwO1tc2C26oEyody0ml0L6E= +github.com/eclipse/paho.mqtt.golang v1.3.2/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/fatih/color v1.6.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= @@ -102,6 +104,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= @@ -277,6 +281,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190522155817-f3200d17e092 h1:4QSRKanuywn15aTZvI/mIDEgPQpswuFndXpOj3rKEco= golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= diff --git a/pkg/router/servicemap.go b/pkg/router/servicemap.go index 88d120e5..3833ccfd 100644 --- a/pkg/router/servicemap.go +++ b/pkg/router/servicemap.go @@ -10,6 +10,7 @@ import ( "github.com/containrrr/shoutrrr/pkg/services/logger" "github.com/containrrr/shoutrrr/pkg/services/matrix" "github.com/containrrr/shoutrrr/pkg/services/mattermost" + "github.com/containrrr/shoutrrr/pkg/services/mqtt" "github.com/containrrr/shoutrrr/pkg/services/opsgenie" "github.com/containrrr/shoutrrr/pkg/services/pushbullet" "github.com/containrrr/shoutrrr/pkg/services/pushover" @@ -33,6 +34,7 @@ var serviceMap = map[string]func() t.Service{ "logger": func() t.Service { return &logger.Service{} }, "matrix": func() t.Service { return &matrix.Service{} }, "mattermost": func() t.Service { return &mattermost.Service{} }, + "mqtt": func() t.Service { return &mqtt.Service{} }, "opsgenie": func() t.Service { return &opsgenie.Service{} }, "pushbullet": func() t.Service { return &pushbullet.Service{} }, "pushover": func() t.Service { return &pushover.Service{} }, diff --git a/pkg/services/mqtt/mqtt.go b/pkg/services/mqtt/mqtt.go new file mode 100644 index 00000000..94a9258b --- /dev/null +++ b/pkg/services/mqtt/mqtt.go @@ -0,0 +1,98 @@ +package mqtt + +import ( + "fmt" + "log" + "net/url" + + "github.com/containrrr/shoutrrr/pkg/format" + "github.com/containrrr/shoutrrr/pkg/util" + mqtt "github.com/eclipse/paho.mqtt.golang" + + "github.com/containrrr/shoutrrr/pkg/services/standard" + "github.com/containrrr/shoutrrr/pkg/types" +) + +const ( + maxLength = 268435455 +) + +// Service sends notifications to mqtt topic +type Service struct { + standard.Standard + config *Config + pkr format.PropKeyResolver +} + +// Send notification to mqtt +func (service *Service) Send(message string, params *types.Params) error { + + message, omitted := MessageLimit(message) + + if omitted > 0 { + service.Logf("omitted %v character(s) from the message", omitted) + } + + config := *service.config + if err := service.pkr.UpdateConfigFromParams(&config, params); err != nil { + return err + } + + if err := service.PublishMessageToTopic(message, &config); err != nil { + return fmt.Errorf("an error occurred while sending notification to the MQTT topic: %s", err.Error()) + } + + return nil +} + +// Initialize loads ServiceConfig from configURL and sets logger for this Service +func (service *Service) Initialize(configURL *url.URL, logger *log.Logger) error { + service.Logger.SetLogger(logger) + service.config = &Config{ + DisableTLS: false, + Port: 8883, + } + service.pkr = format.NewPropKeyResolver(service.config) + err := service.config.setURL(&service.pkr, configURL) + + return err +} + +// MessageLimit returns a string with the maximum size and the amount of omitted characters +func MessageLimit(message string) (string, int) { + size := util.Min(maxLength, len(message)) + omitted := len(message) - size + + return message[:size], omitted +} + +// GetConfig returns the Config for the service +func (service *Service) GetConfig() *Config { + return service.config +} + +// Publish to topic +func (service *Service) Publish(client mqtt.Client, topic string, message string) { + token := client.Publish(topic, 0, false, message) + token.Wait() +} + +// PublishMessageToTopic initializes the client and publishes the message +func (service *Service) PublishMessageToTopic(message string, config *Config) error { + postURL := config.MqttURL() + opts := config.GetClientConfig(postURL) + client := mqtt.NewClient(opts) + token := client.Connect() + + if token.Error() != nil { + return token.Error() + } + + token.Wait() + + service.Publish(client, config.Topic, message) + + client.Disconnect(250) + + return nil +} diff --git a/pkg/services/mqtt/mqtt_config.go b/pkg/services/mqtt/mqtt_config.go new file mode 100644 index 00000000..36bd17f5 --- /dev/null +++ b/pkg/services/mqtt/mqtt_config.go @@ -0,0 +1,146 @@ +package mqtt + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "log" + "net/url" + "strconv" + + "github.com/containrrr/shoutrrr/pkg/format" + "github.com/containrrr/shoutrrr/pkg/types" + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +// Config for use within the mqtt +type Config struct { + Host string `key:"host" default:"" desc:"MQTT broker server hostname or IP address"` + Port uint16 `key:"port" default:"8883" desc:"MQTT server port, common ones are 8883, 1883"` + Topic string `key:"topic" default:"" desc:"Topic where the message is sent"` + ClientID string `key:"clientid" default:"" desc:"client's id from the message is sent"` + Username string `key:"username" default:"" desc:"username for auth"` + Password string `key:"password" default:"" desc:"password for auth"` + DisableTLS bool `key:"disabletls" default:"No"` +} + +// DefaultConfig creates a PropKeyResolver and uses it to populate the default values of a new Config, returning both +func DefaultConfig() (*Config, format.PropKeyResolver) { + config := &Config{} + pkr := format.NewPropKeyResolver(config) + _ = pkr.SetDefaultProps(config) + return config, pkr +} + +// Enums returns the fields that should use a corresponding EnumFormatter to Print/Parse their values +func (config *Config) Enums() map[string]types.EnumFormatter { + return map[string]types.EnumFormatter{} +} + +// GetURL returns a URL representation of it's current field values +func (config *Config) GetURL() *url.URL { + resolver := format.NewPropKeyResolver(config) + return config.getURL(&resolver) +} + +// SetURL updates a ServiceConfig from a URL representation of it's field values +func (config *Config) SetURL(url *url.URL) error { + resolver := format.NewPropKeyResolver(config) + return config.setURL(&resolver, url) +} + +func (config *Config) getURL(resolver types.ConfigQueryResolver) *url.URL { + + return &url.URL{ + Host: fmt.Sprintf("%s:%d", config.Host, config.Port), + Scheme: Scheme, + ForceQuery: true, + RawQuery: format.BuildQuery(resolver), + } + +} + +func (config *Config) setURL(resolver types.ConfigQueryResolver, url *url.URL) error { + + config.Host = url.Hostname() + + if port, err := strconv.ParseUint(url.Port(), 10, 16); err == nil { + config.Port = uint16(port) + } + + for key, vals := range url.Query() { + if err := resolver.Set(key, vals[0]); err != nil { + return err + } + } + + return nil +} + +// MqttURL returns a string that is synchronized with the config props +func (config *Config) MqttURL() string { + MqttHost := config.Host + MqttPort := config.Port + scheme := DefaultMQTTScheme + if config.DisableTLS { + scheme = Scheme[:4] + } + return fmt.Sprintf("%s://%s:%d", scheme, MqttHost, MqttPort) +} + +// GetClientConfig returns the client options +func (config *Config) GetClientConfig(postURL string) *mqtt.ClientOptions { + opts := mqtt.NewClientOptions() + + opts.AddBroker(postURL) + + if len(config.ClientID) > 0 { + opts.SetClientID(config.ClientID) + } + + if len(config.Username) > 0 { + opts.SetUsername(config.Username) + } + + if len(config.Password) > 0 { + opts.SetPassword(config.Password) + } + + if !config.DisableTLS { + tlsConfig := config.GetTLSConfig() + opts.SetTLSConfig(tlsConfig) + } + + return opts +} + +// GetTLSConfig returns the configuration with the certificates for TLS +func (config *Config) GetTLSConfig() *tls.Config { + certpool := x509.NewCertPool() + ca, err := ioutil.ReadFile("ca.crt") + + if err != nil { + log.Fatalln(err.Error()) + } + certpool.AppendCertsFromPEM(ca) + + clientKeyPair, err := tls.LoadX509KeyPair("client.crt", "client.key") + if err != nil { + panic(err) + } + return &tls.Config{ + RootCAs: certpool, + ClientAuth: tls.NoClientCert, + ClientCAs: nil, + InsecureSkipVerify: true, + Certificates: []tls.Certificate{clientKeyPair}, + } +} + +const ( + // Scheme is the identifying part of this service's configuration URL + Scheme = "mqtt" + // DefaultMQTTScheme is the scheme used for MQTT URLs unless overridden + DefaultMQTTScheme = "mqtts" +) diff --git a/pkg/services/mqtt/mqtt_test.go b/pkg/services/mqtt/mqtt_test.go new file mode 100644 index 00000000..73ccce43 --- /dev/null +++ b/pkg/services/mqtt/mqtt_test.go @@ -0,0 +1,160 @@ +package mqtt + +import ( + "fmt" + "log" + "net/url" + "testing" + + "github.com/containrrr/shoutrrr/pkg/format" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var ( + logger = log.New(GinkgoWriter, "Test", log.LstdFlags) + service *Service +) + +func TestMqtt(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "MQTT Suite") +} + +var _ = Describe("the MQTT service", func() { + BeforeEach(func() { + service = &Service{} + service.SetLogger(logger) + }) + + When("TLS is enabled", func() { + It("should use mqtts schema", func() { + broker := "localhost" + port := 8883 + disableTLS := "false" + config, _ := getTestConfig(fmt.Sprintf("mqtt://%s:%d?disableTLS=%s", broker, port, disableTLS)) + postURL := config.MqttURL() + + Expect(postURL).To(Equal(fmt.Sprintf("mqtts://%s:%d", broker, port))) + }) + }) + + When("TLS is disabled", func() { + It("should use mqtt schema", func() { + broker := "localhost" + port := 1883 + disableTLS := "true" + config, _ := getTestConfig(fmt.Sprintf("mqtt://%s:%d?disableTLS=%s", broker, port, disableTLS)) + postURL := config.MqttURL() + + Expect(postURL).To(Equal(fmt.Sprintf("mqtt://%s:%d", broker, port))) + }) + }) + + When("a MQTT URL is provided", func() { + It("should disable TLS", func() { + broker := "localhost" + port := 1883 + disableTLS := "true" + config, _ := getTestConfig(fmt.Sprintf("mqtt://%s:%d?disableTLS=%s", broker, port, disableTLS)) + config.MqttURL() + Expect(config.DisableTLS).To(BeTrue()) + }) + }) + + When("a MQTT URL is provided", func() { + It("should enable TLS", func() { + broker := "localhost" + port := 8883 + disableTLS := "false" + config, _ := getTestConfig(fmt.Sprintf("mqtt://%s:%d?disableTLS=%s", broker, port, disableTLS)) + config.MqttURL() + Expect(config.DisableTLS).To(BeFalse()) + }) + }) + + Describe("creating a config", func() { + When("creating a default config", func() { + It("should not return an error", func() { + config := &Config{} + pkr := format.NewPropKeyResolver(config) + err := pkr.SetDefaultProps(config) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + When("generating a config object with optional arguments", func() { + mqttURL, _ := url.Parse("mqtt://localhost:1883?topic=topic/test&disableTls=true&clientId=1&username=testUser&password=password") + config := &Config{} + err := config.SetURL(mqttURL) + It("should not have caused an error", func() { + Expect(err).NotTo(HaveOccurred()) + }) + It("should set host", func() { + Expect(config.Host).To(Equal("localhost")) + }) + It("should set Port", func() { + Expect(config.Port).To(Equal(uint16(1883))) + }) + + It("should set topic", func() { + Expect(config.Topic).To(Equal("topic/test")) + }) + + It("should set client", func() { + Expect(config.ClientID).To(Equal("1")) + }) + + It("should set username", func() { + Expect(config.Username).To(Equal("testUser")) + }) + + It("should set password", func() { + Expect(config.Password).To(Equal("password")) + }) + + It("should not set DisableTLS", func() { + Expect(config.DisableTLS).Should(BeTrue()) + }) + }) + + When("generating a config object without optional arguments", func() { + mqttURL, _ := url.Parse("mqtt://localhost:1883?topic=topic/test") + config := &Config{} + err := config.SetURL(mqttURL) + It("should not have caused an error", func() { + Expect(err).NotTo(HaveOccurred()) + }) + + It("should not set client", func() { + Expect(config.ClientID).To(Equal("")) + }) + + It("should set username", func() { + Expect(config.Username).To(Equal("")) + }) + + It("should set password", func() { + Expect(config.Password).To(Equal("")) + }) + + It("should set DisableTLS", func() { + Expect(config.DisableTLS).Should(BeFalse()) + }) + }) + + }) +}) + +// GetTestConfig return the object config of the service +func getTestConfig(testURL string) (*Config, *url.URL) { + + serviceURL, err := url.Parse(testURL) + Expect(err).NotTo(HaveOccurred()) + config, pkr := DefaultConfig() + err = config.setURL(&pkr, serviceURL) + Expect(err).NotTo(HaveOccurred()) + + return config, config.getURL(&pkr) +}