Skip to content

Commit

Permalink
feat(asset): enhance pubsub functionality with subscription and event…
Browse files Browse the repository at this point in the history
… handling

- Added subscription and unsubscription methods to the AssetPubSub struct, allowing for flexible event handling.
- Implemented tests for subscribing to all events, specific events, and verifying unsubscribe functionality.
- Enhanced event notification mechanism to ensure subscribers receive relevant asset events (created, updated, uploaded).
- Improved concurrency handling in event processing with mutex locks to prevent race conditions.
- This change strengthens the event-driven architecture, facilitating better asset management and responsiveness.
  • Loading branch information
kasugamirai committed Jan 7, 2025
1 parent 692f929 commit 7353e6b
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 16 deletions.
148 changes: 133 additions & 15 deletions asset/infrastructure/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,30 @@ package pubsub

import (
"context"
"reflect"
"sync"

"github.com/reearth/reearthx/asset/domain"
"github.com/reearth/reearthx/asset/repository"
"github.com/reearth/reearthx/log"
)

// Publisher defines the interface for publishing events
type Publisher interface {
Publish(ctx context.Context, topic string, msg interface{}) error
}

// AssetPubSub handles publishing of asset events
type subscription struct {
eventType repository.EventType
handler repository.EventHandler
}

// AssetPubSub handles publishing and subscribing to asset events
type AssetPubSub struct {
publisher Publisher
topic string
publisher Publisher
topic string
mu sync.RWMutex
subscriptions []subscription
}

var _ repository.PubSubRepository = (*AssetPubSub)(nil)
Expand All @@ -28,14 +38,83 @@ func NewAssetPubSub(publisher Publisher, topic string) *AssetPubSub {
}
}

// Subscribe registers a handler for a specific event type
func (p *AssetPubSub) Subscribe(eventType repository.EventType, handler repository.EventHandler) {
p.mu.Lock()
defer p.mu.Unlock()

p.subscriptions = append(p.subscriptions, subscription{
eventType: eventType,
handler: handler,
})
}

// Unsubscribe removes a handler for a specific event type
func (p *AssetPubSub) Unsubscribe(eventType repository.EventType, handler repository.EventHandler) {
p.mu.Lock()
defer p.mu.Unlock()

handlerValue := reflect.ValueOf(handler)
for i := len(p.subscriptions) - 1; i >= 0; i-- {
s := p.subscriptions[i]
if s.eventType == eventType && reflect.ValueOf(s.handler) == handlerValue {
p.subscriptions = append(p.subscriptions[:i], p.subscriptions[i+1:]...)
}
}
}

// notify notifies all subscribers of an event
func (p *AssetPubSub) notify(ctx context.Context, event repository.AssetEvent) {
p.mu.RLock()
subs := make([]subscription, len(p.subscriptions))
copy(subs, p.subscriptions)
p.mu.RUnlock()

for _, sub := range subs {
if sub.eventType == event.Type || sub.eventType == "*" {
sub.handler(ctx, event)
}
}
}

// PublishAssetCreated publishes an asset created event
func (p *AssetPubSub) PublishAssetCreated(ctx context.Context, asset *domain.Asset) error {
return p.publishAssetEvent(ctx, repository.EventTypeAssetCreated, asset)
event := repository.AssetEvent{
Type: repository.EventTypeAssetCreated,
AssetID: asset.ID(),
WorkspaceID: asset.WorkspaceID(),
ProjectID: asset.ProjectID(),
Status: asset.Status(),
Error: asset.Error(),
}

if err := p.publisher.Publish(ctx, p.topic, event); err != nil {
log.Errorfc(ctx, "failed to publish asset created event: %v", err)
return err
}

p.notify(ctx, event)
return nil
}

// PublishAssetUpdated publishes an asset updated event
func (p *AssetPubSub) PublishAssetUpdated(ctx context.Context, asset *domain.Asset) error {
return p.publishAssetEvent(ctx, repository.EventTypeAssetUpdated, asset)
event := repository.AssetEvent{
Type: repository.EventTypeAssetUpdated,
AssetID: asset.ID(),
WorkspaceID: asset.WorkspaceID(),
ProjectID: asset.ProjectID(),
Status: asset.Status(),
Error: asset.Error(),
}

if err := p.publisher.Publish(ctx, p.topic, event); err != nil {
log.Errorfc(ctx, "failed to publish asset updated event: %v", err)
return err
}

p.notify(ctx, event)
return nil
}

