Skip to content

Commit

Permalink
Backport msg size limit 10.0.5 (#1023)
Browse files Browse the repository at this point in the history
* Mention Azure Storage Explorer

* Add Azurite gitignore to src

* Make sure messages close to the size limit can be moved as is to the error queue
  • Loading branch information
soujay committed Jun 5, 2023
1 parent 44f833d commit aa5e33b
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 41 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ The Azure Storage Queues transport for NServiceBus enables the use of the Azure

Follow these steps to run the acceptance tests locally:
* Add a new environment variable `Transport.UseSpecific` with the value `AzureStorageQueueTransport`
* Add a new environment variable `AzureStorageQueueTransport.ConnectionString` containing a connection string to your Azure storage account or use `UseDevelopmentStorage=true` to use the [Azure Storage Emulator](https://azure.microsoft.com/en-us/documentation/articles/storage-use-emulator/) (make sure to start it before you run the tests).
* Add a new environment variable `AzureStorageQueueTransport.ConnectionString` containing a connection string to your Azure storage account or use `UseDevelopmentStorage=true` to use the [Azurite emulator](https://docs.microsoft.com/en-us/azure/storage/common/storage-use-azurite) (make sure to start it before you run the tests).
* Additionally, [Microsoft Azure Storage Explorer](https://azure.microsoft.com/en-us/products/storage/storage-explorer) is an useful free tool that can allow you to view and manage the contents of the Azurite emulator as well as Azure Storage accounts in the cloud.
13 changes: 13 additions & 0 deletions src/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Azurite queue
__queuestorage__
__azurite_db_queue__.json
__azurite_db_queue_extent__.json

# Azurite blob
__blobstorage__
__azurite_db_blob__.json
__azurite_db_blob_extent__.json

# Azurite table
__azurite_db_table__.json
__azurite_db_table_extent__.json
179 changes: 179 additions & 0 deletions src/AcceptanceTests/Receiving/When_receiving_large_message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
namespace NServiceBus.Transport.AzureStorageQueues.AcceptanceTests
{
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using Azure.Transports.WindowsAzureStorageQueues;
using global::Azure.Storage.Queues;
using global::Newtonsoft.Json;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Faults;
using NUnit.Framework;

public class When_receiving_large_message : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_consume_it_without_the_error_headers_when_message_size_very_close_to_limit()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b =>
{
b.When((bus, c) =>
{
var connectionString = Testing.Utilities.GetEnvConfiguredConnectionString();
var queueClient = new QueueClient(connectionString, "receivinglargemessage-receiver");
//This value is fine tuned to ensure adding the 2 error headers make the message too large
string contentCloseToLimits = new string('x', (35 * 1024) + 425);
var message = new MyMessage { SomeProperty = contentCloseToLimits, };
var messageSerialized = JsonConvert.SerializeObject(message, typeof(MyMessage), Formatting.Indented, new JsonSerializerSettings());
string id = Guid.NewGuid().ToString();
var wrapper = new MessageWrapper
{
Id = id,
Body = Encoding.UTF8.GetBytes(messageSerialized),
Headers = new Dictionary<string, string>
{
{ Headers.EnclosedMessageTypes, $"{typeof(MyMessage).AssemblyQualifiedName}" },
{ Headers.MessageId, id },
{ Headers.CorrelationId, id },
{TestIndependence.HeaderName, c.TestRunId.ToString()}
}
};
var wrapperSerialized = JsonConvert.SerializeObject(wrapper, typeof(MessageWrapper), Formatting.Indented, new JsonSerializerSettings());
var base64Encoded = Convert.ToBase64String(Encoding.UTF8.GetBytes(wrapperSerialized));
return queueClient.SendMessageAsync(base64Encoded);
}).DoNotFailOnErrorMessages();
})
.WithEndpoint<ErrorSpy>()
.Done(c => c.MessageMovedToTheErrorQueue)
.Run();

Assert.IsFalse(ctx.IsFailedQHeaderPresent, "IsFailedQHeaderPresent");
Assert.IsFalse(ctx.IsExceptionTypeHeaderPresent, "IsExceptionTypeHeaderPresent");
}

[Test]
public async Task Should_consume_it_with_only_two_error_headers_when_message_size_close_to_limit()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b =>
{
b.When((bus, c) =>
{
var connectionString = Testing.Utilities.GetEnvConfiguredConnectionString();
var queueClient = new QueueClient(connectionString, "receivinglargemessage-receiver");
string contentCloseToLimits = new string('x', (35 * 1024) + 400);
var message = new MyMessage { SomeProperty = contentCloseToLimits, };
var messageSerialized = JsonConvert.SerializeObject(message, typeof(MyMessage), Formatting.Indented, new JsonSerializerSettings());
string id = Guid.NewGuid().ToString();
var wrapper = new MessageWrapper
{
Id = id,
Body = Encoding.UTF8.GetBytes(messageSerialized),
Headers = new Dictionary<string, string>
{
{ Headers.EnclosedMessageTypes, $"{typeof(MyMessage).AssemblyQualifiedName}" },
{ Headers.MessageId, id },
{ Headers.CorrelationId, id },
{TestIndependence.HeaderName, c.TestRunId.ToString()}
}
};
var wrapperSerialized = JsonConvert.SerializeObject(wrapper, typeof(MessageWrapper), Formatting.Indented, new JsonSerializerSettings());
var base64Encoded = Convert.ToBase64String(Encoding.UTF8.GetBytes(wrapperSerialized));
return queueClient.SendMessageAsync(base64Encoded);
}).DoNotFailOnErrorMessages();
})
.WithEndpoint<ErrorSpy>()
.Done(c => c.MessageMovedToTheErrorQueue)
.Run();

Assert.IsTrue(ctx.IsFailedQHeaderPresent, "IsFailedQHeaderPresent");
Assert.IsTrue(ctx.IsExceptionTypeHeaderPresent, "IsExceptionTypeHeaderPresent");
}

class Context : ScenarioContext
{
public bool MessageMovedToTheErrorQueue { get; set; }
public bool IsFailedQHeaderPresent { get; set; }
public bool IsExceptionTypeHeaderPresent { get; set; }

}

class Receiver : EndpointConfigurationBuilder
{
public Receiver() => EndpointSetup<DefaultServer>(c =>
{
c.UseSerialization<NewtonsoftSerializer>();
c.SendFailedMessagesTo(Conventions.EndpointNamingConvention(typeof(ErrorSpy)));
});

public class MyHandler : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
throw new InvalidOperationException();
}
}
}

