diff --git a/Src/SwqlStudio/ActivityMonitorTab.cs b/Src/SwqlStudio/ActivityMonitorTab.cs index faa4b9ec0..b7007ff75 100644 --- a/Src/SwqlStudio/ActivityMonitorTab.cs +++ b/Src/SwqlStudio/ActivityMonitorTab.cs @@ -32,7 +32,7 @@ void ActivityMonitorTabDisposed(object sender, EventArgs e) { if (!String.IsNullOrEmpty(subscriptionId) && ConnectionInfo.IsConnected) { - this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId); + this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId, SubscriptionIndicationReceived); } } @@ -69,7 +69,8 @@ private void backgroundWorker1_DoWork(object sender, System.ComponentModel.DoWor backgroundWorker1.ReportProgress(0, "Starting subscription..."); try { - subscriptionId = this.SubscriptionManager.CreateSubscription(ConnectionInfo, "SUBSCRIBE System.QueryExecuted", SubscriptionIndicationReceived); + subscriptionId = this.SubscriptionManager + .CreateSubscription(ConnectionInfo, "SUBSCRIBE System.QueryExecuted", SubscriptionIndicationReceived); backgroundWorker1.ReportProgress(0, "Waiting for notifications"); } catch (ApplicationException ex) diff --git a/Src/SwqlStudio/QueryTab.cs b/Src/SwqlStudio/QueryTab.cs index fa179e8a6..cd8d818eb 100644 --- a/Src/SwqlStudio/QueryTab.cs +++ b/Src/SwqlStudio/QueryTab.cs @@ -103,7 +103,7 @@ private void Unsubscribe() { if (HasSubscription) { - this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId); + this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId, SubscriptionIndicationReceived); this.subscriptionId = string.Empty; } } @@ -705,7 +705,8 @@ private void subscriptionWorker_DoWork(object sender, System.ComponentModel.DoWo try { - subscriptionId = this.SubscriptionManager.CreateSubscription(ConnectionInfo, arg.Query, SubscriptionIndicationReceived); + subscriptionId = this.SubscriptionManager + .CreateSubscription(ConnectionInfo, arg.Query, SubscriptionIndicationReceived); this.Invoke(new Action(() => this.ApplicationService.RefreshSelectedConnections())); subscriptionWorker.ReportProgress(0, "Waiting for notifications"); } diff --git a/Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs b/Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs new file mode 100644 index 000000000..26ae9422b --- /dev/null +++ b/Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs @@ -0,0 +1,76 @@ +using System.Collections.Generic; +using System.Linq; + +namespace SwqlStudio.Subscriptions +{ + internal class SubscriptionCallbacks + { + private readonly string activeSubscriberAddress; + internal string Uri { get; } + internal string Id { get; set; } + + private readonly NotificationDeliveryServiceProxy proxy; + private readonly object itemsLock = new object(); + + internal IEnumerable Callbacks + { + get + { + lock (this.itemsLock) + { + return new List(this.callbacks); + } + } + } + + public bool Empty + { + get + { + lock (this.itemsLock) + { + return !this.callbacks.Any(); + } + } + } + + private readonly List callbacks = new List(); + + public SubscriptionCallbacks(string uri, NotificationDeliveryServiceProxy proxy, string activeSubscriberAddress) + { + this.Uri = uri; + this.Id = uri.Substring(uri.LastIndexOf("=") + 1); + this.proxy = proxy; + this.activeSubscriberAddress = activeSubscriberAddress; + } + + internal void Add(SubscriberCallback callback) + { + lock (this.itemsLock) + { + if (!this.callbacks.Contains(callback)) + this.callbacks.Add(callback); + } + } + + internal void Remove(SubscriberCallback callback) + { + lock (this.itemsLock) + { + this.callbacks.Remove(callback); + } + } + + internal void CloseProxy() + { + lock (this.itemsLock) + { + if (this.proxy != null) + { + this.proxy.Disconnect(this.activeSubscriberAddress); + this.proxy.Close(); + } + } + } + } +} \ No newline at end of file diff --git a/Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs b/Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs new file mode 100644 index 000000000..bb41511b8 --- /dev/null +++ b/Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace SwqlStudio.Subscriptions +{ + internal class SubscriptionInfo + { + private readonly ConnectionInfo connection; + private readonly object itemsLock = new object(); + + private readonly Dictionary subscriptions = + new Dictionary(); + + internal SubscriptionInfo(ConnectionInfo connection) + { + this.connection = connection; + } + + internal bool HasSubScription(string subscriptionId) + { + lock (this.itemsLock) + { + return this.subscriptions.Values.Any(s => s.Id == subscriptionId); + } + } + + internal string Register(string query, SubscriberCallback callback, + Func subscribe) + { + SubscriptionCallbacks subscription; + var normalized = query.ToLower(); + + lock (this.itemsLock) + { + if (!this.subscriptions.TryGetValue(normalized, out subscription)) + { + subscription = subscribe(this.connection, query); + this.subscriptions.Add(normalized, subscription); + } + } + + subscription.Add(callback); + return subscription.Uri; + } + + internal void Remove(string subscriptionUri, SubscriberCallback callback) + { + lock (this.itemsLock) + { + var query = this.subscriptions.Where(kv => kv.Value.Uri == subscriptionUri) + .Select(kv => kv.Key) + .FirstOrDefault(); + + if (String.IsNullOrEmpty(query)) + return; + + var subscription = this.subscriptions[query]; + subscription.Remove(callback); + + if (subscription.Empty) + { + this.subscriptions.Remove(query); + this.Unsubscribe(subscriptionUri); + subscription.CloseProxy(); + } + } + } + + private void Unsubscribe(string subscriptionUri) + { + if (this.connection.IsConnected) + this.connection.Proxy.Delete(subscriptionUri); + } + + internal IEnumerable CallBacks(string subscriptionId) + { + lock (this.itemsLock) + { + return this.subscriptions.Values.Where(v => v.Id == subscriptionId) + .SelectMany(kv => kv.Callbacks) + .ToList(); + } + } + } +} \ No newline at end of file diff --git a/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs b/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs index e1f7a485f..d4832421f 100644 --- a/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs +++ b/Src/SwqlStudio/Subscriptions/SubscriptionManager.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.ServiceModel; @@ -16,19 +15,11 @@ namespace SwqlStudio.Subscriptions public class SubscriptionManager : INotificationSubscriber { private readonly static Log log = new Log(); - private readonly ConcurrentDictionary subscriptions = new ConcurrentDictionary(); private readonly ConcurrentDictionary proxies = new ConcurrentDictionary(); private readonly SubscriptionServiceHost _subscriptionListener; private readonly string _activeSubscriberAddress; - class SubscriptionInfo - { - public List SubscriptionIDs { get; } = new List(); - - public NotificationDeliveryServiceProxy Proxy { get; set; } - } - public SubscriptionManager() { _subscriptionListener = new SubscriptionServiceHost(this); @@ -36,29 +27,20 @@ public SubscriptionManager() _activeSubscriberAddress = $"active://{Utility.GetFqdn()}/SolarWinds/SwqlStudio/{Process.GetCurrentProcess().Id}"; } - public void Unsubscribe(ConnectionInfo connectionInfo, string subscriptionId) + public void Unsubscribe(ConnectionInfo connectionInfo, string subscriptionUri, SubscriberCallback callback) { try { - if (connectionInfo.IsConnected) + SubscriptionInfo subscriptionInfo; + if (proxies.TryGetValue(connectionInfo, out subscriptionInfo)) { - connectionInfo.Proxy.Delete(subscriptionId); + subscriptionInfo.Remove(subscriptionUri, callback); } } catch (FaultException ex) { - log.Debug($"Unable to unsubscribe {subscriptionId}. Must have been deleted already", ex); + log.Debug($"Unable to unsubscribe {subscriptionUri}. Must have been deleted already", ex); } - - SubscriptionInfo subscriptionInfo; - if (proxies.TryGetValue(connectionInfo, out subscriptionInfo)) - { - subscriptionInfo.SubscriptionIDs.Remove(subscriptionId); - } - - var key = GetSubscriptionKeyFromUri(subscriptionId); - SubscriberCallback callback; - subscriptions.TryRemove(key, out callback); } public string CreateSubscription(ConnectionInfo connectionInfo, string subscriptionQuery, SubscriberCallback callback) @@ -69,54 +51,32 @@ public string CreateSubscription(ConnectionInfo connectionInfo, string subscript SubscriptionInfo subInfo; if (!proxies.TryGetValue(connectionInfo, out subInfo)) { - var listener = CreateListener(connectionInfo); - - subInfo = new SubscriptionInfo() { Proxy = listener }; + subInfo = new SubscriptionInfo(connectionInfo); proxies[connectionInfo] = subInfo; connectionInfo.ConnectionClosing += ServerConnectionClosing; } - var subscriptionId = Subscribe(connectionInfo, subscriptionQuery); - subInfo.SubscriptionIDs.Add(subscriptionId); - - var key = GetSubscriptionKeyFromUri(subscriptionId); - subscriptions.TryAdd(key, callback); - - return subscriptionId; + return subInfo.Register(subscriptionQuery, callback, Subscribe); } private void ServerConnectionClosing(object sender, EventArgs e) { ConnectionInfo connectionInfo = (ConnectionInfo) sender; - SubscriptionInfo subscriptionInfo; - if (proxies.TryRemove(connectionInfo, out subscriptionInfo)) - { - foreach (var id in subscriptionInfo.SubscriptionIDs.ToList()) - Unsubscribe(connectionInfo, id); - - if (subscriptionInfo.Proxy != null) - { - subscriptionInfo.Proxy.Disconnect(_activeSubscriberAddress); - subscriptionInfo.Proxy.Close(); - } - } - } - - private static string GetSubscriptionKeyFromUri(string subscriptionId) - { - var key = subscriptionId.Substring(subscriptionId.LastIndexOf("=") + 1); - return key; + proxies.TryRemove(connectionInfo, out subscriptionInfo); } public void OnIndication(string subscriptionId, string indicationType, PropertyBag indicationProperties, PropertyBag sourceInstanceProperties) { - SubscriberCallback targetSubscriber; + var targetSubscribers = this.proxies.Values + .Where(p => p.HasSubScription(subscriptionId)) + .SelectMany(p => p.CallBacks(subscriptionId)) + .ToList(); - if (subscriptions.TryGetValue(subscriptionId, out targetSubscriber)) + foreach (var callback in targetSubscribers) { - targetSubscriber(new IndicationEventArgs + callback(new IndicationEventArgs { SubscriptionID = subscriptionId ,IndicationType = indicationType @@ -126,7 +86,7 @@ private static string GetSubscriptionKeyFromUri(string subscriptionId) } } - private string Subscribe(ConnectionInfo connectioninfo, string query) + private SubscriptionCallbacks Subscribe(ConnectionInfo connectioninfo, string query) { var credentialType = connectioninfo.SupportsActiveSubscriptions ? "None" : _subscriptionListener.GetCredentialTypeForBinding(connectioninfo.Binding); @@ -147,7 +107,9 @@ private string Subscribe(ConnectionInfo connectioninfo, string query) propertyBag.Add("Password", "subscriber"); } - return ConnectionInfo.DoWithExceptionTranslation(() => connectioninfo.Proxy.Create("System.Subscription", propertyBag)); + var subscriptionUri = ConnectionInfo.DoWithExceptionTranslation(() => connectioninfo.Proxy.Create("System.Subscription", propertyBag)); + var listener = CreateListener(connectioninfo); + return new SubscriptionCallbacks(subscriptionUri, listener, _activeSubscriberAddress); } private NotificationDeliveryServiceProxy CreateListener(ConnectionInfo connectionInfo) diff --git a/Src/SwqlStudio/SwqlStudio.csproj b/Src/SwqlStudio/SwqlStudio.csproj index a0749c8ed..be62304f8 100644 --- a/Src/SwqlStudio/SwqlStudio.csproj +++ b/Src/SwqlStudio/SwqlStudio.csproj @@ -226,6 +226,8 @@ + +