Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: test using multi server #110

Merged
merged 5 commits into from
Feb 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Flags:
eg: --proxy=http://10.20.0.101:7890
--source Bind a source interface for the speedtest.
eg: --source=10.20.0.101
-m --multi Enable multi mode.
-t --thread Set the number of speedtest threads.
--version Show application version.
```

Expand Down
75 changes: 70 additions & 5 deletions speedtest.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
Expand All @@ -13,14 +14,16 @@ import (
var (
showList = kingpin.Flag("list", "Show available speedtest.net servers.").Short('l').Bool()
serverIds = kingpin.Flag("server", "Select server id to run speedtest.").Short('s').Ints()
customURL = kingpin.Flag("custom-url", "Specify the url of the server instead of getting a list from Speedtest.net").String()
customURL = kingpin.Flag("custom-url", "Specify the url of the server instead of getting a list from Speedtest.net.").String()
savingMode = kingpin.Flag("saving-mode", "Using less memory (≒10MB), though low accuracy (especially > 30Mbps).").Bool()
jsonOutput = kingpin.Flag("json", "Output results in json format").Bool()
location = kingpin.Flag("location", "Change the location with a precise coordinate. Format: lat,lon").String()
city = kingpin.Flag("city", "Change the location with a predefined city label.").String()
showCityList = kingpin.Flag("city-list", "List all predefined city labels.").Bool()
proxy = kingpin.Flag("proxy", "Set a proxy(http(s) or socks) for the speedtest.").String()
source = kingpin.Flag("source", "Bind a source interface for the speedtest.").String()
multi = kingpin.Flag("multi", "Enable multi mode.").Short('m').Bool()
thread = kingpin.Flag("thread", "Set the number of speedtest threads.").Short('t').Int()
)

type fullOutput struct {
Expand All @@ -36,6 +39,7 @@ func main() {
kingpin.Parse()

var speedtestClient = speedtest.New()
speedtestClient.SetNThread(*thread)

if len(*proxy) > 0 || len(*source) > 0 {
config := &speedtest.UserConfig{
Expand Down Expand Up @@ -93,7 +97,11 @@ func main() {
targets = []*speedtest.Server{target}
}

startTest(targets, *savingMode, *jsonOutput)
if *multi {
startMultiTest(targets[0], servers, *savingMode, *jsonOutput)
} else {
startTest(targets, *savingMode, *jsonOutput, *multi)
}

if *jsonOutput {
jsonBytes, err := json.Marshal(
Expand All @@ -109,7 +117,37 @@ func main() {
}
}

func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool) {
func startMultiTest(s *speedtest.Server, servers speedtest.Servers, savingMode bool, jsonOutput bool) {
// Reset DataManager counters, avoid measurement of multiple server result mixing.
s.Context.Reset()
if !jsonOutput {
showServer(s)
}

err := s.PingTest()
checkError(err)

if jsonOutput {
err = s.MultiDownloadTestContext(context.Background(), servers, savingMode)
checkError(err)
s.Context.Wait()
err = s.MultiUploadTestContext(context.Background(), servers, savingMode)
checkError(err)
return
}

showLatencyResult(s)
err = testDownloadM(s, servers, savingMode)
checkError(err)
// It is necessary to wait for the release of the last test resource,
// otherwise the overload will cause excessive data deviation
s.Context.Wait()
err = testUploadM(s, servers, savingMode)
checkError(err)
showServerResult(s)
}

func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool, multi bool) {
for _, s := range servers {
// Reset DataManager counters, avoid measurement of multiple server result mixing.
s.Context.Reset()
Expand All @@ -121,12 +159,11 @@ func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool) {
checkError(err)

if jsonOutput {
err := s.DownloadTest(savingMode)
err = s.DownloadTest(savingMode)
checkError(err)
s.Context.Wait()
err = s.UploadTest(savingMode)
checkError(err)

continue
}

Expand All @@ -147,6 +184,34 @@ func startTest(servers speedtest.Servers, savingMode bool, jsonOutput bool) {
}
}

func testDownloadM(server *speedtest.Server, servers speedtest.Servers, savingMode bool) error {
quit := make(chan bool)
fmt.Printf("Download Test: ")
go dots(quit)
err := server.MultiDownloadTestContext(context.Background(), servers, savingMode)
checkError(err)
quit <- true
if err != nil {
return err
}
fmt.Println()
return err
}

func testUploadM(server *speedtest.Server, servers speedtest.Servers, savingMode bool) error {
quit := make(chan bool)
fmt.Printf("Upload Test: ")
go dots(quit)
err := server.MultiUploadTestContext(context.Background(), servers, savingMode)
checkError(err)
quit <- true
if err != nil {
return err
}
fmt.Println()
return nil
}

func testDownload(server *speedtest.Server, savingMode bool) error {
quit := make(chan bool)
fmt.Printf("Download Test: ")
Expand Down
144 changes: 112 additions & 32 deletions speedtest/data_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ type Manager interface {
CallbackDownloadRate(callback func(downRate float64)) *time.Ticker
CallbackUploadRate(callback func(upRate float64)) *time.Ticker

DownloadRateCaptureHandler(fn func())
UploadRateCaptureHandler(fn func())
RegisterDownloadHandler(fn func()) *FuncGroup
RegisterUploadHandler(fn func()) *FuncGroup

// Wait for the upload or download task to end to avoid errors caused by core occupation
Wait()
Reset()

SetNThread(n int) Manager
}

type Chunk interface {
Expand All @@ -55,6 +57,15 @@ const TypeEmptyChunk = 0
const TypeDownload = 1
const TypeUpload = 2

type FuncGroup struct {
fns []func()
manager *DataManager
}

func (f *FuncGroup) Add(fn func()) {
f.fns = append(f.fns, fn)
}

type DataManager struct {
totalDownload int64
totalUpload int64
Expand All @@ -70,6 +81,11 @@ type DataManager struct {
captureTime time.Duration
rateCaptureFrequency time.Duration
nThread int

running bool

dFn *FuncGroup
uFn *FuncGroup
}

func NewDataManager() *DataManager {
Expand All @@ -78,6 +94,8 @@ func NewDataManager() *DataManager {
captureTime: time.Second * 10,
rateCaptureFrequency: time.Second,
}
ret.dFn = &FuncGroup{manager: ret}
ret.uFn = &FuncGroup{manager: ret}
return ret
}

Expand Down Expand Up @@ -130,56 +148,103 @@ func (dm *DataManager) Wait() {
}
}

func (dm *DataManager) DownloadRateCaptureHandler(fn func()) {
dm.testHandler(dm.downloadRateCapture, fn)
func (dm *DataManager) RegisterUploadHandler(fn func()) *FuncGroup {
if len(dm.uFn.fns) < dm.nThread {
dm.uFn.Add(fn)
}
return dm.uFn
}

func (dm *DataManager) UploadRateCaptureHandler(fn func()) {
dm.testHandler(dm.uploadRateCapture, fn)
func (dm *DataManager) RegisterDownloadHandler(fn func()) *FuncGroup {
if len(dm.dFn.fns) < dm.nThread {
dm.dFn.Add(fn)
}
return dm.dFn
}

func (dm *DataManager) testHandler(captureFunc func() *time.Ticker, fn func()) {
ticker := captureFunc()
running := true
func (f *FuncGroup) Start(mainRequestHandlerIndex int) {
if len(f.fns) == 0 {
panic("empty task stack")
}
if mainRequestHandlerIndex > len(f.fns)-1 {
mainRequestHandlerIndex = 0
}
mainLoadFactor := 0.3
// When the number of processor cores is equivalent to the processing program,
// the processing efficiency reaches the highest level (VT is not considered).
mainN := int(mainLoadFactor * float64(len(f.fns)))
if mainN == 0 {
mainN = 1
}
if len(f.fns) == 1 {
mainN = f.manager.nThread
}
auxN := f.manager.nThread - mainN

wg := sync.WaitGroup{}
time.AfterFunc(dm.captureTime, func() {
f.manager.running = true
ticker := f.manager.rateCapture()
time.AfterFunc(f.manager.captureTime, func() {
ticker.Stop()
running = false
f.manager.running = false
})
// When the number of processor cores is equivalent to the processing program,
// the processing efficiency reaches the highest level (VT is not considered).
for i := 0; i < dm.nThread; i++ {

for i := 0; i < mainN; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
if !running {
if !f.manager.running {
return
}
fn()
f.fns[mainRequestHandlerIndex]()
}
}()
}
wg.Wait()
}

func (dm *DataManager) downloadRateCapture() *time.Ticker {
return dm.rateCapture(dm.GetTotalDownload, &dm.DownloadRateSequence)
}
for j := 0; j < auxN; {
for i := range f.fns {
if j == auxN {
break
}
if i == mainRequestHandlerIndex {
continue
}
wg.Add(1)
t := i
go func() {
defer wg.Done()
for {
if !f.manager.running {
return
}
f.fns[t]()
}
}()
j++
}
}

func (dm *DataManager) uploadRateCapture() *time.Ticker {
return dm.rateCapture(dm.GetTotalUpload, &dm.UploadRateSequence)
wg.Wait()
}

func (dm *DataManager) rateCapture(rateFunc func() int64, dst *[]int64) *time.Ticker {
func (dm *DataManager) rateCapture() *time.Ticker {
ticker := time.NewTicker(dm.rateCaptureFrequency)
oldTotal := rateFunc()
oldTotalDownload := dm.totalDownload
oldTotalUpload := dm.totalUpload
go func() {
for range ticker.C {
newTotal := rateFunc()
delta := newTotal - oldTotal
oldTotal = newTotal
*dst = append(*dst, delta)
newTotalDownload := dm.totalDownload
newTotalUpload := dm.totalUpload
deltaDownload := newTotalDownload - oldTotalDownload
deltaUpload := newTotalUpload - oldTotalUpload
oldTotalDownload = newTotalDownload
oldTotalUpload = newTotalUpload
if deltaDownload != 0 {
dm.DownloadRateSequence = append(dm.DownloadRateSequence, deltaDownload)
}
if deltaUpload != 0 {
dm.UploadRateSequence = append(dm.UploadRateSequence, deltaUpload)
}
}
}()
return ticker
Expand Down Expand Up @@ -221,8 +286,12 @@ func (dm *DataManager) SetCaptureTime(duration time.Duration) *DataManager {
return dm
}

func (dm *DataManager) SetNThread(n int) *DataManager {
dm.nThread = n
func (dm *DataManager) SetNThread(n int) Manager {
if n < 1 {
dm.nThread = runtime.NumCPU()
} else {
dm.nThread = n
}
return dm
}

Expand All @@ -232,6 +301,8 @@ func (dm *DataManager) Reset() {
dm.DataGroup = []*DataChunk{}
dm.DownloadRateSequence = []int64{}
dm.UploadRateSequence = []int64{}
dm.dFn.fns = []func(){}
dm.uFn.fns = []func(){}
}

func (dm *DataManager) GetAvgDownloadRate() float64 {
Expand Down Expand Up @@ -350,6 +421,9 @@ func (dc *DataChunk) Read(b []byte) (n int, err error) {

// calcMAFilter Median-Averaging Filter
func calcMAFilter(list []int64) float64 {
if len(list) == 0 {
return 0
}
var sum int64 = 0
n := len(list)
if n == 0 {
Expand All @@ -370,6 +444,9 @@ func calcMAFilter(list []int64) float64 {
}

func pautaFilter(vector []int64) []int64 {
if len(vector) == 0 {
return vector
}
mean, _, std, _, _ := sampleVariance(vector)
var retVec []int64
for _, value := range vector {
Expand All @@ -382,6 +459,9 @@ func pautaFilter(vector []int64) []int64 {

// standardDeviation sample Variance
func sampleVariance(vector []int64) (mean, variance, stdDev, min, max int64) {
if len(vector) == 0 {
return 0, 0, 0, 0, 0
}
var sumNum, accumulate int64
min = math.MaxInt64
max = math.MinInt64
Expand Down
Loading