Skip to content

Commit

Permalink
NOISSUE - Use slog for Logging (#52)
Browse files Browse the repository at this point in the history
* feat(logging): Use slog for logging

This commit adds logging functionality to the proxy functions in order to track and log connections. The changes include:

- Adding level to be type slog.Leveler
- Changing function signatures to accept a slog logger parameter

These changes improve the observability and debugging capabilities of the codebase by providing detailed logs for tracking connections handled by the proxy functions.

No breaking changes or other significant modifications were made in this commit.

Signed-off-by: Rodney Osodo <[email protected]>

* Add logging statements for starting proxy servers on different ports

Signed-off-by: Rodney Osodo <[email protected]>

---------

Signed-off-by: Rodney Osodo <[email protected]>
  • Loading branch information
rodneyosodo authored Jan 18, 2024
1 parent 0b102d0 commit 695f58d
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 142 deletions.
64 changes: 40 additions & 24 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"crypto/tls"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"

mglog "github.com/absmach/magistrala/logger"
"github.com/absmach/mproxy/examples/simple"
hproxy "github.com/absmach/mproxy/pkg/http"
"github.com/absmach/mproxy/pkg/mqtt"
Expand Down Expand Up @@ -104,7 +104,7 @@ type config struct {
wsMQTTConfig WSMQTTConfig
wsConfig WSConfig

logLevel string
logLevel slog.Level
}

type WSConfig struct {
Expand Down Expand Up @@ -146,10 +146,10 @@ type HTTPConfig struct {
func main() {
cfg := loadConfig()

logger, err := mglog.New(os.Stdout, cfg.logLevel)
if err != nil {
log.Fatalf(err.Error())
}
logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: cfg.logLevel,
})
logger := slog.New(logHandler)

h := simple.New(logger)

Expand All @@ -164,30 +164,30 @@ func main() {
}

// WSS - MQTT
logger.Info(fmt.Sprintf("Starting encrypted WebSocket proxy on port %s ", cfg.wsMQTTConfig.wssPort))
logServerStart(logger, "encrypted WebSocket", cfg.wsMQTTConfig.wssPort, cfg.wsMQTTConfig.targetPort)
go proxyMQTTWSS(cfg, tlsCfg, logger, h, errs)
// MQTTS
logger.Info(fmt.Sprintf("Starting MQTTS proxy on port %s ", cfg.mqttConfig.mqttsPort))
logServerStart(logger, "MQTTS", cfg.mqttConfig.mqttsPort, cfg.mqttConfig.targetPort)
go proxyMQTTS(ctx, cfg.mqttConfig, tlsCfg, logger, h, errs)
// WSS
logger.Info(fmt.Sprintf("Starting WSS proxy on port %s ", cfg.wsConfig.port))
logServerStart(logger, "WSS", cfg.wsConfig.port, cfg.wsConfig.targetPort)
go proxyWSS(ctx, cfg, logger, h, errs)
// HTTPS
logger.Info(fmt.Sprintf("Starting HTTPS proxy on port %s ", cfg.httpConfig.port))
logServerStart(logger, "HTTPS", cfg.httpConfig.port, cfg.httpConfig.targetPort)
go proxyHTTPS(ctx, cfg.httpConfig, logger, h, errs)
} else {
// WS - MQTT
logger.Info(fmt.Sprintf("Starting WebSocket proxy on port %s ", cfg.wsMQTTConfig.port))
logServerStart(logger, "WebSocket", cfg.wsMQTTConfig.port, cfg.wsMQTTConfig.targetPort)
go proxyMQTTWS(cfg.wsMQTTConfig, logger, h, errs)

// MQTT
logger.Info(fmt.Sprintf("Starting MQTT proxy on port %s ", cfg.mqttConfig.port))
logServerStart(logger, "MQTT", cfg.mqttConfig.port, cfg.mqttConfig.targetPort)
go proxyMQTT(ctx, cfg.mqttConfig, logger, h, errs)
// WS
logger.Info(fmt.Sprintf("Starting WS proxy on port %s ", cfg.wsConfig.port))
logServerStart(logger, "WS", cfg.wsConfig.port, cfg.wsConfig.targetPort)
go proxyWS(ctx, cfg.wsConfig, logger, h, errs)
// HTTP
logger.Info(fmt.Sprintf("Starting HTTP proxy on port %s ", cfg.httpConfig.port))
logServerStart(logger, "HTTP", cfg.httpConfig.port, cfg.httpConfig.targetPort)
go proxyHTTP(ctx, cfg.httpConfig, logger, h, errs)
}

Expand All @@ -197,7 +197,7 @@ func main() {
errs <- fmt.Errorf("%s", <-c)
}()

err = <-errs
err := <-errs
logger.Error(fmt.Sprintf("mProxy terminated: %s", err))
}

