Skip to content

Commit

Permalink
SOLR-17004: ZkStateReader waitForState should check clusterState before using watchers (#1945)
Browse files Browse the repository at this point in the history
  • Loading branch information
risdenk committed Oct 4, 2023
1 parent e491400 commit 0e79224
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 61 deletions.
2 changes: 2 additions & 0 deletions solr/CHANGES.txt
Expand Up @@ -28,6 +28,8 @@ Optimizations
---------------------
* SOLR-16555: SolrIndexSearcher - FilterCache intersections/andNot should not clone bitsets repeatedly (Kevin Risden, David Smiley)

* SOLR-17004: ZkStateReader waitForState should check clusterState before using watchers (Kevin Risden)

Other Changes
---------------------
* SOLR-16141: Upgrade Apache Tika to 1.28.4 (Kevin Risden)
Expand Down
Expand Up @@ -72,45 +72,32 @@ public void testSimpleSliceLeaderElection() throws Exception {
String leader = getLeader(collection);
JettySolrRunner jetty = getRunner(leader);
assertNotNull(jetty);
assertTrue("shard1".equals(jetty.getCoreContainer().getCores().iterator().next()
.getCoreDescriptor().getCloudDescriptor().getShardId()));
assertEquals(
"shard1",
jetty
.getCoreContainer()
.getCores()
.iterator()
.next()
.getCoreDescriptor()
.getCloudDescriptor()
.getShardId());
String jettyNodeName = jetty.getNodeName(); // must get before shutdown
jetty.stop();
stoppedRunners.add(jetty);

// poll until leader change is visible
for (int j = 0; j < 90; j++) {
String currentLeader = getLeader(collection);
if(!leader.equals(currentLeader)) {
break;
}
Thread.sleep(500);
}

leader = getLeader(collection);
int retry = 0;
while (jetty == getRunner(leader)) {
if (retry++ == 60) {
break;
}
Thread.sleep(1000);
}

if (jetty == getRunner(leader)) {
cluster.getZkClient().printLayoutToStdOut();
fail("We didn't find a new leader! " + jetty + " was close, but it's still showing as the leader");
}

assertTrue("shard1".equals(getRunner(leader).getCoreContainer().getCores().iterator().next()
.getCoreDescriptor().getCloudDescriptor().getShardId()));
waitForState(
"Leader should not be " + jettyNodeName,
collection,
(n, c) ->
c.getLeader("shard1") != null
&& !jettyNodeName.equals(c.getLeader("shard1").getNodeName()));
}

for (JettySolrRunner runner : stoppedRunners) {
runner.start();
}
waitForState("Expected to see nodes come back " + collection, collection,
(n, c) -> {
return n.size() == 6;
});
waitForState(
"Expected to see nodes come back for " + collection, collection, (n, c) -> n.size() == 6);
CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());

// testLeaderElectionAfterClientTimeout
Expand All @@ -122,10 +109,8 @@ public void testSimpleSliceLeaderElection() throws Exception {
// timeout the leader
String leader = getLeader(collection);
JettySolrRunner jetty = getRunner(leader);
ZkController zkController = jetty.getCoreContainer().getZkController();

zkController.getZkClient().getSolrZooKeeper().closeCnxn();
cluster.getZkServer().expire(zkController.getZkClient().getSolrZooKeeper().getSessionId());
assertNotNull(jetty);
cluster.expireZkSession(jetty);

for (int i = 0; i < 60; i++) { // wait till leader is changed
if (jetty != getRunner(getLeader(collection))) {
Expand Down
76 changes: 51 additions & 25 deletions solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
Expand Up @@ -970,25 +970,25 @@ public Replica getLeaderRetry(String collection, String shard) throws Interrupte
return getLeaderRetry(collection, shard, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
}

/**
* Get shard leader properties, with retry if none exist.
*/
public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
AtomicReference<DocCollection> coll = new AtomicReference<>();
/** Get shard leader properties, with retry if none exist. */
public Replica getLeaderRetry(String collection, String shard, int timeout)
throws InterruptedException {
AtomicReference<Replica> leader = new AtomicReference<>();
try {
waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
if (c == null)
return false;
coll.set(c);
Replica l = getLeader(n, c, shard);
if (l != null) {
log.debug("leader found for {}/{} to be {}", collection, shard, l);
leader.set(l);
return true;
}
return false;
});
waitForState(
collection,
timeout,
TimeUnit.MILLISECONDS,
(n, c) -> {
if (c == null) return false;
Replica l = getLeader(n, c, shard);
if (l != null) {
log.debug("leader found for {}/{} to be {}", collection, shard, l);
leader.set(l);
return true;
}
return false;
});
} catch (TimeoutException e) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found after waiting for "
+ timeout + "ms " + ", collection: " + collection + " slice: " + shard + " saw state=" + clusterState.getCollectionOrNull(collection)
Expand Down Expand Up @@ -1826,6 +1826,18 @@ public void waitForState(final String collection, long wait, TimeUnit unit, Coll
throw new AlreadyClosedException();
}

// Check predicate against known clusterState before trying to add watchers
if (clusterState != null) {
Set<String> liveNodes = clusterState.getLiveNodes();
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (liveNodes != null && docCollection != null) {
if (predicate.matches(liveNodes, docCollection)) {
log.debug("Found {} directly in clusterState", predicate);
return;
}
}
}

final CountDownLatch latch = new CountDownLatch(1);
waitLatches.add(latch);
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
Expand Down Expand Up @@ -1880,14 +1892,25 @@ public void waitForState(final String collection, long wait, TimeUnit unit, Pred
throw new AlreadyClosedException();
}

// Check predicate against known clusterState before trying to add watchers
if (clusterState != null) {
DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (docCollection != null) {
if (predicate.test(docCollection)) {
log.debug("Found {} directly in clusterState", predicate);
return;
}
}
}

final CountDownLatch latch = new CountDownLatch(1);
waitLatches.add(latch);
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
DocCollectionWatcher watcher = (c) -> {
docCollection.set(c);
boolean matches = predicate.test(c);
if (matches)
latch.countDown();
AtomicReference<DocCollection> docCollectionReference = new AtomicReference<>();
DocCollectionWatcher watcher =
(c) -> {
docCollectionReference.set(c);
boolean matches = predicate.test(c);
if (matches) latch.countDown();

return matches;
};
Expand All @@ -1896,8 +1919,11 @@ public void waitForState(final String collection, long wait, TimeUnit unit, Pred
try {
// wait for the watcher predicate to return true, or time out
if (!latch.await(wait, unit))
throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + docCollection.get());

throw new TimeoutException(
"Timeout waiting to see state for collection="
+ collection
+ " :"
+ docCollectionReference.get());
} finally {
removeDocCollectionWatcher(collection, watcher);
waitLatches.remove(latch);
Expand Down

0 comments on commit 0e79224

Please sign in to comment.