Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite BrokerClient and RPC system #10

Merged
merged 26 commits into from Feb 2, 2024
Merged

Conversation

wasdennnoch
Copy link
Member

Over time, the previous implementation of the BrokerClient system showed some flaws:

  • Each BrokerClient could only communicate over the one specific topic it was configured with.
  • All messages sent through that BrokerClient had to be of one specific type, no matter what kind of request you're trying to send.
    • In order to represent different kinds of messages, your common message type had to have lots of nullable fields, and the receiver always had to manually verify the specific fields it expected for this kind of request were present.
  • There was no builtin way to indicate response error statuses, meaning you again had to create custom fields in your common message object.
  • Most importantly, this applied to both RPC requests and responses.
    • This meant that, in order to model a request-response-system, you often had to create a kind of wrapper class containing a request and a response field, and populating one of them depending on the message direction.

There were some more minor issues, but those three are the big ones which always caused headaches trying to implement new clients. Given that we plan to switch from Kafka to RabbitMQ soon, which is a breaking change anyways, I took this opportunity to (almost) completely rewrite our generic BrokerClient system to be a lot more flexible and to solve the aforementioned issues.


As before, new clients have to be subclesses of BrokerClient. This time, however, they do not need to immediately specify a topic and message type. Instead, the new BrokerClient is just a shell object, which allows you to create a form of "subclient" using the convenient consumer, producer and, most commonly, rpc method. An example implementation looks as follows:

class CalculatorClient(conn: BrokerConnection) : BrokerClient(conn) {

    private val calculateRpc = rpc<Request, Int>(topic = "calculations", key = "calculate") {
        val request = it.value
        if (request.operation == "add") {
            return@rpc Statuses.SUCCESS to request.a + request.b
        } else {
            throw RpcException(Statuses.UNKNOWN_OPERATION)
        }
    }

    suspend fun calculateSum(a: Int, b: Int): Int {
        try {
            val response = calculateRpc.call(Request("add", a, b))
            return response.value
        } catch (ex: RpcException) {
            println("Unexpected RPC status: ${ex.status}")
            return -1
        }
    }

}

@JsonClass(generateAdapter = true)
data class Request(
    val operation: String,
    val a: Int,
    val b: Int,
)

object Statuses {
    val SUCCESS = RpcStatus.OK
    val UNKNOWN_OPERATION = RpcStatus(1)
}

This BrokerClient creates an RPC client using the rpc() method, which receives the request message type, response message type, and the target topic and key (the same topic can be used by multiple clients using different keys for different message objects). A single BrokerClient can house multiple RPC clients, eahc of which can use different message types, topics, and keys. An RPC client also contains the suspending callback which is invoked upon receiving a matching message, and, in this example, returns the calculated result using a default success status, or, when receiving an unsupported operation, throws an RpcException with a predefined error code.

Below that, calculateSum invokes this RPC client with the passed request object. If the receive throws an RpcException, that exception will be re-thrown here. The call method optionally allows specifying the target service that should receive the requests, as well as a specific instance within that service cluster. What service or instance a specific client belongs to is determinded by the BrokerConnection. There is also a streammethod on the RPC client which allows reading multiple response messages in response to a single request using a Kotlin Flow.


Note

At the present, this rewrite only applies to the JVM implementation. The TypeScript port in Water will have to be updated as well before we can connect TS services to JVM services that are already using the new structure. Given we aren't yet using any TS services in production, this shouldn't be a big problem for now.

Copy link
Member

@ShindouMihou ShindouMihou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did a quick scan on this, and it looks good to me 🔥

latte/src/main/java/gg/beemo/latte/broker/Subclients.kt Outdated Show resolved Hide resolved
@wasdennnoch wasdennnoch merged commit b708237 into main Feb 2, 2024
1 check passed
@wasdennnoch wasdennnoch deleted the adrian/broker-client-rework branch February 2, 2024 18:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants