Skip to content

Commit

Permalink
DTLS tuning
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Sep 3, 2024
1 parent c405c48 commit 4fbbe89
Show file tree
Hide file tree
Showing 23 changed files with 4,258 additions and 153 deletions.
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ require (
github.com/openziti/sdk-golang v0.23.40
github.com/openziti/secretstream v0.1.21
github.com/openziti/storage v0.3.0
github.com/openziti/transport/v2 v2.0.143
github.com/openziti/transport/v2 v2.0.144-0.20240903212250-65f868ed70b2
github.com/openziti/x509-claims v1.0.3
github.com/openziti/xweb/v2 v2.1.1
github.com/openziti/ziti-db-explorer v1.1.3
Expand Down Expand Up @@ -131,6 +131,7 @@ require (
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/iancoleman/strcase v0.1.3 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/josharian/native v1.1.0 // indirect
Expand All @@ -153,7 +154,8 @@ require (
github.com/muhlemmer/httpforwarded v0.1.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect
github.com/openziti/dilithium v0.3.3 // indirect
github.com/openziti-incubator/cf v0.0.3 // indirect
github.com/openziti/dilithium v0.3.5 // indirect
github.com/parallaxsecond/parsec-client-go v0.0.0-20221025095442-f0a77d263cf9 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pion/dtls/v3 v3.0.1 // indirect
Expand Down
22 changes: 11 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174 h1:WlZsjVhE8Af9IcZDGgJGQpNflI3+MJSBhsgT5PCtzBQ=
github.com/hinshun/vt10x v0.0.0-20180616224451-1954e6464174/go.mod h1:DqJ97dSdRW1W22yXSB90986pcOyQ7r45iio1KN2ez1A=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iancoleman/strcase v0.1.3 h1:dJBk1m2/qjL1twPLf68JND55vvivMupZ4wIzE8CTdBw=
github.com/iancoleman/strcase v0.1.3/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
Expand Down Expand Up @@ -433,7 +435,6 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand Down Expand Up @@ -510,7 +511,7 @@ github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8Ku
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQE9x6ikvDFZS2mDVS3drnohI=
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/michaelquigley/pfxlog v0.3.1/go.mod h1:pwJmJ9nN787xk9U+0z8/5nP2GMhU6GGCTfgBWGpiXaQ=
github.com/michaelquigley/pfxlog v0.6.1/go.mod h1:z864Y2uU0O4QA5OoclXIOdoX5Tme2zm66FLAJ+R1jZs=
github.com/michaelquigley/pfxlog v0.6.10 h1:IbC/H3MmSDcPlQHF1UZPQU13Dkrs0+ycWRyQd2ihnjw=
github.com/michaelquigley/pfxlog v0.6.10/go.mod h1:gEiNTfKEX6cJHSwRpOuqBpc8oYrlhMiDK/xMk/gV7D0=
github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4=
Expand Down Expand Up @@ -565,14 +566,16 @@ github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je4
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b h1:FfH+VrHHk6Lxt9HdVS0PXzSXFyS2NbZKXv33FYPol0A=
github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b/go.mod h1:AC62GU6hc0BrNm+9RK9VSiwa/EUe1bkIeFORAMcHvJU=
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
github.com/openziti-incubator/cf v0.0.3 h1:JKs55DbaIxl87nI/Ra/3DHMiz5iaPpu8JjsuN8SnG7w=
github.com/openziti-incubator/cf v0.0.3/go.mod h1:6abCY06bCjKmK2I9kohij+cp9uXIPFiFwSCNZPdMk8E=
github.com/openziti/agent v1.0.17 h1:CNBVWl8m4CWXz/pmdWjEhg1rvtUGinQNRAr3vgF90go=
github.com/openziti/agent v1.0.17/go.mod h1:GJVKVikwmvZ0U+hNP7Zi2P+xd/wTb6VZ9wz24/2WQ+U=
github.com/openziti/channel/v2 v2.0.143 h1:GPbcITZX5+vfGq+i8GJ4WWfoMCuEuDlxd8gIvQGFaH8=
github.com/openziti/channel/v2 v2.0.143/go.mod h1:SKka1yjVzBpz8Zy5wuTHHEjfArRo8aGBPs+2XZWheso=
github.com/openziti/cobra-to-md v1.0.1 h1:WRinNoIRmwWUSJm+pSNXMjOrtU48oxXDZgeCYQfVXxE=
github.com/openziti/cobra-to-md v1.0.1/go.mod h1:FjCpk/yzHF7/r28oSTNr5P57yN5VolpdAtS/g7KNi2c=
github.com/openziti/dilithium v0.3.3 h1:PLgQ6PMNLSTzCFbX/h98cmudgz/cU6TmjdSv5NAPD8k=
github.com/openziti/dilithium v0.3.3/go.mod h1:vsCjI2AU/hon9e+dLhUFbCNGesJDj2ASgkySOcpmvjo=
github.com/openziti/dilithium v0.3.5 h1:+envGNzxc3OyVPiuvtxivQmCsOjdZjtOMLpQBeMz7eM=
github.com/openziti/dilithium v0.3.5/go.mod h1:XONq1iK6te/WwNzkgZHfIDHordMPqb0hMwJ8bs9EfSk=
github.com/openziti/edge-api v0.26.25 h1:ueHfs4+dRuWkSFCbduYRsMre3udbUp7xlmjOVbdEX5k=
github.com/openziti/edge-api v0.26.25/go.mod h1:Nv/0DxHZ7WtR7sHzLIrUJdgAHrHpIV3s9ftrokeuQzg=
github.com/openziti/foundation/v2 v2.0.48 h1:G0/P8XQS+xTAS3KYQ/PHjLFHLABZkLJeNDbPgPTaxU0=
Expand All @@ -591,8 +594,8 @@ github.com/openziti/secretstream v0.1.21 h1:r4xN8/CzSEvxZFFYGSztrlhMtIvk3B+SQcq2
github.com/openziti/secretstream v0.1.21/go.mod h1:1lfAnS8gBHsKZiPbRRK1sularbAsqizN6tWUEuZSfo0=
github.com/openziti/storage v0.3.0 h1:DH2SN8GYy7rSlBZM9X5W1Dv2b2qZ8kSKyt0iivokVMw=
github.com/openziti/storage v0.3.0/go.mod h1:1f6cGRKYLzwst5hwVY+qr8GCcUeO/U5jJftE8+qFqbk=
github.com/openziti/transport/v2 v2.0.143 h1:qhqI/yEN4SvP8SBx7ERCt0x67Im+Icy/hGtJ7Dn/xOQ=
github.com/openziti/transport/v2 v2.0.143/go.mod h1:3BxxlWa8fbhmZG1CmIOpeEHlCCY1G7DPx7v7+bAXYEQ=
github.com/openziti/transport/v2 v2.0.144-0.20240903212250-65f868ed70b2 h1:0C8EQYAiRygEE17K0b/AZuIKuh5G41j299kAfuMVKhU=
github.com/openziti/transport/v2 v2.0.144-0.20240903212250-65f868ed70b2/go.mod h1:HtxMLtUusJMrXghLy650CRVodXXs9fjf4QFPZ9yzR64=
github.com/openziti/x509-claims v1.0.3 h1:HNdQ8Nf1agB3lBs1gahcO6zfkeS4S5xoQ2/PkY4HRX0=
github.com/openziti/x509-claims v1.0.3/go.mod h1:Z0WIpBm6c4ecrpRKrou6Gk2wrLWxJO/+tuUwKh8VewE=
github.com/openziti/xweb/v2 v2.1.1 h1:T6vbmG2189WWwq16wryM7RQEbT5wNARrVHNQs23jEPE=
Expand Down Expand Up @@ -714,7 +717,6 @@ github.com/shurcooL/users v0.0.0-20180125191416-49c67e49c537/go.mod h1:QJTqeLYED
github.com/shurcooL/webdavfs v0.0.0-20170829043945-18c3829fa133/go.mod h1:hKmq5kWdCj2z2KEozexVbfEZIWiTjhE0+UjmZgPqehw=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
Expand All @@ -739,6 +741,7 @@ github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkU
github.com/spf13/cast v1.6.0 h1:GEiTHELF+vaR5dhz3VqZfFSzZjYbgeKDpBxQVS4GYJ0=
github.com/spf13/cast v1.6.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSWzOo=
github.com/spf13/cobra v1.2.0/go.mod h1:ExllRjgxM/piMAM+3tAZvg8fsklGAf3tPfi+i8t68Nk=
github.com/spf13/cobra v1.2.1/go.mod h1:ExllRjgxM/piMAM+3tAZvg8fsklGAf3tPfi+i8t68Nk=
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
Expand Down Expand Up @@ -855,7 +858,6 @@ golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20181030102418-4d3f4d9ffa16/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190422183909-d864b10871cd/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
Expand All @@ -865,7 +867,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200221231518-2aa609cf4a9d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
Expand Down Expand Up @@ -1062,7 +1064,6 @@ golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200831180312-196b9ba8737a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200909081042-eff7692f9009/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200918174421-af09f7315aff/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down Expand Up @@ -1350,7 +1351,6 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
18 changes: 12 additions & 6 deletions router/forwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Forwarder struct {
}

type Destination interface {
SendPayload(payload *xgress.Payload) error
SendPayload(payload *xgress.Payload, timeout time.Duration, payloadType xgress.PayloadType) error
SendAcknowledgement(acknowledgement *xgress.Acknowledgement) error
SendControl(control *xgress.Control) error
InspectCircuit(detail *inspect.CircuitInspectDetail)
Expand Down Expand Up @@ -165,22 +165,28 @@ func (forwarder *Forwarder) EndCircuit(circuitId string) {
forwarder.UnregisterDestinations(circuitId)
}

func (forwarder *Forwarder) ForwardPayload(srcAddr xgress.Address, payload *xgress.Payload) error {
return forwarder.forwardPayload(srcAddr, payload, true)
func (forwarder *Forwarder) ForwardPayload(srcAddr xgress.Address, payload *xgress.Payload, timeout time.Duration) error {
return forwarder.forwardPayload(srcAddr, payload, true, timeout)
}

func (forwarder *Forwarder) RetransmitPayload(srcAddr xgress.Address, payload *xgress.Payload) error {
return forwarder.forwardPayload(srcAddr, payload, false)
return forwarder.forwardPayload(srcAddr, payload, false, time.Second)
}

func (forwarder *Forwarder) forwardPayload(srcAddr xgress.Address, payload *xgress.Payload, markActive bool) error {
func (forwarder *Forwarder) forwardPayload(srcAddr xgress.Address, payload *xgress.Payload, markActive bool, timeout time.Duration) error {
log := pfxlog.ContextLogger(string(srcAddr))

circuitId := payload.GetCircuitId()
if forwardTable, found := forwarder.circuits.getForwardTable(circuitId, markActive); found {
if dstAddr, found := forwardTable.getForwardAddress(srcAddr); found {
if dst, found := forwarder.destinations.getDestination(dstAddr); found {
if err := dst.SendPayload(payload); err != nil {
payloadType := xgress.PayloadTypeXg
if !markActive {
payloadType = xgress.PayloadTypeRtx
} else if timeout == 0 {
payloadType = xgress.PayloadTypeFwd
}
if err := dst.SendPayload(payload, timeout, payloadType); err != nil {
return err
}
log.WithFields(payload.GetLoggerFields()).Debugf("=> %s", string(dstAddr))
Expand Down
2 changes: 1 addition & 1 deletion router/handler_link/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (self *payloadHandler) HandleReceive(msg *channel.Message, ch channel.Chann

payload, err := xgress.UnmarshallPayload(msg)
if err == nil {
if err = self.forwarder.ForwardPayload(xgress.Address(self.link.Id()), payload); err != nil {
if err = self.forwarder.ForwardPayload(xgress.Address(self.link.Id()), payload, 0); err != nil {
log.WithError(err).Debug("unable to forward")
self.forwarder.ReportForwardingFault(payload.CircuitId, "")
}
Expand Down
3 changes: 2 additions & 1 deletion router/handler_xgress/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/forwarder"
"github.com/openziti/ziti/router/xgress"
"time"
)

type closeHandler struct {
Expand All @@ -41,7 +42,7 @@ func (txc *closeHandler) HandleXgressClose(x *xgress.Xgress) {

x.ForwardEndOfCircuit(func(payload *xgress.Payload) bool {
log.Debug("sending end of circuit payload")
if err := txc.forwarder.ForwardPayload(x.Address(), payload); err != nil {
if err := txc.forwarder.ForwardPayload(x.Address(), payload, time.Second); err != nil {
// ok that we couldn't forward close, as that means it was already closed
log.Debugf("error forwarding end circuit payload (%s)", err)
return false
Expand Down
15 changes: 12 additions & 3 deletions router/handler_xgress/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package handler_xgress

import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2"
"github.com/openziti/ziti/router/forwarder"
"github.com/openziti/ziti/router/xgress"
"time"
)

type receiveHandler struct {
Expand All @@ -33,9 +35,16 @@ func NewReceiveHandler(forwarder *forwarder.Forwarder) *receiveHandler {
}

func (xrh *receiveHandler) HandleXgressReceive(payload *xgress.Payload, x *xgress.Xgress) {
if err := xrh.forwarder.ForwardPayload(x.Address(), payload); err != nil {
pfxlog.ContextLogger(x.Label()).WithFields(payload.GetLoggerFields()).WithError(err).Error("unable to forward payload")
xrh.forwarder.ReportForwardingFault(payload.CircuitId, x.CtrlId())
for {
if err := xrh.forwarder.ForwardPayload(x.Address(), payload, time.Second); err != nil {
if !channel.IsTimeout(err) {
pfxlog.ContextLogger(x.Label()).WithFields(payload.GetLoggerFields()).WithError(err).Error("unable to forward payload")
xrh.forwarder.ReportForwardingFault(payload.CircuitId, x.CtrlId())
return
}
} else {
return
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion router/link/link_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (self *testLink) Id() string {
return self.id
}

func (self *testLink) SendPayload(payload *xgress.Payload) error {
func (self *testLink) SendPayload(payload *xgress.Payload, timeout time.Duration, payloadType xgress.PayloadType) error {
panic("implement me")
}

Expand Down
3 changes: 3 additions & 0 deletions router/xgress/link_receive_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (buffer *LinkReceiveBuffer) Size() uint32 {

func (buffer *LinkReceiveBuffer) ReceiveUnordered(payload *Payload, maxSize uint32) bool {
if payload.GetSequence() <= buffer.sequence {
duplicatePayloadsMeter.Mark(1)
return true
}

Expand All @@ -64,6 +65,8 @@ func (buffer *LinkReceiveBuffer) ReceiveUnordered(payload *Payload, maxSize uint
if payload.Sequence > buffer.maxSequence {
buffer.maxSequence = payload.Sequence
}
} else {
duplicatePayloadsMeter.Mark(1)
}
return true
}
Expand Down
Loading

0 comments on commit 4fbbe89

Please sign in to comment.