From 503e6570dea9fc119317c416b7c26e532bc72402 Mon Sep 17 00:00:00 2001 From: JirkaPok Date: Tue, 26 Jun 2018 02:43:19 +0200 Subject: [PATCH 1/4] Added classes to wrap registered changes in swis --- Src/SwqlStudio/ActivityMonitorTab.cs | 5 +- Src/SwqlStudio/QueryTab.cs | 5 +- .../Subscriptions/SubscriptionManager.cs | 152 +++++++++++++----- 3 files changed, 116 insertions(+), 46 deletions(-) diff --git a/Src/SwqlStudio/ActivityMonitorTab.cs b/Src/SwqlStudio/ActivityMonitorTab.cs index faa4b9ec0..b7007ff75 100644 --- a/Src/SwqlStudio/ActivityMonitorTab.cs +++ b/Src/SwqlStudio/ActivityMonitorTab.cs @@ -32,7 +32,7 @@ void ActivityMonitorTabDisposed(object sender, EventArgs e) { if (!String.IsNullOrEmpty(subscriptionId) && ConnectionInfo.IsConnected) { - this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId); + this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId, SubscriptionIndicationReceived); } } @@ -69,7 +69,8 @@ private void backgroundWorker1_DoWork(object sender, System.ComponentModel.DoWor backgroundWorker1.ReportProgress(0, "Starting subscription..."); try { - subscriptionId = this.SubscriptionManager.CreateSubscription(ConnectionInfo, "SUBSCRIBE System.QueryExecuted", SubscriptionIndicationReceived); + subscriptionId = this.SubscriptionManager + .CreateSubscription(ConnectionInfo, "SUBSCRIBE System.QueryExecuted", SubscriptionIndicationReceived); backgroundWorker1.ReportProgress(0, "Waiting for notifications"); } catch (ApplicationException ex) diff --git a/Src/SwqlStudio/QueryTab.cs b/Src/SwqlStudio/QueryTab.cs index fa179e8a6..cd8d818eb 100644 --- a/Src/SwqlStudio/QueryTab.cs +++ b/Src/SwqlStudio/QueryTab.cs @@ -103,7 +103,7 @@ private void Unsubscribe() { if (HasSubscription) { - this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId); + this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId, SubscriptionIndicationReceived); this.subscriptionId = string.Empty; } } @@ -705,7 +705,8 @@ private void subscriptionWorker_DoWork(object sender, System.ComponentModel.DoWo try { - subscriptionId = this.SubscriptionManager.CreateSubscription(ConnectionInfo, arg.Query, SubscriptionIndicationReceived); + subscriptionId = this.SubscriptionManager + .CreateSubscription(ConnectionInfo, arg.Query, SubscriptionIndicationReceived); this.Invoke(new Action(() => this.ApplicationService.RefreshSelectedConnections())); subscriptionWorker.ReportProgress(0, "Waiting for notifications"); } diff --git a/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs b/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs index e1f7a485f..e9201e0a3 100644 --- a/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs +++ b/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs @@ -16,17 +16,106 @@ namespace SwqlStudio.Subscriptions public class SubscriptionManager : INotificationSubscriber { private readonly static Log log = new Log(); - private readonly ConcurrentDictionary subscriptions = new ConcurrentDictionary(); private readonly ConcurrentDictionary proxies = new ConcurrentDictionary(); private readonly SubscriptionServiceHost _subscriptionListener; private readonly string _activeSubscriberAddress; - class SubscriptionInfo + internal class SubscriptionCallbacks { - public List SubscriptionIDs { get; } = new List(); + internal string Uri { get; } + internal string Id { get; set; } - public NotificationDeliveryServiceProxy Proxy { get; set; } + internal IEnumerable Callbacks => this.callbacks; + public bool Empty => !this.callbacks.Any(); + + private readonly List callbacks = new List(); + + public SubscriptionCallbacks(string uri) + { + this.Uri = uri; + this.Id = uri.Substring(uri.LastIndexOf("=") + 1); + } + + internal void Add(SubscriberCallback callback) + { + if (!this.callbacks.Contains(callback)) + this.callbacks.Add(callback); + } + + internal void Remove(SubscriberCallback callback) + { + this.callbacks.Remove(callback); + } + } + + internal class SubscriptionInfo + { + private readonly ConnectionInfo connection; + public NotificationDeliveryServiceProxy Proxy { get; } + + private readonly ConcurrentDictionary subscriptions = + new ConcurrentDictionary(); + + internal SubscriptionInfo(ConnectionInfo connection, NotificationDeliveryServiceProxy proxy) + { + Proxy = proxy; + this.connection = connection; + } + + internal bool HasSubScription(string subscriptionId) + { + return this.subscriptions.Values.Any(s => s.Id == subscriptionId); + } + + internal string Register(string query, SubscriberCallback callback, + Func subscribe) + { + SubscriptionCallbacks subscription; + var normalized = query.ToLower(); + + if (!this.subscriptions.TryGetValue(normalized, out subscription)) + { + var subscriptionUri = subscribe(this.connection, query); + subscription = new SubscriptionCallbacks(subscriptionUri); + this.subscriptions.TryAdd(normalized, subscription); + } + + subscription.Add(callback); + return subscription.Uri; + } + + internal void Remove(string subscriptionUri, SubscriberCallback callback) + { + var query = this.subscriptions.Where(kv => kv.Value.Uri == subscriptionUri) + .Select(kv => kv.Key) + .FirstOrDefault(); + + if (string.IsNullOrEmpty(query)) + return; + + var subscription = this.subscriptions[query]; + subscription.Remove(callback); + + if (subscription.Empty) + { + this.subscriptions.TryRemove(query, out subscription); + this.Unsubscribe(subscriptionUri); + } + } + + private void Unsubscribe(string subscriptionUri) + { + if (this.connection.IsConnected) + this.connection.Proxy.Delete(subscriptionUri); + } + + internal IEnumerable CallBacks(string subscriptionId) + { + return this.subscriptions.Values.Where(v => v.Id == subscriptionId) + .SelectMany(kv => kv.Callbacks) + .ToList(); + } } public SubscriptionManager() @@ -36,29 +125,20 @@ public SubscriptionManager() _activeSubscriberAddress = $"active://{Utility.GetFqdn()}/SolarWinds/SwqlStudio/{Process.GetCurrentProcess().Id}"; } - public void Unsubscribe(ConnectionInfo connectionInfo, string subscriptionId) + public void Unsubscribe(ConnectionInfo connectionInfo, string subscriptionUri, SubscriberCallback callback) { try { - if (connectionInfo.IsConnected) + SubscriptionInfo subscriptionInfo; + if (proxies.TryGetValue(connectionInfo, out subscriptionInfo)) { - connectionInfo.Proxy.Delete(subscriptionId); + subscriptionInfo.Remove(subscriptionUri, callback); } } catch (FaultException ex) { - log.Debug($"Unable to unsubscribe {subscriptionId}. Must have been deleted already", ex); + log.Debug($"Unable to unsubscribe {subscriptionUri}. Must have been deleted already", ex); } - - SubscriptionInfo subscriptionInfo; - if (proxies.TryGetValue(connectionInfo, out subscriptionInfo)) - { - subscriptionInfo.SubscriptionIDs.Remove(subscriptionId); - } - - var key = GetSubscriptionKeyFromUri(subscriptionId); - SubscriberCallback callback; - subscriptions.TryRemove(key, out callback); } public string CreateSubscription(ConnectionInfo connectionInfo, string subscriptionQuery, SubscriberCallback callback) @@ -70,19 +150,12 @@ public string CreateSubscription(ConnectionInfo connectionInfo, string subscript if (!proxies.TryGetValue(connectionInfo, out subInfo)) { var listener = CreateListener(connectionInfo); - - subInfo = new SubscriptionInfo() { Proxy = listener }; + subInfo = new SubscriptionInfo(connectionInfo, listener); proxies[connectionInfo] = subInfo; connectionInfo.ConnectionClosing += ServerConnectionClosing; } - var subscriptionId = Subscribe(connectionInfo, subscriptionQuery); - subInfo.SubscriptionIDs.Add(subscriptionId); - - var key = GetSubscriptionKeyFromUri(subscriptionId); - subscriptions.TryAdd(key, callback); - - return subscriptionId; + return subInfo.Register(subscriptionQuery, callback, Subscribe); } private void ServerConnectionClosing(object sender, EventArgs e) @@ -92,31 +165,26 @@ private void ServerConnectionClosing(object sender, EventArgs e) SubscriptionInfo subscriptionInfo; if (proxies.TryRemove(connectionInfo, out subscriptionInfo)) { - foreach (var id in subscriptionInfo.SubscriptionIDs.ToList()) - Unsubscribe(connectionInfo, id); - - if (subscriptionInfo.Proxy != null) + var proxy = subscriptionInfo.Proxy; + if (proxy != null) { - subscriptionInfo.Proxy.Disconnect(_activeSubscriberAddress); - subscriptionInfo.Proxy.Close(); + proxy.Disconnect(_activeSubscriberAddress); + proxy.Close(); } } } - private static string GetSubscriptionKeyFromUri(string subscriptionId) - { - var key = subscriptionId.Substring(subscriptionId.LastIndexOf("=") + 1); - return key; - } - public void OnIndication(string subscriptionId, string indicationType, PropertyBag indicationProperties, PropertyBag sourceInstanceProperties) { - SubscriberCallback targetSubscriber; + var targetSubscribers = this.proxies.Values + .Where(p => p.HasSubScription(subscriptionId)) + .SelectMany(p => p.CallBacks(subscriptionId)) + .ToList(); - if (subscriptions.TryGetValue(subscriptionId, out targetSubscriber)) + foreach (var callback in targetSubscribers) { - targetSubscriber(new IndicationEventArgs + callback(new IndicationEventArgs { SubscriptionID = subscriptionId ,IndicationType = indicationType From ddbe69183bca5a214fcbfad0de64b4ddc2448dfa Mon Sep 17 00:00:00 2001 From: JirkaPok Date: Tue, 26 Jun 2018 09:53:03 +0200 Subject: [PATCH 2/4] Moved recreation of proxy per query --- .../Subscriptions/SubscriptionManager.cs | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs b/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs index e9201e0a3..83a9b850e 100644 --- a/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs +++ b/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs @@ -26,15 +26,18 @@ internal class SubscriptionCallbacks internal string Uri { get; } internal string Id { get; set; } + public NotificationDeliveryServiceProxy Proxy { get; } + internal IEnumerable Callbacks => this.callbacks; public bool Empty => !this.callbacks.Any(); private readonly List callbacks = new List(); - public SubscriptionCallbacks(string uri) + public SubscriptionCallbacks(string uri, NotificationDeliveryServiceProxy proxy) { this.Uri = uri; this.Id = uri.Substring(uri.LastIndexOf("=") + 1); + this.Proxy = proxy; } internal void Add(SubscriberCallback callback) @@ -52,14 +55,12 @@ internal void Remove(SubscriberCallback callback) internal class SubscriptionInfo { private readonly ConnectionInfo connection; - public NotificationDeliveryServiceProxy Proxy { get; } private readonly ConcurrentDictionary subscriptions = new ConcurrentDictionary(); - internal SubscriptionInfo(ConnectionInfo connection, NotificationDeliveryServiceProxy proxy) + internal SubscriptionInfo(ConnectionInfo connection) { - Proxy = proxy; this.connection = connection; } @@ -69,15 +70,14 @@ internal bool HasSubScription(string subscriptionId) } internal string Register(string query, SubscriberCallback callback, - Func subscribe) + Func subscribe) { SubscriptionCallbacks subscription; var normalized = query.ToLower(); if (!this.subscriptions.TryGetValue(normalized, out subscription)) { - var subscriptionUri = subscribe(this.connection, query); - subscription = new SubscriptionCallbacks(subscriptionUri); + subscription = subscribe(this.connection, query); this.subscriptions.TryAdd(normalized, subscription); } @@ -85,7 +85,7 @@ internal bool HasSubScription(string subscriptionId) return subscription.Uri; } - internal void Remove(string subscriptionUri, SubscriberCallback callback) + internal void Remove(string subscriptionUri, SubscriberCallback callback, Action closeListener) { var query = this.subscriptions.Where(kv => kv.Value.Uri == subscriptionUri) .Select(kv => kv.Key) @@ -101,6 +101,7 @@ internal void Remove(string subscriptionUri, SubscriberCallback callback) { this.subscriptions.TryRemove(query, out subscription); this.Unsubscribe(subscriptionUri); + closeListener(subscription.Proxy); } } @@ -132,7 +133,7 @@ public void Unsubscribe(ConnectionInfo connectionInfo, string subscriptionUri, S SubscriptionInfo subscriptionInfo; if (proxies.TryGetValue(connectionInfo, out subscriptionInfo)) { - subscriptionInfo.Remove(subscriptionUri, callback); + subscriptionInfo.Remove(subscriptionUri, callback, CloseProxy); } } catch (FaultException ex) @@ -141,6 +142,15 @@ public void Unsubscribe(ConnectionInfo connectionInfo, string subscriptionUri, S } } + private void CloseProxy(NotificationDeliveryServiceProxy proxy) + { + if (proxy != null) + { + proxy.Disconnect(_activeSubscriberAddress); + proxy.Close(); + } + } + public string CreateSubscription(ConnectionInfo connectionInfo, string subscriptionQuery, SubscriberCallback callback) { if (!connectionInfo.CanCreateSubscription) @@ -149,8 +159,7 @@ public string CreateSubscription(ConnectionInfo connectionInfo, string subscript SubscriptionInfo subInfo; if (!proxies.TryGetValue(connectionInfo, out subInfo)) { - var listener = CreateListener(connectionInfo); - subInfo = new SubscriptionInfo(connectionInfo, listener); + subInfo = new SubscriptionInfo(connectionInfo); proxies[connectionInfo] = subInfo; connectionInfo.ConnectionClosing += ServerConnectionClosing; } @@ -161,17 +170,8 @@ public string CreateSubscription(ConnectionInfo connectionInfo, string subscript private void ServerConnectionClosing(object sender, EventArgs e) { ConnectionInfo connectionInfo = (ConnectionInfo) sender; - SubscriptionInfo subscriptionInfo; - if (proxies.TryRemove(connectionInfo, out subscriptionInfo)) - { - var proxy = subscriptionInfo.Proxy; - if (proxy != null) - { - proxy.Disconnect(_activeSubscriberAddress); - proxy.Close(); - } - } + proxies.TryRemove(connectionInfo, out subscriptionInfo); } public void OnIndication(string subscriptionId, string indicationType, PropertyBag indicationProperties, @@ -194,7 +194,7 @@ private void ServerConnectionClosing(object sender, EventArgs e) } } - private string Subscribe(ConnectionInfo connectioninfo, string query) + private SubscriptionCallbacks Subscribe(ConnectionInfo connectioninfo, string query) { var credentialType = connectioninfo.SupportsActiveSubscriptions ? "None" : _subscriptionListener.GetCredentialTypeForBinding(connectioninfo.Binding); @@ -215,7 +215,9 @@ private string Subscribe(ConnectionInfo connectioninfo, string query) propertyBag.Add("Password", "subscriber"); } - return ConnectionInfo.DoWithExceptionTranslation(() => connectioninfo.Proxy.Create("System.Subscription", propertyBag)); + var subscriptionUri = ConnectionInfo.DoWithExceptionTranslation(() => connectioninfo.Proxy.Create("System.Subscription", propertyBag)); + var listener = CreateListener(connectioninfo); + return new SubscriptionCallbacks(subscriptionUri, listener); } private NotificationDeliveryServiceProxy CreateListener(ConnectionInfo connectionInfo) From dbcf9ee202c7b17a2dc878e98d31571cedfd568b Mon Sep 17 00:00:00 2001 From: JirkaPok Date: Tue, 26 Jun 2018 10:03:50 +0200 Subject: [PATCH 3/4] Extracted nested classes from subscription manager --- .../Subscriptions/SubscriptionCallbacks.cs | 47 ++++++++ .../Subscriptions/SubscriptionInfo.cs | 74 ++++++++++++ .../Subscriptions/SubscriptionManager.cs | 112 +----------------- Src/SwqlStudio/SwqlStudio.csproj | 2 + 4 files changed, 125 insertions(+), 110 deletions(-) create mode 100644 Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs create mode 100644 Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs diff --git a/Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs b/Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs new file mode 100644 index 000000000..84eec1cd5 --- /dev/null +++ b/Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs @@ -0,0 +1,47 @@ +using System.Collections.Generic; +using System.Linq; + +namespace SwqlStudio.Subscriptions +{ + internal class SubscriptionCallbacks + { + private readonly string activeSubscriberAddress; + internal string Uri { get; } + internal string Id { get; set; } + + private readonly NotificationDeliveryServiceProxy proxy; + + internal IEnumerable Callbacks => this.callbacks; + public bool Empty => !this.callbacks.Any(); + + private readonly List callbacks = new List(); + + public SubscriptionCallbacks(string uri, NotificationDeliveryServiceProxy proxy, string activeSubscriberAddress) + { + this.Uri = uri; + this.Id = uri.Substring(uri.LastIndexOf("=") + 1); + this.proxy = proxy; + this.activeSubscriberAddress = activeSubscriberAddress; + } + + internal void Add(SubscriberCallback callback) + { + if (!this.callbacks.Contains(callback)) + this.callbacks.Add(callback); + } + + internal void Remove(SubscriberCallback callback) + { + this.callbacks.Remove(callback); + } + + internal void CloseProxy() + { + if (this.proxy != null) + { + this.proxy.Disconnect(this.activeSubscriberAddress); + this.proxy.Close(); + } + } + } +} \ No newline at end of file diff --git a/Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs b/Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs new file mode 100644 index 000000000..624da339d --- /dev/null +++ b/Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; + +namespace SwqlStudio.Subscriptions +{ + internal class SubscriptionInfo + { + private readonly ConnectionInfo connection; + + private readonly ConcurrentDictionary subscriptions = + new ConcurrentDictionary(); + + internal SubscriptionInfo(ConnectionInfo connection) + { + this.connection = connection; + } + + internal bool HasSubScription(string subscriptionId) + { + return this.subscriptions.Values.Any(s => s.Id == subscriptionId); + } + + internal string Register(string query, SubscriberCallback callback, + Func subscribe) + { + SubscriptionCallbacks subscription; + var normalized = query.ToLower(); + + if (!this.subscriptions.TryGetValue(normalized, out subscription)) + { + subscription = subscribe(this.connection, query); + this.subscriptions.TryAdd(normalized, subscription); + } + + subscription.Add(callback); + return subscription.Uri; + } + + internal void Remove(string subscriptionUri, SubscriberCallback callback) + { + var query = this.subscriptions.Where(kv => kv.Value.Uri == subscriptionUri) + .Select(kv => kv.Key) + .FirstOrDefault(); + + if (String.IsNullOrEmpty(query)) + return; + + var subscription = this.subscriptions[query]; + subscription.Remove(callback); + + if (subscription.Empty) + { + this.subscriptions.TryRemove(query, out subscription); + this.Unsubscribe(subscriptionUri); + subscription.CloseProxy(); + } + } + + private void Unsubscribe(string subscriptionUri) + { + if (this.connection.IsConnected) + this.connection.Proxy.Delete(subscriptionUri); + } + + internal IEnumerable CallBacks(string subscriptionId) + { + return this.subscriptions.Values.Where(v => v.Id == subscriptionId) + .SelectMany(kv => kv.Callbacks) + .ToList(); + } + } +} \ No newline at end of file diff --git a/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs b/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs index 83a9b850e..d4832421f 100644 --- a/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs +++ b/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.ServiceModel; @@ -21,104 +20,6 @@ public class SubscriptionManager : INotificationSubscriber private readonly SubscriptionServiceHost _subscriptionListener; private readonly string _activeSubscriberAddress; - internal class SubscriptionCallbacks - { - internal string Uri { get; } - internal string Id { get; set; } - - public NotificationDeliveryServiceProxy Proxy { get; } - - internal IEnumerable Callbacks => this.callbacks; - public bool Empty => !this.callbacks.Any(); - - private readonly List callbacks = new List(); - - public SubscriptionCallbacks(string uri, NotificationDeliveryServiceProxy proxy) - { - this.Uri = uri; - this.Id = uri.Substring(uri.LastIndexOf("=") + 1); - this.Proxy = proxy; - } - - internal void Add(SubscriberCallback callback) - { - if (!this.callbacks.Contains(callback)) - this.callbacks.Add(callback); - } - - internal void Remove(SubscriberCallback callback) - { - this.callbacks.Remove(callback); - } - } - - internal class SubscriptionInfo - { - private readonly ConnectionInfo connection; - - private readonly ConcurrentDictionary subscriptions = - new ConcurrentDictionary(); - - internal SubscriptionInfo(ConnectionInfo connection) - { - this.connection = connection; - } - - internal bool HasSubScription(string subscriptionId) - { - return this.subscriptions.Values.Any(s => s.Id == subscriptionId); - } - - internal string Register(string query, SubscriberCallback callback, - Func subscribe) - { - SubscriptionCallbacks subscription; - var normalized = query.ToLower(); - - if (!this.subscriptions.TryGetValue(normalized, out subscription)) - { - subscription = subscribe(this.connection, query); - this.subscriptions.TryAdd(normalized, subscription); - } - - subscription.Add(callback); - return subscription.Uri; - } - - internal void Remove(string subscriptionUri, SubscriberCallback callback, Action closeListener) - { - var query = this.subscriptions.Where(kv => kv.Value.Uri == subscriptionUri) - .Select(kv => kv.Key) - .FirstOrDefault(); - - if (string.IsNullOrEmpty(query)) - return; - - var subscription = this.subscriptions[query]; - subscription.Remove(callback); - - if (subscription.Empty) - { - this.subscriptions.TryRemove(query, out subscription); - this.Unsubscribe(subscriptionUri); - closeListener(subscription.Proxy); - } - } - - private void Unsubscribe(string subscriptionUri) - { - if (this.connection.IsConnected) - this.connection.Proxy.Delete(subscriptionUri); - } - - internal IEnumerable CallBacks(string subscriptionId) - { - return this.subscriptions.Values.Where(v => v.Id == subscriptionId) - .SelectMany(kv => kv.Callbacks) - .ToList(); - } - } - public SubscriptionManager() { _subscriptionListener = new SubscriptionServiceHost(this); @@ -133,7 +34,7 @@ public void Unsubscribe(ConnectionInfo connectionInfo, string subscriptionUri, S SubscriptionInfo subscriptionInfo; if (proxies.TryGetValue(connectionInfo, out subscriptionInfo)) { - subscriptionInfo.Remove(subscriptionUri, callback, CloseProxy); + subscriptionInfo.Remove(subscriptionUri, callback); } } catch (FaultException ex) @@ -142,15 +43,6 @@ public void Unsubscribe(ConnectionInfo connectionInfo, string subscriptionUri, S } } - private void CloseProxy(NotificationDeliveryServiceProxy proxy) - { - if (proxy != null) - { - proxy.Disconnect(_activeSubscriberAddress); - proxy.Close(); - } - } - public string CreateSubscription(ConnectionInfo connectionInfo, string subscriptionQuery, SubscriberCallback callback) { if (!connectionInfo.CanCreateSubscription) @@ -217,7 +109,7 @@ private SubscriptionCallbacks Subscribe(ConnectionInfo connectioninfo, string qu var subscriptionUri = ConnectionInfo.DoWithExceptionTranslation(() => connectioninfo.Proxy.Create("System.Subscription", propertyBag)); var listener = CreateListener(connectioninfo); - return new SubscriptionCallbacks(subscriptionUri, listener); + return new SubscriptionCallbacks(subscriptionUri, listener, _activeSubscriberAddress); } private NotificationDeliveryServiceProxy CreateListener(ConnectionInfo connectionInfo) diff --git a/Src/SwqlStudio/SwqlStudio.csproj b/Src/SwqlStudio/SwqlStudio.csproj index a0749c8ed..be62304f8 100644 --- a/Src/SwqlStudio/SwqlStudio.csproj +++ b/Src/SwqlStudio/SwqlStudio.csproj @@ -226,6 +226,8 @@ + + From fa238cd7a8a53b6d390805ce411e332fc873ba71 Mon Sep 17 00:00:00 2001 From: JirkaPok Date: Tue, 26 Jun 2018 13:59:54 +0200 Subject: [PATCH 4/4] Made Subscription objects thread safe --- .../Subscriptions/SubscriptionCallbacks.cs | 45 +++++++++++--- .../Subscriptions/SubscriptionInfo.cs | 58 +++++++++++-------- 2 files changed, 72 insertions(+), 31 deletions(-) diff --git a/Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs b/Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs index 84eec1cd5..26ae9422b 100644 --- a/Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs +++ b/Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs @@ -10,9 +10,29 @@ internal class SubscriptionCallbacks internal string Id { get; set; } private readonly NotificationDeliveryServiceProxy proxy; + private readonly object itemsLock = new object(); - internal IEnumerable Callbacks => this.callbacks; - public bool Empty => !this.callbacks.Any(); + internal IEnumerable Callbacks + { + get + { + lock (this.itemsLock) + { + return new List(this.callbacks); + } + } + } + + public bool Empty + { + get + { + lock (this.itemsLock) + { + return !this.callbacks.Any(); + } + } + } private readonly List callbacks = new List(); @@ -26,21 +46,30 @@ public SubscriptionCallbacks(string uri, NotificationDeliveryServiceProxy proxy, internal void Add(SubscriberCallback callback) { - if (!this.callbacks.Contains(callback)) - this.callbacks.Add(callback); + lock (this.itemsLock) + { + if (!this.callbacks.Contains(callback)) + this.callbacks.Add(callback); + } } internal void Remove(SubscriberCallback callback) { - this.callbacks.Remove(callback); + lock (this.itemsLock) + { + this.callbacks.Remove(callback); + } } internal void CloseProxy() { - if (this.proxy != null) + lock (this.itemsLock) { - this.proxy.Disconnect(this.activeSubscriberAddress); - this.proxy.Close(); + if (this.proxy != null) + { + this.proxy.Disconnect(this.activeSubscriberAddress); + this.proxy.Close(); + } } } } diff --git a/Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs b/Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs index 624da339d..bb41511b8 100644 --- a/Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs +++ b/Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs @@ -1,5 +1,4 @@ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -8,9 +7,10 @@ namespace SwqlStudio.Subscriptions internal class SubscriptionInfo { private readonly ConnectionInfo connection; + private readonly object itemsLock = new object(); - private readonly ConcurrentDictionary subscriptions = - new ConcurrentDictionary(); + private readonly Dictionary subscriptions = + new Dictionary(); internal SubscriptionInfo(ConnectionInfo connection) { @@ -19,7 +19,10 @@ internal SubscriptionInfo(ConnectionInfo connection) internal bool HasSubScription(string subscriptionId) { - return this.subscriptions.Values.Any(s => s.Id == subscriptionId); + lock (this.itemsLock) + { + return this.subscriptions.Values.Any(s => s.Id == subscriptionId); + } } internal string Register(string query, SubscriberCallback callback, @@ -28,10 +31,13 @@ internal bool HasSubScription(string subscriptionId) SubscriptionCallbacks subscription; var normalized = query.ToLower(); - if (!this.subscriptions.TryGetValue(normalized, out subscription)) + lock (this.itemsLock) { - subscription = subscribe(this.connection, query); - this.subscriptions.TryAdd(normalized, subscription); + if (!this.subscriptions.TryGetValue(normalized, out subscription)) + { + subscription = subscribe(this.connection, query); + this.subscriptions.Add(normalized, subscription); + } } subscription.Add(callback); @@ -40,21 +46,24 @@ internal bool HasSubScription(string subscriptionId) internal void Remove(string subscriptionUri, SubscriberCallback callback) { - var query = this.subscriptions.Where(kv => kv.Value.Uri == subscriptionUri) - .Select(kv => kv.Key) - .FirstOrDefault(); + lock (this.itemsLock) + { + var query = this.subscriptions.Where(kv => kv.Value.Uri == subscriptionUri) + .Select(kv => kv.Key) + .FirstOrDefault(); - if (String.IsNullOrEmpty(query)) - return; - - var subscription = this.subscriptions[query]; - subscription.Remove(callback); + if (String.IsNullOrEmpty(query)) + return; - if (subscription.Empty) - { - this.subscriptions.TryRemove(query, out subscription); - this.Unsubscribe(subscriptionUri); - subscription.CloseProxy(); + var subscription = this.subscriptions[query]; + subscription.Remove(callback); + + if (subscription.Empty) + { + this.subscriptions.Remove(query); + this.Unsubscribe(subscriptionUri); + subscription.CloseProxy(); + } } } @@ -66,9 +75,12 @@ private void Unsubscribe(string subscriptionUri) internal IEnumerable CallBacks(string subscriptionId) { - return this.subscriptions.Values.Where(v => v.Id == subscriptionId) - .SelectMany(kv => kv.Callbacks) - .ToList(); + lock (this.itemsLock) + { + return this.subscriptions.Values.Where(v => v.Id == subscriptionId) + .SelectMany(kv => kv.Callbacks) + .ToList(); + } } } } \ No newline at end of file