Skip to content

Commit

Permalink
Add more tests for super stream (#302)
Browse files Browse the repository at this point in the history
* Add more tests for super stream
* bump version to 1.4.2

---------
Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio committed May 3, 2024
1 parent 47adcdf commit 247a4f5
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 2 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,8 @@ You can read also the java stream-client blog post: https://rabbitmq.github.io/r
- The code is written in Java but the same concepts are valid for the Go client.
- The Go client has the same features as the Java client.
Super Stream supports [publish-filtering](#publish-filtering) and [consume-filtering](#consume-filtering) features.
### Performance test tool
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.4.1
1.4.2
2 changes: 1 addition & 1 deletion pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
const initBufferPublishSize = 2 + 2 + 1 + 4

const (
ClientVersion = "1.4.1"
ClientVersion = "1.4.2"

commandDeclarePublisher = 1
commandPublish = 2
Expand Down
90 changes: 90 additions & 0 deletions pkg/stream/super_stream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,4 +419,94 @@ var _ = Describe("Super Stream Producer", Label("super-stream-consumer"), func()
Expect(env.Close()).NotTo(HaveOccurred())
})

It("Super Stream Filtering should consume only Italy Post Filter", func() {

// test the filtering with super stream
// the filter is covered in the filter_test.go
// here is just to be sure the filter is applied in the super stream
env, err := NewEnvironment(nil)
Expect(err).NotTo(HaveOccurred())

superStream := "filtering-super-stream-should-consume-only-one-country"
Expect(env.DeclareSuperStream(superStream,
NewPartitionsOptions(2))).NotTo(HaveOccurred())

superProducer, err := env.NewSuperStreamProducer(superStream, NewSuperStreamProducerOptions(
NewHashRoutingStrategy(func(message message.StreamMessage) string {
return message.GetMessageProperties().GroupID
})).SetFilter(NewProducerFilter(func(m message.StreamMessage) string {
return fmt.Sprintf("%s", m.GetApplicationProperties()["county"])
})))
Expect(err).NotTo(HaveOccurred())

// In this test we mix the chunks with different types
// The scope is to test the post filter function
// in total we have:
// 5 messages ITALY + 5 messages SPAIN in the same chunk
// WAIT
// then 10 messages SPAIN in another chunk
// We should receive only the ITALY messages from the first chunk
// total 10 messages
for i := 0; i < 5; i++ {
msgItaly := amqp.NewMessage(make([]byte, 0))
msgItaly.ApplicationProperties = map[string]interface{}{"county": "italy"}
msgItaly.Properties = &amqp.MessageProperties{
GroupID: "group_first",
}
Expect(superProducer.Send(msgItaly)).NotTo(HaveOccurred())

msgSpain := amqp.NewMessage(make([]byte, 0))
msgSpain.ApplicationProperties = map[string]interface{}{"county": "spain"}
msgSpain.Properties = &amqp.MessageProperties{
GroupID: "group_first",
}
Expect(superProducer.Send(msgSpain)).NotTo(HaveOccurred())

}

// the sleep is to be sure the messages are stored in a chunk
// so the filter will be applied, so the first chunk will contain only Italy
time.Sleep(1 * time.Second)

for i := 0; i < 10; i++ {
msg := amqp.NewMessage(make([]byte, 0))
msg.ApplicationProperties = map[string]interface{}{"county": "spain"}
msg.Properties = &amqp.MessageProperties{
GroupID: "group_first",
}
Expect(superProducer.Send(msg)).NotTo(HaveOccurred())
}

time.Sleep(500 * time.Millisecond)

// we don't need to apply any post filter here
// the server side filter is enough
var consumerItaly int32
filter := NewConsumerFilter([]string{"italy"}, false,
func(message *amqp.Message) bool {
return message.ApplicationProperties["county"] == "italy"
})

handleMessages := func(consumerContext ConsumerContext, message *amqp.Message) {
atomic.AddInt32(&consumerItaly, 1)
}

superStreamConsumer, err := env.NewSuperStreamConsumer(superStream, handleMessages,
NewSuperStreamConsumerOptions().SetFilter(filter).SetOffset(OffsetSpecification{}.First()))
Expect(err).NotTo(HaveOccurred())

time.Sleep(500 * time.Millisecond)
// we should receive only the Italy messages from the first chunk
// The first chunk is filter with the post filter function
// the second chuck won't send ( even in this test there is no evidence about that there is the test above about that)

Eventually(func() int32 { return atomic.LoadInt32(&consumerItaly) }).
WithPolling(300 * time.Millisecond).WithTimeout(5 * time.Second).Should(Equal(int32(5)))

Expect(superProducer.Close()).NotTo(HaveOccurred())
Expect(superStreamConsumer.Close()).NotTo(HaveOccurred())
Expect(env.DeleteSuperStream(superStream)).NotTo(HaveOccurred())
Expect(env.Close()).NotTo(HaveOccurred())
})

})

0 comments on commit 247a4f5

Please sign in to comment.