Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Jun 18, 2024
1 parent afbd655 commit dd296e3
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 61 deletions.
177 changes: 148 additions & 29 deletions common/router_data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"io"
"os"
"sync"
"sync/atomic"
)

// AccessPolicies represents the Identity's access to a Service through many Policies. The PostureChecks provided
Expand Down Expand Up @@ -106,7 +107,11 @@ type RouterDataModel struct {
listenerBufferSize uint
lastSaveIndex *uint64

subscriptions cmap.ConcurrentMap[string, *IdentitySubscription] `json:"-"`
subscriptions cmap.ConcurrentMap[string, *IdentitySubscription]
events chan subscriberEvent
closeNotify <-chan struct{}
stopNotify chan struct{}
stopped atomic.Bool
}

// NewSenderRouterDataModel creates a new RouterDataModel that will store events in a circular buffer of
Expand All @@ -123,14 +128,13 @@ func NewSenderRouterDataModel(logSize uint64, listenerBufferSize uint) *RouterDa
PublicKeys: cmap.New[*edge_ctrl_pb.DataState_PublicKey](),
Revocations: cmap.New[*edge_ctrl_pb.DataState_Revocation](),
listenerBufferSize: listenerBufferSize,
subscriptions: cmap.New[*IdentitySubscription](),
}
}

