Skip to content

Commit

Permalink
Merge pull request #363 from CortexFoundation/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
ucwong authored Apr 20, 2023
2 parents 812bdcf + 064b741 commit 21fca97
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 41 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/prometheus/client_golang v1.14.0
github.com/ucwong/filecache v1.0.5-0.20230402204106-c6ff7d7930ef
github.com/ucwong/go-ttlmap v1.0.2-0.20221020173635-331e7ddde2bb
github.com/ucwong/golang-kv v1.0.16-0.20230412200348-60af39b46284
github.com/ucwong/golang-kv v1.0.16-0.20230420140948-f85ce47d4928
github.com/ucwong/shard v0.0.0-20230406003402-a003024d1f4f
github.com/urfave/cli/v2 v2.25.1
go.etcd.io/bbolt v1.3.7
Expand Down Expand Up @@ -61,7 +61,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/pebble v0.0.0-20230412195903-4e468412c552 // indirect
github.com/cockroachdb/pebble v0.0.0-20230420011906-6002e39ce756 // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ github.com/cockroachdb/errors v1.9.1/go.mod h1:2sxOtL2WIc096WSZqZ5h8fa17rdDq9HZO
github.com/cockroachdb/logtags v0.0.0-20211118104740-dabe8e521a4f/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE=
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b/go.mod h1:Vz9DsVWQQhf3vs21MhPMZpMGSht7O/2vFW2xusFUVOs=
github.com/cockroachdb/pebble v0.0.0-20230412195903-4e468412c552 h1:n4wDCFjAFEp9aWLJPKVfviVlFrx8GWYjO7UTeT0IJpQ=
github.com/cockroachdb/pebble v0.0.0-20230412195903-4e468412c552/go.mod h1:9lRMC4XN3/BLPtIp6kAKwIaHu369NOf2rMucPzipz50=
github.com/cockroachdb/pebble v0.0.0-20230420011906-6002e39ce756 h1:NgUWWgwtcPD4JjclHKE6GtJGJmZm0mWHfGjomCA4CsY=
github.com/cockroachdb/pebble v0.0.0-20230420011906-6002e39ce756/go.mod h1:9lRMC4XN3/BLPtIp6kAKwIaHu369NOf2rMucPzipz50=
github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ=
github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg=
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM=
Expand Down Expand Up @@ -575,8 +575,8 @@ github.com/ucwong/filecache v1.0.5-0.20230402204106-c6ff7d7930ef h1:bbArCHS5UrTT
github.com/ucwong/filecache v1.0.5-0.20230402204106-c6ff7d7930ef/go.mod h1:ddwX+NCjMZPdpzcGh1fcEbNTUTCtKgt2hC2rqvmLKgA=
github.com/ucwong/go-ttlmap v1.0.2-0.20221020173635-331e7ddde2bb h1:dVZH3AH9f7zB3VBmsjn25B7lfcAyMP4QxdFYTrfj7tg=
github.com/ucwong/go-ttlmap v1.0.2-0.20221020173635-331e7ddde2bb/go.mod h1:3yswsBsVuwsOjDvFfC5Na9XSEf4HC7mj3W3g6jvSY/s=
github.com/ucwong/golang-kv v1.0.16-0.20230412200348-60af39b46284 h1:GbObPT3x5fgGJo8owcQmJTT6Ir5G/K3AkYXzbqmmeaM=
github.com/ucwong/golang-kv v1.0.16-0.20230412200348-60af39b46284/go.mod h1:k6qOzS8yK+ZSVuN560N9em5kf1wqvr/b961FE8E+Kd4=
github.com/ucwong/golang-kv v1.0.16-0.20230420140948-f85ce47d4928 h1:CbZ1HSx9YG6dHRbMCWS30d9evLzTWK2/hm3fZTNaU3I=
github.com/ucwong/golang-kv v1.0.16-0.20230420140948-f85ce47d4928/go.mod h1:pFPRDx8PPYFli7tBrpt5+ltAzRbWccwUnba5y5wrn/M=
github.com/ucwong/shard v0.0.0-20230406003402-a003024d1f4f h1:q9Y7MB/zxaHiR3Bxp8/qp9iRju/2cCFIE2RM1o79NWo=
github.com/ucwong/shard v0.0.0-20230406003402-a003024d1f4f/go.mod h1:Y/x75znJgnXLUzsTENLnGpiUPVEeizYyg6qwpx9M3T0=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
Expand Down
12 changes: 6 additions & 6 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Monitor struct {
//dl *backend.TorrentManager

exitCh chan any
terminated atomic.Int32
terminated atomic.Bool
lastNumber uint64
startNumber uint64
scope uint64
Expand Down Expand Up @@ -117,7 +117,7 @@ func New(flag *params.Config, cache, compress, listen bool, fs *backend.ChainDB,
//start: mclock.Now(),
}
m.currentNumber.Store(0)
m.terminated.Store(0)
m.terminated.Store(false)
m.blockCache, _ = lru.New(delay)
m.sizeCache, _ = lru.New(batch)
m.listen = listen
Expand Down Expand Up @@ -287,7 +287,7 @@ func (m *Monitor) buildConnection(ipcpath string, rpcuri string) (*rpc.Client, e
return cl, nil
}

if m.terminated.Load() == 1 {
if m.terminated.Load() {
log.Info("Connection builder break")
return nil, errors.New("ipc connection terminated")
}
Expand Down Expand Up @@ -495,10 +495,10 @@ func (m *Monitor) Stop() {
m.lock.Lock()
defer m.lock.Unlock()
//m.closeOnce.Do(func() {
if m.terminated.Load() == 1 {
if m.terminated.Load() {
return
}
m.terminated.Store(1)
m.terminated.Store(true)

m.exit()
log.Info("Monitor is waiting to be closed")
Expand Down Expand Up @@ -715,7 +715,7 @@ func (m *Monitor) syncLastBlock() uint64 {
}
//start := mclock.Now()
for i := minNumber; i <= maxNumber; { // i++ {
if m.terminated.Load() == 1 {
if m.terminated.Load() {
log.Warn("Fs scan terminated", "number", i)
maxNumber = i - 1
break
Expand Down
2 changes: 1 addition & 1 deletion params/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var DefaultConfig = Config{
BoostNodes: TorrentBoostNodes,
Mode: "default",
DisableUTP: true,
DisableDHT: false,
DisableDHT: true,
DisableTCP: false,
DisableIPv6: false,
MaxSeedingNum: LimitSeeding / 2,
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
package params

const (
ClientVersion = "-COLA08-"
ClientVersion = "-COLA09-"
)
12 changes: 10 additions & 2 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,20 @@ func (peer *Peer) handshake() error {
Files: uint64(peer.host.Congress()),
Leafs: uint64(len(peer.host.chain().Blocks())),
}
errc <- p2p.SendItems(peer.ws, params.StatusCode, params.ProtocolVersion, &info)
select {
case errc <- p2p.SendItems(peer.ws, params.StatusCode, params.ProtocolVersion, &info):
case <-peer.quit:
}
log.Debug("Nas send items OK", "status", params.StatusCode, "version", params.ProtocolVersion, "len", len(errc))
}()
// Fetch the remote status packet and verify protocol match
peer.wg.Add(1)
go func() {
defer peer.wg.Done()
errc <- peer.readStatus()
select {
case errc <- peer.readStatus():
case <-peer.quit:
}
}()

timeout := time.NewTimer(params.HandshakeTimeout)
Expand All @@ -241,6 +247,8 @@ func (peer *Peer) handshake() error {
case <-timeout.C:
log.Info("Handshake timeout")
return fmt.Errorf("peer [%x] timeout", peer.ID())
case <-peer.quit:
return nil
}
}

Expand Down
115 changes: 90 additions & 25 deletions wormhole/wormhole.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@
package wormhole

import (
"errors"
"github.com/CortexFoundation/CortexTheseus/common"
"github.com/CortexFoundation/CortexTheseus/common/mclock"
"github.com/CortexFoundation/CortexTheseus/log"
"github.com/CortexFoundation/torrentfs/params"
"net"
"net/url"
"time"
//"sync"

resty "github.com/go-resty/resty/v2"

mapset "github.com/deckarep/golang-set/v2"

"strings"
"time"
)

var (
Expand All @@ -47,43 +50,72 @@ func Tunnel(hash string) error {
}

func BestTrackers() (ret []string) {
defer client.SetTimeout(time.Second * 10)

for _, ur := range params.BestTrackerUrl {
resp, err := client.R().Get(ur)

if err != nil {
if err != nil || resp == nil || len(resp.String()) == 0 {
log.Warn("Global tracker lost", "err", err)
continue
}

str := strings.Split(resp.String(), "\n\n")
client.SetTimeout(time.Second * 2)

//var wg sync.WaitGroup
var (
str = strings.Split(resp.String(), "\n\n")
retCh = make(chan string, len(str))
start = mclock.Now()
)
for _, s := range str {
if len(ret) < CAP {
log.Debug("Global best trackers", "url", s)
switch {
case strings.HasPrefix(s, "http"), strings.HasPrefix(s, "https"):
if _, err := client.R().Post(s); err != nil {
log.Warn("tracker failed", "err", err)
} else {
ret = append(ret, s)
}
case strings.HasPrefix(s, "udp"):
if u, err := url.Parse(s); err == nil {
if host, port, err := net.SplitHostPort(u.Host); err == nil {
if err := ping(host, port); err == nil {
ret = append(ret, s)
} else {
log.Warn("UDP ping err", "s", s, "err", err)
}
//if len(ret) < CAP {
// wg.Add(1)
go func(ss string) {
// defer wg.Done()
if err := HealthCheck(ss); err == nil {
//ret = append(ret, s)
retCh <- ss
} else {
retCh <- ""
}
}(s)
/*switch {
case strings.HasPrefix(s, "http"), strings.HasPrefix(s, "https"):
if _, err := client.R().Post(s); err != nil {
log.Warn("tracker failed", "err", err)
} else {
ret = append(ret, s)
}
case strings.HasPrefix(s, "udp"):
if u, err := url.Parse(s); err == nil {
if host, port, err := net.SplitHostPort(u.Host); err == nil {
if err := ping(host, port); err == nil {
ret = append(ret, s)
} else {
log.Warn("UDP ping err", "s", s, "err", err)
}
}
default:
log.Warn("Other protocols trackers", "s", s)
}
} else {
break
default:
log.Warn("Other protocols trackers", "s", s)
}*/
//} else {
// break
//}
}

for i := 0; i < len(str); i++ {
select {
case x := <-retCh:
if len(x) > 0 {
log.Info("Healthy tracker", "url", x, "latency", common.PrettyDuration(time.Duration(mclock.Now())-time.Duration(start)))
ret = append(ret, x)
}
}
}

//wg.Wait()

if len(ret) > 0 {
return
}
Expand All @@ -92,6 +124,39 @@ func BestTrackers() (ret []string) {
return
}

func HealthCheck(s string) error {
log.Debug("Global best trackers", "url", s)
switch {
case strings.HasPrefix(s, "http"), strings.HasPrefix(s, "https"):
if _, err := client.R().Post(s); err != nil {
log.Warn("tracker failed", "err", err)
return err
} else {
//ret = append(ret, s)
return nil
}
case strings.HasPrefix(s, "udp"):
if u, err := url.Parse(s); err == nil {
if host, port, err := net.SplitHostPort(u.Host); err == nil {
if err := ping(host, port); err == nil {
//ret = append(ret, s)
return nil
} else {
log.Warn("UDP ping err", "s", s, "err", err)
return err
}
}
} else {
return err
}
default:
log.Warn("Other protocols trackers", "s", s)
return errors.New("invalid url protocol")
}

return errors.New("unhealthy url")
}

func ColaList() mapset.Set[string] {
m := mapset.NewSet[string]()
for _, url := range params.ColaUrl {
Expand Down

0 comments on commit 21fca97

Please sign in to comment.