Skip to content

Commit

Permalink
First implementation of adding ActivitySource to common operations to…
Browse files Browse the repository at this point in the history
… enable OpenTelemetry scenarios. (#1261)

Linking existing context for publish if one exists

dotnet format

Adding standard tags without allocating.

Updating code after comments

Moving ActivitySource tests to Integration

Making sure TaskCompletionSources execute asynchronously

Moving activity source tests to sequential integration
  • Loading branch information
stebet committed Jan 26, 2024
1 parent 52add14 commit 220f5a5
Show file tree
Hide file tree
Showing 34 changed files with 876 additions and 88 deletions.
6 changes: 6 additions & 0 deletions .gitignore
@@ -1,3 +1,4 @@
*.orig
*.log
_site/

Expand Down Expand Up @@ -28,6 +29,11 @@ test.sh
test-output.log
InternalTrace*
nunit-agent*
#################
## JetBrains Rider
#################
.idea/

#################
## Visual Studio
#################
Expand Down
1 change: 1 addition & 0 deletions RabbitMQDotNetClient.sln
Expand Up @@ -5,6 +5,7 @@ MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{34486CC0-D61E-46BA-9E5E-6E8EFA7C34B5}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
.gitignore = .gitignore
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQ.Client", "projects\RabbitMQ.Client\RabbitMQ.Client.csproj", "{8C554257-5ECC-45DB-873D-560BFBB74EC8}"
Expand Down
9 changes: 7 additions & 2 deletions projects/RabbitMQ.Client/PublicAPI.Unshipped.txt
Expand Up @@ -481,8 +481,8 @@ RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler<RabbitMQ.Client.Events
RabbitMQ.Client.IChannel.BasicGetAsync(string queue, bool autoAck) -> System.Threading.Tasks.ValueTask<RabbitMQ.Client.BasicGetResult>
RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString exchange, RabbitMQ.Client.CachedString routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, System.ReadOnlyMemory<byte> body = default(System.ReadOnlyMemory<byte>), bool mandatory = false) -> System.Threading.Tasks.ValueTask
RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>
RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
RabbitMQ.Client.IChannel.ChannelNumber.get -> int
Expand Down Expand Up @@ -658,6 +658,7 @@ RabbitMQ.Client.PublicationAddress
RabbitMQ.Client.PublicationAddress.PublicationAddress(string exchangeType, string exchangeName, string routingKey) -> void
RabbitMQ.Client.QueueDeclareOk
RabbitMQ.Client.QueueDeclareOk.QueueDeclareOk(string queueName, uint messageCount, uint consumerCount) -> void
RabbitMQ.Client.RabbitMQActivitySource
RabbitMQ.Client.ReadOnlyBasicProperties
RabbitMQ.Client.ReadOnlyBasicProperties.AppId.get -> string
RabbitMQ.Client.ReadOnlyBasicProperties.ClusterId.get -> string
Expand Down Expand Up @@ -851,6 +852,8 @@ static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync<T>(this RabbitMQ.Cli
static RabbitMQ.Client.PublicationAddress.Parse(string uriLikeString) -> RabbitMQ.Client.PublicationAddress
static RabbitMQ.Client.PublicationAddress.TryParse(string uriLikeString, out RabbitMQ.Client.PublicationAddress result) -> bool
static RabbitMQ.Client.QueueDeclareOk.implicit operator string(RabbitMQ.Client.QueueDeclareOk declareOk) -> string
static RabbitMQ.Client.RabbitMQActivitySource.UseRoutingKeyAsOperationName.get -> bool
static RabbitMQ.Client.RabbitMQActivitySource.UseRoutingKeyAsOperationName.set -> void
static RabbitMQ.Client.TcpClientAdapter.GetMatchingHost(System.Collections.Generic.IReadOnlyCollection<System.Net.IPAddress> addresses, System.Net.Sockets.AddressFamily addressFamily) -> System.Net.IPAddress
static RabbitMQ.Client.TimerBasedCredentialRefresherEventSource.Log.get -> RabbitMQ.Client.TimerBasedCredentialRefresherEventSource
static readonly RabbitMQ.Client.CachedString.Empty -> RabbitMQ.Client.CachedString
Expand Down Expand Up @@ -881,6 +884,8 @@ virtual RabbitMQ.Client.TcpClientAdapter.Dispose(bool disposing) -> void
virtual RabbitMQ.Client.TcpClientAdapter.GetStream() -> System.Net.Sockets.NetworkStream
virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.get -> System.TimeSpan
virtual RabbitMQ.Client.TcpClientAdapter.ReceiveTimeout.set -> void
~const RabbitMQ.Client.RabbitMQActivitySource.PublisherSourceName = "RabbitMQ.Client.Publisher" -> string
~const RabbitMQ.Client.RabbitMQActivitySource.SubscriberSourceName = "RabbitMQ.Client.Subscriber" -> string
~override RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.ReadOnlyBasicProperties properties, System.ReadOnlyMemory<byte> body) -> System.Threading.Tasks.Task
~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(RabbitMQ.Client.IEndpointResolver endpointResolver, string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
~RabbitMQ.Client.ConnectionFactory.CreateConnectionAsync(string clientProvidedName, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IConnection>
Expand Down
3 changes: 3 additions & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Expand Up @@ -80,4 +80,7 @@
<PackageReference Include="System.IO.Pipelines" Version="8.0.0" />
</ItemGroup>

<ItemGroup Condition="$(TargetFramework) == 'netstandard2.0'">
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="7.0.2" />
</ItemGroup>
</Project>
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Expand Up @@ -192,7 +192,7 @@ public interface IChannel : IDisposable
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

/// <summary>
Expand All @@ -203,7 +203,7 @@ ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in
/// Routing key must be shorter than 255 bytes.
/// </para>
/// </remarks>
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader;

#nullable disable
Expand Down
6 changes: 3 additions & 3 deletions projects/RabbitMQ.Client/client/api/IChannelExtensions.cs
Expand Up @@ -89,14 +89,14 @@ public static class IChannelExtensions
public static ValueTask BasicPublishAsync<T>(this IChannel channel, PublicationAddress addr, in T basicProperties, ReadOnlyMemory<byte> body)
where T : IReadOnlyBasicProperties, IAmqpHeader
{
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, in basicProperties, body);
return channel.BasicPublishAsync(addr.ExchangeName, addr.RoutingKey, basicProperties, body);
}

public static ValueTask BasicPublishAsync(this IChannel channel, string exchange, string routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

public static ValueTask BasicPublishAsync(this IChannel channel, CachedString exchange, CachedString routingKey, ReadOnlyMemory<byte> body = default, bool mandatory = false)
=> channel.BasicPublishAsync(exchange, routingKey, in EmptyBasicProperty.Empty, body, mandatory);
=> channel.BasicPublishAsync(exchange, routingKey, EmptyBasicProperty.Empty, body, mandatory);

#nullable disable

Expand Down
@@ -1,4 +1,5 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;

Expand Down Expand Up @@ -78,8 +79,9 @@ await _registeredWrapper.InvokeAsync(this, new ConsumerEventArgs(new[] { consume
public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
in ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
var deliverEventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
// No need to call base, it's empty.
return _receivedWrapper.InvokeAsync(this, new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
return BasicDeliverWrapper(deliverEventArgs);
}

///<summary>Fires the Shutdown event.</summary>
Expand All @@ -93,5 +95,13 @@ await _shutdownWrapper.InvokeAsync(this, reason)
.ConfigureAwait(false);
}
}

private async Task BasicDeliverWrapper(BasicDeliverEventArgs eventArgs)
{
using (Activity activity = RabbitMQActivitySource.Deliver(eventArgs))
{
await _receivedWrapper.InvokeAsync(this, eventArgs).ConfigureAwait(false);
}
}
}
}
11 changes: 7 additions & 4 deletions projects/RabbitMQ.Client/client/events/EventingBasicConsumer.cs
Expand Up @@ -30,6 +30,7 @@
//---------------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Events
Expand Down Expand Up @@ -88,10 +89,12 @@ public override void HandleBasicConsumeOk(string consumerTag)
public override async Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey,
ReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body)
{
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
Received?.Invoke(
this,
new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body));
BasicDeliverEventArgs eventArgs = new BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
using (Activity activity = RabbitMQActivitySource.SubscriberHasListeners ? RabbitMQActivitySource.Deliver(eventArgs) : default)
{
await base.HandleBasicDeliverAsync(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
Received?.Invoke(this, eventArgs);
}
}

