Skip to content

Commit

Permalink
Backport native integration and message preservation (#258)
Browse files Browse the repository at this point in the history
* Backport native integration and message preservation

* Update SQS version

* Update dependencies and TFMs

* Modernize build

---------

Co-authored-by: danielmarbach <daniel.marbach@openplace.net>
  • Loading branch information
kentdr and danielmarbach committed Apr 26, 2023
1 parent d6fa8e1 commit eeb2a56
Show file tree
Hide file tree
Showing 15 changed files with 839 additions and 88 deletions.
16 changes: 8 additions & 8 deletions .github/workflows/ci.yml
Expand Up @@ -13,7 +13,7 @@ env:
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
NSERVICEBUS_AMAZONSQS_S3BUCKET: ${{ secrets.NSERVICEBUS_AMAZONSQS_S3BUCKET }}
jobs:
build:
build:
name: ${{ matrix.name }}
runs-on: ${{ matrix.os }}
strategy:
Expand All @@ -30,24 +30,24 @@ jobs:
SECRETS_AVAILABLE: ${{ secrets.SECRETS_AVAILABLE }}
shell: pwsh
run: exit $(If ($env:SECRETS_AVAILABLE -eq 'true') { 0 } Else { 1 })
- name: Checkout
uses: actions/checkout@v3.2.0
- name: Checkout
uses: actions/checkout@v3.5.2
with:
fetch-depth: 0
fetch-depth: 0
- name: Setup .NET SDK
uses: actions/setup-dotnet@v3.0.3
with:
dotnet-version: |
5.0.x
2.1.x
7.0.x
6.0.x
- name: Build
run: dotnet build src --configuration Release
- name: Upload packages
if: matrix.name == 'Windows'
uses: actions/upload-artifact@v3.1.1
uses: actions/upload-artifact@v3.1.2
with:
name: NuGet packages
path: nugets/
retention-days: 7
- name: Run tests
uses: Particular/run-tests-action@v1.4.0
uses: Particular/run-tests-action@v1.5.1
6 changes: 3 additions & 3 deletions .github/workflows/release.yml
Expand Up @@ -11,13 +11,13 @@ jobs:
runs-on: ubuntu-20.04
steps:
- name: Checkout
uses: actions/checkout@v3.3.0
uses: actions/checkout@v3.5.2
with:
fetch-depth: 0
- name: Setup .NET SDK
uses: actions/setup-dotnet@v3.0.3
with:
dotnet-version: 7.0.x
dotnet-version: 7.0.x
- name: Build
run: dotnet build src --configuration Release
- name: Sign NuGet packages
Expand All @@ -36,4 +36,4 @@ jobs:
- name: Deploy
uses: Particular/push-octopus-package-action@v1.1.0
with:
octopus-deploy-api-key: ${{ secrets.OCTOPUS_DEPLOY_API_KEY }}
octopus-deploy-api-key: ${{ secrets.OCTOPUS_DEPLOY_API_KEY }}
Expand Up @@ -2,21 +2,30 @@
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Amazon.Lambda.SQSEvents;
using Amazon.Runtime;
using Amazon.S3;
using Amazon.S3.Model;
using Amazon.SimpleNotificationService;
using Amazon.SQS;
using Amazon.SQS.Model;
using NUnit.Framework;

[TestFixture]
class AwsLambdaSQSEndpointTestBase
{
protected const string DelayedDeliveryQueueSuffix = "-delay.fifo";
const int QueueDelayInSeconds = 900; // 15 * 60

protected string QueueName { get; set; }

protected string DelayQueueName { get; set; }

protected string ErrorQueueName { get; set; }

protected string QueueNamePrefix { get; set; }
Expand Down Expand Up @@ -51,6 +60,17 @@ public async Task Setup()
}
});
RegisterQueueNameToCleanup(ErrorQueueName);
DelayQueueName = $"{QueueName}{DelayedDeliveryQueueSuffix}";
_ = await sqsClient.CreateQueueAsync(new CreateQueueRequest(DelayQueueName)
{
Attributes = new Dictionary<string, string>
{
{ "FifoQueue", "true" },
{ QueueAttributeName.DelaySeconds, QueueDelayInSeconds.ToString(CultureInfo.InvariantCulture)}
}
});
RegisterQueueNameToCleanup(DelayQueueName);

