Skip to content

Commit

Permalink
Add low-overhead link payload protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Sep 10, 2024
1 parent 3498a56 commit 47a756a
Show file tree
Hide file tree
Showing 15 changed files with 677 additions and 139 deletions.
2 changes: 1 addition & 1 deletion common/inspect/circuit_inspections.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type XgressDetail struct {
XgressPointer string `json:"xgressPointer"`
LinkSendBufferPointer string `json:"linkSendBufferPointer"`
Goroutines []string `json:"goroutines"`
Sequence int32 `json:"sequence"`
Sequence uint64 `json:"sequence"`
Flags string `json:"flags"`
}

Expand Down
2 changes: 2 additions & 0 deletions router/handler_link/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/forwarder"
metrics2 "github.com/openziti/ziti/router/metrics"
"github.com/openziti/ziti/router/xgress"
"github.com/openziti/ziti/router/xlink"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -79,6 +80,7 @@ func (self *bindHandler) BindChannel(binding channel.Binding) error {
binding.AddTypedReceiveHandler(newControlHandler(self.xlink, self.forwarder))
binding.AddPeekHandler(metrics2.NewChannelPeekHandler(self.xlink.Id(), self.forwarder.MetricsRegistry()))
binding.AddPeekHandler(trace.NewChannelPeekHandler(self.xlink.Id(), ch, self.forwarder.TraceController()))
binding.AddTransformHandler(xgress.PayloadTransformer{})
if err := self.xlink.Init(self.forwarder.MetricsRegistry()); err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion router/xgress/acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import (
"sync/atomic"
)

var acker *Acker
var acker ackSender

type ackSender interface {
ack(ack *Acknowledgement, address Address)
}

func InitAcker(forwarder PayloadBufferForwarder, metrics metrics.Registry, closeNotify <-chan struct{}) {
acker = NewAcker(forwarder, metrics, closeNotify)
Expand Down
38 changes: 38 additions & 0 deletions router/xgress/heartbeat_transformer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package xgress

import (
"encoding/binary"
"github.com/openziti/channel/v3"
"time"
)

type PayloadTransformer struct {
}

func (self PayloadTransformer) Rx(*channel.Message, channel.Channel) {}

func (self PayloadTransformer) Tx(m *channel.Message, ch channel.Channel) {
if m.ContentType == channel.ContentTypeRaw && len(m.Body) > 1 {
if m.Body[0]&HeartbeatFlagMask != 0 && len(m.Body) > 12 {
now := time.Now().UnixNano()
m.PutUint64Header(channel.HeartbeatHeader, uint64(now))
binary.BigEndian.PutUint64(m.Body[len(m.Body)-8:], uint64(now))
}
}
}
2 changes: 1 addition & 1 deletion router/xgress/link_receive_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (buffer *LinkReceiveBuffer) ReceiveUnordered(payload *Payload, maxSize uint
if buffer.tree.Size() > treeSize {
payloadSize := len(payload.Data)
size := atomic.AddUint32(&buffer.size, uint32(payloadSize))
pfxlog.Logger().Tracef("Payload %v of size %v added to transmit buffer. New size: %v", payload.Sequence, payloadSize, size)
pfxlog.Logger().Debugf("Payload %v of size %v added to transmit buffer. New size: %v", payload.Sequence, payloadSize, size)
if payload.Sequence > buffer.maxSequence {
buffer.maxSequence = payload.Sequence
}
Expand Down
56 changes: 49 additions & 7 deletions router/xgress/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,23 +209,41 @@ const (
)

type Payload struct {
Header
Sequence int32
Headers map[uint8][]byte
Data []byte
CircuitId string
Flags uint32
RTT uint16
Sequence int32
Headers map[uint8][]byte
Data []byte
raw []byte
}

func (payload *Payload) GetSequence() int32 {
return payload.Sequence
}

func (payload *Payload) Marshall() *channel.Message {
if payload.raw != nil {
if payload.raw[0]&RttFlagMask != 0 {
rtt := uint16(info.NowInMilliseconds())
b0 := byte(rtt)
b1 := byte(rtt >> 8)
payload.raw[2] = b0
payload.raw[3] = b1
}
return channel.NewMessage(channel.ContentTypeRaw, payload.raw)
}

msg := channel.NewMessage(ContentTypePayloadType, payload.Data)
for key, value := range payload.Headers {
msgHeaderKey := MinHeaderKey + int32(key)
msg.Headers[msgHeaderKey] = value
}
payload.marshallHeader(msg)
msg.Headers[HeaderKeyCircuitId] = []byte(payload.CircuitId)
if payload.Flags != 0 {
msg.PutUint32Header(HeaderKeyFlags, payload.Flags)
}

msg.PutUint64Header(HeaderKeySequence, uint64(payload.Sequence))
msg.PutUint16Header(HeaderKeyRTT, uint16(info.NowInMilliseconds()))

Expand All @@ -249,10 +267,19 @@ func UnmarshallPayload(msg *channel.Message) (*Payload, error) {
Data: msg.Body,
}

if err := payload.unmarshallHeader(msg); err != nil {
return nil, err
circuitId, ok := msg.Headers[HeaderKeyCircuitId]
if !ok {
return nil, fmt.Errorf("no circuitId found in xgress payload message")
}

// If no flags are present, it just means no flags have been set
flags, _ := msg.GetUint32Header(HeaderKeyFlags)

payload.CircuitId = string(circuitId)
payload.Flags = flags

payload.RTT, _ = msg.GetUint16Header(HeaderKeyRTT)

sequence, ok := msg.GetUint64Header(HeaderKeySequence)
if !ok {
return nil, fmt.Errorf("no sequence found in xgress payload message")
Expand All @@ -270,6 +297,14 @@ func setPayloadFlag(flags uint32, flag PayloadFlag) uint32 {
return uint32(PayloadFlag(flags) | flag)
}

func (payload *Payload) GetCircuitId() string {
return payload.CircuitId
}

func (payload *Payload) GetFlags() uint32 {
return payload.Flags
}

func (payload *Payload) IsCircuitEndFlagSet() bool {
return isPayloadFlagSet(payload.Flags, PayloadFlagCircuitEnd)
}
Expand All @@ -278,6 +313,13 @@ func (payload *Payload) IsCircuitStartFlagSet() bool {
return isPayloadFlagSet(payload.Flags, PayloadFlagCircuitStart)
}

func (payload *Payload) GetOriginator() Originator {
if isPayloadFlagSet(payload.Flags, PayloadFlagOriginator) {
return Terminator
}
return Initiator
}

func SetOriginatorFlag(flags uint32, originator Originator) uint32 {
if originator == Initiator {
return ^uint32(PayloadFlagOriginator) & flags
Expand Down
2 changes: 1 addition & 1 deletion router/xgress/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var buffersBlockedByRemoteWindow int64
var outstandingPayloads int64
var outstandingPayloadBytes int64

func InitMetrics(registry metrics.UsageRegistry) {
func InitMetrics(registry metrics.Registry) {
droppedPayloadsMeter = registry.Meter("xgress.dropped_payloads")
retransmissions = registry.Meter("xgress.retransmissions")
retransmissionFailures = registry.Meter("xgress.retransmission_failures")
Expand Down
Loading

0 comments on commit 47a756a

Please sign in to comment.