Skip to content

Commit

Permalink
[latte] Fix BrokerClient topic handling (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Feb 21, 2024
1 parent b708237 commit 852c121
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 67 deletions.
191 changes: 132 additions & 59 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt
Expand Up @@ -6,18 +6,8 @@ import gg.beemo.latte.broker.rpc.RpcResponse
import gg.beemo.latte.logging.Log
import kotlinx.coroutines.*
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean

private class TopicMetadata(
val topic: String,
val keys: MutableMap<String, KeyMetadata>,
var connectionListener: TopicListener? = null,
)

private class KeyMetadata(
val topic: TopicMetadata,
val producers: MutableSet<ProducerSubclient<*>>,
val consumers: MutableSet<ConsumerSubclient<*>>,
)

abstract class BrokerClient(
@PublishedApi
Expand Down Expand Up @@ -48,7 +38,7 @@ abstract class BrokerClient(
): ConsumerSubclient<T> {
log.debug("Creating consumer for key '{}' in topic '{}' with type {}", key, topic, type.name)
return ConsumerSubclient(connection, this, topic, key, options, type, isNullable, callback).also {
registerConsumer(it)
registerSubclient(it)
}
}

Expand All @@ -70,7 +60,7 @@ abstract class BrokerClient(
): ProducerSubclient<T> {
log.debug("Creating producer for key '{}' in topic '{}' with type {}", key, topic, type.name)
return ProducerSubclient(connection, this, topic, key, options, type, isNullable).also {
registerProducer(it)
registerSubclient(it)
}
}

Expand All @@ -93,82 +83,149 @@ abstract class BrokerClient(
)
}

fun destroy(cancelScope: Boolean = true) {
val producers = topics.values.flatMap { metadata -> metadata.keys.values.flatMap { it.producers } }
val consumers = topics.values.flatMap { metadata -> metadata.keys.values.flatMap { it.consumers } }
producers.forEach {
it.destroy()
private fun registerSubclient(subclient: BaseSubclient) {
val topic = subclient.topic
val metadata = topics.computeIfAbsent(topic) {
TopicMetadata(connection, consumerScope, topic)
}
metadata.registerSubclient(subclient)
}

internal fun deregisterSubclient(subclient: BaseSubclient) {
val topic = subclient.topic
topics[topic]?.let {
it.deregisterSubclient(subclient)
if (it.isEmpty) {
it.destroy()
topics.remove(topic)
}
}
consumers.forEach {
it.destroy()
}

fun destroy(cancelScope: Boolean = true) {
log.debug("Destroying BrokerClient of type {} with active topics {}", javaClass.simpleName, topics.keys)
while (topics.isNotEmpty()) {
val topic = topics.keys.first()
topics[topic]?.destroy()
topics.remove(topic)
}
topics.clear()
if (cancelScope) {
consumerScope.cancel()
}
}

private fun registerProducer(producer: ProducerSubclient<*>) {
val metadata = getOrCreateKeyMetadata(producer.topic, producer.key)
metadata.producers.add(producer)
}
internal fun toResponseTopic(topic: String): String =
if (connection.supportsTopicHotSwap) "$topic.responses" else topic

private fun registerConsumer(consumer: ConsumerSubclient<*>) {
val metadata = getOrCreateKeyMetadata(consumer.topic, consumer.key)
if (metadata.consumers.isEmpty() && metadata.topic.connectionListener == null) {
// New consumer - create a new connection listener for this topic
val listener = TopicListener { topic, key, value, headers ->
onTopicMessage(topic, key, value, headers)
}
connection.on(consumer.topic, listener)
metadata.topic.connectionListener = listener
internal fun toResponseKey(key: String): String = "$key.response"

}

private class TopicMetadata(
private val connection: BrokerConnection,
private val consumerScope: CoroutineScope,
private val topic: String,
) {

private class KeyMetadata(val key: String) {
val producers: MutableSet<ProducerSubclient<*>> = Collections.synchronizedSet(HashSet())
val consumers: MutableSet<ConsumerSubclient<*>> = Collections.synchronizedSet(HashSet())

val isEmpty: Boolean
get() = producers.isEmpty() && consumers.isEmpty()

fun destroy() {
producers.forEach(ProducerSubclient<*>::destroy)
consumers.forEach(ConsumerSubclient<*>::destroy)
producers.clear()
consumers.clear()
}
metadata.consumers.add(consumer)
}

internal fun deregisterProducer(producer: ProducerSubclient<*>) {
log.debug("Removing producer for key '{}' in topic '{}'", producer.key, producer.topic)
val metadata = getExistingKeyMetadata(producer.topic, producer.key)
metadata?.producers?.remove(producer)
private val log by Log
private val _keys: MutableMap<String, KeyMetadata> = Collections.synchronizedMap(HashMap())
private val isBeingDestroyed = AtomicBoolean(false)
val isEmpty: Boolean
get() = _keys.isEmpty()

@Volatile
private var connectionListener: TopicListener? = null

fun registerSubclient(subclient: BaseSubclient) {
log.debug(
"Adding {} for key '{}' in topic '{}'",
subclient.javaClass.simpleName,
subclient.key,
subclient.topic
)
val metadata = getOrCreateKeyMetadata(subclient.key)
when (subclient) {
is ConsumerSubclient<*> -> {
if (metadata.consumers.isEmpty() && connectionListener == null) {
log.debug("Creating new connection listener for topic '{}'", subclient.topic)
// New consumer - create a new connection listener for this topic
val listener = TopicListener { topic, key, value, headers ->
onTopicMessage(topic, key, value, headers)
}
connection.on(subclient.topic, listener)
connectionListener = listener
}
metadata.consumers.add(subclient)
}

is ProducerSubclient<*> -> {
metadata.producers.add(subclient)
}
}
}

internal fun deregisterConsumer(consumer: ConsumerSubclient<*>) {
log.debug("Removing consumer for key '{}' in topic '{}'", consumer.key, consumer.topic)
val metadata = getExistingKeyMetadata(consumer.topic, consumer.key)
if (metadata?.consumers?.remove(consumer) == true && metadata.consumers.isEmpty()) {
metadata.topic.connectionListener?.let {
connection.off(metadata.topic.topic, it)
metadata.topic.connectionListener = null
fun deregisterSubclient(subclient: BaseSubclient) {
log.debug(
"Removing {} for key '{}' in topic '{}'",
subclient.javaClass.simpleName,
subclient.key,
subclient.topic
)
val metadata = getExistingKeyMetadata(subclient.key)
metadata?.let {
when (subclient) {
is ConsumerSubclient<*> -> it.consumers.remove(subclient)
is ProducerSubclient<*> -> it.producers.remove(subclient)
}
maybeCleanupKeyMetadata(it)
}
}

private fun getOrCreateKeyMetadata(topic: String, key: String): KeyMetadata {
val topicData = topics.computeIfAbsent(topic) {
TopicMetadata(topic, Collections.synchronizedMap(HashMap()))
private fun maybeCleanupKeyMetadata(keyMetadata: KeyMetadata) {
if (keyMetadata.isEmpty) {
_keys.remove(keyMetadata.key)
}
val keyData = topicData.keys.computeIfAbsent(key) {
KeyMetadata(topicData, Collections.synchronizedSet(HashSet()), Collections.synchronizedSet(HashSet()))
if (this.isEmpty) {
connectionListener?.let {
log.debug("Removing connection listener for topic '{}' after key cleanup", topic)
connection.off(topic, it)
connectionListener = null
}
}
return keyData
}

private fun getExistingKeyMetadata(topic: String, key: String): KeyMetadata? {
return topics[topic]?.keys?.get(key)
private fun getOrCreateKeyMetadata(key: String): KeyMetadata {
return _keys.computeIfAbsent(key) {
KeyMetadata(key)
}
}

internal fun toResponseTopic(topic: String): String =
if (connection.supportsTopicHotSwap) "$topic.responses" else topic

internal fun toResponseKey(key: String): String = "$key.response"
private fun getExistingKeyMetadata(key: String): KeyMetadata? {
return _keys[key]
}

private fun onTopicMessage(
topic: String,
key: String,
value: String,
headers: BrokerMessageHeaders,
) {
val metadata = getExistingKeyMetadata(topic, key) ?: return
val metadata = getExistingKeyMetadata(key) ?: return
for (consumer in metadata.consumers) {
consumerScope.launch {
try {
Expand All @@ -180,6 +237,22 @@ abstract class BrokerClient(
}
}

fun destroy() {
if (!isBeingDestroyed.compareAndSet(false, true)) {
return
}
while (_keys.isNotEmpty()) {
val key = _keys.keys.first()
_keys[key]?.destroy()
_keys.remove(key)
}
connectionListener?.let {
log.debug("Removing connection listener for topic '{}' in destroy()", topic)
connection.off(topic, it)
connectionListener = null
}
}

}

@PublishedApi
Expand Down
Expand Up @@ -21,6 +21,7 @@ abstract class BrokerConnection {

abstract suspend fun start()
open fun destroy() {
log.debug("Destroying BrokerConnection")
topicListeners.clear()
}

Expand Down
20 changes: 15 additions & 5 deletions latte/src/main/java/gg/beemo/latte/broker/Subclients.kt
Expand Up @@ -10,6 +10,7 @@ import gg.beemo.latte.util.MoshiUnitAdapter
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import java.time.Instant
import java.util.concurrent.atomic.AtomicBoolean

data class BrokerClientOptions(
val useSafeJsLongs: Boolean = false,
Expand All @@ -23,7 +24,16 @@ abstract class BaseSubclient(
protected val options: BrokerClientOptions,
) {

internal abstract fun destroy()
private val log by Log
private val isBeingDestroyed = AtomicBoolean(false)

internal fun destroy() {
if (isBeingDestroyed.compareAndSet(false, true)) {
doDestroy()
}
}

protected abstract fun doDestroy()

protected fun <T> createMoshiAdapter(type: Class<T>): JsonAdapter<T?> {
val mochi = if (options.useSafeJsLongs) safeJsMoshi else baseMoshi
Expand Down Expand Up @@ -62,8 +72,8 @@ class ProducerSubclient<T>(
private val log by Log
private val adapter: JsonAdapter<T?> = createMoshiAdapter(requestType)

override fun destroy() {
client.deregisterProducer(this)
override fun doDestroy() {
client.deregisterSubclient(this)
}

suspend fun send(
Expand Down Expand Up @@ -128,8 +138,8 @@ class ConsumerSubclient<T>(
private val log by Log
private val adapter: JsonAdapter<T?> = createMoshiAdapter(incomingType)

override fun destroy() {
client.deregisterConsumer(this)
override fun doDestroy() {
client.deregisterSubclient(this)
}

internal suspend fun onIncomingMessage(
Expand Down
Expand Up @@ -91,6 +91,7 @@ class KafkaConnection(
}

override fun destroy() {
log.debug("Destroying KafkaConnection")
consumer?.close()
consumer = null
producer?.close()
Expand Down
2 changes: 1 addition & 1 deletion latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt
Expand Up @@ -158,7 +158,7 @@ class RpcClient<RequestT, ResponseT>(

}

override fun destroy() {
override fun doDestroy() {
requestProducer.destroy()
requestConsumer.destroy()
}
Expand Down
2 changes: 1 addition & 1 deletion latte/src/main/resources/log4j2.xml
@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO">
<Configuration status="INFO" shutdownHook="disable">
<Properties>
<Property name="PATTERN">%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight{%-5level}{FATAL=bg_bright_red, ERROR=bright_red, WARN=bright_yellow, INFO=bright_green, DEBUG=bright_cyan, TRACE=bright_white} [%style{%t}{bright_white}] %style{%logger{36}}{white}: %msg%n%ex</Property>
</Properties>
Expand Down
11 changes: 10 additions & 1 deletion vanilla/src/main/java/gg/beemo/vanilla/Vanilla.kt
Expand Up @@ -4,7 +4,9 @@ import gg.beemo.latte.CommonConfig
import gg.beemo.latte.broker.kafka.KafkaConnection
import gg.beemo.latte.config.Configurator
import gg.beemo.latte.logging.Log
import gg.beemo.latte.logging.log
import kotlinx.coroutines.runBlocking
import org.apache.logging.log4j.LogManager

object Vanilla {

Expand All @@ -25,7 +27,14 @@ object Vanilla {
)

log.debug("Initializing Kafka Ratelimit client")
RatelimitClient(brokerConnection)
val ratelimitClient = RatelimitClient(brokerConnection)

Runtime.getRuntime().addShutdownHook(Thread({
log.info("Destroying everything")
ratelimitClient.destroy()
brokerConnection.destroy()
LogManager.shutdown(true, true)
}, "Vanilla Shutdown Hook"))

log.debug("Starting Kafka connection")
brokerConnection.start()
Expand Down

0 comments on commit 852c121

Please sign in to comment.