Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

D2 client side implementation of server-reported health #617

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.linkedin.d2.balancer.strategies.framework.LatencyCorrelation;
import com.linkedin.d2.balancer.strategies.framework.LoadBalancerStrategyTestRunner;
import com.linkedin.d2.balancer.strategies.framework.LoadBalancerStrategyTestRunnerBuilder;
import com.linkedin.d2.balancer.strategies.framework.ServerLoadScoreCorrelation;
import com.linkedin.d2.loadBalancerStrategyType;
import java.net.URI;
import java.util.ArrayList;
Expand Down Expand Up @@ -334,6 +335,39 @@ public Object[][] isFastRecovery()
};
}

@Test(dataProvider = "clusterSizeAndQps")
public void testLoadDifference(int clusterSize, int numRequestsPerInterval)
{
  // Run the whole simulation to completion before inspecting any results.
  LoadBalancerStrategyTestRunner runner = buildRelativeRunnerWithDifferentLoad(clusterSize, numRequestsPerInterval);
  runner.runWait();

  // Per-interval load history of host 0, the host the runner is configured to report
  // an elevated server load score for. This test only prints the history; it makes no assertions.
  List<Integer> hostZeroLoadHistory = runner.getLoadHistory().get(runner.getUri(0));
  System.out.println(hostZeroLoadHistory);

  /*
   * Load histories recorded by the author, one row per data-provider case:
   *   [19, 18, 15, 12, 8, 7, 0, 0, 1, 2, 0, 2, 1, 2, 3, 2, 6, 7, 3, 7, 8, 9, 7, 9, 7, 19, 21, 18, 17, 14, 17]
   *   [2, 1, 0, 1, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
   *   [1, 2, 0, 1, 1, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
   *   [21, 22, 15, 18, 7, 6, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 3, 3, 4, 3, 9, 9, 10, 9, 12, 12, 11, 10, 11, 9, 17]
   *
   * Observations from those runs:
   *   - The higher the QPS, the faster the host recovers.
   *   - Reaction time is similar in every case: the load drops to 0 after roughly 6 intervals.
   */
}

@DataProvider(name = "clusterSizeAndQps")
public Object[][] clusterSizeAndQps()
{
  // Each row supplies {clusterSize, numRequestsPerInterval} to the consuming test.
  Object[] smallClusterHighQps = {5, 100};  // Small cluster, high QPS per host
  Object[] smallClusterLowQps = {5, 5};     // Small cluster, low QPS per host
  Object[] bigClusterLowQps = {50, 100};    // Big cluster, low QPS per host
  Object[] bigClusterHighQps = {50, 1000};  // Big cluster, high QPS per host

  return new Object[][]
      {
          smallClusterHighQps,
          smallClusterLowQps,
          bigClusterLowQps,
          bigClusterHighQps
      };
}

private LoadBalancerStrategyTestRunner buildDefaultRunnerWithConstantBadHost(int numHosts, long badHostLatency,
double relativeLatencyHighThresholdFactor)
{
Expand Down Expand Up @@ -474,4 +508,38 @@ private LoadBalancerStrategyTestRunner buildRelativeRunnerWithRandomLatencyInRan
.setRelativeLoadBalancerStrategies(relativeStrategyProperties)
.build();
}

/**
 * Builds a RELATIVE-strategy test runner in which host 0 reports a fixed, elevated server load
 * score for interval indices 0 through 10 and every other host reports the number of requests
 * it was given in the interval.
 *
 * @param numHosts number of hosts in the simulated cluster
 * @param numRequestsPerInterval constant request count passed to the runner builder
 * @return a runner configured for 30 intervals with server-reported load enabled
 */
