From bf8bd1948bc44e901d3422a15416fc2e9df57469 Mon Sep 17 00:00:00 2001
From: Guilherme Quentel Melo
Date: Fri, 4 Jul 2025 11:28:01 +0200
Subject: [PATCH] Fix thread not waking up when there is still data to be sent

When producing messages quickly without waiting for the futures of previous
requests, there were situations where the last batch was never sent. This
seemed to be more frequent with larger messages (~100KiB), but it can
apparently happen with any message size when `linger_ms` is 0. It is unclear
whether it can also happen when `linger_ms` is non-zero.

The reason is that `BrokerConnection.send_pending_requests_v2` fills the
internal buffer with the bytes of a request and tries to send them:

https://github.com/dpkp/kafka-python/blob/512d0a0b8d71cf7f34f1b23f8a42d52c28af3266/kafka/conn.py#L1071

If it cannot send everything, it tries again on the next call to
`send_pending_requests_v2`. But if `BrokerConnection.send` is called between
those two calls, new data is appended to `self._protocol` (a `KafkaProtocol`
instance):

https://github.com/dpkp/kafka-python/blob/512d0a0b8d71cf7f34f1b23f8a42d52c28af3266/kafka/conn.py#L1015

The second call to `send_pending_requests_v2` never checks whether new data
has become available, because the `if not self._send_buffer:` check only runs
before the leftover bytes are flushed:

https://github.com/dpkp/kafka-python/blob/512d0a0b8d71cf7f34f1b23f8a42d52c28af3266/kafka/conn.py#L1070

Once the leftover bytes are flushed, the send buffer is empty and the method
returns True. This tells `KafkaClient._poll` that all pending data was sent,
so the client stops listening for socket write readiness:

https://github.com/dpkp/kafka-python/blob/512d0a0b8d71cf7f34f1b23f8a42d52c28af3266/kafka/client_async.py#L744-L748
---
 kafka/conn.py     |  7 +++++++
 test/test_conn.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 59 insertions(+)

diff --git a/kafka/conn.py b/kafka/conn.py
index 64445fab0..c1d42a0b6 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -1075,6 +1075,13 @@ def send_pending_requests_v2(self):
                 total_bytes = self._send_bytes(self._send_buffer)
                 self._send_buffer = self._send_buffer[total_bytes:]
+            # If all data was sent, we need to get the new data from the protocol now, otherwise
+            # this function would return True, indicating that there are no more pending
+            # requests. This could cause the calling thread to wait indefinitely as it won't
+            # know that there is still buffered data to send.
+            if not self._send_buffer:
+                self._send_buffer = self._protocol.send_bytes()
+
             if self._sensors:
                 self._sensors.bytes_sent.record(total_bytes)
             # Return True iff send buffer is empty
             return len(self._send_buffer) == 0
diff --git a/test/test_conn.py b/test/test_conn.py
index 8d56668c5..a5761442c 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -12,6 +12,9 @@

 from kafka.conn import BrokerConnection, ConnectionStates
 from kafka.future import Future
+from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError
+from kafka.metrics.metrics import Metrics
+from kafka.metrics.stats.sensor import Sensor
 from kafka.protocol.api import RequestHeader
 from kafka.protocol.group import HeartbeatResponse
 from kafka.protocol.metadata import MetadataRequest
@@ -43,6 +46,15 @@ def _socket(mocker):
     mocker.patch('socket.socket', return_value=socket)
     return socket

+def metrics(mocker):
+    metrics = mocker.MagicMock(Metrics)
+    metrics.mocked_sensors = {}
+    def sensor(name, **kwargs):
+        if name not in metrics.mocked_sensors:
+            metrics.mocked_sensors[name] = mocker.MagicMock(Sensor)
+        return metrics.mocked_sensors[name]
+    metrics.sensor.side_effect = sensor
+    return metrics

 @pytest.fixture
 def conn(_socket, dns_lookup, mocker):
@@ -228,6 +240,46 @@ def test_send_response(_socket, conn):
     assert len(conn.in_flight_requests) == 1


+def test_send_async_request_while_other_request_is_already_in_buffer(_socket, conn, metrics):
+    conn.connect()
+    assert conn.state is ConnectionStates.CONNECTED
+    assert 'node-0.bytes-sent' in metrics.mocked_sensors
+    bytes_sent_sensor = metrics.mocked_sensors['node-0.bytes-sent']
+
+    req1 = MetadataRequest[0](topics='foo')
+    header1 = RequestHeader(req1, client_id=conn.config['client_id'])
+    payload_bytes1 = len(header1.encode()) + len(req1.encode())
+    req2 = MetadataRequest[0]([])
+    header2 = RequestHeader(req2, client_id=conn.config['client_id'])
+    payload_bytes2 = len(header2.encode()) + len(req2.encode())
+
+    # The first call to the socket will raise a transient SSL exception. This causes the first
+    # request to be kept in the internal buffer, to be sent on the next call to
+    # send_pending_requests_v2.
+    _socket.send.side_effect = [SSLWantWriteError, 4 + payload_bytes1, 4 + payload_bytes2]
+
+    conn.send(req1, blocking=False)
+    # This won't send any bytes because of the SSL exception, and the request bytes will be kept
+    # in the buffer.
+    assert conn.send_pending_requests_v2() is False
+    assert bytes_sent_sensor.record.call_args_list[0].args == (0,)
+
+    conn.send(req2, blocking=False)
+    # This will send the remaining bytes in the buffer from the first request, but should notice
+    # that the second request was queued, so it should return False.
+    bytes_sent_sensor.record.reset_mock()
+    assert conn.send_pending_requests_v2() is False
+    bytes_sent_sensor.record.assert_called_once_with(4 + payload_bytes1)
+
+    bytes_sent_sensor.record.reset_mock()
+    assert conn.send_pending_requests_v2() is True
+    bytes_sent_sensor.record.assert_called_once_with(4 + payload_bytes2)
+
+    bytes_sent_sensor.record.reset_mock()
+    assert conn.send_pending_requests_v2() is True
+    bytes_sent_sensor.record.assert_called_once_with(0)
+
+
 def test_send_error(_socket, conn):
     conn.connect()
     assert conn.state is ConnectionStates.CONNECTED
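
For reference, below is a minimal, self-contained model of the interleaving
described in the commit message. It is an illustrative sketch only: FakeProtocol
and FakeConnection are made-up stand-ins that loosely mirror KafkaProtocol and
BrokerConnection, not kafka-python's actual classes.

    class FakeProtocol:
        """Queues serialized requests until the connection drains them."""
        def __init__(self):
            self._buf = b''

        def send_request(self, payload):
            self._buf += payload

        def send_bytes(self):
            data, self._buf = self._buf, b''
            return data


    class FakeConnection:
        def __init__(self, socket_send):
            self._protocol = FakeProtocol()
            self._send_buffer = b''
            self._socket_send = socket_send  # returns how many bytes the socket accepted

        def send(self, payload):
            # Like BrokerConnection.send(..., blocking=False): only queues bytes.
            self._protocol.send_request(payload)

        def send_pending_requests_v2(self):
            # Unpatched behavior: refill from the protocol only when the buffer
            # is empty on entry, so data queued after a partial send is missed.
            if not self._send_buffer:
                self._send_buffer = self._protocol.send_bytes()
            sent = self._socket_send(self._send_buffer)
            self._send_buffer = self._send_buffer[sent:]
            # Returning True tells the poll loop to stop watching write readiness,
            # even though self._protocol may still hold data.
            return len(self._send_buffer) == 0


    # The socket accepts nothing on the first call (think SSLWantWriteError),
    # then accepts everything it is given.
    accepted = iter([0, 10**9, 10**9])
    conn = FakeConnection(lambda buf: min(len(buf), next(accepted)))

    conn.send(b'request-1')
    assert conn.send_pending_requests_v2() is False   # partial send: request-1 stays buffered

    conn.send(b'request-2')                           # queued while the buffer is non-empty
    assert conn.send_pending_requests_v2() is True    # bug: reports "all sent"
    assert conn._protocol._buf == b'request-2'        # ...but request-2 was never flushed

With the refill added after the send, as in the kafka/conn.py hunk above, the
second send_pending_requests_v2 call would pull request-2 into the send buffer
and return False, so the poll loop keeps watching for write readiness.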