Ambiguous error when consumer is fenced by group.instance.id #869

Open
@Traktormaster

Description

When two consumers use the same group.instance.id and conflict, the fenced client receives an ambiguous error.

The .code() of the error is KafkaError._FATAL, which is generic, but the .str() of the error suggests that it is specifically a KafkaError.FENCED_INSTANCE_ID.
Strangely, the .str() text is also redundant: the same broker message appears twice.
Here is the exact error: KafkaError{code=_FATAL,val=-150,str="Fatal error: Broker: Static consumer fenced by other consumer with same group.instance.id: Fatal consumer error: Broker: Static consumer fenced by other consumer with same group.instance.id"}.

Another possible issue is that the .code() suggests this is an unrecoverable error, yet .fatal() returns False.
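Until the error is reported consistently, a caller could detect the fenced condition by checking both the code and the message text. The following is only a sketch of such a workaround, not part of the library: is_fenced_error, FakeError, and the local constants are hypothetical names introduced for illustration. The value -150 comes from the error shown above, and 82 is assumed to be the Kafka protocol code for FENCED_INSTANCE_ID. The helper only relies on the .code() and .str() accessors, so a real KafkaError could be passed in place of the stand-in object.

```python
# Assumed numeric codes for this sketch: -150 is the _FATAL value printed in
# the report; 82 is assumed to be the protocol code for FENCED_INSTANCE_ID.
FATAL = -150
FENCED_INSTANCE_ID = 82

# Fragment of the broker message quoted in the report.
FENCED_TEXT = "fenced by other consumer with same group.instance.id"


def is_fenced_error(err):
    """Return True if err looks like a static-membership fencing error.

    Accepts either the specific code or the generic fatal code combined
    with the fencing text in the message.
    """
    if err is None:
        return False
    if err.code() == FENCED_INSTANCE_ID:
        return True
    return err.code() == FATAL and FENCED_TEXT in err.str()


class FakeError:
    """Hypothetical stand-in exposing the same accessors as KafkaError."""

    def __init__(self, code, text, fatal=False):
        self._code = code
        self._text = text
        self._fatal = fatal

    def code(self):
        return self._code

    def str(self):
        return self._text

    def fatal(self):
        return self._fatal


# The error exactly as reported: generic code, specific text, fatal() False.
reported = FakeError(
    FATAL,
    "Fatal error: Broker: Static consumer fenced by other consumer with same "
    "group.instance.id: Fatal consumer error: Broker: Static consumer fenced "
    "by other consumer with same group.instance.id",
)
```

Matching on message text is brittle and would break if the wording changes, so this is a stopgap rather than a substitute for a correct error code.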

How to reproduce

Set kafka_servers to your development brokers and topic to any valid, empty topic.
The last assertion fails because the error is ambiguous.

from confluent_kafka import Consumer, KafkaError

topic = "testingtopic"
kafka_servers = ["127.0.0.1:9092"]

def _new_consumer():
    return Consumer(
        {
            "bootstrap.servers": ",".join(kafka_servers),
            "group.id": "my_fleet",
            "group.instance.id": "ship_0",
            "enable.auto.commit": False,
            "enable.auto.offset.store": False,
            "default.topic.config": {"auto.offset.reset": "earliest",},
        }
    )

# start first consumer instance
first_c = _new_consumer()
first_c.subscribe([topic])
assert first_c.poll(timeout=10) is None
assert len(first_c.assignment()) > 0

# start replacement consumer instance (first will be fenced off in favor of this)
second_c = _new_consumer()
second_c.subscribe([topic])
assert second_c.poll(timeout=10) is None
assert len(second_c.assignment()) > 0

# check if first consumer is fenced off as it should be
msg = first_c.poll(timeout=5)
assert msg is not None
assert msg.error() is not None
assert msg.error().code() == KafkaError.FENCED_INSTANCE_ID, "%s %s" % (msg.error().fatal(), msg.error(),)
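The script above relies on a single poll returning the fencing error within five seconds. A slightly more tolerant variant could poll a few times until an error shows up. This is a minimal sketch under that assumption: poll_for_error, StubConsumer, and StubMessage are hypothetical names, and the helper only uses poll(timeout=...) and msg.error() as they appear in the script.

```python
def poll_for_error(consumer, attempts=5, timeout=1.0):
    """Poll up to `attempts` times and return the first error seen, else None."""
    for _ in range(attempts):
        msg = consumer.poll(timeout=timeout)
        if msg is not None and msg.error() is not None:
            return msg.error()
    return None


class StubMessage:
    """Hypothetical message object exposing only error()."""

    def __init__(self, error):
        self._error = error

    def error(self):
        return self._error


class StubConsumer:
    """Hypothetical consumer that replays a fixed list of poll results."""

    def __init__(self, results):
        self._results = list(results)

    def poll(self, timeout=None):
        # Return the next canned result, or None once the list is exhausted.
        return self._results.pop(0) if self._results else None
```

With a real consumer, the last three assertions of the script could then start from err = poll_for_error(first_c) instead of a single first_c.poll(timeout=5).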

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version: ('1.4.1', 17039616), ('1.4.0', 17039615)
  • Apache Kafka broker version: 2.5.0 (bitnami docker image)
  • Client configuration: included in the reproduction snippet
  • Operating system: Arch Linux (Linux lgg 5.6.5-arch3-1 #1 SMP PREEMPT Sun, 19 Apr 2020 13:14:25 +0000 x86_64 GNU/Linux)
  • Provide client logs
  • Provide broker log excerpts
  • Critical issue

I'm not sure which client/broker logs to provide, since everything works except that the error seems to be badly constructed. If more information is needed, I'll provide it.

Metadata

Assignees

No one assigned

    Labels

    bug (Reporting an unexpected or problematic behavior of the codebase)
    code:C (Issues that are specific to C behavior or domain within the library independent of library logic)
    component:consumer (Issues tied specifically to consumer logic or code paths)
    investigate further (It's unclear what the issue is at this time but there is enough interest to look into it)
    status:blocked (Used when an issue has a blocking dependency or awaiting some other external change)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
