Skip to content

Commit

Permalink
use worker pool to handle downlink data
Browse files Browse the repository at this point in the history
  • Loading branch information
ianchen0119 committed Mar 5, 2024
1 parent 24672b7 commit 1cffd52
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
38 changes: 31 additions & 7 deletions internal/gtp/handler/handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handler

import (
"context"
"net"
"runtime/debug"

Expand All @@ -14,15 +15,38 @@ import (
n3iwfContext "github.com/free5gc/n3iwf/pkg/context"
)

// Parse the fields not supported by go-gtp and forward data to UE.
func HandleQoSTPDU(c gtp.Conn, senderAddr net.Addr, msg gtpMsg.Message) error {
pdu := gtpQoSMsg.QoSTPDUPacket{}
if err := pdu.Unmarshal(msg.(*gtpMsg.TPDU)); err != nil {
return err
func worker(ctx context.Context, queue chan gtpMsg.Message) {
for {
select {
case m := <-queue:
pdu := gtpQoSMsg.QoSTPDUPacket{}
if err := pdu.Unmarshal(m.(*gtpMsg.TPDU)); err != nil {
logger.GTPLog.Errorf("Unmarshal QoSTPDU failed: %+v", err)
continue
}
forward(pdu)
case <-ctx.Done():
return
}
}
}

forward(pdu)
return nil
func NewPool(workerNum int) (chan gtpMsg.Message, context.CancelFunc) {
var queue chan gtpMsg.Message = make(chan gtpMsg.Message, 100)
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < workerNum; i++ {
go worker(ctx, queue)
}
return queue, cancel
}

// Parse the fields not supported by go-gtp and forward data to UE.
func HandleQoSTPDU(q chan gtpMsg.Message) func(c gtp.Conn, senderAddr net.Addr,
msg gtpMsg.Message) error {
return func(c gtp.Conn, senderAddr net.Addr, msg gtpMsg.Message) error {
q <- msg
return nil
}
}

// Forward user plane packets from N3 to UE with GRE header and new IP header encapsulated
Expand Down
3 changes: 2 additions & 1 deletion internal/gtp/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ func SetupGTPTunnelWithUPF(upfIPAddr string) (*gtp.UPlaneConn, net.Addr, error)
return nil, nil, errors.New("Dial failed")
}

q, _ := handler.NewPool(10)
// Overwrite T-PDU handler for supporting extension header containing QoS parameters
userPlaneConnection.AddHandler(gtpMsg.MsgTypeTPDU, handler.HandleQoSTPDU)
userPlaneConnection.AddHandler(gtpMsg.MsgTypeTPDU, handler.HandleQoSTPDU(q))

return userPlaneConnection, remoteUDPAddr, nil
}

0 comments on commit 1cffd52

Please sign in to comment.