Skip to content

Commit

Permalink
Added throttling limitation
Browse files Browse the repository at this point in the history
Signed-off-by: vhnes <ext-vladyslav.hnes@here.com>
  • Loading branch information
VladyslavHnes committed May 14, 2024
1 parent c5e042a commit 4707b21
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public RpcContext execute(final Marker marker, final Event event, final boolean
final boolean expectBinaryResponse = expectBinaryResponse(event);
final String eventJson = event.serialize();
final byte[] eventBytes = eventJson.getBytes();
final RpcContext context = new RpcContext(connector.getMaxConnectionsPerClient()).withRequestSize(eventBytes.length);
final RpcContext context = new RpcContext(connector).withRequestSize(eventBytes.length);

//Check whether the event type is allowed on the connector
String region = Service.configuration == null ? null : Service.configuration.AWS_REGION;
Expand Down Expand Up @@ -359,7 +359,7 @@ public RpcContext send(final Marker marker, @SuppressWarnings("rawtypes") final
final Connector connector = getConnector();
injectConnectorParams(event, connector);
final byte[] eventBytes = event.toByteArray();
RpcContext context = new RpcContext(connector.getMaxConnectionsPerClient()).withRequestSize(eventBytes.length);
RpcContext context = new RpcContext(connector).withRequestSize(eventBytes.length);
invokeWithRelocation(marker, context, eventBytes, true, false, r -> {
if (r.failed()) {
if (r.cause() instanceof HttpException
Expand Down Expand Up @@ -623,22 +623,22 @@ public static class RpcContext {
private int responseSize = -1;
private volatile boolean cancelled = false;

private final int maxConnectionsPerClient;
private final Connector connector;

private static final ConcurrentHashMap<String, AtomicInteger> clientIdToConnectionsMap = new ConcurrentHashMap<>();
private String clientId;

private FunctionCall functionCall;

public RpcContext(int maxConnectionsPerClient) {
this.maxConnectionsPerClient = maxConnectionsPerClient;
public RpcContext(Connector connector) {
this.connector = connector;
}

public RpcContext enableThrottling(String clientId) throws HttpException {
if(clientId != null) {
this.clientId = clientId;
AtomicInteger connectionCount = clientIdToConnectionsMap.computeIfAbsent(clientId, (key) -> new AtomicInteger(0));
if(!compareAndIncrementUpTo(maxConnectionsPerClient, connectionCount)) {
if(!compareAndIncrementUpTo(connector.getMaxConnectionsPerClient(), connectionCount)) {
throw new HttpException(TOO_MANY_REQUESTS, "Too many requests for the service node.");
}
}
Expand Down

0 comments on commit 4707b21

Please sign in to comment.