Skip to content

Commit

Permalink
GG-27553 Too many caches can block discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Masharov committed Mar 27, 2020
1 parent 47a7749 commit 263a522
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,10 @@ public final class IgniteSystemProperties {
public static final String IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE =
"IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE";

/** Logging a warning message when metrics quantity exceeded a specified number. */
public static final String IGNITE_DISCOVERY_METRICS_QNT_WARN =
"IGNITE_DISCOVERY_METRICS_QNT_WARN";

/** Time interval that indicates that client reconnect throttle must be reset to zero. 2 minutes by default. */
public static final String CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL =
"CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,34 @@ private NavigableSet<ClusterNode> allVisibleNodes() {
return res;
}

/** {@inheritDoc} */
@Override public void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tsNanos)
{
boolean isLocDaemon = spi.locNode.isDaemon();

assert nodeId != null;
assert metrics != null;
assert isLocDaemon || cacheMetrics != null;

TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);

if (node != null && node.visible()) {
node.setMetrics(metrics);

if (!isLocDaemon)
node.setCacheMetrics(cacheMetrics);

node.lastUpdateTimeNanos(tsNanos);

msgWorker.notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes(), null);
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
}

/**
* FOR TEST PURPOSE ONLY!
*/
Expand Down Expand Up @@ -2439,23 +2467,8 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
log.debug("Received metrics response: " + msg);
}
else {
long tsNanos = System.nanoTime();

if (msg.hasMetrics()) {
for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();

TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue();

Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics(nodeId) ?
msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();

updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);

for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
}
}
if (msg.hasMetrics())
processMsgCacheMetrics(msg, System.nanoTime());
}
}

Expand Down Expand Up @@ -2555,39 +2568,6 @@ private void processPingRequest() {
sockWriter.sendMessage(res);
}

/**
* @param nodeId Node ID.
* @param metrics Metrics.
* @param cacheMetrics Cache metrics.
* @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
*/
private void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tsNanos)
{
boolean isLocDaemon = spi.locNode.isDaemon();

assert nodeId != null;
assert metrics != null;
assert isLocDaemon || cacheMetrics != null;

TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);

if (node != null && node.visible()) {
node.setMetrics(metrics);

if (!isLocDaemon)
node.setCacheMetrics(cacheMetrics);

node.lastUpdateTimeNanos(tsNanos);

notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes());
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
}

/**
* @param type Event type.
* @param topVer Topology version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,29 @@ private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) {
return threads;
}

/** {@inheritDoc} */
@Override public void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tsNanos)
{
assert nodeId != null;
assert metrics != null;

TcpDiscoveryNode node = ring.node(nodeId);

if (node != null) {
node.setMetrics(metrics);
node.setCacheMetrics(cacheMetrics);

node.lastUpdateTimeNanos(tsNanos);

notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), node);
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
}

/**
* <strong>FOR TEST ONLY!!!</strong>
* <p>
Expand Down Expand Up @@ -5568,23 +5591,8 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {

long tsNanos = System.nanoTime();

if (spiStateCopy() == CONNECTED) {
if (msg.hasMetrics()) {
for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();

TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue();

Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics(nodeId) ?
msg.cacheMetrics().get(nodeId) : Collections.emptyMap();

updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);

for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
}
}
}
if (spiStateCopy() == CONNECTED && msg.hasMetrics())
processMsgCacheMetrics(msg, tsNanos);

if (sendMessageToRemotes(msg)) {
if (laps == 0 && spiStateCopy() == CONNECTED) {
Expand Down Expand Up @@ -5653,34 +5661,6 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
}
}

/**
* @param nodeId Node ID.
* @param metrics Metrics.
* @param cacheMetrics Cache metrics.
* @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
*/
private void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tsNanos)
{
assert nodeId != null;
assert metrics != null;

TcpDiscoveryNode node = ring.node(nodeId);

if (node != null) {
node.setMetrics(metrics);
node.setCacheMetrics(cacheMetrics);

node.lastUpdateTimeNanos(tsNanos);

notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), node);
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
}

/**
* Processes discard message and discards previously registered pending messages.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -40,9 +44,13 @@
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_METRICS_QNT_WARN;
import static org.apache.ignite.IgniteSystemProperties.getInteger;

/**
*
*/
Expand All @@ -59,6 +67,9 @@ abstract class TcpDiscoveryImpl {
/** Response join impossible. */
protected static final int RES_JOIN_IMPOSSIBLE = 255;

/** How often the warning message should occur in logs to prevent log spam. */
public static final long LOG_WARN_MSG_TIMEOUT = 60 * 60 * 1000L;

/** */
protected final TcpDiscoverySpi spi;

Expand All @@ -78,6 +89,12 @@ abstract class TcpDiscoveryImpl {
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
protected ConcurrentLinkedDeque<String> debugLogQ;

/** Logging a warning message when metrics quantity exceeded a specified number. */
protected int METRICS_QNT_WARN = getInteger(IGNITE_DISCOVERY_METRICS_QNT_WARN, 500);

/** */
protected long endTimeMetricsSizeProcessWait = System.currentTimeMillis();

/** */
protected final ServerImpl.DebugLogger debugLog = new DebugLogger() {
/** {@inheritDoc} */
Expand Down Expand Up @@ -347,6 +364,17 @@ protected static String threadStatus(Thread t) {
*/
protected abstract Collection<IgniteSpiThread> threads();

/**
* @param nodeId Node ID.
* @param metrics Metrics.
* @param cacheMetrics Cache metrics.
* @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
*/
public abstract void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tsNanos);

/**
* @throws IgniteSpiException If failed.
*/
Expand Down Expand Up @@ -409,6 +437,32 @@ protected boolean checkAckTimeout(long ackTimeout) {
return true;
}

/** */
public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) {
for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();

TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue();

Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics(nodeId) ?
msg.cacheMetrics().get(nodeId) : Collections.emptyMap();

if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis()
&& cacheMetrics.size() >= METRICS_QNT_WARN)
{
log.warning("The Discovery message has metrics for " + cacheMetrics.size() + " caches.\n" +
"To prevent Discovery blocking use -DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option.");

endTimeMetricsSizeProcessWait = U.currentTimeMillis() + LOG_WARN_MSG_TIMEOUT;
}

updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);

for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
}
}

/**
* @param addrs Addresses.
*/
Expand Down

0 comments on commit 263a522

Please sign in to comment.