Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GG-27553 Too many caches can block discovery #224

Open
wants to merge 1 commit into
base: ignite-2.5-master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,10 @@ public final class IgniteSystemProperties {
public static final String IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE =
"IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE";

/** Logging a warning message when metrics quantity exceeded a specified number. */
public static final String IGNITE_DISCOVERY_METRICS_QNT_WARN =
"IGNITE_DISCOVERY_METRICS_QNT_WARN";

/** Time interval that indicates that client reconnect throttle must be reset to zero. 2 minutes by default. */
public static final String CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL =
"CLIENT_THROTTLE_RECONNECT_RESET_TIMEOUT_INTERVAL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,34 @@ private NavigableSet<ClusterNode> allVisibleNodes() {
return res;
}

/** {@inheritDoc} */
@Override public void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tsNanos)
{
boolean isLocDaemon = spi.locNode.isDaemon();

assert nodeId != null;
assert metrics != null;
assert isLocDaemon || cacheMetrics != null;

TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);

if (node != null && node.visible()) {
node.setMetrics(metrics);

if (!isLocDaemon)
node.setCacheMetrics(cacheMetrics);

node.lastUpdateTimeNanos(tsNanos);

msgWorker.notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes(), null);
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
}

/**
* FOR TEST PURPOSE ONLY!
*/
Expand Down Expand Up @@ -2439,23 +2467,8 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
log.debug("Received metrics response: " + msg);
}
else {
long tsNanos = System.nanoTime();

if (msg.hasMetrics()) {
for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();

TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue();

Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics(nodeId) ?
msg.cacheMetrics().get(nodeId) : Collections.<Integer, CacheMetrics>emptyMap();

updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);

for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
}
}
if (msg.hasMetrics())
processMsgCacheMetrics(msg, System.nanoTime());
}
}

Expand Down Expand Up @@ -2555,39 +2568,6 @@ private void processPingRequest() {
sockWriter.sendMessage(res);
}

/**
* @param nodeId Node ID.
* @param metrics Metrics.
* @param cacheMetrics Cache metrics.
* @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
*/
private void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tsNanos)
{
boolean isLocDaemon = spi.locNode.isDaemon();

assert nodeId != null;
assert metrics != null;
assert isLocDaemon || cacheMetrics != null;

TcpDiscoveryNode node = nodeId.equals(getLocalNodeId()) ? locNode : rmtNodes.get(nodeId);

if (node != null && node.visible()) {
node.setMetrics(metrics);

if (!isLocDaemon)
node.setCacheMetrics(cacheMetrics);

node.lastUpdateTimeNanos(tsNanos);

notifyDiscovery(EVT_NODE_METRICS_UPDATED, topVer, node, allVisibleNodes());
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
}

/**
* @param type Event type.
* @param topVer Topology version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,29 @@ private void clearNodeAddedMessage(TcpDiscoveryAbstractMessage msg) {
return threads;
}

/** {@inheritDoc} */
@Override public void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tsNanos)
{
assert nodeId != null;
assert metrics != null;

TcpDiscoveryNode node = ring.node(nodeId);

if (node != null) {
node.setMetrics(metrics);
node.setCacheMetrics(cacheMetrics);

node.lastUpdateTimeNanos(tsNanos);

notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), node);
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
}

/**
* <strong>FOR TEST ONLY!!!</strong>
* <p>
Expand Down Expand Up @@ -5568,23 +5591,8 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {

long tsNanos = System.nanoTime();

if (spiStateCopy() == CONNECTED) {
if (msg.hasMetrics()) {
for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();

TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue();

Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics(nodeId) ?
msg.cacheMetrics().get(nodeId) : Collections.emptyMap();

updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);

for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
}
}
}
if (spiStateCopy() == CONNECTED && msg.hasMetrics())
processMsgCacheMetrics(msg, tsNanos);

if (sendMessageToRemotes(msg)) {
if (laps == 0 && spiStateCopy() == CONNECTED) {
Expand Down Expand Up @@ -5653,34 +5661,6 @@ private void processMetricsUpdateMessage(TcpDiscoveryMetricsUpdateMessage msg) {
}
}

/**
* @param nodeId Node ID.
* @param metrics Metrics.
* @param cacheMetrics Cache metrics.
* @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
*/
private void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tsNanos)
{
assert nodeId != null;
assert metrics != null;

TcpDiscoveryNode node = ring.node(nodeId);

if (node != null) {
node.setMetrics(metrics);
node.setCacheMetrics(cacheMetrics);

node.lastUpdateTimeNanos(tsNanos);

notifyDiscovery(EVT_NODE_METRICS_UPDATED, ring.topologyVersion(), node);
}
else if (log.isDebugEnabled())
log.debug("Received metrics from unknown node: " + nodeId);
}

