-
Notifications
You must be signed in to change notification settings - Fork 46
/
When_querying_queue_length_data.cs
130 lines (114 loc) · 4.83 KB
/
When_querying_queue_length_data.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
namespace ServiceControl.Monitoring.AcceptanceTests.Tests
{
using System;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.EndpointTemplates;
using Http.Diagrams;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NUnit.Framework;
/// <summary>
/// Acceptance test that starts two instances of <see cref="SendingEndpoint"/> with metrics reporting enabled
/// and polls <c>/monitored-endpoints/{endpointName}</c> over HTTP until both instances are listed and the
/// "QueueLength" digest no longer reports an average of zero.
/// </summary>
class When_querying_queue_length_data : AcceptanceTest
{
    // Number of messages each endpoint instance sends to its own input queue.
    const int MessagesPerInstance = 10;

    /// <summary>
    /// Runs the scenario and completes once the <c>Done</c> predicate returns <c>true</c>.
    /// There are no assertions after <c>Run()</c>; the predicate itself is the success condition.
    /// </summary>
    [Test]
    public async Task Should_report_via_http()
    {
        var endpointName = NServiceBus.AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(SendingEndpoint));

        var instanceId = Guid.NewGuid();
        var metricsInstanceId = Guid.NewGuid();

        // The monitoring API is queried for instance ids in the "N" (digits only) GUID format.
        var expectedInstance1Id = instanceId.ToString("N");
        var expectedInstance2Id = metricsInstanceId.ToString("N");

        await Define<TestContext>()
            .WithEndpoint<SendingEndpoint>(c =>
            {
                // Instance "1": identified through the host's custom instance identifier.
                c.CustomConfig(ec =>
                {
                    ec.MakeInstanceUniquelyAddressable("1");
                    ec.UniquelyIdentifyRunningInstance()
                        .UsingCustomIdentifier(instanceId);
                    ec.EnableMetrics()
                        .SendMetricDataToServiceControl(Settings.DEFAULT_ENDPOINT_NAME, TimeSpan.FromSeconds(1));
                });
                c.DoNotFailOnErrorMessages();
                c.When(async s =>
                {
                    for (var sent = 0; sent < MessagesPerInstance; sent++)
                    {
                        await s.SendLocal(new SampleMessage());
                    }
                });
            })
            .WithEndpoint<SendingEndpoint>(c =>
            {
                // Instance "2": identified through the instance id passed explicitly to the metrics feature.
                c.CustomConfig(ec =>
                {
                    ec.MakeInstanceUniquelyAddressable("2");
                    ec.EnableMetrics()
                        .SendMetricDataToServiceControl(Settings.DEFAULT_ENDPOINT_NAME,
                            TimeSpan.FromSeconds(1),
                            expectedInstance2Id);
                });
                c.DoNotFailOnErrorMessages();
                c.When(async s =>
                {
                    for (var sent = 0; sent < MessagesPerInstance; sent++)
                    {
                        await s.SendLocal(new SampleMessage());
                    }
                });
            })
            .Done(async c =>
            {
                var result = await this.TryGet<MonitoredEndpointDetails>($"/monitored-endpoints/{endpointName}");
                if (!result.HasResult)
                {
                    return false;
                }

                var details = result.Item;

                // Both instances must be reported before the scenario may finish.
                var instance1 = details.Instances.SingleOrDefault(instance => instance.Id == expectedInstance1Id);
                var instance2 = details.Instances.SingleOrDefault(instance => instance.Id == expectedInstance2Id);
                if (instance1 is null || instance2 is null)
                {
                    return false;
                }

                // Metric names are fixed names and therefore are not following the casing rules of the chosen serializer
                // NOTE(review): if the "QueueLength" key is absent this check is skipped and the scenario
                // completes anyway - confirm that is intended.
                var queueLengthStillZero = details.Digest.Metrics.TryGetValue("QueueLength", out var queueLength)
                                           && queueLength.Average == 0.0;
                if (queueLengthStillZero)
                {
                    return false;
                }

                // Release the handlers that are blocking the input queues. TrySetResult does not throw
                // if the source has already been completed.
                c.TestEnded.TrySetResult(true);
                return true;
            })
            .Run();
    }

    /// <summary>
    /// Endpoint that processes one message at a time and whose handler blocks until the test ends,
    /// so that the messages it sends to itself stay in the input queue.
    /// </summary>
    class SendingEndpoint : EndpointConfigurationBuilder
    {
        public SendingEndpoint() =>
            EndpointSetup<DefaultServerWithoutAudit>(c =>
            {
                c.LimitMessageProcessingConcurrencyTo(1);
            });

        /// <summary>
        /// Holds each <see cref="SampleMessage"/> until the test signals completion, the 30 second delay
        /// elapses, or the handler's cancellation token cancels that delay.
        /// </summary>
        class Handler(TestContext testContext) : IHandleMessages<SampleMessage>
        {
            public Task Handle(SampleMessage message, IMessageHandlerContext context) =>
                //Concurrency limit 1 and this should block any processing on input queue
                Task.WhenAny(
                    Task.Delay(TimeSpan.FromSeconds(30), context.CancellationToken),
                    testContext.TestEnded.Task
                );
        }
    }

    class SampleMessage : SampleBaseMessage
    {
    }

    class SampleBaseMessage : IMessage
    {
    }

    /// <summary>
    /// Scenario state shared between the test and the endpoint handlers.
    /// </summary>
    class TestContext : ScenarioContext
    {
        // Completed by the Done predicate to unblock the handlers. RunContinuationsAsynchronously keeps
        // the handlers' continuations from running inline on the thread that completes the source.
        public readonly TaskCompletionSource<bool> TestEnded = new(TaskCreationOptions.RunContinuationsAsynchronously);
    }
}
}