Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

custom serialization #139

Merged
merged 1 commit into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,27 @@ updates:
schedule:
interval: daily
open-pull-requests-limit: 10
groups:
non-major:
applies-to: version-updates
update-types:
- "minor"
- "patch"
ignore:
- dependency-name: github.com/lucas-clemente/quic-go
versions:
- "> 0.18.1"

- package-ecosystem: github-actions
directory: "/"
schedule:
interval: weekly
open-pull-requests-limit: 10
groups:
all:
applies-to: version-updates
update-types:
- "major"
- "minor"
- "patch"

71 changes: 71 additions & 0 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"

on:
push:
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
schedule:
- cron: '40 13 * * 2'

jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write

strategy:
fail-fast: false
matrix:
language: [ 'go' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python' ]
# Learn more:
# https://docs.github.com/en/free-pro-team@latest/github/finding-security-vulnerabilities-and-errors-in-your-code/configuring-code-scanning#changing-the-languages-that-are-analyzed

steps:
- name: Checkout repository
uses: actions/checkout@v3

# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v1
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# queries: ./path/to/local/query, your-org/your-repo/queries@main

# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v1

# ℹ️ Command-line programs to run using the OS shell.
# 📚 https://git.io/JvXDl

# ✏️ If the Autobuild fails above, remove it and uncomment the following three lines
# and modify them (or add more) to build your code if your project
# uses a compiled language

#- run: |
# make bootstrap
# make release

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v1
13 changes: 8 additions & 5 deletions .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ jobs:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/setup-go@v3
- uses: actions/checkout@v4

- uses: actions/setup-go@v5
with:
go-version: 1.19
- uses: actions/checkout@v3
go-version-file: ./go.mod


- name: golangci-lint
uses: golangci/golangci-lint-action@v3
uses: golangci/golangci-lint-action@v6
with:
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
version: v1.49
version: v1.57.2

# Optional: working directory, useful for monorepos
# working-directory: somedir
Expand Down
9 changes: 4 additions & 5 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Git Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
persist-credentials: false

- name: Install Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: '1.19.x'
go-version-file: ./go.mod

- name: Install Ziti CI
uses: openziti/ziti-ci@v1
Expand All @@ -33,9 +33,8 @@ jobs:
gh_ci_key: ${{ secrets.GH_CI_KEY }}
ziti_ci_gpg_key: ${{ secrets.ZITI_CI_GPG_KEY }}
ziti_ci_gpg_key_id: ${{ secrets.ZITI_CI_GPG_KEY_ID }}
if: github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/heads/release-')
if: github.ref_name == 'main' || startsWith(github.ref_name, 'release-')
run: |
$(go env GOPATH)/bin/ziti-ci configure-git
$(go env GOPATH)/bin/ziti-ci tag -v -f version
$(go env GOPATH)/bin/ziti-ci trigger-github-build openziti/fabric update-dependency --token ${{ secrets.ZITI_CI_GH_TOKEN }}
$(go env GOPATH)/bin/ziti-ci trigger-github-build openziti/sdk-golang update-dependency --token ${{ secrets.ZITI_CI_GH_TOKEN }}
4 changes: 2 additions & 2 deletions .github/workflows/update-dependency.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ jobs:
fetch-depth: 0

- name: Install Go
uses: actions/setup-go@v3
uses: actions/setup-go@v5
with:
go-version: '1.19.x'
go-version-file: ./go.mod

- name: Install Ziti CI
uses: openziti/ziti-ci@v1
Expand Down
6 changes: 1 addition & 5 deletions accept_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package channel

import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/transport/v2"
"time"
)

