Skip to content

Commit

Permalink
[WIP] Make header class more flexible and extendable
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Dec 11, 2023
1 parent e0d01b6 commit aad14e6
Show file tree
Hide file tree
Showing 12 changed files with 285 additions and 108 deletions.

This file was deleted.

8 changes: 4 additions & 4 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt
Expand Up @@ -30,7 +30,7 @@ abstract class BrokerClient(
topic: String,
key: String,
options: BrokerClientOptions = BrokerClientOptions(),
noinline callback: suspend CoroutineScope.(BrokerMessage<T>) -> Unit,
noinline callback: suspend CoroutineScope.(BaseBrokerMessage<T>) -> Unit,
): ConsumerSubclient<T> {
return consumer(topic, key, options, T::class.java, null is T, callback)
}
Expand All @@ -42,7 +42,7 @@ abstract class BrokerClient(
options: BrokerClientOptions = BrokerClientOptions(),
type: Class<T>,
isNullable: Boolean,
callback: suspend CoroutineScope.(BrokerMessage<T>) -> Unit,
callback: suspend CoroutineScope.(BaseBrokerMessage<T>) -> Unit,
): ConsumerSubclient<T> {
log.debug("Creating consumer for key '{}' in topic '{}' with type {}", key, topic, type.name)
return ConsumerSubclient(connection, this, topic, key, options, type, isNullable, callback).also {
Expand Down Expand Up @@ -76,7 +76,7 @@ abstract class BrokerClient(
topic: String,
key: String,
options: BrokerClientOptions = BrokerClientOptions(),
noinline callback: suspend CoroutineScope.(BrokerMessage<RequestT>) -> ResponseT,
noinline callback: suspend CoroutineScope.(BaseRpcRequestMessage<RequestT, ResponseT>) -> Pair<RpcStatus, ResponseT>,
): RpcClient<RequestT, ResponseT> {
return RpcClient(
this,
Expand Down Expand Up @@ -164,7 +164,7 @@ abstract class BrokerClient(
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders,
headers: BrokerMessageHeaders,
) {
val metadata = getExistingKeyMetadata(topic, key) ?: return
for (consumer in metadata.consumers) {
Expand Down
18 changes: 6 additions & 12 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt
Expand Up @@ -4,7 +4,7 @@ import gg.beemo.latte.logging.Log
import java.util.*

fun interface TopicListener {
fun onMessage(topic: String, key: String, value: String, headers: BaseBrokerMessageHeaders)
fun onMessage(topic: String, key: String, value: String, headers: BrokerMessageHeaders)
}

typealias MessageId = String
Expand All @@ -28,14 +28,14 @@ abstract class BrokerConnection {
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders,
headers: BrokerMessageHeaders,
): MessageId

internal suspend fun send(
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders,
headers: BrokerMessageHeaders,
): MessageId {
log.trace(
"Sending message {} with key '{}' in topic '{}' with value: {}",
Expand All @@ -47,12 +47,6 @@ abstract class BrokerConnection {
return abstractSend(topic, key, value, headers)
}

internal abstract fun createHeaders(
targetServices: Set<String> = emptySet(),
targetInstances: Set<String> = emptySet(),
inReplyTo: MessageId? = null,
): BaseBrokerMessageHeaders

protected abstract fun createTopic(topic: String)
protected abstract fun removeTopic(topic: String)

Expand Down Expand Up @@ -82,7 +76,7 @@ abstract class BrokerConnection {
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders
headers: BrokerMessageHeaders
) {
if (
(headers.targetServices.isNotEmpty() && serviceName !in headers.targetServices) ||
Expand All @@ -104,7 +98,7 @@ abstract class BrokerConnection {
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders
headers: BrokerMessageHeaders
): Boolean {
val targetServices = headers.targetServices
val targetInstances = headers.targetInstances
Expand All @@ -129,7 +123,7 @@ abstract class BrokerConnection {
)
}

private fun invokeLocalCallbacks(topic: String, key: String, value: String, headers: BaseBrokerMessageHeaders) {
private fun invokeLocalCallbacks(topic: String, key: String, value: String, headers: BrokerMessageHeaders) {
log.trace(
"Dispatching message {} with key '{}' in topic '{}' to local listeners with value: {}",
headers.messageId,
Expand Down
29 changes: 26 additions & 3 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt
@@ -1,14 +1,37 @@
package gg.beemo.latte.broker

data class BrokerMessage<T>(
val client: BrokerClient,
open class BrokerMessage<T, H : BrokerMessageHeaders>(
val topic: String,
val key: String,
val value: T,
val headers: BaseBrokerMessageHeaders
val headers: H
) {

val messageId: String
get() = headers.messageId

internal fun <ResponseT> toRpcRequestMessage(
updateSender: suspend (ResponseT, RpcStatus) -> Unit,
): RpcRequestMessage<T, ResponseT, H> {
return RpcRequestMessage(topic, key, value, headers, updateSender)
}

}

typealias BaseBrokerMessage<T> = BrokerMessage<T, BrokerMessageHeaders>
typealias BaseRpcRequestMessage<RequestT, ResponseT> = RpcRequestMessage<RequestT, ResponseT, BrokerMessageHeaders>
typealias RpcResponseMessage<T> = BrokerMessage<T, RpcMessageHeaders>

class RpcRequestMessage<RequestT, ResponseT, H : BrokerMessageHeaders>(
topic: String,
key: String,
value: RequestT,
headers: H,
private val updateSender: suspend (ResponseT, RpcStatus) -> Unit,
) : BrokerMessage<RequestT, H>(topic, key, value, headers) {

suspend fun sendUpdate(response: ResponseT, status: RpcStatus) {
updateSender(response, status)
}

}
91 changes: 91 additions & 0 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerMessageHeaders.kt
@@ -0,0 +1,91 @@
package gg.beemo.latte.broker

import java.util.UUID

open class BrokerMessageHeaders(val headers: Map<String, String>) {

val sourceService: String by lazy {
headers.getOrThrow(HEADER_SOURCE_SERVICE)
}
val sourceInstance: String by lazy {
headers.getOrThrow(HEADER_SOURCE_INSTANCE)
}
val targetServices: Set<String> by lazy {
splitToSet(headers.getOrDefault(HEADER_TARGET_SERVICES, ""))
}
val targetInstances: Set<String> by lazy {
splitToSet(headers.getOrDefault(HEADER_TARGET_INSTANCES, ""))
}
val messageId: MessageId by lazy {
headers.getOrThrow(HEADER_MESSAGE_ID)
}

constructor(
sourceService: String,
sourceInstance: String,
targetServices: Set<String>,
targetInstances: Set<String>,
) : this(
createHeadersMap(
sourceService,
sourceInstance,
targetServices,
targetInstances,
null,
)
)

constructor(
connection: BrokerConnection,
targetServices: Set<String>,
targetInstances: Set<String>,
) : this(
connection.serviceName,
connection.instanceId,
targetServices,
targetInstances,
)

companion object {

private const val HEADER_SOURCE_SERVICE = "source-service"
private const val HEADER_SOURCE_INSTANCE = "source-instance"
private const val HEADER_TARGET_SERVICES = "target-services"
private const val HEADER_TARGET_INSTANCES = "target-instances"
private const val HEADER_MESSAGE_ID = "message-id"

@JvmStatic
protected fun createHeadersMap(
sourceService: String,
sourceInstance: String,
targetServices: Set<String>,
targetInstances: Set<String>,
messageId: MessageId?,
extra: Map<String, String> = emptyMap(),
): Map<String, String> {
return mapOf(
HEADER_SOURCE_SERVICE to sourceService,
HEADER_SOURCE_INSTANCE to sourceInstance,
HEADER_TARGET_SERVICES to joinToString(targetServices),
HEADER_TARGET_INSTANCES to joinToString(targetInstances),
HEADER_MESSAGE_ID to (messageId ?: UUID.randomUUID().toString()),
)
}

@JvmStatic
protected fun splitToSet(value: String): Set<String> {
return value.split(",").filter { it.isNotEmpty() }.toSet()
}

@JvmStatic
protected fun joinToString(value: Set<String>): String {
return value.joinToString(",")
}

}

}

internal fun Map<String, String>.getOrThrow(key: String): String {
return get(key) ?: throw IllegalArgumentException("Missing broker message header '$key'")
}

0 comments on commit aad14e6

Please sign in to comment.