Skip to content

Commit

Permalink
Watch test 284 (#1117)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Apr 11, 2024
1 parent cda5212 commit 73364d2
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 21 deletions.
8 changes: 4 additions & 4 deletions src/main/java/io/nats/client/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ public interface KeyValue {
void purge(String key, long expectedRevision) throws IOException, JetStreamApiException;

/**
* Watch updates for a specific key or keys.
* @param key the key or a comma delimited list of keys.
* Watch updates for a specific key.
* @param key the key.
* @param watcher the watcher the implementation to receive changes
* @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins.
* @return The KeyValueWatchSubscription
Expand All @@ -181,8 +181,8 @@ public interface KeyValue {
NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;

/**
* Watch updates for a specific key or keys, starting at a specific revision.
* @param key the key or a comma delimited list of keys.
* Watch updates for a specific key, starting at a specific revision.
* @param key the key.
* @param watcher the watcher the implementation to receive changes
* @param fromRevision the revision to start from
* @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last options wins.
Expand Down
20 changes: 9 additions & 11 deletions src/main/java/io/nats/client/impl/NatsKeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -239,28 +238,27 @@ private PublishAck _write(String key, byte[] data, Headers h) throws IOException

@Override
public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
if (key.contains(",")) {
return watch(Arrays.asList(key.split(",")), watcher, -1, watchOptions);
}
return watch(Collections.singletonList(key), watcher, -1, watchOptions);
validateKvKeyWildcardAllowedRequired(key);
validateNotNull(watcher, "Watcher is required");
return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, -1, watchOptions);
}

@Override
public NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
if (key.contains(",")) {
return watch(Arrays.asList(key.split(",")), watcher, fromRevision, watchOptions);
}
return watch(Collections.singletonList(key), watcher, fromRevision, watchOptions);
validateKvKeyWildcardAllowedRequired(key);
validateNotNull(watcher, "Watcher is required");
return new NatsKeyValueWatchSubscription(this, Collections.singletonList(key), watcher, fromRevision, watchOptions);
}

@Override
public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
return watch(keys, watcher, -1, watchOptions);
validateKvKeysWildcardAllowedRequired(keys);
validateNotNull(watcher, "Watcher is required");
return new NatsKeyValueWatchSubscription(this, keys, watcher, -1, watchOptions);
}

@Override
public NatsKeyValueWatchSubscription watch(List<String> keys, KeyValueWatcher watcher, long fromRevision, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException {
// all watch methods (watch, watchAll) delegate to here
validateKvKeysWildcardAllowedRequired(keys);
validateNotNull(watcher, "Watcher is required");
return new NatsKeyValueWatchSubscription(this, keys, watcher, fromRevision, watchOptions);
Expand Down
11 changes: 5 additions & 6 deletions src/test/java/io/nats/client/impl/KeyValueTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,6 @@ public void testWatch() throws Exception {
TestKeyValueWatcher gtMetaWatcher = new TestKeyValueWatcher("gtMetaWatcher", true, META_ONLY);
TestKeyValueWatcher multipleFullWatcher = new TestKeyValueWatcher("multipleFullWatcher", true);
TestKeyValueWatcher multipleMetaWatcher = new TestKeyValueWatcher("multipleMetaWatcher", true, META_ONLY);
TestKeyValueWatcher multipleFullWatcher2 = new TestKeyValueWatcher("multipleFullWatcher2", true);
TestKeyValueWatcher multipleMetaWatcher2 = new TestKeyValueWatcher("multipleMetaWatcher2", true, META_ONLY);
TestKeyValueWatcher key1AfterWatcher = new TestKeyValueWatcher("key1AfterWatcher", false, META_ONLY);
TestKeyValueWatcher key1AfterIgDelWatcher = new TestKeyValueWatcher("key1AfterIgDelWatcher", false, META_ONLY, IGNORE_DELETE);
TestKeyValueWatcher key1AfterStartNewWatcher = new TestKeyValueWatcher("key1AfterStartNewWatcher", false, META_ONLY, UPDATES_ONLY);
Expand Down Expand Up @@ -943,10 +941,6 @@ public void testWatch() throws Exception {
_testWatch(nc, starMetaWatcher, allExpecteds, -1, kv -> kv.watch("key.*", starMetaWatcher, starMetaWatcher.watchOptions));
_testWatch(nc, gtFullWatcher, allExpecteds, -1, kv -> kv.watch("key.>", gtFullWatcher, gtFullWatcher.watchOptions));
_testWatch(nc, gtMetaWatcher, allExpecteds, -1, kv -> kv.watch("key.>", gtMetaWatcher, gtMetaWatcher.watchOptions));
_testWatch(nc, multipleFullWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleFullWatcher, multipleFullWatcher.watchOptions));
_testWatch(nc, multipleMetaWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.watchOptions));
_testWatch(nc, multipleFullWatcher2, allExpecteds, -1, kv -> kv.watch(String.join(",", allKeys), multipleFullWatcher2, multipleFullWatcher.watchOptions));
_testWatch(nc, multipleMetaWatcher2, allExpecteds, -1, kv -> kv.watch(String.join(",", allKeys), multipleMetaWatcher2, multipleMetaWatcher.watchOptions));
_testWatch(nc, key1AfterWatcher, purgeOnlyExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterWatcher, key1AfterWatcher.watchOptions));
_testWatch(nc, key1AfterIgDelWatcher, noExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterIgDelWatcher, key1AfterIgDelWatcher.watchOptions));
_testWatch(nc, key1AfterStartNewWatcher, noExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_1, key1AfterStartNewWatcher, key1AfterStartNewWatcher.watchOptions));
Expand All @@ -956,6 +950,11 @@ public void testWatch() throws Exception {
_testWatch(nc, key2AfterStartFirstWatcher, key2AllExpecteds, -1, kv -> kv.watch(TEST_WATCH_KEY_2, key2AfterStartFirstWatcher, key2AfterStartFirstWatcher.watchOptions));
_testWatch(nc, key1FromRevisionAfterWatcher, key1FromRevisionExpecteds, 2, kv -> kv.watch(TEST_WATCH_KEY_1, key1FromRevisionAfterWatcher, 2, key1FromRevisionAfterWatcher.watchOptions));
_testWatch(nc, allFromRevisionAfterWatcher, allFromRevisionExpecteds, 2, kv -> kv.watchAll(allFromRevisionAfterWatcher, 2, allFromRevisionAfterWatcher.watchOptions));

if (atLeast2_10()) {
_testWatch(nc, multipleFullWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleFullWatcher, multipleFullWatcher.watchOptions));
_testWatch(nc, multipleMetaWatcher, allExpecteds, -1, kv -> kv.watch(allKeys, multipleMetaWatcher, multipleMetaWatcher.watchOptions));
}
});
}

Expand Down

0 comments on commit 73364d2

Please sign in to comment.