-
Notifications
You must be signed in to change notification settings - Fork 878
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KIP-848] Added support for testing with new 'consumer' group protocol. #1735
base: master
Are you sure you want to change the base?
Conversation
pranavrth
commented
Apr 25, 2024
- Added support for testing with new 'consumer' group protocol.
- Fixed a incorrect testcase.
tests/integration/cluster_fixture.py
Outdated
from confluent_kafka.admin import AdminClient, NewTopic | ||
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient | ||
from ..common import TestDeserializingConsumer, TestConsumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use absolute imports: tests.common
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great, Pranav! Just a few changes more and we can enable the integration tests too
@@ -29,11 +29,11 @@ def commit_and_check(consumer, topic, metadata): | |||
assert offsets[0].metadata == metadata | |||
|
|||
|
|||
def test_consumer_topicpartition_metadata(kafka_cluster): | |||
topic = kafka_cluster.create_topic("test_topicpartition") | |||
def test_consumer_topicpartition_metadata(kafka_single_broker_cluster): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove this new cluster to spawn, instead we can change the scope of cluster fixture to session
, to avoid they're destroyed and recreated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I didn't understand this correctly. I tried with session
and it was working as it is.
def test_consumer_topicpartition_metadata(kafka_cluster): | ||
topic = kafka_cluster.create_topic("test_topicpartition") | ||
def test_consumer_topicpartition_metadata(kafka_single_broker_cluster): | ||
topic = kafka_single_broker_cluster.create_topic("test_topicpartition") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make integration tests for admin APIs work with KRaft we need to add some sleeps after a number of operations:
time.sleep(1)
after:
- create_topic
- create_acls
- delete_acls
- alter_configs
- incremental_alter_configs
- alter_user_scram_credentials
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In test_describe_operations
we can disable this test with consumer
, as it returns it's a simple consumer group (needs to use the new RPC instead)
if not TestUtils.use_group_protocol_consumer():
verify_describe_groups(sasl_cluster, admin_client, our_topic)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In test_list_offsets
we need to disable this check at the moment
# FIXME: This test is disabled until KAFKA-16310 is released
# with AK 3.8.0
# requests = {topic_partition: OffsetSpec.max_timestamp()}
# futmap = admin_client.list_offsets(requests, **kwargs)
# for _, fut in futmap.items():
# result = fut.result()
# assert isinstance(result, ListOffsetsResultInfo)
# assert (result.offset == 1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In test_user_scram_credentials
with KRaft it's giving an error if more than one UserScramCredentialUpsertion
is given for the same user, even if it's for a different SCRAM mechanism, so you can add an if an do the upsertion in two steps when using kraft
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In test_list_offsets we need to disable this check at the moment
This test is passing with Kraft. Disabled with Zookeeper.
In test_user_scram_credentials with KRaft it's giving an error if more than one UserScramCredentialUpsertion is given for the same user, even if it's for a different SCRAM mechanism, so you can add an if an do the upsertion in two steps when using kraft
Why is these behaving differently in KRaft and Zookeeper? This is a bug in Kafka I think.
tests/integration/conftest.py
Outdated
'transaction.state.log.min.isr=1']} | ||
'cp_version': '7.6.0', | ||
'kraft': TestUtils.use_kraft(), | ||
'version': 'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
version needs to be trunk only with new protocol, otherwise 3.7.0
def _broker_version():
return (TestUtils.use_group_protocol_consumer() and
'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07' or
'3.7.0')
tools/source-package-verification.sh
Outdated
if [[ $TEST_CONSUMER_GROUP_PROTOCOL == consumer ]]; then | ||
python -m pytest --timeout 1200 --ignore=dest --ignore=tests/integration/admin | ||
else | ||
python -m pytest --timeout 1200 --ignore=dest | ||
fi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's run the integration tests after previous changes:
if [[ $TEST_CONSUMER_GROUP_PROTOCOL == consumer ]]; then | |
python -m pytest --timeout 1200 --ignore=dest --ignore=tests/integration/admin | |
else | |
python -m pytest --timeout 1200 --ignore=dest | |
fi | |
if [[ -z $TEST_CONSUMER_GROUP_PROTOCOL ]]; then | |
flake8 --exclude ./_venv,*_pb2.py | |
make docs | |
fi | |
python -m pytest --timeout 1200 --ignore=dest |