Expand All @@ -31,7 +30,6 @@ type UnderlayAcceptor interface {
type UnderlayDispatcherConfig struct {
Listener UnderlayListener
ConnectTimeout time.Duration
TransportConfig transport.Configuration
Acceptors map[string]UnderlayAcceptor
DefaultAcceptor UnderlayAcceptor
}
Expand All @@ -41,7 +39,6 @@ type UnderlayDispatcherConfig struct {
type UnderlayDispatcher struct {
listener UnderlayListener
connectTimeout time.Duration
transportConfig transport.Configuration
acceptors map[string]UnderlayAcceptor
defaultAcceptor UnderlayAcceptor
}
Expand All @@ -50,7 +47,6 @@ func NewUnderlayDispatcher(config UnderlayDispatcherConfig) *UnderlayDispatcher
return &UnderlayDispatcher{
listener: config.Listener,
connectTimeout: config.ConnectTimeout,
transportConfig: config.TransportConfig,
acceptors: config.Acceptors,
defaultAcceptor: config.DefaultAcceptor,
}
Expand All @@ -62,7 +58,7 @@ func (self *UnderlayDispatcher) Run() {
defer log.Warn("exited")

for {
underlay, err := self.listener.Create(self.connectTimeout, self.transportConfig)
underlay, err := self.listener.Create(self.connectTimeout)
if err != nil {
log.WithError(err).Error("error accepting connection")
if err.Error() == "closed" {
Expand Down
2 changes: 1 addition & 1 deletion channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ type UnderlayListener interface {
// UnderlayFactory is used by Channel to obtain an Underlay instance. An underlay "dialer" or "listener" implement
// UnderlayFactory, to provide instances to Channel.
type UnderlayFactory interface {
Create(timeout time.Duration, tcfg transport.Configuration) (Underlay, error)
Create(timeout time.Duration) (Underlay, error)
}

// Underlay abstracts a physical communications channel, typically sitting on top of 'transport'.
Expand Down
4 changes: 2 additions & 2 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestCloseInBind(t *testing.T) {
req.NoError(err)

clientId := &identity.TokenId{Token: "echo-client"}
underlayFactory := NewClassicDialer(clientId, addr, nil)
underlayFactory := NewClassicDialer(DialerConfig{Identity: clientId, Endpoint: addr})

errC := make(chan error, 1)

Expand Down Expand Up @@ -343,7 +343,7 @@ func dialServer(options *Options, t *testing.T, bindHandler BindHandler) Channel
req.NoError(err)

clientId := &identity.TokenId{Token: "echo-client"}
underlayFactory := NewClassicDialer(clientId, addr, nil)
underlayFactory := NewClassicDialer(DialerConfig{Identity: clientId, Endpoint: addr})

ch, err := NewChannel("echo-test", underlayFactory, bindHandler, options)
req.NoError(err)
Expand Down
36 changes: 22 additions & 14 deletions classic_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,30 @@ type classicDialer struct {
endpoint transport.Address
localBinding string
headers map[int32][]byte
underlayFactory func(peer transport.Conn, version uint32) classicUnderlay
underlayFactory func(messageStrategy MessageStrategy, peer transport.Conn, version uint32) classicUnderlay
messageStrategy MessageStrategy
transportConfig transport.Configuration
}

func NewClassicDialerWithBindAddress(identity *identity.TokenId, endpoint transport.Address, localBinding string, headers map[int32][]byte) UnderlayFactory {
type DialerConfig struct {
Identity *identity.TokenId
Endpoint transport.Address
LocalBinding string
Headers map[int32][]byte
MessageStrategy MessageStrategy
TransportConfig transport.Configuration
}

func NewClassicDialer(cfg DialerConfig) UnderlayFactory {
result := &classicDialer{
identity: identity,
endpoint: endpoint,
localBinding: localBinding,
headers: headers,
identity: cfg.Identity,
endpoint: cfg.Endpoint,
localBinding: cfg.LocalBinding,
headers: cfg.Headers,
messageStrategy: cfg.MessageStrategy,
}

if endpoint.Type() == "dtls" {
if cfg.Endpoint.Type() == "dtls" {
result.underlayFactory = newDatagramUnderlay
} else {
result.underlayFactory = newClassicImpl
Expand All @@ -50,11 +62,7 @@ func NewClassicDialerWithBindAddress(identity *identity.TokenId, endpoint transp
return result
}

func NewClassicDialer(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte) UnderlayFactory {
return NewClassicDialerWithBindAddress(identity, endpoint, "", headers)
}

func (self *classicDialer) Create(timeout time.Duration, tcfg transport.Configuration) (Underlay, error) {
func (self *classicDialer) Create(timeout time.Duration) (Underlay, error) {
log := pfxlog.ContextLogger(self.endpoint.String())
log.Debug("started")
defer log.Debug("exited")
Expand All @@ -71,12 +79,12 @@ func (self *classicDialer) Create(timeout time.Duration, tcfg transport.Configur
log.Debugf("Attempting to dial with bind: %s", self.localBinding)

for time.Now().Before(deadline) {
peer, err := self.endpoint.DialWithLocalBinding("classic", self.localBinding, self.identity, timeout, tcfg)
peer, err := self.endpoint.DialWithLocalBinding("classic", self.localBinding, self.identity, timeout, self.transportConfig)
if err != nil {
return nil, err
}

underlay := self.underlayFactory(peer, version)
underlay := self.underlayFactory(self.messageStrategy, peer, version)
if err = self.sendHello(underlay, deadline); err != nil {
if tryCount > 0 {
return nil, err
Expand Down
16 changes: 10 additions & 6 deletions classic_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,22 @@ func (impl *classicImpl) getPeer() transport.Conn {
return impl.peer
}

func newClassicImpl(peer transport.Conn, version uint32) classicUnderlay {
func newClassicImpl(messageStrategy MessageStrategy, peer transport.Conn, version uint32) classicUnderlay {
readF := ReadV2
marshalF := MarshalV2

if version == 2 {
readF = ReadV2
marshalF = MarshalV2
} else if version == 3 { // currently only used for testing fallback to a common protocol version
readF = ReadV2
if version == 3 { // currently only used for testing fallback to a common protocol version
marshalF = marshalV3
}

if messageStrategy != nil && messageStrategy.GetStreamProducer() != nil {
readF = messageStrategy.GetStreamProducer()
}

if messageStrategy != nil && messageStrategy.GetMarshaller() != nil {
marshalF = messageStrategy.GetMarshaller()
}

return &classicImpl{
peer: peer,
readF: readF,
Expand Down
15 changes: 11 additions & 4 deletions classic_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type classicListener struct {
headers map[int32][]byte
closed atomic.Bool
listenerPool goroutines.Pool
underlayFactory func(peer transport.Conn, version uint32) classicUnderlay
messageStrategy MessageStrategy
underlayFactory func(messageStrategy MessageStrategy, peer transport.Conn, version uint32) classicUnderlay
}

func DefaultListenerConfig() ListenerConfig {
Expand All @@ -57,6 +58,7 @@ type ListenerConfig struct {
TransportConfig transport.Configuration
PoolConfigurator func(config *goroutines.PoolConfig)
ConnectionHandlers []ConnectionHandler
MessageStrategy MessageStrategy
}

func newClassicListener(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig) *classicListener {
Expand Down Expand Up @@ -95,12 +97,17 @@ func newClassicListener(identity *identity.TokenId, endpoint transport.Address,
return &classicListener{
identity: identity,
endpoint: endpoint,
socket: nil,
close: closeNotify,
handlers: config.ConnectionHandlers,
acceptF: nil,
created: nil,
connectOptions: config.ConnectOptions,
tcfg: config.TransportConfig,
headers: config.Headers,
closed: atomic.Bool{},
listenerPool: pool,
handlers: config.ConnectionHandlers,
messageStrategy: config.MessageStrategy,
underlayFactory: underlayFactory,
}
}
Expand Down Expand Up @@ -151,7 +158,7 @@ func (self *classicListener) Close() error {
return nil
}

func (self *classicListener) Create(_ time.Duration, _ transport.Configuration) (Underlay, error) {
func (self *classicListener) Create(_ time.Duration) (Underlay, error) {
if self.created == nil {
return nil, errors.New("this listener was not set up for Create to be called, programming error")
}
Expand All @@ -169,7 +176,7 @@ func (self *classicListener) Create(_ time.Duration, _ transport.Configuration)
func (self *classicListener) acceptConnection(peer transport.Conn) {
log := pfxlog.ContextLogger(self.endpoint.String())
err := self.listenerPool.Queue(func() {
impl := self.underlayFactory(peer, 2)
impl := self.underlayFactory(self.messageStrategy, peer, 2)

connectionId, err := NextConnectionId()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/channel/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package main

import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/channel/v2/cmd/channel/subcmd"
"github.com/openziti/channel/v3/cmd/channel/subcmd"
"github.com/openziti/transport/v2"
"github.com/openziti/transport/v2/tcp"
"github.com/openziti/transport/v2/tls"
Expand Down
Loading
Loading