Skip to content

Commit

Permalink
Only change RPC response topic with supported topic hot swap
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Dec 4, 2023
1 parent 13d265e commit e69b2e8
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
6 changes: 4 additions & 2 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt
Expand Up @@ -155,8 +155,10 @@ abstract class BrokerClient(
return topics[topic]?.keys?.get(key)
}

internal fun createResponseTopic(topic: String): String = "$topic.responses"
internal fun createResponseKey(key: String): String = "$key.response"
internal fun toResponseTopic(topic: String): String =
if (connection.supportsTopicHotSwap) "$topic.responses" else topic

internal fun toResponseKey(key: String): String = "$key.response"

private fun onTopicMessage(
topic: String,
Expand Down
7 changes: 6 additions & 1 deletion latte/src/main/java/gg/beemo/latte/broker/BrokerMessage.kt
Expand Up @@ -6,4 +6,9 @@ data class BrokerMessage<T>(
val key: String,
val value: T,
val headers: BaseBrokerMessageHeaders
)
) {

val messageId: String
get() = headers.messageId

}
10 changes: 5 additions & 5 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt
Expand Up @@ -186,8 +186,8 @@ class RpcClient<RequestT, ResponseT>(
private val requestConsumer = client.consumer(topic, key, options, requestType, responseIsNullable) {
val result = callback(it)
val responseProducer = client.producer(
client.createResponseTopic(topic),
client.createResponseKey(key),
client.toResponseTopic(topic),
client.toResponseKey(key),
options,
responseType,
responseIsNullable,
Expand All @@ -197,7 +197,7 @@ class RpcClient<RequestT, ResponseT>(
result,
services = setOf(it.headers.sourceService),
instances = setOf(it.headers.sourceInstance),
inReplyTo = it.headers.messageId,
inReplyTo = it.messageId,
)
responseProducer.destroy()
}
Expand Down Expand Up @@ -230,8 +230,8 @@ class RpcClient<RequestT, ResponseT>(
val messageId = AtomicReference<String?>(null)

val responseConsumer = client.consumer(
client.createResponseTopic(topic),
client.createResponseKey(key),
client.toResponseTopic(topic),
client.toResponseKey(key),
options,
responseType,
responseIsNullable,
Expand Down

0 comments on commit e69b2e8

Please sign in to comment.