Skip to content

Commit

Permalink
Simplify ratelimit handling
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Dec 4, 2023
1 parent e69b2e8 commit e0d01b6
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 35 deletions.
17 changes: 15 additions & 2 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt
Expand Up @@ -8,6 +8,7 @@ import gg.beemo.latte.util.MoshiJsLongAdapter
import gg.beemo.latte.util.MoshiUnitAdapter
import gg.beemo.latte.util.SuspendingCountDownLatch
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
Expand Down Expand Up @@ -184,7 +185,11 @@ class RpcClient<RequestT, ResponseT>(
private val requestProducer = client.producer(topic, key, options, requestType, requestIsNullable)

private val requestConsumer = client.consumer(topic, key, options, requestType, responseIsNullable) {
val result = callback(it)
val result = try {
callback(it)
} catch (_: IgnoreRpcRequest) {
return@consumer
}
val responseProducer = client.producer(
client.toResponseTopic(topic),
client.toResponseKey(key),
Expand Down Expand Up @@ -254,7 +259,11 @@ class RpcClient<RequestT, ResponseT>(
messageId.set(requestProducer.send(request, services, instances))

if (timeoutLatch != null) {
timeoutLatch.awaitThrowing(timeout)
try {
timeoutLatch.awaitThrowing(timeout)
} catch (_: TimeoutCancellationException) {
throw RpcRequestTimeout("RPC request timed out after $timeout")
}
} else {
delay(timeout)
}
Expand All @@ -269,3 +278,7 @@ class RpcClient<RequestT, ResponseT>(
}

}

sealed class BrokerException(message: String?) : Exception(message)
class RpcRequestTimeout(message: String) : BrokerException(message)
class IgnoreRpcRequest : BrokerException(null)
@@ -1,15 +1,27 @@
package gg.beemo.latte.ratelimit

import com.squareup.moshi.Json
import com.squareup.moshi.JsonClass

object SharedRatelimitData {

const val RATELIMIT_TOPIC = "ratelimit"

const val KEY_REQUEST_IDENTIFY_QUOTA = "quota.identify.request"
const val KEY_REQUEST_GLOBAL_QUOTA = "quota.global.request"
const val KEY_REQUEST_QUOTA = "quota.request"

@JsonClass(generateAdapter = true)
class RatelimitClientData(val discordClientId: String?, val requestExpiresAt: Long? = null)
class RatelimitRequestData(
val type: RatelimitType,
val discordClientId: String?,
val requestExpiresAt: Long? = null,
)

enum class RatelimitType {
@Json(name = "global")
GLBOAL,

@Json(name = "identify")
IDENTIFY,
}

}
55 changes: 25 additions & 30 deletions vanilla/src/main/java/gg/beemo/vanilla/KafkaRatelimitClient.kt
Expand Up @@ -2,10 +2,9 @@ package gg.beemo.vanilla

import gg.beemo.latte.broker.BrokerClient
import gg.beemo.latte.broker.BrokerConnection
import gg.beemo.latte.broker.BrokerMessage
import gg.beemo.latte.broker.IgnoreRpcRequest
import gg.beemo.latte.logging.Log
import gg.beemo.latte.ratelimit.SharedRatelimitData
import gg.beemo.latte.ratelimit.SharedRatelimitData.RatelimitClientData
import gg.beemo.latte.util.SuspendingRatelimit
import java.util.concurrent.ConcurrentHashMap
import kotlin.time.Duration
Expand All @@ -21,37 +20,33 @@ class RatelimitClient(connection: BrokerConnection) : BrokerClient(connection) {
private val identifyRatelimitProvider = RatelimitProvider(1, 5.seconds)

init {
rpc<RatelimitClientData, Unit>(
rpc<SharedRatelimitData.RatelimitRequestData, Unit>(
SharedRatelimitData.RATELIMIT_TOPIC,
SharedRatelimitData.KEY_REQUEST_GLOBAL_QUOTA,
SharedRatelimitData.KEY_REQUEST_QUOTA,
) {
handleRatelimitRequest(it, globalRatelimitProvider, "global")
val msg = it.value
val type = msg.type
val clientId = msg.discordClientId ?: "default"
val service = "${it.headers.sourceService}/${it.headers.sourceInstance} (${clientId})"
val expiresAt = msg.requestExpiresAt

if (expiresAt != null && (expiresAt + EXPIRY_GRACE_PERIOD) < System.currentTimeMillis()) {
log.info("Incoming expired $type quota request from service $service, ignoring")
// If the request has already expired, ignore it to not eat quotas unnecessarily
throw IgnoreRpcRequest()
}

log.debug("Incoming {} quota request from service {}", type, service)
val provider = when (msg.type) {
SharedRatelimitData.RatelimitType.GLBOAL -> globalRatelimitProvider
SharedRatelimitData.RatelimitType.IDENTIFY -> identifyRatelimitProvider
else -> throw IllegalArgumentException("Unknown ratelimit type ${msg.type}")
}

provider.getClientRatelimit(clientId).requestQuota()

log.debug("Granted {} quota request for service {}", type, service)
}

rpc<RatelimitClientData, Unit>(
SharedRatelimitData.RATELIMIT_TOPIC,
SharedRatelimitData.KEY_REQUEST_IDENTIFY_QUOTA,
) {
handleRatelimitRequest(it, identifyRatelimitProvider, "identify")
}
}

private suspend fun handleRatelimitRequest(
msg: BrokerMessage<RatelimitClientData>,
ratelimitProvider: RatelimitProvider,
type: String,
) {
val clientId = msg.value.discordClientId ?: "default"
val service = "${msg.headers.sourceService}/${msg.headers.sourceInstance} (${clientId})"
val expiresAt = msg.value.requestExpiresAt
if (expiresAt != null && (expiresAt + EXPIRY_GRACE_PERIOD) < System.currentTimeMillis()) {
log.info("Incoming expired $type quota request from service $service, ignoring")
// If the request has already expired, ignore it to not eat quotas unnecessarily
return
}
log.debug("Incoming {} quota request from service {}", type, service)
ratelimitProvider.getClientRatelimit(clientId).requestQuota()
log.debug("Granted {} quota request for service {}", type, service)
}

}
Expand Down

0 comments on commit e0d01b6

Please sign in to comment.