Mocking doesn't work #629

Open
connormullett opened this issue Nov 13, 2023 · 6 comments

@connormullett

We use this library in production and it's been great so far. We noticed that there was a new mocking module and upgraded to help extend test coverage to ensure stability.

I'm getting the error KafkaError (Client creation error: librdkafka failed to create consumer queue), which I traced to code that sits very close to the C code in the base library. Our mocks look like the following; we used the example in examples/mocking.rs for reference.

fn create_mock_cluster<'c>() -> MockCluster<'c, DefaultProducerContext> {
    let cluster = MockCluster::new(3).expect("failed to create mock cluster");
    cluster
        .create_topic("<our topic>", 32, 3)
        .expect("failed to set topic");
    cluster
}

fn create_mock_producer(bootstrap_servers: String) -> FutureProducer {
    ClientConfig::new()
        .set("bootstrap.servers", bootstrap_servers)
        .create()
        .expect("failed to create mock producer")
}

fn create_mock_consumer(bootstrap_servers: String) -> StreamingConsumer {
    StreamingConsumer::mock(bootstrap_servers)
}

We use the newtype pattern on StreamConsumer for additional mocking and expectations, such as committing. That's where StreamingConsumer comes from.

Because of the above error, I cloned the repo and tried to run the mocking.rs example locally, which failed with the following output.

     Running `target/debug/examples/mocking`
%5|1699901725.314|CONFWARN|rdkafka#producer-1| [thrd:app]: No `bootstrap.servers` configured: client will not be able to connect to Kafka cluster
Warming up for 10s...
Recording for 10s...
measurements: 1938
mean latency: 255.35655314757489ms
p50 latency:  255ms
p90 latency:  456ms
p99 latency:  501ms
%3|1699901745.548|FAIL|rdkafka#producer-2| [thrd:127.0.0.1:42977/bootstrap]: 127.0.0.1:42977/1: Connect to ipv4#127.0.0.1:42977 failed: Connection refused (after 0ms in state CONNECT)
%3|1699901745.548|FAIL|rdkafka#producer-2| [thrd:127.0.0.1:33881/bootstrap]: 127.0.0.1:33881/2: Connect to ipv4#127.0.0.1:33881 failed: Connection refused (after 0ms in state CONNECT)
%3|1699901745.548|FAIL|rdkafka#producer-2| [thrd:127.0.0.1:37597/bootstrap]: 127.0.0.1:37597/3: Connect to ipv4#127.0.0.1:37597 failed: Connection refused (after 0ms in state CONNECT)

Because the example failed too, I suspect this is something on my end, but I can't find any documentation on it.

Thanks for looking into this

connormullett changed the title from "Mocking not working" to "Mocking doesn't work" on Nov 13, 2023
@scanterog
Collaborator

@connormullett The example actually works and finishes as expected, though it is indeed confusing that bootstrap "connection refused" failures are being logged (that might be a bug in librdkafka). If you check the code, the consumer does receive messages and the latency is measured.

> Client creation error: librdkafka failed to create consumer queue

This sounds like group.id was not provided to the consumer. What is StreamingConsumer::mock doing?

@connormullett
Author

connormullett commented Nov 17, 2023

> This sounds like group.id was not provided to the consumer. What is StreamingConsumer::mock doing?

This was the issue and it works!

Does it make sense here to have a message suggesting a configuration check when these types of errors occur?
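
For anyone landing here with the same error, a check like the one asked about can also be done on the application side before the consumer is created. The following is only a minimal sketch: `consumer_config_hint` is a hypothetical helper (it is not part of rust-rdkafka), and it assumes the settings are held in a plain `HashMap` before being passed to `ClientConfig`. The only fact it relies on from this thread is that a missing `group.id` led to the "failed to create consumer queue" error.

```rust
use std::collections::HashMap;

/// Hypothetical pre-flight check (NOT part of rust-rdkafka).
/// Returns a human-readable hint when a consumer config lacks a usable
/// `group.id`, and `None` when the key is present and non-empty.
fn consumer_config_hint(config: &HashMap<String, String>) -> Option<String> {
    match config.get("group.id") {
        // Present and not blank: nothing to report.
        Some(id) if !id.trim().is_empty() => None,
        // Missing or blank: point at the likely cause of the creation error.
        _ => Some(String::from(
            "`group.id` is not set; in this thread that caused \
             \"librdkafka failed to create consumer queue\"",
        )),
    }
}
```

A wrapper such as `StreamingConsumer::mock` could call a helper like this and include the hint in its panic or error message, so the failure points at the configuration rather than at librdkafka.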

@scanterog
Collaborator

> Does it make sense here to have a message suggesting a configuration check when these types of errors occur?

Hmm, I need to check, but I think the latest version already does this. Which version are you using?

@connormullett
Author

> Which version are you using?

0.35.0

@bretthoerner

bretthoerner commented Dec 19, 2023

Something else seems funny about the Mock consumer, given this diff:

git --no-pager diff
diff --git a/examples/mocking.rs b/examples/mocking.rs
index c4a1d1c4..8b967e25 100644
--- a/examples/mocking.rs
+++ b/examples/mocking.rs
@@ -38,7 +38,7 @@ async fn main() {
                 .send_result(
                     FutureRecord::to(TOPIC)
                         .key(&i.to_string())
-                        .payload("dummy")
+                        .payload(&i.to_string())
                         .timestamp(now()),
                 )
                 .unwrap()
@@ -54,6 +54,7 @@ async fn main() {
     println!("Warming up for 10s...");
     loop {
         let message = consumer.recv().await.unwrap();
+        println!("{:?}", message.payload_view::<str>().unwrap());
         let then = message.timestamp().to_millis().unwrap();
         if start.elapsed() < Duration::from_secs(10) {
             // Warming up.

The first message I see via the consumer is at around i=500. That is fine for benchmarking, but it makes the mock hard to use for testing messages produced by some bit of code.

If I send only a handful of events, my consumer never finishes the first recv().await call (for example, change the example to produce 1 message). I experimented with the consumer configuration for a while (I was using the default, exactly like the example), thinking this had to do with fetch.min.bytes, fetch.wait.max.ms, or something similar, but nothing seemed to work.

Plus, doesn't the fact that this example (with my diff) starts printing at some arbitrary point around 500 instead of at 0 show that messages produced to the mock cluster are being lost? It's as if some buffer has to be filled first, and a big chunk of the first messages in the topic is then dropped.

@bretthoerner

Ahhh, I finally checked out the mocking code and saw the unit test inside, which publishes and consumes 1 item and works. The only difference was .set("auto.offset.reset", "earliest").

I guess there's something about mock topic creation where latest (the default) doesn't work as I'd expect. I created the topic and subscribed with the consumer before I published anything, and I still didn't see any messages.

Changing to earliest worked, which is good enough for my needs.
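
One way to read the behaviour above is through the usual `auto.offset.reset` semantics: a consumer group with no committed offset starts at the end of the log under `latest` and at the beginning under `earliest`. The sketch below is a toy, std-only model of that rule, not rust-rdkafka or librdkafka code; `Partition`, `OffsetReset`, and `simulate` are hypothetical names. It assumes the consumer's start position is resolved only after some messages have already been produced, which would be consistent with the first message appearing at around 500, but that timing is a guess and not something confirmed in this thread.

```rust
/// Offset-reset policy for a consumer group with no committed offset.
#[derive(Clone, Copy, Debug, PartialEq)]
enum OffsetReset {
    Earliest,
    Latest,
}

/// Toy model of a single partition: a message's offset is its index in `log`.
struct Partition {
    log: Vec<String>,
}

impl Partition {
    /// Offset a new consumer starts from when no committed offset exists.
    fn start_offset(&self, reset: OffsetReset) -> usize {
        match reset {
            OffsetReset::Earliest => 0,
            OffsetReset::Latest => self.log.len(),
        }
    }

    /// Messages visible to a consumer positioned at `offset`.
    fn read_from(&self, offset: usize) -> &[String] {
        &self.log[offset..]
    }
}

/// Produce `before` messages, resolve the consumer's start offset, produce
/// `after` more messages, and return everything the consumer can read.
fn simulate(before: usize, after: usize, reset: OffsetReset) -> Vec<String> {
    let mut partition = Partition { log: Vec::new() };
    for i in 0..before {
        partition.log.push(i.to_string());
    }
    let start = partition.start_offset(reset);
    for i in before..before + after {
        partition.log.push(i.to_string());
    }
    partition.read_from(start).to_vec()
}
```

In this model, `simulate(1, 0, OffsetReset::Latest)` returns nothing, which mirrors a `recv().await` that never completes when a single message is produced before the consumer's position is resolved, while `OffsetReset::Earliest` returns that message.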
