Skip to content

Commit

Permalink
Merge pull request #77 from madflojo/bug
Browse files Browse the repository at this point in the history
Bugfix for #76: Multiple Init and Scheduled functions
  • Loading branch information
madflojo authored Oct 29, 2023
2 parents 48395a6 + 0f128df commit 6fcac6d
Showing 1 changed file with 29 additions and 10 deletions.
39 changes: 29 additions & 10 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,21 +313,34 @@ func (srv *Server) Run() error {
// Create WASM Callback Router
router := callbacks.New(callbacks.Config{
PreFunc: func(namespace, op string, data []byte) ([]byte, error) {
// Debug logging of callback
srv.log.WithFields(logrus.Fields{
"namespace": namespace,
"operation": op,
}).Debugf("CallbackRouter called")

// Trace logging of callback
srv.log.WithFields(logrus.Fields{
"namespace": namespace,
"function": op,
"operation": op,
}).Tracef("CallbackRouter called with payload - %s", data)
return []byte(""), nil
},
PostFunc: func(r callbacks.CallbackResult) {
// Measure Callback Execution time and counts
srv.stats.Callbacks.WithLabelValues(fmt.Sprintf("%s:%s", r.Namespace, r.Operation)).Observe(r.EndTime.Sub(r.StartTime).Seconds())

// Debug logging of callback results
srv.log.WithFields(logrus.Fields{
"namespace": r.Namespace,
"operation": r.Operation,
"error": r.Err,
}).Debugf("Callback returned result after %f seconds", r.EndTime.Sub(r.StartTime).Seconds())

// Trace logging of callback results
srv.log.WithFields(logrus.Fields{
"namespace": r.Namespace,
"function": r.Operation,
"operation": r.Operation,
"input": r.Input,
"error": r.Err,
}).Tracef("Callback returned result after %f seconds with output - %s", r.EndTime.Sub(r.StartTime).Seconds(), r.Output)
Expand All @@ -336,7 +349,7 @@ func (srv *Server) Run() error {
if r.Err != nil {
srv.log.WithFields(logrus.Fields{
"namespace": r.Namespace,
"function": r.Operation,
"operation": r.Operation,
}).Warnf("Callback call resulted in error after %f seconds - %s", r.EndTime.Sub(r.StartTime).Seconds(), r.Err)
}
},
Expand Down Expand Up @@ -485,18 +498,20 @@ func (srv *Server) Run() error {

// Schedule tasks for scheduled functions
if r.Type == "scheduled_task" {
// Capture function name to avoid scope issues
fname := r.Function
srv.log.Infof("Scheduling custom task for function %s with interval of %d", r.Function, r.Frequency)
id, err := srv.scheduler.Add(&tasks.Task{
Interval: time.Duration(r.Frequency) * time.Second,
TaskFunc: func() error {
now := time.Now()
srv.log.Tracef("Executing Scheduled Function %s", r.Function)
_, err := srv.runWASM(r.Function, "handler", []byte(""))
srv.log.Tracef("Executing Scheduled Function %s", fname)
_, err := srv.runWASM(fname, "handler", []byte(""))
if err != nil {
srv.stats.Tasks.WithLabelValues(r.Function).Observe(time.Since(now).Seconds())
srv.stats.Tasks.WithLabelValues(fname).Observe(time.Since(now).Seconds())
return err
}
srv.stats.Tasks.WithLabelValues(r.Function).Observe(time.Since(now).Seconds())
srv.stats.Tasks.WithLabelValues(fname).Observe(time.Since(now).Seconds())
return nil
},
})
Expand All @@ -511,9 +526,13 @@ func (srv *Server) Run() error {
// Setup callbacks for function to function calls
if r.Type == "function" {
srv.log.Infof("Registering Function to Function callback for %s", r.Function)
router.RegisterCallback("function", r.Function, func(b []byte) ([]byte, error) {
return srv.runWASM(r.Function, "handler", b)
})
// Capture r in local values to avoid scope issues
fname := r.Function
f := func(b []byte) ([]byte, error) {
srv.log.Infof("Executing Function to Function callback for %s", fname)
return srv.runWASM(fname, "handler", b)
}
router.RegisterCallback("function", fname, f)
}
}

Expand Down

0 comments on commit 6fcac6d

Please sign in to comment.