From f776b152c380640dbd3e87f69e66e3a3d6f7ea12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Borov=C4=8Danin?= Date: Wed, 22 Apr 2020 20:09:28 +0200 Subject: [PATCH] NOISSUE - Refactor mProxy (#13) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Enrich logging for connection close Signed-off-by: Dušan Borovčanin * Handle connection close explicitly Refactor code. Signed-off-by: Dušan Borovčanin * Update examples Signed-off-by: Dušan Borovčanin * Rename handler field in Proxy and Session Signed-off-by: Dušan Borovčanin * Fix logging Signed-off-by: Dušan Borovčanin * Replace event with handler Signed-off-by: Dušan Borovčanin * Rename EventHandler to Handler Signed-off-by: Dušan Borovčanin * Handle only the first error Signed-off-by: Dušan Borovčanin --- cmd/main.go | 10 +++---- examples/simple/simple.go | 42 +++++++++++++-------------- pkg/mqtt/mqtt.go | 29 +++++++++++------- pkg/session/{events.go => handler.go} | 4 +-- pkg/session/session.go | 28 ++++++++++-------- pkg/websocket/websocket.go | 10 +++---- 6 files changed, 67 insertions(+), 56 deletions(-) rename pkg/session/{events.go => handler.go} (92%) diff --git a/cmd/main.go b/cmd/main.go index 738d854a..2ee7f222 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -70,17 +70,17 @@ func main() { log.Fatalf(err.Error()) } - ev := simple.New(logger) + h := simple.New(logger) errs := make(chan error, 3) // HTTP logger.Info(fmt.Sprintf("Starting HTTP proxy on port %s ", cfg.httpPort)) - go proxyHTTP(cfg, logger, ev, errs) + go proxyHTTP(cfg, logger, h, errs) // MQTT logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s ", cfg.mqttPort)) - go proxyMQTT(cfg, logger, ev, errs) + go proxyMQTT(cfg, logger, h, errs) go func() { c := make(chan os.Signal, 2) @@ -121,7 +121,7 @@ func loadConfig() config { } } -func proxyHTTP(cfg config, logger logger.Logger, evt session.Event, errs chan error) { +func proxyHTTP(cfg config, logger logger.Logger, evt session.Handler, errs chan error) { target := fmt.Sprintf("%s:%s", cfg.httpTargetHost, cfg.httpTargetPort) wp := websocket.New(target, cfg.httpTargetPath, cfg.httpScheme, evt, logger) http.Handle("/mqtt", wp.Handler()) @@ -130,7 +130,7 @@ func proxyHTTP(cfg config, logger logger.Logger, evt session.Event, errs chan er errs <- http.ListenAndServe(p, nil) } -func proxyMQTT(cfg config, logger logger.Logger, evt session.Event, errs chan error) { +func proxyMQTT(cfg config, logger logger.Logger, evt session.Handler, errs chan error) { address := fmt.Sprintf("%s:%s", cfg.mqttHost, cfg.mqttPort) target := fmt.Sprintf("%s:%s", cfg.mqttTargetHost, cfg.mqttTargetPort) mp := mqtt.New(address, target, evt, logger) diff --git a/examples/simple/simple.go b/examples/simple/simple.go index 6f7ead47..7f2d1fb4 100644 --- a/examples/simple/simple.go +++ b/examples/simple/simple.go @@ -8,62 +8,62 @@ import ( "github.com/mainflux/mproxy/pkg/session" ) -var _ session.Event = (*Event)(nil) +var _ session.Handler = (*Handler)(nil) -// Event implements mqtt.Event interface -type Event struct { +// Handler implements mqtt.Handler interface +type Handler struct { logger logger.Logger } // New creates new Event entity -func New(logger logger.Logger) *Event { - return &Event{ +func New(logger logger.Logger) *Handler { + return &Handler{ logger: logger, } } // AuthConnect is called on device connection, // prior forwarding to the MQTT broker -func (e *Event) AuthConnect(c *session.Client) error { - e.logger.Info(fmt.Sprintf("AuthRegister() - clientID: %s, username: %s, password: %s", c.ID, c.Username, string(c.Password))) +func (h *Handler) AuthConnect(c *session.Client) error { + h.logger.Info(fmt.Sprintf("AuthRegister() - clientID: %s, username: %s, password: %s", c.ID, c.Username, string(c.Password))) return nil } // AuthPublish is called on device publish, // prior forwarding to the MQTT broker -func (e *Event) AuthPublish(c *session.Client, topic *string, payload *[]byte) error { - e.logger.Info(fmt.Sprintf("AuthPublish() - clientID: %s, topic: %s, payload: %s", c.ID, *topic, string(*payload))) +func (h *Handler) AuthPublish(c *session.Client, topic *string, payload *[]byte) error { + h.logger.Info(fmt.Sprintf("AuthPublish() - clientID: %s, topic: %s, payload: %s", c.ID, *topic, string(*payload))) return nil } // AuthSubscribe is called on device publish, // prior forwarding to the MQTT broker -func (e *Event) AuthSubscribe(c *session.Client, topics *[]string) error { - e.logger.Info(fmt.Sprintf("AuthSubscribe() - clientID: %s, topics: %s", c.ID, strings.Join(*topics, ","))) +func (h *Handler) AuthSubscribe(c *session.Client, topics *[]string) error { + h.logger.Info(fmt.Sprintf("AuthSubscribe() - clientID: %s, topics: %s", c.ID, strings.Join(*topics, ","))) return nil } // Connect - after client successfully connected -func (e *Event) Connect(c *session.Client) { - e.logger.Info(fmt.Sprintf("Register() - username: %s, clientID: %s", c.Username, c.ID)) +func (h *Handler) Connect(c *session.Client) { + h.logger.Info(fmt.Sprintf("Register() - username: %s, clientID: %s", c.Username, c.ID)) } // Publish - after client successfully published -func (e *Event) Publish(c *session.Client, topic *string, payload *[]byte) { - e.logger.Info(fmt.Sprintf("Publish() - username: %s, clientID: %s, topic: %s, payload: %s", c.Username, c.ID, *topic, string(*payload))) +func (h *Handler) Publish(c *session.Client, topic *string, payload *[]byte) { + h.logger.Info(fmt.Sprintf("Publish() - username: %s, clientID: %s, topic: %s, payload: %s", c.Username, c.ID, *topic, string(*payload))) } // Subscribe - after client successfully subscribed -func (e *Event) Subscribe(c *session.Client, topics *[]string) { - e.logger.Info(fmt.Sprintf("Subscribe() - username: %s, clientID: %s, topics: %s", c.Username, c.ID, strings.Join(*topics, ","))) +func (h *Handler) Subscribe(c *session.Client, topics *[]string) { + h.logger.Info(fmt.Sprintf("Subscribe() - username: %s, clientID: %s, topics: %s", c.Username, c.ID, strings.Join(*topics, ","))) } // Unsubscribe - after client unsubscribed -func (e *Event) Unsubscribe(c *session.Client, topics *[]string) { - e.logger.Info(fmt.Sprintf("Unsubscribe() - username: %s, clientID: %s, topics: %s", c.Username, c.ID, strings.Join(*topics, ","))) +func (h *Handler) Unsubscribe(c *session.Client, topics *[]string) { + h.logger.Info(fmt.Sprintf("Unsubscribe() - username: %s, clientID: %s, topics: %s", c.Username, c.ID, strings.Join(*topics, ","))) } // Disconnect on conection lost -func (e *Event) Disconnect(c *session.Client) { - e.logger.Info(fmt.Sprintf("Disconnect() - client with username: %s and ID: %s disconenectd", c.Username, c.ID)) +func (h *Handler) Disconnect(c *session.Client) { + h.logger.Info(fmt.Sprintf("Disconnect() - client with username: %s and ID: %s disconenected", c.Username, c.ID)) } diff --git a/pkg/mqtt/mqtt.go b/pkg/mqtt/mqtt.go index 16fdedf0..d377544c 100644 --- a/pkg/mqtt/mqtt.go +++ b/pkg/mqtt/mqtt.go @@ -1,6 +1,7 @@ package mqtt import ( + "fmt" "io" "net" @@ -12,15 +13,16 @@ import ( type Proxy struct { address string target string - event session.Event + handler session.Handler logger logger.Logger } -func New(address, target string, event session.Event, logger logger.Logger) *Proxy { +// New returns a new mqtt Proxy instance. +func New(address, target string, handler session.Handler, logger logger.Logger) *Proxy { return &Proxy{ address: address, target: target, - event: event, + handler: handler, logger: logger, } } @@ -34,24 +36,23 @@ func (p Proxy) accept(l net.Listener) { } p.logger.Info("Accepted new client") - go p.handleConnection(conn) + go p.handle(conn) } } -func (p Proxy) handleConnection(inbound net.Conn) { - defer inbound.Close() - +func (p Proxy) handle(inbound net.Conn) { + defer p.close(inbound) outbound, err := net.Dial("tcp", p.target) if err != nil { p.logger.Error("Cannot connect to remote broker " + p.target) return } - defer outbound.Close() + defer p.close(outbound) - c := session.New(inbound, outbound, p.event, p.logger) + s := session.New(inbound, outbound, p.handler, p.logger) - if err := c.Stream(); err != io.EOF { - p.logger.Warn("Broken connection for client: " + c.Client.ID + " with error: " + err.Error()) + if err = s.Stream(); err != io.EOF { + p.logger.Warn("Broken connection for client: " + s.Client.ID + " with error: " + err.Error()) } } @@ -69,3 +70,9 @@ func (p Proxy) Proxy() error { p.logger.Info("Server Exiting...") return nil } + +func (p Proxy) close(conn net.Conn) { + if err := conn.Close(); err != nil { + p.logger.Warn(fmt.Sprintf("Error closing connection %s", err.Error())) + } +} diff --git a/pkg/session/events.go b/pkg/session/handler.go similarity index 92% rename from pkg/session/events.go rename to pkg/session/handler.go index 09d44b69..a2386f69 100644 --- a/pkg/session/events.go +++ b/pkg/session/handler.go @@ -1,7 +1,7 @@ package session -// Event is an interface for mProxy hooks -type Event interface { +// Handler is an interface for mProxy hooks +type Handler interface { // Authorization on client `CONNECT` // Each of the params are passed by reference, so that it can be changed AuthConnect(client *Client) error diff --git a/pkg/session/session.go b/pkg/session/session.go index 6313ffbc..9f664fe0 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -19,31 +19,35 @@ type Session struct { logger logger.Logger inbound net.Conn outbound net.Conn - event Event + handler Handler Client Client } // New creates a new Session. -func New(inbound, outbound net.Conn, event Event, logger logger.Logger) *Session { +func New(inbound, outbound net.Conn, handler Handler, logger logger.Logger) *Session { return &Session{ logger: logger, inbound: inbound, outbound: outbound, - event: event, + handler: handler, } } // Stream starts proxying traffic between client and broker. func (s *Session) Stream() error { // In parallel read from client, send to broker - // and read from broker, send to client + // and read from broker, send to client. errs := make(chan error, 2) go s.stream(up, s.inbound, s.outbound, errs) go s.stream(down, s.outbound, s.inbound, errs) + // Handle whichever error happens first. + // The other routine won't be blocked when writing + // to the errors channel because it is buffered. err := <-errs - s.event.Disconnect(&s.Client) + + s.handler.Disconnect(&s.Client) return err } @@ -83,7 +87,7 @@ func (s *Session) authorize(pkt packets.ControlPacket) error { Username: p.Username, Password: p.Password, } - if err := s.event.AuthConnect(&s.Client); err != nil { + if err := s.handler.AuthConnect(&s.Client); err != nil { return err } // Copy back to the packet in case values are changed by Event handler. @@ -93,9 +97,9 @@ func (s *Session) authorize(pkt packets.ControlPacket) error { p.Password = s.Client.Password return nil case *packets.PublishPacket: - return s.event.AuthPublish(&s.Client, &p.TopicName, &p.Payload) + return s.handler.AuthPublish(&s.Client, &p.TopicName, &p.Payload) case *packets.SubscribePacket: - return s.event.AuthSubscribe(&s.Client, &p.Topics) + return s.handler.AuthSubscribe(&s.Client, &p.Topics) default: return nil } @@ -104,13 +108,13 @@ func (s *Session) authorize(pkt packets.ControlPacket) error { func (s Session) notify(pkt packets.ControlPacket) { switch p := pkt.(type) { case *packets.ConnectPacket: - s.event.Connect(&s.Client) + s.handler.Connect(&s.Client) case *packets.PublishPacket: - s.event.Publish(&s.Client, &p.TopicName, &p.Payload) + s.handler.Publish(&s.Client, &p.TopicName, &p.Payload) case *packets.SubscribePacket: - s.event.Subscribe(&s.Client, &p.Topics) + s.handler.Subscribe(&s.Client, &p.Topics) case *packets.UnsubscribePacket: - s.event.Unsubscribe(&s.Client, &p.Topics) + s.handler.Unsubscribe(&s.Client, &p.Topics) default: return } diff --git a/pkg/websocket/websocket.go b/pkg/websocket/websocket.go index d1aa8cb4..30850c06 100644 --- a/pkg/websocket/websocket.go +++ b/pkg/websocket/websocket.go @@ -10,16 +10,17 @@ import ( "github.com/mainflux/mproxy/pkg/session" ) -// New - creates new HTTP proxy +// Proxy represents WS Proxy. type Proxy struct { target string path string scheme string - event session.Event + event session.Handler logger logger.Logger } -func New(target, path, scheme string, event session.Event, logger logger.Logger) *Proxy { +// New - creates new HTTP proxy +func New(target, path, scheme string, event session.Handler, logger logger.Logger) *Proxy { return &Proxy{ target: target, path: path, @@ -40,7 +41,7 @@ var upgrader = websocket.Upgrader{ }, } -// Handle - proxies HTTP traffic +// Handler - proxies HTTP traffic func (p Proxy) Handler() http.Handler { return p.handle() } @@ -88,5 +89,4 @@ func (p Proxy) pass(in *websocket.Conn) { err = session.Stream() errc <- err p.logger.Warn("Broken connection for client: " + session.Client.ID + " with error: " + err.Error()) - }