diff --git a/go/cmd/pfacct/pfacct.go b/go/cmd/pfacct/pfacct.go index 891dc3f9350..41924a144c1 100644 --- a/go/cmd/pfacct/pfacct.go +++ b/go/cmd/pfacct/pfacct.go @@ -23,8 +23,10 @@ import ( ) const DefaultTimeDuration = 5 * time.Minute -const DefaultRadiusWorkers = 5 +const DefaultRadiusWorkers = 15 const DefaultRadiusWorkQueueSize = 1000 +const DefaultHttpdWorkQueueSize = 1000 +const DefaulHttpdWorkers = 100 type radiusRequest struct { w radius.ResponseWriter @@ -52,6 +54,7 @@ type PfAcct struct { StatsdOption statsd.Option StatsdClient *statsd.Client radiusRequests []chan<- radiusRequest + httpdRequest chan *radius.Request localSecret string StatsdOnce tryableonce.TryableOnce isProxied bool @@ -60,6 +63,8 @@ type PfAcct struct { ProcessBandwidthAcct bool RadiusWorkers int RadiusWorkQueueSize int + HttpdWorkQueueSize int + HttpdWorkers int } func NewPfAcct() *PfAcct { @@ -84,6 +89,8 @@ func NewPfAcct() *PfAcct { TimeDuration: DefaultTimeDuration, RadiusWorkers: DefaultRadiusWorkers, RadiusWorkQueueSize: DefaultRadiusWorkQueueSize, + HttpdWorkQueueSize: DefaultHttpdWorkQueueSize, + HttpdWorkers: DefaulHttpdWorkers, } pfAcct.SwitchInfoCache = cache.New(5*time.Minute, 10*time.Minute) pfAcct.NodeSessionCache = cache.New(cache.NoExpiration, cache.NoExpiration) @@ -93,8 +100,19 @@ func NewPfAcct() *PfAcct { pfAcct.SetupConfig(ctx) pfAcct.radiusRequests = makeRadiusRequests(pfAcct, pfAcct.RadiusWorkers, pfAcct.RadiusWorkQueueSize) + pfAcct.httpdRequest = make(chan *radius.Request, pfAcct.HttpdWorkQueueSize) pfAcct.AAAClient = jsonrpc2.NewAAAClientFromConfig(ctx) //pfAcct.Dispatcher = NewDispatcher(16, 128) + + // create workers + for i := 1; i <= pfAcct.HttpdWorkers; i++ { + go func(i int) { + for j := range pfAcct.httpdRequest { + pfAcct.sendRadiusAccountingCall(j) + } + }(i) + } + pfAcct.runPing() return pfAcct } @@ -108,6 +126,7 @@ func makeRadiusRequests(h *PfAcct, requestFanOut, backlog int) []chan<- radiusRe for rr := range c { h.handleAccountingRequest(rr) } + fmt.Println(len(c)) }(c) } diff --git a/go/cmd/pfacct/radius.go b/go/cmd/pfacct/radius.go index c8d69ae2c3e..384375a845d 100644 --- a/go/cmd/pfacct/radius.go +++ b/go/cmd/pfacct/radius.go @@ -305,6 +305,18 @@ func (h *PfAcct) accountingUniqueSessionId(r *radius.Request) uint64 { } func (h *PfAcct) sendRadiusAccounting(r *radius.Request, switchInfo *SwitchInfo) { + + h.sendRadiusAccountingToQueue(r) + +} + +func (h *PfAcct) sendRadiusAccountingToQueue(r *radius.Request) { + go func(h *PfAcct, r *radius.Request) { + h.httpdRequest <- r + }(h, r) +} + +func (h *PfAcct) sendRadiusAccountingCall(r *radius.Request) { ctx := r.Context() attr := packetToMap(ctx, r.Packet) attr["PF_HEADERS"] = map[string]string{ @@ -316,7 +328,6 @@ func (h *PfAcct) sendRadiusAccounting(r *radius.Request, switchInfo *SwitchInfo) attr["NAS-IP-Address"] = strings.Split(r.RemoteAddr.String(), ":")[0] logWarn(ctx, fmt.Sprintf("Empty NAS-IP-Address, using the source IP address of the packet (%s)", attr["NAS-IP-Address"])) } - if _, err := h.AAAClient.Call(ctx, "radius_accounting", attr); err != nil { logError(ctx, err.Error()) }