class ErrorSpy : EndpointConfigurationBuilder
{
public ErrorSpy() => EndpointSetup<DefaultServer>(config =>
{
config.UseSerialization<NewtonsoftSerializer>();
config.LimitMessageProcessingConcurrencyTo(1);
});

class MyMessageHandler : IHandleMessages<MyMessage>
{
public MyMessageHandler(Context testContext) => this.testContext = testContext;

public Task Handle(MyMessage message, IMessageHandlerContext context)
{
if (context.MessageHeaders.TryGetValue(TestIndependence.HeaderName, out var testRunId)
&& testRunId == testContext.TestRunId.ToString())
{
testContext.MessageMovedToTheErrorQueue = true;
}
if (context.MessageHeaders.ContainsKey(FaultsHeaderKeys.FailedQ)
&& testRunId == testContext.TestRunId.ToString())
{
testContext.IsFailedQHeaderPresent = true;
}
if (context.MessageHeaders.ContainsKey("NServiceBus.ExceptionInfo.ExceptionType")
&& testRunId == testContext.TestRunId.ToString())
{
testContext.IsExceptionTypeHeaderPresent = true;
}

return Task.CompletedTask;
}

readonly Context testContext;
}
}

public class MyMessage : IMessage
{
public string SomeProperty { get; set; }
}
}
}
18 changes: 15 additions & 3 deletions src/Transport/AtLeastOnceReceiveStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ namespace NServiceBus.Transport.AzureStorageQueues
using System.Threading.Tasks;
using Azure.Transports.WindowsAzureStorageQueues;
using Extensibility;
using global::Azure;
using Logging;
using Transport;

