Skip to content

Commit

Permalink
NOISSUE - Refactor mProxy (#13)
Browse files Browse the repository at this point in the history
* Enrich logging for connection close

Signed-off-by: Dušan Borovčanin <[email protected]>

* Handle connection close explicitly

Refactor code.

Signed-off-by: Dušan Borovčanin <[email protected]>

* Update examples

Signed-off-by: Dušan Borovčanin <[email protected]>

* Rename handler field in Proxy and Session

Signed-off-by: Dušan Borovčanin <[email protected]>

* Fix logging

Signed-off-by: Dušan Borovčanin <[email protected]>

* Replace event with handler

Signed-off-by: Dušan Borovčanin <[email protected]>

* Rename EventHandler to Handler

Signed-off-by: Dušan Borovčanin <[email protected]>

* Handle only the first error

Signed-off-by: Dušan Borovčanin <[email protected]>
  • Loading branch information
dborovcanin authored Apr 22, 2020
1 parent ce73038 commit f776b15
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 56 deletions.
10 changes: 5 additions & 5 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ func main() {
log.Fatalf(err.Error())
}

ev := simple.New(logger)
h := simple.New(logger)

errs := make(chan error, 3)

// HTTP
logger.Info(fmt.Sprintf("Starting HTTP proxy on port %s ", cfg.httpPort))
go proxyHTTP(cfg, logger, ev, errs)
go proxyHTTP(cfg, logger, h, errs)

// MQTT
logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s ", cfg.mqttPort))
go proxyMQTT(cfg, logger, ev, errs)
go proxyMQTT(cfg, logger, h, errs)

go func() {
c := make(chan os.Signal, 2)
Expand Down Expand Up @@ -121,7 +121,7 @@ func loadConfig() config {
}
}

func proxyHTTP(cfg config, logger logger.Logger, evt session.Event, errs chan error) {
func proxyHTTP(cfg config, logger logger.Logger, evt session.Handler, errs chan error) {
target := fmt.Sprintf("%s:%s", cfg.httpTargetHost, cfg.httpTargetPort)
wp := websocket.New(target, cfg.httpTargetPath, cfg.httpScheme, evt, logger)
http.Handle("/mqtt", wp.Handler())
Expand All @@ -130,7 +130,7 @@ func proxyHTTP(cfg config, logger logger.Logger, evt session.Event, errs chan er
errs <- http.ListenAndServe(p, nil)
}

func proxyMQTT(cfg config, logger logger.Logger, evt session.Event, errs chan error) {
func proxyMQTT(cfg config, logger logger.Logger, evt session.Handler, errs chan error) {
address := fmt.Sprintf("%s:%s", cfg.mqttHost, cfg.mqttPort)
target := fmt.Sprintf("%s:%s", cfg.mqttTargetHost, cfg.mqttTargetPort)
mp := mqtt.New(address, target, evt, logger)
Expand Down
42 changes: 21 additions & 21 deletions examples/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,62 +8,62 @@ import (
"github.com/mainflux/mproxy/pkg/session"
)

var _ session.Event = (*Event)(nil)
var _ session.Handler = (*Handler)(nil)

// Event implements mqtt.Event interface
type Event struct {
// Handler implements mqtt.Handler interface
type Handler struct {
logger logger.Logger
}

// New creates new Event entity
func New(logger logger.Logger) *Event {
return &Event{
func New(logger logger.Logger) *Handler {
return &Handler{
logger: logger,
}
}

// AuthConnect is called on device connection,
// prior forwarding to the MQTT broker
func (e *Event) AuthConnect(c *session.Client) error {
e.logger.Info(fmt.Sprintf("AuthRegister() - clientID: %s, username: %s, password: %s", c.ID, c.Username, string(c.Password)))
func (h *Handler) AuthConnect(c *session.Client) error {
h.logger.Info(fmt.Sprintf("AuthRegister() - clientID: %s, username: %s, password: %s", c.ID, c.Username, string(c.Password)))
return nil
}

// AuthPublish is called on device publish,
// prior forwarding to the MQTT broker
func (e *Event) AuthPublish(c *session.Client, topic *string, payload *[]byte) error {
e.logger.Info(fmt.Sprintf("AuthPublish() - clientID: %s, topic: %s, payload: %s", c.ID, *topic, string(*payload)))
func (h *Handler) AuthPublish(c *session.Client, topic *string, payload *[]byte) error {
h.logger.Info(fmt.Sprintf("AuthPublish() - clientID: %s, topic: %s, payload: %s", c.ID, *topic, string(*payload)))
return nil
}

// AuthSubscribe is called on device publish,
// prior forwarding to the MQTT broker
func (e *Event) AuthSubscribe(c *session.Client, topics *[]string) error {
e.logger.Info(fmt.Sprintf("AuthSubscribe() - clientID: %s, topics: %s", c.ID, strings.Join(*topics, ",")))
func (h *Handler) AuthSubscribe(c *session.Client, topics *[]string) error {
h.logger.Info(fmt.Sprintf("AuthSubscribe() - clientID: %s, topics: %s", c.ID, strings.Join(*topics, ",")))
return nil
}

// Connect - after client successfully connected
func (e *Event) Connect(c *session.Client) {
e.logger.Info(fmt.Sprintf("Register() - username: %s, clientID: %s", c.Username, c.ID))
func (h *Handler) Connect(c *session.Client) {
h.logger.Info(fmt.Sprintf("Register() - username: %s, clientID: %s", c.Username, c.ID))
}

// Publish - after client successfully published
func (e *Event) Publish(c *session.Client, topic *string, payload *[]byte) {
e.logger.Info(fmt.Sprintf("Publish() - username: %s, clientID: %s, topic: %s, payload: %s", c.Username, c.ID, *topic, string(*payload)))
func (h *Handler) Publish(c *session.Client, topic *string, payload *[]byte) {
h.logger.Info(fmt.Sprintf("Publish() - username: %s, clientID: %s, topic: %s, payload: %s", c.Username, c.ID, *topic, string(*payload)))
}

// Subscribe - after client successfully subscribed
func (e *Event) Subscribe(c *session.Client, topics *[]string) {
e.logger.Info(fmt.Sprintf("Subscribe() - username: %s, clientID: %s, topics: %s", c.Username, c.ID, strings.Join(*topics, ",")))
func (h *Handler) Subscribe(c *session.Client, topics *[]string) {
h.logger.Info(fmt.Sprintf("Subscribe() - username: %s, clientID: %s, topics: %s", c.Username, c.ID, strings.Join(*topics, ",")))
}

// Unsubscribe - after client unsubscribed
func (e *Event) Unsubscribe(c *session.Client, topics *[]string) {
e.logger.Info(fmt.Sprintf("Unsubscribe() - username: %s, clientID: %s, topics: %s", c.Username, c.ID, strings.Join(*topics, ",")))
func (h *Handler) Unsubscribe(c *session.Client, topics *[]string) {
h.logger.Info(fmt.Sprintf("Unsubscribe() - username: %s, clientID: %s, topics: %s", c.Username, c.ID, strings.Join(*topics, ",")))
}

// Disconnect on conection lost
func (e *Event) Disconnect(c *session.Client) {
e.logger.Info(fmt.Sprintf("Disconnect() - client with username: %s and ID: %s disconenectd", c.Username, c.ID))
func (h *Handler) Disconnect(c *session.Client) {
h.logger.Info(fmt.Sprintf("Disconnect() - client with username: %s and ID: %s disconenected", c.Username, c.ID))
}
29 changes: 18 additions & 11 deletions pkg/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mqtt

import (
"fmt"
"io"
"net"

Expand All @@ -12,15 +13,16 @@ import (
type Proxy struct {
address string
target string
event session.Event
handler session.Handler
logger logger.Logger
}

func New(address, target string, event session.Event, logger logger.Logger) *Proxy {
// New returns a new mqtt Proxy instance.
func New(address, target string, handler session.Handler, logger logger.Logger) *Proxy {
return &Proxy{
address: address,
target: target,
event: event,
handler: handler,
logger: logger,
}
}
Expand All @@ -34,24 +36,23 @@ func (p Proxy) accept(l net.Listener) {
}

p.logger.Info("Accepted new client")
go p.handleConnection(conn)
go p.handle(conn)
}
}

func (p Proxy) handleConnection(inbound net.Conn) {
defer inbound.Close()

func (p Proxy) handle(inbound net.Conn) {
defer p.close(inbound)
outbound, err := net.Dial("tcp", p.target)
if err != nil {
p.logger.Error("Cannot connect to remote broker " + p.target)
return
}
defer outbound.Close()
defer p.close(outbound)

c := session.New(inbound, outbound, p.event, p.logger)
s := session.New(inbound, outbound, p.handler, p.logger)

if err := c.Stream(); err != io.EOF {
p.logger.Warn("Broken connection for client: " + c.Client.ID + " with error: " + err.Error())
if err = s.Stream(); err != io.EOF {
p.logger.Warn("Broken connection for client: " + s.Client.ID + " with error: " + err.Error())
}
}

Expand All @@ -69,3 +70,9 @@ func (p Proxy) Proxy() error {
p.logger.Info("Server Exiting...")
return nil
}

func (p Proxy) close(conn net.Conn) {
if err := conn.Close(); err != nil {
p.logger.Warn(fmt.Sprintf("Error closing connection %s", err.Error()))
}
}
4 changes: 2 additions & 2 deletions pkg/session/events.go → pkg/session/handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package session

// Event is an interface for mProxy hooks
type Event interface {
// Handler is an interface for mProxy hooks
type Handler interface {
// Authorization on client `CONNECT`
// Each of the params are passed by reference, so that it can be changed
AuthConnect(client *Client) error
Expand Down
28 changes: 16 additions & 12 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,35 @@ type Session struct {
logger logger.Logger
inbound net.Conn
outbound net.Conn
event Event
handler Handler
Client Client
}

// New creates a new Session.
func New(inbound, outbound net.Conn, event Event, logger logger.Logger) *Session {
func New(inbound, outbound net.Conn, handler Handler, logger logger.Logger) *Session {
return &Session{
logger: logger,
inbound: inbound,
outbound: outbound,
event: event,
handler: handler,
}
}

// Stream starts proxying traffic between client and broker.
func (s *Session) Stream() error {
// In parallel read from client, send to broker
// and read from broker, send to client
// and read from broker, send to client.
errs := make(chan error, 2)

go s.stream(up, s.inbound, s.outbound, errs)
go s.stream(down, s.outbound, s.inbound, errs)

// Handle whichever error happens first.
// The other routine won't be blocked when writing
// to the errors channel because it is buffered.
err := <-errs
s.event.Disconnect(&s.Client)

s.handler.Disconnect(&s.Client)
return err
}

Expand Down Expand Up @@ -83,7 +87,7 @@ func (s *Session) authorize(pkt packets.ControlPacket) error {
Username: p.Username,
Password: p.Password,
}
if err := s.event.AuthConnect(&s.Client); err != nil {
if err := s.handler.AuthConnect(&s.Client); err != nil {
return err
}
// Copy back to the packet in case values are changed by Event handler.
Expand All @@ -93,9 +97,9 @@ func (s *Session) authorize(pkt packets.ControlPacket) error {
p.Password = s.Client.Password
return nil
case *packets.PublishPacket:
return s.event.AuthPublish(&s.Client, &p.TopicName, &p.Payload)
return s.handler.AuthPublish(&s.Client, &p.TopicName, &p.Payload)
case *packets.SubscribePacket:
return s.event.AuthSubscribe(&s.Client, &p.Topics)
return s.handler.AuthSubscribe(&s.Client, &p.Topics)
default:
return nil
}
Expand All @@ -104,13 +108,13 @@ func (s *Session) authorize(pkt packets.ControlPacket) error {
func (s Session) notify(pkt packets.ControlPacket) {
switch p := pkt.(type) {
case *packets.ConnectPacket:
s.event.Connect(&s.Client)
s.handler.Connect(&s.Client)
case *packets.PublishPacket:
s.event.Publish(&s.Client, &p.TopicName, &p.Payload)
s.handler.Publish(&s.Client, &p.TopicName, &p.Payload)
case *packets.SubscribePacket:
s.event.Subscribe(&s.Client, &p.Topics)
s.handler.Subscribe(&s.Client, &p.Topics)
case *packets.UnsubscribePacket:
s.event.Unsubscribe(&s.Client, &p.Topics)
s.handler.Unsubscribe(&s.Client, &p.Topics)
default:
return
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (
"github.com/mainflux/mproxy/pkg/session"
)

// New - creates new HTTP proxy
// Proxy represents WS Proxy.
type Proxy struct {
target string
path string
scheme string
event session.Event
event session.Handler
logger logger.Logger
}

func New(target, path, scheme string, event session.Event, logger logger.Logger) *Proxy {
// New - creates new HTTP proxy
func New(target, path, scheme string, event session.Handler, logger logger.Logger) *Proxy {
return &Proxy{
target: target,
path: path,
Expand All @@ -40,7 +41,7 @@ var upgrader = websocket.Upgrader{
},
}

// Handle - proxies HTTP traffic
// Handler - proxies HTTP traffic
func (p Proxy) Handler() http.Handler {
return p.handle()
}
Expand Down Expand Up @@ -88,5 +89,4 @@ func (p Proxy) pass(in *websocket.Conn) {
err = session.Stream()
errc <- err
p.logger.Warn("Broken connection for client: " + session.Client.ID + " with error: " + err.Error())

}

0 comments on commit f776b15

Please sign in to comment.