Expand All @@ -215,6 +215,11 @@ func loadConfig() config {
log.Fatalf("Invalid value passed for %s\n", envClientTLS)
}

var level slog.Level
if err := level.UnmarshalText([]byte(env(envLogLevel, defLogLevel))); err != nil {
log.Fatalf("Invalid value passed for %s with error: %s\n", envLogLevel, err)
}

return config{
// WS
wsMQTTConfig: WSMQTTConfig{
Expand Down Expand Up @@ -261,42 +266,42 @@ func loadConfig() config {
},

// Log
logLevel: env(envLogLevel, defLogLevel),
logLevel: level,
}
}

func proxyMQTTWS(cfg WSMQTTConfig, logger mglog.Logger, handler session.Handler, errs chan error) {
func proxyMQTTWS(cfg WSMQTTConfig, logger *slog.Logger, handler session.Handler, errs chan error) {
target := fmt.Sprintf("%s:%s", cfg.targetHost, cfg.targetPort)
wp := websocket.New(target, cfg.targetPath, cfg.targetScheme, handler, nil, logger)
http.Handle(cfg.path, wp.Handler())

errs <- wp.Listen(cfg.port)
}

func proxyMQTTWSS(cfg config, tlsCfg *tls.Config, logger mglog.Logger, handler session.Handler, errs chan error) {
func proxyMQTTWSS(cfg config, tlsCfg *tls.Config, logger *slog.Logger, handler session.Handler, errs chan error) {
target := fmt.Sprintf("%s:%s", cfg.wsMQTTConfig.targetHost, cfg.wsMQTTConfig.targetPort)
wp := websocket.New(target, cfg.wsMQTTConfig.targetPath, cfg.wsMQTTConfig.targetScheme, handler, nil, logger)
http.Handle(cfg.wsMQTTConfig.wssPath, wp.Handler())
errs <- wp.ListenTLS(tlsCfg, cfg.serverCert, cfg.serverKey, cfg.wsMQTTConfig.wssPort)
}

func proxyMQTT(ctx context.Context, cfg MQTTConfig, logger mglog.Logger, handler session.Handler, errs chan error) {
func proxyMQTT(ctx context.Context, cfg MQTTConfig, logger *slog.Logger, handler session.Handler, errs chan error) {
address := fmt.Sprintf("%s:%s", cfg.host, cfg.port)
target := fmt.Sprintf("%s:%s", cfg.targetHost, cfg.targetPort)
mp := mqtt.New(address, target, handler, nil, logger)

errs <- mp.Listen(ctx)
}

func proxyMQTTS(ctx context.Context, cfg MQTTConfig, tlsCfg *tls.Config, logger mglog.Logger, handler session.Handler, errs chan error) {
func proxyMQTTS(ctx context.Context, cfg MQTTConfig, tlsCfg *tls.Config, logger *slog.Logger, handler session.Handler, errs chan error) {
address := fmt.Sprintf("%s:%s", cfg.host, cfg.mqttsPort)
target := fmt.Sprintf("%s:%s", cfg.targetHost, cfg.targetPort)
mp := mqtt.New(address, target, handler, nil, logger)

errs <- mp.ListenTLS(ctx, tlsCfg)
}

func proxyHTTP(ctx context.Context, cfg HTTPConfig, logger mglog.Logger, handler session.Handler, errs chan error) {
func proxyHTTP(_ context.Context, cfg HTTPConfig, logger *slog.Logger, handler session.Handler, errs chan error) {
address := fmt.Sprintf("%s:%s", cfg.host, cfg.port)
target := fmt.Sprintf("%s:%s", cfg.targetHost, cfg.targetPort)
hp, err := hproxy.NewProxy(address, target, handler, logger)
Expand All @@ -308,7 +313,7 @@ func proxyHTTP(ctx context.Context, cfg HTTPConfig, logger mglog.Logger, handler
errs <- hp.Listen()
}

func proxyHTTPS(ctx context.Context, cfg HTTPConfig, logger mglog.Logger, handler session.Handler, errs chan error) {
func proxyHTTPS(_ context.Context, cfg HTTPConfig, logger *slog.Logger, handler session.Handler, errs chan error) {
address := fmt.Sprintf("%s:%s", cfg.host, cfg.port)
target := fmt.Sprintf("%s:%s", cfg.targetHost, cfg.targetPort)
hp, err := hproxy.NewProxy(address, target, handler, logger)
Expand All @@ -320,7 +325,7 @@ func proxyHTTPS(ctx context.Context, cfg HTTPConfig, logger mglog.Logger, handle
errs <- hp.ListenTLS(cfg.serverCert, cfg.serverKey)
}

func proxyWS(ctx context.Context, cfg WSConfig, logger mglog.Logger, handler session.Handler, errs chan error) {
func proxyWS(_ context.Context, cfg WSConfig, logger *slog.Logger, handler session.Handler, errs chan error) {
address := fmt.Sprintf("%s:%s", cfg.host, cfg.port)
target := fmt.Sprintf("%s:%s", cfg.targetHost, cfg.targetPort)
wp, err := websockets.NewProxy(address, target, logger, handler)
Expand All @@ -330,7 +335,7 @@ func proxyWS(ctx context.Context, cfg WSConfig, logger mglog.Logger, handler ses
errs <- wp.Listen()
}

func proxyWSS(ctx context.Context, cfg config, logger mglog.Logger, handler session.Handler, errs chan error) {
func proxyWSS(_ context.Context, cfg config, logger *slog.Logger, handler session.Handler, errs chan error) {
address := fmt.Sprintf("%s:%s", cfg.wsConfig.host, cfg.wsConfig.port)
target := fmt.Sprintf("%s:%s", cfg.wsConfig.targetHost, cfg.wsConfig.targetPort)
wp, err := websockets.NewProxy(address, target, logger, handler)
Expand All @@ -339,3 +344,14 @@ func proxyWSS(ctx context.Context, cfg config, logger mglog.Logger, handler sess
}
errs <- wp.ListenTLS(cfg.serverCert, cfg.serverKey)
}

func logServerStart(logger *slog.Logger, name, port, targetPort string) {
logger.Info("Starting "+name+" proxy",
slog.Group("server",
slog.String("port", port),
),
slog.Group("target",
slog.String("port", targetPort),
),
)
}
87 changes: 30 additions & 57 deletions examples/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package simple
import (
"context"
"errors"
"fmt"
"strings"
"log/slog"

"github.com/absmach/magistrala/logger"
"github.com/absmach/mproxy/pkg/session"
)

Expand All @@ -16,11 +14,11 @@ var _ session.Handler = (*Handler)(nil)

// Handler implements mqtt.Handler interface
type Handler struct {
logger logger.Logger
logger *slog.Logger
}

// New creates new Event entity
func New(logger logger.Logger) *Handler {
func New(logger *slog.Logger) *Handler {
return &Handler{
logger: logger,
}
Expand All @@ -29,91 +27,66 @@ func New(logger logger.Logger) *Handler {
// AuthConnect is called on device connection,
// prior forwarding to the MQTT broker
func (h *Handler) AuthConnect(ctx context.Context) error {
s, ok := session.FromContext(ctx)
if !ok {
h.logger.Error(errSessionMissing.Error())
return errSessionMissing
}
h.logger.Info(fmt.Sprintf("AuthConnect() - sessionID: %s, username: %s, password: %s, client_CN: %s", s.ID, s.Username, string(s.Password), s.Cert.Subject.CommonName))
return nil
return h.logAction(ctx, "AuthConnect", nil, nil)
}

// AuthPublish is called on device publish,
// prior forwarding to the MQTT broker
func (h *Handler) AuthPublish(ctx context.Context, topic *string, payload *[]byte) error {
s, ok := session.FromContext(ctx)
if !ok {
h.logger.Error(errSessionMissing.Error())
return errSessionMissing
}
h.logger.Info(fmt.Sprintf("AuthPublish() - sessionID: %s, topic: %s, payload: %s", s.ID, *topic, string(*payload)))

return nil
return h.logAction(ctx, "AuthPublish", &[]string{*topic}, payload)
}

// AuthSubscribe is called on device publish,
// prior forwarding to the MQTT broker
func (h *Handler) AuthSubscribe(ctx context.Context, topics *[]string) error {
s, ok := session.FromContext(ctx)
if !ok {
h.logger.Error(errSessionMissing.Error())
return errSessionMissing
}
h.logger.Info(fmt.Sprintf("AuthSubscribe() - sessionID: %s, topics: %s", s.ID, strings.Join(*topics, ",")))
return nil
return h.logAction(ctx, "AuthSubscribe", topics, nil)
}

// Connect - after client successfully connected
func (h *Handler) Connect(ctx context.Context) error {
s, ok := session.FromContext(ctx)
if !ok {
h.logger.Error(errSessionMissing.Error())
return errSessionMissing
}
h.logger.Info(fmt.Sprintf("Connect() - username: %s, sessionID: %s", s.Username, s.ID))
return nil
return h.logAction(ctx, "Connect", nil, nil)
}

// Publish - after client successfully published
func (h *Handler) Publish(ctx context.Context, topic *string, payload *[]byte) error {
s, ok := session.FromContext(ctx)
if !ok {
h.logger.Error(errSessionMissing.Error())
return errSessionMissing
}
h.logger.Info(fmt.Sprintf("Publish() - username: %s, sessionID: %s, topic: %s, payload: %s", s.Username, s.ID, *topic, string(*payload)))
return nil
return h.logAction(ctx, "Publish", &[]string{*topic}, payload)
}

// Subscribe - after client successfully subscribed
func (h *Handler) Subscribe(ctx context.Context, topics *[]string) error {
s, ok := session.FromContext(ctx)
if !ok {
h.logger.Error(errSessionMissing.Error())
return errSessionMissing
}
h.logger.Info(fmt.Sprintf("Subscribe() - username: %s, sessionID: %s, topics: %s", s.Username, s.ID, strings.Join(*topics, ",")))
return nil
return h.logAction(ctx, "Subscribe", topics, nil)
}

// Unsubscribe - after client unsubscribed
func (h *Handler) Unsubscribe(ctx context.Context, topics *[]string) error {
s, ok := session.FromContext(ctx)
if !ok {
h.logger.Error(errSessionMissing.Error())
return errSessionMissing
}
h.logger.Info(fmt.Sprintf("Unsubscribe() - username: %s, sessionID: %s, topics: %s", s.Username, s.ID, strings.Join(*topics, ",")))
return nil
return h.logAction(ctx, "Unsubscribe", topics, nil)
}

// Disconnect on connection lost
func (h *Handler) Disconnect(ctx context.Context) error {
return h.logAction(ctx, "Disconnect", nil, nil)
}

func (h *Handler) logAction(ctx context.Context, action string, topics *[]string, payload *[]byte) error {
s, ok := session.FromContext(ctx)
args := []interface{}{
slog.Group("session", slog.String("id", s.ID), slog.String("username", s.Username)),
}
if s.Cert.Subject.CommonName != "" {
args = append(args, slog.Group("cert", slog.String("cn", s.Cert.Subject.CommonName)))
}
if topics != nil {
args = append(args, slog.Any("topics", *topics))
}
if payload != nil {
args = append(args, slog.Any("payload", *payload))
}
if !ok {
h.logger.Error(errSessionMissing.Error())
args = append(args, slog.Any("error", errSessionMissing))
h.logger.Error(action+"() failed to complete", args...)
return errSessionMissing
}
h.logger.Info(fmt.Sprintf("Disconnect() - client with username: %s and ID: %s disconnected", s.Username, s.ID))
h.logger.Info(action+"() completed successfully", args...)

return nil
}
7 changes: 1 addition & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@ go 1.21
toolchain go1.21.4

require (
github.com/absmach/magistrala v0.11.1-0.20231220185538-1fe2e74a741f
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/google/uuid v1.5.0
github.com/gorilla/websocket v1.5.1
golang.org/x/sync v0.6.0
)

require (
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
golang.org/x/net v0.20.0 // indirect
)
require golang.org/x/net v0.20.0 // indirect
14 changes: 0 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,24 +1,10 @@
github.com/absmach/magistrala v0.11.1-0.20231220185538-1fe2e74a741f h1:QIfX7wem1z6mj3GpkVu7WX3dFXzO8O+Sr+Bffw9YqPw=
github.com/absmach/magistrala v0.11.1-0.20231220185538-1fe2e74a741f/go.mod h1:vnzZ/Y6v0L/14BQTUGR9Na+qRXZ3o4l3yyMD1b5nfjk=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading

0 comments on commit 695f58d

Please sign in to comment.