Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ResumeAtRevision support to KV Watcher #491

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/NATS.Client.KeyValueStore/Internal/NatsKVWatcher.cs
Expand Up @@ -383,11 +383,17 @@ private async ValueTask<INatsJSConsumer> CreatePushConsumer(string origin)
config.HeadersOnly = true;
}

// Resume from a specific revision ?
if (sequence > 0)
{
config.DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence;
config.OptStartSeq = sequence + 1;
}
else if (_opts.ResumeAtRevision > 0)
{
config.DeliverPolicy = ConsumerConfigDeliverPolicy.ByStartSequence;
config.OptStartSeq = _opts.ResumeAtRevision;
}

var consumer = await _context.CreateOrUpdateConsumerAsync(
_stream,
Expand Down
8 changes: 8 additions & 0 deletions src/NATS.Client.KeyValueStore/NatsKVOpts.cs
Expand Up @@ -41,6 +41,14 @@ public record NatsKVWatchOpts
/// Async function called when the enumerator reaches the end of data. Return True to break the async enumeration, False to allow the enumeration to continue.
/// </summary>
public Func<CancellationToken, ValueTask<bool>>? OnNoData { get; init; }

/// <summary>
/// The revision to start from, if set to 0 (default) this will be ignored.
/// <remarks>
/// Setting this to a non-zero value will cause the watcher to ignore the values for <see cref="IncludeHistory"/> and <see cref="UpdatesOnly"/>.
/// </remarks>
Comment on lines +47 to +49
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it more consistent/correct to annotate that this option silently overrides other options? Or should we fail with an Exception that states that two options that are mutually exclusive have been set?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably whichever pattern we follow in Consumer Creation is the same pattern we would want to follow here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@caleblloyd I initially considered throwing 😄 I'll take a look what happens in other places. (any pointers to where we do stuff like this ?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought it might be a little abrasive to throw here, since IncludeHistory will default to false but when setting ResumeAtRevision it must be true. I think it makes sense to ignore IncludeHistory when ResumeAtRevision is set.

UpdatesOnly = true and ResumeAtRevision > 0 are 2 non-defaults though so I think it makes sense to throw when those are both set.

Regular Consumer Crerate isn't checking any pre-conditions it looks like, it's taking the Deliver Policy and Start Seq / Start Time separately. So if the Deliver Policy doesn't match the Server will fail with an error, and an exception will be thrown.

Copy link
Contributor Author

@niklasfp niklasfp Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought it might be a little abrasive to throw here, since IncludeHistory will default to false but when setting ResumeAtRevision it must be true. I think it makes sense to ignore IncludeHistory when ResumeAtRevision is set.

UpdatesOnly = true and ResumeAtRevision > 0 are 2 non-defaults though so I think it makes sense to throw when those are both set.

Ok, so if I update the docs, on NatsKVOpts to mention that it ignores IncludeHistory if ResumeAtRevision is > 0, and that UpdatesOnly and ResumeAtRevision is mutually exclusive.

At the same time update the docs on INatsKVStore.WatchAsync methods to mention that an exception will be thrown if UpdatesOnly = true and ResumeAtRevision >0. Then the question is, ArgumentException, InvalidOperationException or something completely different ? 😄

Regular Consumer Crerate isn't checking any pre-conditions it looks like, it's taking the Deliver Policy and Start Seq / Start Time separately. So if the Deliver Policy doesn't match the Server will fail with an error, and an exception will be thrown.

It feels dirty to let this go to the server, if we already know it will fail 😜

Copy link
Collaborator

@mtmk mtmk Apr 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling GetKeys with the resume parameter might be a valid use case. I agree that including history is overridden due to the method overload. However, considering the following points, I suggest keeping it in the options:

  1. Cross-cutting functionality: resume seems to be applicable to multiple scenarios and not specific to a single use case. Including it in the options would make it more reusable and accessible across different parts of the codebase.

  2. Consistency with other NATS clients: It's important to maintain a certain degree of consistency with other client implementations, unless there are .NET specific differences.

  3. Impact on other clients: Altering the method signature or introducing overloads would require updating other client implementations as well. This can be a significant effort, and there should be a strong justification for making such changes unless we can say this is a .NET specific difference.

edit: in terms of validation, since NatsKVWatcher is the last destination for the options, should we validate conflicting options there (e.g. history and ResumeAtRevision)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mtmk I agree with almost everything you wrote there, I just wanted to highlight, that the nats kv watch opts opens up for some "special scenarios"

As for consistency with other clients, I looked at both.net1 and java, IIRC they both introduced overloads (not saying that we should though 😀)

Adding overloads (if done right does not break existing implememtations) (still not saying we should add overloads 😂)

I'll push the validation back or the watcher, and update the docs on all methods?
Since most methods accepting Nats kvopts end up throwing if resume at and include history is set

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was checking Go and JS implementations https://github.com/nats-io/nats.go/pull/1489/files https://github.com/nats-io/nats.deno/pull/651/files .. so we're not that consistent after all😄

I guess we can add overloads if you think it's going to be a better developer experience. it just feels like a less generic solution to me. wdyt @niklasfp @caleblloyd

Copy link
Contributor Author

@niklasfp niklasfp Apr 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was checking Go and JS implementations https://github.com/nats-io/nats.go/pull/1489/files https://github.com/nats-io/nats.deno/pull/651/files .. so we're not that consistent after all😄

I guess we can add overloads if you think it's going to be a better developer experience. it just feels like a less generic solution to me. wdyt @niklasfp @caleblloyd

Tbh, overloading it will not be as generic as you stated, and it might be ok to throw at a lover level, as long as we make it clear that some combinations are considered illegal and will throw.
I guess that all methods accepting natkv watch opts should have <exception... docs explaining this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am also in favor of leaving it in Options and adding Validation.

If the Validation needs to run from multiple callers, we could add a internal void ApplyOptsToConsumerConfig(ConsumerConfig config) method on the Options, which performs validation, then mutates a ConsumerConfig?

We do something similar in NatsTlsOpts - where validation is performed at the time of use

/// </summary>
public ulong ResumeAtRevision { get; init; }
}

