Skip to content

Commit

Permalink
Merge pull request #34 from Particular/imessagesession-support
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanFeldman committed Jan 14, 2021
2 parents a613c5b + 2ce7a0f commit 06dc9e3
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 3 deletions.
Expand Up @@ -4,6 +4,18 @@ namespace NServiceBus
{
public AwsLambdaSQSEndpoint(System.Func<Amazon.Lambda.Core.ILambdaContext, NServiceBus.AwsLambdaSQSEndpointConfiguration> configurationFactory) { }
public System.Threading.Tasks.Task Process(Amazon.Lambda.SQSEvents.SQSEvent @event, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default) { }
public System.Threading.Tasks.Task Publish(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
public System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
public System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
public System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
public System.Threading.Tasks.Task Send(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
public System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
public System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
public System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
public System.Threading.Tasks.Task Subscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
public System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
public System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext) { }
}
public class AwsLambdaSQSEndpointConfiguration
{
Expand All @@ -19,5 +31,17 @@ namespace NServiceBus
public interface IAwsLambdaSQSEndpoint
{
System.Threading.Tasks.Task Process(Amazon.Lambda.SQSEvents.SQSEvent @event, Amazon.Lambda.Core.ILambdaContext lambdaContext, System.Threading.CancellationToken cancellationToken = default);
System.Threading.Tasks.Task Publish(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext);
System.Threading.Tasks.Task Publish(object message, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext);
System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext);
System.Threading.Tasks.Task Publish<T>(System.Action<T> messageConstructor, NServiceBus.PublishOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext);
System.Threading.Tasks.Task Send(object message, Amazon.Lambda.Core.ILambdaContext lambdaContext);
System.Threading.Tasks.Task Send(object message, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext);
System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, Amazon.Lambda.Core.ILambdaContext lambdaContext);
System.Threading.Tasks.Task Send<T>(System.Action<T> messageConstructor, NServiceBus.SendOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext);
System.Threading.Tasks.Task Subscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext);
System.Threading.Tasks.Task Subscribe(System.Type eventType, NServiceBus.SubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext);
System.Threading.Tasks.Task Unsubscribe(System.Type eventType, Amazon.Lambda.Core.ILambdaContext lambdaContext);
System.Threading.Tasks.Task Unsubscribe(System.Type eventType, NServiceBus.UnsubscribeOptions options, Amazon.Lambda.Core.ILambdaContext lambdaContext);
}
}
98 changes: 97 additions & 1 deletion src/NServiceBus.AwsLambda.SQS/AwsLambdaEndpoint.cs
Expand Up @@ -62,7 +62,8 @@ async Task InitializeEndpointIfNecessary(ILambdaContext executionContext, Cancel
var configuration = configurationFactory(executionContext);
await Initialize(configuration).ConfigureAwait(false);
LogManager.GetLogger("Previews").Info("NServiceBus.AwsLambda.SQS is a preview package. Preview packages are licensed separately from the rest of the Particular Software platform and have different support guarantees. You can view the license at https://particular.net/eula/previews and the support policy at https://docs.particular.net/previews/support-policy. Customer adoption drives whether NServiceBus.AwsLambda.SQS will be incorporated into the Particular Software platform. Let us know you are using it, if you haven't already, by emailing us at support@particular.net.");
await Endpoint.Start(configuration.EndpointConfiguration).ConfigureAwait(false);

endpoint = await Endpoint.Start(configuration.EndpointConfiguration).ConfigureAwait(false);

pipeline = configuration.PipelineInvoker;
}
Expand All @@ -74,6 +75,100 @@ async Task InitializeEndpointIfNecessary(ILambdaContext executionContext, Cancel
}
}

/// <inheritdoc />
public async Task Send(object message, SendOptions options, ILambdaContext lambdaContext)
{
await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false);

await endpoint.Send(message, options).ConfigureAwait(false);
}

/// <inheritdoc />
public Task Send(object message, ILambdaContext lambdaContext)
{
return Send(message, new SendOptions(), lambdaContext);
}

/// <inheritdoc />
public async Task Send<T>(Action<T> messageConstructor, SendOptions options, ILambdaContext lambdaContext)
{
await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false);