///<summary>Fires the Shutdown event.</summary>
Expand Down
2 changes: 2 additions & 0 deletions projects/RabbitMQ.Client/client/impl/AsyncRpcContinuations.cs
Expand Up @@ -72,6 +72,8 @@ public AsyncRpcContinuation(TimeSpan continuationTimeout)
_tcsConfiguredTaskAwaitable = _tcs.Task.ConfigureAwait(false);
}

internal DateTime StartTime { get; } = DateTime.UtcNow;

public ConfiguredTaskAwaitable<T>.ConfiguredTaskAwaiter GetAwaiter()
{
return _tcsConfiguredTaskAwaitable.GetAwaiter();
Expand Down
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Expand Up @@ -290,13 +290,13 @@ public ValueTask<BasicGetResult> BasicGetAsync(string queue, bool autoAck)
public ValueTask BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue)
=> InnerChannel.BasicNackAsync(deliveryTag, multiple, requeue);

public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
public ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);
=> InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory);

public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, in TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
public ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey, TProperties basicProperties, ReadOnlyMemory<byte> body, bool mandatory)
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
=> InnerChannel.BasicPublishAsync(exchange, routingKey, in basicProperties, body, mandatory);
=> InnerChannel.BasicPublishAsync(exchange, routingKey, basicProperties, body, mandatory);

public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global)
{
Expand Down

0 comments on commit 220f5a5

Please sign in to comment.