Skip to content

Commit

Permalink
Merge pull request #740 from paolosalvatori/event-hubs-investigation
Browse files Browse the repository at this point in the history
Abstracting EventHubs EventData
  • Loading branch information
paolosalvatori committed Sep 8, 2023
2 parents ad382cc + c90eca8 commit 3cfbe19
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 123 deletions.
50 changes: 50 additions & 0 deletions src/Common/Abstractions/EventDataMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
namespace ServiceBusExplorer.Abstractions
{
using System;
using System.Collections.Generic;
using System.IO;
using Microsoft.Azure.Amqp;
using Microsoft.ServiceBus.Messaging;

public class EventDataMessage : IDisposable
{
readonly EventData eventData;
Stream stream;

public EventDataMessage(EventData eventData)
{
this.eventData = eventData;
stream = eventData.GetBodyStream();
Properties = eventData.Properties;
PartitionKey = eventData.PartitionKey;
SequenceNumber = eventData.SequenceNumber;
Offset = eventData.Offset;
SerializedSizeInBytes = eventData.SerializedSizeInBytes;
EnqueuedTimeUtc = eventData.EnqueuedTimeUtc;
SystemProperties = eventData.SystemProperties;
}

public string PartitionKey { get; private set; }
public long SequenceNumber { get; private set; }
public long SerializedSizeInBytes { get; private set; }
public string Offset { get; private set; }
public DateTime EnqueuedTimeUtc { get; private set; }
public IDictionary<string, object> Properties { get; private set; }
public IDictionary<string, object> SystemProperties { get; private set; }

public Stream GetBodyStream()
{
var memoryStream = new MemoryStream();

stream.CopyTo(memoryStream);
stream.Seek(0L, SeekOrigin.Begin);

return memoryStream;
}

public void Dispose()
{
eventData.Dispose();
}
}
}
98 changes: 50 additions & 48 deletions src/Common/Helpers/ServiceBusHelper.cs

Large diffs are not rendered by default.

27 changes: 13 additions & 14 deletions src/ServiceBus/Helpers/ServiceBusPurger.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
#region Copyright
//=======================================================================================
// Microsoft Azure Customer Advisory Team
// Microsoft Azure Customer Advisory Team
//
// This sample is supplemental to the technical guidance published on my personal
// blog at http://blogs.msdn.com/b/paolos/.
//
// blog at http://blogs.msdn.com/b/paolos/.
//
// Author: Paolo Salvatori
//=======================================================================================
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// LICENSED UNDER THE APACHE LICENSE, VERSION 2.0 (THE "LICENSE"); YOU MAY NOT USE THESE
// FILES EXCEPT IN COMPLIANCE WITH THE LICENSE. YOU MAY OBTAIN A COPY OF THE LICENSE AT
//
// LICENSED UNDER THE APACHE LICENSE, VERSION 2.0 (THE "LICENSE"); YOU MAY NOT USE THESE
// FILES EXCEPT IN COMPLIANCE WITH THE LICENSE. YOU MAY OBTAIN A COPY OF THE LICENSE AT
// http://www.apache.org/licenses/LICENSE-2.0
// UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING, SOFTWARE DISTRIBUTED UNDER THE
// LICENSE IS DISTRIBUTED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, EITHER EXPRESS OR IMPLIED. SEE THE LICENSE FOR THE SPECIFIC LANGUAGE GOVERNING
// UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING, SOFTWARE DISTRIBUTED UNDER THE
// LICENSE IS DISTRIBUTED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, EITHER EXPRESS OR IMPLIED. SEE THE LICENSE FOR THE SPECIFIC LANGUAGE GOVERNING
// PERMISSIONS AND LIMITATIONS UNDER THE LICENSE.
//=======================================================================================
#endregion
Expand Down Expand Up @@ -100,7 +100,6 @@ private async Task<long> PurgeSessionEntity(TEntity entity)
{
long totalMessagesPurged = 0;
var consecutiveSessionTimeOuts = 0;
ServiceBusSessionReceiver sessionReceiver = null;
long messagesToPurgeCount = await GetMessageCount(entity, deadLetterQueueData: false)
.ConfigureAwait(false);

Expand All @@ -117,9 +116,9 @@ private async Task<long> PurgeSessionEntity(TEntity entity)

while (consecutiveSessionTimeOuts < enoughZeroReceives && totalMessagesPurged < messagesToPurgeCount)
{
sessionReceiver = await CreateServiceBusSessionReceiver(entity,
client,
purgeDeadLetterQueueInstead: false)
var sessionReceiver = await CreateServiceBusSessionReceiver(entity,
client,
purgeDeadLetterQueueInstead: false)
.ConfigureAwait(false);

var consecutiveZeroBatchReceives = 0;
Expand Down Expand Up @@ -236,7 +235,7 @@ private async Task<long> DoPurgeNonSessionEntity(TEntity entity, long messagesTo
await receiver.CloseAsync().ConfigureAwait(false);
}
}
}); // End of lambda
}); // End of lambda
}

