diff --git a/example/main.go b/example/naive/main.go similarity index 100% rename from example/main.go rename to example/naive/main.go diff --git a/example/packet_loss/main.go b/example/packet_loss/main.go new file mode 100644 index 0000000..0f66183 --- /dev/null +++ b/example/packet_loss/main.go @@ -0,0 +1,55 @@ +package main + +import ( + "fmt" + "github.com/showwin/speedtest-go/speedtest" + "github.com/showwin/speedtest-go/speedtest/transport" + "log" + "sync" +) + +// Note: The current packet loss analyzer does not support udp over http. +// This means we cannot get packet loss through a proxy. +func main() { + // 0. Fetching servers + serverList, err := speedtest.FetchServers() + checkError(err) + + // 1. Retrieve available servers + targets := serverList.Available() + + // 2. Create a packet loss analyzer, use default options + analyzer, err := speedtest.NewPacketLossAnalyzer(nil) + checkError(err) + + wg := &sync.WaitGroup{} + // 3. Perform packet loss analysis on all available servers + for _, server := range *targets { + wg.Add(1) + //ctx, cancel := context.WithTimeout(context.Background(), time.Second*20) + //go func(server *speedtest.Server, analyzer *speedtest.PacketLossAnalyzer, ctx context.Context, cancel context.CancelFunc) { + go func(server *speedtest.Server, analyzer *speedtest.PacketLossAnalyzer) { + //defer cancel() + defer wg.Done() + // Note: Please call ctx.cancel at the appropriate time to release resources if you use analyzer.RunWithContext + // we using analyzer.Run() here. + err = analyzer.Run(server.Host, func(packetLoss *transport.PLoss) { + fmt.Println(packetLoss, server.Host, server.Name) + }) + //err = analyzer.RunWithContext(ctx, server.Host, func(packetLoss *transport.PLoss) { + // fmt.Println(packetLoss, server.Host, server.Name) + //}) + if err != nil { + fmt.Println(err) + } + //}(server, analyzer, ctx, cancel) + }(server, analyzer) + } + wg.Wait() +} + +func checkError(err error) { + if err != nil { + log.Fatal(err) + } +} diff --git a/speedtest.go b/speedtest.go index d6c8d5b..c2cca16 100644 --- a/speedtest.go +++ b/speedtest.go @@ -2,7 +2,9 @@ package main import ( "context" + "errors" "fmt" + "github.com/showwin/speedtest-go/speedtest/transport" "gopkg.in/alecthomas/kingpin.v2" "os" "strconv" @@ -128,6 +130,24 @@ func main() { task.Printf("Latency: %v Jitter: %v Min: %v Max: %v", server.Latency, server.Jitter, server.MinLatency, server.MaxLatency) task.Complete() }) + + // 3.0 create a packet loss analyzer, use default options + var analyzer *speedtest.PacketLossAnalyzer + analyzer, err = speedtest.NewPacketLossAnalyzer(&speedtest.PacketLossAnalyzerOptions{ + SourceInterface: *source, + }) + server.PacketLoss = -1.0 // N/A as default + packetLossAnalyzerCtx, packetLossAnalyzerCancel := context.WithTimeout(context.Background(), time.Second*40) + go func() { + err = analyzer.RunWithContext(packetLossAnalyzerCtx, server.Host, func(packetLoss *transport.PLoss) { + server.PacketLoss = packetLoss.Loss() + }) + if errors.Is(err, transport.ErrUnsupported) { + packetLossAnalyzerCancel() // cancel early + } + }() + + // 3.1 create accompany Echo accEcho := newAccompanyEcho(server, time.Millisecond*500) taskManager.Run("Download", func(task *Task) { accEcho.Run() @@ -172,8 +192,15 @@ func main() { }) taskManager.Reset() speedtestClient.Manager.Reset() + packetLossAnalyzerCancel() + if !*jsonOutput { + if server.PacketLoss != -1 { + fmt.Printf(" Packet Loss: %.2f%%", server.PacketLoss*100) + } else { + fmt.Printf(" Packet Loss: N/A") + } + } } - taskManager.Stop() if *jsonOutput { diff --git a/speedtest/loss.go b/speedtest/loss.go new file mode 100644 index 0000000..781fa60 --- /dev/null +++ b/speedtest/loss.go @@ -0,0 +1,123 @@ +package speedtest + +import ( + "context" + "github.com/showwin/speedtest-go/speedtest/transport" + "net" + "time" +) + +type PacketLossAnalyzerOptions struct { + RemoteSamplingInterval time.Duration + SamplingDuration time.Duration + PacketSendingInterval time.Duration + PacketSendingTimeout time.Duration + SourceInterface string // source interface + TCPDialer *net.Dialer // tcp dialer for sampling + UDPDialer *net.Dialer // udp dialer for sending packet + +} + +type PacketLossAnalyzer struct { + options *PacketLossAnalyzerOptions +} + +func NewPacketLossAnalyzer(options *PacketLossAnalyzerOptions) (*PacketLossAnalyzer, error) { + if options == nil { + options = &PacketLossAnalyzerOptions{} + } + if options.SamplingDuration == 0 { + options.SamplingDuration = time.Second * 30 + } + if options.RemoteSamplingInterval == 0 { + options.RemoteSamplingInterval = 1 * time.Second + } + if options.PacketSendingInterval == 0 { + options.PacketSendingInterval = 67 * time.Millisecond + } + if options.PacketSendingTimeout == 0 { + options.PacketSendingTimeout = 5 * time.Second + } + if options.TCPDialer == nil { + options.TCPDialer = &net.Dialer{ + Timeout: options.PacketSendingTimeout, + } + } + if options.UDPDialer == nil { + var addr net.Addr + if len(options.SourceInterface) > 0 { + // skip error and using auto-select + addr, _ = net.ResolveUDPAddr("udp", options.SourceInterface) + } + options.UDPDialer = &net.Dialer{ + Timeout: options.PacketSendingTimeout, + LocalAddr: addr, + } + } + return &PacketLossAnalyzer{ + options: options, + }, nil +} + +func (pla *PacketLossAnalyzer) Run(host string, callback func(packetLoss *transport.PLoss)) error { + ctx, cancel := context.WithTimeout(context.Background(), pla.options.SamplingDuration) + defer cancel() + return pla.RunWithContext(ctx, host, callback) +} + +func (pla *PacketLossAnalyzer) RunWithContext(ctx context.Context, host string, callback func(packetLoss *transport.PLoss)) error { + samplerClient, err := transport.NewClient(pla.options.TCPDialer) + if err != nil { + return transport.ErrUnsupported + } + senderClient, err := transport.NewPacketLossSender(samplerClient.ID(), pla.options.UDPDialer) + if err != nil { + return transport.ErrUnsupported + } + + if err = samplerClient.Connect(ctx, host); err != nil { + return transport.ErrUnsupported + } + if err = senderClient.Connect(ctx, host); err != nil { + return transport.ErrUnsupported + } + if err = samplerClient.InitPacketLoss(); err != nil { + return transport.ErrUnsupported + } + go pla.loopSender(ctx, senderClient) + return pla.loopSampler(ctx, samplerClient, callback) +} + +func (pla *PacketLossAnalyzer) loopSampler(ctx context.Context, client *transport.Client, callback func(packetLoss *transport.PLoss)) error { + ticker := time.NewTicker(pla.options.RemoteSamplingInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if pl, err1 := client.PacketLoss(); err1 == nil { + if pl != nil { + callback(pl) + } + } else { + return err1 + } + case <-ctx.Done(): + return nil + } + } +} + +func (pla *PacketLossAnalyzer) loopSender(ctx context.Context, senderClient *transport.PacketLossSender) { + order := 0 + sendTick := time.NewTicker(pla.options.PacketSendingInterval) + defer sendTick.Stop() + for { + select { + case <-sendTick.C: + _ = senderClient.Send(order) + order++ + case <-ctx.Done(): + return + } + } +} diff --git a/speedtest/request.go b/speedtest/request.go index 13851bf..029b6ea 100644 --- a/speedtest/request.go +++ b/speedtest/request.go @@ -279,8 +279,11 @@ func (s *Server) TCPPing( pingDst = s.Host } failTimes := 0 - client := transport.NewClient(s.Context.tcpDialer, pingDst) - err = client.Connect() + client, err := transport.NewClient(s.Context.tcpDialer) + if err != nil { + return nil, err + } + err = client.Connect(ctx, pingDst) if err != nil { return nil, err } diff --git a/speedtest/server.go b/speedtest/server.go index e3be322..2265384 100644 --- a/speedtest/server.go +++ b/speedtest/server.go @@ -51,6 +51,7 @@ type Server struct { DLSpeed ByteRate `json:"dl_speed"` ULSpeed ByteRate `json:"ul_speed"` TestDuration TestDuration `json:"test_duration"` + PacketLoss float64 `json:"packet_loss"` Context *Speedtest `json:"-"` } diff --git a/speedtest/transport/tcp.go b/speedtest/transport/tcp.go index 66e3f8b..2a55ecb 100644 --- a/speedtest/transport/tcp.go +++ b/speedtest/transport/tcp.go @@ -2,6 +2,7 @@ package transport import ( "bufio" + "bytes" "context" "errors" "fmt" @@ -14,14 +15,17 @@ var ( pingPrefix = []byte{0x50, 0x49, 0x4e, 0x47, 0x20} // downloadPrefix = []byte{0x44, 0x4F, 0x57, 0x4E, 0x4C, 0x4F, 0x41, 0x44, 0x20} // uploadPrefix = []byte{0x55, 0x50, 0x4C, 0x4F, 0x41, 0x44, 0x20} + initPacket = []byte{0x49, 0x4e, 0x49, 0x54, 0x50, 0x4c, 0x4f, 0x53, 0x53} + packetLoss = []byte{0x50, 0x4c, 0x4f, 0x53, 0x53} hiFormat = []byte{0x48, 0x49} quitFormat = []byte{0x51, 0x55, 0x49, 0x54} ) var ( - ErrEchoData = errors.New("incorrect echo data") - ErrEmptyConn = errors.New("empty conn") - ErrUnsupported = errors.New("unsupported protocol tcp") // Some servers have disabled ip:8080, we return this error. + ErrEchoData = errors.New("incorrect echo data") + ErrEmptyConn = errors.New("empty conn") + ErrUnsupported = errors.New("unsupported protocol") // Some servers have disabled ip:8080, we return this error. + ErrUninitializedPacketLossInst = errors.New("uninitialized packet loss inst") ) func pingFormat(locTime int64) []byte { @@ -29,6 +33,7 @@ func pingFormat(locTime int64) []byte { } type Client struct { + id string conn net.Conn host string version string @@ -38,17 +43,29 @@ type Client struct { reader *bufio.Reader } -func NewClient(dialer *net.Dialer, host string) *Client { +func NewClient(dialer *net.Dialer) (*Client, error) { + uuid, err := generateUUID() + if err != nil { + return nil, err + } return &Client{ - host: host, + id: uuid, dialer: dialer, - } + }, nil +} + +func (client *Client) ID() string { + return client.id } -func (client *Client) Connect() (err error) { - client.conn, err = client.dialer.Dial("tcp", client.host) +func (client *Client) Connect(ctx context.Context, host string) (err error) { + client.host = host + client.conn, err = client.dialer.DialContext(ctx, "tcp", client.host) + if err != nil { + return err + } client.reader = bufio.NewReader(client.conn) - return + return nil } func (client *Client) Disconnect() (err error) { @@ -143,6 +160,63 @@ func (client *Client) PingContext(ctx context.Context) (int64, error) { } } +func (client *Client) InitPacketLoss() error { + id := client.id + payload := append(hiFormat, 0x20) + payload = append(payload, []byte(id)...) + err := client.Write(payload) + if err != nil { + return err + } + return client.Write(initPacket) +} + +type PLoss struct { + Sent int + Dup int + MaximumReceived int +} + +func (p *PLoss) String() string { + return fmt.Sprintf("Sent: %d, DupPacket: %d, MaximumReceived: %d", p.Sent, p.Dup, p.MaximumReceived) +} + +func (p *PLoss) Loss() float64 { + return 1 - (float64(p.Sent-p.Dup))/float64(p.MaximumReceived+1) +} + +func (client *Client) PacketLoss() (*PLoss, error) { + err := client.Write(packetLoss) + if err != nil { + return nil, err + } + result, err := client.Read() + if err != nil { + return nil, err + } + splitResult := bytes.Split(result, []byte{0x20}) + if len(splitResult) < 3 || !bytes.Equal(splitResult[0], packetLoss) { + return nil, nil + } + x0, err := strconv.Atoi(string(splitResult[1])) + if err != nil { + return nil, err + } + x1, err := strconv.Atoi(string(splitResult[2])) + if err != nil { + return nil, err + } + x2, err := strconv.Atoi(string(bytes.TrimRight(splitResult[3], "\n"))) + if err != nil { + return nil, err + } + return &PLoss{ + Sent: x0, + Dup: x1, + MaximumReceived: x2, + }, nil +} + func (client *Client) Download() { panic("Unimplemented method: Client.Download()") } diff --git a/speedtest/transport/udp.go b/speedtest/transport/udp.go new file mode 100644 index 0000000..04dbc70 --- /dev/null +++ b/speedtest/transport/udp.go @@ -0,0 +1,65 @@ +package transport + +import ( + "bytes" + "context" + "crypto/rand" + "fmt" + mrand "math/rand" + "net" + "strconv" + "strings" + "time" +) + +var ( + loss = []byte{0x4c, 0x4f, 0x53, 0x53} +) + +type PacketLossSender struct { + ID string // UUID + nounce int // Random int (maybe) [0,10000000000) + withTimestamp bool // With timestamp (ten seconds level) + conn net.Conn // UDP Conn + raw []byte + host string + dialer *net.Dialer +} + +func NewPacketLossSender(uuid string, dialer *net.Dialer) (*PacketLossSender, error) { + rd := mrand.New(mrand.NewSource(time.Now().UnixNano())) + nounce := rd.Intn(10000000000) + p := &PacketLossSender{ + ID: strings.ToUpper(uuid), + nounce: nounce, + withTimestamp: false, // we close it as default, we won't be able to use it right now. + dialer: dialer, + } + p.raw = []byte(fmt.Sprintf("%s %d %s %s", loss, nounce, "#", uuid)) + return p, nil +} + +func (ps *PacketLossSender) Connect(ctx context.Context, host string) (err error) { + ps.host = host + ps.conn, err = ps.dialer.DialContext(ctx, "udp", ps.host) + return err +} + +// Send +// @param order the value will be sent +func (ps *PacketLossSender) Send(order int) error { + payload := bytes.Replace(ps.raw, []byte{0x23}, []byte(strconv.Itoa(order)), 1) + _, err := ps.conn.Write(payload) + return err +} + +func generateUUID() (string, error) { + randUUID := make([]byte, 16) + _, err := rand.Read(randUUID) + if err != nil { + return "", err + } + randUUID[8] = randUUID[8]&^0xc0 | 0x80 + randUUID[6] = randUUID[6]&^0xf0 | 0x40 + return fmt.Sprintf("%x-%x-%x-%x-%x", randUUID[0:4], randUUID[4:6], randUUID[6:8], randUUID[8:10], randUUID[10:]), nil +}