From 47a756a96379485957c0616ffa14791a9aa5359e Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Fri, 6 Sep 2024 15:18:51 -0400 Subject: [PATCH] Add low-overhead link payload protocol --- common/inspect/circuit_inspections.go | 2 +- router/handler_link/bind.go | 2 + router/xgress/acker.go | 6 +- router/xgress/heartbeat_transformer.go | 38 +++ router/xgress/link_receive_buffer.go | 2 +- router/xgress/messages.go | 56 +++- router/xgress/metrics.go | 2 +- router/xgress/minimal_payload_test.go | 254 ++++++++++++++++ router/xgress/ordering_test.go | 15 +- router/xgress/xgress.go | 296 ++++++++++++++++--- router/xlink_transport/dialer.go | 3 + router/xlink_transport/listener.go | 2 + tests/link_test.go | 10 +- zititest/models/dtls/FlowControl.json | 118 ++++---- zititest/models/dtls/configs/router.yml.tmpl | 10 +- 15 files changed, 677 insertions(+), 139 deletions(-) create mode 100644 router/xgress/heartbeat_transformer.go create mode 100644 router/xgress/minimal_payload_test.go diff --git a/common/inspect/circuit_inspections.go b/common/inspect/circuit_inspections.go index 437440786..7e8606027 100644 --- a/common/inspect/circuit_inspections.go +++ b/common/inspect/circuit_inspections.go @@ -41,7 +41,7 @@ type XgressDetail struct { XgressPointer string `json:"xgressPointer"` LinkSendBufferPointer string `json:"linkSendBufferPointer"` Goroutines []string `json:"goroutines"` - Sequence int32 `json:"sequence"` + Sequence uint64 `json:"sequence"` Flags string `json:"flags"` } diff --git a/router/handler_link/bind.go b/router/handler_link/bind.go index d38dac346..8cd7f8eaa 100644 --- a/router/handler_link/bind.go +++ b/router/handler_link/bind.go @@ -13,6 +13,7 @@ import ( "github.com/openziti/ziti/router/env" "github.com/openziti/ziti/router/forwarder" metrics2 "github.com/openziti/ziti/router/metrics" + "github.com/openziti/ziti/router/xgress" "github.com/openziti/ziti/router/xlink" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -79,6 +80,7 @@ func (self *bindHandler) BindChannel(binding channel.Binding) error { binding.AddTypedReceiveHandler(newControlHandler(self.xlink, self.forwarder)) binding.AddPeekHandler(metrics2.NewChannelPeekHandler(self.xlink.Id(), self.forwarder.MetricsRegistry())) binding.AddPeekHandler(trace.NewChannelPeekHandler(self.xlink.Id(), ch, self.forwarder.TraceController())) + binding.AddTransformHandler(xgress.PayloadTransformer{}) if err := self.xlink.Init(self.forwarder.MetricsRegistry()); err != nil { return err } diff --git a/router/xgress/acker.go b/router/xgress/acker.go index d949e5a97..08b8a202f 100644 --- a/router/xgress/acker.go +++ b/router/xgress/acker.go @@ -7,7 +7,11 @@ import ( "sync/atomic" ) -var acker *Acker +var acker ackSender + +type ackSender interface { + ack(ack *Acknowledgement, address Address) +} func InitAcker(forwarder PayloadBufferForwarder, metrics metrics.Registry, closeNotify <-chan struct{}) { acker = NewAcker(forwarder, metrics, closeNotify) diff --git a/router/xgress/heartbeat_transformer.go b/router/xgress/heartbeat_transformer.go new file mode 100644 index 000000000..85eea5e7d --- /dev/null +++ b/router/xgress/heartbeat_transformer.go @@ -0,0 +1,38 @@ +/* + Copyright NetFoundry Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package xgress + +import ( + "encoding/binary" + "github.com/openziti/channel/v3" + "time" +) + +type PayloadTransformer struct { +} + +func (self PayloadTransformer) Rx(*channel.Message, channel.Channel) {} + +func (self PayloadTransformer) Tx(m *channel.Message, ch channel.Channel) { + if m.ContentType == channel.ContentTypeRaw && len(m.Body) > 1 { + if m.Body[0]&HeartbeatFlagMask != 0 && len(m.Body) > 12 { + now := time.Now().UnixNano() + m.PutUint64Header(channel.HeartbeatHeader, uint64(now)) + binary.BigEndian.PutUint64(m.Body[len(m.Body)-8:], uint64(now)) + } + } +} diff --git a/router/xgress/link_receive_buffer.go b/router/xgress/link_receive_buffer.go index 3f638b4bc..fd71e4863 100644 --- a/router/xgress/link_receive_buffer.go +++ b/router/xgress/link_receive_buffer.go @@ -61,7 +61,7 @@ func (buffer *LinkReceiveBuffer) ReceiveUnordered(payload *Payload, maxSize uint if buffer.tree.Size() > treeSize { payloadSize := len(payload.Data) size := atomic.AddUint32(&buffer.size, uint32(payloadSize)) - pfxlog.Logger().Tracef("Payload %v of size %v added to transmit buffer. New size: %v", payload.Sequence, payloadSize, size) + pfxlog.Logger().Debugf("Payload %v of size %v added to transmit buffer. New size: %v", payload.Sequence, payloadSize, size) if payload.Sequence > buffer.maxSequence { buffer.maxSequence = payload.Sequence } diff --git a/router/xgress/messages.go b/router/xgress/messages.go index 6510fcc69..2953c9843 100644 --- a/router/xgress/messages.go +++ b/router/xgress/messages.go @@ -209,10 +209,13 @@ const ( ) type Payload struct { - Header - Sequence int32 - Headers map[uint8][]byte - Data []byte + CircuitId string + Flags uint32 + RTT uint16 + Sequence int32 + Headers map[uint8][]byte + Data []byte + raw []byte } func (payload *Payload) GetSequence() int32 { @@ -220,12 +223,27 @@ func (payload *Payload) GetSequence() int32 { } func (payload *Payload) Marshall() *channel.Message { + if payload.raw != nil { + if payload.raw[0]&RttFlagMask != 0 { + rtt := uint16(info.NowInMilliseconds()) + b0 := byte(rtt) + b1 := byte(rtt >> 8) + payload.raw[2] = b0 + payload.raw[3] = b1 + } + return channel.NewMessage(channel.ContentTypeRaw, payload.raw) + } + msg := channel.NewMessage(ContentTypePayloadType, payload.Data) for key, value := range payload.Headers { msgHeaderKey := MinHeaderKey + int32(key) msg.Headers[msgHeaderKey] = value } - payload.marshallHeader(msg) + msg.Headers[HeaderKeyCircuitId] = []byte(payload.CircuitId) + if payload.Flags != 0 { + msg.PutUint32Header(HeaderKeyFlags, payload.Flags) + } + msg.PutUint64Header(HeaderKeySequence, uint64(payload.Sequence)) msg.PutUint16Header(HeaderKeyRTT, uint16(info.NowInMilliseconds())) @@ -249,10 +267,19 @@ func UnmarshallPayload(msg *channel.Message) (*Payload, error) { Data: msg.Body, } - if err := payload.unmarshallHeader(msg); err != nil { - return nil, err + circuitId, ok := msg.Headers[HeaderKeyCircuitId] + if !ok { + return nil, fmt.Errorf("no circuitId found in xgress payload message") } + // If no flags are present, it just means no flags have been set + flags, _ := msg.GetUint32Header(HeaderKeyFlags) + + payload.CircuitId = string(circuitId) + payload.Flags = flags + + payload.RTT, _ = msg.GetUint16Header(HeaderKeyRTT) + sequence, ok := msg.GetUint64Header(HeaderKeySequence) if !ok { return nil, fmt.Errorf("no sequence found in xgress payload message") @@ -270,6 +297,14 @@ func setPayloadFlag(flags uint32, flag PayloadFlag) uint32 { return uint32(PayloadFlag(flags) | flag) } +func (payload *Payload) GetCircuitId() string { + return payload.CircuitId +} + +func (payload *Payload) GetFlags() uint32 { + return payload.Flags +} + func (payload *Payload) IsCircuitEndFlagSet() bool { return isPayloadFlagSet(payload.Flags, PayloadFlagCircuitEnd) } @@ -278,6 +313,13 @@ func (payload *Payload) IsCircuitStartFlagSet() bool { return isPayloadFlagSet(payload.Flags, PayloadFlagCircuitStart) } +func (payload *Payload) GetOriginator() Originator { + if isPayloadFlagSet(payload.Flags, PayloadFlagOriginator) { + return Terminator + } + return Initiator +} + func SetOriginatorFlag(flags uint32, originator Originator) uint32 { if originator == Initiator { return ^uint32(PayloadFlagOriginator) & flags diff --git a/router/xgress/metrics.go b/router/xgress/metrics.go index 1aa49a415..a84003f3d 100644 --- a/router/xgress/metrics.go +++ b/router/xgress/metrics.go @@ -21,7 +21,7 @@ var buffersBlockedByRemoteWindow int64 var outstandingPayloads int64 var outstandingPayloadBytes int64 -func InitMetrics(registry metrics.UsageRegistry) { +func InitMetrics(registry metrics.Registry) { droppedPayloadsMeter = registry.Meter("xgress.dropped_payloads") retransmissions = registry.Meter("xgress.retransmissions") retransmissionFailures = registry.Meter("xgress.retransmission_failures") diff --git a/router/xgress/minimal_payload_test.go b/router/xgress/minimal_payload_test.go new file mode 100644 index 000000000..2abf9a943 --- /dev/null +++ b/router/xgress/minimal_payload_test.go @@ -0,0 +1,254 @@ +package xgress + +import ( + "encoding/binary" + "errors" + "fmt" + "github.com/michaelquigley/pfxlog" + "github.com/openziti/channel/v3" + "github.com/openziti/metrics" + "github.com/openziti/ziti/controller/idgen" + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/sirupsen/logrus" + "io" + "testing" + "time" +) + +func newTestXgConn(bufferSize int, targetSends uint32, targetReceives uint32) *testXgConn { + return &testXgConn{ + bufferSize: bufferSize, + targetSends: targetSends, + targetReceives: targetReceives, + done: make(chan struct{}), + errs: make(chan error, 1), + } +} + +type testXgConn struct { + bufferSize int + targetSends uint32 + targetReceives uint32 + sendCounter uint32 + recvCounter uint32 + done chan struct{} + errs chan error + bufCounter uint32 +} + +func (self *testXgConn) Close() error { + return nil +} + +func (self *testXgConn) LogContext() string { + return "test" +} + +func (self *testXgConn) ReadPayload() ([]byte, map[uint8][]byte, error) { + if self.targetSends == 0 { + time.Sleep(time.Minute) + } + buf := make([]byte, self.bufferSize) + sl := buf + for len(sl) > 0 && self.sendCounter < self.targetSends { + binary.BigEndian.PutUint32(sl, self.sendCounter) + self.sendCounter++ + sl = sl[4:] + } + + if len(sl) > 0 { + buf = buf[:len(buf)-len(sl)] + } + + if self.sendCounter >= self.targetSends { + fmt.Printf("sending final %d bytes\n", len(buf)) + return buf, nil, io.EOF + } + + fmt.Printf("sending %d bytes\n", len(buf)) + + return buf, nil, nil +} + +func (self *testXgConn) WritePayload(buf []byte, m map[uint8][]byte) (int, error) { + sl := buf + for len(sl) > 0 { + next := binary.BigEndian.Uint32(sl) + sl = sl[4:] + if next != self.recvCounter { + select { + case self.errs <- fmt.Errorf("expected counter %d, got %d, buf: %d", self.recvCounter, next, self.bufCounter): + default: + } + } + self.recvCounter++ + if self.recvCounter == self.targetReceives { + close(self.done) + } else if self.recvCounter > self.targetReceives { + select { + case self.errs <- fmt.Errorf("exceeded expected counter %d, got %d, buf: %d", self.targetReceives, self.recvCounter, self.bufCounter): + default: + } + } + } + + fmt.Printf("received %d bytes\n", len(buf)) + self.bufCounter++ + + return len(buf), nil +} + +func (self *testXgConn) HandleControlMsg(controlType ControlType, headers channel.Headers, responder ControlReceiver) error { + panic("implement me") +} + +type testIntermediary struct { + circuitId string + dest *Xgress + msgs channel.MessageStrategy + payloadTransformer PayloadTransformer + counter uint64 +} + +func (self *testIntermediary) HandleXgressReceive(payload *Payload, x *Xgress) { + m := payload.Marshall() + self.payloadTransformer.Tx(m, nil) + b, err := self.msgs.GetMarshaller()(m) + if err != nil { + panic(err) + } + + m, err = self.msgs.GetPacketProducer()(b) + if err != nil { + panic(err) + } + + if err = self.validateMessage(m); err != nil { + panic(err) + } + + payload, err = UnmarshallPayload(m) + if err != nil { + panic(err) + } + + if err = self.dest.SendPayload(payload, 0, PayloadTypeXg); err != nil { + panic(err) + } + fmt.Printf("transmitted payload %d from %s -> %s\n", payload.Sequence, x.address, self.dest.address) +} + +func (self *testIntermediary) validateMessage(m *channel.Message) error { + circuitId, found := m.GetStringHeader(HeaderKeyCircuitId) + if !found { + return errors.New("no circuit id found") + } + + if circuitId != self.circuitId { + return fmt.Errorf("expected circuit id %s, got %s", self.circuitId, circuitId) + } + + seq, found := m.GetUint64Header(HeaderKeySequence) + if !found { + return errors.New("no sequence found") + } + if seq != self.counter { + return fmt.Errorf("expected sequence %d, got %d", self.counter, seq) + } + self.counter++ + + return nil +} + +func (self *testIntermediary) HandleControlReceive(control *Control, x *Xgress) { + panic("implement me") +} + +type testAcker struct { + destinations cmap.ConcurrentMap[string, *Xgress] +} + +func (self *testAcker) ack(ack *Acknowledgement, address Address) { + dest, _ := self.destinations.Get(string(address)) + if dest != nil { + if err := dest.SendAcknowledgement(ack); err != nil { + panic(err) + } + } else { + panic(fmt.Errorf("no xgress found with id %s", string(address))) + } + fmt.Printf("forwarded ack from %s -> %s\n", address, dest.address) +} + +type mockForwarder struct{} + +func (m mockForwarder) RetransmitPayload(srcAddr Address, payload *Payload) error { + return nil +} + +func (m mockForwarder) ForwardAcknowledgement(srcAddr Address, acknowledgement *Acknowledgement) error { + return nil +} + +type mockFaulter struct{} + +func (m mockFaulter) ReportForwardingFault(circuitId string, ctrlId string) { +} + +func Test_MinimalPayloadMarshalling(t *testing.T) { + logOptions := pfxlog.DefaultOptions().SetTrimPrefix("github.com/openziti/").NoColor() + pfxlog.GlobalInit(logrus.InfoLevel, logOptions) + pfxlog.SetFormatter(pfxlog.NewFormatter(pfxlog.DefaultOptions().SetTrimPrefix("github.com/openziti/").StartingToday())) + + metricsRegistry := metrics.NewRegistry("test", nil) + InitMetrics(metricsRegistry) + + closeNotify := make(chan struct{}) + defer func() { + close(closeNotify) + }() + + InitPayloadIngester(closeNotify) + InitRetransmitter(mockForwarder{}, mockFaulter{}, metricsRegistry, closeNotify) + + ackHandler := &testAcker{ + destinations: cmap.New[*Xgress](), + } + acker = ackHandler + options := DefaultOptions() + options.Mtu = 1400 + + circuitId := idgen.New() + srcTestConn := newTestXgConn(10_000, 100_000, 0) + dstTestConn := newTestXgConn(10_000, 0, 100_000) + + srcXg := NewXgress(circuitId, "ctrl", "src", srcTestConn, Initiator, options, nil) + dstXg := NewXgress(circuitId, "ctrl", "dst", dstTestConn, Terminator, options, nil) + + ackHandler.destinations.Set("src", dstXg) + ackHandler.destinations.Set("dst", srcXg) + + msgStrategy := channel.DatagramMessageStrategy(UnmarshallPacketPayload) + srcXg.receiveHandler = &testIntermediary{ + circuitId: circuitId, + dest: dstXg, + msgs: msgStrategy, + } + + dstXg.receiveHandler = &testIntermediary{ + circuitId: circuitId, + dest: srcXg, + msgs: msgStrategy, + } + + srcXg.Start() + dstXg.Start() + + select { + case <-dstTestConn.done: + case err := <-dstTestConn.errs: + t.Fatal(err) + case <-time.After(time.Second): + t.Fatal("timeout") + } +} diff --git a/router/xgress/ordering_test.go b/router/xgress/ordering_test.go index 04b9a44f6..b2fabf8c0 100644 --- a/router/xgress/ordering_test.go +++ b/router/xgress/ordering_test.go @@ -90,15 +90,12 @@ func Test_Ordering(t *testing.T) { data := make([]byte, 8) binary.LittleEndian.PutUint64(data, uint64(i)) payload := &Payload{ - Header: Header{ - CircuitId: "test", - Flags: SetOriginatorFlag(0, Terminator), - RecvBufferSize: 16000, - RTT: 0, - }, - Sequence: int32(i), - Headers: nil, - Data: data, + CircuitId: "test", + Flags: SetOriginatorFlag(0, Terminator), + RTT: 0, + Sequence: int32(i), + Headers: nil, + Data: data, } if err := x.SendPayload(payload, 0, PayloadTypeXg); err != nil { errorCh <- err diff --git a/router/xgress/xgress.go b/router/xgress/xgress.go index 6fbed8a3e..45b5d1cb3 100644 --- a/router/xgress/xgress.go +++ b/router/xgress/xgress.go @@ -145,7 +145,7 @@ type Xgress struct { Options *Options txQueue chan *Payload closeNotify chan struct{} - rxSequence int32 + rxSequence uint64 rxSequenceLock sync.Mutex receiveHandler ReceiveHandler payloadBuffer *LinkSendBuffer @@ -264,24 +264,20 @@ func (self *Xgress) Label() string { func (self *Xgress) GetStartCircuit() *Payload { startCircuit := &Payload{ - Header: Header{ - CircuitId: self.circuitId, - Flags: SetOriginatorFlag(uint32(PayloadFlagCircuitStart), self.originator), - }, - Sequence: self.nextReceiveSequence(), - Data: nil, + CircuitId: self.circuitId, + Flags: SetOriginatorFlag(uint32(PayloadFlagCircuitStart), self.originator), + Sequence: int32(self.nextReceiveSequence()), + Data: nil, } return startCircuit } func (self *Xgress) GetEndCircuit() *Payload { endCircuit := &Payload{ - Header: Header{ - CircuitId: self.circuitId, - Flags: SetOriginatorFlag(uint32(PayloadFlagCircuitEnd), self.originator), - }, - Sequence: self.nextReceiveSequence(), - Data: nil, + CircuitId: self.circuitId, + Flags: SetOriginatorFlag(uint32(PayloadFlagCircuitEnd), self.originator), + Sequence: int32(self.nextReceiveSequence()), + Data: nil, } return endCircuit } @@ -388,14 +384,11 @@ func (self *Xgress) HandleControlReceive(controlType ControlType, headers channe func (self *Xgress) payloadIngester(payload *Payload) { if payload.IsCircuitStartFlagSet() && self.firstCircuitStartReceived() { - pfxlog.ContextLogger(self.Label()).WithFields(payload.GetLoggerFields()).Debug("received circuit start, starting xgress receiver") go self.rx() } if !self.Options.RandomDrops || rand.Int31n(self.Options.Drop1InN) != 1 { self.PayloadReceived(payload) - } else { - pfxlog.ContextLogger(self.Label()).WithFields(payload.GetLoggerFields()).Error("drop!") } self.queueSends() } @@ -488,7 +481,7 @@ func (self *Xgress) tx() { return false } else { payloadWriteTimer.UpdateSince(start) - payloadLogger.Infof("payload sent [%s]", info.ByteCount(int64(n))) + payloadLogger.Debugf("payload sent [%s]", info.ByteCount(int64(n))) } } return true @@ -499,7 +492,7 @@ func (self *Xgress) tx() { payloadStarted := false payloadComplete := false - var payloadSize int64 + var payloadSize uint64 var payloadWriteOffset int for { @@ -520,18 +513,20 @@ func (self *Xgress) tx() { var payloadReadOffset int if !payloadStarted { - payloadSize, payloadReadOffset = binary.Varint(payloadChunk.Data) + payloadSize, payloadReadOffset = binary.Uvarint(payloadChunk.Data) - if len(payloadChunk.Data) == 0 || payloadSize+int64(payloadReadOffset) == int64(len(payloadChunk.Data)) { + if len(payloadChunk.Data) == 0 || payloadSize+uint64(payloadReadOffset) == uint64(len(payloadChunk.Data)) { payload = payloadChunk payload.Data = payload.Data[payloadReadOffset:] payloadComplete = true } else { payload = &Payload{ - Header: payloadChunk.Header, - Sequence: payloadChunk.Sequence, - Headers: payloadChunk.Headers, - Data: make([]byte, payloadSize), + CircuitId: payloadChunk.CircuitId, + Flags: payloadChunk.Flags, + RTT: payloadChunk.RTT, + Sequence: payloadChunk.Sequence, + Headers: payloadChunk.Headers, + Data: make([]byte, payloadSize), } } payloadStarted = true @@ -541,7 +536,7 @@ func (self *Xgress) tx() { chunkData := payloadChunk.Data[payloadReadOffset:] copy(payload.Data[payloadWriteOffset:], chunkData) payloadWriteOffset += len(chunkData) - payloadComplete = int64(payloadWriteOffset) == payloadSize + payloadComplete = uint64(payloadWriteOffset) == payloadSize } payloadLogger := log.WithFields(payload.GetLoggerFields()) @@ -577,6 +572,64 @@ func (self *Xgress) flushSendThenClose() { }) } +/** + Payload format + + Field 1: 1 byte - version and flags + Masks + * 00000000 - Always 0 to indicate type. The standard channel header 4 byte protocol indicator has a 1 in byte 0 of the first byte + * 00000110 - Version, v0-v3. Assumption is that if we ever get to v4, we can roll back to 0, since everything + should have upgraded past v0 by that point + * 00001000 - Terminator Flag - indicates the payload origin, initiator (0) or terminator (1) + * 00010000 - RTT Flag. Indicates if the payload contains an RTT. We don't need to send RTT on every payload. + * 00100000 - Chunk Flag. Indicates if this payload is chunked. + * 01000000 - Headers flag. Indicates this payload contains headers. + * 10000000 - Heartbeat Flag. Indicates the payload contains a heartbeat + + Field 2: 1 byte, Circuit id size + Masks + * 00001111 - Number of bytes in circuit id. Supports circuit ids which take up to 15 bytes. + Circuits ids are currently at 9 bytes. + * 11110000 - currently unused + + Field 3: RTT (optional) + - 2 bytes + + Field 4: CircuitId + - direct bytes representation of string encoded circuit id + + Field 5: Sequence number + - Encoded using binary.PutUvarint + + Field 6: Headers + - Presence indicated by headers flag in first field + length - encoded with binary.PutUvarint + for each key/value pair - + key - 1 byte + value length - encoded with binary.PutUvarint + value - byte array, directly appended + + + Field 7: Data + + Field 8: Heartbeat + - 8 bytes + - only included if there's extra room +*/ + +const ( + VersionMask byte = 0b00000110 + TerminatorFlagMask byte = 0b00001000 + RttFlagMask byte = 0b00010000 + ChunkFlagMask byte = 0b00100000 + HeadersFlagMask byte = 0b01000000 + HeartbeatFlagMask byte = 0b10000000 + + CircuitIdSizeMask byte = 0b00001111 + PayloadProtocolV1 byte = 1 + PayloadProtocolOffset byte = 1 +) + func (self *Xgress) rx() { log := pfxlog.ContextLogger(self.Label()) @@ -612,7 +665,7 @@ func (self *Xgress) rx() { return } - if n < int(self.Options.Mtu) || self.Options.Mtu == 0 { + if self.Options.Mtu == 0 { if !self.sendUnchunkedBuffer(buffer, headers) { return } @@ -620,30 +673,91 @@ func (self *Xgress) rx() { } first := true + chunked := false for len(buffer) > 0 { + seq := self.nextReceiveSequence() + chunk := make([]byte, self.Options.Mtu) - dataTarget := chunk - offset := 0 - if first { - offset = binary.PutVarint(chunk, int64(n)) - dataTarget = chunk[offset:] + + flagsHeader := VersionMask & (PayloadProtocolV1 << PayloadProtocolOffset) + var sizesHeader byte + if self.originator == Terminator { + flagsHeader |= TerminatorFlagMask + } + + written := 2 + rest := chunk[2:] + includeRtt := seq%5 == 0 + if includeRtt { + flagsHeader |= RttFlagMask + written += 2 + rest = rest[2:] } - written := copy(dataTarget, buffer) - buffer = buffer[written:] + size := copy(rest, self.circuitId) + sizesHeader |= CircuitIdSizeMask & uint8(size) + written += size + rest = rest[size:] + size = binary.PutUvarint(rest, seq) + rest = rest[size:] + written += size + + if first && len(headers) > 0 { + flagsHeader |= HeadersFlagMask + size, err = writeU8ToBytesMap(headers, rest) + if err != nil { + log.WithError(err).Error("payload encoding error, closing") + return + } + rest = rest[size:] + size += written + } + + data := rest + dataLen := 0 + if first && len(rest) < len(buffer) { + chunked = true + size = binary.PutUvarint(rest, uint64(n)) + dataLen += size + written += size + rest = rest[size:] + } + + if chunked { + flagsHeader |= ChunkFlagMask + } + + size = copy(rest, buffer) + written += size + dataLen += size + + buffer = buffer[size:] + + // check if there's room for a heartbeat + if written+8 <= len(chunk) { + flagsHeader |= HeartbeatFlagMask + written += 8 + } + + chunk[0] = flagsHeader + chunk[1] = sizesHeader payload := &Payload{ - Header: Header{ - CircuitId: self.circuitId, - Flags: setPayloadFlag(SetOriginatorFlag(0, self.originator), PayloadFlagChunk), - }, - Sequence: self.nextReceiveSequence(), - Data: chunk[:offset+written], + CircuitId: self.circuitId, + Flags: SetOriginatorFlag(0, self.originator), + Sequence: int32(seq), + Data: data[:dataLen], + raw: chunk[:written], + } + + if chunked { + payload.Flags = setPayloadFlag(payload.Flags, PayloadFlagChunk) } if first { payload.Headers = headers } + log.Debugf("sending payload chunk. seq: %d, first: %v, chunk size: %d, payload size: %d, remainder: %d", payload.Sequence, first, len(payload.Data), n, len(buffer)) first = false @@ -665,13 +779,11 @@ func (self *Xgress) sendUnchunkedBuffer(buf []byte, headers map[uint8][]byte) bo log := pfxlog.ContextLogger(self.Label()) payload := &Payload{ - Header: Header{ - CircuitId: self.circuitId, - Flags: SetOriginatorFlag(0, self.originator), - }, - Sequence: self.nextReceiveSequence(), - Data: buf, - Headers: headers, + CircuitId: self.circuitId, + Flags: SetOriginatorFlag(0, self.originator), + Sequence: int32(self.nextReceiveSequence()), + Data: buf, + Headers: headers, } log.Debugf("sending unchunked payload. seq: %d, payload size: %d", payload.Sequence, len(payload.Data)) @@ -704,7 +816,7 @@ func (self *Xgress) forwardPayload(payload *Payload) bool { return true } -func (self *Xgress) nextReceiveSequence() int32 { +func (self *Xgress) nextReceiveSequence() uint64 { self.rxSequenceLock.Lock() defer self.rxSequenceLock.Unlock() @@ -743,10 +855,10 @@ func (self *Xgress) SendEmptyAck() { acker.ack(ack, self.address) } -func (self *Xgress) GetSequence() int32 { +func (self *Xgress) GetSequence() uint64 { self.rxSequenceLock.Lock() defer self.rxSequenceLock.Unlock() - return self.rxSequence + return uint64(self.rxSequence) } func (self *Xgress) InspectCircuit(detail *inspect.CircuitInspectDetail) { @@ -819,3 +931,89 @@ func (self *Xgress) addGoroutineIfRelated(buf *bytes.Buffer, xgressRelated bool, } return result } + +func UnmarshallPacketPayload(buf []byte) (*channel.Message, error) { + flagsField := buf[0] + if flagsField&1 != 0 { + return channel.ReadV2(bytes.NewBuffer(buf)) + } + version := (flagsField & VersionMask) >> 1 + if version != PayloadProtocolV1 { + return nil, fmt.Errorf("unsupported version: %d", version) + } + sizeField := buf[1] + circuitIdSize := CircuitIdSizeMask & sizeField + rest := buf[2:] + + var rtt *uint16 + if flagsField&RttFlagMask != 0 { + b0 := rest[0] + b1 := rest[1] + rest = rest[2:] + val := uint16(b0) | (uint16(b1) << 8) + rtt = &val + } + + var heartbeat *uint64 + if flagsField&HeartbeatFlagMask != 0 { + val := binary.BigEndian.Uint64(rest[len(rest)-8:]) + heartbeat = &val + rest = rest[:len(rest)-8] + } + + circuitId := string(rest[:circuitIdSize]) + rest = rest[circuitIdSize:] + seq, read := binary.Uvarint(rest) + rest = rest[read:] + + if flagsField&HeadersFlagMask != 0 { + // TODO: read headers + } + + msg := channel.NewMessage(ContentTypePayloadType, rest) + msg.PutStringHeader(HeaderKeyCircuitId, circuitId) + msg.PutUint64Header(HeaderKeySequence, seq) + if heartbeat != nil { + msg.PutUint64Header(channel.HeartbeatHeader, *heartbeat) + } + + flags := uint32(0) + + if flagsField&ChunkFlagMask != 0 { + flags = setPayloadFlag(flags, PayloadFlagChunk) + } + + if flagsField&TerminatorFlagMask != 0 { + flags = setPayloadFlag(flags, PayloadFlagOriginator) + } + + if flags != 0 { + msg.PutUint32Header(HeaderKeyFlags, flags) + } + + if rtt != nil { + msg.PutUint16Header(HeaderKeyRTT, *rtt) + } + + return msg, nil +} + +func writeU8ToBytesMap(m map[uint8][]byte, buf []byte) (int, error) { + written := binary.PutUvarint(buf, uint64(len(m))) + buf = buf[:written] + for k, v := range m { + if len(buf) < 10 { + return 0, fmt.Errorf("header too large, payload has insufficient space") + } + buf[0] = k + lenLen := binary.PutUvarint(buf[1:], uint64(len(v))) + buf = buf[lenLen+1:] + written += 1 + lenLen + if len(buf) < len(v) { + return 0, fmt.Errorf("header too large, payload has insufficient space") + } + written += copy(buf[1+lenLen:], v) + } + + return written, nil +} diff --git a/router/xlink_transport/dialer.go b/router/xlink_transport/dialer.go index 583819f50..009edd4f4 100644 --- a/router/xlink_transport/dialer.go +++ b/router/xlink_transport/dialer.go @@ -23,6 +23,7 @@ import ( "github.com/openziti/identity" "github.com/openziti/metrics" "github.com/openziti/transport/v2" + "github.com/openziti/ziti/router/xgress" "github.com/openziti/ziti/router/xlink" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -114,6 +115,7 @@ func (self *dialer) dialSplit(linkId *identity.TokenId, address transport.Addres LocalBinding: self.config.localBinding, Headers: headers, TransportConfig: self.transportConfig, + MessageStrategy: channel.DatagramMessageStrategy(xgress.UnmarshallPacketPayload), } payloadDialer := channel.NewClassicDialer(channelDialerConfig) @@ -175,6 +177,7 @@ func (self *dialer) dialSingle(linkId *identity.TokenId, address transport.Addre LocalBinding: self.config.localBinding, Headers: headers, TransportConfig: self.transportConfig, + MessageStrategy: channel.DatagramMessageStrategy(xgress.UnmarshallPacketPayload), }) bindHandler := &dialBindHandler{ diff --git a/router/xlink_transport/listener.go b/router/xlink_transport/listener.go index bfeac6181..c4f414053 100644 --- a/router/xlink_transport/listener.go +++ b/router/xlink_transport/listener.go @@ -24,6 +24,7 @@ import ( "github.com/openziti/metrics" "github.com/openziti/transport/v2" fabricMetrics "github.com/openziti/ziti/common/metrics" + "github.com/openziti/ziti/router/xgress" "github.com/openziti/ziti/router/xlink" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -51,6 +52,7 @@ func (self *listener) Listen() error { TransportConfig: self.tcfg, PoolConfigurator: fabricMetrics.GoroutinesPoolMetricsConfigF(self.metricsRegistry, "pool.listener.link"), ConnectionHandlers: []channel.ConnectionHandler{&ConnectionHandler{self.id}}, + MessageStrategy: channel.DatagramMessageStrategy(xgress.UnmarshallPacketPayload), } var err error diff --git a/tests/link_test.go b/tests/link_test.go index 2466706c2..aa1f6dced 100644 --- a/tests/link_test.go +++ b/tests/link_test.go @@ -205,12 +205,10 @@ func Test_UnrequestedLinkFromValidRouter(t *testing.T) { } else { for i := int32(0); i < 100 && err == nil; i++ { payload := &xgress.Payload{ - Header: xgress.Header{ - CircuitId: "hello", - }, - Sequence: i, - Headers: nil, - Data: []byte{1, 2, 3, 4}, + CircuitId: "hello", + Sequence: i, + Headers: nil, + Data: []byte{1, 2, 3, 4}, } err = xla.getLink().SendPayload(payload, time.Second, xgress.PayloadTypeXg) ctx.Req.NoErrorf(err, "iteration %v", i) diff --git a/zititest/models/dtls/FlowControl.json b/zititest/models/dtls/FlowControl.json index 8d9d2daa7..a2cd46386 100644 --- a/zititest/models/dtls/FlowControl.json +++ b/zititest/models/dtls/FlowControl.json @@ -31,7 +31,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -111,7 +111,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -173,7 +173,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -253,7 +253,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -315,7 +315,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -395,7 +395,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -457,7 +457,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -537,7 +537,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -599,7 +599,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -679,7 +679,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -741,7 +741,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -821,7 +821,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -883,7 +883,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -963,7 +963,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -1025,7 +1025,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -1105,7 +1105,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -1167,7 +1167,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -1272,7 +1272,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -1328,7 +1328,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -1385,7 +1385,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -1442,7 +1442,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -1499,7 +1499,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -1560,7 +1560,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -1640,7 +1640,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -1702,7 +1702,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -1782,7 +1782,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -1839,7 +1839,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -1896,7 +1896,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -1953,7 +1953,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -2014,7 +2014,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -2094,7 +2094,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -2150,7 +2150,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -2207,7 +2207,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -2264,7 +2264,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -2321,7 +2321,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -2382,7 +2382,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -2463,7 +2463,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -2519,7 +2519,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -2600,7 +2600,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -2656,7 +2656,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -2736,7 +2736,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -2796,7 +2796,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -2876,7 +2876,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -2936,7 +2936,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -3016,7 +3016,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -3050,7 +3050,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -3130,7 +3130,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -3165,7 +3165,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -3245,7 +3245,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -3275,7 +3275,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -3310,7 +3310,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -3417,7 +3417,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -3469,7 +3469,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -3522,7 +3522,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -3579,7 +3579,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -3659,7 +3659,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -3719,7 +3719,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "fieldConfig": { "defaults": { @@ -3800,7 +3800,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { @@ -3856,7 +3856,7 @@ { "datasource": { "type": "influxdb", - "uid": "ddwawc0jfp1q8e" + "uid": "bdxepfvogbev4d" }, "groupBy": [ { diff --git a/zititest/models/dtls/configs/router.yml.tmpl b/zititest/models/dtls/configs/router.yml.tmpl index b30870644..c6e5f64a8 100644 --- a/zititest/models/dtls/configs/router.yml.tmpl +++ b/zititest/models/dtls/configs/router.yml.tmpl @@ -38,14 +38,14 @@ transport: westworld3: profile_version: 1 #max_segment_sz: 3000 - dtls: + #dtls: # maxBytesPerSecond: 500000 link: listeners: - binding: transport - bind: dtls:0.0.0.0:6000 - advertise: dtls:{{$router_ip}}:6000 + bind: tls:0.0.0.0:6000 + advertise: tls:{{$router_ip}}:6000 dialers: - binding: transport split: false @@ -53,7 +53,7 @@ link: dialers: - binding: tunnel options: - mtu: 1310 + #mtu: 1435 #txPortalStartSize: 4192000 txPortalIncreaseThresh: 250 txPortalIncreaseScale: 0.5 @@ -68,7 +68,7 @@ listeners: - binding: tunnel options: mode: tproxy - mtu: 1310 + #mtu: 1435 #txPortalStartSize: 4192000 txPortalIncreaseThresh: 250 txPortalIncreaseScale: 0.5