Skip to content
This repository has been archived by the owner on Dec 14, 2021. It is now read-only.

Commit

Permalink
Drop handler events after timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
htdvisser committed May 13, 2020
1 parent 0ac8b88 commit 7180744
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 13 deletions.
20 changes: 17 additions & 3 deletions core/handler/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func (h *handler) HandleActivation(activation *pb_broker.DeduplicatedDeviceActiv
h.RegisterReceived(activation)
defer func() {
if err != nil {
h.qEvent <- &types.DeviceEvent{
select {
case h.qEvent <- &types.DeviceEvent{
AppID: appID,
DevID: devID,
Event: types.ActivationErrorEvent,
Expand All @@ -88,6 +89,9 @@ func (h *handler) HandleActivation(activation *pb_broker.DeduplicatedDeviceActiv
DevEUI: activation.DevEUI,
ErrorEventData: types.ErrorEventData{Error: err.Error()},
},
}:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", types.ActivationErrorEvent)
}
activation.Trace = activation.Trace.WithEvent(trace.DropEvent, "reason", err)
ctx.WithError(err).Warn("Could not handle activation")
Expand Down Expand Up @@ -190,7 +194,8 @@ func (h *handler) HandleActivation(activation *pb_broker.DeduplicatedDeviceActiv

// Publish Activation
mqttMetadata, _ := h.getActivationMetadata(ctx, activation, dev)
h.qEvent <- &types.DeviceEvent{
select {
case h.qEvent <- &types.DeviceEvent{
AppID: appID,
DevID: devID,
Event: types.ActivationEvent,
Expand All @@ -200,6 +205,9 @@ func (h *handler) HandleActivation(activation *pb_broker.DeduplicatedDeviceActiv
DevAddr: types.DevAddr(joinAccept.DevAddr),
Metadata: mqttMetadata,
},
}:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", types.ActivationEvent)
}

// Generate random AppNonce
Expand Down Expand Up @@ -265,6 +273,8 @@ func (h *handler) HandleActivation(activation *pb_broker.DeduplicatedDeviceActiv
}

func (h *handler) registerDeviceOnJoin(base *device.Device, activation *pb_broker.DeduplicatedDeviceActivationRequest) (*device.Device, error) {
ctx := h.Ctx.WithFields(logfields.ForMessage(activation))

clone := base.Clone()
clone.DevID = strings.ToLower(fmt.Sprintf("%s-%s", base.DevID, activation.DevEUI.String()))
clone.DevEUI = activation.DevEUI
Expand Down Expand Up @@ -297,11 +307,15 @@ func (h *handler) registerDeviceOnJoin(base *device.Device, activation *pb_broke
return nil, err
}

h.qEvent <- &types.DeviceEvent{
select {
case h.qEvent <- &types.DeviceEvent{
AppID: clone.AppID,
DevID: clone.DevID,
Event: types.CreateEvent,
Data: nil, // Don't send potentially sensitive details over MQTT
}:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", types.CreateEvent)
}

return clone, nil
Expand Down
13 changes: 11 additions & 2 deletions core/handler/convert_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package handler
import (
"encoding/json"
"fmt"
"time"

pb_broker "github.com/TheThingsNetwork/api/broker"
pb_handler "github.com/TheThingsNetwork/api/handler"
Expand Down Expand Up @@ -56,11 +57,15 @@ func (h *handler) ConvertFieldsUp(ctx ttnlog.Interface, _ *pb_broker.Deduplicate
fields, valid, err := decoder.Decode(appUp.PayloadRaw, appUp.FPort)
if err != nil {
// Emit the error
h.qEvent <- &types.DeviceEvent{
select {
case h.qEvent <- &types.DeviceEvent{
AppID: appUp.AppID,
DevID: appUp.DevID,
Event: types.UplinkErrorEvent,
Data: types.ErrorEventData{Error: fmt.Sprintf("Unable to decode payload fields: %s", err)},
}:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", types.UplinkErrorEvent)
}

// Do not set fields if processing failed, but allow the handler to continue processing
Expand All @@ -76,11 +81,15 @@ func (h *handler) ConvertFieldsUp(ctx ttnlog.Interface, _ *pb_broker.Deduplicate
_, err = json.Marshal(fields)
if err != nil {
// Emit the error
h.qEvent <- &types.DeviceEvent{
select {
case h.qEvent <- &types.DeviceEvent{
AppID: appUp.AppID,
DevID: appUp.DevID,
Event: types.UplinkErrorEvent,
Data: types.ErrorEventData{Error: fmt.Sprintf("Payload Function output cannot be marshaled to JSON: %s", err.Error())},
}:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", types.UplinkErrorEvent)
}

// Do not set fields if processing failed, but allow the handler to continue processing
Expand Down
8 changes: 7 additions & 1 deletion core/handler/convert_lorawan.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package handler

import (
"time"

pb_broker "github.com/TheThingsNetwork/api/broker"
pb_lorawan "github.com/TheThingsNetwork/api/protocol/lorawan"
"github.com/TheThingsNetwork/api/trace"
Expand Down Expand Up @@ -59,13 +61,17 @@ func (h *handler) ConvertFromLoRaWAN(ctx ttnlog.Interface, ttnUp *pb_broker.Dedu
// If it's confirmed, we can only unset it if we receive an ack.
if macPayload.Ack {
// Send event over MQTT
h.qEvent <- &types.DeviceEvent{
select {
case h.qEvent <- &types.DeviceEvent{
AppID: appUp.AppID,
DevID: appUp.DevID,
Event: types.DownlinkAckEvent,
Data: types.DownlinkEventData{
Message: dev.CurrentDownlink,
},
}:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", types.DownlinkAckEvent)
}
dev.CurrentDownlink = nil
}
Expand Down
24 changes: 20 additions & 4 deletions core/handler/downlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,18 @@ func (h *handler) EnqueueDownlink(appDownlink *types.DownlinkMessage) (err error

defer func() {
if err != nil {
h.qEvent <- &types.DeviceEvent{
select {
case h.qEvent <- &types.DeviceEvent{
AppID: appID,
DevID: devID,
Event: types.DownlinkErrorEvent,
Data: types.DownlinkEventData{
ErrorEventData: types.ErrorEventData{Error: err.Error()},
Message: appDownlink,
},
}:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", types.DownlinkErrorEvent)
}
}
}()
Expand Down Expand Up @@ -88,13 +92,17 @@ func (h *handler) EnqueueDownlink(appDownlink *types.DownlinkMessage) (err error
return err
}

h.qEvent <- &types.DeviceEvent{
select {
case h.qEvent <- &types.DeviceEvent{
AppID: appID,
DevID: devID,
Event: types.DownlinkScheduledEvent,
Data: types.DownlinkEventData{
Message: appDownlink,
},
}:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", types.DownlinkScheduledEvent)
}
return nil
}
Expand All @@ -111,14 +119,18 @@ func (h *handler) HandleDownlink(appDownlink *types.DownlinkMessage, downlink *p

defer func() {
if err != nil {
h.qEvent <- &types.DeviceEvent{
select {
case h.qEvent <- &types.DeviceEvent{
AppID: appID,
DevID: devID,
Event: types.DownlinkErrorEvent,
Data: types.DownlinkEventData{
ErrorEventData: types.ErrorEventData{Error: err.Error()},
Message: appDownlink,
},
}:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", types.DownlinkErrorEvent)
}
ctx.WithError(err).Warn("Could not handle downlink")
downlink.Trace = downlink.Trace.WithEvent(trace.DropEvent, "reason", err)
Expand Down Expand Up @@ -189,7 +201,8 @@ func (h *handler) HandleDownlink(appDownlink *types.DownlinkMessage, downlink *p
downlinkConfig.Frequency = uint(downlink.DownlinkOption.GatewayConfiguration.Frequency)
downlinkConfig.Power = int(downlink.DownlinkOption.GatewayConfiguration.Power)

h.qEvent <- &types.DeviceEvent{
select {
case h.qEvent <- &types.DeviceEvent{
AppID: appDownlink.AppID,
DevID: appDownlink.DevID,
Event: types.DownlinkSentEvent,
Expand All @@ -199,6 +212,9 @@ func (h *handler) HandleDownlink(appDownlink *types.DownlinkMessage, downlink *p
GatewayID: downlink.DownlinkOption.GatewayID,
Config: &downlinkConfig,
},
}:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", types.DownlinkSentEvent)
}
return nil
}
4 changes: 4 additions & 0 deletions core/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package handler

import (
"fmt"
"time"

pb_broker "github.com/TheThingsNetwork/api/broker"
"github.com/TheThingsNetwork/api/broker/brokerclient"
Expand Down Expand Up @@ -36,6 +37,9 @@ type Handler interface {
EnqueueDownlink(appDownlink *types.DownlinkMessage) error
}

// Timeout for publishing events to prevent blocking critical path.
var eventPublishTimeout = 10 * time.Millisecond

// NewRedisHandler creates a new Redis-backed Handler
func NewRedisHandler(client *redis.Client, ttnBrokerID string) Handler {
return &handler{
Expand Down
18 changes: 15 additions & 3 deletions core/handler/uplink.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ func (h *handler) HandleUplink(uplink *pb_broker.DeduplicatedUplinkMessage) (err
h.RegisterReceived(uplink)
defer func() {
if err != nil {
h.qEvent <- &types.DeviceEvent{
select {
case h.qEvent <- &types.DeviceEvent{
AppID: appID,
DevID: devID,
Event: types.UplinkErrorEvent,
Data: types.ErrorEventData{Error: err.Error()},
}:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", types.UplinkErrorEvent)
}
ctx.WithError(err).Warn("Could not handle uplink")
uplink.Trace = uplink.Trace.WithEvent(trace.DropEvent, "reason", err)
Expand Down Expand Up @@ -108,15 +112,23 @@ func (h *handler) HandleUplink(uplink *pb_broker.DeduplicatedUplinkMessage) (err
}
dev.CurrentDownlink = next
} else {
h.qEvent <- noDownlinkErrEvent
select {
case h.qEvent <- noDownlinkErrEvent:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", noDownlinkErrEvent.Event)
}
return nil
}
}
}

if uplink.ResponseTemplate == nil {
if dev.CurrentDownlink != nil {
h.qEvent <- noDownlinkErrEvent
select {
case h.qEvent <- noDownlinkErrEvent:
case <-time.After(eventPublishTimeout):
ctx.Warnf("Could not emit %q event", noDownlinkErrEvent.Event)
}
}
return nil
}
Expand Down

0 comments on commit 7180744

Please sign in to comment.