Skip to content

Commit

Permalink
Add Support for User Queue Level Routing (#176)
Browse files Browse the repository at this point in the history
* Add User Queue Level Routing

* PR comments

* Bump version to 1.9.0
  • Loading branch information
akhurana001 committed Aug 29, 2022
1 parent 3d3159e commit 8a4c747
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 39 deletions.
2 changes: 1 addition & 1 deletion baseapp/pom.xml
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.lyft.data</groupId>
<artifactId>prestogateway-parent</artifactId>
<version>1.8.9</version>
<version>1.9.0</version>
<relativePath>../</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gateway-ha/pom.xml
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>com.lyft.data</groupId>
<artifactId>prestogateway-parent</artifactId>
<version>1.8.9</version>
<version>1.9.0</version>
<relativePath>../</relativePath>
</parent>

Expand Down
@@ -1,8 +1,11 @@
package com.lyft.data.gateway.ha.clustermonitor;

import static com.lyft.data.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_QUEUED_LIST_PATH;
import static com.lyft.data.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_STATS_PATH;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import com.lyft.data.gateway.ha.config.MonitorConfiguration;
import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration;
Expand All @@ -16,6 +19,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -31,6 +35,8 @@ public class ActiveClusterMonitor implements Managed {
public static final int MONITOR_TASK_DELAY_MIN = 1;
public static final int DEFAULT_THREAD_POOL_SIZE = 10;

private static final String SESSION_USER = "sessionUser";

private final List<PrestoClusterStatsObserver> clusterStatsObservers;
private final GatewayBackendManager gatewayBackendManager;
private final int connectionTimeout;
Expand Down Expand Up @@ -94,10 +100,7 @@ public void start() {
});
}

private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
ClusterStats clusterStats = new ClusterStats();
clusterStats.setClusterId(backend.getName());
String target = backend.getProxyTo() + UI_API_STATS_PATH;
private String queryCluster(String target) {
HttpURLConnection conn = null;
try {
URL url = new URL(target);
Expand All @@ -108,22 +111,15 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
conn.connect();
int responseCode = conn.getResponseCode();
if (responseCode == HttpStatus.SC_OK) {
clusterStats.setHealthy(true);
BufferedReader reader =
new BufferedReader(new InputStreamReader((InputStream) conn.getContent()));
new BufferedReader(new InputStreamReader((InputStream) conn.getContent()));
StringBuilder sb = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
sb.append(line + "\n");
}
HashMap<String, Object> result = OBJECT_MAPPER.readValue(sb.toString(), HashMap.class);
clusterStats.setNumWorkerNodes((int) result.get("activeWorkers"));
clusterStats.setQueuedQueryCount((int) result.get("queuedQueries"));
clusterStats.setRunningQueryCount((int) result.get("runningQueries"));
clusterStats.setBlockedQueryCount((int) result.get("blockedQueries"));
clusterStats.setProxyTo(backend.getProxyTo());
clusterStats.setExternalUrl(backend.getExternalUrl());
clusterStats.setRoutingGroup(backend.getRoutingGroup());

return sb.toString();
} else {
log.warn("Received non 200 response, response code: {}", responseCode);
}
Expand All @@ -134,6 +130,58 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
conn.disconnect();
}
}
return null;
}

private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
ClusterStats clusterStats = new ClusterStats();
clusterStats.setClusterId(backend.getName());

// Fetch Cluster level Stats.
String target = backend.getProxyTo() + UI_API_STATS_PATH;
String response = queryCluster(target);
if (Strings.isNullOrEmpty(response)) {
log.error("Received null/empty response for {}", target);
return clusterStats;
}
clusterStats.setHealthy(true);
try {
HashMap<String, Object> result = null;
result = OBJECT_MAPPER.readValue(response, HashMap.class);

clusterStats.setNumWorkerNodes((int) result.get("activeWorkers"));
clusterStats.setQueuedQueryCount((int) result.get("queuedQueries"));
clusterStats.setRunningQueryCount((int) result.get("runningQueries"));
clusterStats.setBlockedQueryCount((int) result.get("blockedQueries"));
clusterStats.setProxyTo(backend.getProxyTo());
clusterStats.setExternalUrl(backend.getExternalUrl());
clusterStats.setRoutingGroup(backend.getRoutingGroup());

} catch (Exception e) {
log.error("Error parsing cluster stats from [{}]", response, e);
}

