Skip to content

Commit

Permalink
fix: improve pubsub example (#1549)
Browse files Browse the repository at this point in the history
Instead of waiting an arbitrary amount of time for subscriptions to propagate, before sending messages, ensure that node1 has node2's subs and node2 has node3's subs.

Closes #1540
  • Loading branch information
achingbrain committed Jan 14, 2023
1 parent 2dac4be commit ba8527c
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 19 deletions.
23 changes: 19 additions & 4 deletions examples/pubsub/message-filtering/1.js
Expand Up @@ -68,7 +68,8 @@ const createNode = async () => {
node3.pubsub.subscribe(topic)

// wait for subscriptions to propagate
await delay(1000)
await hasSubscription(node1, node2, topic)
await hasSubscription(node2, node3, topic)

const validateFruit = (msgTopic, msg) => {
const fruit = uint8ArrayToString(msg.data)
Expand All @@ -91,13 +92,27 @@ const createNode = async () => {
await node1.pubsub.publish(topic, uint8ArrayFromString(fruit))
}

// wait a few seconds for messages to be received
await delay(5000)
console.log('############## all messages sent ##############')
})()

async function delay (ms) {
await new Promise((resolve) => {
setTimeout(() => resolve(), ms)
})
}
}

/**
* Wait for node1 to see that node2 has subscribed to the topic
*/
async function hasSubscription (node1, node2, topic) {
while (true) {
const subs = await node1.pubsub.getSubscribers(topic)

if (subs.map(peer => peer.toString()).includes(node2.peerId.toString())) {
return
}

// wait for subscriptions to propagate
await delay(100)
}
}
37 changes: 22 additions & 15 deletions examples/pubsub/message-filtering/test.js
Expand Up @@ -19,29 +19,36 @@ export async function test () {
all: true
})

let output = ''

const expected = [
'node2 received: banana',
'node2 received: apple',
'node2 received: orange',
'node3 received: banana',
'node3 received: apple',
'node3 received: orange'
]

proc.all.on('data', async (data) => {
process.stdout.write(data)
const line = uint8ArrayToString(data)
output += uint8ArrayToString(data)

// End
if (line.includes('all messages sent')) {
if (messages.car > 0) {
defer.reject(new Error('Message validation failed - peers failed to filter car messages'))
}
if (output.includes('received: car')) {
defer.reject(new Error('Message validation failed - peers failed to filter car messages'))
}

for (const fruit of ['banana', 'apple', 'orange']) {
if (messages[fruit] !== 2) {
defer.reject(new Error(`Not enough ${fruit} messages - received ${messages[fruit] ?? 0}, expected 2`))
}
let allMessagesReceived = true

expected.forEach(message => {
if (!output.includes(message)) {
allMessagesReceived = false
}
})

if (allMessagesReceived) {
defer.resolve()
}

if (line.includes('received:')) {
const fruit = line.split('received:')[1].trim()
messages[fruit] = (messages[fruit] ?? 0) + 1
}
})

await defer.promise
Expand Down

0 comments on commit ba8527c

Please sign in to comment.