Skip to content

Commit

Permalink
Merge pull request #69 from mycelial-labs/sb/autopaho_setup_connect
Browse files Browse the repository at this point in the history
autopaho: set connect packet parameters
  • Loading branch information
Al S-M authored Jul 20, 2021
2 parents a215349 + 4efbe69 commit 21ea806
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 8 deletions.
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
.PHONY: test
test:
.PHONY: test unittest

unittest:
go test -race -tags=unittest ./autopaho/ -v -count 1

test: unittest
go test -race ./packets/ -v -count 1
go test -race ./paho/ -v -count 1

102 changes: 101 additions & 1 deletion autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"crypto/tls"
"errors"
"fmt"
"github.com/gorilla/websocket"
"net/http"
"net/url"
"sync"
"time"

"github.com/gorilla/websocket"

"github.com/eclipse/paho.golang/paho"
)

Expand Down Expand Up @@ -49,6 +50,21 @@ type ClientConfig struct {
Debug paho.Logger // By default set to NOOPLogger{},set to a logger for debugging info
PahoDebug paho.Logger // debugger passed to the paho package (will default to NOOPLogger{})

connectUsername string
connectPassword []byte

willTopic string
willPayload []byte
willQos byte
willRetain bool
willPayloadFormat byte
willMessageExpiry uint32
willContentType string
willResponseTopic string
willCorrelationData []byte

connectPacketBuilder func(*paho.Connect) *paho.Connect

// We include the full paho.ClientConfig in order to simplify moving between the two packages.
// Note that that Conn will be ignored.
paho.ClientConfig
Expand All @@ -65,6 +81,90 @@ type ConnectionManager struct {
done chan struct{} // Channel that will be closed when the process has cleanly shutdown
}

// ResetUsernamePassword clears any configured username and password on the client configuration
func (cfg *ClientConfig) ResetUsernamePassword() {
cfg.connectPassword = []byte{}
cfg.connectUsername = ""
}

// SetUsernamePassword configures username and password properties for the Connect packets
// These values are staged in the ClientConfig, and preparation of the Connect packet is deferred.
func (cfg *ClientConfig) SetUsernamePassword(username string, password []byte) {
if len(username) > 0 {
cfg.connectUsername = username
}

if len(password) > 0 {
cfg.connectPassword = password
}
}

// SetWillMessage configures the Will topic, payload, QOS and Retain facets of the client connection
// These values are staged in the ClientConfig, for later preparation of the Connect packet.
func (cfg *ClientConfig) SetWillMessage(topic string, payload []byte, qos byte, retain bool) {
cfg.willTopic = topic
cfg.willPayload = payload
cfg.willQos = qos
cfg.willRetain = retain
}

// SetConnectPacketConfigurator assigns a callback for modification of the Connect packet, called before the connection is opened, allowing the application to adjust its configuration before establishing a connection.
// This function should be treated as asynchronous, and expected to have no side effects.
func (cfg *ClientConfig) SetConnectPacketConfigurator(fn func(*paho.Connect) *paho.Connect) bool {
cfg.connectPacketBuilder = fn
return fn != nil
}

// buildConnectPacket constructs a Connect packet for the paho client, based on staged configuration.
// If the program uses SetConnectPacketConfigurator, the provided callback will be executed with the preliminary Connect packet representation.
func (cfg *ClientConfig) buildConnectPacket() *paho.Connect {

cp := &paho.Connect{
KeepAlive: cfg.KeepAlive,
ClientID: cfg.ClientID,
CleanStart: true, // while persistence is not supported we should probably start clean...
}

if len(cfg.connectUsername) > 0 {
cp.UsernameFlag = true
cp.Username = cfg.connectUsername
}

if len(cfg.connectPassword) > 0 {
cp.PasswordFlag = true
cp.Password = cfg.connectPassword
}

if len(cfg.willTopic) > 0 && len(cfg.willPayload) > 0 {
cp.WillMessage = &paho.WillMessage{
Retain: cfg.willRetain,
Payload: cfg.willPayload,
Topic: cfg.willTopic,
QoS: cfg.willQos,
}

// how the broker should wait before considering the client disconnected
// hopefully this default is sensible for most applications, tolerating short interruptions
willDelayInterval := uint32(2 * cfg.KeepAlive)

cp.WillProperties = &paho.WillProperties{
// Most of these are nil/empty or defaults until related methods are exposed for configuration
WillDelayInterval: &willDelayInterval,
PayloadFormat: &cfg.willPayloadFormat,
MessageExpiry: &cfg.willMessageExpiry,
ContentType: cfg.willContentType,
ResponseTopic: cfg.willResponseTopic,
CorrelationData: cfg.willCorrelationData,
}
}

if nil != cfg.connectPacketBuilder {
cp = cfg.connectPacketBuilder(cp)
}

return cp
}

// NewConnection creates a connection manager and begins the connection process (will retry until the context is cancelled)
func NewConnection(ctx context.Context, cfg ClientConfig) (*ConnectionManager, error) {
if cfg.Debug == nil {
Expand Down
94 changes: 94 additions & 0 deletions autopaho/auto_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// build +unittest

package autopaho

import (
"bytes"
"fmt"
"net/url"
"testing"
"time"

"github.com/eclipse/paho.golang/paho"
)

func TestClientConfig_buildConnectPacket(t *testing.T) {
broker, _ := url.Parse("tcp://127.0.0.1:1883")

config := ClientConfig{
BrokerUrls: []*url.URL{broker},
KeepAlive: 5,
ConnectRetryDelay: 5 * time.Second,
ConnectTimeout: 5 * time.Second,

// extends the lower-level paho.ClientConfig
ClientConfig: paho.ClientConfig{
ClientID: "test",
},
}

// Validate initial state
cp := config.buildConnectPacket()

if cp.WillMessage != nil {
t.Errorf("Expected empty Will message, got: %v", cp.WillMessage)
}

if cp.UsernameFlag != false || cp.Username != "" {
t.Errorf("Expected absent/empty username, got: flag=%v username=%v", cp.UsernameFlag, cp.Username)
}

if cp.PasswordFlag != false || len(cp.Password) > 0 {
t.Errorf("Expected absent/empty password, got: flag=%v password=%v", cp.PasswordFlag, cp.Password)
}

// Set some common parameters
config.SetUsernamePassword("testuser", []byte("testpassword"))
config.SetWillMessage(fmt.Sprintf("client/%s/state", config.ClientID), []byte("disconnected"), 1, true)

cp = config.buildConnectPacket()

if cp.UsernameFlag == false || cp.Username != "testuser" {
t.Errorf("Expected a username, got: flag=%v username=%v", cp.UsernameFlag, cp.Username)
}

pmatch := bytes.Compare(cp.Password, []byte("testpassword"))

if cp.PasswordFlag == false || len(cp.Password) == 0 || pmatch != 0 {
t.Errorf("Expected a password, got: flag=%v password=%v", cp.PasswordFlag, cp.Password)
}

if cp.WillMessage == nil {
t.Error("Expected a Will message, found nil")
}

if cp.WillMessage.Topic != "client/test/state" {
t.Errorf("Will message topic did not match expected [%v], found [%v]", "client/test/state", cp.WillMessage.Topic)
}

if cp.WillMessage.QoS != byte(1) {
t.Errorf("Will message QOS did not match expected [1]: found [%v]", cp.WillMessage.QoS)
}

if cp.WillMessage.Retain != true {
t.Errorf("Will message Retain did not match expected [true]: found [%v]", cp.WillMessage.Retain)
}

if *(cp.WillProperties.WillDelayInterval) != 10 { // assumes default 2x keep alive
t.Errorf("Will message Delay Interval did not match expected [10]: found [%v]", *(cp.Properties.WillDelayInterval))
}

// Set an override method for the connect packet
config.SetConnectPacketConfigurator(func(c *paho.Connect) *paho.Connect {
delay := uint32(200)
c.WillProperties.WillDelayInterval = &delay
return c
})

cp = config.buildConnectPacket()

if *(cp.WillProperties.WillDelayInterval) != 200 { // verifies the override
t.Errorf("Will message Delay Interval did not match expected [200]: found [%v]", *(cp.Properties.WillDelayInterval))
}

}
6 changes: 1 addition & 5 deletions autopaho/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ func establishBrokerConnection(ctx context.Context, cfg ClientConfig) (*paho.Cli

if err == nil {
cli := paho.NewClient(cfg.ClientConfig)
cp := &paho.Connect{
KeepAlive: cfg.KeepAlive,
ClientID: cfg.ClientID,
CleanStart: true, // while persistence is not supported we should probably start clean...
}
cp := cfg.buildConnectPacket()
var ca *paho.Connack
ca, err = cli.Connect(connectionCtx, cp) // will return an error if the connection is unsuccessful (checks the reason code)
if err == nil { // Successfully connected
Expand Down

0 comments on commit 21ea806

Please sign in to comment.