Skip to content

Commit

Permalink
Add controller connect events. Fixes #1835
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Oct 2, 2024
1 parent 0eec47c commit cdf631e
Show file tree
Hide file tree
Showing 14 changed files with 265 additions and 20 deletions.
59 changes: 59 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,62 @@
# Release 1.2.0

## What's New

* Connect Events

## Connect Events

These are events generated when a successful connection is made to a controller, from any of:

1. Identity, using the REST API
2. Router
3. Controller (peer in an HA cluster)

**Configuration**

```yml
events:
jsonLogger:
subscriptions:
- type: connect
handler:
type: file
format: json
path: /tmp/ziti-events.log
```
**Example Events**
```json
{
"namespace": "connect",
"src_type": "identity",
"src_id": "ji2Rt8KJ4",
"src_addr": "127.0.0.1:59336",
"dst_id": "ctrl_client",
"dst_addr": "localhost:1280/edge/management/v1/edge-routers/2L7NeVuGBU",
"timestamp": "2024-10-02T12:17:39.501821249-04:00"
}
{
"namespace": "connect",
"src_type": "router",
"src_id": "2L7NeVuGBU",
"src_addr": "127.0.0.1:42702",
"dst_id": "ctrl_client",
"dst_addr": "127.0.0.1:6262",
"timestamp": "2024-10-02T12:17:40.529865849-04:00"
}
{
"namespace": "connect",
"src_type": "peer",
"src_id": "ctrl2",
"src_addr": "127.0.0.1:40056",
"dst_id": "ctrl1",
"dst_addr": "127.0.0.1:6262",
"timestamp": "2024-10-02T12:37:04.490859197-04:00"
}
```

# Release 1.1.15

## What's New
Expand Down
13 changes: 13 additions & 0 deletions controller/env/appenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,19 @@ func (ae *AppEnv) IsAllowed(responderFunc func(ae *AppEnv, rc *response.RequestC
"url": request.URL,
}).Warn("could not mark metrics for REST ApiConfig endpoint, request context start time is zero")
}

if rc.ApiSession != nil {
connectEvent := &event.ConnectEvent{
Namespace: event.ConnectEventNS,
SrcType: event.ConnectSourceIdentity,
SrcId: rc.ApiSession.IdentityId,
SrcAddr: rc.Request.RemoteAddr,
DstId: ae.HostController.GetNetwork().GetAppId(),
DstAddr: rc.Request.Host + rc.Request.RequestURI,
Timestamp: time.Now(),
}
ae.GetEventDispatcher().AcceptConnectEvent(connectEvent)
}
})
}

Expand Down
49 changes: 49 additions & 0 deletions controller/event/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package event

import (
"time"
)

type ConnectSource string

const (
ConnectEventNS = "connect"
ConnectSourceRouter ConnectSource = "router"
ConnectSourcePeer ConnectSource = "peer"
ConnectSourceIdentity ConnectSource = "identity"
)

type ConnectEvent struct {
Namespace string `json:"namespace"`
SrcType ConnectSource `json:"src_type"`
SrcId string `json:"src_id"`
SrcAddr string `json:"src_addr"`
DstId string `json:"dst_id"`
DstAddr string `json:"dst_addr"`
Timestamp time.Time `json:"timestamp"`
}

type ConnectEventHandler interface {
AcceptConnectEvent(event *ConnectEvent)
}

