Skip to content

Commit

Permalink
create a rod pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitriy committed Oct 10, 2022
1 parent 5172cd1 commit 42a6c43
Show file tree
Hide file tree
Showing 13 changed files with 532 additions and 11 deletions.
3 changes: 2 additions & 1 deletion elements/forms/pipeline_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package forms

import (
Context "context"
"fmt"
"github.com/vortex14/gotyphoon/elements/models/label"
Errors "github.com/vortex14/gotyphoon/errors"

Expand Down Expand Up @@ -83,7 +84,7 @@ func (g *PipelineGroup) Run(context Context.Context) error {
logger.Warning(Errors.ForceSkipPipelines.Error())
default:
errStack = err
logger.Error("Exit from group. Error: ", err.Error(), p.GetName())
logger.Error(fmt.Sprintf("Pipeline name: %s ; Exit from group. Error: %s", p.GetName(), err.Error()))
failedFlow = true
}

Expand Down
83 changes: 83 additions & 0 deletions extensions/pipelines/http/emulator/rod/constuctor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package rod

import (
"bytes"
"context"
"fmt"
"github.com/PuerkitoBio/goquery"
"github.com/go-rod/rod"
"github.com/go-rod/rod/lib/devices"
net_http "github.com/vortex14/gotyphoon/extensions/pipelines/http/net-http"
"github.com/vortex14/gotyphoon/extensions/pipelines/text/html"
"os"
"time"

"github.com/vortex14/gotyphoon/elements/forms"
"github.com/vortex14/gotyphoon/elements/models/label"
"github.com/vortex14/gotyphoon/interfaces"
)

func CreateProxyRodRequestPipeline(opts *forms.Options, evopts *EventOptions) *HttpRodRequestPipeline {

return &HttpRodRequestPipeline{
BasePipeline: &forms.BasePipeline{
MetaInfo: &label.MetaInfo{
Name: "Rod http request",
},
Options: opts,
Middlewares: []interfaces.MiddlewareInterface{
net_http.ConstructorProxySettingMiddleware(true),
ConstructorRodProxyRequestMiddleware(true),
},
},

Fn: func(context context.Context, task interfaces.TaskInterface, logger interfaces.LoggerInterface,
browser *rod.Browser) (error, context.Context) {

logger.Info(fmt.Sprintf("RUN rod request proxy: %s , proxy_server: %s url: %s", task.GetProxyAddress(), task.GetProxyServerUrl(), task.GetFetcherUrl()))

page := browser.DefaultDevice(devices.IPhoneX).
Timeout(time.Duration(task.GetFetcherTimeout()) * time.Second).
MustConnect().
MustPage(task.GetFetcherUrl())

if evopts != nil {
evopts.Wait()
}

logger.Debug("page opened")
page.MustWaitLoad()
logger.Debug("the page loaded")
context = NewPageCtx(context, page)

body := page.MustHTML()

doc, err := goquery.NewDocumentFromReader(bytes.NewBuffer([]byte(body)))
if err != nil {
return err, context
}

context = html.NewHtmlCtx(context, doc)
context = NewBodyResponse(context, &body)

page.MustClose()

return nil, context
},
Cn: func(err error,
context context.Context,
task interfaces.TaskInterface,
logger interfaces.LoggerInterface) {

if task.GetSaveData("SKIP_CN") == "skip" {
return
}

// Block current proxy
if net_http.MakeBlockRequest(logger, task) != nil {
logger.Error("Fatal exception. Impossible to block the proxy.")
os.Exit(1)
}
},
}
}
52 changes: 52 additions & 0 deletions extensions/pipelines/http/emulator/rod/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package rod

import (
Context "context"
"github.com/go-rod/rod"
"github.com/go-rod/rod/lib/launcher"

"github.com/vortex14/gotyphoon/ctx"
)

const (
BROWSER = "browser"
PAGE = "page"
LAUNCHER = "launcher"
RESPONSE = "response"
)

func NewBrowserCtx(context Context.Context, browser *rod.Browser) Context.Context {
return ctx.Update(context, BROWSER, browser)
}

func NewBodyResponse(context Context.Context, body *string) Context.Context {
return ctx.Update(context, RESPONSE, body)
}

func GetPageResponse(context Context.Context) (bool, *string) {
body, ok := ctx.Get(context, RESPONSE).(*string)
return ok, body
}

func GetBrowserCtx(context Context.Context) (bool, *rod.Browser) {
browser, ok := ctx.Get(context, BROWSER).(*rod.Browser)
return ok, browser
}

func NewPageCtx(context Context.Context, page *rod.Page) Context.Context {
return ctx.Update(context, PAGE, page)
}

func GetPageCtx(context Context.Context) (bool, *rod.Page) {
page, ok := ctx.Get(context, PAGE).(*rod.Page)
return ok, page
}

func NewLauncherCtx(context Context.Context, launcher *launcher.Launcher) Context.Context {
return ctx.Update(context, LAUNCHER, launcher)
}

func GetLauncherCtx(context Context.Context) (bool, *launcher.Launcher) {
launcher, ok := ctx.Get(context, LAUNCHER).(*launcher.Launcher)
return ok, launcher
}
67 changes: 67 additions & 0 deletions extensions/pipelines/http/emulator/rod/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package rod

import (
"context"
"github.com/go-rod/rod"
"github.com/go-rod/rod/lib/launcher"
"github.com/vortex14/gotyphoon/elements/forms"
"github.com/vortex14/gotyphoon/elements/models/label"
"github.com/vortex14/gotyphoon/elements/models/task"
"github.com/vortex14/gotyphoon/extensions/middlewares"
"github.com/vortex14/gotyphoon/interfaces"
)

func ConstructorRodProxyRequestMiddleware(required bool) interfaces.MiddlewareInterface {
return &middlewares.TaskMiddleware{
Middleware: &forms.Middleware{
MetaInfo: &label.MetaInfo{
Required: required,
Name: "prepare for rod request",
},
},
Fn: func(context context.Context, task *task.TyphoonTask,
logger interfaces.LoggerInterface, reject func(err error), next func(ctx context.Context)) {

url := launcher.New().Proxy(task.GetProxyAddress()).Delete("use-mock-keychain").MustLaunch()
browser := rod.New().ControlURL(url)
err := browser.Connect()

if err != nil {
reject(err)
return
}

context = NewBrowserCtx(context, browser)

next(context)

},
}
}

//func ConstructorRodProxyRequestMiddleware(required bool) interfaces.MiddlewareInterface {
// return &middlewares.TaskMiddleware{
// Middleware: &forms.Middleware{
// MetaInfo: &label.MetaInfo{
// Required: required,
// Name: "set launcher for rod request",
// },
// },
// Fn: func(context context.Context, task *task.TyphoonTask,
// logger interfaces.LoggerInterface, reject func(err error), next func(ctx context.Context)) {
//
// sB, browser := GetBrowserCtx(context)
//
// if !sB {
// reject(Errors.New("rod browser not found"))
// return
// }
//
// url := launcher.New().Proxy(task.GetProxyAddress()).Delete("use-mock-keychain").MustLaunch()
// browser.ControlURL(url)
//
// next(context)
//
// },
// }
//}
21 changes: 21 additions & 0 deletions extensions/pipelines/http/emulator/rod/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package rod

import (
"github.com/go-rod/rod"
"github.com/go-rod/rod/lib/proto"
)

type EventOptions struct {
NetworkResponseReceived bool
Page *rod.Page
}

func (e *EventOptions) Wait() {

if e.NetworkResponseReceived {
er := proto.NetworkResponseReceived{}
wait := e.Page.WaitEvent(&er)
wait()
}

}
102 changes: 102 additions & 0 deletions extensions/pipelines/http/emulator/rod/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package rod

import (
"context"
"fmt"

"github.com/go-rod/rod"
"github.com/vortex14/gotyphoon/elements/forms"
"github.com/vortex14/gotyphoon/elements/models/task"
Errors "github.com/vortex14/gotyphoon/errors"
"github.com/vortex14/gotyphoon/extensions/pipelines"
"github.com/vortex14/gotyphoon/interfaces"
"github.com/vortex14/gotyphoon/log"
)

type HttpRodRequestPipeline struct {
*forms.BasePipeline
*pipelines.TaskPipeline

Fn func(
context context.Context,
task interfaces.TaskInterface,
logger interfaces.LoggerInterface,

browser *rod.Browser,

) (error, context.Context)

Cn func(
err error,
context context.Context,
task interfaces.TaskInterface,
logger interfaces.LoggerInterface,
)
}

func (t *HttpRodRequestPipeline) UnpackRequestCtx(
ctx context.Context,
) (bool, interfaces.TaskInterface, interfaces.LoggerInterface, *rod.Browser) {
okT, taskInstance := task.Get(ctx)
okL, logger := log.Get(ctx)

okB, browser := GetBrowserCtx(ctx)

if !okT || !okL || !okB {
return false, nil, nil, nil
}

return okL && okT && okB, taskInstance, logger, browser
}

func (t *HttpRodRequestPipeline) Run(
context context.Context,
reject func(pipeline interfaces.BasePipelineInterface, err error),
next func(ctx context.Context),
) {

if t.Fn == nil {
reject(t, Errors.TaskPipelineRequiredHandler)
return
}

t.SafeRun(func() error {
ok, taskInstance, logger, browser := t.UnpackRequestCtx(context)

if !ok {
return fmt.Errorf("%s. taskInstance: %v, logger: %v, browser: %v", Errors.PipelineContexFailed, taskInstance, logger, browser)
}

err, newContext := t.Fn(context, taskInstance, logger, browser)
if err != nil {
return err
}
next(newContext)
return err

}, func(err error) {
reject(t, err)
_, logCtx := log.Get(context)
t.Cancel(context, logCtx, err)
})

}

func (t *HttpRodRequestPipeline) Cancel(
context context.Context,
logger interfaces.LoggerInterface,
err error,
) {

if t.Cn == nil {
return
}

ok, taskInstance, logger := t.UnpackCtx(context)
if !ok {
return
}

t.Cn(err, context, taskInstance, logger)

}
Loading

0 comments on commit 42a6c43

Please sign in to comment.