Skip to content

Commit

Permalink
Implement ack low water marks
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Sep 20, 2024
1 parent ef5a3b4 commit 8c5ff2a
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 35 deletions.
4 changes: 4 additions & 0 deletions router/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func (forwarder *Forwarder) RetransmitPayload(srcAddr xgress.Address, payload *x
func (forwarder *Forwarder) forwardPayload(srcAddr xgress.Address, payload *xgress.Payload, markActive bool, timeout time.Duration) error {
log := pfxlog.ContextLogger(string(srcAddr))

if payload.IsCircuitEndFlagSet() {
pfxlog.Logger().Info("forwarding end-of-circuit")
}

circuitId := payload.GetCircuitId()
if forwardTable, found := forwarder.circuits.getForwardTable(circuitId, markActive); found {
if dstAddr, found := forwardTable.getForwardAddress(srcAddr); found {
Expand Down
1 change: 1 addition & 0 deletions router/handler_link/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (self *payloadHandler) HandleReceive(msg *channel.Message, ch channel.Chann
self.forwarder.ReportForwardingFault(payload.CircuitId, "")
}
if payload.IsCircuitEndFlagSet() {
pfxlog.Logger().Info("received end-of-circuit")
self.forwarder.EndCircuit(payload.GetCircuitId())
}
} else {
Expand Down
27 changes: 23 additions & 4 deletions router/xgress/link_receive_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
type LinkReceiveBuffer struct {
tree *btree.Tree
sequence int32
lowWaterMark int32
maxSequence int32
size uint32
lastBufferSizeSent uint32
Expand All @@ -45,15 +46,15 @@ func (buffer *LinkReceiveBuffer) Size() uint32 {
return atomic.LoadUint32(&buffer.size)
}

func (buffer *LinkReceiveBuffer) ReceiveUnordered(payload *Payload, maxSize uint32) bool {
func (buffer *LinkReceiveBuffer) ReceiveUnordered(payload *Payload, maxSize uint32) (bool, int32) {
if payload.GetSequence() <= buffer.sequence {
duplicatePayloadsMeter.Mark(1)
return true
return true, buffer.lowWaterMark
}

if atomic.LoadUint32(&buffer.size) > maxSize && payload.Sequence > buffer.maxSequence {
droppedPayloadsMeter.Mark(1)
return false
return false, 0
}

treeSize := buffer.tree.Size()
Expand All @@ -68,7 +69,25 @@ func (buffer *LinkReceiveBuffer) ReceiveUnordered(payload *Payload, maxSize uint
} else {
duplicatePayloadsMeter.Mark(1)
}
return true

if payload.Sequence <= buffer.lowWaterMark {
return true, buffer.lowWaterMark
}

if payload.Sequence == buffer.lowWaterMark+1 {
buffer.lowWaterMark++
for buffer.canHighWaterMarkBeAdvanced() {
buffer.lowWaterMark++
}
return true, buffer.lowWaterMark
}

return true, buffer.lowWaterMark
}

func (buffer *LinkReceiveBuffer) canHighWaterMarkBeAdvanced() bool {
_, found := buffer.tree.Get(buffer.lowWaterMark + 1)
return found
}

func (buffer *LinkReceiveBuffer) PeekHead() *Payload {
Expand Down
80 changes: 52 additions & 28 deletions router/xgress/link_send_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type LinkSendBuffer struct {
lastRetransmitTime int64
closeWhenEmpty atomic.Bool
inspectRequests chan *sendBufferInspectEvent
minSeq int32
}

type txPayload struct {
Expand Down Expand Up @@ -276,35 +277,9 @@ func (buffer *LinkSendBuffer) close() {

func (buffer *LinkSendBuffer) receiveAcknowledgement(ack *Acknowledgement) {
log := pfxlog.ContextLogger(buffer.x.Label()).WithFields(ack.GetLoggerFields())

for _, sequence := range ack.Sequence {
if txPayload, found := buffer.buffer[sequence]; found {
if txPayload.markAcked() { // if it's been queued for retransmission, remove it from the queue
retransmitter.queue(txPayload)
}

payloadSize := uint32(len(txPayload.payload.Data))
buffer.accumulator += payloadSize
buffer.successfulAcks++
delete(buffer.buffer, sequence)
atomic.AddInt64(&outstandingPayloads, -1)
atomic.AddInt64(&outstandingPayloadBytes, -int64(payloadSize))
buffer.linkSendBufferSize -= payloadSize
log.Debugf("removing payload %v with size %v. payload buffer size: %v",
txPayload.payload.Sequence, len(txPayload.payload.Data), buffer.linkSendBufferSize)

if buffer.successfulAcks >= buffer.x.Options.TxPortalIncreaseThresh {
buffer.successfulAcks = 0
delta := uint32(float64(buffer.accumulator) * buffer.x.Options.TxPortalIncreaseScale)
buffer.windowsSize += delta
if buffer.windowsSize > buffer.x.Options.TxPortalMaxSize {
buffer.windowsSize = buffer.x.Options.TxPortalMaxSize
}
buffer.retxScale -= 0.01
if buffer.retxScale < buffer.x.Options.RetxScale {
buffer.retxScale = buffer.x.Options.RetxScale
}
}
if payload, found := buffer.buffer[sequence]; found {
buffer.markPayloadComplete(payload, sequence, log)
} else { // duplicate ack
duplicateAcksMeter.Mark(1)
buffer.duplicateAcks++
Expand All @@ -315,6 +290,10 @@ func (buffer *LinkSendBuffer) receiveAcknowledgement(ack *Acknowledgement) {
}
}

if ack.LowWaterMark > 0 {
buffer.processLowWatermark(ack, log)
}

buffer.linkRecvBufferSize = ack.RecvBufferSize
if ack.RTT > 0 {
rtt := uint16(info.NowInMilliseconds()) - ack.RTT
Expand All @@ -326,6 +305,51 @@ func (buffer *LinkSendBuffer) receiveAcknowledgement(ack *Acknowledgement) {
}
}

func (buffer *LinkSendBuffer) processLowWatermark(ack *Acknowledgement, log *logrus.Entry) {
//fmt.Printf("checking minseq: %d, lowWater: %d\n", buffer.minSeq, ack.LowWaterMark)
for sequence := buffer.minSeq; sequence <= ack.LowWaterMark; sequence++ {
//fmt.Printf("checking minseq: %d, lowWater: %d, seq: %d\n", buffer.minSeq, ack.LowWaterMark, sequence)
if payload, found := buffer.buffer[sequence]; found {
buffer.markPayloadComplete(payload, sequence, log)
}
}
if buffer.minSeq <= ack.LowWaterMark+1 {
buffer.minSeq = ack.LowWaterMark + 1
}
}

func (buffer *LinkSendBuffer) markPayloadComplete(payload *txPayload, sequence int32, log *logrus.Entry) {
if payload.markAcked() { // if it's been queued for retransmission, remove it from the queue
retransmitter.queue(payload)
}

payloadSize := uint32(len(payload.payload.Data))
buffer.accumulator += payloadSize
delete(buffer.buffer, sequence)
atomic.AddInt64(&outstandingPayloads, -1)
atomic.AddInt64(&outstandingPayloadBytes, -int64(payloadSize))
buffer.linkSendBufferSize -= payloadSize
log.Debugf("removing payload %v with size %v. payload buffer size: %v",
payload.payload.Sequence, len(payload.payload.Data), buffer.linkSendBufferSize)

if buffer.successfulAcks >= buffer.x.Options.TxPortalIncreaseThresh {
buffer.successfulAcks = 0
delta := uint32(float64(buffer.accumulator) * buffer.x.Options.TxPortalIncreaseScale)
buffer.windowsSize += delta
if buffer.windowsSize > buffer.x.Options.TxPortalMaxSize {
buffer.windowsSize = buffer.x.Options.TxPortalMaxSize
}
buffer.retxScale -= 0.01
if buffer.retxScale < buffer.x.Options.RetxScale {
buffer.retxScale = buffer.x.Options.RetxScale
}
}

if sequence == buffer.minSeq {
buffer.minSeq = buffer.minSeq + 1
}
}

func (buffer *LinkSendBuffer) retransmit() {
now := info.NowInMilliseconds()
if len(buffer.buffer) > 0 && (now-buffer.lastRetransmitTime) > 64 {
Expand Down
8 changes: 8 additions & 0 deletions router/xgress/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
HeaderKeyRecvBufferSize = 2259
HeaderKeyRTT = 2260
HeaderPayloadRaw = 2261
HeaderKeyLowWaterMark = 2262

ContentTypePayloadType = 1100
ContentTypeAcknowledgementType = 1101
Expand Down Expand Up @@ -85,6 +86,7 @@ type Acknowledgement struct {
RecvBufferSize uint32
RTT uint16
Sequence []int32
LowWaterMark int32
}

func (ack *Acknowledgement) GetCircuitId() string {
Expand Down Expand Up @@ -145,6 +147,9 @@ func (ack *Acknowledgement) Marshall() *channel.Message {
msg.PutUint32Header(HeaderKeyFlags, ack.Flags)
}
msg.PutUint32Header(HeaderKeyRecvBufferSize, ack.RecvBufferSize)
if ack.LowWaterMark > 0 {
msg.PutUint32Header(HeaderKeyLowWaterMark, uint32(ack.LowWaterMark))
}
return msg
}

Expand All @@ -171,6 +176,9 @@ func UnmarshallAcknowledgement(msg *channel.Message) (*Acknowledgement, error) {
return nil, err
}

lowWaterMark, _ := msg.GetUint32Header(HeaderKeyLowWaterMark)
ack.LowWaterMark = int32(lowWaterMark)

return ack, nil
}

Expand Down
12 changes: 11 additions & 1 deletion router/xgress/minimal_payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func newTestXgConn(bufferSize int, targetSends uint32, targetReceives uint32) *t
targetSends: targetSends,
targetReceives: targetReceives,
done: make(chan struct{}),
closed: make(chan struct{}),
errs: make(chan error, 1),
}
}
Expand All @@ -35,11 +36,13 @@ type testXgConn struct {
sendCounter uint32
recvCounter uint32
done chan struct{}
closed chan struct{}
errs chan error
bufCounter uint32
}

func (self *testXgConn) Close() error {
close(self.closed)
return nil
}

Expand Down Expand Up @@ -302,8 +305,15 @@ func Test_MinimalPayloadMarshalling(t *testing.T) {
case err := <-dstTestConn.errs:
t.Fatal(err)
case <-time.After(time.Second):
t.Fatal("timeout")
t.Fatal("data timeout")
}

select {
case <-dstTestConn.closed:
case <-time.After(time.Second):
t.Fatal("close timeout")
}

}

func Test_PayloadSize(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions router/xgress/xgress.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ func (self *Xgress) GetEndCircuit() *Payload {
func (self *Xgress) ForwardEndOfCircuit(sendF func(payload *Payload) bool) {
// for now always send end of circuit. too many is better than not enough
if !self.IsEndOfCircuitSent() {
pfxlog.Logger().Info("sending end-of-circuit")
sendF(self.GetEndCircuit())
self.flags.Set(endOfCircuitSentFlag, true)
}
Expand Down Expand Up @@ -833,10 +834,10 @@ func (self *Xgress) PayloadReceived(payload *Payload) {
if self.originator == payload.GetOriginator() {
// a payload sent from this xgress has arrived back at this xgress, instead of the other end
log.Warn("ouroboros (circuit cycle) detected, dropping payload")
} else if self.linkRxBuffer.ReceiveUnordered(payload, self.Options.RxBufferSize) {
} else if ack, lowWaterMark := self.linkRxBuffer.ReceiveUnordered(payload, self.Options.RxBufferSize); ack {
log.Debug("ready to acknowledge")

ack := NewAcknowledgement(self.circuitId, self.originator)
ack.LowWaterMark = lowWaterMark
ack.RecvBufferSize = self.linkRxBuffer.Size()
ack.Sequence = append(ack.Sequence, payload.Sequence)
ack.RTT = payload.RTT
Expand Down

0 comments on commit 8c5ff2a

Please sign in to comment.