From cdf631e93247ee948924f51891c2f256e4c172ab Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Wed, 2 Oct 2024 13:41:13 -0400 Subject: [PATCH] Add controller connect events. Fixes #1835 --- CHANGELOG.md | 59 +++++++++++++++++++++++ controller/env/appenv.go | 13 ++++++ controller/event/connect.go | 49 +++++++++++++++++++ controller/event/dispatcher.go | 1 + controller/event/dispatcher_mock.go | 2 + controller/events/dispatcher.go | 2 + controller/events/dispatcher_connect.go | 62 +++++++++++++++++++++++++ controller/events/dispatcher_router.go | 20 ++++++++ controller/events/formatter.go | 14 ++++++ controller/raft/mesh/mesh.go | 27 +++++++++-- controller/raft/mesh/mesh_test.go | 4 +- doc/ha/ctrl1.yml | 15 +++--- etc/ctrl.with.edge.yml | 15 +++--- version | 2 +- 14 files changed, 265 insertions(+), 20 deletions(-) create mode 100644 controller/event/connect.go create mode 100644 controller/events/dispatcher_connect.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 194ede250..99fc136b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,62 @@ +# Release 1.2.0 + +## What's New + +* Connect Events + +## Connect Events + +These are events generated when a successful connection is made to a controller, from any of: + +1. Identity, using the REST API +2. Router +3. Controller (peer in an HA cluster) + +**Configuration** + +```yml +events: + jsonLogger: + subscriptions: + - type: connect + handler: + type: file + format: json + path: /tmp/ziti-events.log +``` + +**Example Events** + +```json +{ + "namespace": "connect", + "src_type": "identity", + "src_id": "ji2Rt8KJ4", + "src_addr": "127.0.0.1:59336", + "dst_id": "ctrl_client", + "dst_addr": "localhost:1280/edge/management/v1/edge-routers/2L7NeVuGBU", + "timestamp": "2024-10-02T12:17:39.501821249-04:00" +} +{ + "namespace": "connect", + "src_type": "router", + "src_id": "2L7NeVuGBU", + "src_addr": "127.0.0.1:42702", + "dst_id": "ctrl_client", + "dst_addr": "127.0.0.1:6262", + "timestamp": "2024-10-02T12:17:40.529865849-04:00" +} +{ + "namespace": "connect", + "src_type": "peer", + "src_id": "ctrl2", + "src_addr": "127.0.0.1:40056", + "dst_id": "ctrl1", + "dst_addr": "127.0.0.1:6262", + "timestamp": "2024-10-02T12:37:04.490859197-04:00" +} +``` + # Release 1.1.15 ## What's New diff --git a/controller/env/appenv.go b/controller/env/appenv.go index 04beb51ed..84525f81f 100644 --- a/controller/env/appenv.go +++ b/controller/env/appenv.go @@ -993,6 +993,19 @@ func (ae *AppEnv) IsAllowed(responderFunc func(ae *AppEnv, rc *response.RequestC "url": request.URL, }).Warn("could not mark metrics for REST ApiConfig endpoint, request context start time is zero") } + + if rc.ApiSession != nil { + connectEvent := &event.ConnectEvent{ + Namespace: event.ConnectEventNS, + SrcType: event.ConnectSourceIdentity, + SrcId: rc.ApiSession.IdentityId, + SrcAddr: rc.Request.RemoteAddr, + DstId: ae.HostController.GetNetwork().GetAppId(), + DstAddr: rc.Request.Host + rc.Request.RequestURI, + Timestamp: time.Now(), + } + ae.GetEventDispatcher().AcceptConnectEvent(connectEvent) + } }) } diff --git a/controller/event/connect.go b/controller/event/connect.go new file mode 100644 index 000000000..548267205 --- /dev/null +++ b/controller/event/connect.go @@ -0,0 +1,49 @@ +/* + Copyright NetFoundry Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package event + +import ( + "time" +) + +type ConnectSource string + +const ( + ConnectEventNS = "connect" + ConnectSourceRouter ConnectSource = "router" + ConnectSourcePeer ConnectSource = "peer" + ConnectSourceIdentity ConnectSource = "identity" +) + +type ConnectEvent struct { + Namespace string `json:"namespace"` + SrcType ConnectSource `json:"src_type"` + SrcId string `json:"src_id"` + SrcAddr string `json:"src_addr"` + DstId string `json:"dst_id"` + DstAddr string `json:"dst_addr"` + Timestamp time.Time `json:"timestamp"` +} + +type ConnectEventHandler interface { + AcceptConnectEvent(event *ConnectEvent) +} + +type ConnectEventHandlerWrapper interface { + ConnectEventHandler + IsWrapping(value ConnectEventHandler) bool +} diff --git a/controller/event/dispatcher.go b/controller/event/dispatcher.go index d89241f80..e584eaaa5 100644 --- a/controller/event/dispatcher.go +++ b/controller/event/dispatcher.go @@ -126,6 +126,7 @@ type Dispatcher interface { ApiSessionEventHandler CircuitEventHandler + ConnectEventHandler EntityChangeEventHandler LinkEventHandler MetricsEventHandler diff --git a/controller/event/dispatcher_mock.go b/controller/event/dispatcher_mock.go index a3334f00c..fc3cb0ded 100644 --- a/controller/event/dispatcher_mock.go +++ b/controller/event/dispatcher_mock.go @@ -27,6 +27,8 @@ var _ Dispatcher = DispatcherMock{} type DispatcherMock struct{} +func (d DispatcherMock) AcceptConnectEvent(event *ConnectEvent) {} + func (d DispatcherMock) AcceptApiSessionEvent(event *ApiSessionEvent) {} func (d DispatcherMock) AddApiSessionEventHandler(handler ApiSessionEventHandler) {} diff --git a/controller/events/dispatcher.go b/controller/events/dispatcher.go index 6eede14a9..8cb9bce8a 100644 --- a/controller/events/dispatcher.go +++ b/controller/events/dispatcher.go @@ -63,6 +63,7 @@ func NewDispatcher(closeNotify <-chan struct{}) *Dispatcher { result.RegisterEventTypeFunctions(event.TerminatorEventsNs, result.registerTerminatorEventHandler, result.unregisterTerminatorEventHandler) result.RegisterEventTypeFunctions(event.UsageEventsNs, result.registerUsageEventHandler, result.unregisterUsageEventHandler) result.RegisterEventTypeFunctions(event.ClusterEventsNs, result.registerClusterEventHandler, result.unregisterClusterEventHandler) + result.RegisterEventTypeFunctions(event.ConnectEventNS, result.registerConnectEventHandler, result.unregisterConnectEventHandler) result.RegisterEventTypeFunctions(event.ApiSessionEventNS, result.registerApiSessionEventHandler, result.unregisterApiSessionEventHandler) result.RegisterEventTypeFunctions(event.EntityCountEventNS, result.registerEntityCountEventHandler, result.unregisterEntityCountEventHandler) @@ -93,6 +94,7 @@ type Dispatcher struct { usageEventHandlers concurrenz.CopyOnWriteSlice[event.UsageEventHandler] usageEventV3Handlers concurrenz.CopyOnWriteSlice[event.UsageEventV3Handler] clusterEventHandlers concurrenz.CopyOnWriteSlice[event.ClusterEventHandler] + connectEventHandlers concurrenz.CopyOnWriteSlice[event.ConnectEventHandler] apiSessionEventHandlers concurrenz.CopyOnWriteSlice[event.ApiSessionEventHandler] entityCountEventHandlers concurrenz.CopyOnWriteSlice[*entityCountState] diff --git a/controller/events/dispatcher_connect.go b/controller/events/dispatcher_connect.go new file mode 100644 index 000000000..336b6aafc --- /dev/null +++ b/controller/events/dispatcher_connect.go @@ -0,0 +1,62 @@ +/* + Copyright NetFoundry Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package events + +import ( + "github.com/openziti/ziti/controller/event" + "github.com/pkg/errors" + "reflect" +) + +func (self *Dispatcher) AddConnectEventHandler(handler event.ConnectEventHandler) { + self.connectEventHandlers.Append(handler) +} + +func (self *Dispatcher) RemoveConnectEventHandler(handler event.ConnectEventHandler) { + self.connectEventHandlers.DeleteIf(func(val event.ConnectEventHandler) bool { + if val == handler { + return true + } + if w, ok := val.(event.ConnectEventHandlerWrapper); ok { + return w.IsWrapping(handler) + } + return false + }) +} + +func (self *Dispatcher) AcceptConnectEvent(evt *event.ConnectEvent) { + for _, handler := range self.connectEventHandlers.Value() { + go handler.AcceptConnectEvent(evt) + } +} + +func (self *Dispatcher) registerConnectEventHandler(val interface{}, _ map[string]interface{}) error { + handler, ok := val.(event.ConnectEventHandler) + + if !ok { + return errors.Errorf("type %v doesn't implement github.com/openziti/ziti/controller/event/ConnectEventHandler interface.", reflect.TypeOf(val)) + } + + self.AddConnectEventHandler(handler) + return nil +} + +func (self *Dispatcher) unregisterConnectEventHandler(val interface{}) { + if handler, ok := val.(event.ConnectEventHandler); ok { + self.RemoveConnectEventHandler(handler) + } +} diff --git a/controller/events/dispatcher_router.go b/controller/events/dispatcher_router.go index 585ddb0d4..a1c346a8e 100644 --- a/controller/events/dispatcher_router.go +++ b/controller/events/dispatcher_router.go @@ -89,4 +89,24 @@ func (self *routerEventAdapter) routerChange(eventType event.RouterEventType, r } self.Dispatcher.AcceptRouterEvent(evt) + + if eventType == event.RouterOnline { + srcAddr := "" + dstAddr := "" + if ctrl := r.Control; ctrl != nil { + srcAddr = r.Control.Underlay().GetRemoteAddr().String() + dstAddr = r.Control.Underlay().GetLocalAddr().String() + } + + connectEvent := &event.ConnectEvent{ + Namespace: event.ConnectEventNS, + SrcType: event.ConnectSourceRouter, + SrcId: r.Id, + SrcAddr: srcAddr, + DstId: self.Dispatcher.network.GetAppId(), + DstAddr: dstAddr, + Timestamp: time.Now(), + } + self.Dispatcher.AcceptConnectEvent(connectEvent) + } } diff --git a/controller/events/formatter.go b/controller/events/formatter.go index 64cf493c9..8f88a7dd4 100644 --- a/controller/events/formatter.go +++ b/controller/events/formatter.go @@ -185,6 +185,16 @@ func (event *JsonClusterEvent) Format() ([]byte, error) { return MarshalJson(event) } +type JsonConnectEvent event.ConnectEvent + +func (event *JsonConnectEvent) GetEventType() string { + return "connect" +} + +func (event *JsonConnectEvent) Format() ([]byte, error) { + return MarshalJson(event) +} + type JsonEntityChangeEvent event.EntityChangeEvent func (event *JsonEntityChangeEvent) GetEventType() string { @@ -277,6 +287,10 @@ func (formatter *JsonFormatter) AcceptClusterEvent(evt *event.ClusterEvent) { formatter.AcceptLoggingEvent((*JsonClusterEvent)(evt)) } +func (formatter *JsonFormatter) AcceptConnectEvent(evt *event.ConnectEvent) { + formatter.AcceptLoggingEvent((*JsonConnectEvent)(evt)) +} + func (formatter *JsonFormatter) AcceptEntityChangeEvent(evt *event.EntityChangeEvent) { formatter.AcceptLoggingEvent((*JsonEntityChangeEvent)(evt)) } diff --git a/controller/raft/mesh/mesh.go b/controller/raft/mesh/mesh.go index 9ab950523..9d3753592 100644 --- a/controller/raft/mesh/mesh.go +++ b/controller/raft/mesh/mesh.go @@ -395,7 +395,7 @@ func (self *impl) GetOrConnectPeer(address string, timeout time.Duration) (*Peer binding.AddReceiveHandlerF(RaftDisconnectType, peer.handleReceiveDisconnect) binding.AddCloseHandler(peer) - return self.PeerConnected(peer) + return self.PeerConnected(peer, true) }) if _, err = channel.NewChannel(ChannelTypeMesh, dialer, bindHandler, channel.DefaultOptions()); err != nil { @@ -486,7 +486,7 @@ func ExtractSpiffeId(certs []*x509.Certificate) (string, error) { return "", errors.New("invalid controller certificate, no controller SPIFFE ID in cert") } -func (self *impl) PeerConnected(peer *Peer) error { +func (self *impl) PeerConnected(peer *Peer, dial bool) error { self.lock.Lock() defer self.lock.Unlock() if self.Peers[peer.Address] != nil { @@ -509,6 +509,27 @@ func (self *impl) PeerConnected(peer *Peer) error { }) self.eventDispatcher.AcceptClusterEvent(evt) + + if !dial { + srcAddr := "" + dstAddr := "" + if ch := peer.Channel; ch != nil { + srcAddr = ch.Underlay().GetRemoteAddr().String() + dstAddr = ch.Underlay().GetLocalAddr().String() + } + connectEvent := &event.ConnectEvent{ + Namespace: event.ConnectEventNS, + SrcType: event.ConnectSourcePeer, + SrcId: string(peer.Id), + SrcAddr: srcAddr, + DstId: self.id.Token, + DstAddr: dstAddr, + Timestamp: time.Now(), + } + + self.eventDispatcher.AcceptConnectEvent(connectEvent) + } + return nil } @@ -637,7 +658,7 @@ func (self *impl) AcceptUnderlay(underlay channel.Underlay) error { binding.AddReceiveHandlerF(RaftDataType, peer.handleReceiveData) binding.AddReceiveHandlerF(RaftDisconnectType, peer.handleReceiveDisconnect) binding.AddCloseHandler(peer) - return self.PeerConnected(peer) + return self.PeerConnected(peer, false) }) _, err := channel.NewChannelWithUnderlay(ChannelTypeMesh, underlay, bindHandler, channel.DefaultOptions()) diff --git a/controller/raft/mesh/mesh_test.go b/controller/raft/mesh/mesh_test.go index 5b8a274db..f98e2cc65 100644 --- a/controller/raft/mesh/mesh_test.go +++ b/controller/raft/mesh/mesh_test.go @@ -59,7 +59,7 @@ func Test_AddPeer_PassesReadonlyWhenVersionsMatch(t *testing.T) { p := &Peer{Version: testVersion("1")} - assert.NoError(t, m.PeerConnected(p)) + assert.NoError(t, m.PeerConnected(p, true)) assert.Equal(t, false, m.readonly.Load(), "Expected readonly to be false, got ", m.readonly.Load()) } @@ -72,7 +72,7 @@ func Test_AddPeer_TurnsReadonlyWhenVersionsDoNotMatch(t *testing.T) { p := &Peer{Version: testVersion("dne")} - assert.NoError(t, m.PeerConnected(p)) + assert.NoError(t, m.PeerConnected(p, true)) assert.Equal(t, true, m.readonly.Load(), "Expected readonly to be true, got ", m.readonly.Load()) } diff --git a/doc/ha/ctrl1.yml b/doc/ha/ctrl1.yml index 939c5b79f..c971a68e8 100644 --- a/doc/ha/ctrl1.yml +++ b/doc/ha/ctrl1.yml @@ -16,14 +16,15 @@ ctrl: options: advertiseAddress: tls:localhost:6262 -#events: -# jsonLogger: -# subscriptions: +events: + jsonLogger: + subscriptions: + - type: connect # - type: fabric.cluster -# handler: -# type: file -# format: json -# path: /tmp/ziti-events.log + handler: + type: file + format: json + path: /tmp/ziti-events.log edge: api: diff --git a/etc/ctrl.with.edge.yml b/etc/ctrl.with.edge.yml index 9490523ed..03299d010 100644 --- a/etc/ctrl.with.edge.yml +++ b/etc/ctrl.with.edge.yml @@ -79,9 +79,10 @@ ctrl: # connections. The value of newListener must be resolvable both via DNS and validate via certificates #newListener: tls:localhost:6262 -#events: -# jsonLogger: -# subscriptions: +events: + jsonLogger: + subscriptions: + - type: connect # - type: entityChange # include: # - services @@ -103,10 +104,10 @@ ctrl: # - type: services # - type: edge.entityCounts # interval: 5s -# handler: -# type: file -# format: json -# path: /tmp/ziti-events.log + handler: + type: file + format: json + path: /tmp/ziti-events.log # usageLogger: # subscriptions: # - type: fabric.usage diff --git a/version b/version index 9459d4ba2..5625e59da 100644 --- a/version +++ b/version @@ -1 +1 @@ -1.1 +1.2