// Fetch User Level Stats.
Map<String, Integer> clusterUserStats = new HashMap<>();
target = backend.getProxyTo() + UI_API_QUEUED_LIST_PATH;
response = queryCluster(target);
if (Strings.isNullOrEmpty(response)) {
log.error("Received null/empty response for {}", target);
return clusterStats;
}
try {
List<Map<String, Object>> queries = OBJECT_MAPPER.readValue(response,
new TypeReference<List<Map<String, Object>>>(){});

for (Map<String, Object> q : queries) {
String user = (String) q.get(SESSION_USER);
clusterUserStats.put(user, clusterUserStats.getOrDefault(user, 0) + 1);
}
} catch (Exception e) {
log.error("Error parsing cluster user stats: {}", e);
}
clusterStats.setUserQueuedCount(clusterUserStats);

return clusterStats;
}

Expand Down
@@ -1,5 +1,7 @@
package com.lyft.data.gateway.ha.clustermonitor;

import java.util.Map;

import lombok.Data;
import lombok.ToString;

Expand All @@ -15,4 +17,5 @@ public class ClusterStats {
private String proxyTo;
private String externalUrl;
private String routingGroup;
private Map<String, Integer> userQueuedCount;
}
Expand Up @@ -22,6 +22,8 @@ public void observe(List<ClusterStats> stats) {
Map<String, Map<String, Integer>> clusterQueueMap = new HashMap<String, Map<String, Integer>>();
Map<String, Map<String, Integer>> clusterRunningMap
= new HashMap<String, Map<String, Integer>>();
Map<String, Map<String, Integer>> userClusterQueuedCount
= new HashMap<>();

for (ClusterStats stat : stats) {
if (!clusterQueueMap.containsKey(stat.getRoutingGroup())) {
Expand All @@ -43,8 +45,16 @@ public void observe(List<ClusterStats> stats) {
clusterRunningMap.get(stat.getRoutingGroup()).put(stat.getClusterId(),
stat.getRunningQueryCount());
}

// Create inverse map from user -> {cluster-> count}
for (Map.Entry<String, Integer> queueCount : stat.getUserQueuedCount().entrySet()) {
Map<String, Integer> clusterQueue = userClusterQueuedCount.getOrDefault(queueCount.getKey(),
new HashMap<>());
clusterQueue.put(stat.getClusterId(), queueCount.getValue());
userClusterQueuedCount.put(queueCount.getKey(), clusterQueue);
}
}

routingManager.updateRoutingTable(clusterQueueMap, clusterRunningMap);
routingManager.updateRoutingTable(clusterQueueMap, clusterRunningMap, userClusterQueuedCount);
}
}
Expand Up @@ -35,6 +35,7 @@ public class QueryIdCachingProxyHandler extends ProxyHandler {
public static final String V1_QUERY_PATH = "/v1/query";
public static final String V1_INFO_PATH = "/v1/info";
public static final String UI_API_STATS_PATH = "/ui/api/stats";
public static final String UI_API_QUEUED_LIST_PATH = "/ui/api/query?state=QUEUED";
public static final String PRESTO_UI_PATH = "/ui";
public static final String USER_HEADER = "X-Trino-User";
public static final String ALTERNATE_USER_HEADER = "X-Presto-User";
Expand Down Expand Up @@ -121,11 +122,13 @@ public String rewriteTarget(HttpServletRequest request) {
backendAddress = routingManager.findBackendForQueryId(queryId);
} else {
String routingGroup = routingGroupSelector.findRoutingGroup(request);
String user = Optional.ofNullable(request.getHeader(USER_HEADER))
.orElse(request.getHeader(ALTERNATE_USER_HEADER));
if (!Strings.isNullOrEmpty(routingGroup)) {
// This falls back on adhoc backend if there are no cluster found for the routing group.
backendAddress = routingManager.provideBackendForRoutingGroup(routingGroup);
backendAddress = routingManager.provideBackendForRoutingGroup(routingGroup, user);
} else {
backendAddress = routingManager.provideAdhocBackend();
backendAddress = routingManager.provideAdhocBackend(user);
}
}
// set target backend so that we could save queryId to backend mapping later.
Expand Down
@@ -1,5 +1,6 @@
package com.lyft.data.gateway.ha.router;

import com.google.common.base.Strings;
import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -14,6 +15,7 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -35,6 +37,8 @@ public class PrestoQueueLengthRoutingTable extends HaRoutingManager {
private ConcurrentHashMap<String, Integer> routingGroupWeightSum;
private ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>> clusterQueueLengthMap;

private ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>> userClusterQueueLengthMap;

private Map<String, TreeMap<Integer, String>> weightedDistributionRouting;

/**
Expand All @@ -47,6 +51,7 @@ public PrestoQueueLengthRoutingTable(GatewayBackendManager gatewayBackendManager
routingGroupWeightSum = new ConcurrentHashMap<String, Integer>();
clusterQueueLengthMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>>();
weightedDistributionRouting = new HashMap<String, TreeMap<Integer, String>>();
userClusterQueueLengthMap = new ConcurrentHashMap<>();
}

/**
Expand Down Expand Up @@ -182,7 +187,7 @@ private void computeWeightsBasedOnQueueLength(ConcurrentHashMap<String,
/**
* Update the Routing Table only if a previously known backend has been deactivated.
* Newly added backends are handled through
* {@link PrestoQueueLengthRoutingTable#updateRoutingTable(Map, Map)}
* {@link PrestoQueueLengthRoutingTable#updateRoutingTable(Map, Map, Map)}
* updateRoutingTable}
*/
public void updateRoutingTable(String routingGroup, Set<String> backends) {
Expand Down Expand Up @@ -212,11 +217,21 @@ public void updateRoutingTable(String routingGroup, Set<String> backends) {
* Update routing Table with new Queue Lengths.
*/
public void updateRoutingTable(Map<String, Map<String, Integer>> updatedQueueLengthMap,
Map<String, Map<String, Integer>> updatedRunningLengthMap) {
Map<String, Map<String, Integer>> updatedRunningLengthMap,
Map<String, Map<String, Integer>> updatedUserQueueLengthMap) {
synchronized (lockObject) {
log.debug("Update Routing table with new cluster queue lengths : [{}]",
updatedQueueLengthMap.toString());
clusterQueueLengthMap.clear();
userClusterQueueLengthMap.clear();

if (updatedUserQueueLengthMap != null) {
for (String user : updatedUserQueueLengthMap.keySet()) {
ConcurrentHashMap<String, Integer> clusterQueueMap =
new ConcurrentHashMap<>(updatedUserQueueLengthMap.get(user));
userClusterQueueLengthMap.put(user, clusterQueueMap);
}
}

for (String grp : updatedQueueLengthMap.keySet()) {
if (grp == null) {
Expand All @@ -227,7 +242,8 @@ public void updateRoutingTable(Map<String, Map<String, Integer>> updatedQueueLen
int maxQueueLen = Collections.max(updatedQueueLengthMap.get(grp).values());
int minQueueLen = Collections.min(updatedQueueLengthMap.get(grp).values());

if (minQueueLen == maxQueueLen && updatedQueueLengthMap.get(grp).size() > 1) {
if (minQueueLen == maxQueueLen && updatedQueueLengthMap.get(grp).size() > 1
&& updatedRunningLengthMap.containsKey(grp)) {
log.info("Queue lengths equal: {} for all clusters in the group {}."
+ " Falling back to Running Counts : {}", maxQueueLen, grp,
updatedRunningLengthMap.get(grp));
Expand Down Expand Up @@ -268,9 +284,40 @@ public Map<String, Integer> getInternalClusterQueueLength(String routingGroup) {
}

/**
* Looks up the closest weight to random number generated for a given routing group.
* Find the cluster with least user queue else fall back to overall cluster weight based routing.
*/
public String getEligibleBackEnd(String routingGroup) {
public String getEligibleBackEnd(String routingGroup, String user) {

// Route to the least queued backend for the user out of all backends for that group
if (!Strings.isNullOrEmpty(user)) {
Map<String, Integer> clusterQueueCountForUser = userClusterQueueLengthMap.get(user);

if (clusterQueueCountForUser != null && !clusterQueueCountForUser.isEmpty()) {
Set<String> backends = clusterQueueLengthMap.get(routingGroup).keySet();
String leastQueuedCluster = null;
Integer minQueueCount = Integer.MAX_VALUE;
Integer maxQueueCount = Integer.MIN_VALUE;
for (String b : backends) {
// If missing, we assume no queued queries for the user on that cluster.
Integer queueCount = clusterQueueCountForUser.getOrDefault(b, 0);

if (queueCount < minQueueCount) {
leastQueuedCluster = b;
minQueueCount = queueCount;
}
if (queueCount > maxQueueCount) {
maxQueueCount = queueCount;
}
}
// If all clusters have the same queue count, then fallback to the older weighted logic.
if (!Strings.isNullOrEmpty(leastQueuedCluster) && minQueueCount != maxQueueCount) {
log.debug("{} routing to:{}. userQueueCount:{}", user, leastQueuedCluster, minQueueCount);

return leastQueuedCluster;
}
}
}
// Looks up the closest weight to random number generated for a given routing group.
if (routingGroupWeightSum.containsKey(routingGroup)
&& weightedDistributionRouting.containsKey(routingGroup)) {
int rnd = RANDOM.nextInt(routingGroupWeightSum.get(routingGroup));
Expand All @@ -285,20 +332,20 @@ public String getEligibleBackEnd(String routingGroup) {
* backend is found.
*/
@Override
public String provideBackendForRoutingGroup(String routingGroup) {
public String provideBackendForRoutingGroup(String routingGroup, String user) {
List<ProxyBackendConfiguration> backends =
getGatewayBackendManager().getActiveBackends(routingGroup);

if (backends.isEmpty()) {
return provideAdhocBackend();
return provideAdhocBackend(user);
}
Map<String, String> proxyMap = new HashMap<>();
for (ProxyBackendConfiguration backend : backends) {
proxyMap.put(backend.getName(), backend.getProxyTo());
}

updateRoutingTable(routingGroup, proxyMap.keySet());
String clusterId = getEligibleBackEnd(routingGroup);
String clusterId = getEligibleBackEnd(routingGroup, user);
log.debug("Routing to eligible backend : [{}] for routing group: [{}]",
clusterId, routingGroup);

Expand All @@ -318,7 +365,7 @@ public String provideBackendForRoutingGroup(String routingGroup) {
* <p>d.
*/
@Override
public String provideAdhocBackend() {
public String provideAdhocBackend(String user) {
Map<String, String> proxyMap = new HashMap<>();
List<ProxyBackendConfiguration> backends = getGatewayBackendManager().getActiveAdhocBackends();
if (backends.size() == 0) {
Expand All @@ -331,7 +378,7 @@ public String provideAdhocBackend() {

updateRoutingTable("adhoc", proxyMap.keySet());

String clusterId = getEligibleBackEnd("adhoc");
String clusterId = getEligibleBackEnd("adhoc", user);
log.debug("Routing to eligible backend : " + clusterId + " for routing group: adhoc");
if (clusterId != null) {
return proxyMap.get(clusterId);
Expand Down
Expand Up @@ -61,7 +61,7 @@ public void setBackendForQueryId(String queryId, String backend) {
*
* @return
*/
public String provideAdhocBackend() {
public String provideAdhocBackend(String user) {
List<ProxyBackendConfiguration> backends = this.gatewayBackendManager.getActiveAdhocBackends();
if (backends.size() == 0) {
throw new IllegalStateException("Number of active backends found zero");
Expand All @@ -76,11 +76,11 @@ public String provideAdhocBackend() {
*
* @return
*/
public String provideBackendForRoutingGroup(String routingGroup) {
public String provideBackendForRoutingGroup(String routingGroup, String user) {
List<ProxyBackendConfiguration> backends =
gatewayBackendManager.getActiveBackends(routingGroup);
if (backends.isEmpty()) {
return provideAdhocBackend();
return provideAdhocBackend(user);
}
int backendId = Math.abs(RANDOM.nextInt()) % backends.size();
return backends.get(backendId).getProxyTo();
Expand Down

0 comments on commit 8a4c747

Please sign in to comment.