From 1cffd5253401807a9beba450c25a78fcab562d42 Mon Sep 17 00:00:00 2001 From: Ian Chen Date: Tue, 5 Mar 2024 14:19:20 +0800 Subject: [PATCH] use worker pool to handle downlink data --- internal/gtp/handler/handler.go | 38 +++++++++++++++++++++++++++------ internal/gtp/service/service.go | 3 ++- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/internal/gtp/handler/handler.go b/internal/gtp/handler/handler.go index 9536df95..3d86f26d 100644 --- a/internal/gtp/handler/handler.go +++ b/internal/gtp/handler/handler.go @@ -1,6 +1,7 @@ package handler import ( + "context" "net" "runtime/debug" @@ -14,15 +15,38 @@ import ( n3iwfContext "github.com/free5gc/n3iwf/pkg/context" ) -// Parse the fields not supported by go-gtp and forward data to UE. -func HandleQoSTPDU(c gtp.Conn, senderAddr net.Addr, msg gtpMsg.Message) error { - pdu := gtpQoSMsg.QoSTPDUPacket{} - if err := pdu.Unmarshal(msg.(*gtpMsg.TPDU)); err != nil { - return err +func worker(ctx context.Context, queue chan gtpMsg.Message) { + for { + select { + case m := <-queue: + pdu := gtpQoSMsg.QoSTPDUPacket{} + if err := pdu.Unmarshal(m.(*gtpMsg.TPDU)); err != nil { + logger.GTPLog.Errorf("Unmarshal QoSTPDU failed: %+v", err) + continue + } + forward(pdu) + case <-ctx.Done(): + return + } } +} - forward(pdu) - return nil +func NewPool(workerNum int) (chan gtpMsg.Message, context.CancelFunc) { + var queue chan gtpMsg.Message = make(chan gtpMsg.Message, 100) + ctx, cancel := context.WithCancel(context.Background()) + for i := 0; i < workerNum; i++ { + go worker(ctx, queue) + } + return queue, cancel +} + +// Parse the fields not supported by go-gtp and forward data to UE. +func HandleQoSTPDU(q chan gtpMsg.Message) func(c gtp.Conn, senderAddr net.Addr, + msg gtpMsg.Message) error { + return func(c gtp.Conn, senderAddr net.Addr, msg gtpMsg.Message) error { + q <- msg + return nil + } } // Forward user plane packets from N3 to UE with GRE header and new IP header encapsulated diff --git a/internal/gtp/service/service.go b/internal/gtp/service/service.go index 8b33842f..7d11ffe5 100644 --- a/internal/gtp/service/service.go +++ b/internal/gtp/service/service.go @@ -44,8 +44,9 @@ func SetupGTPTunnelWithUPF(upfIPAddr string) (*gtp.UPlaneConn, net.Addr, error) return nil, nil, errors.New("Dial failed") } + q, _ := handler.NewPool(10) // Overwrite T-PDU handler for supporting extension header containing QoS parameters - userPlaneConnection.AddHandler(gtpMsg.MsgTypeTPDU, handler.HandleQoSTPDU) + userPlaneConnection.AddHandler(gtpMsg.MsgTypeTPDU, handler.HandleQoSTPDU(q)) return userPlaneConnection, remoteUDPAddr, nil }