Skip to content

Message Storage

Robin Rodricks edited this page Jan 18, 2023 · 8 revisions

Providers

Getting Started

Messaging is intended for message passing between one or more systems in disconnected fashion. You can send a message somewhere and current or remote system picks it up for processing later when required. This paradigm somehow fits into CQRS and Message Passing architectural ideas.

To name a few examples, Apache Kafka, RabbitMQ, Azure Service Bus are all falling into this category - essentially they are designed to pass messages. Some systems are more advanced to others of course, but most often it doesn't really matter.

FluentStorage supports many messaging providers out of the box, including Azure Service Bus Topics and Queues, Azure Event Hub and others.

There are two abstractions available - message publisher and message receiver. As the name stands, one is publishing messages, and another is receiving them on another end.

Publishing Messages

To publish messages you will usually construct an instance of IMessagePublisher with an appropriate implementation. All the available implementations can be created using factory methods in the FluentStorage.StorageFactory.Messages class. More methods appear in that class as you reference an assembly containing specific implementations.

Receiving Messages

Similarly, to receive messages you can use factory methods to create receivers which all implement IMessageReceiver interface.

The primary method of this interface

Task StartMessagePumpAsync(
	Func<IEnumerable<QueueMessage>, Task> onMessageAsync,
	int maxBatchSize = 1,
	CancellationToken cancellationToken = default);

starts a message pump that listens for incoming queue messages and calls Func<IEnumerable<QueueMessage>, Task> as a call back to pass those messages to your code.

maxBatchSize is a number specifying how many messages you are ready to handle at once in your callback. Choose this number carefully as specifying number too low will result in slower message processing, whereas number too large will increase RAM requirements for your software.

cancellationToken is used to signal the message pump to stop. Not passing any parameter there will result in never stopping message pump. See example below in Use Cases for a pattern on how to use this parameter.

You can find the list of supported messaging implementations here.

Handling Large Messages

FluentStorage provides built-in capability to handle large message content by allowing you to offload message content over a certain threshold to an external blob storage. It works in the following way:

  1. Check that message content is larger than threshold value.
  2. If not, do the usual processing.
  3. If it is, upload message content as a blob to external storage, clear message content and add a custom header x-sn-large that points to the blob containing message content.

When receiving messages, it will check that x-sn-large header is present, and if so, will download blob, set it's content as message content, and return the message to the receiver.

Blob is deleted from the blob storage only when message is confirmed by the receiver.

Large message handling works on any supported queue implementation because it's implemented in the core library itself, outside of specific queue implementation. To enable it, call .HandleLargeContent on both publisher and receiver:

IBlobStorage offloadStorage = ...; // your blob storage for offloading content

IMessagePublisher publisher = StorageFactory.Messages
  .XXXPublisher(...)
  .HandleLargeContent(offloadStorage, thresholdValue);

IMessageReceiver receiver = StorageFactory.Messages
  .XXXReceiver(...)
  .HandleLargeContent(offloadStorage);

Serialising/deserialising QueueMessage

QueueMessage class itself is not a serialisable entity when we talk about JSON or built-in .NET binary serialisation due to the fact it is a functionally rich structure. However, you might want to transfer the whole QueueMessage across the wire sometimes. For these purposes you can use built-in binary methods:

var qm = new QueueMessage("id", "content");
qm.DequeueCount = 4;
qm.Properties.Add("key", "value");

byte[] wireData = qm.ToByteArray();

//transfer the bytes

QueueMessage receivedMessage = QueueMessage.FromByteArray(wireData);

These methods make sure that all of the message data is preserved, and also are backward compatible between any changes to this class.