public record NatsKVDeleteOpts
Expand Down
84 changes: 84 additions & 0 deletions tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs
Expand Up @@ -347,4 +347,88 @@ await foreach (var entry in store.WatchAsync<int>(cancellationToken: cts.Token))
break;
}
}

// Test that watch can resume from a specific revision
[Fact]
public async Task Watch_resume_at_revision()
{
await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();

const string bucket = "Watch_resume_at_revision";
var config = new NatsKVConfig(bucket) { History = 10 };

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var cancellationToken = cts.Token;

var js = new NatsJSContext(nats);
var kv = new NatsKVContext(js);
var store = await kv.CreateStoreAsync(config, cancellationToken: cancellationToken);

await store.PutAsync("k1", 1, cancellationToken: cancellationToken);
await store.PutAsync("k2", 2, cancellationToken: cancellationToken);
var revK3 = await store.PutAsync("k3", 3, cancellationToken: cancellationToken);
await store.PutAsync("k4", 3, cancellationToken: cancellationToken);

// Watch all
var watchOps = new NatsKVWatchOpts() { MetaOnly = true, };
var watchAll = store.WatchAsync<int>(opts: watchOps, cancellationToken: cancellationToken);

// Expect to see k1, k2, k3 and k4
var allEntries = new List<(ulong Revision, string key)>();
await foreach (var key in watchAll)
{
allEntries.Add((key.Revision, key.Key));
if (key.Delta == 0)
{
break;
}
}

// Expects k1, k2, k3 and k4
allEntries.Should().HaveCount(4);

// Watch from the revision of k3
var watchOpsFromRevK3 = watchOps with { ResumeAtRevision = revK3, };

var watchFromRevision = store.WatchAsync<int>(opts: watchOpsFromRevK3, cancellationToken: cancellationToken);

// Expect to see k2 and k3, and k4
var fromRevisionEntries = new List<(ulong Revision, string key)>();
await foreach (var key in watchFromRevision)
{
fromRevisionEntries.Add((key.Revision, key.Key));
if (key.Delta == 0)
{
break;
}
}

// Expects k2, k3 and k4
fromRevisionEntries.Should().HaveCount(2);

// Watch from none existing revision
var noData = false;
var watchOpsNoneExisting = watchOps with
{
ResumeAtRevision = 9999,
OnNoData = (_) =>
{
noData = true;
return ValueTask.FromResult(true);
},
};

var watchFromNoneExistingRevision =
store.WatchAsync<int>(opts: watchOpsNoneExisting, cancellationToken: cancellationToken);

// Expect to see no data
await foreach (var key in watchFromNoneExistingRevision)
{
// We should not see any entries, if we get here something is wrong
Assert.Fail("Should not return any entries, and OnNoData should have been called to bail out");
}

noData.Should().BeTrue();
}
}