Skip to content

Commit

Permalink
[iamclient] Subscribe on subjects changed event
Browse files Browse the repository at this point in the history
Signed-off-by: Mykhailo Lohvynenko <[email protected]>
  • Loading branch information
mlohvynenko committed Sep 27, 2024
1 parent cf48642 commit af97ca7
Show file tree
Hide file tree
Showing 6 changed files with 368 additions and 21 deletions.
2 changes: 1 addition & 1 deletion communicationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func newCommunicationManager(cfg *config.Config) (cm *communicationManager, err
return cm, aoserrors.Wrap(err)
}

if cm.statusHandler, err = unitstatushandler.New(cfg, cm.iam, cm.unitConfig, cm.umController,
if cm.statusHandler, err = unitstatushandler.New(cfg, cm.iam, cm.iam, cm.unitConfig, cm.umController,
cm.imagemanager, cm.launcher, cm.downloader, cm.db, cm.amqp, cm.smController); err != nil {
return cm, aoserrors.Wrap(err)
}
Expand Down
106 changes: 106 additions & 0 deletions iamclient/iamclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type Client struct {
publicNodesService pb.IAMPublicNodesServiceClient
nodesService pb.IAMNodesServiceClient
nodeInfoListeners []chan cloudprotocol.NodeInfo
subjectsListeners []chan []string
}

// Sender provides API to send messages to the cloud.
Expand Down Expand Up @@ -145,6 +146,8 @@ func New(
return nil, aoserrors.Wrap(err)
}

go localClient.connectPublicIdentityService()

go localClient.connectPublicNodeService()

return localClient, nil
Expand All @@ -160,6 +163,23 @@ func (client *Client) GetSystemID() (systemID string) {
return client.systemID
}

// GetSubjects returns subjects.
func (client *Client) GetSubjects() (subjects []string, err error) {
ctx, cancel := context.WithTimeout(context.Background(), iamRequestTimeout)
defer cancel()

request := &empty.Empty{}

response, err := client.identService.GetSubjects(ctx, request)
if err != nil {
return nil, aoserrors.Wrap(err)
}

log.WithFields(log.Fields{"subjects": response.GetSubjects()}).Debug("Get subjects")

return response.GetSubjects(), nil
}

// GetCurrentNodeInfo returns info for current node.
func (client *Client) GetCurrentNodeInfo() (nodeInfo cloudprotocol.NodeInfo, err error) {
return client.GetNodeInfo(client.GetNodeID())
Expand Down Expand Up @@ -210,6 +230,19 @@ func (client *Client) SubscribeNodeInfoChange() <-chan cloudprotocol.NodeInfo {
return ch
}

// SubscribeSubjectsChanged subscribes client on subject changed events.
func (client *Client) SubscribeSubjectsChanged() <-chan []string {
client.Lock()
defer client.Unlock()

log.Debug("Subscribe on subjects change event")

ch := make(chan []string)
client.subjectsListeners = append(client.subjectsListeners, ch)

return ch
}

// RenewCertificatesNotification renew certificates notification.
func (client *Client) RenewCertificatesNotification(secrets cloudprotocol.UnitSecrets,
certInfo []cloudprotocol.RenewCertData,
Expand Down Expand Up @@ -487,6 +520,7 @@ func (client *Client) Close() (err error) {
}

client.nodeInfoListeners = nil
client.subjectsListeners = nil

log.Debug("Disconnected from IAM")

Expand Down Expand Up @@ -588,6 +622,30 @@ func (client *Client) getSystemID() (systemID string, err error) {
return response.GetSystemId(), nil
}

func (client *Client) processSubjectsChangeMessages(
listener pb.IAMPublicIdentityService_SubscribeSubjectsChangedClient,
) (err error) {
for {
subjects, err := listener.Recv()
if err != nil {
if code, ok := status.FromError(err); ok {
if code.Code() == codes.Canceled {
log.Debug("IAM client connection closed")
return nil
}
}

return aoserrors.Wrap(err)
}

client.Lock()
for _, listener := range client.subjectsListeners {
listener <- subjects.GetSubjects()
}
client.Unlock()
}
}

func (client *Client) processMessages(listener pb.IAMPublicNodesService_SubscribeNodeChangedClient) (err error) {
for {
nodeInfo, err := listener.Recv()
Expand All @@ -610,6 +668,36 @@ func (client *Client) processMessages(listener pb.IAMPublicNodesService_Subscrib
}
}

func (client *Client) connectPublicIdentityService() {
listener, err := client.subscribeSubjectsChange()

for {
if err != nil {
log.Errorf("Error register to IAM: %v", aoserrors.Wrap(err))
} else {
if err = client.processSubjectsChangeMessages(listener); err != nil {
if errors.Is(err, io.EOF) {
log.Debug("Connection is closed")
} else {
log.Errorf("Connection error: %v", aoserrors.Wrap(err))
}
}
}

log.Debugf("Reconnect to IAM in %v...", iamReconnectInterval)

select {
case <-client.closeChannel:
log.Debugf("Disconnected from IAM")

return

case <-time.After(iamReconnectInterval):
listener, err = client.subscribeSubjectsChange()
}
}
}

func (client *Client) connectPublicNodeService() {
listener, err := client.subscribeNodeInfoChange()

Expand Down Expand Up @@ -640,6 +728,24 @@ func (client *Client) connectPublicNodeService() {
}
}

func (client *Client) subscribeSubjectsChange() (
listener pb.IAMPublicIdentityService_SubscribeSubjectsChangedClient, err error,
) {
client.Lock()
defer client.Unlock()

client.identService = pb.NewIAMPublicIdentityServiceClient(client.publicConnection)

listener, err = client.identService.SubscribeSubjectsChanged(context.Background(), &emptypb.Empty{})
if err != nil {
log.WithField("error", err).Error("Can't subscribe on subjects change event")

return nil, aoserrors.Wrap(err)
}

return listener, aoserrors.Wrap(err)
}

func (client *Client) subscribeNodeInfoChange() (
listener pb.IAMPublicNodesService_SubscribeNodeChangedClient, err error,
) {
Expand Down
103 changes: 100 additions & 3 deletions iamclient/iamclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ const (
* Types
**********************************************************************************************************************/

type testIAMPublicIdentityServiceServer struct {
pb.UnimplementedIAMPublicIdentityServiceServer
subjects chan *pb.Subjects

currentSubjects []string
systemID string
}

type testIAMPublicNodesServiceServer struct {
pb.UnimplementedIAMPublicNodesServiceServer
nodeInfo chan *pb.NodeInfo
Expand All @@ -66,11 +74,10 @@ type testIAMPublicNodesServiceServer struct {

type testPublicServer struct {
pb.UnimplementedIAMPublicServiceServer
pb.UnimplementedIAMPublicIdentityServiceServer
testIAMPublicIdentityServiceServer
testIAMPublicNodesServiceServer

grpcServer *grpc.Server
systemID string
certURL map[string]string
keyURL map[string]string
}
Expand Down Expand Up @@ -165,6 +172,67 @@ func TestGetSystemID(t *testing.T) {
}
}

func TestGetSubjects(t *testing.T) {
publicServer, protectedServer, err := newTestServer(publicServerURL, protectedServerURL)
if err != nil {
t.Fatalf("Can't create test server: %s", err)
}

defer publicServer.close()
defer protectedServer.close()

publicServer.systemID = "testID"

client, err := iamclient.New(&config.Config{
IAMProtectedServerURL: protectedServerURL,
IAMPublicServerURL: publicServerURL,
}, &testSender{}, nil, true)
if err != nil {
t.Fatalf("Can't create IAM client: %s", err)
}
defer client.Close()

subjects, err := client.GetSubjects()
if err != nil {
t.Errorf("Can't get subjects: %v", err)
}

if !reflect.DeepEqual(subjects, publicServer.currentSubjects) {
t.Errorf("Invalid subjects: %v", subjects)
}
}

func TestSubscribeSubjectsChanged(t *testing.T) {
publicServer, protectedServer, err := newTestServer(publicServerURL, protectedServerURL)
if err != nil {
t.Fatalf("Can't create test server: %s", err)
}

defer publicServer.close()
defer protectedServer.close()

client, err := iamclient.New(&config.Config{
IAMProtectedServerURL: protectedServerURL,
IAMPublicServerURL: publicServerURL,
}, &testSender{}, nil, true)
if err != nil {
t.Fatalf("Can't create IAM client: %s", err)
}
defer client.Close()

stream := client.SubscribeSubjectsChanged()

newSubjects := []string{"new1", "new2"}

publicServer.subjects <- &pb.Subjects{Subjects: newSubjects}

receivedSubjects := <-stream

if !reflect.DeepEqual(newSubjects, receivedSubjects) {
t.Errorf("Not expected subjects: %v", receivedSubjects)
}
}

func TestRenewCertificatesNotification(t *testing.T) {
sender := &testSender{}

Expand Down Expand Up @@ -658,10 +726,13 @@ func newTestServer(
}

publicServer.grpcServer = grpc.NewServer()

publicServer.currentSubjects = []string{"initial1", "initial2"}
publicServer.subjects = make(chan *pb.Subjects)
publicServer.nodeInfo = make(chan *pb.NodeInfo)

pb.RegisterIAMPublicServiceServer(publicServer.grpcServer, publicServer)
pb.RegisterIAMPublicIdentityServiceServer(publicServer.grpcServer, publicServer)
pb.RegisterIAMPublicIdentityServiceServer(publicServer.grpcServer, &publicServer.testIAMPublicIdentityServiceServer)
pb.RegisterIAMPublicNodesServiceServer(publicServer.grpcServer, &publicServer.testIAMPublicNodesServiceServer)

go func() {
Expand Down Expand Up @@ -823,6 +894,32 @@ func (server *testPublicServer) GetNodeInfo(context context.Context, req *empty.
return &pb.NodeInfo{}, nil
}

func (server *testIAMPublicIdentityServiceServer) GetSystemInfo(
context context.Context, req *empty.Empty,
) (rsp *pb.SystemInfo, err error) {
rsp = &pb.SystemInfo{SystemId: server.systemID}

return rsp, nil
}

func (server *testIAMPublicIdentityServiceServer) GetSubjects(
context.Context, *emptypb.Empty,
) (rsp *pb.Subjects, err error) {
rsp = &pb.Subjects{Subjects: server.currentSubjects}

return rsp, nil
}

func (server *testIAMPublicIdentityServiceServer) SubscribeSubjectsChanged(
empty *emptypb.Empty, stream pb.IAMPublicIdentityService_SubscribeSubjectsChangedServer,
) error {
log.Error("testIAMPublicIdentityServiceServer SubscribeSubjectsChanged")

subjects := <-server.subjects

return aoserrors.Wrap(stream.Send(subjects))
}

func (server *testIAMPublicNodesServiceServer) GetAllNodeIDs(context context.Context, req *emptypb.Empty) (
*pb.NodesID, error,
) {
Expand Down
Loading

0 comments on commit af97ca7

Please sign in to comment.