From ec8f30bc8047a38392cd083274bea138616c1e83 Mon Sep 17 00:00:00 2001 From: typhoon Date: Tue, 16 Aug 2022 17:17:37 +1000 Subject: [PATCH] update --- .../http/net-http/http-response-pipeline.go | 38 +++++++--- extensions/pipelines/http/net-http/request.go | 2 +- extensions/servers/gin/domains/proxy/proxy.go | 73 ++++++++++++------- main/proxy-rotator/main.go | 2 +- 4 files changed, 75 insertions(+), 40 deletions(-) diff --git a/extensions/pipelines/http/net-http/http-response-pipeline.go b/extensions/pipelines/http/net-http/http-response-pipeline.go index 7bddad4..0dae823 100644 --- a/extensions/pipelines/http/net-http/http-response-pipeline.go +++ b/extensions/pipelines/http/net-http/http-response-pipeline.go @@ -2,9 +2,10 @@ package net_http import ( "context" + "net/http" + "github.com/vortex14/gotyphoon/elements/forms" "github.com/vortex14/gotyphoon/extensions/pipelines" - "net/http" Errors "github.com/vortex14/gotyphoon/errors" "github.com/vortex14/gotyphoon/interfaces" @@ -27,7 +28,7 @@ type HttpResponsePipeline struct { response *http.Response, data *string, - ) (error, context.Context) + ) (error, context.Context) Cn func( err error, @@ -49,12 +50,14 @@ func (t *HttpResponsePipeline) UnpackResponse(ctx context.Context) ( *http.Response, *string, - ) { +) { - ok,taskInstance, logger, client, request, transport := t.UnpackRequestCtx(ctx) + ok, taskInstance, logger, client, request, transport := t.UnpackRequestCtx(ctx) okR, response, data := GetResponseCtx(ctx) - if !ok || !okR { return false, nil, nil, nil, nil, nil, nil, nil } + if !ok || !okR { + return false, nil, nil, nil, nil, nil, nil, nil + } return ok, taskInstance, logger, client, request, transport, response, data } @@ -64,14 +67,23 @@ func (t *HttpResponsePipeline) Run( next func(ctx context.Context), ) { - if t.Fn == nil { reject(t, Errors.TaskPipelineRequiredHandler); return } + if t.Fn == nil { + reject(t, Errors.TaskPipelineRequiredHandler) + return + } ok, taskInstance, logger, client, request, transport, response, data := t.UnpackResponse(context) - if !ok { reject(t, Errors.PipelineContexFailed); return } + if !ok { + reject(t, Errors.PipelineContexFailed) + return + } err, newContext := t.Fn(context, taskInstance, logger, client, request, transport, response, data) - if err != nil { reject(t, err); return } + if err != nil { + reject(t, err) + return + } next(newContext) } @@ -81,8 +93,12 @@ func (t *HttpResponsePipeline) Cancel( err error, ) { - if t.Cn == nil { return } + if t.Cn == nil { + return + } ok, taskInstance, logger := t.UnpackCtx(context) - if !ok { return } + if !ok { + return + } t.Cn(err, context, taskInstance, logger) -} \ No newline at end of file +} diff --git a/extensions/pipelines/http/net-http/request.go b/extensions/pipelines/http/net-http/request.go index dc099f6..1f60210 100644 --- a/extensions/pipelines/http/net-http/request.go +++ b/extensions/pipelines/http/net-http/request.go @@ -133,7 +133,7 @@ func CreateProxyRequestPipeline(opts *forms.Options) *HttpRequestPipeline { // Block current proxy if MakeBlockRequest(logger, task) != nil { - logger.Error("Fatal exception. Impossible block proxy.") + logger.Error("Fatal exception. Impossible to block the proxy.") os.Exit(1) } diff --git a/extensions/servers/gin/domains/proxy/proxy.go b/extensions/servers/gin/domains/proxy/proxy.go index 281b9f4..e74e3a2 100644 --- a/extensions/servers/gin/domains/proxy/proxy.go +++ b/extensions/servers/gin/domains/proxy/proxy.go @@ -17,8 +17,6 @@ import ( u_ "github.com/ahl5esoft/golang-underscore" Gin "github.com/gin-gonic/gin" - "github.com/sirupsen/logrus" - "github.com/vortex14/gotyphoon/elements/forms" "github.com/vortex14/gotyphoon/elements/models/label" "github.com/vortex14/gotyphoon/elements/models/singleton" @@ -56,7 +54,8 @@ type Settings struct { type Collection struct { singleton.Singleton - mu sync.Mutex + mu sync.Mutex + LOG interfaces.LoggerInterface Settings *Settings @@ -88,7 +87,7 @@ func (c *Collection) Clear() error { } func (c *Collection) blockAvailableProxyByIndex(index int) error { - logrus.Debug("block proxy by index ", index) + c.LOG.Debug("block proxy by index ", index) proxy := c.allowed[index] c.allowed = append(c.allowed[:index], c.allowed[index+1:]...) c.locked = append(c.locked, proxy) @@ -112,7 +111,7 @@ func (c *Collection) unblockProxyByValue(proxyAddress string) { } proxy := c.locked[index] - logrus.Debug(fmt.Sprintf("unblock proxy by index: %d , proxy: %s", index, proxy)) + c.LOG.Debug(fmt.Sprintf("unblock proxy by index: %d , proxy: %s", index, proxy)) c.locked = append(c.locked[:index], c.locked[index+1:]...) c.allowed = append(c.allowed, proxy) } @@ -164,11 +163,11 @@ func (c *Collection) CountByBans() int { func (c *Collection) unblockingProxy() { for { time.Sleep(time.Duration(c.Settings.CheckTime) * time.Second) - logrus.Debug("checking blocked timeout proxy") + c.LOG.Debug("checking locked timeout proxy") for _, value := range c.locked { proxyData := c.redisService.Get(c.GetFullKeyPath(value)) if len(proxyData) == 0 { - logrus.Debug("Unblocked proxy by timeout: ", value) + c.LOG.Debug("Unblocked proxy by timeout: ", value) c.unblockProxyByValue(value) } @@ -178,7 +177,7 @@ func (c *Collection) unblockingProxy() { } } -func (c *Collection) MakeRequestThroughProxy(proxy string, group *sync.WaitGroup) error { +func (c *Collection) MakeRequestThroughProxy(proxy string) error { taskTest := fake.CreateDefaultTask() taskTest.SetFetcherUrl(c.Settings.CheckHosts[0]) @@ -205,13 +204,10 @@ func (c *Collection) MakeRequestThroughProxy(proxy string, group *sync.WaitGroup request *http.Request, response *http.Response, data *string, doc *goquery.Document) (error, Context.Context) { - group.Done() - return nil, context }, Cn: func(err error, context Context.Context, task interfaces.TaskInterface, logger interfaces.LoggerInterface) { logger.Error("pipeline error") - group.Done() }, }, }, @@ -221,50 +217,58 @@ func (c *Collection) MakeRequestThroughProxy(proxy string, group *sync.WaitGroup func (c *Collection) checkingBlocked() { lastIndex := 0 for { - wg := sync.WaitGroup{} + c.LOG.Debug(fmt.Sprintf("checking blocked ... every %ds Count: %d", c.Settings.CheckTime, len(c.banned))) + wg := &sync.WaitGroup{} - if lastIndex == len(c.banned) { + if lastIndex == len(c.banned) || lastIndex > len(c.banned) { lastIndex = 0 } step := lastIndex + c.Settings.ConcurrentCheck residue := len(c.banned) - lastIndex - if step > residue { - step = residue + if step > len(c.banned) { + step = lastIndex + residue } if len(c.banned) == 0 { + c.LOG.Debug("not found proxies for checking") time.Sleep(time.Duration(c.Settings.CheckTime) * time.Second) continue } - - for i, proxy := range c.banned[lastIndex : lastIndex+step] { - logrus.Debug(fmt.Sprintf("check %s", proxy)) + c.LOG.Debug(fmt.Sprintf("lastIndex: %d ; Step: %d; residue: %d", lastIndex, step, residue)) + for i, proxy := range c.banned[lastIndex:step] { + c.LOG.Debug(fmt.Sprintf("check %s", proxy)) if (i + 1) >= c.Settings.ConcurrentCheck { break } wg.Add(1) - go func() { - err := c.MakeRequestThroughProxy(proxy, &wg) + go func(wg *sync.WaitGroup, proxy string) { + err := c.MakeRequestThroughProxy(proxy) if err == nil { ansE := c.RemoveProxyBan(proxy) if ansE == nil { - logrus.Debug(fmt.Sprintf("proxy %s is available", proxy)) + c.LOG.Debug(fmt.Sprintf("proxy %s is available", proxy)) } else { - logrus.Error(ansE) + c.LOG.Error(ansE) } + } else { - logrus.Error(err) + c.LOG.Error(err) } - }() + + wg.Done() + + }(wg, proxy) lastIndex += 1 } + c.LOG.Debug("waiting for checked") wg.Wait() time.Sleep(time.Duration(c.Settings.CheckTime) * time.Second) + c.LOG.Debug("next list step to check") } } @@ -347,7 +351,7 @@ func (c *Collection) PrintStats() { func (c *Collection) init() { proxyEnvList := os.Getenv("PROXY_LIST") if len(proxyEnvList) == 0 { - logrus.Fatal("env PROXY_LIST not found") + c.LOG.Error("env PROXY_LIST not found") } proxyList := strings.Split(proxyEnvList, "\n") @@ -357,7 +361,7 @@ func (c *Collection) init() { func (c *Collection) Init() *Collection { c.Construct(func() { - + c.LOG = log.New(log.D{"proxy": "rotator"}) c.stats = make(map[string]int) redisService := &redis.Service{ @@ -387,7 +391,7 @@ func (c *Collection) Init() *Collection { l := fmt.Sprintf("Init %d proxies; locked: %d", len(c.allowed), len(c.locked)) - logrus.Debug(l) + c.LOG.Debug(l) go c.unblockingProxy() @@ -570,6 +574,21 @@ func Constructor(opts *Settings) interfaces.ServerInterface { ctx.JSON(200, proxyCollection.stats) }, }, + "clear": &gin.Action{ + Action: &forms.Action{ + MetaInfo: &label.MetaInfo{ + Name: "clear", + Path: "clear", + Description: "clear proxy history", + }, + Methods: []string{interfaces.GET}, + }, + GinController: func(ctx *Gin.Context, logger interfaces.LoggerInterface) { + logger.Debug("clear proxy history.") + e := proxyCollection.Clear() + ctx.JSON(200, e) + }, + }, }, }, ) diff --git a/main/proxy-rotator/main.go b/main/proxy-rotator/main.go index 4439832..9ae57e1 100644 --- a/main/proxy-rotator/main.go +++ b/main/proxy-rotator/main.go @@ -10,6 +10,6 @@ func init() { } func main() { - _ = proxy.Constructor("localhost").Run() + _ = proxy.Constructor(&proxy.Settings{}).Run() }