Skip to content

Commit

Permalink
feat: packet loss analyzer implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
r3inbowari committed May 8, 2024
1 parent 91a2f7d commit 9606a8f
Show file tree
Hide file tree
Showing 8 changed files with 360 additions and 12 deletions.
File renamed without changes.
55 changes: 55 additions & 0 deletions example/packet_loss/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"fmt"
"github.com/showwin/speedtest-go/speedtest"
"github.com/showwin/speedtest-go/speedtest/transport"
"log"
"sync"
)

// Note: The current packet loss analyzer does not support udp over http.
// This means we cannot get packet loss through a proxy.
func main() {
// 0. Fetching servers
serverList, err := speedtest.FetchServers()
checkError(err)

// 1. Retrieve available servers
targets := serverList.Available()

// 2. Create a packet loss analyzer, use default options
analyzer, err := speedtest.NewPacketLossAnalyzer(nil)
checkError(err)

wg := &sync.WaitGroup{}
// 3. Perform packet loss analysis on all available servers
for _, server := range *targets {
wg.Add(1)
//ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
//go func(server *speedtest.Server, analyzer *speedtest.PacketLossAnalyzer, ctx context.Context, cancel context.CancelFunc) {
go func(server *speedtest.Server, analyzer *speedtest.PacketLossAnalyzer) {
//defer cancel()
defer wg.Done()
// Note: Please call ctx.cancel at the appropriate time to release resources if you use analyzer.RunWithContext
// we using analyzer.Run() here.
err = analyzer.Run(server.Host, func(packetLoss *transport.PLoss) {
fmt.Println(packetLoss, server.Host, server.Name)
})
//err = analyzer.RunWithContext(ctx, server.Host, func(packetLoss *transport.PLoss) {
// fmt.Println(packetLoss, server.Host, server.Name)
//})
if err != nil {
fmt.Println(err)
}
//}(server, analyzer, ctx, cancel)
}(server, analyzer)
}
wg.Wait()
}

func checkError(err error) {
if err != nil {
log.Fatal(err)
}
}
29 changes: 28 additions & 1 deletion speedtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"context"
"errors"
"fmt"
"github.com/showwin/speedtest-go/speedtest/transport"
"gopkg.in/alecthomas/kingpin.v2"
"os"
"strconv"
Expand Down Expand Up @@ -128,6 +130,24 @@ func main() {
task.Printf("Latency: %v Jitter: %v Min: %v Max: %v", server.Latency, server.Jitter, server.MinLatency, server.MaxLatency)
task.Complete()
})

// 3.0 create a packet loss analyzer, use default options
var analyzer *speedtest.PacketLossAnalyzer
analyzer, err = speedtest.NewPacketLossAnalyzer(&speedtest.PacketLossAnalyzerOptions{
SourceInterface: *source,
})
server.PacketLoss = -1.0 // N/A as default
packetLossAnalyzerCtx, packetLossAnalyzerCancel := context.WithTimeout(context.Background(), time.Second*40)
go func() {
err = analyzer.RunWithContext(packetLossAnalyzerCtx, server.Host, func(packetLoss *transport.PLoss) {
server.PacketLoss = packetLoss.Loss()
})
if errors.Is(err, transport.ErrUnsupported) {
packetLossAnalyzerCancel() // cancel early
}
}()

// 3.1 create accompany Echo
accEcho := newAccompanyEcho(server, time.Millisecond*500)
taskManager.Run("Download", func(task *Task) {
accEcho.Run()
Expand Down Expand Up @@ -172,8 +192,15 @@ func main() {
})
taskManager.Reset()
speedtestClient.Manager.Reset()
packetLossAnalyzerCancel()
if !*jsonOutput {
if server.PacketLoss != -1 {
fmt.Printf(" Packet Loss: %.2f%%", server.PacketLoss*100)
} else {
fmt.Printf(" Packet Loss: N/A")
}
}
}

taskManager.Stop()

