Skip to content

Commit

Permalink
Bug 36591590 - [32909258->14.1.1.0.18] CQC and NearCache results in d…
Browse files Browse the repository at this point in the history
…isconnected state after snapshot recovery (merge 14.1.1.0.18 -> ce/14.1.1.0 @ 108928)

[git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v14.1.1.0/": change = 108932]
  • Loading branch information
vasac committed May 10, 2024
1 parent cb83699 commit 81a7916
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 15 deletions.
15 changes: 12 additions & 3 deletions prj/coherence-core/src/main/java/com/tangosol/net/MemberEvent.java
@@ -1,8 +1,8 @@
/*
* Copyright (c) 2000, 2020, Oracle and/or its affiliates.
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* http://oss.oracle.com/licenses/upl.
* https://oss.oracle.com/licenses/upl.
*/

package com.tangosol.net;
Expand Down Expand Up @@ -121,6 +121,10 @@ public void dispatch(EventListener[] aListeners)
case MEMBER_LEFT:
target.memberLeft(this);
break;

case MEMBER_RECOVERED:
target.memberRecovered(this);
break;
}
}
}
Expand Down Expand Up @@ -187,10 +191,15 @@ public String toString()
*/
public static final int MEMBER_LEFT = 3;

/**
* This event indicates that a Member has performed persistence recovery.
*/
public static final int MEMBER_RECOVERED = 4;

/**
* Descriptions of the various event IDs.
*/
private static final String[] DESCRIPTIONS = {"<unknown>", "JOINED", "LEAVING", "LEFT"};
private static final String[] DESCRIPTIONS = {"<unknown>", "JOINED", "LEAVING", "LEFT", "RECOVERED"};


// ----- data members ---------------------------------------------------
Expand Down
@@ -1,8 +1,8 @@
/*
* Copyright (c) 2000, 2020, Oracle and/or its affiliates.
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* http://oss.oracle.com/licenses/upl.
* https://oss.oracle.com/licenses/upl.
*/

package com.tangosol.net;
Expand Down Expand Up @@ -60,4 +60,13 @@ public interface MemberListener
* @param evt the MemberEvent.MEMBER_LEFT event
*/
public void memberLeft(MemberEvent evt);

/**
* Invoked when a Member has recovered from persistence.
*
* @param evt the MemberEvent.MEMBER_RECOVERED event
*/
default public void memberRecovered(MemberEvent evt)
{
}
}
Expand Up @@ -2185,6 +2185,12 @@ public void memberLeft(MemberEvent evt)
}
}

@Override
public void memberRecovered(MemberEvent evt)
{
changeState(STATE_DISCONNECTED);
}

/**
* Produce a human-readable description of this object.
*
Expand Down
@@ -1,8 +1,8 @@
/*
* Copyright (c) 2000, 2020, Oracle and/or its affiliates.
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* http://oss.oracle.com/licenses/upl.
* https://oss.oracle.com/licenses/upl.
*/

package com.tangosol.net.cache;
Expand Down Expand Up @@ -662,6 +662,14 @@ public void memberLeft(MemberEvent evt)
}
}
}

/**
* Invoked when a Member has recovered.
*/
public void memberRecovered(MemberEvent evt)
{
resetFrontMap();
}
}


Expand Down
@@ -1,8 +1,8 @@
/*
* Copyright (c) 2000, 2020, Oracle and/or its affiliates.
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* http://oss.oracle.com/licenses/upl.
* https://oss.oracle.com/licenses/upl.
*/
package persistence;

Expand Down Expand Up @@ -37,6 +37,8 @@
import com.tangosol.net.PartitionedService.PartitionRecoveryAction;
import com.tangosol.net.Service;

import com.tangosol.net.cache.ContinuousQueryCache;

import com.tangosol.net.management.MBeanHelper;

import com.tangosol.net.partition.SimplePartitionKey;
Expand All @@ -47,6 +49,7 @@
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.ClassHelper;
import com.tangosol.util.ExternalizableHelper;
import com.tangosol.util.Filter;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.LongArray;
import com.tangosol.util.MapListener;
Expand Down Expand Up @@ -303,6 +306,18 @@ public void testTruncate()
"simple-persistent", "simple-persistent-truncate");
}

/**
* Test CQC cache recovery.
* @throws IOException
* @throws MBeanException
*/
@Test
public void testCqcSnapshotRecovery()
throws IOException, MBeanException
{
testCqcSnapshotRecovery("testCqcSnapshot" + getPersistenceManagerName(), "simple-persistent");
}

// ----- helpers --------------------------------------------------------