// PublishAssetDeleted publishes an asset deleted event
Expand All @@ -44,33 +123,72 @@ func (p *AssetPubSub) PublishAssetDeleted(ctx context.Context, assetID domain.ID
Type: repository.EventTypeAssetDeleted,
AssetID: assetID,
}
return p.publisher.Publish(ctx, p.topic, event)

if err := p.publisher.Publish(ctx, p.topic, event); err != nil {
log.Errorfc(ctx, "failed to publish asset deleted event: %v", err)
return err
}

p.notify(ctx, event)
return nil
}

// PublishAssetUploaded publishes an asset uploaded event
func (p *AssetPubSub) PublishAssetUploaded(ctx context.Context, asset *domain.Asset) error {
return p.publishAssetEvent(ctx, repository.EventTypeAssetUploaded, asset)
event := repository.AssetEvent{
Type: repository.EventTypeAssetUploaded,
AssetID: asset.ID(),
WorkspaceID: asset.WorkspaceID(),
ProjectID: asset.ProjectID(),
Status: asset.Status(),
Error: asset.Error(),
}

if err := p.publisher.Publish(ctx, p.topic, event); err != nil {
log.Errorfc(ctx, "failed to publish asset uploaded event: %v", err)
return err
}

p.notify(ctx, event)
return nil
}

// PublishAssetExtracted publishes an asset extraction status event
func (p *AssetPubSub) PublishAssetExtracted(ctx context.Context, asset *domain.Asset) error {
return p.publishAssetEvent(ctx, repository.EventTypeAssetExtracted, asset)
event := repository.AssetEvent{
Type: repository.EventTypeAssetExtracted,
AssetID: asset.ID(),
WorkspaceID: asset.WorkspaceID(),
ProjectID: asset.ProjectID(),
Status: asset.Status(),
Error: asset.Error(),
}

if err := p.publisher.Publish(ctx, p.topic, event); err != nil {
log.Errorfc(ctx, "failed to publish asset extracted event: %v", err)
return err
}

p.notify(ctx, event)
return nil
}

// PublishAssetTransferred publishes an asset transferred event
func (p *AssetPubSub) PublishAssetTransferred(ctx context.Context, asset *domain.Asset) error {
return p.publishAssetEvent(ctx, repository.EventTypeAssetTransferred, asset)
}

// publishAssetEvent is a helper function to publish asset events with common fields
func (p *AssetPubSub) publishAssetEvent(ctx context.Context, eventType repository.EventType, asset *domain.Asset) error {
event := repository.AssetEvent{
Type: eventType,
Type: repository.EventTypeAssetTransferred,
AssetID: asset.ID(),
WorkspaceID: asset.WorkspaceID(),
ProjectID: asset.ProjectID(),
Status: asset.Status(),
Error: asset.Error(),
}
return p.publisher.Publish(ctx, p.topic, event)

if err := p.publisher.Publish(ctx, p.topic, event); err != nil {
log.Errorfc(ctx, "failed to publish asset transferred event: %v", err)
return err
}

p.notify(ctx, event)
return nil
}
108 changes: 108 additions & 0 deletions asset/infrastructure/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pubsub

