diff --git a/cmd/subspace/main.go b/cmd/subspace/main.go index 9a76396..464dcb3 100644 --- a/cmd/subspace/main.go +++ b/cmd/subspace/main.go @@ -41,17 +41,17 @@ import ( func main() { rt := int(time.Hour / 1e9) - if e, ok := os.LookupEnv("SUBSPACE_RETENTION"); ok { + if e, ok := os.LookupEnv("SUBSPACE_RETENTION"); ok { rt, _ = strconv.Atoi(e) - } + } - if len(os.Args) > 1 { - go subspace.Relay(os.Args[1:]) - } + if len(os.Args) > 1 { + go subspace.Relay(os.Args[1:]) + } s := sub.NewSpace() - - exit := make(chan os.Signal, 1) + + exit := make(chan os.Signal, 1) signal.Notify(exit, syscall.SIGINT, syscall.SIGTERM) @@ -60,7 +60,7 @@ func main() { go gc(s, rt) - fmt.Printf("⇌ Subspace %ds %v\n", rt, os.Args[1:]) + fmt.Printf("⇌ Subspace %ds %v\n", rt, os.Args[1:]) <-exit @@ -95,7 +95,7 @@ func gc(s *sub.Space, rt int) { atomic.LoadUint64(&s.StatAlloc), atomic.LoadUint64(&subspace.Rx), atomic.LoadUint64(&subspace.Tx), - atomic.LoadUint64(&subspace.Fx), + atomic.LoadUint64(&subspace.Fx), }) if err == nil { diff --git a/internal/app/subspace/relay.go b/internal/app/subspace/relay.go index 31d0cc3..90fa16f 100644 --- a/internal/app/subspace/relay.go +++ b/internal/app/subspace/relay.go @@ -3,7 +3,7 @@ package subspace import ( "net" - "runtime" + "runtime" "sync/atomic" "github.com/cuhsat/subspace/internal/pkg/sys" @@ -14,14 +14,14 @@ import ( type Bind func(u *net.UDPConn, s *sub.Space) var ( - // Received bytes. - Rx uint64 + // Received bytes. + Rx uint64 // Transmitted bytes. - Tx uint64 - // Forwarded bytes. - Fx uint64 - // Data channel. - dc atomic.Pointer[chan []byte] + Tx uint64 + // Forwarded bytes. + Fx uint64 + // Data channel. + dc atomic.Pointer[chan []byte] ) // A relay is a uni-directional communication relay to another subspace relay. @@ -54,29 +54,29 @@ func NewRelay(host string) (r *relay) { // // Any calling program will terminate immediately if an error occurs. func Relay(hosts []string) { - rs := make([]*relay, 0) - - for _, host := range hosts { - rs = append(rs, NewRelay(host)) - } - - ch := make(chan []byte) - - dc.Store(&ch) - - go func() { - for b := range *dc.Load() { - for _, r := range rs { - n, err := r.tu.Write(b) - - if err != nil { - sys.Fatal(err) - } - - atomic.AddUint64(&Fx, uint64(n)) - } - } - }() + rs := make([]*relay, 0) + + for _, host := range hosts { + rs = append(rs, NewRelay(host)) + } + + ch := make(chan []byte) + + dc.Store(&ch) + + go func() { + for b := range *dc.Load() { + for _, r := range rs { + n, err := r.tu.Write(b) + + if err != nil { + sys.Fatal(err) + } + + atomic.AddUint64(&Fx, uint64(n)) + } + } + }() } // Send receives data from an UDP pseudo connection @@ -94,9 +94,9 @@ func Send(u *net.UDPConn, s *sub.Space) { atomic.AddUint64(&Rx, uint64(n)) - if c := dc.Load(); c != nil { - *c <- b[:n] - } + if c := dc.Load(); c != nil { + *c <- b[:n] + } } // Scan receives a state id from an UDP pseudo connection diff --git a/internal/app/subspace/relay_test.go b/internal/app/subspace/relay_test.go index 5d23cb9..09e5912 100644 --- a/internal/app/subspace/relay_test.go +++ b/internal/app/subspace/relay_test.go @@ -24,30 +24,30 @@ func TestMain(m *testing.M) { } func TestRelay(t *testing.T) { - if os.Getenv("CI") != "" { - t.Skip() // Faulty CI - } + if os.Getenv("CI") != "" { + t.Skip() // Faulty CI + } + + t.Run("Relay should relay a signal to a relay", func(t *testing.T) { + t.Cleanup(_cleanup) - t.Run("Relay should relay a signal to a relay", func(t *testing.T) { - t.Cleanup(_cleanup) + go Relay([]string{"localhost"}) - go Relay([]string{"localhost"}) - - s := _s.Load() - u := sys.Listen("localhost" + sys.Port1) + s := _s.Load() + u := sys.Listen("localhost" + sys.Port1) - defer u.Close() + defer u.Close() - go Send(u, s) + go Send(u, s) - _sendOnce() + _sendOnce() - time.Sleep(time.Millisecond) + time.Sleep(time.Millisecond) - if atomic.LoadUint64(&Fx) == 0 { - t.Fatal("Signal was not relayed") - } - }) + if atomic.LoadUint64(&Fx) == 0 { + t.Fatal("Signal was not relayed") + } + }) } func TestSend(t *testing.T) { @@ -103,17 +103,17 @@ func TestScan(t *testing.T) { } func BenchmarkRelay(b *testing.B) { - b.Run("Benchmark Relay", func(b *testing.B) { - b.Cleanup(_cleanup) + b.Run("Benchmark Relay", func(b *testing.B) { + b.Cleanup(_cleanup) - b.ResetTimer() + b.ResetTimer() for n := 0; n < b.N; n++ { - Relay([]string{"localhost"}) + Relay([]string{"localhost"}) } b.StopTimer() - }) + }) } func BenchmarkSend(b *testing.B) { diff --git a/pkg/sub/types.go b/pkg/sub/types.go index b72f0f5..0211c5f 100644 --- a/pkg/sub/types.go +++ b/pkg/sub/types.go @@ -46,8 +46,8 @@ type Space struct { // by the time the signal they are pointing to is dropped. type states struct { sync.RWMutex - // Underlying map. - m map[string]*signal + // Underlying map. + m map[string]*signal } // A signal represents a received data package. @@ -57,10 +57,10 @@ type states struct { // A signal with nil as value for data or next has been dropped // and will be consumed by the garbage collector soon. type signal struct { - // Time of receiving. + // Time of receiving. time int64 - // Received data. + // Received data. data []byte - // Next signal. + // Next signal. next *signal }