/// <summary>
/// This corresponds to the RecieveOnly transport transaction mode
/// </summary>
class AtLeastOnceReceiveStrategy : ReceiveStrategy
{
public AtLeastOnceReceiveStrategy(Func<MessageContext, Task> pipeline, Func<ErrorContext, Task<ErrorHandleResult>> errorPipe, CriticalError criticalError)
Expand All @@ -18,10 +22,10 @@ public AtLeastOnceReceiveStrategy(Func<MessageContext, Task> pipeline, Func<Erro
this.criticalError = criticalError;
}

public override async Task Receive(MessageRetrieved retrieved, MessageWrapper message)
public override async Task Receive(MessageRetrieved retrieved, MessageWrapper message, CancellationToken cancellationToken = default)
{
Logger.DebugFormat("Pushing received message (ID: '{0}') through pipeline.", message.Id);
var body = message.Body ?? new byte[0];
var body = message.Body ?? Array.Empty<byte>();
try
{
using (var tokenSource = new CancellationTokenSource())
Expand Down Expand Up @@ -56,6 +60,14 @@ public override async Task Receive(MessageRetrieved retrieved, MessageWrapper me
{
immediateRetry = await errorPipe(context).ConfigureAwait(false);
}
catch (RequestFailedException e) when (e.Status == 413 && e.ErrorCode == "RequestBodyTooLarge")
{
Logger.WarnFormat($"Message with native ID `{message.Id}` could not be moved to the error queue with additional headers because it was too large. Moving to the error queue as is.", e);

await retrieved.MoveToErrorQueueWithMinimalFaultHeaders(context, cancellationToken).ConfigureAwait(false);

return;
}
catch (Exception e)
{
criticalError.Raise($"Failed to execute recoverability policy for message with native ID: `{message.Id}`", e);
Expand All @@ -64,7 +76,6 @@ public override async Task Receive(MessageRetrieved retrieved, MessageWrapper me

return;
}

if (immediateRetry == ErrorHandleResult.RetryRequired)
{
// For an immediate retry, the error is logged and the message is returned to the queue to preserve the DequeueCount.
Expand All @@ -77,6 +88,7 @@ public override async Task Receive(MessageRetrieved retrieved, MessageWrapper me
// Just acknowledge the message as it's handled by the core retry.
await retrieved.Ack().ConfigureAwait(false);
}

}
}

Expand Down
19 changes: 14 additions & 5 deletions src/Transport/AtMostOnceReceiveStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace NServiceBus.Transport.AzureStorageQueues
using System.Threading.Tasks;
using Azure.Transports.WindowsAzureStorageQueues;
using Extensibility;
using global::Azure;
using Logging;
using Transport;

Expand All @@ -21,11 +22,11 @@ public AtMostOnceReceiveStrategy(Func<MessageContext, Task> pipeline, Func<Error
this.errorPipe = errorPipe;
}

public override async Task Receive(MessageRetrieved retrieved, MessageWrapper message)
public override async Task Receive(MessageRetrieved retrieved, MessageWrapper message, CancellationToken cancellationToken = default)
{
Logger.DebugFormat("Pushing received message (ID: '{0}') through pipeline.", message.Id);
await retrieved.Ack().ConfigureAwait(false);
var body = message.Body ?? new byte[0];
var body = message.Body ?? Array.Empty<byte>();

try
{
Expand All @@ -37,10 +38,18 @@ public override async Task Receive(MessageRetrieved retrieved, MessageWrapper me
Logger.Warn("Azure Storage Queue transport failed pushing a message through pipeline", ex);

var context = CreateErrorContext(retrieved, message, ex, body);
try
{
// The exception is pushed through the error pipeline in a fire and forget manner.
// There's no call to onCriticalError if errorPipe fails. Exceptions are handled on the transport level.
await errorPipe(context).ConfigureAwait(false);
}
catch (RequestFailedException e) when (e.Status == 413 && e.ErrorCode == "RequestBodyTooLarge")
{
Logger.WarnFormat($"Message with native ID `{message.Id}` could not be moved to the error queue with additional headers because it was too large. Moving to the error queue as is.", e);

// The exception is pushed through the error pipeline in a fire and forget manner.
// There's no call to onCriticalError if errorPipe fails. Exceptions are handled on the transport level.
await errorPipe(context).ConfigureAwait(false);
await retrieved.MoveToErrorQueueWithMinimalFaultHeaders(context, cancellationToken).ConfigureAwait(false);
}
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/Transport/AzureMessageQueueReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ namespace NServiceBus.Transport.AzureStorageQueues

class AzureMessageQueueReceiver
{
public AzureMessageQueueReceiver(IMessageEnvelopeUnwrapper unwrapper, IProvideQueueServiceClient queueServiceClientProvider, QueueAddressGenerator addressGenerator)
public AzureMessageQueueReceiver(IMessageEnvelopeUnwrapper unwrapper, IProvideQueueServiceClient queueServiceClientProvider, QueueAddressGenerator addressGenerator, MessageWrapperSerializer serializer)
{
this.unwrapper = unwrapper;
queueServiceClient = queueServiceClientProvider.Client;
this.addressGenerator = addressGenerator;
this.serializer = serializer;
}

/// <summary>
Expand Down Expand Up @@ -55,7 +56,7 @@ internal async Task Receive(int batchSize, List<MessageRetrieved> receivedMessag
// ReSharper disable once LoopCanBeConvertedToQuery
foreach (var rawMessage in rawMessages)
{
receivedMessages.Add(new MessageRetrieved(unwrapper, rawMessage, inputQueue, errorQueue));
receivedMessages.Add(new MessageRetrieved(unwrapper, serializer, rawMessage, inputQueue, errorQueue));
}

await backoffStrategy.OnBatch(receivedMessages.Count, token).ConfigureAwait(false);
Expand All @@ -64,7 +65,7 @@ internal async Task Receive(int batchSize, List<MessageRetrieved> receivedMessag
IMessageEnvelopeUnwrapper unwrapper;

QueueAddressGenerator addressGenerator;

MessageWrapperSerializer serializer;
QueueClient inputQueue;
QueueClient errorQueue;
QueueServiceClient queueServiceClient;
Expand Down
4 changes: 2 additions & 2 deletions src/Transport/AzureStorageQueueInfrastructure.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
namespace NServiceBus.Transport.AzureStorageQueues
{
using System;
using System.Globalization;
using System.Collections.Generic;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -230,7 +230,7 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure()
{
var unwrapper = settings.HasSetting<IMessageEnvelopeUnwrapper>() ? settings.GetOrDefault<IMessageEnvelopeUnwrapper>() : new DefaultMessageEnvelopeUnwrapper(serializer);
var receiver = new AzureMessageQueueReceiver(unwrapper, queueServiceClientProvider, addressGenerator)
var receiver = new AzureMessageQueueReceiver(unwrapper, queueServiceClientProvider, addressGenerator, serializer)
{
MessageInvisibleTime = messageInvisibleTime,
};
Expand Down
12 changes: 2 additions & 10 deletions src/Transport/Dispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -14,6 +13,7 @@
using global::Azure.Storage.Queues;
using Logging;
using NServiceBus.AzureStorageQueues;
using NServiceBus.Transport.AzureStorageQueues.Utils;
using Performance.TimeToBeReceived;
using Transport;
using Unicast.Queuing;
Expand Down Expand Up @@ -180,15 +180,7 @@ static TDeliveryConstraint FirstOrDefault<TDeliveryConstraint>(List<DeliveryCons

Task Send(MessageWrapper wrapper, QueueClient sendQueue, TimeSpan timeToBeReceived)
{
string base64String;

using (var stream = new MemoryStream())
{
serializer.Serialize(wrapper, stream);

var bytes = stream.ToArray();
base64String = Convert.ToBase64String(bytes);
}
string base64String = MessageWrapperHelper.ConvertToBase64String(wrapper, serializer);

return sendQueue.SendMessageAsync(base64String, timeToLive: timeToBeReceived);
}
Expand Down

0 comments on commit aa5e33b

Please sign in to comment.