Skip to content

Commit

Permalink
Use multicast server from game-controller
Browse files Browse the repository at this point in the history
Listening on all interfaces for multicast traffic
seems to have side-effects, so cycling through
them until messages are received is more robust.
  • Loading branch information
g3force committed Feb 21, 2021
1 parent a4da55c commit 5367060
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 57 deletions.
2 changes: 1 addition & 1 deletion cmd/ssl-status-board/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {
}

refereeBoard := board.NewBoard(config.RefereeConnection)
refereeBoard.MulticastReceiver.SkipInterfaces = config.RefereeConnection.SkipInterfaces
refereeBoard.MulticastServer.SkipInterfaces = config.RefereeConnection.SkipInterfaces
refereeBoard.Start()
http.HandleFunc(config.RefereeConnection.SubscribePath, refereeBoard.WsHandler)

Expand Down
14 changes: 7 additions & 7 deletions pkg/board/board.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@ import (

// Board contains the state of this referee board
type Board struct {
cfg RefereeConfig
refereeData []byte
mutex sync.Mutex
MulticastReceiver *sslnet.MulticastReceiver
cfg RefereeConfig
refereeData []byte
mutex sync.Mutex
MulticastServer *sslnet.MulticastServer
}

// NewBoard creates a new referee board
func NewBoard(cfg RefereeConfig) (b *Board) {
b = new(Board)
b.cfg = cfg
b.MulticastReceiver = sslnet.NewMulticastReceiver(b.handlingMessage)
b.MulticastServer = sslnet.NewMulticastServer(b.handlingMessage)
return
}

// Start listening for messages
func (b *Board) Start() {
b.MulticastReceiver.Start(b.cfg.MulticastAddress)
b.MulticastServer.Start(b.cfg.MulticastAddress)
}

// Stop listening for messages
func (b *Board) Stop() {
b.MulticastReceiver.Stop()
b.MulticastServer.Stop()
}

func (b *Board) handlingMessage(data []byte) {
Expand Down
111 changes: 62 additions & 49 deletions pkg/sslnet/multicast_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,64 +9,74 @@ import (

const maxDatagramSize = 8192

type MulticastReceiver struct {
activeIfis map[string]bool
connections []*net.UDPConn
type MulticastServer struct {
connection *net.UDPConn
running bool
consumer func([]byte)
mutex sync.Mutex
SkipInterfaces []string
Verbose bool
}

func NewMulticastReceiver(consumer func([]byte)) (r *MulticastReceiver) {
r = new(MulticastReceiver)
r.activeIfis = map[string]bool{}
func NewMulticastServer(consumer func([]byte)) (r *MulticastServer) {
r = new(MulticastServer)
r.consumer = consumer
return
}

func (r *MulticastReceiver) Start(multicastAddress string) {
func (r *MulticastServer) Start(multicastAddress string) {
r.running = true
go r.receive(multicastAddress)
}

func (r *MulticastReceiver) Stop() {
func (r *MulticastServer) Stop() {
r.mutex.Lock()
defer r.mutex.Unlock()
r.running = false
for _, c := range r.connections {
if err := c.Close(); err != nil {
log.Println("Could not close connection: ", err)
}
if err := r.connection.Close(); err != nil {
log.Println("Could not close connection: ", err)
}
}

func (r *MulticastReceiver) receive(multicastAddress string) {
func (r *MulticastServer) receive(multicastAddress string) {
log.Printf("Receiving on %v", multicastAddress)
var currentIfiIdx = 0
for r.isRunning() {
ifis, _ := net.Interfaces()
for _, ifi := range ifis {
if ifi.Flags&net.FlagMulticast == 0 || // No multicast support
r.skipInterface(ifi.Name) {
continue
}
r.mutex.Lock()
if _, ok := r.activeIfis[ifi.Name]; !ok {
// interface not active, (re-)start receiving
go r.receiveOnInterface(multicastAddress, ifi)
}
r.mutex.Unlock()
ifis := r.interfaces()
currentIfiIdx = currentIfiIdx % len(ifis)
ifi := ifis[currentIfiIdx]
r.receiveOnInterface(multicastAddress, ifi)
currentIfiIdx++
if currentIfiIdx >= len(ifis) {
// cycled though all interfaces once, make a short break to avoid producing endless log messages
time.Sleep(1 * time.Second)
}
time.Sleep(1 * time.Second)
}
}

func (r *MulticastReceiver) isRunning() bool {
func (r *MulticastServer) isRunning() bool {
r.mutex.Lock()
defer r.mutex.Unlock()
return r.running
}

func (r *MulticastReceiver) skipInterface(ifiName string) bool {
func (r *MulticastServer) interfaces() (interfaces []net.Interface) {
interfaces = []net.Interface{}
ifis, err := net.Interfaces()
if err != nil {
log.Println("Could not get available interfaces: ", err)
}
for _, ifi := range ifis {
if ifi.Flags&net.FlagMulticast == 0 || // No multicast support
r.skipInterface(ifi.Name) {
continue
}
interfaces = append(interfaces, ifi)
}
return
}

func (r *MulticastServer) skipInterface(ifiName string) bool {
for _, skipIfi := range r.SkipInterfaces {
if skipIfi == ifiName {
return true
Expand All @@ -75,52 +85,55 @@ func (r *MulticastReceiver) skipInterface(ifiName string) bool {
return false
}

func (r *MulticastReceiver) receiveOnInterface(multicastAddress string, ifi net.Interface) {
func (r *MulticastServer) receiveOnInterface(multicastAddress string, ifi net.Interface) {
addr, err := net.ResolveUDPAddr("udp", multicastAddress)
if err != nil {
log.Printf("Could resolve multicast address %v: %v", multicastAddress, err)
return
}

listener, err := net.ListenMulticastUDP("udp", &ifi, addr)
r.connection, err = net.ListenMulticastUDP("udp", &ifi, addr)
if err != nil {
log.Printf("Could not listen at %v: %v", multicastAddress, err)
return
}

if err := listener.SetReadBuffer(maxDatagramSize); err != nil {
if err := r.connection.SetReadBuffer(maxDatagramSize); err != nil {
log.Println("Could not set read buffer: ", err)
}

r.mutex.Lock()
r.connections = append(r.connections, listener)
r.activeIfis[ifi.Name] = true
r.mutex.Unlock()

log.Printf("Listening on %s (%s)", multicastAddress, ifi.Name)
if r.Verbose {
log.Printf("Listening on %s (%s)", multicastAddress, ifi.Name)
}

first := true
data := make([]byte, maxDatagramSize)
for {
n, _, err := listener.ReadFrom(data)
if err := r.connection.SetDeadline(time.Now().Add(300 * time.Millisecond)); err != nil {
log.Println("Could not set deadline on connection: ", err)
}
n, _, err := r.connection.ReadFromUDP(data)
if err != nil {
log.Println("ReadFromUDP failed:", err)
if r.Verbose {
log.Println("ReadFromUDP failed:", err)
}
break
}

if first {
log.Printf("Got first data packets from %s (%s)", multicastAddress, ifi.Name)
first = false
}

r.consumer(data[:n])
}

log.Printf("Stop listening on %s (%s)", multicastAddress, ifi.Name)
if r.Verbose {
log.Printf("Stop listening on %s (%s)", multicastAddress, ifi.Name)
}

if err := listener.Close(); err != nil {
if err := r.connection.Close(); err != nil {
log.Println("Could not close listener: ", err)
}
r.mutex.Lock()
delete(r.activeIfis, ifi.Name)
for i, c := range r.connections {
if c == listener {
r.connections = append(r.connections[:i], r.connections[i+1:]...)
}
}
r.mutex.Unlock()
return
}

0 comments on commit 5367060

Please sign in to comment.