Skip to content

Commit

Permalink
Merge pull request #24 from vortex14/proxy-server
Browse files Browse the repository at this point in the history
update
  • Loading branch information
vortex14 authored Aug 16, 2022
2 parents 2c302bf + ec8f30b commit fc91a30
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 40 deletions.
38 changes: 27 additions & 11 deletions extensions/pipelines/http/net-http/http-response-pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package net_http

import (
"context"
"net/http"

"github.com/vortex14/gotyphoon/elements/forms"
"github.com/vortex14/gotyphoon/extensions/pipelines"
"net/http"

Errors "github.com/vortex14/gotyphoon/errors"
"github.com/vortex14/gotyphoon/interfaces"
Expand All @@ -27,7 +28,7 @@ type HttpResponsePipeline struct {
response *http.Response,
data *string,

) (error, context.Context)
) (error, context.Context)

Cn func(
err error,
Expand All @@ -49,12 +50,14 @@ func (t *HttpResponsePipeline) UnpackResponse(ctx context.Context) (

*http.Response,
*string,
) {
) {

ok,taskInstance, logger, client, request, transport := t.UnpackRequestCtx(ctx)
ok, taskInstance, logger, client, request, transport := t.UnpackRequestCtx(ctx)

okR, response, data := GetResponseCtx(ctx)
if !ok || !okR { return false, nil, nil, nil, nil, nil, nil, nil }
if !ok || !okR {
return false, nil, nil, nil, nil, nil, nil, nil
}
return ok, taskInstance, logger, client, request, transport, response, data
}

Expand All @@ -64,14 +67,23 @@ func (t *HttpResponsePipeline) Run(
next func(ctx context.Context),
) {

if t.Fn == nil { reject(t, Errors.TaskPipelineRequiredHandler); return }
if t.Fn == nil {
reject(t, Errors.TaskPipelineRequiredHandler)
return
}

ok, taskInstance, logger, client, request, transport, response, data := t.UnpackResponse(context)

if !ok { reject(t, Errors.PipelineContexFailed); return }
if !ok {
reject(t, Errors.PipelineContexFailed)
return
}

err, newContext := t.Fn(context, taskInstance, logger, client, request, transport, response, data)
if err != nil { reject(t, err); return }
if err != nil {
reject(t, err)
return
}
next(newContext)
}

Expand All @@ -81,8 +93,12 @@ func (t *HttpResponsePipeline) Cancel(
err error,
) {

if t.Cn == nil { return }
if t.Cn == nil {
return
}
ok, taskInstance, logger := t.UnpackCtx(context)
if !ok { return }
if !ok {
return
}
t.Cn(err, context, taskInstance, logger)
}
}
2 changes: 1 addition & 1 deletion extensions/pipelines/http/net-http/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func CreateProxyRequestPipeline(opts *forms.Options) *HttpRequestPipeline {

// Block current proxy
if MakeBlockRequest(logger, task) != nil {
logger.Error("Fatal exception. Impossible block proxy.")
logger.Error("Fatal exception. Impossible to block the proxy.")
os.Exit(1)
}

Expand Down
73 changes: 46 additions & 27 deletions extensions/servers/gin/domains/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (

u_ "github.com/ahl5esoft/golang-underscore"
Gin "github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"

"github.com/vortex14/gotyphoon/elements/forms"
"github.com/vortex14/gotyphoon/elements/models/label"
"github.com/vortex14/gotyphoon/elements/models/singleton"
Expand Down Expand Up @@ -56,7 +54,8 @@ type Settings struct {

type Collection struct {
singleton.Singleton
mu sync.Mutex
mu sync.Mutex
LOG interfaces.LoggerInterface

Settings *Settings

Expand Down Expand Up @@ -88,7 +87,7 @@ func (c *Collection) Clear() error {
}

func (c *Collection) blockAvailableProxyByIndex(index int) error {
logrus.Debug("block proxy by index ", index)
c.LOG.Debug("block proxy by index ", index)
proxy := c.allowed[index]
c.allowed = append(c.allowed[:index], c.allowed[index+1:]...)
c.locked = append(c.locked, proxy)
Expand All @@ -112,7 +111,7 @@ func (c *Collection) unblockProxyByValue(proxyAddress string) {
}

proxy := c.locked[index]
logrus.Debug(fmt.Sprintf("unblock proxy by index: %d , proxy: %s", index, proxy))
c.LOG.Debug(fmt.Sprintf("unblock proxy by index: %d , proxy: %s", index, proxy))
c.locked = append(c.locked[:index], c.locked[index+1:]...)
c.allowed = append(c.allowed, proxy)
}
Expand Down Expand Up @@ -164,11 +163,11 @@ func (c *Collection) CountByBans() int {
func (c *Collection) unblockingProxy() {
for {
time.Sleep(time.Duration(c.Settings.CheckTime) * time.Second)
logrus.Debug("checking blocked timeout proxy")
c.LOG.Debug("checking locked timeout proxy")
for _, value := range c.locked {
proxyData := c.redisService.Get(c.GetFullKeyPath(value))
if len(proxyData) == 0 {
logrus.Debug("Unblocked proxy by timeout: ", value)
c.LOG.Debug("Unblocked proxy by timeout: ", value)
c.unblockProxyByValue(value)
}

Expand All @@ -178,7 +177,7 @@ func (c *Collection) unblockingProxy() {
}
}

func (c *Collection) MakeRequestThroughProxy(proxy string, group *sync.WaitGroup) error {
func (c *Collection) MakeRequestThroughProxy(proxy string) error {
taskTest := fake.CreateDefaultTask()

taskTest.SetFetcherUrl(c.Settings.CheckHosts[0])
Expand All @@ -205,13 +204,10 @@ func (c *Collection) MakeRequestThroughProxy(proxy string, group *sync.WaitGroup
request *http.Request, response *http.Response,
data *string, doc *goquery.Document) (error, Context.Context) {

group.Done()

return nil, context
},
Cn: func(err error, context Context.Context, task interfaces.TaskInterface, logger interfaces.LoggerInterface) {
logger.Error("pipeline error")
group.Done()
},
},
},
Expand All @@ -221,50 +217,58 @@ func (c *Collection) MakeRequestThroughProxy(proxy string, group *sync.WaitGroup
func (c *Collection) checkingBlocked() {
lastIndex := 0
for {
wg := sync.WaitGroup{}
c.LOG.Debug(fmt.Sprintf("checking blocked ... every %ds Count: %d", c.Settings.CheckTime, len(c.banned)))
wg := &sync.WaitGroup{}

if lastIndex == len(c.banned) {
if lastIndex == len(c.banned) || lastIndex > len(c.banned) {
lastIndex = 0
}

step := lastIndex + c.Settings.ConcurrentCheck
residue := len(c.banned) - lastIndex

if step > residue {
step = residue
if step > len(c.banned) {
step = lastIndex + residue
}

if len(c.banned) == 0 {
c.LOG.Debug("not found proxies for checking")
time.Sleep(time.Duration(c.Settings.CheckTime) * time.Second)
continue
}

for i, proxy := range c.banned[lastIndex : lastIndex+step] {
logrus.Debug(fmt.Sprintf("check %s", proxy))
c.LOG.Debug(fmt.Sprintf("lastIndex: %d ; Step: %d; residue: %d", lastIndex, step, residue))
for i, proxy := range c.banned[lastIndex:step] {
c.LOG.Debug(fmt.Sprintf("check %s", proxy))
if (i + 1) >= c.Settings.ConcurrentCheck {
break
}

wg.Add(1)

go func() {
err := c.MakeRequestThroughProxy(proxy, &wg)
go func(wg *sync.WaitGroup, proxy string) {
err := c.MakeRequestThroughProxy(proxy)
if err == nil {
ansE := c.RemoveProxyBan(proxy)
if ansE == nil {
logrus.Debug(fmt.Sprintf("proxy %s is available", proxy))
c.LOG.Debug(fmt.Sprintf("proxy %s is available", proxy))
} else {
logrus.Error(ansE)
c.LOG.Error(ansE)
}

} else {
logrus.Error(err)
c.LOG.Error(err)
}
}()

wg.Done()

}(wg, proxy)
lastIndex += 1
}
c.LOG.Debug("waiting for checked")
wg.Wait()

time.Sleep(time.Duration(c.Settings.CheckTime) * time.Second)
c.LOG.Debug("next list step to check")

}
}
Expand Down Expand Up @@ -347,7 +351,7 @@ func (c *Collection) PrintStats() {
func (c *Collection) init() {
proxyEnvList := os.Getenv("PROXY_LIST")
if len(proxyEnvList) == 0 {
logrus.Fatal("env PROXY_LIST not found")
c.LOG.Error("env PROXY_LIST not found")
}

proxyList := strings.Split(proxyEnvList, "\n")
Expand All @@ -357,7 +361,7 @@ func (c *Collection) init() {

func (c *Collection) Init() *Collection {
c.Construct(func() {

c.LOG = log.New(log.D{"proxy": "rotator"})
c.stats = make(map[string]int)

redisService := &redis.Service{
Expand Down Expand Up @@ -387,7 +391,7 @@ func (c *Collection) Init() *Collection {

l := fmt.Sprintf("Init %d proxies; locked: %d", len(c.allowed), len(c.locked))

logrus.Debug(l)
c.LOG.Debug(l)

go c.unblockingProxy()

Expand Down Expand Up @@ -570,6 +574,21 @@ func Constructor(opts *Settings) interfaces.ServerInterface {
ctx.JSON(200, proxyCollection.stats)
},
},
"clear": &gin.Action{
Action: &forms.Action{
MetaInfo: &label.MetaInfo{
Name: "clear",
Path: "clear",
Description: "clear proxy history",
},
Methods: []string{interfaces.GET},
},
GinController: func(ctx *Gin.Context, logger interfaces.LoggerInterface) {
logger.Debug("clear proxy history.")
e := proxyCollection.Clear()
ctx.JSON(200, e)
},
},
},
},
)
Expand Down
2 changes: 1 addition & 1 deletion main/proxy-rotator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ func init() {
}

func main() {
_ = proxy.Constructor("localhost").Run()
_ = proxy.Constructor(&proxy.Settings{}).Run()

}

0 comments on commit fc91a30

Please sign in to comment.