This repository has been archived by the owner on Nov 30, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
subscriptions.go
76 lines (68 loc) · 1.49 KB
/
subscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package peter
import (
"secondbit.org/wendy"
"sync"
)
// subscriptionMap records, per Topic, the wendy.NodeIDs subscribed to it.
// The embedded RWMutex guards items: insert, remove, removeSubscriber and
// list take the lock themselves, while unsafeRemove expects its caller to
// already hold it.
type subscriptionMap struct {
	*sync.RWMutex
	items map[Topic][]wendy.NodeID // subscriber IDs keyed by topic
}
// newSubscriptionMap returns an empty subscriptionMap with a freshly
// allocated lock and a non-nil items map, ready for use.
func newSubscriptionMap() *subscriptionMap {
	return &subscriptionMap{
		RWMutex: new(sync.RWMutex),
		items:   make(map[Topic][]wendy.NodeID),
	}
}
// insert subscribes id to topic t while holding the write lock.
// It returns false, leaving the map untouched, when id is already listed
// under t; otherwise it appends id and returns true.
func (s *subscriptionMap) insert(t Topic, id wendy.NodeID) bool {
	s.Lock()
	defer s.Unlock()

	// A topic that is not in the map yet yields a nil slice: the loop is
	// skipped and append allocates a one-element slice for it.
	for _, existing := range s.items[t] {
		if id.Equals(existing) {
			return false
		}
	}
	s.items[t] = append(s.items[t], id)
	return true
}
// remove unsubscribes id from topic t while holding the write lock.
// removed reports whether id was found under t; empty reports whether that
// removal took away the topic's last subscriber.
func (s *subscriptionMap) remove(t Topic, id wendy.NodeID) (removed, empty bool) {
	s.Lock()
	defer s.Unlock()

	removed, empty = s.unsafeRemove(t, id)
	return removed, empty
}
// removeSubscriber unsubscribes id from every topic in the map, holding the
// write lock for the whole sweep. The per-topic results of unsafeRemove are
// discarded.
func (s *subscriptionMap) removeSubscriber(id wendy.NodeID) {
	s.Lock()
	defer s.Unlock()

	for t := range s.items {
		s.unsafeRemove(t, id)
	}
}
// unsafeRemove unsubscribes id from topic t without taking the lock; the
// caller must already hold the write lock (remove and removeSubscriber do).
//
// removed is true when id was found under t. empty is true when id was the
// topic's only subscriber, in which case the topic's key is deleted from the
// map so that abandoned topics do not accumulate. If id is not subscribed to
// t, it returns (false, false) and changes nothing.
//
// The surviving subscribers are copied into a new slice rather than shifted
// in place, so a slice previously handed out for this topic is never modified
// behind its holder's back.
func (s *subscriptionMap) unsafeRemove(t Topic, id wendy.NodeID) (removed, empty bool) {
	ids := s.items[t]
	for pos, i := range ids {
		if !id.Equals(i) {
			continue
		}
		if len(ids) == 1 {
			// Deleting the current key while a caller ranges over the map
			// is permitted by the language.
			delete(s.items, t)
			return true, true
		}
		remaining := make([]wendy.NodeID, 0, len(ids)-1)
		remaining = append(remaining, ids[:pos]...)
		remaining = append(remaining, ids[pos+1:]...)
		s.items[t] = remaining
		return true, false
	}
	return false, false
}
// list returns the IDs subscribed to topic t, read under the read lock.
// The result is a copy: the caller may keep or modify it after the lock is
// released without racing against later insert/remove calls, which reuse the
// internal slice's backing array. It returns nil when t has no entry in the
// map.
func (s *subscriptionMap) list(t Topic) []wendy.NodeID {
	s.RLock()
	defer s.RUnlock()

	ids, ok := s.items[t]
	if !ok {
		return nil
	}
	out := make([]wendy.NodeID, len(ids))
	copy(out, ids)
	return out
}