diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 3bfbfd88a..d7eb43921 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -19,8 +19,7 @@ from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, AclOperation, AclPermissionType) from confluent_kafka.error import ConsumeError -from confluent_kafka import ConsumerGroupState, TopicCollection - +from confluent_kafka import ConsumerGroupState, TopicCollection, ConsumerGroupType from tests.common import TestUtils topic_prefix = "test-topic" @@ -30,12 +29,16 @@ def verify_commit_result(err, _): assert err is not None -def consume_messages(sasl_cluster, group_id, topic, num_messages=None): +def consume_messages(sasl_cluster, group_id, group_protocol, topic, num_messages=None): conf = {'group.id': group_id, - 'session.timeout.ms': 6000, + 'group.protocol': group_protocol, 'enable.auto.commit': False, 'on_commit': verify_commit_result, 'auto.offset.reset': 'earliest'} + + if group_protocol == 'classic': + conf['session.timeout.ms'] = 6000 + consumer = sasl_cluster.consumer(conf) consumer.subscribe([topic]) read_messages = 0 @@ -164,7 +167,9 @@ def verify_describe_groups(cluster, admin_client, topic): # Consume some messages for the group group = 'test-group' - consume_messages(cluster, group, topic, 2) + group_type = ConsumerGroupType.CONSUMER if TestUtils.use_group_protocol_consumer() else ConsumerGroupType.CLASSIC + group_type_str = 'classic' if group_type == ConsumerGroupType.CLASSIC else 'consumer' + consume_messages(cluster, group, group_type_str, topic, 2) # Verify Describe Consumer Groups desc = verify_provided_describe_for_authorized_operations(admin_client, @@ -177,6 +182,7 @@ def verify_describe_groups(cluster, admin_client, topic): assert group == desc.group_id assert desc.is_simple_consumer_group is False assert desc.state == ConsumerGroupState.EMPTY + assert desc.type == group_type # Delete group perform_admin_operation_sync(admin_client.delete_consumer_groups, [group], request_timeout=10) @@ -217,11 +223,7 @@ def test_describe_operations(sasl_cluster): verify_describe_topics(admin_client, our_topic) # Verify Authorized Operations in Describe Groups - # Skip this test if using group protocol `consumer` - # as there is new RPC for describe_groups() in - # group protocol `consumer` case. - if not TestUtils.use_group_protocol_consumer(): - verify_describe_groups(sasl_cluster, admin_client, our_topic) + verify_describe_groups(sasl_cluster, admin_client, our_topic) # Delete Topic perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10)