type ConnectEventHandlerWrapper interface {
ConnectEventHandler
IsWrapping(value ConnectEventHandler) bool
}
1 change: 1 addition & 0 deletions controller/event/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type Dispatcher interface {

ApiSessionEventHandler
CircuitEventHandler
ConnectEventHandler
EntityChangeEventHandler
LinkEventHandler
MetricsEventHandler
Expand Down
2 changes: 2 additions & 0 deletions controller/event/dispatcher_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var _ Dispatcher = DispatcherMock{}

type DispatcherMock struct{}

func (d DispatcherMock) AcceptConnectEvent(event *ConnectEvent) {}

func (d DispatcherMock) AcceptApiSessionEvent(event *ApiSessionEvent) {}

func (d DispatcherMock) AddApiSessionEventHandler(handler ApiSessionEventHandler) {}
Expand Down
2 changes: 2 additions & 0 deletions controller/events/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func NewDispatcher(closeNotify <-chan struct{}) *Dispatcher {
result.RegisterEventTypeFunctions(event.TerminatorEventsNs, result.registerTerminatorEventHandler, result.unregisterTerminatorEventHandler)
result.RegisterEventTypeFunctions(event.UsageEventsNs, result.registerUsageEventHandler, result.unregisterUsageEventHandler)
result.RegisterEventTypeFunctions(event.ClusterEventsNs, result.registerClusterEventHandler, result.unregisterClusterEventHandler)
result.RegisterEventTypeFunctions(event.ConnectEventNS, result.registerConnectEventHandler, result.unregisterConnectEventHandler)

result.RegisterEventTypeFunctions(event.ApiSessionEventNS, result.registerApiSessionEventHandler, result.unregisterApiSessionEventHandler)
result.RegisterEventTypeFunctions(event.EntityCountEventNS, result.registerEntityCountEventHandler, result.unregisterEntityCountEventHandler)
Expand Down Expand Up @@ -93,6 +94,7 @@ type Dispatcher struct {
usageEventHandlers concurrenz.CopyOnWriteSlice[event.UsageEventHandler]
usageEventV3Handlers concurrenz.CopyOnWriteSlice[event.UsageEventV3Handler]
clusterEventHandlers concurrenz.CopyOnWriteSlice[event.ClusterEventHandler]
connectEventHandlers concurrenz.CopyOnWriteSlice[event.ConnectEventHandler]

apiSessionEventHandlers concurrenz.CopyOnWriteSlice[event.ApiSessionEventHandler]
entityCountEventHandlers concurrenz.CopyOnWriteSlice[*entityCountState]
Expand Down
62 changes: 62 additions & 0 deletions controller/events/dispatcher_connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package events

import (
"github.com/openziti/ziti/controller/event"
"github.com/pkg/errors"
"reflect"
)

func (self *Dispatcher) AddConnectEventHandler(handler event.ConnectEventHandler) {
self.connectEventHandlers.Append(handler)
}

func (self *Dispatcher) RemoveConnectEventHandler(handler event.ConnectEventHandler) {
self.connectEventHandlers.DeleteIf(func(val event.ConnectEventHandler) bool {
if val == handler {
return true
}
if w, ok := val.(event.ConnectEventHandlerWrapper); ok {
return w.IsWrapping(handler)
}
return false
})
}

func (self *Dispatcher) AcceptConnectEvent(evt *event.ConnectEvent) {
for _, handler := range self.connectEventHandlers.Value() {
go handler.AcceptConnectEvent(evt)
}
}

func (self *Dispatcher) registerConnectEventHandler(val interface{}, _ map[string]interface{}) error {
handler, ok := val.(event.ConnectEventHandler)

if !ok {
return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/event/ConnectEventHandler interface.", reflect.TypeOf(val))
}

self.AddConnectEventHandler(handler)
return nil
}

func (self *Dispatcher) unregisterConnectEventHandler(val interface{}) {
if handler, ok := val.(event.ConnectEventHandler); ok {
self.RemoveConnectEventHandler(handler)
}
}
20 changes: 20 additions & 0 deletions controller/events/dispatcher_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,24 @@ func (self *routerEventAdapter) routerChange(eventType event.RouterEventType, r
}

self.Dispatcher.AcceptRouterEvent(evt)

if eventType == event.RouterOnline {
srcAddr := ""
dstAddr := ""
if ctrl := r.Control; ctrl != nil {
srcAddr = r.Control.Underlay().GetRemoteAddr().String()
dstAddr = r.Control.Underlay().GetLocalAddr().String()
}

connectEvent := &event.ConnectEvent{
Namespace: event.ConnectEventNS,
SrcType: event.ConnectSourceRouter,
SrcId: r.Id,
SrcAddr: srcAddr,
DstId: self.Dispatcher.network.GetAppId(),
DstAddr: dstAddr,
Timestamp: time.Now(),
}
self.Dispatcher.AcceptConnectEvent(connectEvent)
}
}
14 changes: 14 additions & 0 deletions controller/events/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ func (event *JsonClusterEvent) Format() ([]byte, error) {
return MarshalJson(event)
}

type JsonConnectEvent event.ConnectEvent

func (event *JsonConnectEvent) GetEventType() string {
return "connect"
}

func (event *JsonConnectEvent) Format() ([]byte, error) {
return MarshalJson(event)
}

type JsonEntityChangeEvent event.EntityChangeEvent

func (event *JsonEntityChangeEvent) GetEventType() string {
Expand Down Expand Up @@ -277,6 +287,10 @@ func (formatter *JsonFormatter) AcceptClusterEvent(evt *event.ClusterEvent) {
formatter.AcceptLoggingEvent((*JsonClusterEvent)(evt))
}

func (formatter *JsonFormatter) AcceptConnectEvent(evt *event.ConnectEvent) {
formatter.AcceptLoggingEvent((*JsonConnectEvent)(evt))
}

func (formatter *JsonFormatter) AcceptEntityChangeEvent(evt *event.EntityChangeEvent) {
formatter.AcceptLoggingEvent((*JsonEntityChangeEvent)(evt))
}
Expand Down
27 changes: 24 additions & 3 deletions controller/raft/mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (self *impl) GetOrConnectPeer(address string, timeout time.Duration) (*Peer
binding.AddReceiveHandlerF(RaftDisconnectType, peer.handleReceiveDisconnect)
binding.AddCloseHandler(peer)

return self.PeerConnected(peer)
return self.PeerConnected(peer, true)
})

if _, err = channel.NewChannel(ChannelTypeMesh, dialer, bindHandler, channel.DefaultOptions()); err != nil {
Expand Down Expand Up @@ -486,7 +486,7 @@ func ExtractSpiffeId(certs []*x509.Certificate) (string, error) {
return "", errors.New("invalid controller certificate, no controller SPIFFE ID in cert")
}

func (self *impl) PeerConnected(peer *Peer) error {
func (self *impl) PeerConnected(peer *Peer, dial bool) error {
self.lock.Lock()
defer self.lock.Unlock()
if self.Peers[peer.Address] != nil {
Expand All @@ -509,6 +509,27 @@ func (self *impl) PeerConnected(peer *Peer) error {
})

self.eventDispatcher.AcceptClusterEvent(evt)

if !dial {
srcAddr := ""
dstAddr := ""
if ch := peer.Channel; ch != nil {
srcAddr = ch.Underlay().GetRemoteAddr().String()
dstAddr = ch.Underlay().GetLocalAddr().String()
}
connectEvent := &event.ConnectEvent{
Namespace: event.ConnectEventNS,
SrcType: event.ConnectSourcePeer,
SrcId: string(peer.Id),
SrcAddr: srcAddr,
DstId: self.id.Token,
DstAddr: dstAddr,
Timestamp: time.Now(),
}

self.eventDispatcher.AcceptConnectEvent(connectEvent)
}

return nil
}

Expand Down Expand Up @@ -637,7 +658,7 @@ func (self *impl) AcceptUnderlay(underlay channel.Underlay) error {
binding.AddReceiveHandlerF(RaftDataType, peer.handleReceiveData)
binding.AddReceiveHandlerF(RaftDisconnectType, peer.handleReceiveDisconnect)
binding.AddCloseHandler(peer)
return self.PeerConnected(peer)
return self.PeerConnected(peer, false)
})

_, err := channel.NewChannelWithUnderlay(ChannelTypeMesh, underlay, bindHandler, channel.DefaultOptions())
Expand Down
4 changes: 2 additions & 2 deletions controller/raft/mesh/mesh_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func Test_AddPeer_PassesReadonlyWhenVersionsMatch(t *testing.T) {

p := &Peer{Version: testVersion("1")}

assert.NoError(t, m.PeerConnected(p))
assert.NoError(t, m.PeerConnected(p, true))
assert.Equal(t, false, m.readonly.Load(), "Expected readonly to be false, got ", m.readonly.Load())
}

Expand All @@ -72,7 +72,7 @@ func Test_AddPeer_TurnsReadonlyWhenVersionsDoNotMatch(t *testing.T) {

p := &Peer{Version: testVersion("dne")}

assert.NoError(t, m.PeerConnected(p))
assert.NoError(t, m.PeerConnected(p, true))
assert.Equal(t, true, m.readonly.Load(), "Expected readonly to be true, got ", m.readonly.Load())
}

Expand Down
15 changes: 8 additions & 7 deletions doc/ha/ctrl1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ ctrl:
options:
advertiseAddress: tls:localhost:6262

#events:
# jsonLogger:
# subscriptions:
events:
jsonLogger:
subscriptions:
- type: connect
# - type: fabric.cluster
# handler:
# type: file
# format: json
# path: /tmp/ziti-events.log
handler:
type: file
format: json
path: /tmp/ziti-events.log

edge:
api:
Expand Down
Loading

0 comments on commit cdf631e

Please sign in to comment.