diff --git a/core/handler/activation.go b/core/handler/activation.go index 3214da302..731aacf0f 100644 --- a/core/handler/activation.go +++ b/core/handler/activation.go @@ -79,7 +79,8 @@ func (h *handler) HandleActivation(activation *pb_broker.DeduplicatedDeviceActiv h.RegisterReceived(activation) defer func() { if err != nil { - h.qEvent <- &types.DeviceEvent{ + select { + case h.qEvent <- &types.DeviceEvent{ AppID: appID, DevID: devID, Event: types.ActivationErrorEvent, @@ -88,6 +89,9 @@ func (h *handler) HandleActivation(activation *pb_broker.DeduplicatedDeviceActiv DevEUI: activation.DevEUI, ErrorEventData: types.ErrorEventData{Error: err.Error()}, }, + }: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", types.ActivationErrorEvent) } activation.Trace = activation.Trace.WithEvent(trace.DropEvent, "reason", err) ctx.WithError(err).Warn("Could not handle activation") @@ -190,7 +194,8 @@ func (h *handler) HandleActivation(activation *pb_broker.DeduplicatedDeviceActiv // Publish Activation mqttMetadata, _ := h.getActivationMetadata(ctx, activation, dev) - h.qEvent <- &types.DeviceEvent{ + select { + case h.qEvent <- &types.DeviceEvent{ AppID: appID, DevID: devID, Event: types.ActivationEvent, @@ -200,6 +205,9 @@ func (h *handler) HandleActivation(activation *pb_broker.DeduplicatedDeviceActiv DevAddr: types.DevAddr(joinAccept.DevAddr), Metadata: mqttMetadata, }, + }: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", types.ActivationEvent) } // Generate random AppNonce @@ -265,6 +273,8 @@ func (h *handler) HandleActivation(activation *pb_broker.DeduplicatedDeviceActiv } func (h *handler) registerDeviceOnJoin(base *device.Device, activation *pb_broker.DeduplicatedDeviceActivationRequest) (*device.Device, error) { + ctx := h.Ctx.WithFields(logfields.ForMessage(activation)) + clone := base.Clone() clone.DevID = strings.ToLower(fmt.Sprintf("%s-%s", base.DevID, activation.DevEUI.String())) clone.DevEUI = activation.DevEUI @@ -297,11 +307,15 @@ func (h *handler) registerDeviceOnJoin(base *device.Device, activation *pb_broke return nil, err } - h.qEvent <- &types.DeviceEvent{ + select { + case h.qEvent <- &types.DeviceEvent{ AppID: clone.AppID, DevID: clone.DevID, Event: types.CreateEvent, Data: nil, // Don't send potentially sensitive details over MQTT + }: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", types.CreateEvent) } return clone, nil diff --git a/core/handler/convert_fields.go b/core/handler/convert_fields.go index ad64ef3d2..cb058c92d 100644 --- a/core/handler/convert_fields.go +++ b/core/handler/convert_fields.go @@ -6,6 +6,7 @@ package handler import ( "encoding/json" "fmt" + "time" pb_broker "github.com/TheThingsNetwork/api/broker" pb_handler "github.com/TheThingsNetwork/api/handler" @@ -56,11 +57,15 @@ func (h *handler) ConvertFieldsUp(ctx ttnlog.Interface, _ *pb_broker.Deduplicate fields, valid, err := decoder.Decode(appUp.PayloadRaw, appUp.FPort) if err != nil { // Emit the error - h.qEvent <- &types.DeviceEvent{ + select { + case h.qEvent <- &types.DeviceEvent{ AppID: appUp.AppID, DevID: appUp.DevID, Event: types.UplinkErrorEvent, Data: types.ErrorEventData{Error: fmt.Sprintf("Unable to decode payload fields: %s", err)}, + }: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", types.UplinkErrorEvent) } // Do not set fields if processing failed, but allow the handler to continue processing @@ -76,11 +81,15 @@ func (h *handler) ConvertFieldsUp(ctx ttnlog.Interface, _ *pb_broker.Deduplicate _, err = json.Marshal(fields) if err != nil { // Emit the error - h.qEvent <- &types.DeviceEvent{ + select { + case h.qEvent <- &types.DeviceEvent{ AppID: appUp.AppID, DevID: appUp.DevID, Event: types.UplinkErrorEvent, Data: types.ErrorEventData{Error: fmt.Sprintf("Payload Function output cannot be marshaled to JSON: %s", err.Error())}, + }: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", types.UplinkErrorEvent) } // Do not set fields if processing failed, but allow the handler to continue processing diff --git a/core/handler/convert_lorawan.go b/core/handler/convert_lorawan.go index ca3469067..66b90c6a6 100644 --- a/core/handler/convert_lorawan.go +++ b/core/handler/convert_lorawan.go @@ -4,6 +4,8 @@ package handler import ( + "time" + pb_broker "github.com/TheThingsNetwork/api/broker" pb_lorawan "github.com/TheThingsNetwork/api/protocol/lorawan" "github.com/TheThingsNetwork/api/trace" @@ -59,13 +61,17 @@ func (h *handler) ConvertFromLoRaWAN(ctx ttnlog.Interface, ttnUp *pb_broker.Dedu // If it's confirmed, we can only unset it if we receive an ack. if macPayload.Ack { // Send event over MQTT - h.qEvent <- &types.DeviceEvent{ + select { + case h.qEvent <- &types.DeviceEvent{ AppID: appUp.AppID, DevID: appUp.DevID, Event: types.DownlinkAckEvent, Data: types.DownlinkEventData{ Message: dev.CurrentDownlink, }, + }: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", types.DownlinkAckEvent) } dev.CurrentDownlink = nil } diff --git a/core/handler/downlink.go b/core/handler/downlink.go index 70dd4674d..e6b72f9d0 100644 --- a/core/handler/downlink.go +++ b/core/handler/downlink.go @@ -40,7 +40,8 @@ func (h *handler) EnqueueDownlink(appDownlink *types.DownlinkMessage) (err error defer func() { if err != nil { - h.qEvent <- &types.DeviceEvent{ + select { + case h.qEvent <- &types.DeviceEvent{ AppID: appID, DevID: devID, Event: types.DownlinkErrorEvent, @@ -48,6 +49,9 @@ func (h *handler) EnqueueDownlink(appDownlink *types.DownlinkMessage) (err error ErrorEventData: types.ErrorEventData{Error: err.Error()}, Message: appDownlink, }, + }: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", types.DownlinkErrorEvent) } } }() @@ -88,13 +92,17 @@ func (h *handler) EnqueueDownlink(appDownlink *types.DownlinkMessage) (err error return err } - h.qEvent <- &types.DeviceEvent{ + select { + case h.qEvent <- &types.DeviceEvent{ AppID: appID, DevID: devID, Event: types.DownlinkScheduledEvent, Data: types.DownlinkEventData{ Message: appDownlink, }, + }: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", types.DownlinkScheduledEvent) } return nil } @@ -111,7 +119,8 @@ func (h *handler) HandleDownlink(appDownlink *types.DownlinkMessage, downlink *p defer func() { if err != nil { - h.qEvent <- &types.DeviceEvent{ + select { + case h.qEvent <- &types.DeviceEvent{ AppID: appID, DevID: devID, Event: types.DownlinkErrorEvent, @@ -119,6 +128,9 @@ func (h *handler) HandleDownlink(appDownlink *types.DownlinkMessage, downlink *p ErrorEventData: types.ErrorEventData{Error: err.Error()}, Message: appDownlink, }, + }: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", types.DownlinkErrorEvent) } ctx.WithError(err).Warn("Could not handle downlink") downlink.Trace = downlink.Trace.WithEvent(trace.DropEvent, "reason", err) @@ -189,7 +201,8 @@ func (h *handler) HandleDownlink(appDownlink *types.DownlinkMessage, downlink *p downlinkConfig.Frequency = uint(downlink.DownlinkOption.GatewayConfiguration.Frequency) downlinkConfig.Power = int(downlink.DownlinkOption.GatewayConfiguration.Power) - h.qEvent <- &types.DeviceEvent{ + select { + case h.qEvent <- &types.DeviceEvent{ AppID: appDownlink.AppID, DevID: appDownlink.DevID, Event: types.DownlinkSentEvent, @@ -199,6 +212,9 @@ func (h *handler) HandleDownlink(appDownlink *types.DownlinkMessage, downlink *p GatewayID: downlink.DownlinkOption.GatewayID, Config: &downlinkConfig, }, + }: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", types.DownlinkSentEvent) } return nil } diff --git a/core/handler/handler.go b/core/handler/handler.go index 269d15f10..b2569b848 100644 --- a/core/handler/handler.go +++ b/core/handler/handler.go @@ -5,6 +5,7 @@ package handler import ( "fmt" + "time" pb_broker "github.com/TheThingsNetwork/api/broker" "github.com/TheThingsNetwork/api/broker/brokerclient" @@ -36,6 +37,9 @@ type Handler interface { EnqueueDownlink(appDownlink *types.DownlinkMessage) error } +// Timeout for publishing events to prevent blocking critical path. +var eventPublishTimeout = 10 * time.Millisecond + // NewRedisHandler creates a new Redis-backed Handler func NewRedisHandler(client *redis.Client, ttnBrokerID string) Handler { return &handler{ diff --git a/core/handler/uplink.go b/core/handler/uplink.go index bda4230cb..16cfb8b12 100644 --- a/core/handler/uplink.go +++ b/core/handler/uplink.go @@ -23,11 +23,15 @@ func (h *handler) HandleUplink(uplink *pb_broker.DeduplicatedUplinkMessage) (err h.RegisterReceived(uplink) defer func() { if err != nil { - h.qEvent <- &types.DeviceEvent{ + select { + case h.qEvent <- &types.DeviceEvent{ AppID: appID, DevID: devID, Event: types.UplinkErrorEvent, Data: types.ErrorEventData{Error: err.Error()}, + }: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", types.UplinkErrorEvent) } ctx.WithError(err).Warn("Could not handle uplink") uplink.Trace = uplink.Trace.WithEvent(trace.DropEvent, "reason", err) @@ -108,7 +112,11 @@ func (h *handler) HandleUplink(uplink *pb_broker.DeduplicatedUplinkMessage) (err } dev.CurrentDownlink = next } else { - h.qEvent <- noDownlinkErrEvent + select { + case h.qEvent <- noDownlinkErrEvent: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", noDownlinkErrEvent.Event) + } return nil } } @@ -116,7 +124,11 @@ func (h *handler) HandleUplink(uplink *pb_broker.DeduplicatedUplinkMessage) (err if uplink.ResponseTemplate == nil { if dev.CurrentDownlink != nil { - h.qEvent <- noDownlinkErrEvent + select { + case h.qEvent <- noDownlinkErrEvent: + case <-time.After(eventPublishTimeout): + ctx.Warnf("Could not emit %q event", noDownlinkErrEvent.Event) + } } return nil }