
[KIP-848] Added support for testing with new 'consumer' group protocol. #1735

Open · wants to merge 28 commits into master

Conversation

pranavrth (Member)

  • Added support for testing with new 'consumer' group protocol.
  • Fixed an incorrect test case.

from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
from ..common import TestDeserializingConsumer, TestConsumer
Contributor

Let's use absolute imports: tests.common

Contributor

@emasab left a comment

Great, Pranav! Just a few more changes and we can enable the integration tests too.

@@ -29,11 +29,11 @@ def commit_and_check(consumer, topic, metadata):
assert offsets[0].metadata == metadata


def test_consumer_topicpartition_metadata(kafka_cluster):
topic = kafka_cluster.create_topic("test_topicpartition")
def test_consumer_topicpartition_metadata(kafka_single_broker_cluster):
Contributor

Let's not spawn this new cluster; instead we can change the scope of the cluster fixture to session, so the clusters aren't destroyed and recreated between tests.
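A minimal sketch of that suggestion, assuming a pytest-based kafka_cluster fixture (the FakeCluster class here is a stand-in for the real trivup-backed cluster used by the suite):

```python
import pytest


# Hypothetical stand-in for the real test cluster object.
class FakeCluster:
    instances = 0

    def __init__(self):
        FakeCluster.instances += 1

    def create_topic(self, name):
        return name


# scope="session" creates the cluster once per test run instead of
# once per test, so it isn't destroyed and recreated between tests.
@pytest.fixture(scope="session")
def kafka_cluster():
    cluster = FakeCluster()
    yield cluster
    # teardown (stopping the cluster) would go here
```

With the default function scope, FakeCluster.instances would grow with every test; with session scope it stays at one for the whole run.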

Member Author

I think I didn't understand this correctly. I tried with session scope and it worked as it is.

def test_consumer_topicpartition_metadata(kafka_cluster):
topic = kafka_cluster.create_topic("test_topicpartition")
def test_consumer_topicpartition_metadata(kafka_single_broker_cluster):
topic = kafka_single_broker_cluster.create_topic("test_topicpartition")
Contributor

To make the integration tests for admin APIs work with KRaft we need to add time.sleep(1) after a number of operations:

  • create_topic
  • create_acls
  • delete_acls
  • alter_configs
  • incremental_alter_configs
  • alter_user_scram_credentials
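One way to sketch this (settle_after is a helper of my own for illustration, not part of the confluent-kafka API) is a wrapper that sleeps after the operation returns, giving KRaft metadata time to propagate:

```python
import functools
import time


def settle_after(fn, delay=1.0):
    """Call fn, then sleep so cluster metadata can propagate.

    delay defaults to the 1 second suggested above; the wrapped
    function's result is returned unchanged.
    """
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        result = fn(*args, **kwargs)
        time.sleep(delay)
        return result
    return wrapper


# Usage sketch with a hypothetical admin client:
# admin.create_topics = settle_after(admin.create_topics)
# admin.create_acls = settle_after(admin.create_acls)
```

This keeps the sleep in one place instead of scattering time.sleep(1) calls through every test that touches those operations.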

Contributor

In test_describe_operations we can disable this check with the consumer protocol, as it reports that it's a simple consumer group (it needs to use the new RPC instead):

    if not TestUtils.use_group_protocol_consumer():
        verify_describe_groups(sasl_cluster, admin_client, our_topic)

Contributor

In test_list_offsets we need to disable this check at the moment:

        # FIXME: This test is disabled until KAFKA-16310 is released
        #        with AK 3.8.0
        # requests = {topic_partition: OffsetSpec.max_timestamp()}
        # futmap = admin_client.list_offsets(requests, **kwargs)
        # for _, fut in futmap.items():
        #     result = fut.result()
        #     assert isinstance(result, ListOffsetsResultInfo)
        #     assert (result.offset == 1)

Contributor

In test_user_scram_credentials with KRaft it's giving an error if more than one UserScramCredentialUpsertion is given for the same user, even if it's for a different SCRAM mechanism, so you can add an if and do the upsertion in two steps when using KRaft.
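A possible sketch of that split (split_upsertions and the use_kraft flag are illustrative names; the items would be the real UserScramCredentialUpsertion objects):

```python
def split_upsertions(upsertions, use_kraft):
    """Return batches to pass to alter_user_scram_credentials().

    With KRaft, each upsertion goes in its own call so that two SCRAM
    mechanisms for the same user are never altered in one request;
    otherwise all upsertions go in a single call, as before.
    """
    if use_kraft:
        return [[u] for u in upsertions]
    return [upsertions]


# Usage sketch:
# for batch in split_upsertions(upsertions, TestUtils.use_kraft()):
#     futmap = admin_client.alter_user_scram_credentials(batch)
#     for fut in futmap.values():
#         fut.result()
```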

Member Author

@pranavrth commented on May 16, 2024

> In test_list_offsets we need to disable this check at the moment

This test is passing with KRaft. Disabled with ZooKeeper.

> In test_user_scram_credentials with KRaft it's giving an error if more than one UserScramCredentialUpsertion is given for the same user, even if it's for a different SCRAM mechanism, so you can add an if and do the upsertion in two steps when using KRaft

Why are these behaving differently in KRaft and ZooKeeper? I think this is a bug in Kafka.

'transaction.state.log.min.isr=1']}
'cp_version': '7.6.0',
'kraft': TestUtils.use_kraft(),
'version': 'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07',
Contributor

version needs to be trunk only with the new protocol, otherwise 3.7.0:

def _broker_version():
    return (TestUtils.use_group_protocol_consumer() and
            'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07' or
            '3.7.0')

Comment on lines 23 to 24
if [[ $TEST_CONSUMER_GROUP_PROTOCOL == consumer ]]; then
python -m pytest --timeout 1200 --ignore=dest --ignore=tests/integration/admin
else
python -m pytest --timeout 1200 --ignore=dest
fi
Contributor

Let's run the integration tests after the previous changes:

Suggested change
if [[ $TEST_CONSUMER_GROUP_PROTOCOL == consumer ]]; then
python -m pytest --timeout 1200 --ignore=dest --ignore=tests/integration/admin
else
python -m pytest --timeout 1200 --ignore=dest
fi
if [[ -z $TEST_CONSUMER_GROUP_PROTOCOL ]]; then
flake8 --exclude ./_venv,*_pb2.py
make docs
fi
python -m pytest --timeout 1200 --ignore=dest
