Skip to content

Commit

Permalink
Merge pull request #147 from jirkapok/feature/Fixed_Notifications_Tab…
Browse files Browse the repository at this point in the history
…_Recreation

Fixed notification not received in SWQL studio
  • Loading branch information
nothrow committed Jun 26, 2018
2 parents 0148a73 + fa238cd commit a1f7a3b
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 60 deletions.
5 changes: 3 additions & 2 deletions Src/SwqlStudio/ActivityMonitorTab.cs
Expand Up @@ -32,7 +32,7 @@ void ActivityMonitorTabDisposed(object sender, EventArgs e)
{
if (!String.IsNullOrEmpty(subscriptionId) && ConnectionInfo.IsConnected)
{
this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId);
this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId, SubscriptionIndicationReceived);
}
}

Expand Down Expand Up @@ -69,7 +69,8 @@ private void backgroundWorker1_DoWork(object sender, System.ComponentModel.DoWor
backgroundWorker1.ReportProgress(0, "Starting subscription...");
try
{
subscriptionId = this.SubscriptionManager.CreateSubscription(ConnectionInfo, "SUBSCRIBE System.QueryExecuted", SubscriptionIndicationReceived);
subscriptionId = this.SubscriptionManager
.CreateSubscription(ConnectionInfo, "SUBSCRIBE System.QueryExecuted", SubscriptionIndicationReceived);
backgroundWorker1.ReportProgress(0, "Waiting for notifications");
}
catch (ApplicationException ex)
Expand Down
5 changes: 3 additions & 2 deletions Src/SwqlStudio/QueryTab.cs
Expand Up @@ -103,7 +103,7 @@ private void Unsubscribe()
{
if (HasSubscription)
{
this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId);
this.SubscriptionManager.Unsubscribe(ConnectionInfo, subscriptionId, SubscriptionIndicationReceived);
this.subscriptionId = string.Empty;
}
}
Expand Down Expand Up @@ -705,7 +705,8 @@ private void subscriptionWorker_DoWork(object sender, System.ComponentModel.DoWo

try
{
subscriptionId = this.SubscriptionManager.CreateSubscription(ConnectionInfo, arg.Query, SubscriptionIndicationReceived);
subscriptionId = this.SubscriptionManager
.CreateSubscription(ConnectionInfo, arg.Query, SubscriptionIndicationReceived);
this.Invoke(new Action(() => this.ApplicationService.RefreshSelectedConnections()));
subscriptionWorker.ReportProgress(0, "Waiting for notifications");
}
Expand Down
76 changes: 76 additions & 0 deletions Src/SwqlStudio/Subscriptions/SubscriptionCallbacks.cs
@@ -0,0 +1,76 @@
using System.Collections.Generic;
using System.Linq;

namespace SwqlStudio.Subscriptions
{
internal class SubscriptionCallbacks
{
private readonly string activeSubscriberAddress;
internal string Uri { get; }
internal string Id { get; set; }

private readonly NotificationDeliveryServiceProxy proxy;
private readonly object itemsLock = new object();

internal IEnumerable<SubscriberCallback> Callbacks
{
get
{
lock (this.itemsLock)
{
return new List<SubscriberCallback>(this.callbacks);
}
}
}

public bool Empty
{
get
{
lock (this.itemsLock)
{
return !this.callbacks.Any();
}
}
}

private readonly List<SubscriberCallback> callbacks = new List<SubscriberCallback>();

public SubscriptionCallbacks(string uri, NotificationDeliveryServiceProxy proxy, string activeSubscriberAddress)
{
this.Uri = uri;
this.Id = uri.Substring(uri.LastIndexOf("=") + 1);
this.proxy = proxy;
this.activeSubscriberAddress = activeSubscriberAddress;
}

internal void Add(SubscriberCallback callback)
{
lock (this.itemsLock)
{
if (!this.callbacks.Contains(callback))
this.callbacks.Add(callback);
}
}

internal void Remove(SubscriberCallback callback)
{
lock (this.itemsLock)
{
this.callbacks.Remove(callback);
}
}

internal void CloseProxy()
{
lock (this.itemsLock)
{
if (this.proxy != null)
{
this.proxy.Disconnect(this.activeSubscriberAddress);
this.proxy.Close();
}
}
}
}
}
86 changes: 86 additions & 0 deletions Src/SwqlStudio/Subscriptions/SubscriptionInfo.cs
@@ -0,0 +1,86 @@
using System;
using System.Collections.Generic;
using System.Linq;

namespace SwqlStudio.Subscriptions
{
internal class SubscriptionInfo
{
private readonly ConnectionInfo connection;
private readonly object itemsLock = new object();

private readonly Dictionary<string, SubscriptionCallbacks> subscriptions =
new Dictionary<string, SubscriptionCallbacks>();

internal SubscriptionInfo(ConnectionInfo connection)
{
this.connection = connection;
}

internal bool HasSubScription(string subscriptionId)
{
lock (this.itemsLock)
{
return this.subscriptions.Values.Any(s => s.Id == subscriptionId);
}
}

internal string Register(string query, SubscriberCallback callback,
Func<ConnectionInfo, string, SubscriptionCallbacks> subscribe)
{
SubscriptionCallbacks subscription;
var normalized = query.ToLower();

lock (this.itemsLock)
{
if (!this.subscriptions.TryGetValue(normalized, out subscription))
{
subscription = subscribe(this.connection, query);
this.subscriptions.Add(normalized, subscription);
}
}

subscription.Add(callback);
return subscription.Uri;
}

internal void Remove(string subscriptionUri, SubscriberCallback callback)
{
lock (this.itemsLock)
{
var query = this.subscriptions.Where(kv => kv.Value.Uri == subscriptionUri)
.Select(kv => kv.Key)
.FirstOrDefault();

if (String.IsNullOrEmpty(query))
return;

var subscription = this.subscriptions[query];
subscription.Remove(callback);

if (subscription.Empty)
{
this.subscriptions.Remove(query);
this.Unsubscribe(subscriptionUri);
subscription.CloseProxy();
}
}
}

private void Unsubscribe(string subscriptionUri)
{
if (this.connection.IsConnected)
this.connection.Proxy.Delete(subscriptionUri);
}

internal IEnumerable<SubscriberCallback> CallBacks(string subscriptionId)
{
lock (this.itemsLock)
{
return this.subscriptions.Values.Where(v => v.Id == subscriptionId)
.SelectMany(kv => kv.Callbacks)
.ToList();
}
}
}
}
74 changes: 18 additions & 56 deletions Src/SwqlStudio/Subscriptions/SubscriptionManager.cs
@@ -1,6 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.ServiceModel;
Expand All @@ -16,49 +15,32 @@ namespace SwqlStudio.Subscriptions
public class SubscriptionManager : INotificationSubscriber
{
private readonly static Log log = new Log();
private readonly ConcurrentDictionary<string, SubscriberCallback> subscriptions = new ConcurrentDictionary<string, SubscriberCallback>();
private readonly ConcurrentDictionary<ConnectionInfo, SubscriptionInfo> proxies = new ConcurrentDictionary<ConnectionInfo, SubscriptionInfo>();

private readonly SubscriptionServiceHost _subscriptionListener;
private readonly string _activeSubscriberAddress;

class SubscriptionInfo
{
public List<string > SubscriptionIDs { get; } = new List<string>();

public NotificationDeliveryServiceProxy Proxy { get; set; }
}

public SubscriptionManager()
{
_subscriptionListener = new SubscriptionServiceHost(this);

_activeSubscriberAddress = $"active://{Utility.GetFqdn()}/SolarWinds/SwqlStudio/{Process.GetCurrentProcess().Id}";
}

public void Unsubscribe(ConnectionInfo connectionInfo, string subscriptionId)
public void Unsubscribe(ConnectionInfo connectionInfo, string subscriptionUri, SubscriberCallback callback)
{
try
{
if (connectionInfo.IsConnected)
SubscriptionInfo subscriptionInfo;
if (proxies.TryGetValue(connectionInfo, out subscriptionInfo))
{
connectionInfo.Proxy.Delete(subscriptionId);
subscriptionInfo.Remove(subscriptionUri, callback);
}
}
catch (FaultException<InfoServiceFaultContract> ex)
{
log.Debug($"Unable to unsubscribe {subscriptionId}. Must have been deleted already", ex);
log.Debug($"Unable to unsubscribe {subscriptionUri}. Must have been deleted already", ex);
}

SubscriptionInfo subscriptionInfo;
if (proxies.TryGetValue(connectionInfo, out subscriptionInfo))
{
subscriptionInfo.SubscriptionIDs.Remove(subscriptionId);
}

var key = GetSubscriptionKeyFromUri(subscriptionId);
SubscriberCallback callback;
subscriptions.TryRemove(key, out callback);
}

public string CreateSubscription(ConnectionInfo connectionInfo, string subscriptionQuery, SubscriberCallback callback)
Expand All @@ -69,54 +51,32 @@ public string CreateSubscription(ConnectionInfo connectionInfo, string subscript
SubscriptionInfo subInfo;
if (!proxies.TryGetValue(connectionInfo, out subInfo))
{
var listener = CreateListener(connectionInfo);

subInfo = new SubscriptionInfo() { Proxy = listener };
subInfo = new SubscriptionInfo(connectionInfo);
proxies[connectionInfo] = subInfo;
connectionInfo.ConnectionClosing += ServerConnectionClosing;
}

var subscriptionId = Subscribe(connectionInfo, subscriptionQuery);
subInfo.SubscriptionIDs.Add(subscriptionId);

var key = GetSubscriptionKeyFromUri(subscriptionId);
subscriptions.TryAdd(key, callback);

return subscriptionId;
return subInfo.Register(subscriptionQuery, callback, Subscribe);
}

private void ServerConnectionClosing(object sender, EventArgs e)
{
ConnectionInfo connectionInfo = (ConnectionInfo) sender;

SubscriptionInfo subscriptionInfo;
if (proxies.TryRemove(connectionInfo, out subscriptionInfo))
{
foreach (var id in subscriptionInfo.SubscriptionIDs.ToList())
Unsubscribe(connectionInfo, id);

if (subscriptionInfo.Proxy != null)
{
subscriptionInfo.Proxy.Disconnect(_activeSubscriberAddress);
subscriptionInfo.Proxy.Close();
}
}
}

private static string GetSubscriptionKeyFromUri(string subscriptionId)
{
var key = subscriptionId.Substring(subscriptionId.LastIndexOf("=") + 1);
return key;
proxies.TryRemove(connectionInfo, out subscriptionInfo);
}

public void OnIndication(string subscriptionId, string indicationType, PropertyBag indicationProperties,
PropertyBag sourceInstanceProperties)
{
SubscriberCallback targetSubscriber;
var targetSubscribers = this.proxies.Values
.Where(p => p.HasSubScription(subscriptionId))
.SelectMany(p => p.CallBacks(subscriptionId))
.ToList();

if (subscriptions.TryGetValue(subscriptionId, out targetSubscriber))
foreach (var callback in targetSubscribers)
{
targetSubscriber(new IndicationEventArgs
callback(new IndicationEventArgs
{
SubscriptionID = subscriptionId
,IndicationType = indicationType
Expand All @@ -126,7 +86,7 @@ private static string GetSubscriptionKeyFromUri(string subscriptionId)
}
}

private string Subscribe(ConnectionInfo connectioninfo, string query)
private SubscriptionCallbacks Subscribe(ConnectionInfo connectioninfo, string query)
{
var credentialType = connectioninfo.SupportsActiveSubscriptions ? "None" : _subscriptionListener.GetCredentialTypeForBinding(connectioninfo.Binding);

Expand All @@ -147,7 +107,9 @@ private string Subscribe(ConnectionInfo connectioninfo, string query)
propertyBag.Add("Password", "subscriber");
}

return ConnectionInfo.DoWithExceptionTranslation(() => connectioninfo.Proxy.Create("System.Subscription", propertyBag));
var subscriptionUri = ConnectionInfo.DoWithExceptionTranslation(() => connectioninfo.Proxy.Create("System.Subscription", propertyBag));
var listener = CreateListener(connectioninfo);
return new SubscriptionCallbacks(subscriptionUri, listener, _activeSubscriberAddress);
}

private NotificationDeliveryServiceProxy CreateListener(ConnectionInfo connectionInfo)
Expand Down
2 changes: 2 additions & 0 deletions Src/SwqlStudio/SwqlStudio.csproj
Expand Up @@ -226,6 +226,8 @@
<Compile Include="ServerList.cs" />
<Compile Include="ServerType.cs" />
<Compile Include="Subscription.cs" />
<Compile Include="Subscriptions\SubscriptionCallbacks.cs" />
<Compile Include="Subscriptions\SubscriptionInfo.cs" />
<Compile Include="Subscriptions\SubscriptionManager.cs" />
<Compile Include="Subscriptions\SubscriptionServiceHost.cs" />
<Compile Include="SubscriptionTab.cs">
Expand Down

0 comments on commit a1f7a3b

Please sign in to comment.