Skip to content

Commit

Permalink
updating export interface
Browse files Browse the repository at this point in the history
Signed-off-by: Jaydip Gabani <[email protected]>
  • Loading branch information
JaydipGabani committed Dec 6, 2024
1 parent 6574a0c commit 6f82df9
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 50 deletions.
2 changes: 1 addition & 1 deletion pkg/audit/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ func (am *Manager) addAuditResponsesToUpdateLists(
if *exportController.ExportEnabled {
err := am.exportSystem.Publish(context.Background(), *auditConnection, *auditChannel, violationMsg(constraint, ea, r.ScopedEnforcementActions, gvk, namespace, name, msg, details, labels, timestamp))
if err != nil {
am.log.Error(err, "export audit Publishing")
am.log.Error(err, "error exporting audit violation")
}
}
if *emitAuditEvents {
Expand Down
File renamed without changes.
6 changes: 3 additions & 3 deletions pkg/export/dapr/dapr.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func (r *Dapr) Publish(_ context.Context, connectionName string, data interface{
return nil
}

func (r *Dapr) Close(connectionName string) error {
func (r *Dapr) CloseConnection(connectionName string) error {
delete(r.openConnections, connectionName)
return nil
}

func (r *Dapr) Update(_ context.Context, connectionName string, config interface{}) error {
func (r *Dapr) UpdateConnection(_ context.Context, connectionName string, config interface{}) error {
cfg, ok := config.(map[string]interface{})
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
Expand All @@ -66,7 +66,7 @@ func (r *Dapr) Update(_ context.Context, connectionName string, config interface
return nil
}

func (r *Dapr) Create(_ context.Context, connectionName string, config interface{}) error {
func (r *Dapr) CreateConnection(_ context.Context, connectionName string, config interface{}) error {
var conn Connection
cfg, ok := config.(map[string]interface{})
if !ok {
Expand Down
4 changes: 2 additions & 2 deletions pkg/export/dapr/dapr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestCreate(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := testClient.Create(context.TODO(), "another-test", tc.config)
err := testClient.CreateConnection(context.TODO(), "another-test", tc.config)
tmp, ok := testClient.(*Dapr)
if !ok {
t.Errorf("failed to type assert")
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestDapr_Update(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := testClient
if err := r.Update(context.Background(), tt.connectionName, tt.config); (err != nil) != tt.wantErr {
if err := r.UpdateConnection(context.Background(), tt.connectionName, tt.config); (err != nil) != tt.wantErr {
t.Errorf("Dapr.Update() error = %v, wantErr %v", err, tt.wantErr)
}
if !tt.wantErr {
Expand Down
6 changes: 3 additions & 3 deletions pkg/export/dapr/fake_dapr_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,15 @@ func (r *FakeDapr) Publish(_ context.Context, _ string, _ interface{}, _ string)
return nil
}

func (r *FakeDapr) Close(connectionName string) error {
func (r *FakeDapr) CloseConnection(connectionName string) error {
if len(r.openConnections) == 1 {
r.openConnections[connectionName].f()
}
delete(r.openConnections, connectionName)
return nil
}

func (r *FakeDapr) Update(_ context.Context, connectionName string, config interface{}) error {
func (r *FakeDapr) UpdateConnection(_ context.Context, connectionName string, config interface{}) error {
cfg, ok := config.(map[string]interface{})
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
Expand All @@ -378,7 +378,7 @@ func (r *FakeDapr) Update(_ context.Context, connectionName string, config inter
return nil
}

func (r *FakeDapr) Create(ctx context.Context, connectionName string, config interface{}) error {
func (r *FakeDapr) CreateConnection(ctx context.Context, connectionName string, config interface{}) error {
var conn FakeDaprConnection
cfg, ok := config.(map[string]interface{})
if !ok {
Expand Down
14 changes: 7 additions & 7 deletions pkg/export/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
)

type Driver interface {
// Publish single message with specific subject
// Publish publishes single message with specific subject using a connection
Publish(ctx context.Context, connectionName string, data interface{}, subject string) error

// Close connections
Close(connectionName string) error
// CloseConnection closes a connection
CloseConnection(connectionName string) error

// Update connection
Update(ctx context.Context, connectionName string, config interface{}) error
// UpdateConnection updates an existing connection
UpdateConnection(ctx context.Context, connectionName string, config interface{}) error

// Create connection
Create(ctx context.Context, connectionName string, config interface{}) error
// CreateConnection creates new connection
CreateConnection(ctx context.Context, connectionName string, config interface{}) error
}
26 changes: 13 additions & 13 deletions pkg/export/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ var SupportedDrivers = map[string]driver.Driver{
}

type System struct {
mux sync.RWMutex
connections map[string]string
mux sync.RWMutex
connectionToDriver map[string]string
}

func NewSystem() *System {
return &System{
connections: map[string]string{},
connectionToDriver: map[string]string{},
}
}

func (s *System) Publish(_ context.Context, connectionName string, subject string, msg interface{}) error {
s.mux.RLock()
defer s.mux.RUnlock()
if c, ok := s.connections[connectionName]; ok {
return SupportedDrivers[c].Publish(context.Background(), connectionName, msg, subject)
if dName, ok := s.connectionToDriver[connectionName]; ok {
return SupportedDrivers[dName].Publish(context.Background(), connectionName, msg, subject)
}
return fmt.Errorf("connection is not initialized, name: %s ", connectionName)
}
Expand All @@ -37,15 +37,15 @@ func (s *System) UpsertConnection(ctx context.Context, config interface{}, conne
s.mux.Lock()
defer s.mux.Unlock()
// Check if the connection already exists.
if oldDriver, ok := s.connections[connectionName]; ok {
if oldDriver, ok := s.connectionToDriver[connectionName]; ok {
// If the provider is the same, update the existing connection.
if oldDriver == newDriver {
return SupportedDrivers[newDriver].Update(ctx, connectionName, config)
return SupportedDrivers[newDriver].UpdateConnection(ctx, connectionName, config)
}
}
// Check if the provider is supported.
if conn, ok := SupportedDrivers[newDriver]; ok {
err := conn.Create(ctx, connectionName, config)
if d, ok := SupportedDrivers[newDriver]; ok {
err := d.CreateConnection(ctx, connectionName, config)
if err != nil {
return err
}
Expand All @@ -55,7 +55,7 @@ func (s *System) UpsertConnection(ctx context.Context, config interface{}, conne
return err
}
// Add the new connection and provider to the maps.
s.connections[connectionName] = newDriver
s.connectionToDriver[connectionName] = newDriver
return nil
}
return fmt.Errorf("driver %s is not supported", newDriver)
Expand All @@ -68,14 +68,14 @@ func (s *System) CloseConnection(connectionName string) error {
}

func (s *System) closeConnection(connectionName string) error {
if c, ok := s.connections[connectionName]; ok {
if c, ok := s.connectionToDriver[connectionName]; ok {
if conn, ok := SupportedDrivers[c]; ok {
err := conn.Close(connectionName)
err := conn.CloseConnection(connectionName)
if err != nil {
return err
}
}
delete(s.connections, connectionName)
delete(s.connectionToDriver, connectionName)
}
return nil
}
36 changes: 18 additions & 18 deletions pkg/export/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ func TestMain(m *testing.M) {
},
}
for name, fakeConn := range SupportedDrivers {
testSystem.connections[name] = name
_ = fakeConn.Create(ctx, name, cfg[name])
testSystem.connectionToDriver[name] = name
_ = fakeConn.CreateConnection(ctx, name, cfg[name])
}
r := m.Run()
for name, fakeConn := range testSystem.connections {
_ = SupportedDrivers[fakeConn].Close(name)
for name, fakeConn := range testSystem.connectionToDriver {
_ = SupportedDrivers[fakeConn].CloseConnection(name)
}

if r != 0 {
Expand All @@ -48,7 +48,7 @@ func TestNewSystem(t *testing.T) {
{
name: "requesting system",
want: &System{
connections: map[string]string{},
connectionToDriver: map[string]string{},
},
},
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestSystem_UpsertConnection(t *testing.T) {
connectionName: "conn1",
newDriver: dapr.Name,
setup: func(s *System) error {
s.connections = map[string]string{}
s.connectionToDriver = map[string]string{}
SupportedDrivers[dapr.Name] = dapr.FakeConn
return nil
},
Expand All @@ -90,9 +90,9 @@ func TestSystem_UpsertConnection(t *testing.T) {
connectionName: "conn1",
newDriver: dapr.Name,
setup: func(s *System) error {
s.connections["conn1"] = dapr.Name
s.connectionToDriver["conn1"] = dapr.Name
SupportedDrivers[dapr.Name] = dapr.FakeConn
return SupportedDrivers[dapr.Name].Create(ctx, "conn1", map[string]interface{}{"component": "pubsub"})
return SupportedDrivers[dapr.Name].CreateConnection(ctx, "conn1", map[string]interface{}{"component": "pubsub"})
},
wantErr: false,
},
Expand All @@ -110,10 +110,10 @@ func TestSystem_UpsertConnection(t *testing.T) {
connectionName: "conn4",
newDriver: dapr.Name,
setup: func(s *System) error {
s.connections["conn4"] = testdriver.Name
s.connectionToDriver["conn4"] = testdriver.Name
SupportedDrivers[dapr.Name] = dapr.FakeConn
SupportedDrivers[testdriver.Name] = testdriver.FakeConn
return SupportedDrivers[testdriver.Name].Create(ctx, "conn4", "config4")
return SupportedDrivers[testdriver.Name].CreateConnection(ctx, "conn4", "config4")
},
wantErr: false,
},
Expand All @@ -131,7 +131,7 @@ func TestSystem_UpsertConnection(t *testing.T) {
}

if !tt.wantErr {
if driver, ok := system.connections[tt.connectionName]; !ok || driver != tt.newDriver {
if driver, ok := system.connectionToDriver[tt.connectionName]; !ok || driver != tt.newDriver {
t.Errorf("connection %s not found or driver mismatch: got %v, want %v", tt.connectionName, driver, tt.newDriver)
}
}
Expand All @@ -149,9 +149,9 @@ func TestSystem_CloseConnection(t *testing.T) {
{
name: "close existing connection",
setup: func(s *System) {
s.connections["test-connection"] = dapr.Name
s.connectionToDriver["test-connection"] = dapr.Name
SupportedDrivers[dapr.Name] = dapr.FakeConn
_ = dapr.FakeConn.Create(context.TODO(), "test-connection", map[string]interface{}{"component": "pubsub"})
_ = dapr.FakeConn.CreateConnection(context.TODO(), "test-connection", map[string]interface{}{"component": "pubsub"})
},
connectionName: "test-connection",
wantErr: false,
Expand All @@ -160,7 +160,7 @@ func TestSystem_CloseConnection(t *testing.T) {
name: "close non-existing connection",
setup: func(s *System) {
// No setup needed for non-existing connection
s.connections = map[string]string{}
s.connectionToDriver = map[string]string{}
},
connectionName: "non-existing-connection",
wantErr: false,
Expand All @@ -179,7 +179,7 @@ func TestSystem_CloseConnection(t *testing.T) {
t.Errorf("CloseConnection() error = %v, wantErr %v", err, tt.wantErr)
}

if _, exists := s.connections[tt.connectionName]; exists && !tt.wantErr {
if _, exists := s.connectionToDriver[tt.connectionName]; exists && !tt.wantErr {
t.Errorf("connection %s still exists after CloseConnection", tt.connectionName)
}
})
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestSystem_Publish(t *testing.T) {
{
name: "Publishing to a connection that does exist",
fields: fields{
connections: testSystem.connections,
connections: testSystem.connectionToDriver,
},
args: args{ctx: context.Background(), connection: "dapr", topic: "test", msg: nil},
wantErr: false,
Expand All @@ -230,8 +230,8 @@ func TestSystem_Publish(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &System{
mux: sync.RWMutex{},
connections: tt.fields.connections,
mux: sync.RWMutex{},
connectionToDriver: tt.fields.connections,
}
if err := s.Publish(tt.args.ctx, tt.args.connection, tt.args.topic, tt.args.msg); (err != nil) != tt.wantErr {
t.Errorf("System.Publish() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
6 changes: 3 additions & 3 deletions pkg/export/testdriver/testdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ func (r *Connection) Publish(_ context.Context, _ string, _ interface{}, _ strin
return nil
}

func (r *Connection) Close(connectionName string) error {
func (r *Connection) CloseConnection(connectionName string) error {
delete(r.openConnections, connectionName)
return nil
}

func (r *Connection) Update(_ context.Context, connectionName string, config interface{}) error {
func (r *Connection) UpdateConnection(_ context.Context, connectionName string, config interface{}) error {
name, ok := config.(string)
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
Expand All @@ -38,7 +38,7 @@ func (r *Connection) Update(_ context.Context, connectionName string, config int
return nil
}

func (r *Connection) Create(_ context.Context, connectionName string, config interface{}) error {
func (r *Connection) CreateConnection(_ context.Context, connectionName string, config interface{}) error {
name, ok := config.(string)
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
Expand Down

0 comments on commit 6f82df9

Please sign in to comment.