if *jsonOutput {
Expand Down
123 changes: 123 additions & 0 deletions speedtest/loss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package speedtest

import (
"context"
"github.com/showwin/speedtest-go/speedtest/transport"
"net"
"time"
)

type PacketLossAnalyzerOptions struct {
RemoteSamplingInterval time.Duration
SamplingDuration time.Duration
PacketSendingInterval time.Duration
PacketSendingTimeout time.Duration
SourceInterface string // source interface
TCPDialer *net.Dialer // tcp dialer for sampling
UDPDialer *net.Dialer // udp dialer for sending packet

}

type PacketLossAnalyzer struct {
options *PacketLossAnalyzerOptions
}

func NewPacketLossAnalyzer(options *PacketLossAnalyzerOptions) (*PacketLossAnalyzer, error) {
if options == nil {
options = &PacketLossAnalyzerOptions{}
}
if options.SamplingDuration == 0 {
options.SamplingDuration = time.Second * 30
}
if options.RemoteSamplingInterval == 0 {
options.RemoteSamplingInterval = 1 * time.Second
}
if options.PacketSendingInterval == 0 {
options.PacketSendingInterval = 67 * time.Millisecond
}
if options.PacketSendingTimeout == 0 {
options.PacketSendingTimeout = 5 * time.Second
}
if options.TCPDialer == nil {
options.TCPDialer = &net.Dialer{
Timeout: options.PacketSendingTimeout,
}
}
if options.UDPDialer == nil {
var addr net.Addr
if len(options.SourceInterface) > 0 {
// skip error and using auto-select
addr, _ = net.ResolveUDPAddr("udp", options.SourceInterface)
}
options.UDPDialer = &net.Dialer{
Timeout: options.PacketSendingTimeout,
LocalAddr: addr,
}
}
return &PacketLossAnalyzer{
options: options,
}, nil
}

func (pla *PacketLossAnalyzer) Run(host string, callback func(packetLoss *transport.PLoss)) error {
ctx, cancel := context.WithTimeout(context.Background(), pla.options.SamplingDuration)
defer cancel()
return pla.RunWithContext(ctx, host, callback)
}

func (pla *PacketLossAnalyzer) RunWithContext(ctx context.Context, host string, callback func(packetLoss *transport.PLoss)) error {
samplerClient, err := transport.NewClient(pla.options.TCPDialer)
if err != nil {
return transport.ErrUnsupported
}
senderClient, err := transport.NewPacketLossSender(samplerClient.ID(), pla.options.UDPDialer)
if err != nil {
return transport.ErrUnsupported
}

if err = samplerClient.Connect(ctx, host); err != nil {
return transport.ErrUnsupported
}
if err = senderClient.Connect(ctx, host); err != nil {
return transport.ErrUnsupported
}
if err = samplerClient.InitPacketLoss(); err != nil {
return transport.ErrUnsupported
}
go pla.loopSender(ctx, senderClient)
return pla.loopSampler(ctx, samplerClient, callback)
}

func (pla *PacketLossAnalyzer) loopSampler(ctx context.Context, client *transport.Client, callback func(packetLoss *transport.PLoss)) error {
ticker := time.NewTicker(pla.options.RemoteSamplingInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if pl, err1 := client.PacketLoss(); err1 == nil {
if pl != nil {
callback(pl)
}
} else {
return err1
}
case <-ctx.Done():
return nil
}
}
}

func (pla *PacketLossAnalyzer) loopSender(ctx context.Context, senderClient *transport.PacketLossSender) {
order := 0
sendTick := time.NewTicker(pla.options.PacketSendingInterval)
defer sendTick.Stop()
for {
select {
case <-sendTick.C:
_ = senderClient.Send(order)
order++
case <-ctx.Done():
return
}
}
}
7 changes: 5 additions & 2 deletions speedtest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,11 @@ func (s *Server) TCPPing(
pingDst = s.Host
}
failTimes := 0
client := transport.NewClient(s.Context.tcpDialer, pingDst)
err = client.Connect()
client, err := transport.NewClient(s.Context.tcpDialer)
if err != nil {
return nil, err
}
err = client.Connect(ctx, pingDst)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions speedtest/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Server struct {
DLSpeed ByteRate `json:"dl_speed"`
ULSpeed ByteRate `json:"ul_speed"`
TestDuration TestDuration `json:"test_duration"`
PacketLoss float64 `json:"packet_loss"`

Context *Speedtest `json:"-"`
}
Expand Down
92 changes: 83 additions & 9 deletions speedtest/transport/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package transport

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -14,21 +15,25 @@ var (
pingPrefix = []byte{0x50, 0x49, 0x4e, 0x47, 0x20}
// downloadPrefix = []byte{0x44, 0x4F, 0x57, 0x4E, 0x4C, 0x4F, 0x41, 0x44, 0x20}
// uploadPrefix = []byte{0x55, 0x50, 0x4C, 0x4F, 0x41, 0x44, 0x20}
initPacket = []byte{0x49, 0x4e, 0x49, 0x54, 0x50, 0x4c, 0x4f, 0x53, 0x53}
packetLoss = []byte{0x50, 0x4c, 0x4f, 0x53, 0x53}
hiFormat = []byte{0x48, 0x49}
quitFormat = []byte{0x51, 0x55, 0x49, 0x54}
)

