
Fix thread not waking up when there is still data to be sent #2670


Open

gqmelo wants to merge 1 commit into master

Conversation

gqmelo commented Aug 19, 2025

When producing messages quickly without waiting for the futures of previous requests, there were situations in which the last batch was not sent.

This seemed to be more frequent with larger messages (~100 KiB), but apparently it can happen with any message size when `linger_ms` is 0. It is not clear whether it can also happen when `linger_ms` is non-zero.

The reason is that `BrokerConnection.send_pending_requests_v2` would fill the internal buffer with the bytes of a request and try to send it:

https://github.com/dpkp/kafka-python/blob/512d0a0b8d71cf7f34f1b23f8a42d52c28af3266/kafka/conn.py#L1071

self._send_buffer = self._protocol.send_bytes()

If it could not send all of it for some reason, it would try to send the rest on the next call to `send_pending_requests_v2`.

But if `BrokerConnection.send` was called between those two calls, new data would be appended to `self._protocol` (a `KafkaProtocol` instance):

https://github.com/dpkp/kafka-python/blob/512d0a0b8d71cf7f34f1b23f8a42d52c28af3266/kafka/conn.py#L1015

correlation_id = self._protocol.send_request(request)

The second call to `send_pending_requests_v2`, however, would only look at `self._send_buffer` and would not check whether new data had been queued in `self._protocol`; once the leftover buffer was flushed it would report the send as complete (it would return True):

https://github.com/dpkp/kafka-python/blob/512d0a0b8d71cf7f34f1b23f8a42d52c28af3266/kafka/conn.py#L1070

if not self._send_buffer:
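
To make the sequence concrete, here is a minimal, runnable sketch of the interaction described above. All the names (`FakeConnection`, `_pending`, `flush_pending`, `max_bytes`) are made up for illustration and merely stand in for `KafkaProtocol`'s request queue, `BrokerConnection._send_buffer`, and `send_pending_requests_v2`; this is not the actual kafka-python code.

```python
# Simplified model of the behaviour described above, not the real code.
class FakeConnection:
    def __init__(self):
        self._pending = b''      # bytes queued by send(), not yet buffered
        self._send_buffer = b''  # bytes taken from _pending, possibly partially sent

    def send(self, payload: bytes):
        # Plays the role of BrokerConnection.send -> KafkaProtocol.send_request
        self._pending += payload

    def flush_pending(self, max_bytes: int) -> bool:
        """Plays the role of send_pending_requests_v2; True means 'send complete'."""
        if not self._send_buffer:            # only refills when the buffer is empty
            self._send_buffer = self._pending
            self._pending = b''
        # Pretend the socket accepted only `max_bytes` (a partial send)
        self._send_buffer = self._send_buffer[max_bytes:]
        return len(self._send_buffer) == 0   # ignores self._pending entirely


conn = FakeConnection()
conn.send(b'request-1' * 10)             # 90 bytes queued
assert conn.flush_pending(64) is False   # partial send, leftover stays buffered

conn.send(b'request-2')                  # queued between the two flush calls
# The second flush drains the leftover from request-1 and reports completion,
# even though request-2 is still sitting in the queue and was never sent.
assert conn.flush_pending(64) is True
assert conn._pending == b'request-2'
```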

This would tell `KafkaClient._poll` that all pending data had been sent, which would make the client stop listening for socket write readiness on that connection:

https://github.com/dpkp/kafka-python/blob/512d0a0b8d71cf7f34f1b23f8a42d52c28af3266/kafka/client_async.py#L744-L748

if conn.send_pending_requests_v2():
    # If send is complete, we dont need to track write readiness
    # for this socket anymore
    if key.events ^ selectors.EVENT_WRITE:
        self._selector.modify(
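
To illustrate the consequence, here is a hedged sketch of the selector side, using a plain socketpair instead of a broker connection (again simplified, not the actual `_poll` code): once the connection wrongly reports the send as complete, the socket is re-registered for reads only, so `select()` never wakes the I/O thread for writing on it again and the queued request stays unsent.

```python
import selectors
import socket

# Stand-ins for the client's selector and one broker socket.
left, right = socket.socketpair()
left.setblocking(False)

selector = selectors.DefaultSelector()
key = selector.register(left, selectors.EVENT_READ | selectors.EVENT_WRITE)

send_complete = True  # the buggy "all data sent" answer from the connection
if send_complete:
    # Mirrors the quoted code: stop tracking write readiness for this socket.
    selector.modify(key.fileobj, selectors.EVENT_READ, key.data)

# From now on select() can only wake us up for reads; even though the
# connection still has a queued request, nothing will trigger another
# send attempt for this socket.
events = selector.select(timeout=0.1)
print(events)  # [] unless the peer happens to send something

selector.close()
left.close()
right.close()
```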

From the test added in this PR:

# This will send the remaining bytes in the buffer from the first request, but should notice
# that the second request was queued, therefore it should return False.
bytes_sent_sensor.record.reset_mock()
assert conn.send_pending_requests_v2() is False
gqmelo (Author):
This is where the test would fail before the fix.
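
For completeness, here is what the corrected completeness check looks like in terms of the `FakeConnection` sketch above. This only illustrates the behaviour the new test expects; it is not the actual patch in this PR.

```python
# Hypothetical fix, expressed against the FakeConnection model above (NOT the
# real change): report "send complete" only when both the partially-sent
# buffer and the queue of newly submitted requests are empty.
class FixedConnection(FakeConnection):
    def flush_pending(self, max_bytes: int) -> bool:
        if not self._send_buffer:
            self._send_buffer = self._pending
            self._pending = b''
        self._send_buffer = self._send_buffer[max_bytes:]
        return not self._send_buffer and not self._pending


conn = FixedConnection()
conn.send(b'request-1' * 10)
conn.flush_pending(64)                    # partial send of the first request
conn.send(b'request-2')                   # second request queued in between
assert conn.flush_pending(64) is False    # matches what the new test asserts
```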