// NewReceiverRouterDataModel creates a new RouterDataModel that does not store events. listenerBufferSize affects the
// buffer size of channels returned to listeners of the data model.
func NewReceiverRouterDataModel(listenerBufferSize uint) *RouterDataModel {
return &RouterDataModel{
func NewReceiverRouterDataModel(listenerBufferSize uint, closeNotify <-chan struct{}) *RouterDataModel {
result := &RouterDataModel{
EventCache: NewForgetfulEventCache(),
ConfigTypes: cmap.New[*ConfigType](),
Configs: cmap.New[*Config](),
Expand All @@ -142,12 +146,17 @@ func NewReceiverRouterDataModel(listenerBufferSize uint) *RouterDataModel {
Revocations: cmap.New[*edge_ctrl_pb.DataState_Revocation](),
listenerBufferSize: listenerBufferSize,
subscriptions: cmap.New[*IdentitySubscription](),
events: make(chan subscriberEvent),
closeNotify: closeNotify,
stopNotify: make(chan struct{}),
}
go result.processSubscriberEvents()
return result
}

// NewReceiverRouterDataModelFromFile creates a new RouterDataModel that does not store events and is initialized from
// a file backup. listenerBufferSize affects the buffer size of channels returned to listeners of the data model.
func NewReceiverRouterDataModelFromFile(path string, listenerBufferSize uint) (*RouterDataModel, error) {
func NewReceiverRouterDataModelFromFile(path string, listenerBufferSize uint, closeNotify <-chan struct{}) (*RouterDataModel, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
Expand All @@ -166,7 +175,7 @@ func NewReceiverRouterDataModelFromFile(path string, listenerBufferSize uint) (*
}

rdmContents := &rdmDb{
RouterDataModel: NewReceiverRouterDataModel(listenerBufferSize),
RouterDataModel: NewReceiverRouterDataModel(listenerBufferSize, closeNotify),
}

err = json.Unmarshal(data, rdmContents)
Expand All @@ -179,6 +188,25 @@ func NewReceiverRouterDataModelFromFile(path string, listenerBufferSize uint) (*
return rdmContents.RouterDataModel, nil
}

func (rdm *RouterDataModel) processSubscriberEvents() {
for !rdm.stopped.Load() {
select {
case <-rdm.closeNotify:
return
case <-rdm.stopNotify:
return
case evt := <-rdm.events:
evt.process(rdm)
}
}
}

func (rdm *RouterDataModel) Stop() {
if rdm.stopped.CompareAndSwap(false, true) {
close(rdm.stopNotify)
}
}

// NewListener returns a channel that will receive the events applied to this data model.
func (rdm *RouterDataModel) NewListener() <-chan *edge_ctrl_pb.DataState_ChangeSet {
if rdm.listeners == nil {
Expand Down Expand Up @@ -241,25 +269,42 @@ func (rdm *RouterDataModel) Handle(index uint64, event *edge_ctrl_pb.DataState_E
}
}

func (rdm *RouterDataModel) queueEvent(event subscriberEvent) {
rdm.events <- event
}

// HandleIdentityEvent will apply the delta event to the router data model. It is not restricted by index calculations.
// Use ApplyIdentityEvent for event logged event handling. This method is generally meant for bulk loading of data
// during startup.
func (rdm *RouterDataModel) HandleIdentityEvent(index uint64, event *edge_ctrl_pb.DataState_Event, model *edge_ctrl_pb.DataState_Event_Identity) {
if event.Action == edge_ctrl_pb.DataState_Delete {
rdm.Identities.Remove(model.Identity.Id)
rdm.queueEvent(identityRemoveEvent{identityId: model.Identity.Id})
} else {
var identity *Identity
rdm.Identities.Upsert(model.Identity.Id, nil, func(exist bool, valueInMap *Identity, newValue *Identity) *Identity {
if valueInMap == nil {
return &Identity{
identity = &Identity{
DataStateIdentity: model.Identity,
ServicePolicies: map[string]struct{}{},
IdentityIndex: index,
}
} else {
identity = &Identity{
DataStateIdentity: model.Identity,
ServicePolicies: valueInMap.ServicePolicies,
IdentityIndex: index,
ServiceSetIndex: valueInMap.ServiceSetIndex,
}
}
valueInMap.DataStateIdentity = model.Identity
valueInMap.IdentityIndex = index
return valueInMap
return identity
})

if event.Action == edge_ctrl_pb.DataState_Create {
rdm.queueEvent(identityCreatedEvent{identity: identity})
} else if event.Action == edge_ctrl_pb.DataState_Update {
rdm.queueEvent(identityUpdatedEvent{identity: identity})
}
}
}

Expand Down Expand Up @@ -681,24 +726,66 @@ type IdentityConfig struct {

type IdentityService struct {
Service *Service
Checks map[string]*PostureCheck
Checks map[string]struct{}
Configs map[string]*IdentityConfig
DialAllowed bool
BindAllowed bool
ServiceIndex uint64
}

type IdentitySubscription struct {
Identity *Identity
Services map[string]*IdentityService
IdentityIndex uint64
Listeners concurrenz.CopyOnWriteSlice[IdentityEventSubscriber]
Identity *Identity
Services map[string]*IdentityService
Listeners concurrenz.CopyOnWriteSlice[IdentityEventSubscriber]
Checks map[string]*PostureCheck

sync.Mutex
}

func (self *IdentitySubscription) identityUpdated(identity *Identity) {
self.Lock()
defer self.Unlock()

if identity.IdentityIndex > self.Identity.IdentityIndex {
self.Identity = identity
}

for _, subscriber := range self.Listeners.Value() {
subscriber.NotifyIdentityUpdated(identity)
}
}

func (self *IdentitySubscription) identityRemoved() {
self.Lock()
defer self.Unlock()

self.Checks = nil
self.Services = nil

for _, subscriber := range self.Listeners.Value() {
subscriber.NotifyIdentityRemoved()
}
}

func (self *IdentitySubscription) initialize(rdm *RouterDataModel, identity *Identity) {
self.Lock()
defer self.Unlock()

self.Identity = identity
if self.Services == nil {
rdm.buildServiceList(self)
}

for _, subscriber := range self.Listeners.Value() {
subscriber.NotifyInitialState(self.Identity, self.Services)
}
}

type IdentityEventSubscriber interface {
NotifyInitialState(Identity *Identity, services map[string]*IdentityService)
NotifyIdentityUpdated(index uint64, Identity *Identity)
NotifyIdentityUpdated(Identity *Identity)
NotifyIdentityRemoved()

NotifyServiceAdded(index uint64, service *IdentityService)
NotifyServiceChanged(index uint64, service *IdentityService)
NotifyServiceRemoved(index uint64, serviceId string)
Expand All @@ -722,17 +809,12 @@ func (rdm *RouterDataModel) SubscribeToIdentityChanges(identityId string, subscr
return result
})

subscription.Lock()
defer subscription.Unlock()
if subscription.Services == nil {
subscription.Services = rdm.buildServiceList(subscription)
}
subscriber.NotifyInitialState(subscription.Identity, subscription.Services)
subscription.initialize(rdm, identity)

return nil
}

func (rdm *RouterDataModel) buildServiceList(sub *IdentitySubscription) map[string]*IdentityService {
func (rdm *RouterDataModel) buildServiceList(sub *IdentitySubscription) {
log := pfxlog.Logger().WithField("identityId", sub.Identity.Id)
serviceMap := map[string]*IdentityService{}

Expand All @@ -751,17 +833,18 @@ func (rdm *RouterDataModel) buildServiceList(sub *IdentitySubscription) map[stri
Error("could not find service")
continue
}

identityService, ok := serviceMap[serviceId]
if !ok {
identityService = &IdentityService{
Service: service,
Configs: map[string]*IdentityConfig{},
Checks: map[string]*PostureCheck{},
Checks: map[string]struct{}{},
ServiceIndex: service.Index,
}
serviceMap[serviceId] = identityService
rdm.loadServiceConfigs(sub.Identity, identityService)
rdm.loadServicePostureChecks(sub.Identity, policy, identityService)
rdm.loadServicePostureChecks(sub, policy, identityService)
}

if policy.PolicyType == edge_ctrl_pb.PolicyType_BindPolicy {
Expand All @@ -771,12 +854,13 @@ func (rdm *RouterDataModel) buildServiceList(sub *IdentitySubscription) map[stri
}
}
}
return serviceMap

sub.Services = serviceMap
}

func (rdm *RouterDataModel) loadServicePostureChecks(identity *Identity, policy *ServicePolicy, svc *IdentityService) {
func (rdm *RouterDataModel) loadServicePostureChecks(sub *IdentitySubscription, policy *ServicePolicy, svc *IdentityService) {
log := pfxlog.Logger().
WithField("identityId", identity.Id).
WithField("identityId", sub.Identity.Id).
WithField("serviceId", svc.Service.Id).
WithField("policyId", policy.Id)

Expand All @@ -785,7 +869,8 @@ func (rdm *RouterDataModel) loadServicePostureChecks(identity *Identity, policy
if !ok {
log.WithField("postureCheckId", postureCheckId).Error("could not find posture check")
} else {
svc.Checks[postureCheckId] = check
svc.Checks[postureCheckId] = struct{}{}
sub.Checks[postureCheckId] = check
}
}
}
Expand Down Expand Up @@ -837,3 +922,37 @@ func (rdm *RouterDataModel) loadIdentityConfig(configId string, log *logrus.Entr
ConfigTypeIndex: configType.Index,
}
}

type subscriberEvent interface {
process(rdm *RouterDataModel)
}

type identityRemoveEvent struct {
identityId string
}

func (self identityRemoveEvent) process(rdm *RouterDataModel) {
if sub, found := rdm.subscriptions.Get(self.identityId); found {
sub.identityRemoved()
}
}

type identityCreatedEvent struct {
identity *Identity
}

func (self identityCreatedEvent) process(rdm *RouterDataModel) {
if sub, found := rdm.subscriptions.Get(self.identity.Id); found {
sub.initialize(rdm, self.identity)
}
}

type identityUpdatedEvent struct {
identity *Identity
}

func (self identityUpdatedEvent) process(rdm *RouterDataModel) {
if sub, found := rdm.subscriptions.Get(self.identity.Id); found {
sub.identityUpdated(self.identity)
}
}
18 changes: 0 additions & 18 deletions controller/model/controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,6 @@ func (self *ControllerManager) ReadByName(name string) (*Controller, error) {
return modelEntity, nil
}

func (self *ControllerManager) MapControllerNamesToIds(values []string, identityId string) map[string]struct{} {
var result []string
if stringz.Contains(values, "all") {
result = []string{"all"}
} else {
for _, val := range values {
if Controller, _ := self.Read(val); Controller != nil {
result = append(result, val)
} else if Controller, _ := self.ReadByName(val); Controller != nil {
result = append(result, Controller.Id)
} else {
pfxlog.Logger().Debugf("user %v submitted %v as a config type of interest, but no matching records found", identityId, val)
}
}
}
return stringz.SliceToSet(result)
}

func (self *ControllerManager) Marshall(entity *Controller) ([]byte, error) {
msg := &edge_cmd_pb.Controller{
Id: entity.Id,
Expand Down
2 changes: 1 addition & 1 deletion router/state/dataState.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (dsh *DataStateHandler) HandleReceive(msg *channel.Message, ch channel.Chan
return
}

model := common.NewReceiverRouterDataModel(RouterDataModelListerBufferSize)
model := common.NewReceiverRouterDataModel(RouterDataModelListerBufferSize, dsh.state.GetEnv().GetCloseNotify())

pfxlog.Logger().WithField("endIndex", newState.EndIndex).Debug("received full router data model state")
for _, event := range newState.Events {
Expand Down
Loading

0 comments on commit dd296e3

Please sign in to comment.