Skip to content

Commit

Permalink
complete async execution support
Browse files Browse the repository at this point in the history
  • Loading branch information
tarunKoyalwar committed Aug 13, 2023
1 parent f4abe7a commit 2a32933
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 104 deletions.
1 change: 1 addition & 0 deletions v2/pkg/protocols/common/executer/executer.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func (e *Executer) executeFlow(input *contextargs.Context, callback protocols.Ou
gologger.Error().Msgf("invalid request type %s", req.Type().String())
}
}

flow := &FlowExecutor{
allProtocols: allprotos,
input: input,
Expand Down
218 changes: 114 additions & 104 deletions v2/pkg/protocols/common/executer/flow_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ var (
)

type ProtoOptions struct {
Hide bool
Async bool
Hide bool
Async bool
protoName string
reqIDS []string
callback func(result *output.InternalWrappedEvent)
}

func GetProtoOptions(m map[string]interface{}) *ProtoOptions {
options := &ProtoOptions{
Hide: GetBool(m["hide"]),
Async: GetBool(m["async"]),
}
return options
// LoadOptions loads the protocol options from a map
func (P *ProtoOptions) LoadOptions(m map[string]interface{}) {
P.Hide = GetBool(m["hide"])
P.Async = GetBool(m["async"])
}

type FlowExecutor struct {
Expand Down Expand Up @@ -94,13 +95,12 @@ func (f *FlowExecutor) Compile(callback func(event *output.InternalWrappedEvent)
}
}
f.options.TemplateCtx.Merge(allVars)
// ------

// ---- define callback functions/objects----
f.protoFunctions = map[string]func(call goja.FunctionCall) goja.Value{}
compileErrors := []error{}

// iterate over all protocols and generate callback functions for each protocol
for p, requests := range f.allProtocols {
// for each protocol build a requestMap with reqID and protocol request
reqMap := mapsutil.Map[string, protocols.Request]{}
counter := 0
proto := strings.ToLower(p) // donot use loop variables in callback functions directly
Expand All @@ -116,121 +116,130 @@ func (f *FlowExecutor) Compile(callback func(event *output.InternalWrappedEvent)
counter++
}
// ---define hook that allows protocol/request execution from js-----
// --- this is the actual callback that is executed when function is invoked in js----
f.protoFunctions[proto] = func(call goja.FunctionCall) goja.Value {
ids := []string{}
opts := &ProtoOptions{}
opts := &ProtoOptions{
callback: callback,
protoName: proto,
}
for _, v := range call.Arguments {
switch value := v.Export().(type) {
case map[string]interface{}:
opts = GetProtoOptions(value)
opts.LoadOptions(value)
default:
ids = append(ids, types.ToString(value))
opts.reqIDS = append(opts.reqIDS, types.ToString(value))
}
}
if opts.Async {
f.wg.Add(1)
defer f.wg.Done()
go func() {
defer f.wg.Done()
f.requestExecutor(reqMap, opts)
}()
return f.jsVM.ToValue(true)
}
defer func() {
// to avoid polling update template variables everytime we execute a protocol
var m map[string]interface{} = f.options.TemplateCtx.GetAll()
_ = f.jsVM.Set("template", m)
}()
matcherStatus := &atomic.Bool{} // due to interactsh matcher polling logic this needs to be atomic bool

// if no id is passed execute all requests in sequence
if len(ids) == 0 {
// execution logic for http()/dns() etc
for index := range f.allProtocols[proto] {
req := f.allProtocols[proto][index]
err := req.ExecuteWithResults(f.input, output.InternalEvent(f.options.TemplateCtx.GetAll()), nil, func(result *output.InternalWrappedEvent) {
if result != nil {
f.results.CompareAndSwap(false, true)
if !opts.Hide {
callback(result)
}
// export dynamic values from operators (i.e internal:true)
// add add it to template context
// this is a conflicting behaviour with iterate-all
if result.HasOperatorResult() {
matcherStatus.CompareAndSwap(false, result.OperatorsResult.Matched)
if !result.OperatorsResult.Matched && !hasMatchers(req.GetCompiledOperators()) {
// if matcher status is false . check if template/request contains any matcher at all
// if it does then we need to set matcher status to true
matcherStatus.CompareAndSwap(false, true)
}
if len(result.OperatorsResult.DynamicValues) > 0 {
for k, v := range result.OperatorsResult.DynamicValues {
f.options.TemplateCtx.Set(k, v)
}
}
}
}
})
// fmt.Printf("done executing %v with index %v and err %v", proto, index, err)
if err != nil {
// save all errors in a map with id as key
// its less likely that there will be race condition but just in case
id := req.GetID()
if id == "" {
id, _ = reqMap.GetKeyWithValue(req)
return f.jsVM.ToValue(f.requestExecutor(reqMap, opts))
}
}

// register all built in functions
return f.RegisterBuiltInFunctions()
}

// requestExecutor executes a protocol/request and returns true if any matcher was found
func (f *FlowExecutor) requestExecutor(reqMap mapsutil.Map[string, protocols.Request], opts *ProtoOptions) bool {
defer func() {
// to avoid polling update template variables everytime we execute a protocol
var m map[string]interface{} = f.options.TemplateCtx.GetAll()
_ = f.jsVM.Set("template", m)
}()
matcherStatus := &atomic.Bool{} // due to interactsh matcher polling logic this needs to be atomic bool
// if no id is passed execute all requests in sequence
if len(opts.reqIDS) == 0 {
// execution logic for http()/dns() etc
for index := range f.allProtocols[opts.protoName] {
req := f.allProtocols[opts.protoName][index]
err := req.ExecuteWithResults(f.input, output.InternalEvent(f.options.TemplateCtx.GetAll()), nil, func(result *output.InternalWrappedEvent) {
if result != nil {
f.results.CompareAndSwap(false, true)
if !opts.Hide {
opts.callback(result)
}
// export dynamic values from operators (i.e internal:true)
// add add it to template context
// this is a conflicting behaviour with iterate-all
if result.HasOperatorResult() {
matcherStatus.CompareAndSwap(false, result.OperatorsResult.Matched)
if !result.OperatorsResult.Matched && !hasMatchers(req.GetCompiledOperators()) {
// if matcher status is false . check if template/request contains any matcher at all
// if it does then we need to set matcher status to true
matcherStatus.CompareAndSwap(false, true)
}
err = f.allErrs.Set(proto+":"+id, err)
if err != nil {
gologger.Error().Msgf("failed to store flow runtime errors got %v", err)
if len(result.OperatorsResult.DynamicValues) > 0 {
for k, v := range result.OperatorsResult.DynamicValues {
f.options.TemplateCtx.Set(k, v)
}
}
return f.jsVM.ToValue(matcherStatus.Load())
}
}
return f.jsVM.ToValue(matcherStatus.Load())
})
if err != nil {
// save all errors in a map with id as key
// its less likely that there will be race condition but just in case
id := req.GetID()
if id == "" {
id, _ = reqMap.GetKeyWithValue(req)
}
err = f.allErrs.Set(opts.protoName+":"+id, err)
if err != nil {
gologger.Error().Msgf("failed to store flow runtime errors got %v", err)
}
return matcherStatus.Load()
}
}
return matcherStatus.Load()
}

// execution logic for http("0") or http("get-aws-vpcs")
for _, id := range ids {
req, ok := reqMap[id]
if !ok {
gologger.Error().Msgf("invalid request id '%s' provided", id)
// compile error
compileErrors = append(compileErrors, ErrInvalidRequestID.Msgf(id))
return f.jsVM.ToValue(matcherStatus.Load())
// execution logic for http("0") or http("get-aws-vpcs")
for _, id := range opts.reqIDS {
req, ok := reqMap[id]
if !ok {
gologger.Error().Msgf("invalid request id '%s' provided", id)
// compile error
if err := f.allErrs.Set(opts.protoName+":"+id, ErrInvalidRequestID.Msgf(id)); err != nil {
gologger.Error().Msgf("failed to store flow runtime errors got %v", err)
}
return matcherStatus.Load()
}
err := req.ExecuteWithResults(f.input, output.InternalEvent(f.options.TemplateCtx.GetAll()), nil, func(result *output.InternalWrappedEvent) {
if result != nil {
f.results.CompareAndSwap(false, true)
if !opts.Hide {
opts.callback(result)
}
err := req.ExecuteWithResults(f.input, output.InternalEvent(f.options.TemplateCtx.GetAll()), nil, func(result *output.InternalWrappedEvent) {
if result != nil {
f.results.CompareAndSwap(false, true)
if !opts.Hide {
callback(result)
}
// export dynamic values from operators (i.e internal:true)
// add add it to template context
if result.HasOperatorResult() {
matcherStatus.CompareAndSwap(false, result.OperatorsResult.Matched)
if len(result.OperatorsResult.DynamicValues) > 0 {
for k, v := range result.OperatorsResult.DynamicValues {
f.options.TemplateCtx.Set(k, v)
}
_ = f.jsVM.Set("template", f.options.TemplateCtx.GetAll())
}
// export dynamic values from operators (i.e internal:true)
// add add it to template context
if result.HasOperatorResult() {
matcherStatus.CompareAndSwap(false, result.OperatorsResult.Matched)
if len(result.OperatorsResult.DynamicValues) > 0 {
for k, v := range result.OperatorsResult.DynamicValues {
f.options.TemplateCtx.Set(k, v)
}
}
})
if err != nil {
index := id
err = f.allErrs.Set(proto+":"+index, err)
if err != nil {
gologger.Error().Msgf("failed to store flow runtime errors got %v", err)
_ = f.jsVM.Set("template", f.options.TemplateCtx.GetAll())
}
}
}
return f.jsVM.ToValue(matcherStatus.Load())
})
if err != nil {
index := id
err = f.allErrs.Set(opts.protoName+":"+index, err)
if err != nil {
gologger.Error().Msgf("failed to store flow runtime errors got %v", err)
}
}
}

if len(compileErrors) > 0 {
return multierr.Combine(compileErrors...)
}

// register all built in functions
return f.RegisterBuiltInFunctions()
return matcherStatus.Load()
}

// RegisterBuiltInFunctions registers all built in functions for the flow
Expand Down Expand Up @@ -330,8 +339,9 @@ func (f *FlowExecutor) RegisterBuiltInFunctions() error {
if err := f.jsVM.Set("Dedupe", func(call goja.ConstructorCall) *goja.Object {
d := builtin.NewDedupe(f.jsVM)
obj := call.This
obj.Set("Add", d.Add)
obj.Set("Values", d.Values)
// register these methods
_ = obj.Set("Add", d.Add)
_ = obj.Set("Values", d.Values)
return nil
}); err != nil {
return err
Expand Down

0 comments on commit 2a32933

Please sign in to comment.