Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2228,7 +2228,7 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk,
rd_kafka_buf_read_str(rkbuf, &Protocol);
rd_kafka_buf_read_str(rkbuf, &LeaderId);
rd_kafka_buf_read_str(rkbuf, &MyMemberId);
rd_kafka_buf_read_i32(rkbuf, &member_cnt);
rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt, 100000);

if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) {
/* Protocol not set, we will not be able to find
Expand Down Expand Up @@ -2324,6 +2324,9 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk,
if (request->rkbuf_reqhdr.ApiVersion >= 5)
rd_kafka_buf_read_str(rkbuf, &GroupInstanceId);
rd_kafka_buf_read_kbytes(rkbuf, &MemberMetadata);
if (request->rkbuf_reqhdr.ApiVersion >= 6) {
rd_kafka_buf_skip_tags(rkbuf);
}

rkgm = &members[sub_cnt];
rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId);
Expand Down
18 changes: 15 additions & 3 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,9 @@ rd_kafka_mock_handle_FindCoordinator(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_read_i8(rkbuf, &KeyType);
}

/* Request: Struct tags */
rd_kafka_buf_skip_tags(rkbuf);


/*
* Construct response
Expand Down Expand Up @@ -1544,6 +1547,8 @@ rd_kafka_mock_handle_FindCoordinator(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_write_i32(resp, (int32_t)mrkb->port);
}

rd_kafka_buf_write_tags_empty(resp);

rd_kafka_mock_connection_send_response(mconn, resp);
return 0;

Expand Down Expand Up @@ -1581,7 +1586,7 @@ static int rd_kafka_mock_handle_JoinGroup(rd_kafka_mock_connection_t *mconn,
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 5)
rd_kafka_buf_read_str(rkbuf, &GroupInstanceId);
rd_kafka_buf_read_str(rkbuf, &ProtocolType);
rd_kafka_buf_read_i32(rkbuf, &ProtocolCnt);
rd_kafka_buf_read_arraycnt(rkbuf, &ProtocolCnt, 10000);

if (ProtocolCnt > 1000) {
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
Expand All @@ -1598,6 +1603,8 @@ static int rd_kafka_mock_handle_JoinGroup(rd_kafka_mock_connection_t *mconn,
rd_kafkap_bytes_t Metadata;
rd_kafka_buf_read_str(rkbuf, &ProtocolName);
rd_kafka_buf_read_kbytes(rkbuf, &Metadata);
/* Request: Struct tags */
rd_kafka_buf_skip_tags(rkbuf);
protos[i].name = rd_kafkap_str_copy(&ProtocolName);
protos[i].metadata = rd_kafkap_bytes_copy(&Metadata);
}
Expand Down Expand Up @@ -1649,7 +1656,8 @@ static int rd_kafka_mock_handle_JoinGroup(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_write_str(resp, NULL, -1); /* ProtocolName */
rd_kafka_buf_write_str(resp, NULL, -1); /* LeaderId */
rd_kafka_buf_write_kstr(resp, NULL); /* MemberId */
rd_kafka_buf_write_i32(resp, 0); /* MemberCnt */
rd_kafka_buf_write_arraycnt(resp, 0); /* MemberCnt */
rd_kafka_buf_write_tags_empty(resp); /* Response: Struct tags */

rd_kafka_mock_connection_send_response(mconn, resp);

Expand Down Expand Up @@ -1727,6 +1735,8 @@ static int rd_kafka_mock_handle_Heartbeat(rd_kafka_mock_connection_t *mconn,

rd_kafka_buf_write_i16(resp, err); /* ErrorCode */

rd_kafka_buf_write_tags_empty(resp); /* Response: Struct tags */

rd_kafka_mock_connection_send_response(mconn, resp);

return 0;
Expand Down Expand Up @@ -1830,7 +1840,7 @@ static int rd_kafka_mock_handle_SyncGroup(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_read_str(rkbuf, &MemberId);
if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3)
rd_kafka_buf_read_str(rkbuf, &GroupInstanceId);
rd_kafka_buf_read_i32(rkbuf, &AssignmentCnt);
rd_kafka_buf_read_arraycnt(rkbuf, &AssignmentCnt, RD_KAFKAP_PARTITIONS_MAX);

/*
* Construct response
Expand Down Expand Up @@ -1890,6 +1900,7 @@ static int rd_kafka_mock_handle_SyncGroup(rd_kafka_mock_connection_t *mconn,

rd_kafka_buf_read_str(rkbuf, &MemberId2);
rd_kafka_buf_read_kbytes(rkbuf, &Metadata);
rd_kafka_buf_skip_tags(rkbuf); /* Request: Struct tags */

if (err)
continue;
Expand All @@ -1916,6 +1927,7 @@ static int rd_kafka_mock_handle_SyncGroup(rd_kafka_mock_connection_t *mconn,
/* Error case */
rd_kafka_buf_write_i16(resp, err); /* ErrorCode */
rd_kafka_buf_write_bytes(resp, NULL, -1); /* MemberState */
rd_kafka_buf_write_tags_empty(resp); /* Response: Struct tags */

rd_kafka_mock_connection_send_response(mconn, resp);

Expand Down
46 changes: 27 additions & 19 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -601,13 +601,14 @@ rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb,
int16_t ApiVersion;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_FindCoordinator, 0, 2, NULL);
rkb, RD_KAFKAP_FindCoordinator, 0, 3, NULL);

if (coordtype != RD_KAFKA_COORD_GROUP && ApiVersion < 1)
return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;

rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_FindCoordinator, 1,
1 + 2 + strlen(coordkey));
rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_FindCoordinator,
1, 1 + 2 + strlen(coordkey),
ApiVersion >= 3);

