Skip to content

Commit

Permalink
add options to set timeouts (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 authored Mar 12, 2023
1 parent 2eed29c commit 766a6f6
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 34 deletions.
1 change: 0 additions & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,5 @@ jobs:
go-version: "1.20"

- run: |
go mod download
go mod tidy
git diff --exit-code
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

# gomavlib

[![Test](https://github.com/aler9/gomavlib/workflows/test/badge.svg)](https://github.com/aler9/gomavlib/actions?query=workflow:test)
Expand Down
2 changes: 1 addition & 1 deletion endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

// EndpointConf is the interface implemented by all endpoint configurations.
type EndpointConf interface {
init() (Endpoint, error)
init(*Node) (Endpoint, error)
}

// Endpoint is an endpoint, which can create Channels.
Expand Down
6 changes: 4 additions & 2 deletions endpoint_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ type EndpointUDPBroadcast struct {
type endpointUDPBroadcast struct {
conf EndpointUDPBroadcast
pc net.PacketConn
writeTimeout time.Duration
broadcastAddr net.Addr

terminate chan struct{}
}

func (conf EndpointUDPBroadcast) init() (Endpoint, error) {
func (conf EndpointUDPBroadcast) init(node *Node) (Endpoint, error) {
ipString, port, err := net.SplitHostPort(conf.BroadcastAddress)
if err != nil {
return nil, fmt.Errorf("invalid broadcast address")
Expand Down Expand Up @@ -99,6 +100,7 @@ func (conf EndpointUDPBroadcast) init() (Endpoint, error) {
t := &endpointUDPBroadcast{
conf: conf,
pc: pc,
writeTimeout: node.conf.WriteTimeout,
broadcastAddr: &net.UDPAddr{IP: broadcastIP, Port: iport},
terminate: make(chan struct{}),
}
Expand Down Expand Up @@ -135,7 +137,7 @@ func (t *endpointUDPBroadcast) Read(buf []byte) (int, error) {
}

func (t *endpointUDPBroadcast) Write(buf []byte) (int, error) {
err := t.pc.SetWriteDeadline(time.Now().Add(netWriteTimeout))
err := t.pc.SetWriteDeadline(time.Now().Add(t.writeTimeout))
if err != nil {
return 0, err
}
Expand Down
16 changes: 8 additions & 8 deletions endpoint_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type endpointClientConf interface {
isUDP() bool
getAddress() string
init() (Endpoint, error)
init(*Node) (Endpoint, error)
}

// EndpointTCPClient sets up a endpoint that works with a TCP client.
Expand All @@ -33,8 +33,8 @@ func (conf EndpointTCPClient) getAddress() string {
return conf.Address
}

func (conf EndpointTCPClient) init() (Endpoint, error) {
return initEndpointClient(conf)
func (conf EndpointTCPClient) init(node *Node) (Endpoint, error) {
return initEndpointClient(node, conf)
}

// EndpointUDPClient sets up a endpoint that works with a UDP client.
Expand All @@ -51,16 +51,16 @@ func (conf EndpointUDPClient) getAddress() string {
return conf.Address
}

func (conf EndpointUDPClient) init() (Endpoint, error) {
return initEndpointClient(conf)
func (conf EndpointUDPClient) init(node *Node) (Endpoint, error) {
return initEndpointClient(node, conf)
}

type endpointClient struct {
conf endpointClientConf
io.ReadWriteCloser
}

func initEndpointClient(conf endpointClientConf) (Endpoint, error) {
func initEndpointClient(node *Node, conf endpointClientConf) (Endpoint, error) {
_, _, err := net.SplitHostPort(conf.getAddress())
if err != nil {
return nil, fmt.Errorf("invalid address")
Expand All @@ -79,14 +79,14 @@ func initEndpointClient(conf endpointClientConf) (Endpoint, error) {
}
return "tcp4"
}()
timedContext, timedContextClose := context.WithTimeout(ctx, netConnectTimeout)
timedContext, timedContextClose := context.WithTimeout(ctx, node.conf.ReadTimeout)
nconn, err := (&net.Dialer{}).DialContext(timedContext, network, conf.getAddress())
timedContextClose()
if err != nil {
return nil, err
}

return timednetconn.New(netWriteTimeout, nconn), nil
return timednetconn.New(node.conf.WriteTimeout, nconn), nil
},
),
}
Expand Down
2 changes: 1 addition & 1 deletion endpoint_custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type endpointCustom struct {
io.ReadWriteCloser
}

func (conf EndpointCustom) init() (Endpoint, error) {
func (conf EndpointCustom) init(node *Node) (Endpoint, error) {
t := &endpointCustom{
conf: conf,
ReadWriteCloser: conf.ReadWriteCloser,
Expand Down
2 changes: 1 addition & 1 deletion endpoint_serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type endpointSerial struct {
io.ReadWriteCloser
}

func (conf EndpointSerial) init() (Endpoint, error) {
func (conf EndpointSerial) init(node *Node) (Endpoint, error) {
// check device existence
test, err := serialOpenFunc(conf.Device, conf.Baud)
if err != nil {
Expand Down
27 changes: 15 additions & 12 deletions endpoint_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"net"
"time"

"github.com/aler9/gomavlib/pkg/timednetconn"
"github.com/aler9/gomavlib/pkg/udplistener"
Expand All @@ -12,7 +13,7 @@ import (
type endpointServerConf interface {
isUDP() bool
getAddress() string
init() (Endpoint, error)
init(*Node) (Endpoint, error)
}

// EndpointTCPServer sets up a endpoint that works with a TCP server.
Expand Down Expand Up @@ -49,22 +50,23 @@ func (conf EndpointUDPServer) getAddress() string {
}

type endpointServer struct {
conf endpointServerConf
listener net.Listener
conf endpointServerConf
listener net.Listener
writeTimeout time.Duration

// in
terminate chan struct{}
}

func (conf EndpointTCPServer) init() (Endpoint, error) {
return initEndpointServer(conf)
func (conf EndpointTCPServer) init(node *Node) (Endpoint, error) {
return initEndpointServer(node, conf)
}

func (conf EndpointUDPServer) init() (Endpoint, error) {
return initEndpointServer(conf)
func (conf EndpointUDPServer) init(node *Node) (Endpoint, error) {
return initEndpointServer(node, conf)
}

func initEndpointServer(conf endpointServerConf) (Endpoint, error) {
func initEndpointServer(node *Node, conf endpointServerConf) (Endpoint, error) {
_, _, err := net.SplitHostPort(conf.getAddress())
if err != nil {
return nil, fmt.Errorf("invalid address")
Expand All @@ -81,9 +83,10 @@ func initEndpointServer(conf endpointServerConf) (Endpoint, error) {
}

t := &endpointServer{
conf: conf,
listener: listener,
terminate: make(chan struct{}),
conf: conf,
writeTimeout: node.conf.WriteTimeout,
listener: listener,
terminate: make(chan struct{}),
}
return t, nil
}
Expand Down Expand Up @@ -115,7 +118,7 @@ func (t *endpointServer) accept() (string, io.ReadWriteCloser, error) {
return "tcp"
}(), nconn.RemoteAddr())

conn := timednetconn.New(netWriteTimeout, nconn)
conn := timednetconn.New(t.writeTimeout, nconn)

return label, conn, nil
}
22 changes: 15 additions & 7 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ import (
"github.com/aler9/gomavlib/pkg/message"
)

const (
bufferSize = 512 // frames cannot go beyond len(header) + 255 + len(check) + len(sig)
netConnectTimeout = 10 * time.Second
netWriteTimeout = 10 * time.Second
)

var errTerminated = fmt.Errorf("terminated")

type writeToReq struct {
Expand Down Expand Up @@ -82,6 +76,13 @@ type NodeConf struct {
StreamRequestEnable bool
// (optional) the requested stream frequency in Hz. It defaults to 4.
StreamRequestFrequency int

// (optional) read timeout.
// It defaults to 10 seconds.
ReadTimeout time.Duration
// (optional) write timeout.
// It defaults to 5 seconds.
WriteTimeout time.Duration
}

// Node is a high-level Mavlink encoder and decoder that works with endpoints.
Expand Down Expand Up @@ -140,6 +141,13 @@ func NewNode(conf NodeConf) (*Node, error) {
return nil, fmt.Errorf("OutKey requires V2 frames")
}

if conf.ReadTimeout == 0 {
conf.ReadTimeout = 10 * time.Second
}
if conf.WriteTimeout == 0 {
conf.WriteTimeout = 5 * time.Second
}

dialectRW, err := func() (*dialect.ReadWriter, error) {
if conf.Dialect == nil {
return nil, nil
Expand Down Expand Up @@ -176,7 +184,7 @@ func NewNode(conf NodeConf) (*Node, error) {

// endpoints
for _, tconf := range conf.Endpoints {
tp, err := tconf.init()
tp, err := tconf.init(n)
if err != nil {
closeExisting()
return nil, err
Expand Down

0 comments on commit 766a6f6

Please sign in to comment.