Skip to content

Commit 4514cb8

Browse files
committed
use common hub with common publisher
1 parent 6ab96ff commit 4514cb8

19 files changed

+134
-187
lines changed

common/errors.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,6 @@ var ErrNilStatusMetricsHandler = errors.New("nil status metrics handler")
3434

3535
// ErrWrongTypeAssertion signals a wrong type assertion
3636
var ErrWrongTypeAssertion = errors.New("wrong type assertion")
37+
38+
// ErrLoopAlreadyStarted signals that a loop has already been started
39+
var ErrLoopAlreadyStarted = errors.New("loop already started")

disabled/disabledHub.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,33 @@ import (
99
type Hub struct {
1010
}
1111

12-
// Run does nothing
13-
func (h *Hub) Run() error {
12+
// RegisterListener does nothing
13+
func (h *Hub) RegisterListener() error {
1414
return nil
1515
}
1616

17-
// Broadcast does nothing
18-
func (h *Hub) Broadcast(_ data.BlockEvents) {
17+
// Publish -
18+
func (h *Hub) Publish(events data.BlockEvents) {
1919
}
2020

21-
// BroadcastRevert does nothing
22-
func (h *Hub) BroadcastRevert(_ data.RevertBlock) {
21+
// PublishRevert -
22+
func (h *Hub) PublishRevert(revertBlock data.RevertBlock) {
2323
}
2424

25-
// BroadcastFinalized does nothing
26-
func (h *Hub) BroadcastFinalized(_ data.FinalizedBlock) {
25+
// PublishFinalized -
26+
func (h *Hub) PublishFinalized(finalizedBlock data.FinalizedBlock) {
2727
}
2828

29-
// BroadcastTxs does nothing
30-
func (h *Hub) BroadcastTxs(_ data.BlockTxs) {
29+
// PublishTxs -
30+
func (h *Hub) PublishTxs(blockTxs data.BlockTxs) {
3131
}
3232

33-
// BroadcastScrs does nothing
34-
func (h *Hub) BroadcastScrs(_ data.BlockScrs) {
33+
// PublishScrs -
34+
func (h *Hub) PublishScrs(blockScrs data.BlockScrs) {
3535
}
3636

37-
// BroadcastBlockEventsWithOrder does nothing
38-
func (h *Hub) BroadcastBlockEventsWithOrder(_ data.BlockEventsWithOrder) {
37+
// PublishBlockEventsWithOrder -
38+
func (h *Hub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) {
3939
}
4040

4141
// RegisterEvent does nothing

dispatcher/hub/commonHub.go

Lines changed: 39 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,16 @@ type ArgsCommonHub struct {
2222
}
2323

2424
type commonHub struct {
25-
filter filters.EventFilter
26-
subscriptionMapper dispatcher.SubscriptionMapperHandler
27-
mutDispatchers sync.RWMutex
28-
dispatchers map[uuid.UUID]dispatcher.EventDispatcher
29-
register chan dispatcher.EventDispatcher
30-
unregister chan dispatcher.EventDispatcher
31-
broadcast chan data.BlockEvents
32-
broadcastRevert chan data.RevertBlock
33-
broadcastFinalized chan data.FinalizedBlock
34-
broadcastTxs chan data.BlockTxs
35-
broadcastBlockEventsWithOrder chan data.BlockEventsWithOrder
36-
broadcastScrs chan data.BlockScrs
37-
closeChan chan struct{}
38-
cancelFunc func()
25+
filter filters.EventFilter
26+
subscriptionMapper dispatcher.SubscriptionMapperHandler
27+
mutDispatchers sync.RWMutex
28+
dispatchers map[uuid.UUID]dispatcher.EventDispatcher
29+
register chan dispatcher.EventDispatcher
30+
unregister chan dispatcher.EventDispatcher
31+
32+
cancelFunc func()
33+
closeChan chan struct{}
34+
mutState sync.RWMutex
3935
}
4036

4137
// NewCommonHub creates a new commonHub instance
@@ -46,19 +42,13 @@ func NewCommonHub(args ArgsCommonHub) (*commonHub, error) {
4642
}
4743

4844
return &commonHub{
49-
mutDispatchers: sync.RWMutex{},
50-
filter: args.Filter,
51-
subscriptionMapper: args.SubscriptionMapper,
52-
dispatchers: make(map[uuid.UUID]dispatcher.EventDispatcher),
53-
register: make(chan dispatcher.EventDispatcher),
54-
unregister: make(chan dispatcher.EventDispatcher),
55-
broadcast: make(chan data.BlockEvents),
56-
broadcastRevert: make(chan data.RevertBlock),
57-
broadcastFinalized: make(chan data.FinalizedBlock),
58-
broadcastTxs: make(chan data.BlockTxs),
59-
broadcastBlockEventsWithOrder: make(chan data.BlockEventsWithOrder),
60-
broadcastScrs: make(chan data.BlockScrs),
61-
closeChan: make(chan struct{}),
45+
mutDispatchers: sync.RWMutex{},
46+
filter: args.Filter,
47+
subscriptionMapper: args.SubscriptionMapper,
48+
dispatchers: make(map[uuid.UUID]dispatcher.EventDispatcher),
49+
register: make(chan dispatcher.EventDispatcher),
50+
unregister: make(chan dispatcher.EventDispatcher),
51+
closeChan: make(chan struct{}),
6252
}, nil
6353
}
6454

@@ -73,42 +63,26 @@ func checkArgs(args ArgsCommonHub) error {
7363
return nil
7464
}
7565

76-
// Run is launched as a goroutine and listens for events on the exposed channels
77-
// TODO: use protection from triggering multiple times
78-
func (ch *commonHub) Run() error {
66+
// RegisterListener creates a goroutine and listens for WS register events
67+
func (ch *commonHub) RegisterListener() error {
68+
ch.mutState.Lock()
69+
defer ch.mutState.Unlock()
70+
71+
if ch.cancelFunc != nil {
72+
return common.ErrLoopAlreadyStarted
73+
}
74+
7975
var ctx context.Context
8076
ctx, ch.cancelFunc = context.WithCancel(context.Background())
8177

82-
go ch.run(ctx)
78+
go ch.registerListener(ctx)
8379

8480
return nil
8581
}
8682

87-
func (ch *commonHub) run(ctx context.Context) {
83+
func (ch *commonHub) registerListener(ctx context.Context) {
8884
for {
8985
select {
90-
case <-ctx.Done():
91-
log.Debug("commonHub is stopping...")
92-
return
93-
94-
case events := <-ch.broadcast:
95-
ch.handleBroadcast(events)
96-
97-
case revertEvent := <-ch.broadcastRevert:
98-
ch.handleRevertBroadcast(revertEvent)
99-
100-
case finalizedEvent := <-ch.broadcastFinalized:
101-
ch.handleFinalizedBroadcast(finalizedEvent)
102-
103-
case txsEvent := <-ch.broadcastTxs:
104-
ch.handleTxsBroadcast(txsEvent)
105-
106-
case txsEvent := <-ch.broadcastBlockEventsWithOrder:
107-
ch.handleBlockEventsWithOrderBroadcast(txsEvent)
108-
109-
case scrsEvent := <-ch.broadcastScrs:
110-
ch.handleScrsBroadcast(scrsEvent)
111-
11286
case dispatcherClient := <-ch.register:
11387
ch.registerDispatcher(dispatcherClient)
11488

@@ -123,59 +97,6 @@ func (ch *commonHub) Subscribe(event data.SubscribeEvent) {
12397
ch.subscriptionMapper.MatchSubscribeEvent(event)
12498
}
12599

126-
// Broadcast handles block events pushed by producers into the broadcast channel
127-
// Upon reading the channel, the hub notifies the registered dispatchers, if any
128-
func (ch *commonHub) Broadcast(events data.BlockEvents) {
129-
select {
130-
case ch.broadcast <- events:
131-
case <-ch.closeChan:
132-
}
133-
}
134-
135-
// BroadcastRevert handles revert event pushed by producers into the broadcast channel
136-
// Upon reading the channel, the hub notifies the registered dispatchers, if any
137-
func (ch *commonHub) BroadcastRevert(event data.RevertBlock) {
138-
select {
139-
case ch.broadcastRevert <- event:
140-
case <-ch.closeChan:
141-
}
142-
}
143-
144-
// BroadcastFinalized handles finalized event pushed by producers into the broadcast channel
145-
// Upon reading the channel, the hub notifies the registered dispatchers, if any
146-
func (ch *commonHub) BroadcastFinalized(event data.FinalizedBlock) {
147-
select {
148-
case ch.broadcastFinalized <- event:
149-
case <-ch.closeChan:
150-
}
151-
}
152-
153-
// BroadcastTxs handles block txs event pushed by producers into the broadcast channel
154-
// Upon reading the channel, the hub notifies the registered dispatchers, if any
155-
func (ch *commonHub) BroadcastTxs(event data.BlockTxs) {
156-
select {
157-
case ch.broadcastTxs <- event:
158-
case <-ch.closeChan:
159-
}
160-
}
161-
162-
// BroadcastScrs handles block scrs event pushed by producers into the broadcast channel
163-
// Upon reading the channel, the hub notifies the registered dispatchers, if any
164-
func (ch *commonHub) BroadcastScrs(event data.BlockScrs) {
165-
select {
166-
case ch.broadcastScrs <- event:
167-
case <-ch.closeChan:
168-
}
169-
}
170-
171-
// BroadcastBlockEventsWithOrder handles full block events pushed by producers into the channel
172-
func (ch *commonHub) BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder) {
173-
select {
174-
case ch.broadcastBlockEventsWithOrder <- event:
175-
case <-ch.closeChan:
176-
}
177-
}
178-
179100
// RegisterEvent will send event to a receive-only channel used to register dispatchers
180101
func (ch *commonHub) RegisterEvent(event dispatcher.EventDispatcher) {
181102
select {
@@ -192,7 +113,8 @@ func (ch *commonHub) UnregisterEvent(event dispatcher.EventDispatcher) {
192113
}
193114
}
194115

195-
func (ch *commonHub) handleBroadcast(blockEvents data.BlockEvents) {
116+
// Publish will publish logs and events to dispatcher
117+
func (ch *commonHub) Publish(blockEvents data.BlockEvents) {
196118
subscriptions := ch.subscriptionMapper.Subscriptions()
197119

198120
for _, subscription := range subscriptions {
@@ -220,7 +142,8 @@ func (ch *commonHub) handlePushBlockEvents(blockEvents data.BlockEvents, subscri
220142
ch.mutDispatchers.RUnlock()
221143
}
222144

223-
func (ch *commonHub) handleRevertBroadcast(revertBlock data.RevertBlock) {
145+
// PublishRevert will publish revert event to dispatcher
146+
func (ch *commonHub) PublishRevert(revertBlock data.RevertBlock) {
224147
subscriptions := ch.subscriptionMapper.Subscriptions()
225148

226149
dispatchersMap := make(map[uuid.UUID]data.RevertBlock)
@@ -242,7 +165,8 @@ func (ch *commonHub) handleRevertBroadcast(revertBlock data.RevertBlock) {
242165
}
243166
}
244167

245-
func (ch *commonHub) handleFinalizedBroadcast(finalizedBlock data.FinalizedBlock) {
168+
// PublishFinalized will publish finalized event to dispatcher
169+
func (ch *commonHub) PublishFinalized(finalizedBlock data.FinalizedBlock) {
246170
subscriptions := ch.subscriptionMapper.Subscriptions()
247171

248172
dispatchersMap := make(map[uuid.UUID]data.FinalizedBlock)
@@ -264,7 +188,8 @@ func (ch *commonHub) handleFinalizedBroadcast(finalizedBlock data.FinalizedBlock
264188
}
265189
}
266190

267-
func (ch *commonHub) handleTxsBroadcast(blockTxs data.BlockTxs) {
191+
// PublishTxs will publish txs event to dispatcher
192+
func (ch *commonHub) PublishTxs(blockTxs data.BlockTxs) {
268193
subscriptions := ch.subscriptionMapper.Subscriptions()
269194

270195
dispatchersMap := make(map[uuid.UUID]data.BlockTxs)
@@ -286,7 +211,8 @@ func (ch *commonHub) handleTxsBroadcast(blockTxs data.BlockTxs) {
286211
}
287212
}
288213

289-
func (ch *commonHub) handleBlockEventsWithOrderBroadcast(blockTxs data.BlockEventsWithOrder) {
214+
// PublishBlockEventsWithOrder will publish block events with order to dispatcher
215+
func (ch *commonHub) PublishBlockEventsWithOrder(blockTxs data.BlockEventsWithOrder) {
290216
subscriptions := ch.subscriptionMapper.Subscriptions()
291217

292218
dispatchersMap := make(map[uuid.UUID]data.BlockEventsWithOrder)
@@ -308,7 +234,7 @@ func (ch *commonHub) handleBlockEventsWithOrderBroadcast(blockTxs data.BlockEven
308234
}
309235
}
310236

311-
func (ch *commonHub) handleScrsBroadcast(blockScrs data.BlockScrs) {
237+
func (ch *commonHub) PublishScrs(blockScrs data.BlockScrs) {
312238
subscriptions := ch.subscriptionMapper.Subscriptions()
313239

314240
dispatchersMap := make(map[uuid.UUID]data.BlockScrs)

dispatcher/interface.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/google/uuid"
99
"github.com/multiversx/mx-chain-notifier-go/data"
10+
"github.com/multiversx/mx-chain-notifier-go/process"
1011
)
1112

1213
// EventDispatcher defines the behaviour of a event dispatcher component
@@ -20,20 +21,20 @@ type EventDispatcher interface {
2021
ScrsEvent(event data.BlockScrs)
2122
}
2223

23-
// Hub defines the behaviour of a hub component which should be able to register
24-
// and unregister dispatching events
24+
// Hub defines the behaviour of a component which should be able to receive events
25+
// and publish them to subscribers
2526
type Hub interface {
26-
Run() error
27-
Broadcast(events data.BlockEvents)
28-
BroadcastRevert(event data.RevertBlock)
29-
BroadcastFinalized(event data.FinalizedBlock)
30-
BroadcastTxs(event data.BlockTxs)
31-
BroadcastScrs(event data.BlockScrs)
32-
BroadcastBlockEventsWithOrder(event data.BlockEventsWithOrder)
27+
process.PublisherHandler
28+
Dispatcher
29+
}
30+
31+
// Dispatcher defines the behaviour of a dispatcher component which should be able to register
32+
// and unregister dispatching events
33+
type Dispatcher interface {
34+
RegisterListener() error
3335
RegisterEvent(event EventDispatcher)
3436
UnregisterEvent(event EventDispatcher)
3537
Subscribe(event data.SubscribeEvent)
36-
Close() error
3738
IsInterfaceNil() bool
3839
}
3940

dispatcher/ws/errors.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package ws
22

33
import "errors"
44

5-
// ErrNilHubHandler signals that a nil hub handler has been provided
6-
var ErrNilHubHandler = errors.New("nil hub handler")
5+
// ErrNilDispatcher signals that a nil dispatcher has been provided
6+
var ErrNilDispatcher = errors.New("nil dispatcher")
77

88
// ErrNilWSUpgrader signals that a nil websocket upgrader has been provided
99
var ErrNilWSUpgrader = errors.New("nil websocket upgrader")

dispatcher/ws/export_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ type ArgsWSDispatcher struct {
88
// NewTestWSDispatcher -
99
func NewTestWSDispatcher(args ArgsWSDispatcher) (*websocketDispatcher, error) {
1010
wsArgs := argsWebSocketDispatcher{
11-
Hub: args.Hub,
11+
Dispatcher: args.Dispatcher,
1212
Conn: args.Conn,
1313
Marshaller: args.Marshaller,
1414
}

dispatcher/ws/wsDispatcher.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ var (
3232

3333
// argsWebSocketDispatcher defines the arguments needed for ws dispatcher
3434
type argsWebSocketDispatcher struct {
35-
Hub dispatcher.Hub
35+
Dispatcher dispatcher.Dispatcher
3636
Conn dispatcher.WSConnection
3737
Marshaller marshal.Marshalizer
3838
}
@@ -42,14 +42,14 @@ type websocketDispatcher struct {
4242
wg sync.WaitGroup
4343
send chan []byte
4444
conn dispatcher.WSConnection
45-
hub dispatcher.Hub
45+
dispatcher dispatcher.Dispatcher
4646
marshaller marshal.Marshalizer
4747
}
4848

4949
// newWebSocketDispatcher createa a new ws dispatcher instance
5050
func newWebSocketDispatcher(args argsWebSocketDispatcher) (*websocketDispatcher, error) {
51-
if check.IfNil(args.Hub) {
52-
return nil, ErrNilHubHandler
51+
if check.IfNil(args.Dispatcher) {
52+
return nil, ErrNilDispatcher
5353
}
5454
if args.Conn == nil {
5555
return nil, ErrNilWSConn
@@ -62,7 +62,7 @@ func newWebSocketDispatcher(args argsWebSocketDispatcher) (*websocketDispatcher,
6262
id: uuid.New(),
6363
send: make(chan []byte, 256),
6464
conn: args.Conn,
65-
hub: args.Hub,
65+
dispatcher: args.Dispatcher,
6666
marshaller: args.Marshaller,
6767
}, nil
6868
}
@@ -254,7 +254,7 @@ func (wd *websocketDispatcher) writePump() {
254254
// readPump listens for incoming events and reads the content from the socket stream
255255
func (wd *websocketDispatcher) readPump() {
256256
defer func() {
257-
wd.hub.UnregisterEvent(wd)
257+
wd.dispatcher.UnregisterEvent(wd)
258258
if err := wd.conn.Close(); err != nil {
259259
log.Error("failed to close socket on defer", "err", err.Error())
260260
}
@@ -293,7 +293,7 @@ func (wd *websocketDispatcher) trySendSubscribeEvent(eventBytes []byte) {
293293
return
294294
}
295295
subscribeEvent.DispatcherID = wd.id
296-
wd.hub.Subscribe(subscribeEvent)
296+
wd.dispatcher.Subscribe(subscribeEvent)
297297
}
298298

299299
func (wd *websocketDispatcher) setSocketWriteLimits() error {

0 commit comments

Comments
 (0)