Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

D2 client side implementation of server-reported health #617

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ private static RelativeLoadBalancerStrategy buildRelativeLoadBalancerStrategy()
ServiceProperties serviceProperties = new ServiceProperties(DUMMY_SERVICE_NAME, DEFAULT_CLUSTER_NAME, DEFAULT_PATH, DEFAULT_STRATEGY_LIST,
null, null, null, null, null,
null, null, RelativeStrategyPropertiesConverter.toMap(relativeStrategyProperties));
return new RelativeLoadBalancerStrategyFactory(EXECUTOR_SERVICE, null, new ArrayList<>(), null, SystemClock.instance())
return new RelativeLoadBalancerStrategyFactory(EXECUTOR_SERVICE, null, new ArrayList<>(), null, SystemClock.instance(),
false)
.newLoadBalancer(serviceProperties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.linkedin.d2.balancer.strategies.framework.LatencyCorrelation;
import com.linkedin.d2.balancer.strategies.framework.LoadBalancerStrategyTestRunner;
import com.linkedin.d2.balancer.strategies.framework.LoadBalancerStrategyTestRunnerBuilder;
import com.linkedin.d2.balancer.strategies.framework.ServerReportedLoadCorrelation;
import com.linkedin.d2.loadBalancerStrategyType;
import java.net.URI;
import java.util.ArrayList;
Expand Down Expand Up @@ -334,6 +335,57 @@ public Object[][] isFastRecovery()
};
}

@Test(dataProvider = "clusterSizeAndQps")
public void testOneOverloadedServerHost(int clusterSize, int numRequestsPerInterval)
{
LoadBalancerStrategyTestRunner testRunner = buildRelativeRunnerWithOneOverloadedHost(clusterSize, numRequestsPerInterval);
testRunner.runWait();

List<Integer> loadHistory = testRunner.getLoadHistory().get(testRunner.getUri(0));
System.out.println(loadHistory);

/**
* Test result in terms of load:
* [20, 8, 5, 4, 4, 3, 5, 5, 4, 9, 3, 4, 31, 37, 17, 18, 23, 25, 21, 18, 9, 15, 27, 20, 20, 21, 21, 21, 19, 22, 22]
* [5, 5, 0, 2, 2, 0, 2, 2, 1, 0, 1, 0, 0, 0, 1, 0, 8, 10, 4, 3, 3, 4, 1, 5, 6, 4, 1, 3, 2, 6, 4]
* [1, 1, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 3, 5, 2, 2, 2, 1, 0, 1]
* [21, 2, 1, 1, 1, 0, 0, 1, 0, 2, 0, 0, 4, 37, 42, 14, 6, 22, 30, 21, 16, 19, 37, 17, 13, 26, 24, 15, 16, 25, 19]
*
* Conclusion:
* The machine with higher load will get less traffic
* When QPS per machine is big, the traffic may not drop to 0
* */
}

@Test(dataProvider = "clusterSizeAndQps")
public void testLoadDistribution(int clusterSize, int numRequestsPerInterval)
{
LoadBalancerStrategyTestRunner testRunner = buildRelativeRunnerWithEvenDistribution(clusterSize, numRequestsPerInterval);
testRunner.runWait();

for (int i = 0; i < 30; i ++)
{
for (int j = 0; j < clusterSize; j ++)
{
Integer load = testRunner.getLoadHistory().get(testRunner.getUri(j)).get(i);
// Verify max is smaller than half of the total requests, which means traffic is more evenly distributed
Assert.assertTrue(load * 2 <= numRequestsPerInterval);
}
}
}

@DataProvider(name = "clusterSizeAndQps")
public Object[][] clusterSizeAndQps()
{
return new Object[][]
{
{5, 100}, // Small cluster, high QPS per host
{5, 20}, // Small cluster, low QPS per host
{50, 100}, // Big cluster, small QPS per host
{50, 1000} // Big cluster, high QPS per host
};
}

private LoadBalancerStrategyTestRunner buildDefaultRunnerWithConstantBadHost(int numHosts, long badHostLatency,
double relativeLatencyHighThresholdFactor)
{
Expand Down Expand Up @@ -474,4 +526,63 @@ private LoadBalancerStrategyTestRunner buildRelativeRunnerWithRandomLatencyInRan
.setRelativeLoadBalancerStrategies(relativeStrategyProperties)
.build();
}

