diff --git a/cmd/ssl-status-board/main.go b/cmd/ssl-status-board/main.go index 8d28a77..33c16d2 100644 --- a/cmd/ssl-status-board/main.go +++ b/cmd/ssl-status-board/main.go @@ -23,7 +23,7 @@ func main() { } refereeBoard := board.NewBoard(config.RefereeConnection) - refereeBoard.MulticastReceiver.SkipInterfaces = config.RefereeConnection.SkipInterfaces + refereeBoard.MulticastServer.SkipInterfaces = config.RefereeConnection.SkipInterfaces refereeBoard.Start() http.HandleFunc(config.RefereeConnection.SubscribePath, refereeBoard.WsHandler) diff --git a/pkg/board/board.go b/pkg/board/board.go index 8a4ca6e..eb074fd 100644 --- a/pkg/board/board.go +++ b/pkg/board/board.go @@ -11,28 +11,28 @@ import ( // Board contains the state of this referee board type Board struct { - cfg RefereeConfig - refereeData []byte - mutex sync.Mutex - MulticastReceiver *sslnet.MulticastReceiver + cfg RefereeConfig + refereeData []byte + mutex sync.Mutex + MulticastServer *sslnet.MulticastServer } // NewBoard creates a new referee board func NewBoard(cfg RefereeConfig) (b *Board) { b = new(Board) b.cfg = cfg - b.MulticastReceiver = sslnet.NewMulticastReceiver(b.handlingMessage) + b.MulticastServer = sslnet.NewMulticastServer(b.handlingMessage) return } // Start listening for messages func (b *Board) Start() { - b.MulticastReceiver.Start(b.cfg.MulticastAddress) + b.MulticastServer.Start(b.cfg.MulticastAddress) } // Stop listening for messages func (b *Board) Stop() { - b.MulticastReceiver.Stop() + b.MulticastServer.Stop() } func (b *Board) handlingMessage(data []byte) { diff --git a/pkg/sslnet/multicast_receiver.go b/pkg/sslnet/multicast_receiver.go index 01dbb04..2e5ad26 100644 --- a/pkg/sslnet/multicast_receiver.go +++ b/pkg/sslnet/multicast_receiver.go @@ -9,64 +9,74 @@ import ( const maxDatagramSize = 8192 -type MulticastReceiver struct { - activeIfis map[string]bool - connections []*net.UDPConn +type MulticastServer struct { + connection *net.UDPConn running bool consumer func([]byte) mutex sync.Mutex SkipInterfaces []string + Verbose bool } -func NewMulticastReceiver(consumer func([]byte)) (r *MulticastReceiver) { - r = new(MulticastReceiver) - r.activeIfis = map[string]bool{} +func NewMulticastServer(consumer func([]byte)) (r *MulticastServer) { + r = new(MulticastServer) r.consumer = consumer return } -func (r *MulticastReceiver) Start(multicastAddress string) { +func (r *MulticastServer) Start(multicastAddress string) { r.running = true go r.receive(multicastAddress) } -func (r *MulticastReceiver) Stop() { +func (r *MulticastServer) Stop() { r.mutex.Lock() defer r.mutex.Unlock() r.running = false - for _, c := range r.connections { - if err := c.Close(); err != nil { - log.Println("Could not close connection: ", err) - } + if err := r.connection.Close(); err != nil { + log.Println("Could not close connection: ", err) } } -func (r *MulticastReceiver) receive(multicastAddress string) { +func (r *MulticastServer) receive(multicastAddress string) { + log.Printf("Receiving on %v", multicastAddress) + var currentIfiIdx = 0 for r.isRunning() { - ifis, _ := net.Interfaces() - for _, ifi := range ifis { - if ifi.Flags&net.FlagMulticast == 0 || // No multicast support - r.skipInterface(ifi.Name) { - continue - } - r.mutex.Lock() - if _, ok := r.activeIfis[ifi.Name]; !ok { - // interface not active, (re-)start receiving - go r.receiveOnInterface(multicastAddress, ifi) - } - r.mutex.Unlock() + ifis := r.interfaces() + currentIfiIdx = currentIfiIdx % len(ifis) + ifi := ifis[currentIfiIdx] + r.receiveOnInterface(multicastAddress, ifi) + currentIfiIdx++ + if currentIfiIdx >= len(ifis) { + // cycled though all interfaces once, make a short break to avoid producing endless log messages + time.Sleep(1 * time.Second) } - time.Sleep(1 * time.Second) } } -func (r *MulticastReceiver) isRunning() bool { +func (r *MulticastServer) isRunning() bool { r.mutex.Lock() defer r.mutex.Unlock() return r.running } -func (r *MulticastReceiver) skipInterface(ifiName string) bool { +func (r *MulticastServer) interfaces() (interfaces []net.Interface) { + interfaces = []net.Interface{} + ifis, err := net.Interfaces() + if err != nil { + log.Println("Could not get available interfaces: ", err) + } + for _, ifi := range ifis { + if ifi.Flags&net.FlagMulticast == 0 || // No multicast support + r.skipInterface(ifi.Name) { + continue + } + interfaces = append(interfaces, ifi) + } + return +} + +func (r *MulticastServer) skipInterface(ifiName string) bool { for _, skipIfi := range r.SkipInterfaces { if skipIfi == ifiName { return true @@ -75,52 +85,55 @@ func (r *MulticastReceiver) skipInterface(ifiName string) bool { return false } -func (r *MulticastReceiver) receiveOnInterface(multicastAddress string, ifi net.Interface) { +func (r *MulticastServer) receiveOnInterface(multicastAddress string, ifi net.Interface) { addr, err := net.ResolveUDPAddr("udp", multicastAddress) if err != nil { log.Printf("Could resolve multicast address %v: %v", multicastAddress, err) return } - listener, err := net.ListenMulticastUDP("udp", &ifi, addr) + r.connection, err = net.ListenMulticastUDP("udp", &ifi, addr) if err != nil { log.Printf("Could not listen at %v: %v", multicastAddress, err) return } - if err := listener.SetReadBuffer(maxDatagramSize); err != nil { + if err := r.connection.SetReadBuffer(maxDatagramSize); err != nil { log.Println("Could not set read buffer: ", err) } - r.mutex.Lock() - r.connections = append(r.connections, listener) - r.activeIfis[ifi.Name] = true - r.mutex.Unlock() - - log.Printf("Listening on %s (%s)", multicastAddress, ifi.Name) + if r.Verbose { + log.Printf("Listening on %s (%s)", multicastAddress, ifi.Name) + } + first := true data := make([]byte, maxDatagramSize) for { - n, _, err := listener.ReadFrom(data) + if err := r.connection.SetDeadline(time.Now().Add(300 * time.Millisecond)); err != nil { + log.Println("Could not set deadline on connection: ", err) + } + n, _, err := r.connection.ReadFromUDP(data) if err != nil { - log.Println("ReadFromUDP failed:", err) + if r.Verbose { + log.Println("ReadFromUDP failed:", err) + } break } + if first { + log.Printf("Got first data packets from %s (%s)", multicastAddress, ifi.Name) + first = false + } + r.consumer(data[:n]) } - log.Printf("Stop listening on %s (%s)", multicastAddress, ifi.Name) + if r.Verbose { + log.Printf("Stop listening on %s (%s)", multicastAddress, ifi.Name) + } - if err := listener.Close(); err != nil { + if err := r.connection.Close(); err != nil { log.Println("Could not close listener: ", err) } - r.mutex.Lock() - delete(r.activeIfis, ifi.Name) - for i, c := range r.connections { - if c == listener { - r.connections = append(r.connections[:i], r.connections[i+1:]...) - } - } - r.mutex.Unlock() + return }