Skip to content

Commit

Permalink
Use separate go routines for OF readand write
Browse files Browse the repository at this point in the history
Previously we read a batch and then write a batch, which would hinder
concurrency. This version allocates two separate go routines for read
and write that send and receive messages in parallel.
  • Loading branch information
soheilhy committed May 11, 2015
1 parent 72f9e2b commit 9183773
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 68 deletions.
67 changes: 43 additions & 24 deletions openflow/bh.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,57 @@ import (
bh "github.com/kandoo/beehive"
)

// OFConfig stores the configuration of the OpenFlow driver.
type OFConfig struct {
Proto string // The driver's listening protocol.
Addr string // The driver's listening address.
ReadBufLen int // Maximum number of packets to read.
var (
proto = flag.String("of.proto", "tcp", "protocol of the OpenFlow listener")
addr = flag.String("of.addr", "0.0.0.0:6633",
"address of the OpenFlow listener in the form of HOST:PORT")
readBufLen = flag.Int("of.rbuflen", 1<<8,
"maximum number of packets to read per each read call")
)

// Option represents an OpenFlow listener option.
type Option func(l *ofListener)

// ListenOn returns an OpenFlow option that sets the address on which the
// OpenFlow service listens.
func ListenOn(addr string) Option {
return func(l *ofListener) {
l.addr = addr
}
}

var defaultOFConfig = OFConfig{}
// UseProto returns an Openflow option that sets the protocol that the OpenFlow
// service uses to listen.
func UseProto(proto string) Option {
return func(l *ofListener) {
l.proto = proto
}
}

func init() {
flag.StringVar(&defaultOFConfig.Proto, "ofproto", "tcp",
"Protocol of the OpenFlow listener.")
flag.StringVar(&defaultOFConfig.Addr, "ofaddr", "0.0.0.0:6633",
"Address of the OpenFlow listener in the form of HOST:PORT.")
flag.IntVar(&defaultOFConfig.ReadBufLen, "rbuflen", 1<<8,
"Maximum number of packets to read per each read call.")
// SetReadBufLen returns an OpenFlow option that sets reader buffer length of
// the OpenFlow service.
func SetReadBufLen(rlen int) Option {
return func(l *ofListener) {
l.readBufLen = rlen
}
}

// StartOpenFlow starts the OpenFlow driver on the given hive using the default
// OpenFlow configuration that can be set through command line arguments.
func StartOpenFlow(hive bh.Hive) error {
return StartOpenFlowWithConfig(hive, defaultOFConfig)
}

// StartOpenFlowWithConfig starts the OpenFlow driver on the give hive with the
// provided configuration.
func StartOpenFlowWithConfig(hive bh.Hive, cfg OFConfig) error {
func StartOpenFlow(hive bh.Hive, options ...Option) error {
app := hive.NewApp("OFDriver")
app.Detached(&ofListener{
cfg: cfg,
})
l := &ofListener{
proto: *proto,
addr: *addr,
readBufLen: *readBufLen,
}

for _, opt := range options {
opt(l)
}

app.Detached(l)
glog.V(2).Infof("OpenFlow driver registered on %s:%s", l.proto, l.addr)

glog.V(2).Infof("OpenFlow driver registered on %s:%s", cfg.Proto, cfg.Addr)
return nil
}
117 changes: 79 additions & 38 deletions openflow/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ type ofConnConfig struct {

type ofConn struct {
of.HeaderConn
cfg ofConnConfig // Configuration of this connection.
ctx bh.RcvContext // RcvContext of the detached bee running the connection.
node nom.Node // Node that this connection represents.
driver ofDriver // OpenFlow driver of this connection.
wCh chan bh.Msg // Messages to be written.
wErr error // Last error in write.

ctx bh.RcvContext // RcvContext of the detached bee running ofConn.

readBufLen int // Maximum number of packets to read.
wCh chan bh.Msg // Messages to be written.
wErr error // Last error in write.

node nom.Node // Node that this connection represents.
driver ofDriver // OpenFlow driver of this connection.
}

func (c *ofConn) drainWCh() {
Expand Down Expand Up @@ -51,8 +54,76 @@ func (c *ofConn) Start(ctx bh.RcvContext) {
return
}

pkts := make([]of.Header, c.cfg.readBufLen)
stop := make(chan struct{})

wdone := make(chan struct{})
go c.doWrite(wdone, stop)

rdone := make(chan struct{})
go c.doRead(rdone, stop)

select {
case <-rdone:
close(stop)
case <-wdone:
close(stop)
}

<-rdone
<-wdone
}

func (c *ofConn) doWrite(done chan struct{}, stop chan struct{}) {
defer close(done)

written := false
var msg bh.Msg
for {
msg = nil
if !written {
select {
case msg = <-c.wCh:
case <-stop:
return
}
} else {
select {
case msg = <-c.wCh:
case <-stop:
return
default:
if c.wErr = c.HeaderConn.Flush(); c.wErr != nil {
return
}
written = false
continue
}
}

// Write the message.
err := c.driver.handleMsg(msg, c)
if c.wErr != nil {
return
}
if err != nil {
glog.Errorf("ofconn: Cannot convert NOM message to OpenFlow: %v",
err)
}
written = true
}
}

func (c *ofConn) doRead(done chan struct{}, stop chan struct{}) {
defer close(done)

pkts := make([]of.Header, c.readBufLen)
for {
select {
case <-stop:
return
default:
}

n, err := c.ReadHeaders(pkts)
if err != nil {
if err == io.EOF {
Expand All @@ -71,35 +142,9 @@ func (c *ofConn) Start(ctx bh.RcvContext) {
}
}

for {
select {
case msg := <-c.wCh:
if c.wErr != nil {
// Drain the channel.
continue
}

// If werr != nil, we loop and drain the wCh.
if err := c.driver.handleMsg(msg, c); err != nil {
glog.Errorf("ofconn: Cannot convert NOM message to OpenFlow: %v",
err)
continue
}
default:
goto flush
}
}

flush:
if c.wErr != nil {
return
}

c.HeaderConn.Flush()

pkts = pkts[n:]
if len(pkts) == 0 {
pkts = make([]of.Header, c.cfg.readBufLen)
pkts = make([]of.Header, c.readBufLen)
}
}
}
Expand All @@ -118,10 +163,6 @@ func (c *ofConn) NodeUID() nom.UID {
}

func (c *ofConn) WriteHeader(pkt of.Header) error {
if c.wErr != nil {
return c.wErr
}

c.wErr = c.HeaderConn.WriteHeader(pkt)
return c.wErr
}
12 changes: 6 additions & 6 deletions openflow/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ import (
)

type ofListener struct {
cfg OFConfig
proto string // The driver's listening protocol.
addr string // The driver's listening address.
readBufLen int // Maximum number of packets to read.
}

func (l *ofListener) Start(ctx bh.RcvContext) {
nl, err := net.Listen(l.cfg.Proto, l.cfg.Addr)
nl, err := net.Listen(l.proto, l.addr)
if err != nil {
glog.Errorf("Cannot start the OF listener: %v", err)
return
}

glog.Infof("OF listener started on %s:%s", l.cfg.Proto, l.cfg.Addr)
glog.Infof("OF listener started on %s:%s", l.proto, l.addr)

defer func() {
glog.Infof("OF listener closed")
Expand All @@ -41,9 +43,7 @@ func (l *ofListener) Start(ctx bh.RcvContext) {
func (l *ofListener) startOFConn(conn net.Conn, ctx bh.RcvContext) {
ofc := &ofConn{
HeaderConn: of.NewHeaderConn(conn),
cfg: ofConnConfig{
readBufLen: l.cfg.ReadBufLen,
},
readBufLen: l.readBufLen,
}

ctx.StartDetached(ofc)
Expand Down

0 comments on commit 9183773

Please sign in to comment.