await Task.WhenAll(tasks).ConfigureAwait(false);
Expand Down
3 changes: 1 addition & 2 deletions src/ServiceBusExplorer/Controls/HandleQueueControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ public partial class HandleQueueControl : UserControl

private QueueDescription queueDescription = default!;
private readonly ServiceBusHelper serviceBusHelper = default!;
private readonly ServiceBusHelper2 serviceBusHelper2 = default!;
private readonly WriteToLogDelegate writeToLog = default!;
private readonly string path = default!;
private readonly List<TabPage> hiddenPages = new List<TabPage>();
Expand Down Expand Up @@ -297,9 +296,9 @@ public partial class HandleQueueControl : UserControl
{
this.writeToLog = writeToLog;
this.serviceBusHelper = serviceBusHelper;
this.serviceBusHelper2 = serviceBusHelper.GetServiceBusHelper2();
this.path = path;
this.queueDescription = queueDescription;
var serviceBusHelper2 = serviceBusHelper.GetServiceBusHelper2();

if (!serviceBusHelper2.ConnectionStringContainsEntityPath())
{
Expand Down
3 changes: 1 addition & 2 deletions src/ServiceBusExplorer/Controls/HandleTopicControl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public partial class HandleTopicControl : UserControl
private readonly List<TabPage> hiddenPages = new List<TabPage>();
private TopicDescription topicDescription;
private readonly ServiceBusHelper serviceBusHelper;
private readonly ServiceBusHelper2 serviceBusHelper2 = default!;
private readonly WriteToLogDelegate writeToLog;
private readonly bool premiumNamespace;
private readonly string path;
Expand Down Expand Up @@ -153,7 +152,7 @@ public HandleTopicControl(WriteToLogDelegate writeToLog, ServiceBusHelper servic
{
this.writeToLog = writeToLog;
this.serviceBusHelper = serviceBusHelper;
this.serviceBusHelper2 = serviceBusHelper.GetServiceBusHelper2();
var serviceBusHelper2 = serviceBusHelper.GetServiceBusHelper2();

if (!serviceBusHelper2.ConnectionStringContainsEntityPath())
{
Expand Down
77 changes: 47 additions & 30 deletions src/ServiceBusExplorer/Controls/PartitionListenerControl.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
#region Copyright
//=======================================================================================
// Microsoft Azure Customer Advisory Team
// Microsoft Azure Customer Advisory Team
//
// This sample is supplemental to the technical guidance published on my personal
// blog at http://blogs.msdn.com/b/paolos/.
//
// blog at http://blogs.msdn.com/b/paolos/.
//
// Author: Paolo Salvatori
//=======================================================================================
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// LICENSED UNDER THE APACHE LICENSE, VERSION 2.0 (THE "LICENSE"); YOU MAY NOT USE THESE
// FILES EXCEPT IN COMPLIANCE WITH THE LICENSE. YOU MAY OBTAIN A COPY OF THE LICENSE AT
//
// LICENSED UNDER THE APACHE LICENSE, VERSION 2.0 (THE "LICENSE"); YOU MAY NOT USE THESE
// FILES EXCEPT IN COMPLIANCE WITH THE LICENSE. YOU MAY OBTAIN A COPY OF THE LICENSE AT
// http://www.apache.org/licenses/LICENSE-2.0
// UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING, SOFTWARE DISTRIBUTED UNDER THE
// LICENSE IS DISTRIBUTED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, EITHER EXPRESS OR IMPLIED. SEE THE LICENSE FOR THE SPECIFIC LANGUAGE GOVERNING
// UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING, SOFTWARE DISTRIBUTED UNDER THE
// LICENSE IS DISTRIBUTED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, EITHER EXPRESS OR IMPLIED. SEE THE LICENSE FOR THE SPECIFIC LANGUAGE GOVERNING
// PERMISSIONS AND LIMITATIONS UNDER THE LICENSE.
//=======================================================================================
#endregion
Expand Down Expand Up @@ -45,6 +45,8 @@

namespace ServiceBusExplorer.Controls
{
using Abstractions;

public partial class PartitionListenerControl : UserControl
{
#region Private Constants
Expand Down Expand Up @@ -115,14 +117,13 @@ public partial class PartitionListenerControl : UserControl
private readonly WriteToLogDelegate writeToLog;
private readonly Func<Task> stopLog;
private readonly Action startLog;
private EventData currentEventData;
private int grouperEventDataCustomPropertiesWidth;
private EventDataMessage currentEventData;
private int currentMessageRowIndex;
private readonly int partitionCount;
private bool sorting;
private readonly SortableBindingList<EventData> eventDataBindingList = new SortableBindingList<EventData> { AllowNew = false, AllowEdit = false, AllowRemove = false };
private readonly SortableBindingList<EventDataMessage> eventDataBindingList = new SortableBindingList<EventDataMessage> { AllowNew = false, AllowEdit = false, AllowRemove = false };
private readonly IList<PartitionRuntimeInformation> partitionRuntumeInformationList = new List<PartitionRuntimeInformation>();
private BlockingCollection<EventData> eventDataCollection = new BlockingCollection<EventData>();
private BlockingCollection<EventDataMessage> eventDataCollection = new BlockingCollection<EventDataMessage>();
private System.Timers.Timer timer;
private long receiverMessageNumber;
private long receiverMessageSizeTotal;
Expand Down Expand Up @@ -151,6 +152,7 @@ public partial class PartitionListenerControl : UserControl
private bool clearing;
private bool cleared;
private readonly string iotHubConnectionString;
int grouperEventDataCustomPropertiesWidth;
public Task AsyncTrackEventDataTask { get; private set; }

#endregion
Expand Down Expand Up @@ -315,12 +317,12 @@ private void InitializeControls()
eventDataDataGridView.DefaultCellStyle.SelectionBackColor = Color.FromArgb(92, 125, 150);
eventDataDataGridView.DefaultCellStyle.SelectionForeColor = SystemColors.Window;

// Set RowHeadersDefaultCellStyle.SelectionBackColor so that its default
// Set RowHeadersDefaultCellStyle.SelectionBackColor so that its default
// value won't override DataGridView.DefaultCellStyle.SelectionBackColor.
eventDataDataGridView.RowHeadersDefaultCellStyle.SelectionBackColor = Color.FromArgb(153, 180, 209);

// Set the background color for all rows and for alternating rows.
// The value for alternating rows overrides the value for all rows.
// Set the background color for all rows and for alternating rows.
// The value for alternating rows overrides the value for all rows.
eventDataDataGridView.RowsDefaultCellStyle.BackColor = SystemColors.Window;
eventDataDataGridView.RowsDefaultCellStyle.ForeColor = SystemColors.ControlText;
//eventDataDataGridView.AlternatingRowsDefaultCellStyle.BackColor = Color.White;
Expand Down Expand Up @@ -631,7 +633,7 @@ private void CalculateLastColumnWidth(object sender)

private void eventDataDataGridView_RowEnter(object sender, DataGridViewCellEventArgs e)
{
var bindingList = eventDataBindingSource.DataSource as BindingList<EventData>;
var bindingList = eventDataBindingSource.DataSource as BindingList<EventDataMessage>;
currentMessageRowIndex = e.RowIndex;
if (bindingList == null)
{
Expand All @@ -644,11 +646,26 @@ private void eventDataDataGridView_RowEnter(object sender, DataGridViewCellEvent
currentEventData = bindingList[e.RowIndex];
eventDataPropertyGrid.SelectedObject = currentEventData;

LanguageDetector.SetFormattedMessage(serviceBusHelper, currentEventData.Clone(), txtMessageText);
try
{
//var eventData = currentEventData.Clone();
LanguageDetector.SetFormattedMessage(serviceBusHelper, currentEventData, txtMessageText);
}
catch (Exception exception)
{
HandleException(exception);
}

var listViewItems = currentEventData.Properties.Select(p => new ListViewItem(new[] { p.Key, (p.Value ?? string.Empty).ToString() })).ToArray();
eventDataPropertyListView.Items.Clear();
eventDataPropertyListView.Items.AddRange(listViewItems);
try
{
var listViewItems = currentEventData.Properties.Select(p => new ListViewItem(new[] { p.Key, (p.Value ?? string.Empty).ToString() })).ToArray();
eventDataPropertyListView.Items.Clear();
eventDataPropertyListView.Items.AddRange(listViewItems);
}
catch (Exception exception)
{
HandleException(exception);
}
}

private void tabPageMessages_Resize(object sender, EventArgs e)
Expand Down Expand Up @@ -691,7 +708,7 @@ private void eventDataDataGridView_CellDoubleClick(object sender, DataGridViewCe
{
return;
}
var bindingList = eventDataBindingSource.DataSource as BindingList<EventData>;
var bindingList = eventDataBindingSource.DataSource as BindingList<EventDataMessage>;
if (bindingList == null)
{
return;
Expand Down Expand Up @@ -831,7 +848,7 @@ private async void btnStart_Click(object sender, EventArgs e)
checkBoxCheckpoint,
cancellationTokenSource.Token)
{
TrackEvent = ev => Invoke(new Action<EventData>(m => eventDataCollection.Add(m)), ev),
TrackEvent = ev => Invoke(new Action<EventData>(m => eventDataCollection.Add(new EventDataMessage(m))), ev),
GetElapsedTime = GetElapsedTime,
UpdateStatistics = UpdateStatistics,
WriteToLog = writeToLog,
Expand Down Expand Up @@ -953,7 +970,7 @@ private void btnClear_Click(object sender, EventArgs e)
clearing = true;
cleared = true;
eventDataCollection.Dispose();
eventDataCollection = new BlockingCollection<EventData>();
eventDataCollection = new BlockingCollection<EventDataMessage>();
ClearTrackedMessages();
ClearStatistics();
ClearCharts();
Expand Down Expand Up @@ -1166,8 +1183,8 @@ private void RefreshGraph()
if (InvokeRequired)
{
Invoke(new Action<long, long, long, bool>(InternalUpdateStatistics),
new object[] { receiveTuple.Item1,
receiveTuple.Item2,
new object[] { receiveTuple.Item1,
receiveTuple.Item2,
receiveTuple.Item3,
graph});
}
Expand Down Expand Up @@ -1338,7 +1355,7 @@ private void PartitionListenerControl_Paint(object sender, PaintEventArgs e)
cboReceiverInspector.Size.Height + 1);
}

/// <summary>
/// <summary>
/// Clean up any resources being used.
/// </summary>
/// <param name="disposing">true if managed resources should be disposed; otherwise, false.</param>
Expand Down Expand Up @@ -1501,7 +1518,7 @@ private void saveSelectedEventToolStripMenuItem_Click(object sender, EventArgs e
{
return;
}
var bindingList = eventDataBindingSource.DataSource as BindingList<EventData>;
var bindingList = eventDataBindingSource.DataSource as BindingList<EventDataMessage>;
if (bindingList == null)
{
return;
Expand Down Expand Up @@ -1543,8 +1560,8 @@ private void saveSelectedEventsToolStripMenuItem_Click(object sender, EventArgs
{
return;
}
var messages = eventDataDataGridView.SelectedRows.Cast<DataGridViewRow>().Select(r => r.DataBoundItem as EventData);
IEnumerable<EventData> events = messages as EventData[] ?? messages.ToArray();
var messages = eventDataDataGridView.SelectedRows.Cast<DataGridViewRow>().Select(r => r.DataBoundItem as EventDataMessage);
IEnumerable<EventDataMessage> events = messages as EventDataMessage[] ?? messages.ToArray();
if (!events.Any())
{
return;
Expand Down

0 comments on commit 3cfbe19

Please sign in to comment.