private LoadBalancerStrategyTestRunner buildRelativeRunnerWithOneOverloadedHost(int numHosts, int numRequestsPerInterval)
{
int overloadedServerLoad = 1000;

List<LatencyCorrelation> latencyCorrelationList = new ArrayList<>();
List<ServerReportedLoadCorrelation> serverReportedLoadCorrelationList = new ArrayList<>();

long leftLimit = 0L;
long rightLimit = 400L;
for (int i = 0; i < numHosts; i ++)
{
latencyCorrelationList.add((requestsPerInterval, intervalIndex) ->
400L + (long) (Math.random() * (rightLimit - leftLimit)));
}

// Return high load for the first 10 intervals
serverReportedLoadCorrelationList.add((requestsPerInterval, intervalIndex) ->
intervalIndex >= 0 && intervalIndex <= 10 ? overloadedServerLoad : requestsPerInterval
);
for (int i = 1; i < numHosts; i ++)
{
serverReportedLoadCorrelationList.add((requestsPerInterval, intervalIndex) -> requestsPerInterval);
}

return new LoadBalancerStrategyTestRunnerBuilder(loadBalancerStrategyType.RELATIVE, DEFAULT_SERVICE_NAME, numHosts)
.enableServerReportedLoad(true)
.setConstantRequestCount(numRequestsPerInterval)
.setNumIntervals(30)
.setDynamicLatency(latencyCorrelationList)
.setDynamicServerReportedLoad(serverReportedLoadCorrelationList)
.build();
}

private LoadBalancerStrategyTestRunner buildRelativeRunnerWithEvenDistribution(int numHosts, int numRequestsPerInterval)
{
List<LatencyCorrelation> latencyCorrelationList = new ArrayList<>();
List<ServerReportedLoadCorrelation> serverReportedLoadCorrelationList = new ArrayList<>();

long leftLimit = 0L;
long rightLimit = 400L;
for (int i = 0; i < numHosts; i ++)
{
latencyCorrelationList.add((requestsPerInterval, intervalIndex) ->
400L + (long) (Math.random() * (rightLimit - leftLimit)));
}
for (int i = 0; i < numHosts; i ++)
{
serverReportedLoadCorrelationList.add((requestsPerInterval, intervalIndex) -> requestsPerInterval);
}

return new LoadBalancerStrategyTestRunnerBuilder(loadBalancerStrategyType.RELATIVE, DEFAULT_SERVICE_NAME, numHosts)
.enableServerReportedLoad(true)
.setConstantRequestCount(numRequestsPerInterval)
.setNumIntervals(30)
.setDynamicLatency(latencyCorrelationList)
.setDynamicServerReportedLoad(serverReportedLoadCorrelationList)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ public D2Client build()
_config.d2JmxManagerPrefix,
_config.zookeeperReadWindowMs,
_config.enableRelativeLoadBalancer,
_config.deterministicSubsettingMetadataProvider);
_config.deterministicSubsettingMetadataProvider,
_config.enableServerReportedLoad);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
new ZKFSLoadBalancerWithFacilitiesFactory() :
Expand Down Expand Up @@ -572,7 +573,7 @@ private Map<String, LoadBalancerStrategyFactory<?>> createDefaultLoadBalancerStr
{
final RelativeLoadBalancerStrategyFactory relativeLoadBalancerStrategyFactory = new RelativeLoadBalancerStrategyFactory(
_config._executorService, _config.healthCheckOperations, Collections.emptyList(), _config.eventEmitter,
SystemClock.instance());
SystemClock.instance(), _config.enableServerReportedLoad);
loadBalancerStrategyFactories.putIfAbsent(RelativeLoadBalancerStrategy.RELATIVE_LOAD_BALANCER_STRATEGY_NAME,
relativeLoadBalancerStrategyFactory);
}
Expand Down
8 changes: 6 additions & 2 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

