diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index a6bde46b0..c875c137f 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -32,6 +32,5 @@ jobs: go-version: "1.20" - run: | - go mod download go mod tidy git diff --exit-code diff --git a/README.md b/README.md index afd3f3c94..81b455526 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,3 @@ - # gomavlib [![Test](https://github.com/aler9/gomavlib/workflows/test/badge.svg)](https://github.com/aler9/gomavlib/actions?query=workflow:test) diff --git a/endpoint.go b/endpoint.go index ea6c80f30..0570aa6b4 100644 --- a/endpoint.go +++ b/endpoint.go @@ -6,7 +6,7 @@ import ( // EndpointConf is the interface implemented by all endpoint configurations. type EndpointConf interface { - init() (Endpoint, error) + init(*Node) (Endpoint, error) } // Endpoint is an endpoint, which can create Channels. diff --git a/endpoint_broadcast.go b/endpoint_broadcast.go index cbc9def63..208d57054 100644 --- a/endpoint_broadcast.go +++ b/endpoint_broadcast.go @@ -57,12 +57,13 @@ type EndpointUDPBroadcast struct { type endpointUDPBroadcast struct { conf EndpointUDPBroadcast pc net.PacketConn + writeTimeout time.Duration broadcastAddr net.Addr terminate chan struct{} } -func (conf EndpointUDPBroadcast) init() (Endpoint, error) { +func (conf EndpointUDPBroadcast) init(node *Node) (Endpoint, error) { ipString, port, err := net.SplitHostPort(conf.BroadcastAddress) if err != nil { return nil, fmt.Errorf("invalid broadcast address") @@ -99,6 +100,7 @@ func (conf EndpointUDPBroadcast) init() (Endpoint, error) { t := &endpointUDPBroadcast{ conf: conf, pc: pc, + writeTimeout: node.conf.WriteTimeout, broadcastAddr: &net.UDPAddr{IP: broadcastIP, Port: iport}, terminate: make(chan struct{}), } @@ -135,7 +137,7 @@ func (t *endpointUDPBroadcast) Read(buf []byte) (int, error) { } func (t *endpointUDPBroadcast) Write(buf []byte) (int, error) { - err := t.pc.SetWriteDeadline(time.Now().Add(netWriteTimeout)) + err := t.pc.SetWriteDeadline(time.Now().Add(t.writeTimeout)) if err != nil { return 0, err } diff --git a/endpoint_client.go b/endpoint_client.go index 379aa485c..3bd2c9d0e 100644 --- a/endpoint_client.go +++ b/endpoint_client.go @@ -13,7 +13,7 @@ import ( type endpointClientConf interface { isUDP() bool getAddress() string - init() (Endpoint, error) + init(*Node) (Endpoint, error) } // EndpointTCPClient sets up a endpoint that works with a TCP client. @@ -33,8 +33,8 @@ func (conf EndpointTCPClient) getAddress() string { return conf.Address } -func (conf EndpointTCPClient) init() (Endpoint, error) { - return initEndpointClient(conf) +func (conf EndpointTCPClient) init(node *Node) (Endpoint, error) { + return initEndpointClient(node, conf) } // EndpointUDPClient sets up a endpoint that works with a UDP client. @@ -51,8 +51,8 @@ func (conf EndpointUDPClient) getAddress() string { return conf.Address } -func (conf EndpointUDPClient) init() (Endpoint, error) { - return initEndpointClient(conf) +func (conf EndpointUDPClient) init(node *Node) (Endpoint, error) { + return initEndpointClient(node, conf) } type endpointClient struct { @@ -60,7 +60,7 @@ type endpointClient struct { io.ReadWriteCloser } -func initEndpointClient(conf endpointClientConf) (Endpoint, error) { +func initEndpointClient(node *Node, conf endpointClientConf) (Endpoint, error) { _, _, err := net.SplitHostPort(conf.getAddress()) if err != nil { return nil, fmt.Errorf("invalid address") @@ -79,14 +79,14 @@ func initEndpointClient(conf endpointClientConf) (Endpoint, error) { } return "tcp4" }() - timedContext, timedContextClose := context.WithTimeout(ctx, netConnectTimeout) + timedContext, timedContextClose := context.WithTimeout(ctx, node.conf.ReadTimeout) nconn, err := (&net.Dialer{}).DialContext(timedContext, network, conf.getAddress()) timedContextClose() if err != nil { return nil, err } - return timednetconn.New(netWriteTimeout, nconn), nil + return timednetconn.New(node.conf.WriteTimeout, nconn), nil }, ), } diff --git a/endpoint_custom.go b/endpoint_custom.go index 24aa26ee8..808a14868 100644 --- a/endpoint_custom.go +++ b/endpoint_custom.go @@ -16,7 +16,7 @@ type endpointCustom struct { io.ReadWriteCloser } -func (conf EndpointCustom) init() (Endpoint, error) { +func (conf EndpointCustom) init(node *Node) (Endpoint, error) { t := &endpointCustom{ conf: conf, ReadWriteCloser: conf.ReadWriteCloser, diff --git a/endpoint_serial.go b/endpoint_serial.go index 193e9e50b..84a242bd8 100644 --- a/endpoint_serial.go +++ b/endpoint_serial.go @@ -30,7 +30,7 @@ type endpointSerial struct { io.ReadWriteCloser } -func (conf EndpointSerial) init() (Endpoint, error) { +func (conf EndpointSerial) init(node *Node) (Endpoint, error) { // check device existence test, err := serialOpenFunc(conf.Device, conf.Baud) if err != nil { diff --git a/endpoint_server.go b/endpoint_server.go index 03e77d71f..65c441a17 100644 --- a/endpoint_server.go +++ b/endpoint_server.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "net" + "time" "github.com/aler9/gomavlib/pkg/timednetconn" "github.com/aler9/gomavlib/pkg/udplistener" @@ -12,7 +13,7 @@ import ( type endpointServerConf interface { isUDP() bool getAddress() string - init() (Endpoint, error) + init(*Node) (Endpoint, error) } // EndpointTCPServer sets up a endpoint that works with a TCP server. @@ -49,22 +50,23 @@ func (conf EndpointUDPServer) getAddress() string { } type endpointServer struct { - conf endpointServerConf - listener net.Listener + conf endpointServerConf + listener net.Listener + writeTimeout time.Duration // in terminate chan struct{} } -func (conf EndpointTCPServer) init() (Endpoint, error) { - return initEndpointServer(conf) +func (conf EndpointTCPServer) init(node *Node) (Endpoint, error) { + return initEndpointServer(node, conf) } -func (conf EndpointUDPServer) init() (Endpoint, error) { - return initEndpointServer(conf) +func (conf EndpointUDPServer) init(node *Node) (Endpoint, error) { + return initEndpointServer(node, conf) } -func initEndpointServer(conf endpointServerConf) (Endpoint, error) { +func initEndpointServer(node *Node, conf endpointServerConf) (Endpoint, error) { _, _, err := net.SplitHostPort(conf.getAddress()) if err != nil { return nil, fmt.Errorf("invalid address") @@ -81,9 +83,10 @@ func initEndpointServer(conf endpointServerConf) (Endpoint, error) { } t := &endpointServer{ - conf: conf, - listener: listener, - terminate: make(chan struct{}), + conf: conf, + writeTimeout: node.conf.WriteTimeout, + listener: listener, + terminate: make(chan struct{}), } return t, nil } @@ -115,7 +118,7 @@ func (t *endpointServer) accept() (string, io.ReadWriteCloser, error) { return "tcp" }(), nconn.RemoteAddr()) - conn := timednetconn.New(netWriteTimeout, nconn) + conn := timednetconn.New(t.writeTimeout, nconn) return label, conn, nil } diff --git a/node.go b/node.go index 74a0e6b9f..8666679a9 100644 --- a/node.go +++ b/node.go @@ -22,12 +22,6 @@ import ( "github.com/aler9/gomavlib/pkg/message" ) -const ( - bufferSize = 512 // frames cannot go beyond len(header) + 255 + len(check) + len(sig) - netConnectTimeout = 10 * time.Second - netWriteTimeout = 10 * time.Second -) - var errTerminated = fmt.Errorf("terminated") type writeToReq struct { @@ -82,6 +76,13 @@ type NodeConf struct { StreamRequestEnable bool // (optional) the requested stream frequency in Hz. It defaults to 4. StreamRequestFrequency int + + // (optional) read timeout. + // It defaults to 10 seconds. + ReadTimeout time.Duration + // (optional) write timeout. + // It defaults to 5 seconds. + WriteTimeout time.Duration } // Node is a high-level Mavlink encoder and decoder that works with endpoints. @@ -140,6 +141,13 @@ func NewNode(conf NodeConf) (*Node, error) { return nil, fmt.Errorf("OutKey requires V2 frames") } + if conf.ReadTimeout == 0 { + conf.ReadTimeout = 10 * time.Second + } + if conf.WriteTimeout == 0 { + conf.WriteTimeout = 5 * time.Second + } + dialectRW, err := func() (*dialect.ReadWriter, error) { if conf.Dialect == nil { return nil, nil @@ -176,7 +184,7 @@ func NewNode(conf NodeConf) (*Node, error) { // endpoints for _, tconf := range conf.Endpoints { - tp, err := tconf.init() + tp, err := tconf.init(n) if err != nil { closeExisting() return nil, err