private LoadBalancerStrategyTestRunner buildRelativeRunnerWithDifferentLoad(int numHosts, int numRequestsPerInterval)
{
  int overloadedScore = 200;
  long jitterLowerBound = 0L;
  long jitterUpperBound = 400L;

  // Latency model shared by all hosts: 400 plus a random jitter in [0, jitterUpperBound - jitterLowerBound).
  List<LatencyCorrelation> latencyPerHost = new ArrayList<>();
  for (int host = 0; host < numHosts; host++)
  {
    latencyPerHost.add((requestsPerInterval, intervalIndex) ->
        400L + (long) (Math.random() * (jitterUpperBound - jitterLowerBound)));
  }

  List<ServerLoadScoreCorrelation> loadScorePerHost = new ArrayList<>();

  // Host 0: reports the overloaded score while the interval index is within 0..10 (inclusive),
  // then falls back to reporting its request count like every other host.
  loadScorePerHost.add((requestsPerInterval, intervalIndex) ->
      intervalIndex >= 0 && intervalIndex <= 10 ? overloadedScore : requestsPerInterval);

  // Remaining hosts: the reported score is simply the request count of the interval.
  for (int host = 1; host < numHosts; host++)
  {
    loadScorePerHost.add((requestsPerInterval, intervalIndex) -> requestsPerInterval);
  }

  LoadBalancerStrategyTestRunnerBuilder runnerBuilder =
      new LoadBalancerStrategyTestRunnerBuilder(loadBalancerStrategyType.RELATIVE, DEFAULT_SERVICE_NAME, numHosts);
  return runnerBuilder
      .enableServerReportedLoad(true)
      .setConstantRequestCount(numRequestsPerInterval)
      .setNumIntervals(30)
      .setDynamicLatency(latencyPerHost)
      .setDynamicServerLoadScore(loadScorePerHost)
      .build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ record D2RelativeStrategyProperties {
*/
lowErrorRate: optional double

/**
* If the latest server reported load score is above this specified factor of the cluster average,
* we will decrease the health score by downStep.
*/
relativeLoadHighThresholdFactor: optional double
rachelhanhan marked this conversation as resolved.
Show resolved Hide resolved

/**
* If the latest server reported load score is under this specified factor of the cluster average,
* we will increase the health score by upStep.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should discuss the up/downStep model vs. P2C further. Sticky routing is currently best effort: it is not guaranteed to reach the same host when the pointsMap changes. I still think that 5s may be too long for server-reported load to be effective.

*/
relativeLoadLowThresholdFactor: optional double

/**
* The health score for a server will not be calculated unless the number of calls to it in the interval
* meets or exceeds the minimum call count.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ public D2Client build()
_config.d2JmxManagerPrefix,
_config.zookeeperReadWindowMs,
_config.enableRelativeLoadBalancer,
_config.deterministicSubsettingMetadataProvider);
_config.deterministicSubsettingMetadataProvider,
_config.enableServerReportedLoad);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
new ZKFSLoadBalancerWithFacilitiesFactory() :
Expand Down Expand Up @@ -572,7 +573,7 @@ private Map<String, LoadBalancerStrategyFactory<?>> createDefaultLoadBalancerStr
{
final RelativeLoadBalancerStrategyFactory relativeLoadBalancerStrategyFactory = new RelativeLoadBalancerStrategyFactory(
_config._executorService, _config.healthCheckOperations, Collections.emptyList(), _config.eventEmitter,
SystemClock.instance());
SystemClock.instance(), _config.enableServerReportedLoad);
loadBalancerStrategyFactories.putIfAbsent(RelativeLoadBalancerStrategy.RELATIVE_LOAD_BALANCER_STRATEGY_NAME,
relativeLoadBalancerStrategyFactory);
}
Expand Down
8 changes: 6 additions & 2 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

public class D2ClientConfig
{
public static final int DEFAULT_RETRY_LIMIT = 3;

String zkHosts = null;
long zkSessionTimeoutInMs = 3600000L;
long zkStartupTimeoutInMs = 10000L;
Expand Down Expand Up @@ -105,7 +107,7 @@ public class D2ClientConfig
String d2JmxManagerPrefix = "UnknownPrefix";
boolean enableRelativeLoadBalancer = false;
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider = null;
public static final int DEFAULT_RETRY_LIMIT = 3;
boolean enableServerReportedLoad = false;

public D2ClientConfig()
{
Expand Down Expand Up @@ -161,7 +163,8 @@ public D2ClientConfig()
String d2JmxManagerPrefix,
int zookeeperReadWindowMs,
boolean enableRelativeLoadBalancer,
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider)
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider,
boolean enableServerReportedLoad)
{
this.zkHosts = zkHosts;
this.zkSessionTimeoutInMs = zkSessionTimeoutInMs;
Expand Down Expand Up @@ -214,5 +217,6 @@ public D2ClientConfig()
this.zookeeperReadWindowMs = zookeeperReadWindowMs;
this.enableRelativeLoadBalancer = enableRelativeLoadBalancer;
this.deterministicSubsettingMetadataProvider = deterministicSubsettingMetadataProvider;
this.enableServerReportedLoad = enableServerReportedLoad;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@ public void onResponse(TransportResponse<RestResponse> response)
if (response.hasError())
{
Throwable throwable = response.getError();
handleError(_callCompletion, throwable);
handleError(_callCompletion, throwable, response.getWireAttributes());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to parse server load from wireAttributes when enableServerReportedLoad is set to false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are 3 places where we can potentially control the behavior on the client side:

  1. TrackerClient - whether to pass wire attributes
  2. CallTracker - whether to update the server-reported load in each interval
  3. load balancer strategy - whether to take the server-reported load into load balancing decision.

When I designed this I also debated where I should put the control, in the end I only put one control in load balancer strategy to make the control logic all in one place.

Now TrackerClient and CallTracker just update the server-reported load whenever it's ready, and in the load balancing strategy there are 2 cases we need to watch out for:

  1. The server enables reporting but the client does not, in which case we respect the client-side config flag.
  2. The server has not enabled reporting yet but the client has, which means the reported load will always be -1. In either case, we make the RLB not consider the server-reported load.

There is definitely a way to add a control in TrackerClient, where we can always report -1 as the score. However, if we add a control here, there will be more code to clean up later. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just changed this logic in my latest commit. I guarded the code with the config on both TrackerClient and the hash ring selector logic.

}
else
{
_callCompletion.endCall();
_callCompletion.endCall(response.getWireAttributes());
}

_wrappedCallback.onResponse(response);
Expand Down Expand Up @@ -227,7 +227,7 @@ public void onResponse(TransportResponse<StreamResponse> response)
if (response.hasError())
{
Throwable throwable = response.getError();
handleError(_callCompletion, throwable);
handleError(_callCompletion, throwable, response.getWireAttributes());
}
else
{
Expand Down Expand Up @@ -259,13 +259,13 @@ public void onDataAvailable(ByteString data)
@Override
public void onDone()
{
_callCompletion.endCall();
_callCompletion.endCall(response.getWireAttributes());
}

@Override
public void onError(Throwable e)
{
handleError(_callCompletion, e);
handleError(_callCompletion, e, response.getWireAttributes());
}
};
entityStream.addObserver(observer);
Expand All @@ -275,35 +275,35 @@ public void onError(Throwable e)
}
}

private void handleError(CallCompletion callCompletion, Throwable throwable)
private void handleError(CallCompletion callCompletion, Throwable throwable, Map<String, String> wireAttributes)
{
if (isServerError(throwable))
{
callCompletion.endCallWithError(ErrorType.SERVER_ERROR);
callCompletion.endCallWithError(ErrorType.SERVER_ERROR, wireAttributes);
}
else if (throwable instanceof RemoteInvocationException)
{
Throwable originalThrowable = LoadBalancerUtil.findOriginalThrowable(throwable);
if (originalThrowable instanceof ConnectException)
{
callCompletion.endCallWithError(ErrorType.CONNECT_EXCEPTION);
callCompletion.endCallWithError(ErrorType.CONNECT_EXCEPTION, wireAttributes);
}
else if (originalThrowable instanceof ClosedChannelException)
{
callCompletion.endCallWithError(ErrorType.CLOSED_CHANNEL_EXCEPTION);
callCompletion.endCallWithError(ErrorType.CLOSED_CHANNEL_EXCEPTION, wireAttributes);
}
else if (originalThrowable instanceof TimeoutException)
{
callCompletion.endCallWithError(ErrorType.TIMEOUT_EXCEPTION);
callCompletion.endCallWithError(ErrorType.TIMEOUT_EXCEPTION, wireAttributes);
}
else
{
callCompletion.endCallWithError(ErrorType.REMOTE_INVOCATION_EXCEPTION);
callCompletion.endCallWithError(ErrorType.REMOTE_INVOCATION_EXCEPTION, wireAttributes);
}
}
else
{
callCompletion.endCallWithError();
callCompletion.endCallWithError(wireAttributes);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,26 @@ public class RelativeLoadBalancerStrategyFactory implements LoadBalancerStrategy
// Default ring properties
public static final int DEFAULT_POINTS_PER_WEIGHT = 100;

public static final double DEFAULT_RELATIVE_LOAD_HIGH_THRESHOLD_FACTOR = 2.0;
public static final double DEFAULT_RELATIVE_LOAD_LOW_THRESHOLD_FACTOR = 1.2;

private final ScheduledExecutorService _executorService;
private final HealthCheckOperations _healthCheckOperations;
private final List<PartitionStateUpdateListener.Factory<PartitionState>> _stateListenerFactories;
private final EventEmitter _eventEmitter;
private final Clock _clock;
private final boolean _enableServerReportedLoad;

public RelativeLoadBalancerStrategyFactory(ScheduledExecutorService executorService, HealthCheckOperations healthCheckOperations,
List<PartitionStateUpdateListener.Factory<PartitionState>> stateListenerFactories, EventEmitter eventEmitter, Clock clock)
List<PartitionStateUpdateListener.Factory<PartitionState>> stateListenerFactories, EventEmitter eventEmitter, Clock clock,
boolean enableServerReportedLoad)
{
_executorService = executorService;
_healthCheckOperations = healthCheckOperations;
_stateListenerFactories = stateListenerFactories;
_eventEmitter = (eventEmitter == null) ? new NoopEventEmitter() : eventEmitter;
_clock = clock;
_enableServerReportedLoad = enableServerReportedLoad;
}


Expand Down Expand Up @@ -112,7 +118,7 @@ private StateUpdater getRelativeStateUpdater(D2RelativeStrategyProperties relati
{
listenerFactories.addAll(_stateListenerFactories);
}
return new StateUpdater(relativeStrategyProperties, quarantineManager, _executorService, listenerFactories, serviceName);
return new StateUpdater(relativeStrategyProperties, quarantineManager, _executorService, listenerFactories, serviceName, _enableServerReportedLoad);
}

private ClientSelector getClientSelector(D2RelativeStrategyProperties relativeStrategyProperties)
Expand Down Expand Up @@ -163,6 +169,8 @@ static D2RelativeStrategyProperties putDefaultValues(D2RelativeStrategyPropertie
properties.setErrorStatusFilter(getOrDefault(properties.getErrorStatusFilter(), DEFAULT_ERROR_STATUS_FILTER));
properties.setEmittingIntervalMs(getOrDefault(properties.getEmittingIntervalMs(), DEFAULT_EMITTING_INTERVAL_MS));
properties.setEnableFastRecovery(getOrDefault(properties.isEnableFastRecovery(), DEFAULT_ENABLE_FAST_RECOVERY));
properties.setRelativeLoadHighThresholdFactor(getOrDefault(properties.getRelativeLoadHighThresholdFactor(), DEFAULT_RELATIVE_LOAD_HIGH_THRESHOLD_FACTOR));
properties.setRelativeLoadLowThresholdFactor(getOrDefault(properties.getRelativeLoadLowThresholdFactor(), DEFAULT_RELATIVE_LOAD_LOW_THRESHOLD_FACTOR));

D2QuarantineProperties quarantineProperties = properties.hasQuarantineProperties()
? properties.getQuarantineProperties() : new D2QuarantineProperties();
Expand Down