Skip to content

Commit

Permalink
Make sure transport operations do not share mutable state between mul…
Browse files Browse the repository at this point in the history
…tiple routing strategies (#6905) (#6909)

* Add routing to dispatch connector tests to reproduce the shared state problem

* Fix the state sharing problem in ToTransportOperation

* Cross check headers and properties

* Test for the header modifications in the routing strategy

* add acceptance test

* better naming and comments

* block scoped namespace

* Remove left overs

* Move test

* Primary ctor

* Behavior

---------

Co-authored-by: Tim Bussmann <timbussmann@users.noreply.github.com>
  • Loading branch information
danielmarbach and timbussmann committed Nov 3, 2023
1 parent dddd89a commit 2a5e715
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Extensibility;
Expand Down Expand Up @@ -29,7 +30,7 @@ public Task Store(OutboxMessage message, IOutboxTransaction transaction, Context
var tx = (AcceptanceTestingOutboxTransaction)transaction;
tx.Enlist(() =>
{
if (!storage.TryAdd(message.MessageId, new StoredMessage(message.MessageId, message.TransportOperations)))
if (!storage.TryAdd(message.MessageId, new StoredMessage(message.MessageId, message.TransportOperations.Select(o => o.DeepCopy()).ToArray())))
{
throw new Exception($"Outbox message with id '{message.MessageId}' is already present in storage.");
}
Expand Down
168 changes: 168 additions & 0 deletions src/NServiceBus.AcceptanceTests/Outbox/When_publishing_with_outbox.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
namespace NServiceBus.AcceptanceTests.Outbox
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using Features;
using NServiceBus.Pipeline;
using NUnit.Framework;

public class When_publishing_with_outbox : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_be_delivered_to_all_subscribers()
{
Requires.OutboxPersistence();

Context context = await Scenario.Define<Context>()
.WithEndpoint<Publisher>(b =>
b.When(c => c.Subscriber1Subscribed && c.Subscriber2Subscribed, (session, c) =>
{
// Send a trigger message that will invoke the handler method that publishes the event
c.AddTrace("Both subscribers are subscribed, going to send TriggerMessage");
return session.SendLocal(new TriggerMessage());
})
)
.WithEndpoint<Subscriber1>(b => b.When(async (session, ctx) =>
{
await session.Subscribe<MyEvent>();
if (ctx.HasNativePubSubSupport)
{
ctx.Subscriber1Subscribed = true;
ctx.AddTrace("Subscriber1 is now subscribed (at least we have asked the broker to be subscribed)");
}
else
{
ctx.AddTrace("Subscriber1 has now asked to be subscribed to MyEvent");
}
}))
.WithEndpoint<Subscriber2>(b => b.When(async (session, ctx) =>
{
await session.Subscribe<MyEvent>();
if (ctx.HasNativePubSubSupport)
{
ctx.Subscriber2Subscribed = true;
ctx.AddTrace("Subscriber2 is now subscribed (at least we have asked the broker to be subscribed)");
}
else
{
ctx.AddTrace("Subscriber2 has now asked to be subscribed to MyEvent");
}
}))
.Done(c => c.Subscriber1GotTheEvent && c.Subscriber2GotTheEvent)
.Run(TimeSpan.FromSeconds(10));

Assert.True(context.Subscriber1GotTheEvent);
Assert.True(context.Subscriber2GotTheEvent);
}

public class Context : ScenarioContext
{
public bool Subscriber1GotTheEvent { get; set; }
public bool Subscriber2GotTheEvent { get; set; }
public bool Subscriber1Subscribed { get; set; }
public bool Subscriber2Subscribed { get; set; }
}

public class Publisher : EndpointConfigurationBuilder
{
public Publisher() =>
EndpointSetup<DefaultPublisher>(b =>
{
b.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
b.EnableOutbox();
// Test the outbox behavior in situations where messages are deserialized and dispatched from the outbox storage by injecting an exception into the dispatch pipeline
b.Pipeline.Register(new BlowUpAfterDispatchBehavior(), "ensure outbox dispatch fails");
b.Recoverability().Immediate(i => i.NumberOfRetries(1));
b.OnEndpointSubscribed<Context>((s, context) =>
{
var subscriber1 = Conventions.EndpointNamingConvention(typeof(Subscriber1));
if (s.SubscriberEndpoint.Contains(subscriber1))
{
context.Subscriber1Subscribed = true;
context.AddTrace($"{subscriber1} is now subscribed");
}
var subscriber2 = Conventions.EndpointNamingConvention(typeof(Subscriber2));
if (s.SubscriberEndpoint.Contains(subscriber2))
{
context.AddTrace($"{subscriber2} is now subscribed");
context.Subscriber2Subscribed = true;
}
});
b.DisableFeature<AutoSubscribe>();
});

public class TriggerHandler : IHandleMessages<TriggerMessage>
{
public Task Handle(TriggerMessage message, IMessageHandlerContext context)
=> context.Publish(new MyEvent());
}

class BlowUpAfterDispatchBehavior : IBehavior<IBatchDispatchContext, IBatchDispatchContext>
{
public async Task Invoke(IBatchDispatchContext context, Func<IBatchDispatchContext, Task> next)
{
if (Interlocked.Increment(ref invocationCounter) == 1)
{
throw new SimulatedException();
}

await next(context).ConfigureAwait(false);
}

int invocationCounter;
}
}

public class Subscriber1 : EndpointConfigurationBuilder
{
public Subscriber1() =>
EndpointSetup<DefaultServer>(c => c.DisableFeature<AutoSubscribe>(),
metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));

public class MyHandler : IHandleMessages<MyEvent>
{
public MyHandler(Context testContext) => this.testContext = testContext;

public Task Handle(MyEvent message, IMessageHandlerContext context)
{
testContext.Subscriber1GotTheEvent = true;
return Task.CompletedTask;
}

readonly Context testContext;
}
}

public class Subscriber2 : EndpointConfigurationBuilder
{
public Subscriber2() =>
EndpointSetup<DefaultServer>(c => c.DisableFeature<AutoSubscribe>(),
metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));

public class MyHandler : IHandleMessages<MyEvent>
{
public MyHandler(Context testContext) => this.testContext = testContext;

public Task Handle(MyEvent messageThatIsEnlisted, IMessageHandlerContext context)
{
testContext.Subscriber2GotTheEvent = true;
return Task.CompletedTask;
}

readonly Context testContext;
}
}

public class MyEvent : IEvent
{
}

public class TriggerMessage : ICommand
{
}
}
}
119 changes: 117 additions & 2 deletions src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,107 @@
[TestFixture]
public class RoutingToDispatchConnectorTests
{
[Test]
public async Task Should_preserve_message_state_for_one_routing_strategy_for_allocation_reasons()
{
var behavior = new RoutingToDispatchConnector();
IEnumerable<TransportOperation> operations = null;
var testableRoutingContext = new TestableRoutingContext
{
RoutingStrategies = new List<RoutingStrategy>
{
new DestinationRoutingStrategy("destination1", "HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")
}
};
var originalDispatchProperties = new DispatchProperties
{
{ "SomeKey", "SomeValue" }
};
testableRoutingContext.Extensions.Set(originalDispatchProperties);
var originalHeaders = new Dictionary<string, string> { { "SomeHeaderKey", "SomeHeaderValue" } };
testableRoutingContext.Message = new OutgoingMessage("ID", originalHeaders, Array.Empty<byte>());
await behavior.Invoke(testableRoutingContext, context =>
{
operations = context.Operations;
return Task.CompletedTask;
});

Assert.That(operations, Has.Length.EqualTo(1));

TransportOperation destination1Operation = operations.ElementAt(0);
Assert.That(destination1Operation.Message.MessageId, Is.EqualTo("ID"));
Assert.That((destination1Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination1"));
Dictionary<string, string> destination1Headers = destination1Operation.Message.Headers;
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
Assert.That(destination1Headers, Is.SameAs(originalHeaders));
DispatchProperties destination1DispatchProperties = destination1Operation.Properties;
Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair<string, string>("SomeKey", "SomeValue")));
Assert.That(destination1DispatchProperties, Is.SameAs(originalDispatchProperties));
}

[Test]
public async Task Should_copy_message_state_for_multiple_routing_strategies()
{
var behavior = new RoutingToDispatchConnector();
IEnumerable<TransportOperation> operations = null;
var testableRoutingContext = new TestableRoutingContext
{
RoutingStrategies = new List<RoutingStrategy>
{
new DestinationRoutingStrategy("destination1", "HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1"),
new DestinationRoutingStrategy("destination2", "HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")
}
};
var originalDispatchProperties = new DispatchProperties
{
{ "SomeKey", "SomeValue" }
};
testableRoutingContext.Extensions.Set(originalDispatchProperties);
var originalHeaders = new Dictionary<string, string> { { "SomeHeaderKey", "SomeHeaderValue" } };
testableRoutingContext.Message = new OutgoingMessage("ID", originalHeaders, Array.Empty<byte>());
await behavior.Invoke(testableRoutingContext, context =>
{
operations = context.Operations;
return Task.CompletedTask;
});

Assert.That(operations, Has.Length.EqualTo(2));

TransportOperation destination1Operation = operations.ElementAt(0);
Assert.That(destination1Operation.Message.MessageId, Is.EqualTo("ID"));
Assert.That((destination1Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination1"));
Dictionary<string, string> destination1Headers = destination1Operation.Message.Headers;
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
Assert.That(destination1Headers, Does.Not.Contain(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")));
Assert.That(destination1Headers, Is.Not.SameAs(originalHeaders));
DispatchProperties destination1DispatchProperties = destination1Operation.Properties;
Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair<string, string>("SomeKey", "SomeValue")));
Assert.That(destination1DispatchProperties, Is.Not.SameAs(originalDispatchProperties));

TransportOperation destination2Operation = operations.ElementAt(1);
Assert.That(destination2Operation.Message.MessageId, Is.EqualTo("ID"));
Assert.That((destination2Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination2"));
Dictionary<string, string> destination2Headers = destination2Operation.Message.Headers;
Assert.That(destination2Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
Assert.That(destination2Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")));
Assert.That(destination2Headers, Does.Not.Contain(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
Assert.That(destination2Headers, Is.Not.SameAs(originalHeaders));
DispatchProperties destination2DispatchProperties = destination2Operation.Properties;
Assert.That(destination2DispatchProperties, Is.Not.SameAs(originalDispatchProperties));
Assert.That(destination2DispatchProperties, Contains.Item(new KeyValuePair<string, string>("SomeKey", "SomeValue")));

Assert.That(destination1Headers, Is.Not.SameAs(destination2Headers));
Assert.That(destination1DispatchProperties, Is.Not.SameAs(destination2DispatchProperties));
}

[Test]
public async Task Should_preserve_headers_generated_by_custom_routing_strategy()
{
var behavior = new RoutingToDispatchConnector();
Dictionary<string, string> headers = null;
await behavior.Invoke(new TestableRoutingContext { RoutingStrategies = new List<RoutingStrategy> { new CustomRoutingStrategy() } }, context =>
await behavior.Invoke(new TestableRoutingContext { RoutingStrategies = new List<RoutingStrategy> { new HeaderModifyingRoutingStrategy() } }, context =>
{
headers = context.Operations.First().Message.Headers;
return Task.CompletedTask;
Expand Down Expand Up @@ -113,7 +208,7 @@ static IOutgoingSendContext CreateContext(SendOptions options, bool fromHandler)
return context;
}

class CustomRoutingStrategy : RoutingStrategy
class HeaderModifyingRoutingStrategy : RoutingStrategy
{
public override AddressTag Apply(Dictionary<string, string> headers)
{
Expand All @@ -122,6 +217,26 @@ public override AddressTag Apply(Dictionary<string, string> headers)
}
}

class DestinationRoutingStrategy : RoutingStrategy
{
public DestinationRoutingStrategy(string destination, string headerKey, string headerValue)
{
this.destination = destination;
this.headerKey = headerKey;
this.headerValue = headerValue;
}

public override AddressTag Apply(Dictionary<string, string> headers)
{
headers[headerKey] = headerValue;
return new UnicastAddressTag(destination);
}

readonly string destination;
readonly string headerKey;
readonly string headerValue;
}

class MyMessage : IMessage
{
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
namespace NServiceBus
{
using System.Collections.Generic;
using Pipeline;
using Routing;
using Transport;

static class RoutingContextExtensions
{
public static TransportOperation ToTransportOperation(this IRoutingContext context, RoutingStrategy strategy, DispatchConsistency dispatchConsistency)
public static TransportOperation ToTransportOperation(this IRoutingContext context, RoutingStrategy strategy, DispatchConsistency dispatchConsistency, bool copySharedMutableMessageState)
{
var addressLabel = strategy.Apply(context.Message.Headers);
var message = new OutgoingMessage(context.Message.MessageId, context.Message.Headers, context.Message.Body);

if (!context.Extensions.TryGet(out DispatchProperties dispatchProperties))
{
dispatchProperties = new DispatchProperties();
}
var headers = copySharedMutableMessageState ? new Dictionary<string, string>(context.Message.Headers) : context.Message.Headers;
var dispatchProperties = context.Extensions.TryGet(out DispatchProperties properties)
? copySharedMutableMessageState ? new DispatchProperties(properties) : properties
: new DispatchProperties();
var addressLabel = strategy.Apply(headers);
var message = new OutgoingMessage(context.Message.MessageId, headers, context.Message.Body);

var transportOperation = new TransportOperation(message, addressLabel, dispatchProperties, dispatchConsistency);
return transportOperation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ public override Task Invoke(IRoutingContext context, Func<IDispatchContext, Task

var operations = new TransportOperation[context.RoutingStrategies.Count];
var index = 0;
// when there are more than one routing strategy we want to make sure each transport operation is independent
var copySharedMutableMessageState = context.RoutingStrategies.Count > 1;
foreach (var strategy in context.RoutingStrategies)
{
operations[index] = context.ToTransportOperation(strategy, dispatchConsistency);
operations[index] = context.ToTransportOperation(strategy, dispatchConsistency, copySharedMutableMessageState);
index++;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ class SatelliteRecoverabilityExecutor<TState> : IRecoverabilityPipelineExecutor
// using the count here is not entirely accurate because of the way we duplicate based on the strategies
// but in many cases it is a good approximation.
transportOperations ??= new List<TransportOperation>(routingContexts.Count);
// when there are more than one routing strategy we want to make sure each transport operation is independent
var copySharedMutableMessageState = routingContext.RoutingStrategies.Count > 1;
foreach (var strategy in routingContext.RoutingStrategies)
{
var transportOperation = routingContext.ToTransportOperation(strategy, DispatchConsistency.Default);
var transportOperation = routingContext.ToTransportOperation(strategy, DispatchConsistency.Default, copySharedMutableMessageState);
transportOperations.Add(transportOperation);
}
}
Expand Down

0 comments on commit 2a5e715

Please sign in to comment.