Skip to content

Commit

Permalink
Add JS Long serialization support and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Dec 4, 2023
1 parent fa61e76 commit 45e3ced
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 54 deletions.
8 changes: 6 additions & 2 deletions latte/build.gradle.kts
Expand Up @@ -10,7 +10,9 @@ version = "1.0.0"
dependencies {

// Kotlin
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
val kotlinCoroutinesVersion = "1.7.3"
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlinCoroutinesVersion")

// Kafka
val kafkaVersion = "3.2.3"
Expand All @@ -24,7 +26,9 @@ dependencies {

// Misc
implementation("org.jetbrains:annotations:24.1.0")
compileOnly("org.apache.logging.log4j:log4j-api:2.22.0")
val log4jVersion = "2.22.0"
compileOnly("org.apache.logging.log4j:log4j-api:$log4jVersion")
testImplementation("org.apache.logging.log4j:log4j-core:$log4jVersion")

// JUnit testing framework
val junitVersion = "5.10.1"
Expand Down
35 changes: 25 additions & 10 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt
@@ -1,6 +1,6 @@
package gg.beemo.latte.broker

import gg.beemo.latte.logging.log
import gg.beemo.latte.logging.Log
import kotlinx.coroutines.*
import java.util.Collections

Expand All @@ -20,58 +20,69 @@ private class KeyMetadata(
abstract class BrokerClient(
@PublishedApi
internal val connection: BrokerConnection,
private val consumerScope: CoroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob()),
) {

private val consumerScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
private val log by Log
private val topics: MutableMap<String, TopicMetadata> = Collections.synchronizedMap(HashMap())

inline fun <reified T> consumer(
topic: String,
key: String,
options: BrokerClientOptions = BrokerClientOptions(),
noinline callback: suspend CoroutineScope.(BrokerMessage<T>) -> Unit,
): ConsumerSubclient<T> {
return consumer(topic, key, T::class.java, null is T, callback)
return consumer(topic, key, options, T::class.java, null is T, callback)
}

fun <T> consumer(
@PublishedApi
internal fun <T> consumer(
topic: String,
key: String,
options: BrokerClientOptions = BrokerClientOptions(),
type: Class<T>,
isNullable: Boolean,
callback: suspend CoroutineScope.(BrokerMessage<T>) -> Unit,
): ConsumerSubclient<T> {
return ConsumerSubclient(connection, this, topic, key, type, isNullable, callback).also {
log.debug("Creating consumer for key '{}' in topic '{}' with type {}", key, topic, type.name)
return ConsumerSubclient(connection, this, topic, key, options, type, isNullable, callback).also {
registerConsumer(it)
}
}

inline fun <reified T> producer(
topic: String,
key: String,
options: BrokerClientOptions = BrokerClientOptions(),
): ProducerSubclient<T> {
return producer(topic, key, T::class.java, null is T)
return producer(topic, key, options, T::class.java, null is T)
}

fun <T> producer(
@PublishedApi
internal fun <T> producer(
topic: String,
key: String,
options: BrokerClientOptions = BrokerClientOptions(),
type: Class<T>,
isNullable: Boolean,
): ProducerSubclient<T> {
return ProducerSubclient(connection, this, topic, key, type, isNullable).also {
log.debug("Creating producer for key '{}' in topic '{}' with type {}", key, topic, type.name)
return ProducerSubclient(connection, this, topic, key, options, type, isNullable).also {
registerProducer(it)
}
}

inline fun <reified RequestT, reified ResponseT> rpc(
topic: String,
key: String,
options: BrokerClientOptions = BrokerClientOptions(),
noinline callback: suspend CoroutineScope.(BrokerMessage<RequestT>) -> ResponseT,
): RpcClient<RequestT, ResponseT> {
return RpcClient(
this,
topic,
key,
options,
RequestT::class.java,
null is RequestT,
ResponseT::class.java,
Expand All @@ -80,7 +91,7 @@ abstract class BrokerClient(
)
}

fun destroy() {
fun destroy(cancelScope: Boolean = true) {
val producers = topics.values.flatMap { metadata -> metadata.keys.values.flatMap { it.producers } }
val consumers = topics.values.flatMap { metadata -> metadata.keys.values.flatMap { it.consumers } }
producers.forEach {
Expand All @@ -90,7 +101,9 @@ abstract class BrokerClient(
it.destroy()
}
topics.clear()
consumerScope.cancel()
if (cancelScope) {
consumerScope.cancel()
}
}

private fun registerProducer(producer: ProducerSubclient<*>) {
Expand All @@ -112,11 +125,13 @@ abstract class BrokerClient(
}

internal fun deregisterProducer(producer: ProducerSubclient<*>) {
log.debug("Removing producer for key '{}' in topic '{}'", producer.key, producer.topic)
val metadata = getExistingKeyMetadata(producer.topic, producer.key)
metadata?.producers?.remove(producer)
}

internal fun deregisterConsumer(consumer: ConsumerSubclient<*>) {
log.debug("Removing consumer for key '{}' in topic '{}'", consumer.key, consumer.topic)
val metadata = getExistingKeyMetadata(consumer.topic, consumer.key)
if (metadata?.consumers?.remove(consumer) == true && metadata.consumers.isEmpty()) {
metadata.topic.connectionListener?.let {
Expand Down
31 changes: 29 additions & 2 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerConnection.kt
@@ -1,6 +1,6 @@
package gg.beemo.latte.broker

import gg.beemo.latte.logging.log
import gg.beemo.latte.logging.Log
import java.util.*

fun interface TopicListener {
Expand All @@ -17,18 +17,36 @@ abstract class BrokerConnection {

protected val topicListeners: MutableMap<String, MutableSet<TopicListener>> = Collections.synchronizedMap(HashMap())

private val log by Log

abstract suspend fun start()
open fun destroy() {
topicListeners.clear()
}

internal abstract suspend fun send(
internal abstract suspend fun abstractSend(
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders,
): MessageId

internal suspend fun send(
topic: String,
key: String,
value: String,
headers: BaseBrokerMessageHeaders,
): MessageId {
log.trace(
"Sending message {} with key '{}' in topic '{}' with value: {}",
headers.messageId,
key,
topic,
value,
)
return abstractSend(topic, key, value, headers)
}

internal abstract fun createHeaders(
targetServices: Set<String> = emptySet(),
targetInstances: Set<String> = emptySet(),
Expand All @@ -40,6 +58,7 @@ abstract class BrokerConnection {

internal fun on(topic: String, cb: TopicListener) {
topicListeners.computeIfAbsent(topic) {
log.debug("Creating new topic '{}'", topic)
createTopic(topic)
Collections.synchronizedSet(HashSet())
}.add(cb)
Expand All @@ -49,6 +68,7 @@ abstract class BrokerConnection {
topicListeners.computeIfPresent(topic) { _, listeners ->
listeners.remove(cb)
if (listeners.size == 0) {
log.debug("Removing topic '{}'", topic)
removeTopic(topic)
null
} else {
Expand Down Expand Up @@ -110,6 +130,13 @@ abstract class BrokerConnection {
}

private fun invokeLocalCallbacks(topic: String, key: String, value: String, headers: BaseBrokerMessageHeaders) {
log.trace(
"Dispatching message {} with key '{}' in topic '{}' to local listeners with value: {}",
headers.messageId,
key,
topic,
value,
)
val listeners = topicListeners[topic] ?: return
for (listener in listeners) {
try {
Expand Down

0 comments on commit 45e3ced

Please sign in to comment.