-
Notifications
You must be signed in to change notification settings - Fork 64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(service): add mqtt service #149
Changes from 9 commits
9d5e307
7b4e3a9
cba79ac
333ad76
2243be0
365f8c3
6bef5f6
87b65b1
8c17b96
5e1b0bb
fca4330
37c66fa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,50 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
# MQTT | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
## URL Format | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
_mqtt://**`host`**:**`port`**?topic=**`topic`**_ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
## Optional parameters | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
You can optionally specify the **`disableTLS`**, **`clientID`**, **`username`** and **`password`** parameters in the URL: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
_mqtt://**`host`**:**`port`**?topic=**`topic`**&disableTLS=true&clientID=**`clientID`**&username=**`username`**&password:**`password`**_ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
## Parameters Description | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- **Host** - MQTT broker server hostname or IP address (**Required**) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Default: _empty_ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Aliases: `host` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- **Port** - MQTT server port, common ones are 8883 for TCP/TLS and 1883 for TCP (**Required**) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Default: `8883` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- **Topic** - Topic where the message is sent (**Required**) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Default: _empty_ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Aliases: `Topic` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- **DisableTLS** - disable TLS/SSL Configurations | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Default: `false` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- **ClientID** - The client identifier (ClientID) identifies each MQTT client that connects to an MQTT | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Default: _empty_ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Aliases: `clientID` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- **Username** - name of the sender to auth | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Default: _empty_ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Aliases: `clientID` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- **Password** - authentication password or hash | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Default: _empty_ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Aliases: `password` | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+12
to
+38
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can auto-generated from the config tags
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
## Certificates to use TCP/TLS | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
To use TCP/TLS connection, it is necessary the files: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- Cerficate Authority: ca.crt | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- Client Certificate: client.crt | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
- Client Key: client.key | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
## Configure TLS in mosquitto | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Generate the certificates [mosquitto-tls](https://mosquitto.org/man/mosquitto-tls-7.html). |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
package mqtt | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"net/url" | ||
|
||
"github.com/containrrr/shoutrrr/pkg/format" | ||
"github.com/containrrr/shoutrrr/pkg/util" | ||
mqtt "github.com/eclipse/paho.mqtt.golang" | ||
|
||
"github.com/containrrr/shoutrrr/pkg/services/standard" | ||
"github.com/containrrr/shoutrrr/pkg/types" | ||
) | ||
|
||
const ( | ||
maxLength = 268435455 | ||
) | ||
|
||
// Service sends notifications to mqtt topic | ||
type Service struct { | ||
standard.Standard | ||
config *Config | ||
pkr format.PropKeyResolver | ||
} | ||
|
||
// Send notification to mqtt | ||
func (service *Service) Send(message string, params *types.Params) error { | ||
|
||
message, omitted := MessageLimit(message) | ||
|
||
if omitted > 0 { | ||
service.Logf("omitted %v character(s) from the message", omitted) | ||
} | ||
|
||
config := *service.config | ||
if err := service.pkr.UpdateConfigFromParams(&config, params); err != nil { | ||
return err | ||
} | ||
|
||
if err := service.PublishMessageToTopic(message, &config); err != nil { | ||
return fmt.Errorf("an error occurred while sending notification to the MQTT topic: %s", err.Error()) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Initialize loads ServiceConfig from configURL and sets logger for this Service | ||
func (service *Service) Initialize(configURL *url.URL, logger *log.Logger) error { | ||
service.Logger.SetLogger(logger) | ||
service.config = &Config{ | ||
DisableTLS: false, | ||
Port: 8883, | ||
} | ||
service.pkr = format.NewPropKeyResolver(service.config) | ||
err := service.config.setURL(&service.pkr, configURL) | ||
|
||
return err | ||
} | ||
|
||
// MessageLimit returns a string with the maximum size and the amount of omitted characters | ||
func MessageLimit(message string) (string, int) { | ||
size := util.Min(maxLength, len(message)) | ||
omitted := len(message) - size | ||
|
||
return message[:size], omitted | ||
} | ||
|
||
// GetConfig returns the Config for the service | ||
func (service *Service) GetConfig() *Config { | ||
return service.config | ||
} | ||
|
||
// Publish to topic | ||
func (service *Service) Publish(client mqtt.Client, topic string, message string) { | ||
token := client.Publish(topic, 0, false, message) | ||
token.Wait() | ||
} | ||
|
||
// PublishMessageToTopic initializes the client and publishes the message | ||
func (service *Service) PublishMessageToTopic(message string, config *Config) error { | ||
postURL := config.MqttURL() | ||
opts := config.GetClientConfig(postURL) | ||
client := mqtt.NewClient(opts) | ||
token := client.Connect() | ||
|
||
if token.Error() != nil { | ||
return token.Error() | ||
} | ||
|
||
token.Wait() | ||
|
||
service.Publish(client, config.Topic, message) | ||
|
||
client.Disconnect(250) | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,138 @@ | ||||||||||||
package mqtt | ||||||||||||
|
||||||||||||
import ( | ||||||||||||
"crypto/tls" | ||||||||||||
"crypto/x509" | ||||||||||||
"fmt" | ||||||||||||
"io/ioutil" | ||||||||||||
"log" | ||||||||||||
"net/url" | ||||||||||||
"strconv" | ||||||||||||
|
||||||||||||
"github.com/containrrr/shoutrrr/pkg/format" | ||||||||||||
"github.com/containrrr/shoutrrr/pkg/types" | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (URL creds)
Suggested change
|
||||||||||||
mqtt "github.com/eclipse/paho.mqtt.golang" | ||||||||||||
) | ||||||||||||
|
||||||||||||
// Config for use within the mqtt | ||||||||||||
type Config struct { | ||||||||||||
Host string `key:"host" default:"" desc:"MQTT broker server hostname or IP address"` | ||||||||||||
Port uint16 `key:"port" default:"8883" desc:"MQTT server port, common ones are 8883, 1883"` | ||||||||||||
Topic string `key:"topic" default:"" desc:"Topic where the message is sent"` | ||||||||||||
ClientID string `key:"clientid" default:"" desc:"client's id from the message is sent"` | ||||||||||||
Username string `key:"username" default:"" desc:"username for auth"` | ||||||||||||
Password string `key:"password" default:"" desc:"password for auth"` | ||||||||||||
Comment on lines
+23
to
+24
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (URL creds)
Suggested change
|
||||||||||||
DisableTLS bool `key:"disabletls" default:"No"` | ||||||||||||
} | ||||||||||||
|
||||||||||||
// Enums returns the fields that should use a corresponding EnumFormatter to Print/Parse their values | ||||||||||||
func (config *Config) Enums() map[string]types.EnumFormatter { | ||||||||||||
return map[string]types.EnumFormatter{} | ||||||||||||
} | ||||||||||||
|
||||||||||||
// GetURL returns a URL representation of it's current field values | ||||||||||||
func (config *Config) GetURL() *url.URL { | ||||||||||||
resolver := format.NewPropKeyResolver(config) | ||||||||||||
return config.getURL(&resolver) | ||||||||||||
} | ||||||||||||
|
||||||||||||
// SetURL updates a ServiceConfig from a URL representation of it's field values | ||||||||||||
func (config *Config) SetURL(url *url.URL) error { | ||||||||||||
resolver := format.NewPropKeyResolver(config) | ||||||||||||
return config.setURL(&resolver, url) | ||||||||||||
} | ||||||||||||
|
||||||||||||
func (config *Config) getURL(resolver types.ConfigQueryResolver) *url.URL { | ||||||||||||
|
||||||||||||
return &url.URL{ | ||||||||||||
Host: fmt.Sprintf("%s:%d", config.Host, config.Port), | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (URL creds)
Suggested change
|
||||||||||||
Scheme: Scheme, | ||||||||||||
ForceQuery: true, | ||||||||||||
RawQuery: format.BuildQuery(resolver), | ||||||||||||
} | ||||||||||||
|
||||||||||||
} | ||||||||||||
|
||||||||||||
func (config *Config) setURL(resolver types.ConfigQueryResolver, url *url.URL) error { | ||||||||||||
|
||||||||||||
config.Host = url.Hostname() | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (URL creds)
Suggested change
|
||||||||||||
|
||||||||||||
if port, err := strconv.ParseUint(url.Port(), 10, 16); err == nil { | ||||||||||||
config.Port = uint16(port) | ||||||||||||
} | ||||||||||||
|
||||||||||||
for key, vals := range url.Query() { | ||||||||||||
if err := resolver.Set(key, vals[0]); err != nil { | ||||||||||||
return err | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
return nil | ||||||||||||
} | ||||||||||||
|
||||||||||||
// MqttURL returns a string that is synchronized with the config props | ||||||||||||
func (config *Config) MqttURL() string { | ||||||||||||
MqttHost := config.Host | ||||||||||||
MqttPort := config.Port | ||||||||||||
scheme := DefaultMQTTScheme | ||||||||||||
if config.DisableTLS { | ||||||||||||
scheme = Scheme[:4] | ||||||||||||
} | ||||||||||||
return fmt.Sprintf("%s://%s:%d", scheme, MqttHost, MqttPort) | ||||||||||||
} | ||||||||||||
|
||||||||||||
// GetClientConfig returns the client options | ||||||||||||
func (config *Config) GetClientConfig(postURL string) *mqtt.ClientOptions { | ||||||||||||
opts := mqtt.NewClientOptions() | ||||||||||||
|
||||||||||||
opts.AddBroker(postURL) | ||||||||||||
|
||||||||||||
if len(config.ClientID) > 0 { | ||||||||||||
opts.SetClientID(config.ClientID) | ||||||||||||
} | ||||||||||||
|
||||||||||||
if len(config.Username) > 0 { | ||||||||||||
opts.SetUsername(config.Username) | ||||||||||||
} | ||||||||||||
|
||||||||||||
if len(config.Password) > 0 { | ||||||||||||
opts.SetPassword(config.Password) | ||||||||||||
} | ||||||||||||
|
||||||||||||
if !config.DisableTLS { | ||||||||||||
tlsConfig := config.GetTLSConfig() | ||||||||||||
opts.SetTLSConfig(tlsConfig) | ||||||||||||
} | ||||||||||||
|
||||||||||||
return opts | ||||||||||||
} | ||||||||||||
|
||||||||||||
// GetTLSConfig returns the configuration with the certificates for TLS | ||||||||||||
func (config *Config) GetTLSConfig() *tls.Config { | ||||||||||||
certpool := x509.NewCertPool() | ||||||||||||
ca, err := ioutil.ReadFile("ca.crt") | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Requiring the files to have this specific name and be in the working directory of the consuming app makes this really hard to use. Ideally, we should add some kind of generic way to add files/blobs to services (mostly for TLS, so perhaps even a specific TLS-cert interface...) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implemented in #185 |
||||||||||||
|
||||||||||||
if err != nil { | ||||||||||||
log.Fatalln(err.Error()) | ||||||||||||
} | ||||||||||||
certpool.AppendCertsFromPEM(ca) | ||||||||||||
|
||||||||||||
clientKeyPair, err := tls.LoadX509KeyPair("client.crt", "client.key") | ||||||||||||
if err != nil { | ||||||||||||
panic(err) | ||||||||||||
} | ||||||||||||
return &tls.Config{ | ||||||||||||
RootCAs: certpool, | ||||||||||||
ClientAuth: tls.NoClientCert, | ||||||||||||
ClientCAs: nil, | ||||||||||||
InsecureSkipVerify: true, | ||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the point of loading custom certs when verification is disabled? This should absolutely not be the default. |
||||||||||||
Certificates: []tls.Certificate{clientKeyPair}, | ||||||||||||
} | ||||||||||||
} | ||||||||||||
|
||||||||||||
const ( | ||||||||||||
// Scheme is the identifying part of this service's configuration URL | ||||||||||||
Scheme = "mqtt" | ||||||||||||
// DefaultMQTTScheme is the scheme used for MQTT URLs unless overridden | ||||||||||||
DefaultMQTTScheme = "mqtts" | ||||||||||||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(URL creds)