From dde345d8b48a045c04aaa6aa18f09633bdac701f Mon Sep 17 00:00:00 2001 From: Jacob Gadikian Date: Sat, 28 Oct 2023 00:25:00 +0800 Subject: [PATCH] attempt at threading --- lib/checknode.go | 7 +++---- lib/lib.go | 47 ++++++++++++++++++++++++++++++++++++----------- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/lib/checknode.go b/lib/checknode.go index 841c354..e649e56 100644 --- a/lib/checknode.go +++ b/lib/checknode.go @@ -122,10 +122,9 @@ func CheckNode(nodeAddr string) { unsuccessfulNodes.Unlock() return } - for _, peer := range netinfo.Result.Peers { - peer := peer - ProcessPeer(&peer) - } + + ProcessPeers(netinfo.Result.Peers, 69) + } func CheckNodeGRPC(nodeAddr string) { diff --git a/lib/lib.go b/lib/lib.go index 844831a..54f954f 100644 --- a/lib/lib.go +++ b/lib/lib.go @@ -18,7 +18,7 @@ import ( ) var client = &http.Client{ - Timeout: 3000 * time.Millisecond, + Timeout: 500 * time.Millisecond, Transport: &http.Transport{ MaxIdleConns: 500, IdleConnTimeout: 30 * time.Second, @@ -47,7 +47,7 @@ func FetchStatus(nodeAddr string) (*types.StatusResponse, error) { return &status, nil } -func BuildRPCAddress(peer *types.Peer) string { +func BuildRPCAddress(peer types.Peer) string { rpcAddr := peer.NodeInfo.Other.RPCAddress rpcAddr = strings.TrimPrefix(rpcAddr, "tcp://") @@ -74,8 +74,35 @@ func WriteSectionToToml(file *os.File, sectionName string, nodes map[string]int) } } -// Modify the function signature to: -func ProcessPeer(peer *types.Peer) { +func ProcessPeers(peers []types.Peer, workerCount int) { + // Create a buffered channel to manage workload. + jobs := make(chan *types.Peer, len(peers)) + + // Create a wait group. + var wg sync.WaitGroup + + // Spawn worker goroutines. + for i := 0; i < workerCount; i++ { + go func() { + for peer := range jobs { + processSinglePeer(*peer) + wg.Done() + } + }() + } + + // Queue jobs. + for _, peer := range peers { + wg.Add(1) + jobs <- &peer + } + + // Close the job channel and wait for all jobs to finish. + close(jobs) + wg.Wait() +} + +func processSinglePeer(peer types.Peer) { rpcAddr := BuildRPCAddress(peer) rpcAddr = NormalizeAddressWithRemoteIP(rpcAddr, peer.RemoteIP) CheckNode("http://" + rpcAddr) @@ -83,18 +110,16 @@ func ProcessPeer(peer *types.Peer) { // Fetch network info netInfo, err := FetchNetInfo("http://" + rpcAddr) if err != nil { - // fmt.Println("Error fetching network info:", err) + // fmt.Println("Error fetching network info:", err) return } // Process each peer for _, peer := range netInfo.Result.Peers { - go func(peer types.Peer) { - if !IsNodeVisited(peer.NodeInfo.Other.RPCAddress) { - MarkNodeAsVisited(peer.NodeInfo.Other.RPCAddress) - ProcessPeer(&peer) - } - }(peer) + if !IsNodeVisited(peer.NodeInfo.Other.RPCAddress) { + MarkNodeAsVisited(peer.NodeInfo.Other.RPCAddress) + ProcessPeers([]types.Peer{peer}, 100) // Adjust workerCount as needed + } } }