s3Client = CreateS3Client();
KeyPrefix = QueueNamePrefix;
}
Expand Down Expand Up @@ -90,7 +110,7 @@ protected async Task<SQSEvent> GenerateAndReceiveSQSEvent<T>(int count) where T
{
var endpointConfiguration = new EndpointConfiguration($"{QueueNamePrefix}sender");
endpointConfiguration.SendOnly();
endpointConfiguration.UsePersistence<InMemoryPersistence>();

var transport = endpointConfiguration.UseTransport<SqsTransport>();
transport.ClientFactory(CreateSQSClient);
var s3 = transport.S3(BucketName, KeyPrefix);
Expand All @@ -112,13 +132,50 @@ protected async Task<SQSEvent> GenerateAndReceiveSQSEvent<T>(int count) where T
{
MaxNumberOfMessages = count,
WaitTimeSeconds = 20,
AttributeNames = new List<string> { "SentTimestamp" },
MessageAttributeNames = new List<string> { "*" }
};

var receivedMessages = await sqsClient.ReceiveMessageAsync(receiveRequest);

return receivedMessages.ToSQSEvent();
}

protected async Task<SQSEvent> GenerateAndReceiveNativeSQSEvent(Dictionary<string, MessageAttributeValue> messageAttributeValues, string message, bool base64Encode = true)
{
var body = base64Encode ? Convert.ToBase64String(Encoding.UTF8.GetBytes(message)) : message;

var sendMessageRequest = new SendMessageRequest
{
QueueUrl = createdQueue.QueueUrl,
MessageAttributes = messageAttributeValues,
MessageBody = body
};

await sqsClient.SendMessageAsync(sendMessageRequest)
.ConfigureAwait(false);

var receiveRequest = new ReceiveMessageRequest(createdQueue.QueueUrl)
{
MaxNumberOfMessages = 10,
WaitTimeSeconds = 20,
AttributeNames = new List<string> { "SentTimestamp" },
MessageAttributeNames = new List<string> { "*" }
};

var receivedMessages = await sqsClient.ReceiveMessageAsync(receiveRequest);

return receivedMessages.ToSQSEvent();
}

protected async Task UploadMessageBodyToS3(string key, string body) =>
await s3Client.PutObjectAsync(new PutObjectRequest
{
Key = $"{key}",
BucketName = BucketName,
ContentBody = body
});

protected async Task<int> CountMessagesInErrorQueue()
{
var attReq = new GetQueueAttributesRequest { QueueUrl = createdErrorQueue.QueueUrl };
Expand All @@ -127,12 +184,33 @@ protected async Task<int> CountMessagesInErrorQueue()
return response.ApproximateNumberOfMessages;
}

protected async Task<SQSEvent> RetrieveMessagesInErrorQueue(int maxMessageCount = 10)
{
var receiveRequest = new ReceiveMessageRequest(createdErrorQueue.QueueUrl)
{
MaxNumberOfMessages = maxMessageCount,
WaitTimeSeconds = 20,
AttributeNames = new List<string> { "SentTimestamp" },
MessageAttributeNames = new List<string> { "*" }
};

var receivedMessages = await sqsClient.ReceiveMessageAsync(receiveRequest);

return receivedMessages.ToSQSEvent();
}

public static IAmazonSQS CreateSQSClient()
{
var credentials = new EnvironmentVariablesAWSCredentials();
return new AmazonSQSClient(credentials);
}

public static IAmazonSimpleNotificationService CreateSNSClient()
{
var credentials = new EnvironmentVariablesAWSCredentials();
return new AmazonSimpleNotificationServiceClient(credentials);
}

public static IAmazonS3 CreateS3Client()
{
var credentials = new EnvironmentVariablesAWSCredentials();
Expand Down
@@ -1,16 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFrameworks>net6.0;net7.0</TargetFrameworks>
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
<RootNamespace>NServiceBus.AwsLambda.Tests</RootNamespace>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="1.2.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.17.0" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.0.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
<PackageReference Include="NUnit" Version="3.13.3" />
<PackageReference Include="NUnit3TestAdapter" Version="4.4.2" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit eeb2a56

Please sign in to comment.