rd_kafka_buf_write_str(rkbuf, coordkey, -1);

Expand Down Expand Up @@ -1995,27 +1996,29 @@ void rd_kafka_SyncGroupRequest(rd_kafka_broker_t *rkb,
int features;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_SyncGroup, 0, 3, &features);
rkb, RD_KAFKAP_SyncGroup, 0, 4, &features);

rkbuf = rd_kafka_buf_new_request(
rkbuf = rd_kafka_buf_new_flexver_request(
rkb, RD_KAFKAP_SyncGroup, 1,
RD_KAFKAP_STR_SIZE(group_id) + 4 /* GenerationId */ +
RD_KAFKAP_STR_SIZE(member_id) +
RD_KAFKAP_STR_SIZE(group_instance_id) +
4 /* array size group_assignment */ +
(assignment_cnt * 100 /*guess*/));
(assignment_cnt * 100 /*guess*/),
ApiVersion >= 4);
rd_kafka_buf_write_kstr(rkbuf, group_id);
rd_kafka_buf_write_i32(rkbuf, generation_id);
rd_kafka_buf_write_kstr(rkbuf, member_id);
if (ApiVersion >= 3)
rd_kafka_buf_write_kstr(rkbuf, group_instance_id);
rd_kafka_buf_write_i32(rkbuf, assignment_cnt);
rd_kafka_buf_write_arraycnt(rkbuf, assignment_cnt);

for (i = 0; i < assignment_cnt; i++) {
const rd_kafka_group_member_t *rkgm = &assignments[i];

rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id);
rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm);
rd_kafka_buf_write_tags_empty(rkbuf);
}

/* This is a blocking request */
Expand Down Expand Up @@ -2053,17 +2056,18 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb,
int features;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_JoinGroup, 0, 5, &features);
rkb, RD_KAFKAP_JoinGroup, 0, 6, &features);


rkbuf = rd_kafka_buf_new_request(
rkbuf = rd_kafka_buf_new_flexver_request(
rkb, RD_KAFKAP_JoinGroup, 1,
RD_KAFKAP_STR_SIZE(group_id) + 4 /* sessionTimeoutMs */ +
4 /* rebalanceTimeoutMs */ + RD_KAFKAP_STR_SIZE(member_id) +
RD_KAFKAP_STR_SIZE(group_instance_id) +
RD_KAFKAP_STR_SIZE(protocol_type) +
4 /* array count GroupProtocols */ +
(rd_list_cnt(topics) * 100));
(rd_list_cnt(topics) * 100),
ApiVersion >= 6 /*flexver*/);
rd_kafka_buf_write_kstr(rkbuf, group_id);
rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms);
if (ApiVersion >= 1)
Expand All @@ -2072,7 +2076,7 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb,
if (ApiVersion >= 5)
rd_kafka_buf_write_kstr(rkbuf, group_instance_id);
rd_kafka_buf_write_kstr(rkbuf, protocol_type);
rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt);
rd_kafka_buf_write_arraycnt(rkbuf, rk->rk_conf.enabled_assignor_cnt);

RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) {
rd_kafkap_bytes_t *member_metadata;
Expand All @@ -2084,6 +2088,7 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb,
rk->rk_cgrp->rkcg_group_assignment,
rk->rk_conf.client_rack);
rd_kafka_buf_write_kbytes(rkbuf, member_metadata);
rd_kafka_buf_write_tags_empty(rkbuf);
rd_kafkap_bytes_destroy(member_metadata);
}

Expand Down Expand Up @@ -2240,16 +2245,17 @@ void rd_kafka_HeartbeatRequest(rd_kafka_broker_t *rkb,
int features;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_Heartbeat, 0, 3, &features);
rkb, RD_KAFKAP_Heartbeat, 0, 4, &features);

rd_rkb_dbg(rkb, CGRP, "HEARTBEAT",
"Heartbeat for group \"%s\" generation id %" PRId32,
group_id->str, generation_id);

rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat, 1,
RD_KAFKAP_STR_SIZE(group_id) +
4 /* GenerationId */ +
RD_KAFKAP_STR_SIZE(member_id));
rkbuf = rd_kafka_buf_new_flexver_request(
rkb, RD_KAFKAP_Heartbeat, 1,
RD_KAFKAP_STR_SIZE(group_id) + 4 /* GenerationId */ +
RD_KAFKAP_STR_SIZE(member_id),
ApiVersion >= 4);

rd_kafka_buf_write_kstr(rkbuf, group_id);
rd_kafka_buf_write_i32(rkbuf, generation_id);
Expand Down Expand Up @@ -3411,7 +3417,11 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
int16_t ApiVersion;
int features;

rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslAuthenticate, 0, 0);
ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_SaslAuthenticate, 0, 2, &features);

rkbuf = rd_kafka_buf_new_flexver_request(
rkb, RD_KAFKAP_SaslAuthenticate, 0, 0, ApiVersion >= 2);

/* Should be sent before any other requests since it is part of
* the initial connection handshake. */
Expand All @@ -3424,8 +3434,6 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
* close down the connection and reconnect on failure. */
rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_SaslAuthenticate, 0, 1, &features);
rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

if (replyq.q)
Expand Down