forked from potato2003/actioncable-client-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscriptions.go
112 lines (86 loc) · 2.6 KB
/
subscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package actioncable
import (
"log"
)
type Subscriptions struct {
consumer *Consumer
subscriptions []*Subscription
}
func newSubscriptions(consumer *Consumer) *Subscriptions {
return &Subscriptions{
consumer: consumer,
subscriptions: []*Subscription{},
}
}
func (s *Subscriptions) Create(channelIdentifier *ChannelIdentifier) (*Subscription, error) {
subscription, err := s.add(newSubscription(s.consumer, channelIdentifier))
if err != nil {
log.Println(err)
return nil, err
}
return subscription, nil
}
func (s *Subscriptions) add(subscription *Subscription) (*Subscription, error) {
s.subscriptions = append(s.subscriptions, subscription)
s.sendCommand(subscription, "subscribe")
return subscription, nil
}
func (s *Subscriptions) remove(subscription *Subscription) *Subscription {
s.forget(subscription)
s.sendCommand(subscription, "unsubscribe")
return subscription
}
func (s *Subscriptions) reject(identifier *ChannelIdentifier) []*Subscription {
matches := s.findAll(identifier)
for _, subscription := range matches {
s.forget(subscription)
rejectedEvent := createSubscriptionEvent(Rejected, nil)
s.notify(identifier, rejectedEvent)
}
return matches
}
func (s *Subscriptions) forget(subscription *Subscription) {
s.subscriptions = s.filter(subscription.Identifier)
}
func (s *Subscriptions) findAll(identifier *ChannelIdentifier) []*Subscription {
result := make([]*Subscription, 0, len(s.subscriptions))
for _, subscription := range s.subscriptions {
if identifier.Equals(subscription.Identifier) {
result = append(result, subscription)
}
}
return result
}
func (s *Subscriptions) filter(identifier *ChannelIdentifier) []*Subscription {
result := make([]*Subscription, 0, len(s.subscriptions))
for _, subscription := range s.subscriptions {
if !identifier.Equals(subscription.Identifier) {
result = append(result, subscription)
}
}
return result
}
func (s *Subscriptions) reload() {
logger.Debug("reloading Subscriptions")
for _, subscription := range s.subscriptions {
s.sendCommand(subscription, "subscribe")
}
}
func (s *Subscriptions) notifyAll(event *SubscriptionEvent) {
for _, subscription := range s.subscriptions {
subscription.NotifyCh <- event
}
}
func (s *Subscriptions) notify(identifier *ChannelIdentifier, event *SubscriptionEvent) {
for _, subscription := range s.findAll(identifier) {
subscription.NotifyCh <- event
}
}
func (s *Subscriptions) sendCommand(subscription *Subscription, command string) {
data := map[string]interface{}{
"command": command,
"identifier": subscription.Identifier,
}
s.consumer.send(data)
return
}