diff --git a/src/rdkafka_cgrp.c b/src/rdkafka_cgrp.c index d87ab2c05..562ba3ba9 100644 --- a/src/rdkafka_cgrp.c +++ b/src/rdkafka_cgrp.c @@ -2228,7 +2228,7 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk, rd_kafka_buf_read_str(rkbuf, &Protocol); rd_kafka_buf_read_str(rkbuf, &LeaderId); rd_kafka_buf_read_str(rkbuf, &MyMemberId); - rd_kafka_buf_read_i32(rkbuf, &member_cnt); + rd_kafka_buf_read_arraycnt(rkbuf, &member_cnt, 100000); if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) { /* Protocol not set, we will not be able to find @@ -2324,6 +2324,9 @@ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk, if (request->rkbuf_reqhdr.ApiVersion >= 5) rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); rd_kafka_buf_read_kbytes(rkbuf, &MemberMetadata); + if (request->rkbuf_reqhdr.ApiVersion >= 6) { + rd_kafka_buf_skip_tags(rkbuf); + } rkgm = &members[sub_cnt]; rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId); diff --git a/src/rdkafka_mock_handlers.c b/src/rdkafka_mock_handlers.c index 1b1e19141..4daf02aa4 100644 --- a/src/rdkafka_mock_handlers.c +++ b/src/rdkafka_mock_handlers.c @@ -1502,6 +1502,9 @@ rd_kafka_mock_handle_FindCoordinator(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_read_i8(rkbuf, &KeyType); } + /* Request: Struct tags */ + rd_kafka_buf_skip_tags(rkbuf); + /* * Construct response @@ -1544,6 +1547,8 @@ rd_kafka_mock_handle_FindCoordinator(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_i32(resp, (int32_t)mrkb->port); } + rd_kafka_buf_write_tags_empty(resp); + rd_kafka_mock_connection_send_response(mconn, resp); return 0; @@ -1581,7 +1586,7 @@ static int rd_kafka_mock_handle_JoinGroup(rd_kafka_mock_connection_t *mconn, if (rkbuf->rkbuf_reqhdr.ApiVersion >= 5) rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); rd_kafka_buf_read_str(rkbuf, &ProtocolType); - rd_kafka_buf_read_i32(rkbuf, &ProtocolCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &ProtocolCnt, 10000); if (ProtocolCnt > 1000) { rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", @@ -1598,6 +1603,8 @@ static int rd_kafka_mock_handle_JoinGroup(rd_kafka_mock_connection_t *mconn, rd_kafkap_bytes_t Metadata; rd_kafka_buf_read_str(rkbuf, &ProtocolName); rd_kafka_buf_read_kbytes(rkbuf, &Metadata); + /* Request: Struct tags */ + rd_kafka_buf_skip_tags(rkbuf); protos[i].name = rd_kafkap_str_copy(&ProtocolName); protos[i].metadata = rd_kafkap_bytes_copy(&Metadata); } @@ -1649,7 +1656,8 @@ static int rd_kafka_mock_handle_JoinGroup(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_str(resp, NULL, -1); /* ProtocolName */ rd_kafka_buf_write_str(resp, NULL, -1); /* LeaderId */ rd_kafka_buf_write_kstr(resp, NULL); /* MemberId */ - rd_kafka_buf_write_i32(resp, 0); /* MemberCnt */ + rd_kafka_buf_write_arraycnt(resp, 0); /* MemberCnt */ + rd_kafka_buf_write_tags_empty(resp); /* Response: Struct tags */ rd_kafka_mock_connection_send_response(mconn, resp); @@ -1727,6 +1735,8 @@ static int rd_kafka_mock_handle_Heartbeat(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_write_i16(resp, err); /* ErrorCode */ + rd_kafka_buf_write_tags_empty(resp); /* Response: Struct tags */ + rd_kafka_mock_connection_send_response(mconn, resp); return 0; @@ -1830,7 +1840,7 @@ static int rd_kafka_mock_handle_SyncGroup(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_read_str(rkbuf, &MemberId); if (rkbuf->rkbuf_reqhdr.ApiVersion >= 3) rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); - rd_kafka_buf_read_i32(rkbuf, &AssignmentCnt); + rd_kafka_buf_read_arraycnt(rkbuf, &AssignmentCnt, RD_KAFKAP_PARTITIONS_MAX); /* * Construct response @@ -1890,6 +1900,7 @@ static int rd_kafka_mock_handle_SyncGroup(rd_kafka_mock_connection_t *mconn, rd_kafka_buf_read_str(rkbuf, &MemberId2); rd_kafka_buf_read_kbytes(rkbuf, &Metadata); + rd_kafka_buf_skip_tags(rkbuf); /* Request: Struct tags */ if (err) continue; @@ -1916,6 +1927,7 @@ static int rd_kafka_mock_handle_SyncGroup(rd_kafka_mock_connection_t *mconn, /* Error case */ rd_kafka_buf_write_i16(resp, err); /* ErrorCode */ rd_kafka_buf_write_bytes(resp, NULL, -1); /* MemberState */ + rd_kafka_buf_write_tags_empty(resp); /* Response: Struct tags */ rd_kafka_mock_connection_send_response(mconn, resp); diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index 788f3b8ac..4d699af71 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -601,13 +601,14 @@ rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb, int16_t ApiVersion; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_FindCoordinator, 0, 2, NULL); + rkb, RD_KAFKAP_FindCoordinator, 0, 3, NULL); if (coordtype != RD_KAFKA_COORD_GROUP && ApiVersion < 1) return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_FindCoordinator, 1, - 1 + 2 + strlen(coordkey)); + rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_FindCoordinator, + 1, 1 + 2 + strlen(coordkey), + ApiVersion >= 3); rd_kafka_buf_write_str(rkbuf, coordkey, -1); @@ -1995,27 +1996,29 @@ void rd_kafka_SyncGroupRequest(rd_kafka_broker_t *rkb, int features; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_SyncGroup, 0, 3, &features); + rkb, RD_KAFKAP_SyncGroup, 0, 4, &features); - rkbuf = rd_kafka_buf_new_request( + rkbuf = rd_kafka_buf_new_flexver_request( rkb, RD_KAFKAP_SyncGroup, 1, RD_KAFKAP_STR_SIZE(group_id) + 4 /* GenerationId */ + RD_KAFKAP_STR_SIZE(member_id) + RD_KAFKAP_STR_SIZE(group_instance_id) + 4 /* array size group_assignment */ + - (assignment_cnt * 100 /*guess*/)); + (assignment_cnt * 100 /*guess*/), + ApiVersion >= 4); rd_kafka_buf_write_kstr(rkbuf, group_id); rd_kafka_buf_write_i32(rkbuf, generation_id); rd_kafka_buf_write_kstr(rkbuf, member_id); if (ApiVersion >= 3) rd_kafka_buf_write_kstr(rkbuf, group_instance_id); - rd_kafka_buf_write_i32(rkbuf, assignment_cnt); + rd_kafka_buf_write_arraycnt(rkbuf, assignment_cnt); for (i = 0; i < assignment_cnt; i++) { const rd_kafka_group_member_t *rkgm = &assignments[i]; rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id); rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm); + rd_kafka_buf_write_tags_empty(rkbuf); } /* This is a blocking request */ @@ -2053,17 +2056,18 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb, int features; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_JoinGroup, 0, 5, &features); + rkb, RD_KAFKAP_JoinGroup, 0, 6, &features); - rkbuf = rd_kafka_buf_new_request( + rkbuf = rd_kafka_buf_new_flexver_request( rkb, RD_KAFKAP_JoinGroup, 1, RD_KAFKAP_STR_SIZE(group_id) + 4 /* sessionTimeoutMs */ + 4 /* rebalanceTimeoutMs */ + RD_KAFKAP_STR_SIZE(member_id) + RD_KAFKAP_STR_SIZE(group_instance_id) + RD_KAFKAP_STR_SIZE(protocol_type) + 4 /* array count GroupProtocols */ + - (rd_list_cnt(topics) * 100)); + (rd_list_cnt(topics) * 100), + ApiVersion >= 6 /*flexver*/); rd_kafka_buf_write_kstr(rkbuf, group_id); rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms); if (ApiVersion >= 1) @@ -2072,7 +2076,7 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb, if (ApiVersion >= 5) rd_kafka_buf_write_kstr(rkbuf, group_instance_id); rd_kafka_buf_write_kstr(rkbuf, protocol_type); - rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt); + rd_kafka_buf_write_arraycnt(rkbuf, rk->rk_conf.enabled_assignor_cnt); RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) { rd_kafkap_bytes_t *member_metadata; @@ -2084,6 +2088,7 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb, rk->rk_cgrp->rkcg_group_assignment, rk->rk_conf.client_rack); rd_kafka_buf_write_kbytes(rkbuf, member_metadata); + rd_kafka_buf_write_tags_empty(rkbuf); rd_kafkap_bytes_destroy(member_metadata); } @@ -2240,16 +2245,17 @@ void rd_kafka_HeartbeatRequest(rd_kafka_broker_t *rkb, int features; ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_Heartbeat, 0, 3, &features); + rkb, RD_KAFKAP_Heartbeat, 0, 4, &features); rd_rkb_dbg(rkb, CGRP, "HEARTBEAT", "Heartbeat for group \"%s\" generation id %" PRId32, group_id->str, generation_id); - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat, 1, - RD_KAFKAP_STR_SIZE(group_id) + - 4 /* GenerationId */ + - RD_KAFKAP_STR_SIZE(member_id)); + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_Heartbeat, 1, + RD_KAFKAP_STR_SIZE(group_id) + 4 /* GenerationId */ + + RD_KAFKAP_STR_SIZE(member_id), + ApiVersion >= 4); rd_kafka_buf_write_kstr(rkbuf, group_id); rd_kafka_buf_write_i32(rkbuf, generation_id); @@ -3411,7 +3417,11 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb, int16_t ApiVersion; int features; - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslAuthenticate, 0, 0); + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_SaslAuthenticate, 0, 2, &features); + + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_SaslAuthenticate, 0, 0, ApiVersion >= 2); /* Should be sent before any other requests since it is part of * the initial connection handshake. */ @@ -3424,8 +3434,6 @@ void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb, * close down the connection and reconnect on failure. */ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES; - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_SaslAuthenticate, 0, 1, &features); rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); if (replyq.q)