Skip to content
This repository has been archived by the owner on Dec 14, 2021. It is now read-only.

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
htdvisser committed May 13, 2020
2 parents 1b1c68b + 7180744 commit be6a661
Show file tree
Hide file tree
Showing 44 changed files with 1,499 additions and 284 deletions.
7 changes: 4 additions & 3 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ cache:

tests:
stage: test
image: golang:1.13
image: golang:1.14
services:
- thethingsnetwork/rabbitmq
- redis
Expand All @@ -27,10 +27,11 @@ tests:
script:
- make deps
- make test
retry: 2

binaries:
stage: build
image: golang:1.13
image: golang:1.14
script:
- mkdir release
- export CI_BUILD_DATE=$(date -u +%Y-%m-%dT%H:%M:%SZ)
Expand All @@ -48,7 +49,7 @@ sign:
- master@thethingsnetwork/ttn
- develop@thethingsnetwork/ttn
stage: sign
image: golang:1.13
image: golang:1.14
script:
- pushd release
- shasum -a 256 $(ls) > checksums
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ services:
- docker

go:
- "1.13.x"
- "1.14.x"

env:
global:
Expand Down
20 changes: 10 additions & 10 deletions api/go.mod
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
module github.com/TheThingsNetwork/ttn/api

go 1.11
go 1.14

replace github.com/TheThingsNetwork/ttn/utils/errors => ../utils/errors

require (
github.com/StackExchange/wmi v0.0.0-20181212234831-e0a55b97c705 // indirect
github.com/TheThingsNetwork/api v0.0.0-20190516085542-c732802571cf
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/TheThingsNetwork/api v0.0.0-20200324103623-039923721bb6
github.com/TheThingsNetwork/go-utils v0.0.0-20190516083235-bdd4967fab4e
github.com/TheThingsNetwork/ttn/utils/errors v0.0.0-20190516081709-034d40b328bd
github.com/apex/log v1.1.0
github.com/apex/log v1.1.2
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/juju/ratelimit v1.0.1
github.com/mwitkow/go-grpc-middleware v1.0.0
github.com/shirou/gopsutil v2.18.12+incompatible
github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4 // indirect
github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3
golang.org/x/net v0.0.0-20190514140710-3ec191127204
google.golang.org/grpc v1.20.1
github.com/prometheus/client_golang v1.5.1 // indirect
github.com/shirou/gopsutil v2.20.2+incompatible
github.com/smartystreets/assertions v1.0.1
golang.org/x/net v0.0.0-20200320220750-118fecf932d8
google.golang.org/grpc v1.28.0
)
192 changes: 171 additions & 21 deletions api/go.sum

Large diffs are not rendered by default.

17 changes: 8 additions & 9 deletions api/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
"github.com/TheThingsNetwork/go-utils/grpc/rpclog"
"github.com/TheThingsNetwork/go-utils/roots"
"github.com/TheThingsNetwork/ttn/utils/errors"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/mwitkow/go-grpc-middleware"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)

// RootCAs to use in API connections
Expand Down Expand Up @@ -65,11 +66,6 @@ func (c *conn) dial(ctx context.Context, opts ...grpc.DialOption) {
}()
}

// KeepAliveDialer is a dialer that adds a 10 second TCP KeepAlive
func KeepAliveDialer(addr string, timeout time.Duration) (net.Conn, error) {
return (&net.Dialer{Timeout: timeout, KeepAlive: 10 * time.Second}).Dial("tcp", addr)
}

// DefaultDialOptions for connecting with servers
var DefaultDialOptions = []grpc.DialOption{
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
Expand All @@ -83,8 +79,11 @@ var DefaultDialOptions = []grpc.DialOption{
restartstream.Interceptor(restartstream.DefaultSettings),
rpclog.StreamClientInterceptor(nil),
)),
grpc.WithDialer(KeepAliveDialer),
grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 5 * time.Minute,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
}),
}

// Global pool with connections
Expand Down
88 changes: 57 additions & 31 deletions core/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ type Broker interface {
HandleDownlink(downlink *pb.DownlinkMessage) error
HandleActivation(activation *pb.DeviceActivationRequest) (*pb.DeviceActivationResponse, error)

ActivateRouter(id string) (<-chan *pb.DownlinkMessage, error)
DeactivateRouter(id string) error
ActivateRouterDownlink(id string) (<-chan *pb.DownlinkMessage, error)
DeactivateRouterDownlink(id string) error
ActivateHandlerUplink(id string) (<-chan *pb.DeduplicatedUplinkMessage, error)
DeactivateHandlerUplink(id string) error
}