var (
ErrEchoData = errors.New("incorrect echo data")
ErrEmptyConn = errors.New("empty conn")
ErrUnsupported = errors.New("unsupported protocol tcp") // Some servers have disabled ip:8080, we return this error.
ErrEchoData = errors.New("incorrect echo data")
ErrEmptyConn = errors.New("empty conn")
ErrUnsupported = errors.New("unsupported protocol") // Some servers have disabled ip:8080, we return this error.
ErrUninitializedPacketLossInst = errors.New("uninitialized packet loss inst")
)

func pingFormat(locTime int64) []byte {
return strconv.AppendInt(pingPrefix, locTime, 10)
}

type Client struct {
id string
conn net.Conn
host string
version string
Expand All @@ -38,17 +43,29 @@ type Client struct {
reader *bufio.Reader
}

func NewClient(dialer *net.Dialer, host string) *Client {
func NewClient(dialer *net.Dialer) (*Client, error) {
uuid, err := generateUUID()
if err != nil {
return nil, err
}
return &Client{
host: host,
id: uuid,
dialer: dialer,
}
}, nil
}

func (client *Client) ID() string {
return client.id
}

func (client *Client) Connect() (err error) {
client.conn, err = client.dialer.Dial("tcp", client.host)
func (client *Client) Connect(ctx context.Context, host string) (err error) {
client.host = host
client.conn, err = client.dialer.DialContext(ctx, "tcp", client.host)
if err != nil {
return err
}
client.reader = bufio.NewReader(client.conn)
return
return nil
}

func (client *Client) Disconnect() (err error) {
Expand Down Expand Up @@ -143,6 +160,63 @@ func (client *Client) PingContext(ctx context.Context) (int64, error) {
}
}

func (client *Client) InitPacketLoss() error {
id := client.id
payload := append(hiFormat, 0x20)
payload = append(payload, []byte(id)...)
err := client.Write(payload)
if err != nil {
return err
}
return client.Write(initPacket)
}

type PLoss struct {
Sent int
Dup int
MaximumReceived int
}

func (p *PLoss) String() string {
return fmt.Sprintf("Sent: %d, DupPacket: %d, MaximumReceived: %d", p.Sent, p.Dup, p.MaximumReceived)
}

func (p *PLoss) Loss() float64 {
return 1 - (float64(p.Sent-p.Dup))/float64(p.MaximumReceived+1)
}

func (client *Client) PacketLoss() (*PLoss, error) {
err := client.Write(packetLoss)
if err != nil {
return nil, err
}
result, err := client.Read()
if err != nil {
return nil, err
}
splitResult := bytes.Split(result, []byte{0x20})
if len(splitResult) < 3 || !bytes.Equal(splitResult[0], packetLoss) {
return nil, nil
}
x0, err := strconv.Atoi(string(splitResult[1]))
if err != nil {
return nil, err
}
x1, err := strconv.Atoi(string(splitResult[2]))
if err != nil {
return nil, err
}
x2, err := strconv.Atoi(string(bytes.TrimRight(splitResult[3], "\n")))
if err != nil {
return nil, err
}
return &PLoss{
Sent: x0,
Dup: x1,
MaximumReceived: x2,
}, nil
}

func (client *Client) Download() {
panic("Unimplemented method: Client.Download()")
}
Expand Down
Loading

0 comments on commit 9606a8f

Please sign in to comment.