/
SqlServerTransportInfrastructure.cs
331 lines (272 loc) · 16.8 KB
/
SqlServerTransportInfrastructure.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
namespace NServiceBus.Transport.SqlServer
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
#if SYSTEMDATASQLCLIENT
using System.Data.SqlClient;
#else
using Microsoft.Data.SqlClient;
#endif
using Logging;
using NServiceBus.Transport.SqlServer.PubSub;
using Transport;
class SqlServerTransportInfrastructure : TransportInfrastructure
{
internal SqlServerTransportInfrastructure(SqlServerTransport transport, HostSettings hostSettings, QueueAddressTranslator addressTranslator, bool isEncrypted)
{
this.transport = transport;
this.hostSettings = hostSettings;
this.isEncrypted = isEncrypted;
this.addressTranslator = addressTranslator;
tableBasedQueueCache = new TableBasedQueueCache(addressTranslator, !isEncrypted);
connectionFactory = CreateConnectionFactory();
if (typeof(SqlConnection).Namespace != "Microsoft.Data.SqlClient")
{
return;
}
var informationalVersion = typeof(SqlConnection).Assembly.GetCustomAttributes().OfType<AssemblyInformationalVersionAttribute>().Single();
var currentClientVersion = new Version(informationalVersion.InformationalVersion.Split('+').First());
if (currentClientVersion < new Version(5, 2, 0))
{
_logger.WarnFormat("You are using an outdated version '{0}' of Microsoft.Data.SqlClient. We recommend using version 5.2.0 or later by adding a top-level package reference to avoid known problems in older versions of the client. See https://github.com/dotnet/SqlClient/blob/main/release-notes/5.0/5.0.0.md#breaking-changes for details on breaking changes before ugprading.", currentClientVersion);
}
}
public async Task ConfigureSubscriptions(string catalog, CancellationToken cancellationToken = default)
{
var pubSubSettings = transport.Subscriptions;
var subscriptionStoreSchema = string.IsNullOrWhiteSpace(transport.DefaultSchema) ? "dbo" : transport.DefaultSchema;
var subscriptionTableName = pubSubSettings.SubscriptionTableName.Qualify(subscriptionStoreSchema, catalog);
subscriptionStore = new PolymorphicSubscriptionStore(new SubscriptionTable(subscriptionTableName.QuotedQualifiedName, connectionFactory));
if (pubSubSettings.DisableCaching == false)
{
subscriptionStore = new CachedSubscriptionStore(subscriptionStore, pubSubSettings.CacheInvalidationPeriod);
}
if (hostSettings.SetupInfrastructure)
{
await new SubscriptionTableCreator(subscriptionTableName, connectionFactory).CreateIfNecessary(cancellationToken).ConfigureAwait(false);
}
transport.Testing.SubscriptionTable = subscriptionTableName.QuotedQualifiedName;
}
SqlConnectionFactory CreateConnectionFactory()
{
if (transport.ConnectionFactory != null)
{
return new SqlConnectionFactory(transport.ConnectionFactory);
}
return SqlConnectionFactory.Default(transport.ConnectionString);
}
public async Task ConfigureReceiveInfrastructure(ReceiveSettings[] receiveSettings, string[] sendingAddresses, CancellationToken cancellationToken = default)
{
if (receiveSettings.Length == 0)
{
Receivers = new Dictionary<string, IMessageReceiver>();
return;
}
var transactionOptions = transport.TransactionScope.TransactionOptions;
diagnostics.Add("NServiceBus.Transport.SqlServer.Transactions", new
{
TransactionMode = transport.TransportTransactionMode,
transactionOptions.IsolationLevel,
transactionOptions.Timeout
});
diagnostics.Add("NServiceBus.Transport.SqlServer.CircuitBreaker", new
{
TimeToWaitBeforeTriggering = transport.TimeToWaitBeforeTriggeringCircuitBreaker
});
var queuePeekerOptions = transport.QueuePeeker;
var createMessageBodyComputedColumn = transport.CreateMessageBodyComputedColumn;
Func<TransportTransactionMode, ProcessStrategy> processStrategyFactory =
guarantee => SelectProcessStrategy(guarantee, transactionOptions, connectionFactory);
var queuePurger = new QueuePurger(connectionFactory);
var queuePeeker = new QueuePeeker(connectionFactory, queuePeekerOptions);
IExpiredMessagesPurger expiredMessagesPurger;
bool validateExpiredIndex;
if (transport.ExpiredMessagesPurger.PurgeOnStartup == false)
{
diagnostics.Add("NServiceBus.Transport.SqlServer.ExpiredMessagesPurger", new
{
Enabled = false,
});
expiredMessagesPurger = new NoOpExpiredMessagesPurger();
validateExpiredIndex = false;
}
else
{
var purgeBatchSize = transport.ExpiredMessagesPurger.PurgeBatchSize;
diagnostics.Add("NServiceBus.Transport.SqlServer.ExpiredMessagesPurger", new
{
Enabled = true,
BatchSize = purgeBatchSize
});
expiredMessagesPurger = new ExpiredMessagesPurger((_, token) => connectionFactory.OpenNewConnection(token), purgeBatchSize);
validateExpiredIndex = true;
}
var schemaVerification = new SchemaInspector((queue, token) => connectionFactory.OpenNewConnection(token), validateExpiredIndex);
var queueFactory = transport.Testing.QueueFactoryOverride ?? (queueName => new TableBasedQueue(addressTranslator.Parse(queueName).QualifiedTableName, queueName, !isEncrypted));
//Create delayed delivery infrastructure
CanonicalQueueAddress delayedQueueCanonicalAddress = null;
if (transport.DisableDelayedDelivery == false)
{
var delayedDelivery = transport.DelayedDelivery;
diagnostics.Add("NServiceBus.Transport.SqlServer.DelayedDelivery", new
{
Native = true,
Suffix = delayedDelivery.TableSuffix,
delayedDelivery.BatchSize,
});
var queueAddress = new Transport.QueueAddress(hostSettings.Name, null, new Dictionary<string, string>(), delayedDelivery.TableSuffix);
delayedQueueCanonicalAddress = addressTranslator.GetCanonicalForm(addressTranslator.Generate(queueAddress));
//For backwards-compatibility with previous version of the seam and endpoints that have delayed
//delivery infrastructure, we assume that the first receiver address matches main input queue address
//from version 7 of Core. For raw usages this will still work but delayed-delivery messages
//might be moved to arbitrary picked receiver
var mainReceiverInputQueueAddress = ToTransportAddress(receiveSettings[0].ReceiveAddress);
var inputQueueTable = addressTranslator.Parse(mainReceiverInputQueueAddress).QualifiedTableName;
var delayedMessageTable = new DelayedMessageTable(delayedQueueCanonicalAddress.QualifiedTableName, inputQueueTable);
//Allows dispatcher to store messages in the delayed store
delayedMessageStore = delayedMessageTable;
dueDelayedMessageProcessor = new DueDelayedMessageProcessor(delayedMessageTable, connectionFactory, delayedDelivery.BatchSize, transport.TimeToWaitBeforeTriggeringCircuitBreaker, hostSettings);
}
Receivers = receiveSettings.Select(receiveSetting =>
{
var receiveAddress = ToTransportAddress(receiveSetting.ReceiveAddress);
ISubscriptionManager subscriptionManager = transport.SupportsPublishSubscribe
? new SubscriptionManager(subscriptionStore, hostSettings.Name, receiveAddress)
: new NoOpSubscriptionManager();
return new MessageReceiver(transport, receiveSetting.Id, receiveAddress, receiveSetting.ErrorQueue, hostSettings.CriticalErrorAction, processStrategyFactory, queueFactory, queuePurger,
expiredMessagesPurger,
queuePeeker, queuePeekerOptions, schemaVerification, transport.TimeToWaitBeforeTriggeringCircuitBreaker, subscriptionManager, receiveSetting.PurgeOnStartup);
}).ToDictionary<MessageReceiver, string, IMessageReceiver>(receiver => receiver.Id, receiver => receiver);
await ValidateDatabaseAccess(transactionOptions, cancellationToken).ConfigureAwait(false);
var receiveAddresses = Receivers.Values.Select(r => r.ReceiveAddress).ToList();
if (hostSettings.SetupInfrastructure)
{
var queuesToCreate = new List<string>();
queuesToCreate.AddRange(sendingAddresses);
queuesToCreate.AddRange(receiveAddresses);
var queueCreator = new QueueCreator(connectionFactory, addressTranslator, createMessageBodyComputedColumn);
await queueCreator.CreateQueueIfNecessary(queuesToCreate.ToArray(), delayedQueueCanonicalAddress, cancellationToken)
.ConfigureAwait(false);
}
dueDelayedMessageProcessor?.Start(cancellationToken);
transport.Testing.SendingAddresses = sendingAddresses.Select(s => addressTranslator.Parse(s).QualifiedTableName).ToArray();
transport.Testing.ReceiveAddresses = receiveAddresses.Select(r => addressTranslator.Parse(r).QualifiedTableName).ToArray();
transport.Testing.DelayedDeliveryQueue = delayedQueueCanonicalAddress?.QualifiedTableName;
}
ProcessStrategy SelectProcessStrategy(TransportTransactionMode minimumConsistencyGuarantee, TransactionOptions options, SqlConnectionFactory connectionFactory)
{
if (minimumConsistencyGuarantee == TransportTransactionMode.TransactionScope)
{
return new ProcessWithTransactionScope(options, connectionFactory, new FailureInfoStorage(10000), tableBasedQueueCache);
}
if (minimumConsistencyGuarantee == TransportTransactionMode.SendsAtomicWithReceive)
{
return new ProcessWithNativeTransaction(options, connectionFactory, new FailureInfoStorage(10000), tableBasedQueueCache);
}
if (minimumConsistencyGuarantee == TransportTransactionMode.ReceiveOnly)
{
return new ProcessWithNativeTransaction(options, connectionFactory, new FailureInfoStorage(10000), tableBasedQueueCache, transactionForReceiveOnly: true);
}
return new ProcessWithNoTransaction(connectionFactory, tableBasedQueueCache);
}
async Task ValidateDatabaseAccess(TransactionOptions transactionOptions, CancellationToken cancellationToken)
{
await TryOpenDatabaseConnection(cancellationToken).ConfigureAwait(false);
await TryEscalateToDistributedTransactions(transactionOptions, cancellationToken).ConfigureAwait(false);
}
async Task TryOpenDatabaseConnection(CancellationToken cancellationToken)
{
try
{
using (await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
{
}
}
catch (Exception ex) when (!ex.IsCausedBy(cancellationToken))
{
var message = "Could not open connection to the SQL instance. Check the original error message for details. Original error message: " + ex.Message;
throw new Exception(message, ex);
}
}
async Task TryEscalateToDistributedTransactions(TransactionOptions transactionOptions, CancellationToken cancellationToken)
{
if (transport.TransportTransactionMode == TransportTransactionMode.TransactionScope)
{
var message = string.Empty;
try
{
using (var scope = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions, TransactionScopeAsyncFlowOption.Enabled))
{
FakePromotableResourceManager.ForceDtc();
using (await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
{
scope.Complete();
}
}
}
catch (NotSupportedException ex)
{
message = "The version of the SqlClient in use does not support enlisting SQL connections in distributed transactions. " +
"Check original error message for details. " +
"In case the problem is related to distributed transactions you can still use SQL Server transport but " +
"should specify a different transaction mode via the `SqlServerTransport.TransportTransactionMode` property when configuring the enpdoint. " +
"Note that different transaction modes may affect consistency guarantees as you can't rely on distributed " +
"transactions to atomically update the database and consume a message. Original error message: " + ex.Message;
}
catch (Exception exception) when (!exception.IsCausedBy(cancellationToken))
{
message = "Could not escalate to a distributed transaction while configured to use TransactionScope. Check original error message for details. " +
"In case the problem is related to distributed transactions you can still use SQL Server transport but " +
"should specify a different transaction mode via the `SqlServerTransport.TransportTransactionMode` property when configuring the enpdoint. " +
"Note that different transaction modes may affect consistency guarantees as you can't rely on distributed " +
"transactions to atomically update the database and consume a message. Original error message: " + exception.Message;
}
if (!string.IsNullOrWhiteSpace(message))
{
_logger.Warn(message);
}
}
}
public void ConfigureSendInfrastructure()
{
Dispatcher = new MessageDispatcher(
addressTranslator,
new MulticastToUnicastConverter(subscriptionStore),
tableBasedQueueCache,
delayedMessageStore,
connectionFactory);
}
public override Task Shutdown(CancellationToken cancellationToken = default)
{
return dueDelayedMessageProcessor?.Stop(cancellationToken) ?? Task.FromResult(0);
}
#pragma warning disable CS0618 // Type or member is obsolete
public override string ToTransportAddress(Transport.QueueAddress address) => transport.ToTransportAddress(address);
#pragma warning restore CS0618 // Type or member is obsolete
class FakePromotableResourceManager : IEnlistmentNotification
{
public static readonly Guid Id = Guid.NewGuid();
public void Prepare(PreparingEnlistment preparingEnlistment) => preparingEnlistment.Prepared();
public void Commit(Enlistment enlistment) => enlistment.Done();
public void Rollback(Enlistment enlistment) => enlistment.Done();
public void InDoubt(Enlistment enlistment) => enlistment.Done();
public static void ForceDtc() => Transaction.Current.EnlistDurable(Id, new FakePromotableResourceManager(), EnlistmentOptions.None);
}
readonly QueueAddressTranslator addressTranslator;
readonly SqlServerTransport transport;
readonly HostSettings hostSettings;
DueDelayedMessageProcessor dueDelayedMessageProcessor;
Dictionary<string, object> diagnostics = new Dictionary<string, object>();
SqlConnectionFactory connectionFactory;
ISubscriptionStore subscriptionStore;
IDelayedMessageStore delayedMessageStore = new SendOnlyDelayedMessageStore();
TableBasedQueueCache tableBasedQueueCache;
bool isEncrypted;
static ILog _logger = LogManager.GetLogger<SqlServerTransportInfrastructure>();
}
}