public class D2ClientConfig
{
public static final int DEFAULT_RETRY_LIMIT = 3;

String zkHosts = null;
long zkSessionTimeoutInMs = 3600000L;
long zkStartupTimeoutInMs = 10000L;
Expand Down Expand Up @@ -105,7 +107,7 @@ public class D2ClientConfig
String d2JmxManagerPrefix = "UnknownPrefix";
boolean enableRelativeLoadBalancer = false;
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider = null;
public static final int DEFAULT_RETRY_LIMIT = 3;
boolean enableServerReportedLoad = false;

public D2ClientConfig()
{
Expand Down Expand Up @@ -161,7 +163,8 @@ public D2ClientConfig()
String d2JmxManagerPrefix,
int zookeeperReadWindowMs,
boolean enableRelativeLoadBalancer,
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider)
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider,
boolean enableServerReportedLoad)
{
this.zkHosts = zkHosts;
this.zkSessionTimeoutInMs = zkSessionTimeoutInMs;
Expand Down Expand Up @@ -214,5 +217,6 @@ public D2ClientConfig()
this.zookeeperReadWindowMs = zookeeperReadWindowMs;
this.enableRelativeLoadBalancer = enableRelativeLoadBalancer;
this.deterministicSubsettingMetadataProvider = deterministicSubsettingMetadataProvider;
this.enableServerReportedLoad = enableServerReportedLoad;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
SimpleLoadBalancerState state = new SimpleLoadBalancerState(
config._executorService, uriBus, clusterBus, serviceBus, config.clientFactories, config.loadBalancerStrategyFactories,
config.sslContext, config.sslParameters, config.isSSLEnabled, config.partitionAccessorRegistry,
config.sslSessionValidatorFactory, config.deterministicSubsettingMetadataProvider);
config.sslSessionValidatorFactory, config.deterministicSubsettingMetadataProvider, config.enableServerReportedLoad);
d2ClientJmxManager.setSimpleLoadBalancerState(state);

SimpleLoadBalancer simpleLoadBalancer = new SimpleLoadBalancer(state, config.lbWaitTimeout, config.lbWaitUnit, config._executorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D
config.sslSessionValidatorFactory,
d2ClientJmxManager,
config.zookeeperReadWindowMs,
config.deterministicSubsettingMetadataProvider
config.deterministicSubsettingMetadataProvider,
config.enableServerReportedLoad
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ public DegraderTrackerClientImpl(URI uri, Map<Integer, PartitionData> partitionD
Clock clock, DegraderImpl.Config config)
{
this(uri, partitionDataMap, wrappedClient, clock, config, TrackerClientImpl.DEFAULT_CALL_TRACKER_INTERVAL,
TrackerClientImpl.DEFAULT_ERROR_STATUS_PATTERN, false);
TrackerClientImpl.DEFAULT_ERROR_STATUS_PATTERN, false, false);
}

public DegraderTrackerClientImpl(URI uri, Map<Integer, PartitionData> partitionDataMap, TransportClient wrappedClient,
Clock clock, DegraderImpl.Config config, long interval, Pattern errorStatusPattern)
{
this(uri, partitionDataMap, wrappedClient, clock, config, interval, errorStatusPattern, false);
this(uri, partitionDataMap, wrappedClient, clock, config, interval, errorStatusPattern, false, false);
}

public DegraderTrackerClientImpl(URI uri, Map<Integer, PartitionData> partitionDataMap, TransportClient wrappedClient,
Clock clock, DegraderImpl.Config config, long interval, Pattern errorStatusPattern,
boolean doNotSlowStart)
boolean doNotSlowStart, boolean enableServerReportedLoad)
{
super(uri, partitionDataMap, wrappedClient, clock, interval,
(status) -> errorStatusPattern.matcher(Integer.toString(status)).matches(), true, doNotSlowStart);
(status) -> errorStatusPattern.matcher(Integer.toString(status)).matches(), true, doNotSlowStart, enableServerReportedLoad);

