Skip to content

Commit

Permalink
Extract RPC code to separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Jan 29, 2024
1 parent c5cf147 commit 3c38bc4
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 194 deletions.
3 changes: 3 additions & 0 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt
@@ -1,5 +1,8 @@
package gg.beemo.latte.broker

import gg.beemo.latte.broker.rpc.BaseRpcRequestMessage
import gg.beemo.latte.broker.rpc.RpcClient
import gg.beemo.latte.broker.rpc.RpcResponse
import gg.beemo.latte.logging.Log
import kotlinx.coroutines.*
import java.util.Collections
Expand Down
28 changes: 5 additions & 23 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt
@@ -1,5 +1,10 @@
package gg.beemo.latte.broker

import gg.beemo.latte.broker.rpc.RpcMessageHeaders
import gg.beemo.latte.broker.rpc.RpcRequestMessage
import gg.beemo.latte.broker.rpc.RpcResponseMessage
import gg.beemo.latte.broker.rpc.RpcStatus

open class BrokerMessage<T, H : BrokerMessageHeaders>(
val topic: String,
val key: String,
Expand All @@ -24,26 +29,3 @@ open class BrokerMessage<T, H : BrokerMessageHeaders>(

typealias AbstractBrokerMessage<T> = BrokerMessage<T, out BrokerMessageHeaders>
typealias BaseBrokerMessage<T> = BrokerMessage<T, BrokerMessageHeaders>
typealias BaseRpcRequestMessage<RequestT, ResponseT> = RpcRequestMessage<RequestT, ResponseT, BrokerMessageHeaders>

class RpcRequestMessage<RequestT, ResponseT, H : BrokerMessageHeaders>(
topic: String,
key: String,
value: RequestT,
headers: H,
private val updateSender: suspend (RpcStatus, ResponseT) -> Unit,
) : BrokerMessage<RequestT, H>(topic, key, value, headers) {

suspend fun sendUpdate(status: RpcStatus, response: ResponseT) {
updateSender(status, response)
}

}

class RpcResponseMessage<ResponseT>(topic: String, key: String, value: ResponseT, headers: RpcMessageHeaders) :
BrokerMessage<ResponseT, RpcMessageHeaders>(topic, key, value, headers) {

val status: RpcStatus
get() = headers.status

}
2 changes: 2 additions & 0 deletions latte/src/main/java/gg/beemo/latte/broker/Exceptions.kt
@@ -1,5 +1,7 @@
package gg.beemo.latte.broker

import gg.beemo.latte.broker.rpc.RpcStatus

sealed class BrokerException(message: String?) : Exception(message)
class RpcRequestTimeout(message: String) : BrokerException(message)
class IgnoreRpcRequest : BrokerException("Ignoring RPC request")
Expand Down
171 changes: 171 additions & 0 deletions latte/src/main/java/gg/beemo/latte/broker/Subclients.kt
@@ -0,0 +1,171 @@
package gg.beemo.latte.broker

import com.squareup.moshi.JsonAdapter
import com.squareup.moshi.Moshi
import gg.beemo.latte.broker.rpc.RpcMessageHeaders
import gg.beemo.latte.logging.Log
import gg.beemo.latte.util.MoshiInstantAdapter
import gg.beemo.latte.util.MoshiJsLongAdapter
import gg.beemo.latte.util.MoshiUnitAdapter
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import java.time.Instant

data class BrokerClientOptions(
val useSafeJsLongs: Boolean = false,
)

abstract class BaseSubclient(
protected val connection: BrokerConnection,
protected val client: BrokerClient,
val topic: String,
val key: String,
protected val options: BrokerClientOptions,
) {

internal abstract fun destroy()

protected fun <T> createMoshiAdapter(type: Class<T>): JsonAdapter<T?> {
val mochi = if (options.useSafeJsLongs) safeJsMoshi else baseMoshi
return mochi.adapter(type).nullSafe()
}

companion object {
private val baseMoshi: Moshi = Moshi.Builder()
.add(Unit::class.java, MoshiUnitAdapter())
.add(Instant::class.java, MoshiInstantAdapter())
.build()
private val safeJsMoshi: Moshi = baseMoshi
.newBuilder()
.add(Long::class.java, MoshiJsLongAdapter())
.build()
}

}

class ProducerSubclient<T>(
connection: BrokerConnection,
client: BrokerClient,
topic: String,
key: String,
options: BrokerClientOptions,
requestType: Class<T>,
private val isNullable: Boolean,
) : BaseSubclient(
connection,
client,
topic,
key,
options,
) {

private val log by Log
private val adapter: JsonAdapter<T?> = createMoshiAdapter(requestType)

override fun destroy() {
client.deregisterProducer(this)
}

suspend fun send(
data: T,
services: Set<String> = emptySet(),
instances: Set<String> = emptySet(),
): MessageId {
val msg = BrokerMessage(
topic,
key,
data,
BrokerMessageHeaders(
connection,
targetServices = services,
targetInstances = instances,
),
)
@Suppress("UNCHECKED_CAST")
return internalSend(msg as AbstractBrokerMessage<T?>)
}

internal suspend fun internalSend(msg: AbstractBrokerMessage<T?>, bypassNullCheck: Boolean = false): MessageId {
if (!bypassNullCheck && !isNullable) {
requireNotNull(msg.value) {
"Cannot send null message for non-nullable type with key '$key' in topic '$topic'"
}
}
val strigifiedData = stringifyOutgoing(msg.value)
log.trace(
"Sending message {} with key '{}' in topic '{}' with value: {}",
msg.messageId,
key,
topic,
strigifiedData,
)
return connection.send(topic, key, strigifiedData, msg.headers)
}

private fun stringifyOutgoing(data: T?): String {
return adapter.toJson(data)
}

}

class ConsumerSubclient<T>(
connection: BrokerConnection,
client: BrokerClient,
topic: String,
key: String,
options: BrokerClientOptions,
incomingType: Class<T>,
private val isNullable: Boolean,
private val callback: suspend CoroutineScope.(BaseBrokerMessage<T>) -> Unit,
) : BaseSubclient(
connection,
client,
topic,
key,
options,
) {

private val log by Log
private val adapter: JsonAdapter<T?> = createMoshiAdapter(incomingType)

override fun destroy() {
client.deregisterConsumer(this)
}

internal suspend fun onIncomingMessage(
value: String,
headers: BrokerMessageHeaders,
) = coroutineScope {
val data = parseIncoming(value)
// Disable nullability enforcement for RPC exceptions. The caller has to deal with the unsafe typing now.
if (!isNullable && (headers !is RpcMessageHeaders || !headers.isException)) {
checkNotNull(data) {
"Received null message for non-nullable type with key '$key' in topic '$topic'"
}
}
val message = BrokerMessage(topic, key, data, headers)
log.trace(
"Received message {} with key '{}' in topic '{}' with value: {}",
headers.messageId,
key,
topic,
value,
)
@Suppress("UNCHECKED_CAST") // Safe due to above null validation
val brokerMessage = message as BaseBrokerMessage<T>
try {
callback(brokerMessage)
} catch (ex: Exception) {
log.error(
"Uncaught consumer callback error while processing message ${headers.messageId} " +
"with key '$key' in topic '$topic'",
ex,
)
}
}

private fun parseIncoming(json: String): T? {
return adapter.fromJson(json)
}

}

0 comments on commit 3c38bc4

Please sign in to comment.