func NewBroker(timeout time.Duration) Broker {
return &broker{
routers: make(map[string]chan *pb.DownlinkMessage),
routers: make(map[string]*router),
handlers: make(map[string]*handler),
uplinkDeduplicator: NewDeduplicator(timeout),
activationDeduplicator: NewDeduplicator(timeout),
Expand All @@ -52,7 +52,7 @@ func (b *broker) SetNetworkServer(addr, cert, token string) {

type broker struct {
*component.Component
routers map[string]chan *pb.DownlinkMessage
routers map[string]*router
routersLock sync.RWMutex
handlers map[string]*handler
handlersLock sync.RWMutex
Expand Down Expand Up @@ -141,41 +141,64 @@ func (b *broker) Init(c *component.Component) error {

func (b *broker) Shutdown() {}

func (b *broker) ActivateRouter(id string) (<-chan *pb.DownlinkMessage, error) {
type router struct {
downlinkConns int
downlink chan *pb.DownlinkMessage
sync.Mutex
}

func (b *broker) getRouter(id string) *router {
b.routersLock.Lock()
defer b.routersLock.Unlock()
if existing, ok := b.routers[id]; ok {
return existing, errors.NewErrInternal(fmt.Sprintf("Router %s already active", id))
return existing
}
b.routers[id] = new(router)
return b.routers[id]
}

func (b *broker) ActivateRouterDownlink(id string) (<-chan *pb.DownlinkMessage, error) {
rtr := b.getRouter(id)
rtr.Lock()
defer rtr.Unlock()
if rtr.downlink == nil {
rtr.downlink = make(chan *pb.DownlinkMessage)
}
b.routers[id] = make(chan *pb.DownlinkMessage)
rtr.downlinkConns++
connectedRouters.Inc()
return b.routers[id], nil
return rtr.downlink, nil
}

func (b *broker) DeactivateRouter(id string) error {
b.routersLock.Lock()
defer b.routersLock.Unlock()
if channel, ok := b.routers[id]; ok {
close(channel)
delete(b.routers, id)
connectedRouters.Dec()
return nil
func (b *broker) DeactivateRouterDownlink(id string) error {
rtr := b.getRouter(id)
rtr.Lock()
defer rtr.Unlock()
if rtr.downlinkConns == 0 {
return errors.NewErrInternal(fmt.Sprintf("Router %s not active", id))
}
return errors.NewErrInternal(fmt.Sprintf("Router %s not active", id))
connectedRouters.Dec()
rtr.downlinkConns--
if rtr.downlinkConns == 0 {
close(rtr.downlink)
rtr.downlink = nil
}
return nil
}

func (b *broker) getRouter(id string) (chan<- *pb.DownlinkMessage, error) {
b.routersLock.RLock()
defer b.routersLock.RUnlock()
if router, ok := b.routers[id]; ok {
return router, nil
func (b *broker) getRouterDownlink(id string) (chan<- *pb.DownlinkMessage, error) {
rtr := b.getRouter(id)
rtr.Lock()
defer rtr.Unlock()
if rtr.downlink == nil {
return nil, errors.NewErrInternal(fmt.Sprintf("Router %s not active", id))
}
return nil, errors.NewErrInternal(fmt.Sprintf("Router %s not active", id))
return rtr.downlink, nil
}

type handler struct {
conn *grpc.ClientConn
uplink chan *pb.DeduplicatedUplinkMessage
conn *grpc.ClientConn
uplinkConns int
uplink chan *pb.DeduplicatedUplinkMessage
sync.Mutex
}

Expand All @@ -193,10 +216,10 @@ func (b *broker) ActivateHandlerUplink(id string) (<-chan *pb.DeduplicatedUplink
hdl := b.getHandler(id)
hdl.Lock()
defer hdl.Unlock()
if hdl.uplink != nil {
return hdl.uplink, errors.NewErrInternal(fmt.Sprintf("Handler %s already active", id))
if hdl.uplink == nil {
hdl.uplink = make(chan *pb.DeduplicatedUplinkMessage)
}
hdl.uplink = make(chan *pb.DeduplicatedUplinkMessage)
hdl.uplinkConns++
connectedHandlers.Inc()
return hdl.uplink, nil
}
Expand All @@ -205,12 +228,15 @@ func (b *broker) DeactivateHandlerUplink(id string) error {
hdl := b.getHandler(id)
hdl.Lock()
defer hdl.Unlock()
if hdl.uplink == nil {
if hdl.uplinkConns == 0 {
return errors.NewErrInternal(fmt.Sprintf("Handler %s not active", id))
}
close(hdl.uplink)
hdl.uplink = nil
connectedHandlers.Dec()
hdl.uplinkConns--
if hdl.uplinkConns == 0 {
close(hdl.uplink)
hdl.uplink = nil
}
return nil
}

Expand Down
24 changes: 16 additions & 8 deletions core/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,19 @@ func TestActivateDeactivateRouter(t *testing.T) {
a := New(t)

b := &broker{
routers: make(map[string]chan *pb.DownlinkMessage),
routers: make(map[string]*router),
}

err := b.DeactivateRouter("RouterID")
err := b.DeactivateRouterDownlink("RouterID")
a.So(err, ShouldNotBeNil)

ch, err := b.ActivateRouter("RouterID")
ch, err := b.ActivateRouterDownlink("RouterID")
a.So(err, ShouldBeNil)
a.So(ch, ShouldNotBeNil)

_, err = b.ActivateRouter("RouterID")
a.So(err, ShouldNotBeNil)
otherCH, err := b.ActivateRouterDownlink("RouterID")
a.So(err, ShouldBeNil)
a.So(otherCH, ShouldEqual, ch)

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -96,7 +97,10 @@ func TestActivateDeactivateRouter(t *testing.T) {
wg.Done()
}()

err = b.DeactivateRouter("RouterID")
err = b.DeactivateRouterDownlink("RouterID")
a.So(err, ShouldBeNil)

err = b.DeactivateRouterDownlink("RouterID")
a.So(err, ShouldBeNil)

wg.Wait()
Expand All @@ -116,8 +120,9 @@ func TestActivateDeactivateHandler(t *testing.T) {
a.So(err, ShouldBeNil)
a.So(ch, ShouldNotBeNil)

_, err = b.ActivateHandlerUplink("HandlerID")
a.So(err, ShouldNotBeNil)
otherCH, err := b.ActivateHandlerUplink("HandlerID")
a.So(err, ShouldBeNil)
a.So(otherCH, ShouldEqual, ch)

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -130,5 +135,8 @@ func TestActivateDeactivateHandler(t *testing.T) {
err = b.DeactivateHandlerUplink("HandlerID")
a.So(err, ShouldBeNil)

err = b.DeactivateHandlerUplink("HandlerID")
a.So(err, ShouldBeNil)

wg.Wait()
}
2 changes: 1 addition & 1 deletion core/broker/downlink.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (b *broker) HandleDownlink(downlink *pb.DownlinkMessage) error {
}
ctx = ctx.WithField("RouterID", routerID)

router, err := b.getRouter(routerID)
router, err := b.getRouterDownlink(routerID)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions core/broker/downlink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func TestDownlink(t *testing.T) {
Monitor: monitorclient.NewMonitorClient(),
},
ns: &mockNetworkServer{},
routers: map[string]chan *pb.DownlinkMessage{
"routerID": dlch,
routers: map[string]*router{
"routerID": &router{downlinkConns: 1, downlink: dlch},
},
}
b.InitStatus()
Expand Down
4 changes: 2 additions & 2 deletions core/broker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ func (b *brokerRPC) associateRouter(md metadata.MD) (chan *pb.UplinkMessage, <-c
if err != nil {
return nil, nil, nil, err
}
down, err := b.broker.ActivateRouter(router.ID)
down, err := b.broker.ActivateRouterDownlink(router.ID)
if err != nil {
return nil, nil, nil, err
}

up := make(chan *pb.UplinkMessage, 1)

cancel := func() {
b.broker.DeactivateRouter(router.ID)
b.broker.DeactivateRouterDownlink(router.ID)
}

go func() {
Expand Down
6 changes: 3 additions & 3 deletions core/broker/uplink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestDeduplicateUplink(t *testing.T) {
uplink4 := &pb.UplinkMessage{Payload: payload, GatewayMetadata: gateway.RxMetadata{SNR: 7.8}, ProtocolMetadata: protocolMetadata}

b := getTestBroker(t)
b.uplinkDeduplicator = NewDeduplicator(20 * time.Millisecond).(*deduplicator)
b.uplinkDeduplicator = NewDeduplicator(200 * time.Millisecond).(*deduplicator)

var wg sync.WaitGroup
wg.Add(1)
Expand All @@ -172,12 +172,12 @@ func TestDeduplicateUplink(t *testing.T) {
wg.Done()
}()

<-time.After(10 * time.Millisecond)
<-time.After(100 * time.Millisecond)

a.So(b.deduplicateUplink(uplink2), ShouldBeNil)
a.So(b.deduplicateUplink(uplink3), ShouldBeNil)

<-time.After(50 * time.Millisecond)
<-time.After(500 * time.Millisecond)

a.So(b.deduplicateUplink(uplink4), ShouldNotBeNil)

Expand Down
Loading

0 comments on commit be6a661

Please sign in to comment.