diff --git a/asset/infrastructure/pubsub/pubsub.go b/asset/infrastructure/pubsub/pubsub.go index 4342c33..fcffec2 100644 --- a/asset/infrastructure/pubsub/pubsub.go +++ b/asset/infrastructure/pubsub/pubsub.go @@ -2,9 +2,12 @@ package pubsub import ( "context" + "reflect" + "sync" "github.com/reearth/reearthx/asset/domain" "github.com/reearth/reearthx/asset/repository" + "github.com/reearth/reearthx/log" ) // Publisher defines the interface for publishing events @@ -12,10 +15,17 @@ type Publisher interface { Publish(ctx context.Context, topic string, msg interface{}) error } -// AssetPubSub handles publishing of asset events +type subscription struct { + eventType repository.EventType + handler repository.EventHandler +} + +// AssetPubSub handles publishing and subscribing to asset events type AssetPubSub struct { - publisher Publisher - topic string + publisher Publisher + topic string + mu sync.RWMutex + subscriptions []subscription } var _ repository.PubSubRepository = (*AssetPubSub)(nil) @@ -28,14 +38,83 @@ func NewAssetPubSub(publisher Publisher, topic string) *AssetPubSub { } } +// Subscribe registers a handler for a specific event type +func (p *AssetPubSub) Subscribe(eventType repository.EventType, handler repository.EventHandler) { + p.mu.Lock() + defer p.mu.Unlock() + + p.subscriptions = append(p.subscriptions, subscription{ + eventType: eventType, + handler: handler, + }) +} + +// Unsubscribe removes a handler for a specific event type +func (p *AssetPubSub) Unsubscribe(eventType repository.EventType, handler repository.EventHandler) { + p.mu.Lock() + defer p.mu.Unlock() + + handlerValue := reflect.ValueOf(handler) + for i := len(p.subscriptions) - 1; i >= 0; i-- { + s := p.subscriptions[i] + if s.eventType == eventType && reflect.ValueOf(s.handler) == handlerValue { + p.subscriptions = append(p.subscriptions[:i], p.subscriptions[i+1:]...) + } + } +} + +// notify notifies all subscribers of an event +func (p *AssetPubSub) notify(ctx context.Context, event repository.AssetEvent) { + p.mu.RLock() + subs := make([]subscription, len(p.subscriptions)) + copy(subs, p.subscriptions) + p.mu.RUnlock() + + for _, sub := range subs { + if sub.eventType == event.Type || sub.eventType == "*" { + sub.handler(ctx, event) + } + } +} + // PublishAssetCreated publishes an asset created event func (p *AssetPubSub) PublishAssetCreated(ctx context.Context, asset *domain.Asset) error { - return p.publishAssetEvent(ctx, repository.EventTypeAssetCreated, asset) + event := repository.AssetEvent{ + Type: repository.EventTypeAssetCreated, + AssetID: asset.ID(), + WorkspaceID: asset.WorkspaceID(), + ProjectID: asset.ProjectID(), + Status: asset.Status(), + Error: asset.Error(), + } + + if err := p.publisher.Publish(ctx, p.topic, event); err != nil { + log.Errorfc(ctx, "failed to publish asset created event: %v", err) + return err + } + + p.notify(ctx, event) + return nil } // PublishAssetUpdated publishes an asset updated event func (p *AssetPubSub) PublishAssetUpdated(ctx context.Context, asset *domain.Asset) error { - return p.publishAssetEvent(ctx, repository.EventTypeAssetUpdated, asset) + event := repository.AssetEvent{ + Type: repository.EventTypeAssetUpdated, + AssetID: asset.ID(), + WorkspaceID: asset.WorkspaceID(), + ProjectID: asset.ProjectID(), + Status: asset.Status(), + Error: asset.Error(), + } + + if err := p.publisher.Publish(ctx, p.topic, event); err != nil { + log.Errorfc(ctx, "failed to publish asset updated event: %v", err) + return err + } + + p.notify(ctx, event) + return nil } // PublishAssetDeleted publishes an asset deleted event @@ -44,33 +123,72 @@ func (p *AssetPubSub) PublishAssetDeleted(ctx context.Context, assetID domain.ID Type: repository.EventTypeAssetDeleted, AssetID: assetID, } - return p.publisher.Publish(ctx, p.topic, event) + + if err := p.publisher.Publish(ctx, p.topic, event); err != nil { + log.Errorfc(ctx, "failed to publish asset deleted event: %v", err) + return err + } + + p.notify(ctx, event) + return nil } // PublishAssetUploaded publishes an asset uploaded event func (p *AssetPubSub) PublishAssetUploaded(ctx context.Context, asset *domain.Asset) error { - return p.publishAssetEvent(ctx, repository.EventTypeAssetUploaded, asset) + event := repository.AssetEvent{ + Type: repository.EventTypeAssetUploaded, + AssetID: asset.ID(), + WorkspaceID: asset.WorkspaceID(), + ProjectID: asset.ProjectID(), + Status: asset.Status(), + Error: asset.Error(), + } + + if err := p.publisher.Publish(ctx, p.topic, event); err != nil { + log.Errorfc(ctx, "failed to publish asset uploaded event: %v", err) + return err + } + + p.notify(ctx, event) + return nil } // PublishAssetExtracted publishes an asset extraction status event func (p *AssetPubSub) PublishAssetExtracted(ctx context.Context, asset *domain.Asset) error { - return p.publishAssetEvent(ctx, repository.EventTypeAssetExtracted, asset) + event := repository.AssetEvent{ + Type: repository.EventTypeAssetExtracted, + AssetID: asset.ID(), + WorkspaceID: asset.WorkspaceID(), + ProjectID: asset.ProjectID(), + Status: asset.Status(), + Error: asset.Error(), + } + + if err := p.publisher.Publish(ctx, p.topic, event); err != nil { + log.Errorfc(ctx, "failed to publish asset extracted event: %v", err) + return err + } + + p.notify(ctx, event) + return nil } // PublishAssetTransferred publishes an asset transferred event func (p *AssetPubSub) PublishAssetTransferred(ctx context.Context, asset *domain.Asset) error { - return p.publishAssetEvent(ctx, repository.EventTypeAssetTransferred, asset) -} - -// publishAssetEvent is a helper function to publish asset events with common fields -func (p *AssetPubSub) publishAssetEvent(ctx context.Context, eventType repository.EventType, asset *domain.Asset) error { event := repository.AssetEvent{ - Type: eventType, + Type: repository.EventTypeAssetTransferred, AssetID: asset.ID(), WorkspaceID: asset.WorkspaceID(), ProjectID: asset.ProjectID(), Status: asset.Status(), Error: asset.Error(), } - return p.publisher.Publish(ctx, p.topic, event) + + if err := p.publisher.Publish(ctx, p.topic, event); err != nil { + log.Errorfc(ctx, "failed to publish asset transferred event: %v", err) + return err + } + + p.notify(ctx, event) + return nil } diff --git a/asset/infrastructure/pubsub/pubsub_test.go b/asset/infrastructure/pubsub/pubsub_test.go index 9c8df87..14a822f 100644 --- a/asset/infrastructure/pubsub/pubsub_test.go +++ b/asset/infrastructure/pubsub/pubsub_test.go @@ -2,6 +2,7 @@ package pubsub import ( "context" + "sync" "testing" "github.com/reearth/reearthx/asset/domain" @@ -31,6 +32,113 @@ func TestNewAssetPubSub(t *testing.T) { assert.Equal(t, "test-topic", ps.topic) } +func TestAssetPubSub_Subscribe(t *testing.T) { + ps := NewAssetPubSub(&mockPublisher{}, "test-topic") + + var receivedEvents []repository.AssetEvent + var mu sync.Mutex + + // Subscribe to all events + ps.Subscribe("*", func(ctx context.Context, event repository.AssetEvent) { + mu.Lock() + receivedEvents = append(receivedEvents, event) + mu.Unlock() + }) + + // Create test asset + asset := domain.NewAsset( + domain.NewID(), + "test.txt", + 100, + "text/plain", + ) + asset.MoveToWorkspace(domain.NewWorkspaceID()) + asset.MoveToProject(domain.NewProjectID()) + asset.UpdateStatus(domain.StatusActive, "") + + // Publish events + ctx := context.Background() + ps.PublishAssetCreated(ctx, asset) + ps.PublishAssetUpdated(ctx, asset) + ps.PublishAssetUploaded(ctx, asset) + + // Check received events + mu.Lock() + defer mu.Unlock() + assert.Equal(t, 3, len(receivedEvents)) + assert.Equal(t, repository.EventTypeAssetCreated, receivedEvents[0].Type) + assert.Equal(t, repository.EventTypeAssetUpdated, receivedEvents[1].Type) + assert.Equal(t, repository.EventTypeAssetUploaded, receivedEvents[2].Type) +} + +func TestAssetPubSub_SubscribeSpecificEvent(t *testing.T) { + ps := NewAssetPubSub(&mockPublisher{}, "test-topic") + + var receivedEvents []repository.AssetEvent + var mu sync.Mutex + + // Subscribe only to created events + ps.Subscribe(repository.EventTypeAssetCreated, func(ctx context.Context, event repository.AssetEvent) { + mu.Lock() + receivedEvents = append(receivedEvents, event) + mu.Unlock() + }) + + // Create test asset + asset := domain.NewAsset( + domain.NewID(), + "test.txt", + 100, + "text/plain", + ) + + // Publish different events + ctx := context.Background() + ps.PublishAssetCreated(ctx, asset) // Should be received + ps.PublishAssetUpdated(ctx, asset) // Should be ignored + ps.PublishAssetUploaded(ctx, asset) // Should be ignored + + // Check received events + mu.Lock() + defer mu.Unlock() + assert.Equal(t, 1, len(receivedEvents)) + assert.Equal(t, repository.EventTypeAssetCreated, receivedEvents[0].Type) +} + +func TestAssetPubSub_Unsubscribe(t *testing.T) { + ps := NewAssetPubSub(&mockPublisher{}, "test-topic") + + var receivedEvents []repository.AssetEvent + var mu sync.Mutex + + handler := func(ctx context.Context, event repository.AssetEvent) { + mu.Lock() + receivedEvents = append(receivedEvents, event) + mu.Unlock() + } + + // Subscribe and then unsubscribe + ps.Subscribe(repository.EventTypeAssetCreated, handler) + ps.Unsubscribe(repository.EventTypeAssetCreated, handler) + + // Create test asset + asset := domain.NewAsset( + domain.NewID(), + "test.txt", + 100, + "text/plain", + ) + + // Publish event + ctx := context.Background() + ps.PublishAssetCreated(ctx, asset) + + // Check no events were received + mu.Lock() + defer mu.Unlock() + assert.Equal(t, 0, len(receivedEvents)) +} + func TestAssetPubSub_PublishEvents(t *testing.T) { ctx := context.Background() pub := &mockPublisher{} diff --git a/asset/repository/pubsub_repository.go b/asset/repository/pubsub_repository.go index b778956..b9d73e3 100644 --- a/asset/repository/pubsub_repository.go +++ b/asset/repository/pubsub_repository.go @@ -29,7 +29,10 @@ type AssetEvent struct { Error string `json:"error,omitempty"` } -// PubSubRepository defines the interface for publishing asset events +// EventHandler is a function that handles asset events +type EventHandler func(ctx context.Context, event AssetEvent) + +// PubSubRepository defines the interface for publishing and subscribing to asset events type PubSubRepository interface { // PublishAssetCreated publishes an asset created event PublishAssetCreated(ctx context.Context, asset *domain.Asset) error @@ -48,4 +51,11 @@ type PubSubRepository interface { // PublishAssetTransferred publishes an asset transferred event PublishAssetTransferred(ctx context.Context, asset *domain.Asset) error + + // Subscribe registers a handler for a specific event type + // Use "*" as eventType to subscribe to all events + Subscribe(eventType EventType, handler EventHandler) + + // Unsubscribe removes a handler for a specific event type + Unsubscribe(eventType EventType, handler EventHandler) }