await endpoint.Send(messageConstructor, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Send<T>(Action<T> messageConstructor, ILambdaContext lambdaContext)
{
await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false);

await Send(messageConstructor, new SendOptions(), lambdaContext).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Publish(object message, PublishOptions options, ILambdaContext lambdaContext)
{
await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false);

await endpoint.Publish(message, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Publish<T>(Action<T> messageConstructor, PublishOptions options, ILambdaContext lambdaContext)
{
await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false);

await endpoint.Publish(messageConstructor, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Publish(object message, ILambdaContext lambdaContext)
{
await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false);

await endpoint.Publish(message).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Publish<T>(Action<T> messageConstructor, ILambdaContext lambdaContext)
{
await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false);

await endpoint.Publish(messageConstructor).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Subscribe(Type eventType, SubscribeOptions options, ILambdaContext lambdaContext)
{
await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false);

await endpoint.Subscribe(eventType, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Subscribe(Type eventType, ILambdaContext lambdaContext)
{
await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false);

await endpoint.Subscribe(eventType).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Unsubscribe(Type eventType, UnsubscribeOptions options, ILambdaContext lambdaContext)
{
await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false);

await endpoint.Unsubscribe(eventType, options).ConfigureAwait(false);
}

/// <inheritdoc />
public async Task Unsubscribe(Type eventType, ILambdaContext lambdaContext)
{
await InitializeEndpointIfNecessary(lambdaContext).ConfigureAwait(false);

await endpoint.Unsubscribe(eventType).ConfigureAwait(false);
}

async Task Initialize(AwsLambdaSQSEndpointConfiguration configuration)
{
var settingsHolder = configuration.AdvancedConfiguration.GetSettings();
Expand Down Expand Up @@ -350,6 +445,7 @@ static void LogPoisonMessage(string messageId, Exception exception)
readonly Func<ILambdaContext, AwsLambdaSQSEndpointConfiguration> configurationFactory;
readonly SemaphoreSlim semaphoreLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
PipelineInvoker pipeline;
IEndpointInstance endpoint;
IAmazonSQS sqsClient;
IAmazonS3 s3Client;
string s3BucketForLargeMessages;
Expand Down
67 changes: 66 additions & 1 deletion src/NServiceBus.AwsLambda.SQS/IAwsLambdaSQSEndpoint.cs
@@ -1,4 +1,6 @@
namespace NServiceBus
using System;

namespace NServiceBus
{
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -15,5 +17,68 @@ public interface IAwsLambdaSQSEndpoint
/// Processes a messages received from an SQS trigger using the NServiceBus message pipeline.
/// </summary>
Task Process(SQSEvent @event, ILambdaContext lambdaContext, CancellationToken cancellationToken = default);

/// <summary>
/// Sends the provided message.
/// </summary>
Task Send(object message, SendOptions options, ILambdaContext lambdaContext);

/// <summary>
/// Sends the provided message.
/// </summary>
Task Send(object message, ILambdaContext lambdaContext);

/// <summary>
/// Instantiates a message of type T and sends it.
/// </summary>
Task Send<T>(Action<T> messageConstructor, SendOptions options, ILambdaContext lambdaContext);

/// <summary>
/// Instantiates a message of type T and sends it.
/// </summary>
Task Send<T>(Action<T> messageConstructor, ILambdaContext lambdaContext);

/// <summary>
/// Publish the message to subscribers.
/// </summary>
Task Publish(object message, PublishOptions options, ILambdaContext lambdaContext);

/// <summary>
/// Instantiates a message of type T and publishes it.
/// </summary>
Task Publish<T>(Action<T> messageConstructor, PublishOptions options, ILambdaContext lambdaContext);

/// <summary>
/// Instantiates a message of type T and publishes it.
/// </summary>
Task Publish(object message, ILambdaContext lambdaContext);

/// <summary>
/// Instantiates a message of type T and publishes it.
/// </summary>
Task Publish<T>(Action<T> messageConstructor, ILambdaContext lambdaContext);

/// <summary>
/// Subscribes to receive published messages of the specified type.
/// This method is only necessary if you turned off auto-subscribe.
/// </summary>
Task Subscribe(Type eventType, SubscribeOptions options, ILambdaContext lambdaContext);

/// <summary>
/// Subscribes to receive published messages of the specified type.
/// This method is only necessary if you turned off auto-subscribe.
/// </summary>
Task Subscribe(Type eventType, ILambdaContext lambdaContext);

/// <summary>
/// Unsubscribes to receive published messages of the specified type.
/// </summary>
Task Unsubscribe(Type eventType, UnsubscribeOptions options, ILambdaContext lambdaContext);

/// <summary>
/// Unsubscribes to receive published messages of the specified type.
/// </summary>
Task Unsubscribe(Type eventType, ILambdaContext lambdaContext);

}
}
Expand Up @@ -9,7 +9,7 @@
<ItemGroup>
<PackageReference Include="Amazon.Lambda.Core" Version="1.2.0" />
<PackageReference Include="Amazon.Lambda.SQSEvents" Version="1.2.0" />
<PackageReference Include="NServiceBus.AmazonSQS" Version="[5.1.1, 6.0.0)" />
<PackageReference Include="NServiceBus.AmazonSQS" Version="5.1.1" />
<PackageReference Include="Particular.CodeRules" Version="0.7.0" PrivateAssets="All" />
<PackageReference Include="Particular.Packaging" Version="0.9.0" PrivateAssets="All" />
</ItemGroup>
Expand Down

0 comments on commit 06dc9e3

Please sign in to comment.