Skip to content

Commit

Permalink
MG-2186 - Migrate gocoap library from v2 to v3.3 (absmach#2183)
Browse files Browse the repository at this point in the history
Signed-off-by: 1998-felix <[email protected]>
  • Loading branch information
felixgateru authored May 22, 2024
1 parent e9172f7 commit 42be65a
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 174 deletions.
95 changes: 56 additions & 39 deletions coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
svcerr "github.com/absmach/magistrala/pkg/errors/service"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/go-chi/chi/v5"
"github.com/plgd-dev/go-coap/v2/message"
"github.com/plgd-dev/go-coap/v2/message/codes"
"github.com/plgd-dev/go-coap/v2/mux"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/codes"
"github.com/plgd-dev/go-coap/v3/message/pool"
"github.com/plgd-dev/go-coap/v3/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

Expand All @@ -42,6 +43,7 @@ const (
var (
errMalformedSubtopic = errors.New("malformed subtopic")
errBadOptions = errors.New("bad options")
errMethodNotAllowed = errors.New("method not allowed")
)

var (
Expand All @@ -66,75 +68,93 @@ func MakeCoAPHandler(svc coap.Service, l *slog.Logger) mux.HandlerFunc {
return handler
}

func sendResp(w mux.ResponseWriter, resp *message.Message) {
if err := w.Client().WriteMessage(resp); err != nil {
func sendResp(w mux.ResponseWriter, resp *pool.Message) {
if err := w.Conn().WriteMessage(resp); err != nil {
logger.Warn(fmt.Sprintf("Can't set response: %s", err))
}
}

func handler(w mux.ResponseWriter, m *mux.Message) {
resp := message.Message{
Code: codes.Content,
Token: m.Token,
Context: m.Context,
Options: make(message.Options, 0, 16),
resp := pool.NewMessage(w.Conn().Context())
resp.SetToken(m.Token())
for _, opt := range m.Options() {
resp.AddOptionBytes(opt.ID, opt.Value)
}
defer sendResp(w, &resp)
defer sendResp(w, resp)

msg, err := decodeMessage(m)
if err != nil {
logger.Warn(fmt.Sprintf("Error decoding message: %s", err))
resp.Code = codes.BadRequest
resp.SetCode(codes.BadRequest)
return
}
key, err := parseKey(m)
if err != nil {
logger.Warn(fmt.Sprintf("Error parsing auth: %s", err))
resp.Code = codes.Unauthorized
resp.SetCode(codes.Unauthorized)
return
}
switch m.Code {

switch m.Code() {
case codes.GET:
err = handleGet(m.Context, m, w.Client(), msg, key)
resp.SetCode(codes.Content)
err = handleGet(m, w, msg, key)
case codes.POST:
resp.Code = codes.Created
err = service.Publish(m.Context, key, msg)
resp.SetCode(codes.Created)
err = service.Publish(m.Context(), key, msg)
default:
err = svcerr.ErrNotFound
err = errMethodNotAllowed
}

if err != nil {
switch {
case err == errBadOptions:
resp.Code = codes.BadOption
case err == svcerr.ErrNotFound:
resp.Code = codes.NotFound
case errors.Contains(err, svcerr.ErrAuthorization),
errors.Contains(err, svcerr.ErrAuthentication):
resp.Code = codes.Unauthorized
resp.SetCode(codes.BadOption)
case err == errMethodNotAllowed:
resp.SetCode(codes.MethodNotAllowed)
case errors.Contains(err, svcerr.ErrAuthorization):
resp.SetCode(codes.Forbidden)
case errors.Contains(err, svcerr.ErrAuthentication):
resp.SetCode(codes.Unauthorized)
default:
resp.Code = codes.InternalServerError
resp.SetCode(codes.InternalServerError)
}
}
}

func handleGet(ctx context.Context, m *mux.Message, c mux.Client, msg *messaging.Message, key string) error {
func handleGet(m *mux.Message, w mux.ResponseWriter, msg *messaging.Message, key string) error {
var obs uint32
obs, err := m.Options.Observe()
obs, err := m.Options().Observe()
if err != nil {
logger.Warn(fmt.Sprintf("Error reading observe option: %s", err))
return errBadOptions
}
if obs == startObserve {
c := coap.NewClient(c, m.Token, logger)
return service.Subscribe(ctx, key, msg.GetChannel(), msg.GetSubtopic(), c)
}
return service.Unsubscribe(ctx, key, msg.GetChannel(), msg.GetSubtopic(), m.Token.String())
c := coap.NewClient(w.Conn(), m.Token(), logger)
w.Conn().AddOnClose(func() {
err := service.Unsubscribe(context.Background(), key, msg.GetChannel(), msg.GetSubtopic(), c.Token())
args := []any{
slog.String("channel_id", msg.GetChannel()),
slog.String("subtopic", msg.GetSubtopic()),
slog.String("token", c.Token()),
}
if err != nil {
args = append(args, slog.Any("error", err))
logger.Warn("Unsubscribe idle client failed to complete successfully ", args...)
return
}
logger.Warn("Unsubscribe idle client completed successfully", args...)
})
return service.Subscribe(w.Conn().Context(), key, msg.GetChannel(), msg.GetSubtopic(), c)
}
return service.Unsubscribe(w.Conn().Context(), key, msg.GetChannel(), msg.GetSubtopic(), m.Token().String())
}

func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
if msg.Options == nil {
if msg.Options() == nil {
return &messaging.Message{}, errBadOptions
}
path, err := msg.Options.Path()
path, err := msg.Path()
if err != nil {
return &messaging.Message{}, err
}
Expand All @@ -155,8 +175,8 @@ func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
Created: time.Now().UnixNano(),
}

if msg.Body != nil {
buff, err := io.ReadAll(msg.Body)
if msg.Body() != nil {
buff, err := io.ReadAll(msg.Body())
if err != nil {
return ret, err
}
Expand All @@ -166,10 +186,7 @@ func decodeMessage(msg *mux.Message) (*messaging.Message, error) {
}

func parseKey(msg *mux.Message) (string, error) {
if obs, _ := msg.Options.Observe(); obs != 0 && msg.Code == codes.GET {
return "", nil
}
authKey, err := msg.Options.GetString(message.URIQuery)
authKey, err := msg.Options().GetString(message.URIQuery)
if err != nil {
return "", err
}
Expand Down
47 changes: 22 additions & 25 deletions coap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ package coap

import (
"bytes"
"context"
"fmt"
"log/slog"
"sync/atomic"

"github.com/absmach/magistrala/pkg/errors"
"github.com/absmach/magistrala/pkg/messaging"
"github.com/plgd-dev/go-coap/v2/message"
"github.com/plgd-dev/go-coap/v2/message/codes"
mux "github.com/plgd-dev/go-coap/v2/mux"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/codes"
mux "github.com/plgd-dev/go-coap/v3/mux"
)

// Client wraps CoAP client.
Expand All @@ -36,50 +35,47 @@ type Client interface {
var ErrOption = errors.New("unable to set option")

type client struct {
client mux.Client
conn mux.Conn
token message.Token
observe uint32
logger *slog.Logger
}

// NewClient instantiates a new Observer.
func NewClient(c mux.Client, tkn message.Token, l *slog.Logger) Client {
func NewClient(conn mux.Conn, tkn message.Token, l *slog.Logger) Client {
return &client{
client: c,
conn: conn,
token: tkn,
logger: l,
observe: 0,
}
}

func (c *client) Done() <-chan struct{} {
return c.client.Done()
return c.conn.Done()
}

func (c *client) Cancel() error {
m := message.Message{
Code: codes.Content,
Token: c.token,
Context: context.Background(),
Options: make(message.Options, 0, 16),
}
if err := c.client.WriteMessage(&m); err != nil {
pm := c.conn.AcquireMessage(c.conn.Context())
pm.SetCode(codes.Content)
pm.SetToken(c.token)
if err := c.conn.WriteMessage(pm); err != nil {
c.logger.Error(fmt.Sprintf("Error sending message: %s.", err))
}
return c.client.Close()
c.conn.ReleaseMessage(pm)
return c.conn.Close()
}

func (c *client) Token() string {
return c.token.String()
}

func (c *client) Handle(msg *messaging.Message) error {
m := message.Message{
Code: codes.Content,
Token: c.token,
Context: c.client.Context(),
Body: bytes.NewReader(msg.GetPayload()),
}
pm := c.conn.AcquireMessage(c.conn.Context())
defer c.conn.ReleaseMessage(pm)
pm.SetCode(codes.Content)
pm.SetToken(c.token)
pm.SetBody(bytes.NewReader(msg.GetPayload()))

atomic.AddUint32(&c.observe, 1)
var opts message.Options
Expand All @@ -93,7 +89,6 @@ func (c *client) Handle(msg *messaging.Message) error {
c.logger.Error(fmt.Sprintf("Can't set content format: %s.", err))
return errors.Wrap(ErrOption, err)
}
opts = append(opts, message.Option{ID: message.Observe, Value: []byte{byte(c.observe)}})
opts, n, err = opts.SetObserve(buff, c.observe)
if err == message.ErrTooSmall {
buff = append(buff, make([]byte, n)...)
Expand All @@ -103,6 +98,8 @@ func (c *client) Handle(msg *messaging.Message) error {
return fmt.Errorf("cannot set options to response: %w", err)
}

m.Options = opts
return c.client.WriteMessage(&m)
for _, option := range opts {
pm.SetOptionBytes(option.ID, option.Value)
}
return c.conn.WriteMessage(pm)
}
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ require (
github.com/oklog/ulid/v2 v2.1.0
github.com/ory/dockertest/v3 v3.10.0
github.com/pelletier/go-toml v1.9.5
github.com/plgd-dev/go-coap/v2 v2.6.0
github.com/plgd-dev/go-coap/v3 v3.3.4
github.com/prometheus/client_golang v1.19.1
github.com/rabbitmq/amqp091-go v1.10.0
github.com/rubenv/sql-migrate v1.6.1
Expand Down Expand Up @@ -141,11 +141,10 @@ require (
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
github.com/pelletier/go-toml/v2 v2.2.0 // indirect
github.com/pion/dtls/v2 v2.2.10 // indirect
github.com/pion/dtls/v2 v2.2.8-0.20240501061905-2c36d63320a0 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/transport/v3 v3.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/plgd-dev/kit/v2 v2.0.0-20211006190727-057b33161b90 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.52.2 // indirect
Expand Down Expand Up @@ -176,7 +175,7 @@ require (
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
Expand Down
Loading

0 comments on commit 42be65a

Please sign in to comment.