if (config == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,28 @@ public class TrackerClientFactory
private static final int LOG_RATE_MS = 20000;

/**
* @see #createTrackerClient(URI, UriProperties, ServiceProperties, String, TransportClient, Clock)
* @see #createTrackerClient(URI, UriProperties, ServiceProperties, String, TransportClient, Clock, boolean)
*/
public static TrackerClient createTrackerClient(URI uri,
UriProperties uriProperties,
ServiceProperties serviceProperties,
String loadBalancerStrategyName,
TransportClient transportClient)
{
return createTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient, SystemClock.instance());
return createTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient, SystemClock.instance(), false);
}

/**
* @see #createTrackerClient(URI, UriProperties, ServiceProperties, String, TransportClient, Clock, boolean)
*/
public static TrackerClient createTrackerClient(URI uri,
UriProperties uriProperties,
ServiceProperties serviceProperties,
String loadBalancerStrategyName,
TransportClient transportClient,
boolean enableServerReportedLoad)
{
return createTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient, SystemClock.instance(), enableServerReportedLoad);
}

/**
Expand All @@ -75,14 +88,16 @@ public static TrackerClient createTrackerClient(URI uri,
* @param uriProperties URI properties.
* @param transportClient Inner TransportClient.
* @param clock Clock used for internal call tracking.
* @param enableServerReportedLoad Enable load balancing based on server reported load from the response.
* @return TrackerClient
*/
public static TrackerClient createTrackerClient(URI uri,
UriProperties uriProperties,
ServiceProperties serviceProperties,
String loadBalancerStrategyName,
TransportClient transportClient,
Clock clock)
Clock clock,
boolean enableServerReportedLoad)
{
TrackerClient trackerClient;

Expand All @@ -97,15 +112,16 @@ public static TrackerClient createTrackerClient(URI uri,
switch (loadBalancerStrategyName)
{
case (DegraderLoadBalancerStrategyV3.DEGRADER_STRATEGY_NAME):
trackerClient = createDegraderTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName, transportClient, clock, doNotSlowStart);
trackerClient = createDegraderTrackerClient(uri, uriProperties, serviceProperties, loadBalancerStrategyName,
transportClient, clock, doNotSlowStart, enableServerReportedLoad);
break;
case (RelativeLoadBalancerStrategy.RELATIVE_LOAD_BALANCER_STRATEGY_NAME):
trackerClient = createTrackerClientImpl(uri, uriProperties, serviceProperties, loadBalancerStrategyName,
transportClient, clock, false, doNotSlowStart);
transportClient, clock, false, doNotSlowStart, enableServerReportedLoad);
break;
default:
trackerClient = createTrackerClientImpl(uri, uriProperties, serviceProperties, loadBalancerStrategyName,
transportClient, clock, true, doNotSlowStart);
transportClient, clock, true, doNotSlowStart, enableServerReportedLoad);
}

return trackerClient;
Expand All @@ -117,7 +133,8 @@ private static DegraderTrackerClient createDegraderTrackerClient(URI uri,
String loadBalancerStrategyName,
TransportClient transportClient,
Clock clock,
boolean doNotSlowStart)
boolean doNotSlowStart,
boolean enableServerReportedLoad)
{
DegraderImpl.Config config = null;

Expand All @@ -144,7 +161,8 @@ private static DegraderTrackerClient createDegraderTrackerClient(URI uri,
config,
trackerClientInterval,
errorStatusPattern,
doNotSlowStart);
doNotSlowStart,
enableServerReportedLoad);
}

private static long getInterval(String loadBalancerStrategyName, ServiceProperties serviceProperties)
Expand Down Expand Up @@ -222,7 +240,8 @@ private static TrackerClientImpl createTrackerClientImpl(URI uri,
TransportClient transportClient,
Clock clock,
boolean percentileTrackingEnabled,
boolean doNotSlowStart)
boolean doNotSlowStart,
boolean enableServerReportedLoad)
{
List<HttpStatusCodeRange> errorStatusCodeRanges = getErrorStatusRanges(serviceProperties);
Predicate<Integer> isErrorStatus = (status) -> {
Expand All @@ -243,6 +262,7 @@ private static TrackerClientImpl createTrackerClientImpl(URI uri,
getInterval(loadBalancerStrategyName, serviceProperties),
isErrorStatus,
percentileTrackingEnabled,
doNotSlowStart);
doNotSlowStart,
enableServerReportedLoad);
}
}