Skip to content

Commit

Permalink
feat(worker&lua):add hooks & add injection of BeforeSet
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxiaohui committed Jan 20, 2022
1 parent 4fce61d commit 5f9712d
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .githooks/pre-commit
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash
set -e
# 这里请根据实际项目进行设置
cd $GOPATH/src/github.com/ultramesh/frigate
cd $GOPATH/src/github.com/meshplus/hyperbench

st=0
counter=0
Expand Down
9 changes: 8 additions & 1 deletion core/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ func (l *ControllerImpl) Prepare() (err error) {
// Run start the job
func (l *ControllerImpl) Run() (err error) {
defer l.teardownWorkers()

// beforeRun
for _, w := range l.workerClients {
w.worker.BeforeRun()
}
// run all workers
duration := viper.GetDuration(fcom.EngineDurationPath)
l.start = time.Now().UnixNano()
Expand Down Expand Up @@ -145,6 +148,10 @@ func (l *ControllerImpl) Run() (err error) {
}

l.recorder.Release()
// afterRun
for _, w := range l.workerClients {
w.worker.AfterRun()
}
sd, err := l.master.Statistic(l.start, l.end)
if err != nil {
l.logger.Notice(err)
Expand Down
27 changes: 23 additions & 4 deletions core/controller/worker/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,8 @@ func NewLocalWorker(config LocalWorkerConfig) (*LocalWorker, error) {
return &localWorker, nil
}

//func engineCreator(t engine.Type) engine

// SetContext set the context of worker passed from Master
func (l *LocalWorker) SetContext(bs []byte) error {
var err error
func (l *LocalWorker) SetContext(bs []byte) (err error) {
l.pool.Walk(func(v vm.VM) bool {
if err = v.BeforeSet(); err != nil {
return true
Expand All @@ -96,6 +93,17 @@ func (l *LocalWorker) SetContext(bs []byte) error {
return err
}

// BeforeRun call user hook
func (l *LocalWorker) BeforeRun() (err error) {
l.pool.Walk(func(v vm.VM) bool {
if err = v.BeforeRun(); err != nil {
return true
}
return false
})
return err
}

// Do call the workers to running
func (l *LocalWorker) Do() error {

Expand All @@ -106,6 +114,17 @@ func (l *LocalWorker) Do() error {
return nil
}

// AfterRun call user hook
func (l *LocalWorker) AfterRun() (err error) {
l.pool.Walk(func(v vm.VM) bool {
if err = v.AfterRun(); err != nil {
return true
}
return false
})
return err
}

func (l *LocalWorker) runCollector() {

defer func() {
Expand Down
6 changes: 6 additions & 0 deletions core/controller/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@ type Worker interface {
// SetContext set the context of worker passed from Master.
SetContext([]byte) error

// BeforeRun call user hook
BeforeRun() error

// Do call the workers to running.
Do() error

// AfterRun call user hook
AfterRun() error

// CheckoutCollector checkout collector.
CheckoutCollector() (collector.Collector, bool, error)

Expand Down
6 changes: 6 additions & 0 deletions core/controller/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ func TestLocalWorker(t *testing.T) {
err = localWorker.SetContext(bs)
assert.NoError(t, err)

err = localWorker.BeforeRun()
assert.NoError(t, err)

var vm vm.VM
assert.NotNil(t, localWorker.pool.Pop())
localWorker.pool.Push(vm)
Expand All @@ -29,6 +32,9 @@ func TestLocalWorker(t *testing.T) {

time.Sleep(time.Second * 5)

err = localWorker.AfterRun()
assert.NoError(t, err)

col, b, _ := localWorker.CheckoutCollector()
assert.NotNil(t, col)
assert.NotNil(t, b)
Expand Down
25 changes: 18 additions & 7 deletions core/network/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ const (

// Client is used to communicate with worker by master.
type Client struct {
url string
nonce string
index int
logger *logging.Logger
path string
err error
finished bool
url string
nonce string
index int
logger *logging.Logger
path string
err error
}

// NewClient create Client.
Expand Down Expand Up @@ -153,12 +152,24 @@ func (c *Client) Teardown() {
}
}

// BeforeRun call user hook
func (c *Client) BeforeRun() error {
defer c.teardownWhileErr()
return c.callWithValues("before run", network.BeforeRunPath, url.Values{"nonce": {c.nonce}})
}

// Do call the workers to running
func (c *Client) Do() error {
defer c.teardownWhileErr()
return c.callWithValues("do", network.DoPath, url.Values{"nonce": {c.nonce}})
}

// AfterRun call user hook
func (c *Client) AfterRun() error {
defer c.teardownWhileErr()
return c.callWithValues("after run", network.AfterRunPath, url.Values{"nonce": {c.nonce}})
}

func (c *Client) teardownWhileErr() {
if c.err != nil {
_ = c.teardown()
Expand Down
11 changes: 9 additions & 2 deletions core/network/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestClient(t *testing.T) {
m["engine.rate"] = 1
m["engine.duration"] = 5
m["engine.cap"] = 1
m["client.plugin"] = "hyperchain.so"
viper.MergeConfigMap(m)

err = cli.TestsetNonce()
Expand All @@ -35,14 +36,20 @@ func TestClient(t *testing.T) {
err = cli.Testinit()
assert.NoError(t, err)

err = cli.SetContext(nil)
assert.NoError(t, err)

err = cli.BeforeRun()
assert.NoError(t, err)

go cli.Do()

go cli.CheckoutCollector()

time.Sleep(time.Second * 2)

err = cli.SetContext(nil)
assert.Error(t, err)
err = cli.AfterRun()
assert.NoError(t, err)

cli.Teardown()

Expand Down
4 changes: 4 additions & 0 deletions core/network/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ const (
InitPath = "/init"
// SetContextPath set context path.
SetContextPath = "/set-context"
// BeforeRunPath before run path.
BeforeRunPath = "/before-run"
// DoPath do path.
DoPath = "/do"
// AfterRunPath after run path.
AfterRunPath = "/after-run"
// TeardownPath teardown path.
TeardownPath = "/teardown"
// CheckoutCollectorPath checkout collector path.
Expand Down
34 changes: 34 additions & 0 deletions core/network/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,23 @@ func (s *Server) Start() error {
c.String(http.StatusOK, "ok")
})

r.POST(network.BeforeRunPath, func(c *gin.Context) {
if !s.checkNonce(c) {
s.logger.Error("busy")
c.String(http.StatusUnauthorized, "busy")
return
}
if s.workerHandle == nil {
s.logger.Error("worker is not exist")
c.String(http.StatusUnauthorized, "worker is not exist")
return
}
// nolint
go s.workerHandle.BeforeRun()

c.String(http.StatusOK, "ok")
})

r.POST(network.DoPath, func(c *gin.Context) {
if !s.checkNonce(c) {
s.logger.Error("busy")
Expand All @@ -220,6 +237,23 @@ func (s *Server) Start() error {
c.String(http.StatusOK, "ok")
})

r.POST(network.AfterRunPath, func(c *gin.Context) {
if !s.checkNonce(c) {
s.logger.Error("busy")
c.String(http.StatusUnauthorized, "busy")
return
}
if s.workerHandle == nil {
s.logger.Error("worker is not exist")
c.String(http.StatusUnauthorized, "worker is not exist")
return
}
// nolint
go s.workerHandle.AfterRun()

c.String(http.StatusOK, "ok")
})

r.POST(network.CheckoutCollectorPath, func(c *gin.Context) {
if !s.checkNonce(c) {
s.logger.Error("busy")
Expand Down
4 changes: 3 additions & 1 deletion core/network/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,12 @@ func TestDo(t *testing.T) {
cli := client.NewClient(4, "localhost:8080")

cli.TestsetNonce()
err := cli.Do()
err := cli.BeforeRun()
assert.Error(t, err)
err = cli.Do()
assert.Error(t, err)
err = cli.AfterRun()
assert.Error(t, err)

}

Expand Down
5 changes: 3 additions & 2 deletions vm/lua/lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,12 @@ package lua
import (
"errors"

base2 "github.com/meshplus/hyperbench-common/base"
fcom "github.com/meshplus/hyperbench-common/common"
"github.com/meshplus/hyperbench/plugins/blockchain"
idex "github.com/meshplus/hyperbench/plugins/index"
"github.com/meshplus/hyperbench/plugins/toolkit"
"github.com/meshplus/hyperbench/vm/base"
base2 "github.com/meshplus/hyperbench-common/base"
fcom "github.com/meshplus/hyperbench-common/common"
"github.com/spf13/viper"
lua "github.com/yuin/gopher-lua"
luar "layeh.com/gopher-luar"
Expand Down Expand Up @@ -500,6 +500,7 @@ func (v *VM) injectTestcaseBase() {
v.vm.SetField(mt, lIndex, v.vm.SetFuncs(v.vm.NewTable(), map[string]lua.LGFunction{
beforeDeploy: empty,
beforeGet: empty,
beforeSet: empty,
beforeRun: empty,
run: result,
afterRun: empty,
Expand Down

0 comments on commit 5f9712d

Please sign in to comment.