Skip to content

Commit

Permalink
Add more error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Jan 28, 2024
1 parent 85797dd commit c5cf147
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
22 changes: 20 additions & 2 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerSubclients.kt
Expand Up @@ -24,7 +24,6 @@ data class BrokerClientOptions(
val useSafeJsLongs: Boolean = false,
)

// TODO Add error handling, some try-finally to close the producer/consumer even with errors
sealed class BaseSubclient(
protected val connection: BrokerConnection,
protected val client: BrokerClient,
Expand Down Expand Up @@ -162,7 +161,16 @@ class ConsumerSubclient<T>(
value,
)
@Suppress("UNCHECKED_CAST") // Safe due to above null validation
callback(message as BaseBrokerMessage<T>)
val brokerMessage = message as BaseBrokerMessage<T>
try {
callback(brokerMessage)
} catch (ex: Exception) {
log.error(
"Uncaught consumer callback error while processing message ${headers.messageId} " +
"with key '$key' in topic '$topic'",
ex,
)
}
}

private fun parseIncoming(json: String): T? {
Expand Down Expand Up @@ -191,6 +199,8 @@ class RpcClient<RequestT, ResponseT>(
options,
) {

private val log by Log

private val requestProducer = client.producer(topic, key, options, requestType, requestIsNullable)
private val requestConsumer = client.consumer(topic, key, options, requestType, requestIsNullable) { msg ->
val responseProducer = client.producer(
Expand Down Expand Up @@ -231,6 +241,13 @@ class RpcClient<RequestT, ResponseT>(
} catch (ex: RpcException) {
sendResponse(null, ex.status, true, isUpdate = false)
return@consumer
} catch (ex: Exception) {
log.error(
"Uncaught RPC callback error while processing message ${msg.headers.messageId} " +
"with key '$key' in topic '$topic'",
ex,
)
return@consumer
} finally {
responseProducer.destroy()
}
Expand Down Expand Up @@ -274,6 +291,7 @@ class RpcClient<RequestT, ResponseT>(
if (msg.headers.inReplyTo != messageId.get()) {
return@consumer
}
// Close the flow if we receive an exception
if (msg.headers.isException) {
close(RpcException(msg.headers.status))
return@consumer
Expand Down
Expand Up @@ -7,7 +7,7 @@ import org.junit.jupiter.api.assertThrows

class BrokerClientTest {

private val connection = LocalConnection()
private val connection = LocalConnection("service", "instance")

@Test
fun `test greeting RPC`() = withTestClient { client ->
Expand Down

0 comments on commit c5cf147

Please sign in to comment.