Mocking consumer never exits #633

Open
ETisREAL opened this issue Nov 26, 2023 · 0 comments

Hi :) Hope to find you well.

I am trying to test my application with the mocking module. I followed the documentation and example and adapted them to my use case, but the test runs for minutes and never exits, which suggests that my consumer never actually receives the message.

Here is the relevant code:

pub fn provide_encoder(schema_registry_url: String) -> AvroEncoder<'static> {
    AvroEncoder::new(SrSettings::new(schema_registry_url))
}

pub async fn produce_message(producer: FutureProducer, topic_name: &str, schema_name: String, encoder: AvroEncoder<'_>, notification: Notification) -> Result<(i32, i64), Box<dyn std::error::Error>> {

    let notification_type = &notification.metadata.notification_type.clone();
    let notification_type_str: &str;
    let notification_date = &notification.metadata.date.clone();
    let notification_id = &notification.metadata.id.clone();
    let encoded_msg = encoder.encode_struct(notification, &SubjectNameStrategy::RecordNameStrategy(schema_name)).await?;

    let mut msg_key = "".to_owned();
    match notification_type {
        NotificationType::Fcm => notification_type_str = "fcm",
        NotificationType::Email => notification_type_str = "email",
        NotificationType::Sms => notification_type_str = "sms",
    }
    msg_key.push_str(notification_type_str);
    msg_key.push_str("_");
    msg_key.push_str(notification_date);
    msg_key.push_str("_");
    msg_key.push_str(notification_id);
    msg_key.push_str("_request");

    let delivery_status = producer
        .send(
            FutureRecord::to(topic_name)
                .payload(&encoded_msg)
                .key(msg_key.as_str()),
                Duration::from_secs(1),
        )
        .await
        .map_err(|(err, _msg)| err)?; // keep the KafkaError, drop the undelivered message


    Ok(delivery_status) // partition and offset
}


#[tokio::test]
async fn test_produce_message() {

    use rdkafka::Message;
    use rdkafka::mocking::*;
    use rdkafka::ClientConfig;
    use rdkafka::consumer::*;
    use crate::notification_schema::*;
    use serde_json::json;

    const TOPIC: &str = "test_topic";
    const SCHEMA_NAME: &str = "test_notifications";
    const SCHEMA_FILE_PATH: &str = "resources/notification-schema.avsc";

    // Create mock cluster
    let mock_cluster = MockCluster::new(3).unwrap();
    mock_cluster
        .create_topic(TOPIC, 3, 3)
        .expect("Failed to create topic");

    let mock_producer: FutureProducer = rdkafka::ClientConfig::new()
        .set("bootstrap.servers", mock_cluster.bootstrap_servers())
        .create()
        .expect("Producer creation error");


    // Create mock schema registry
    // Set up the mock schema response
    let sr_response_body = r#"{"subject":"msg","version":1,"id":4,"schema":"{}]}"}"#;
    let mut data: serde_json::Value = serde_json::from_str(&sr_response_body).unwrap();
    data["schema"] = json!(tokio::fs::read_to_string(SCHEMA_FILE_PATH).await.unwrap());
    let output_json = serde_json::to_string_pretty(&data).expect("Failed to serialize JSON");

    let mut mock_schema_registry = mockito::Server::new();
    let _m = mock_schema_registry.mock("GET", "/subjects/test_notifications/versions/latest") // make sure the subject matches the SCHEMA_NAME!
        .with_status(200)
        .with_header("content-type", "application/vnd.schemaregistry.v1+json")
        .with_body(output_json)
        .create();


    // Mock Base Notification Request. This fixture will be modified by the various test cases
    let base_notification = Notification{
        message: NotificationMessage{
            attachments: None,
            body: Some("Hi! You have an important notification. <Tournament> is about to start! Log in!".to_string()),
            header: "Tournament starting!".to_string(),
        },
        metadata: Metadata{
            countries: None,
            regions: None,
            notification_type: NotificationType::Fcm,
            targets: NotificationTargets::Subscribers,
            tournament_id: None,
            game: Some(6),
            date: "12_08_2023".to_string(),
            created_at: "2023-08-12T16:54:00.000000Z".to_string(),
            id: "MOCK_ID".to_string(),
        },
        processor_metadata: None,
        receiver: None,
        annotations: None,
    };


    /* Case: SR erroring because of wrong schema name */
    let encoder = AvroEncoder::new(SrSettings::new(mock_schema_registry.url()));
    match produce_message(mock_producer.clone(), TOPIC, String::from("wrong-name"), encoder, base_notification.clone()).await {
        Ok(_) => todo!(),
        Err(err) => assert_eq!(
            String::from("Error: could not parse to RawRegisteredSchema, schema might not exist on this schema registry, the http call failed, cause will give more information, was cause by error decoding response body: EOF while parsing a value at line 1 column 0, it's retriable: false, it's cached: true"),
            err.to_string()
        ),
    }
    

    /* Case: erroring because of SR unexpected response */
    let mut failing_schema_registry = mockito::Server::new();
    let _m = failing_schema_registry.mock("GET", "/subjects/test_notifications/versions/latest") // make sure the subject matches the SCHEMA_NAME!
        .with_status(200)
        .with_header("content-type", "application/vnd.schemaregistry.v1+json")
        .with_body(tokio::fs::read_to_string(SCHEMA_FILE_PATH).await.unwrap())
        .create();
    let encoder = AvroEncoder::new(SrSettings::new(failing_schema_registry.url()));
    match produce_message(mock_producer.clone(), TOPIC, SCHEMA_NAME.to_string(), encoder, base_notification.clone()).await {
        Ok(_) => todo!(),
        Err(err) => assert_eq!(String::from("Error: Could not get id from response had no other cause, it's retriable: false, it's cached: true"), err.to_string()),
    }


    /* Case: successful message production */

    // 1.
    let fcm_not_id = "fcm_mock_notification";
    let fcm_not_date = "10_11_2022";
    let fcm_not_type = NotificationType::Fcm;
    let mut fcm_nr = base_notification.clone();
    fcm_nr.metadata.id = fcm_not_id.to_string();
    fcm_nr.metadata.date = fcm_not_date.to_string();
    fcm_nr.metadata.notification_type = fcm_not_type;
    let encoder = AvroEncoder::new(SrSettings::new(mock_schema_registry.url()));
    match produce_message(mock_producer.clone(), TOPIC, SCHEMA_NAME.to_string(), encoder, fcm_nr.clone()).await {
        Ok(res) => assert_eq!((1, 0), res),
        Err(_) => todo!(),
    }

    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", mock_cluster.bootstrap_servers())
        .set("group.id", "kafka-mock-consumer")
        .set("session.timeout.ms", "1000")
        .create()
        .expect("Consumer creation failed");
    println!("created consumer");
    consumer.subscribe(&[TOPIC]).unwrap();

    // Wait for the single produced message (this is where the test hangs)
    let fcm_msg = consumer.recv().await.unwrap().detach();

    let mut expected_fcm_msg_key = String::new();
    expected_fcm_msg_key.push_str("fcm_");
    expected_fcm_msg_key.push_str(fcm_not_date);
    expected_fcm_msg_key.push_str("_");
    expected_fcm_msg_key.push_str(fcm_not_id);
    expected_fcm_msg_key.push_str("_request");
    assert_eq!(expected_fcm_msg_key, String::from_utf8_lossy(fcm_msg.key().unwrap()));
}
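
The key that `produce_message` assembles with repeated `push_str` calls follows the pattern `<type>_<date>_<id>_request`. A minimal sketch of the same construction with `format!` is shown here. The `NotificationType` enum is a stand-in assumed to match the application's three variants, and `build_msg_key` is a hypothetical helper name, not something from the code above.

```rust
/// Stand-in for the application's `NotificationType` (assumed variants).
#[derive(Clone, Copy, Debug)]
enum NotificationType {
    Fcm,
    Email,
    Sms,
}

impl NotificationType {
    /// Lowercase label used as the key prefix.
    fn as_str(self) -> &'static str {
        match self {
            NotificationType::Fcm => "fcm",
            NotificationType::Email => "email",
            NotificationType::Sms => "sms",
        }
    }
}

/// Hypothetical helper: builds `<type>_<date>_<id>_request`.
fn build_msg_key(kind: NotificationType, date: &str, id: &str) -> String {
    format!("{}_{}_{}_request", kind.as_str(), date, id)
}

fn main() {
    // Same date and id as the success case in the test above.
    let kinds = [
        NotificationType::Fcm,
        NotificationType::Email,
        NotificationType::Sms,
    ];
    for kind in kinds {
        println!("{}", build_msg_key(kind, "10_11_2022", "fcm_mock_notification"));
    }
}
```

With a helper like this, the test could build its expected key through the same function instead of repeating the `push_str` sequence by hand.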

As you can see, at the end I want to receive the single message I am producing to the topic. However, it never seems to arrive, as the test never exits. What am I doing wrong?
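
One possible cause, offered as a guess rather than a confirmed diagnosis: the message is produced before the consumer subscribes, and librdkafka's `auto.offset.reset` property defaults to `latest`. A consumer group with no committed offsets would then start at the end of the partition and wait for messages that never arrive. The toy model below is plain Rust, not the rdkafka API. `OffsetReset`, `start_offset`, and `readable` are hypothetical names used only to illustrate the semantics.

```rust
/// Toy model of the `auto.offset.reset` policy (not the rdkafka API).
#[derive(Clone, Copy, Debug, PartialEq)]
enum OffsetReset {
    Earliest,
    Latest,
}

/// Offset at which a consumer with no committed offset starts reading
/// a partition that currently holds `log_len` messages.
fn start_offset(policy: OffsetReset, log_len: usize) -> usize {
    match policy {
        OffsetReset::Earliest => 0,
        OffsetReset::Latest => log_len,
    }
}

/// Messages already in the log that the consumer would be able to read.
fn readable<'a>(log: &'a [&'a str], policy: OffsetReset) -> &'a [&'a str] {
    &log[start_offset(policy, log.len())..]
}

fn main() {
    // One message was produced before the consumer subscribed.
    let log = ["fcm_10_11_2022_fcm_mock_notification_request"];

    // `latest`: nothing to read, so a blocking receive keeps waiting.
    println!("latest   -> {:?}", readable(&log, OffsetReset::Latest));
    // `earliest`: the message that is already there is delivered.
    println!("earliest -> {:?}", readable(&log, OffsetReset::Earliest));
}
```

If this is what is happening, adding `.set("auto.offset.reset", "earliest")` to the consumer's `ClientConfig` may be worth trying. Wrapping `consumer.recv()` in `tokio::time::timeout` would also turn a hang into a test failure.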

Thanks in advance