private void testBasicPersistence(String sServer, String sPersistentCache, String sTransientCache)
Expand Down Expand Up @@ -776,6 +791,8 @@ private void testBasicSnapshot(String sServer, String sPersistentCache, boolean
// with partitioned consistency
Thread[] aReaders = ensureContinuousReaders(cache, 8);
Base.sleep(100L); // allow for some successful reads
cache.put("foo", "BAR");
cache.put("biz", "BAZ");
helper.createSnapshot(sService, "snapshot-B");
destroyContinuousReaders(aReaders);

Expand All @@ -799,8 +816,8 @@ private void testBasicSnapshot(String sServer, String sPersistentCache, boolean
// of "snapshot-B"

// validate that the data were recovered
assertEquals("bar", cache.get("foo"));
assertEquals("baz", cache.get("biz"));
assertEquals("BAR", cache.get("foo"));
assertEquals("BAZ", cache.get("biz"));
for (int i = 0; i < 20000; i++)
{
assertEquals(i, cache.get(i));
Expand Down Expand Up @@ -835,8 +852,8 @@ private void testBasicSnapshot(String sServer, String sPersistentCache, boolean
waitForBalanced(service);

// validate that the data were recovered
assertEquals("bar", cache.get("foo"));
assertEquals("baz", cache.get("biz"));
assertEquals("BAR", cache.get("foo"));
assertEquals("BAZ", cache.get("biz"));
for (int i = 0; i < 20000; i++)
{
assertEquals(i, cache.get(i));
Expand All @@ -857,8 +874,8 @@ private void testBasicSnapshot(String sServer, String sPersistentCache, boolean
waitForBalanced(service);

// validate that the data were recovered
assertEquals("bar", cache.get("foo"));
assertEquals("baz", cache.get("biz"));
assertEquals("BAR", cache.get("foo"));
assertEquals("BAZ", cache.get("biz"));
for (int i = 0; i < 20000; i++)
{
assertEquals(i, cache.get(i));
Expand Down Expand Up @@ -1404,6 +1421,109 @@ private void testMultipleRestarts(String sServer, String sPersistentCache,int nR
}
}

public void testCqcSnapshotRecovery(String sServer, String sPersistentCache)
throws IOException, MBeanException
{
File fileSnapshot = FileHelper.createTempDir();
File fileTrash = FileHelper.createTempDir();

Properties props = new Properties();
props.setProperty("test.persistence.mode", "on-demand");
props.setProperty("test.persistence.active.dir", "");
props.setProperty("test.persistence.trash.dir", fileTrash.getAbsolutePath());
props.setProperty("test.persistence.snapshot.dir", fileSnapshot.getAbsolutePath());
props.setProperty("coherence.management", "all");
props.setProperty("coherence.management.remote", "true");
props.setProperty("coherence.distribution.2server", "false");
props.setProperty("test.threads", "1");
props.setProperty("coherence.override", "tangosol-coherence-override.xml");

final NamedCache cache = getNamedCache(sPersistentCache);
DistributedCacheService service = (DistributedCacheService) cache.getCacheService();
Cluster cluster = service.getCluster();
String sService = service.getInfo().getServiceName();

startCacheServer(sServer + "-1", getProjectName(), getCacheConfigPath(), props);
startCacheServer(sServer + "-2", getProjectName(), getCacheConfigPath(), props);

Eventually.assertThat(invoking(service).getOwnershipEnabledMembers().size(), is(2));
waitForBalanced(service);

PersistenceTestHelper helper = new PersistenceTestHelper();
final ContinuousQueryCache cqc = new ContinuousQueryCache(cache, (Filter) o -> o instanceof String
|| (o instanceof Integer
&& ((Integer) o) >= 9500
&& ((Integer) o) < 10500), true);

try
{
// add a bunch of data
cache.put("foo", "bar");
cache.put("biz", "baz");
HashMap mapTemp = new HashMap();
for (int i = 0; i < 10000; i++)
{
mapTemp.put(i, i);
}
cache.putAll(mapTemp);

helper.createSnapshot(sService, "snapshot-A");

mapTemp = new HashMap();
for (int i = 10000; i < 20000; i++)
{
mapTemp.put(i, i);
}
cache.putAll(mapTemp);

cache.put("foo", "BAR");
cache.put("biz", "BAZ");
cqc.put(9999, "X");
helper.createSnapshot(sService, "snapshot-B");

cache.clear();

assertEquals(0, cqc.size());
assertEquals(0, cache.size());

helper.recoverSnapshot(sService, "snapshot-A");

// validate that the data were recovered
assertEquals(502, cqc.size());
assertEquals(10002, cache.size());
assertEquals("bar", cqc.get("foo"));
assertEquals("baz", cqc.get("biz"));
for (int i = 0; i < 10000; i++)
{
assertEquals(i, cache.get(i));
assertEquals(i < 9500 ? null : i, cqc.get(i));
}

helper.recoverSnapshot(sService, "snapshot-B");

// validate that the data were recovered
assertEquals("BAR", cqc.get("foo"));
assertEquals("BAZ", cqc.get("biz"));
for (int i = 0; i < 20000; i++)
{
assertEquals(i == 9999 ? "X" : i, cache.get(i));
assertEquals(i < 9500 || i >= 10500
? null
: (i == 9999 ? "X" : i), cqc.get(i));
}
assertEquals(1002, cqc.size());
assertEquals(20002, cache.size());
}
finally
{
stopAllApplications();
CacheFactory.shutdown();

FileHelper.deleteDirSilent(fileSnapshot);
FileHelper.deleteDirSilent(fileTrash);
}
}

/**
* A helper method to call the static {@link PersistenceTestHelper#listSnapshots(String)}
* method. This allows us to use this method in Eventually.assertThat(Deferred, Matcher)
Expand Down
Binary file not shown.
Binary file not shown.
Binary file modified tde/core-net/3.0/cdb/Component/Util/SafeService.cdb
Binary file not shown.

0 comments on commit 81a7916

Please sign in to comment.