import (
"context"
"sync"
"testing"

"github.com/reearth/reearthx/asset/domain"
Expand Down Expand Up @@ -31,6 +32,113 @@ func TestNewAssetPubSub(t *testing.T) {
assert.Equal(t, "test-topic", ps.topic)
}

func TestAssetPubSub_Subscribe(t *testing.T) {
ps := NewAssetPubSub(&mockPublisher{}, "test-topic")

var receivedEvents []repository.AssetEvent
var mu sync.Mutex

// Subscribe to all events
ps.Subscribe("*", func(ctx context.Context, event repository.AssetEvent) {
mu.Lock()
receivedEvents = append(receivedEvents, event)
mu.Unlock()
})

// Create test asset
asset := domain.NewAsset(
domain.NewID(),
"test.txt",
100,
"text/plain",
)
asset.MoveToWorkspace(domain.NewWorkspaceID())
asset.MoveToProject(domain.NewProjectID())
asset.UpdateStatus(domain.StatusActive, "")

// Publish events
ctx := context.Background()
ps.PublishAssetCreated(ctx, asset)
ps.PublishAssetUpdated(ctx, asset)
ps.PublishAssetUploaded(ctx, asset)

// Check received events
mu.Lock()
defer mu.Unlock()
assert.Equal(t, 3, len(receivedEvents))
assert.Equal(t, repository.EventTypeAssetCreated, receivedEvents[0].Type)
assert.Equal(t, repository.EventTypeAssetUpdated, receivedEvents[1].Type)
assert.Equal(t, repository.EventTypeAssetUploaded, receivedEvents[2].Type)
}

func TestAssetPubSub_SubscribeSpecificEvent(t *testing.T) {
ps := NewAssetPubSub(&mockPublisher{}, "test-topic")

var receivedEvents []repository.AssetEvent
var mu sync.Mutex

// Subscribe only to created events
ps.Subscribe(repository.EventTypeAssetCreated, func(ctx context.Context, event repository.AssetEvent) {
mu.Lock()
receivedEvents = append(receivedEvents, event)
mu.Unlock()
})

// Create test asset
asset := domain.NewAsset(
domain.NewID(),
"test.txt",
100,
"text/plain",
)

// Publish different events
ctx := context.Background()
ps.PublishAssetCreated(ctx, asset) // Should be received
ps.PublishAssetUpdated(ctx, asset) // Should be ignored
ps.PublishAssetUploaded(ctx, asset) // Should be ignored

// Check received events
mu.Lock()
defer mu.Unlock()
assert.Equal(t, 1, len(receivedEvents))
assert.Equal(t, repository.EventTypeAssetCreated, receivedEvents[0].Type)
}

func TestAssetPubSub_Unsubscribe(t *testing.T) {
ps := NewAssetPubSub(&mockPublisher{}, "test-topic")

var receivedEvents []repository.AssetEvent
var mu sync.Mutex

handler := func(ctx context.Context, event repository.AssetEvent) {
mu.Lock()
receivedEvents = append(receivedEvents, event)
mu.Unlock()
}

// Subscribe and then unsubscribe
ps.Subscribe(repository.EventTypeAssetCreated, handler)
ps.Unsubscribe(repository.EventTypeAssetCreated, handler)

// Create test asset
asset := domain.NewAsset(
domain.NewID(),
"test.txt",
100,
"text/plain",
)

// Publish event
ctx := context.Background()
ps.PublishAssetCreated(ctx, asset)

// Check no events were received
mu.Lock()
defer mu.Unlock()
assert.Equal(t, 0, len(receivedEvents))
}

func TestAssetPubSub_PublishEvents(t *testing.T) {
ctx := context.Background()
pub := &mockPublisher{}
Expand Down
12 changes: 11 additions & 1 deletion asset/repository/pubsub_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ type AssetEvent struct {
Error string `json:"error,omitempty"`
}

// PubSubRepository defines the interface for publishing asset events
// EventHandler is a function that handles asset events
type EventHandler func(ctx context.Context, event AssetEvent)

// PubSubRepository defines the interface for publishing and subscribing to asset events
type PubSubRepository interface {
// PublishAssetCreated publishes an asset created event
PublishAssetCreated(ctx context.Context, asset *domain.Asset) error
Expand All @@ -48,4 +51,11 @@ type PubSubRepository interface {

// PublishAssetTransferred publishes an asset transferred event
PublishAssetTransferred(ctx context.Context, asset *domain.Asset) error

// Subscribe registers a handler for a specific event type
// Use "*" as eventType to subscribe to all events
Subscribe(eventType EventType, handler EventHandler)

// Unsubscribe removes a handler for a specific event type
Unsubscribe(eventType EventType, handler EventHandler)
}

0 comments on commit 7353e6b

Please sign in to comment.