/**
* Processes discard message and discards previously registered pending messages.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand All @@ -40,9 +44,13 @@
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRingLatencyCheckMessage;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_METRICS_QNT_WARN;
import static org.apache.ignite.IgniteSystemProperties.getInteger;

/**
*
*/
Expand All @@ -59,6 +67,9 @@ abstract class TcpDiscoveryImpl {
/** Response join impossible. */
protected static final int RES_JOIN_IMPOSSIBLE = 255;

/** How often the warning message should occur in logs to prevent log spam. */
public static final long LOG_WARN_MSG_TIMEOUT = 60 * 60 * 1000L;

/** */
protected final TcpDiscoverySpi spi;

Expand All @@ -78,6 +89,12 @@ abstract class TcpDiscoveryImpl {
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
protected ConcurrentLinkedDeque<String> debugLogQ;

/** Logging a warning message when metrics quantity exceeded a specified number. */
protected int METRICS_QNT_WARN = getInteger(IGNITE_DISCOVERY_METRICS_QNT_WARN, 500);

/** */
protected long endTimeMetricsSizeProcessWait = System.currentTimeMillis();

/** */
protected final ServerImpl.DebugLogger debugLog = new DebugLogger() {
/** {@inheritDoc} */
Expand Down Expand Up @@ -347,6 +364,17 @@ protected static String threadStatus(Thread t) {
*/
protected abstract Collection<IgniteSpiThread> threads();

/**
* @param nodeId Node ID.
* @param metrics Metrics.
* @param cacheMetrics Cache metrics.
* @param tsNanos Timestamp as returned by {@link System#nanoTime()}.
*/
public abstract void updateMetrics(UUID nodeId,
ClusterMetrics metrics,
Map<Integer, CacheMetrics> cacheMetrics,
long tsNanos);

/**
* @throws IgniteSpiException If failed.
*/
Expand Down Expand Up @@ -409,6 +437,32 @@ protected boolean checkAckTimeout(long ackTimeout) {
return true;
}

/** */
public void processMsgCacheMetrics(TcpDiscoveryMetricsUpdateMessage msg, long tsNanos) {
for (Map.Entry<UUID, TcpDiscoveryMetricsUpdateMessage.MetricsSet> e : msg.metrics().entrySet()) {
UUID nodeId = e.getKey();

TcpDiscoveryMetricsUpdateMessage.MetricsSet metricsSet = e.getValue();

Map<Integer, CacheMetrics> cacheMetrics = msg.hasCacheMetrics(nodeId) ?
msg.cacheMetrics().get(nodeId) : Collections.emptyMap();

if (endTimeMetricsSizeProcessWait <= U.currentTimeMillis()
&& cacheMetrics.size() >= METRICS_QNT_WARN)
{
log.warning("The Discovery message has metrics for " + cacheMetrics.size() + " caches.\n" +
"To prevent Discovery blocking use -DIGNITE_DISCOVERY_DISABLE_CACHE_METRICS_UPDATE=true option.");

endTimeMetricsSizeProcessWait = U.currentTimeMillis() + LOG_WARN_MSG_TIMEOUT;
}

updateMetrics(nodeId, metricsSet.metrics(), cacheMetrics, tsNanos);

for (T2<UUID, ClusterMetrics> t : metricsSet.clientMetrics())
updateMetrics(t.get1(), t.get2(), cacheMetrics, tsNanos);
}
}

/**
* @param addrs Addresses.
*/
Expand Down