Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions examples/rdkafka_performance.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,16 @@ static int exit_after = 0;
static int exit_eof = 0;
static FILE *stats_fp;
static int dr_disp_div;
static int verbosity = 1;
static int latency_mode = 0;
static FILE *latency_fp = NULL;
static int msgcnt = -1;
static int incremental_mode = 0;
static int partition_cnt = 0;
static int eof_cnt = 0;
static int with_dr = 1;
static int read_hdrs = 0;
static int verbosity = 1;
static int latency_mode = 0;
static FILE *latency_fp = NULL;
static int msgcnt = -1;
static int incremental_mode = 0;
static int partition_cnt = 0;
static int eof_cnt = 0;
static int with_dr = 1;
static int read_hdrs = 0;
static int is_group_protocol_classic = 1;


static void stop(int sig) {
Expand Down Expand Up @@ -887,14 +888,13 @@ int main(int argc, char **argv) {
* Try other values with: ... -X queued.min.messages=1000
*/
rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL, 0);
rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0);

topics = rd_kafka_topic_partition_list_new(1);

while ((opt = getopt(argc, argv,
"PCG:t:p:b:s:k:c:fi:MDd:m:S:x:"
"R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) {
"R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:g:")) != -1) {
switch (opt) {
case 'G':
if (rd_kafka_conf_set(conf, "group.id", optarg, errstr,
Expand Down Expand Up @@ -922,6 +922,16 @@ int main(int argc, char **argv) {
case 'b':
brokers = optarg;
break;
case 'g':
if (rd_kafka_conf_set(conf, "group.protocol", optarg,
errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
exit(1);
}
is_group_protocol_classic =
strcmp(optarg, "classic") == 0;
break;
case 's':
msgsize = atoi(optarg);
break;
Expand Down Expand Up @@ -1142,6 +1152,8 @@ int main(int argc, char **argv) {
" -p <num> Partition (defaults to random). "
"Multiple partitions are allowed in -C consumer mode.\n"
" -M Print consumer interval stats\n"
" -g <protocol> Group rebalance protocol to use. classic "
"or consumer (defaults to classic)\n"
" -b <brokers> Broker address list (host[:port],..)\n"
" -s <size> Message size (producer)\n"
" -k <key> Message key (producer)\n"
Expand Down Expand Up @@ -1201,6 +1213,11 @@ int main(int argc, char **argv) {
exit(1);
}

if (is_group_protocol_classic) {
/** Session timeout is moved to broker side on the new consumer
* group rebalance protocols */
rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0);
}

dispintvl *= 1000; /* us */

Expand Down
5 changes: 3 additions & 2 deletions tests/0045-subscribe_update.c
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,8 @@ static void do_test_regex_many_mock(const char *assignment_strategy,
/* Wait for an assignment to let the consumer catch up on
* all rebalancing. */
if (i % await_assignment_every == await_assignment_every - 1)
test_consumer_wait_assignment(rk, rd_true /*poll*/);
test_consumer_wait_assignment(rk, rd_true /*poll*/,
1000);
else if (!lots_of_topics)
rd_usleep(100 * 1000, NULL);
}
Expand Down Expand Up @@ -783,7 +784,7 @@ static void do_test_resubscribe_with_regex() {

/* Subscribe to regex ^.*topic_regex.* and topic_a literal */
TEST_SAY("Subscribing to regex ^.*topic_regex.* and topic_a\n");
test_consumer_subscribe_multi(rk, 2, "^.*topic_regex.*", topic_a);
test_consumer_subscribe_multi_va(rk, 2, "^.*topic_regex.*", topic_a);
/* Wait for assignment */
if (test_consumer_group_protocol_classic()) {
await_assignment("Assignment for topic1, topic2 and topic_a",
Expand Down
6 changes: 3 additions & 3 deletions tests/0081-admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -4212,7 +4212,7 @@ static void do_test_DeleteConsumerGroupOffsets(const char *what,

if (sub_consumer) {
TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription));
test_consumer_wait_assignment(consumer, rd_true);
test_consumer_wait_assignment(consumer, rd_true, 1000);
}

/* Commit some offsets */
Expand Down Expand Up @@ -4488,7 +4488,7 @@ static void do_test_AlterConsumerGroupOffsets(const char *what,
if (sub_consumer) {
TEST_CALL_ERR__(
rd_kafka_subscribe(consumer, subscription));
test_consumer_wait_assignment(consumer, rd_true);
test_consumer_wait_assignment(consumer, rd_true, 1000);
}
}

Expand Down Expand Up @@ -4769,7 +4769,7 @@ static void do_test_ListConsumerGroupOffsets(const char *what,

if (sub_consumer) {
TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription));
test_consumer_wait_assignment(consumer, rd_true);
test_consumer_wait_assignment(consumer, rd_true, 1000);
}

/* Commit some offsets */
Expand Down
2 changes: 1 addition & 1 deletion tests/0102-static_group_rebalance.c
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ static void do_test_static_membership_mock(

TEST_SAY("Subscribing consumer 1 again\n");
test_consumer_subscribe(consumer1, topic);
test_consumer_wait_assignment(consumer1, rd_false);
test_consumer_wait_assignment(consumer1, rd_false, 1000);

next_generation_id1 = consumer_generation_id(consumer1);
next_generation_id2 = consumer_generation_id(consumer2);
Expand Down
4 changes: 2 additions & 2 deletions tests/0103-transactions.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ static void do_test_basic_producer_txn(rd_bool_t enable_compression) {
/* Wait for assignment to make sure consumer is fetching messages
* below, so we can use the poll_no_msgs() timeout to
* determine that messages were indeed aborted. */
test_consumer_wait_assignment(c, rd_true);
test_consumer_wait_assignment(c, rd_true, 1000);

/* Init transactions */
TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000));
Expand Down Expand Up @@ -1039,7 +1039,7 @@ static void do_test_empty_txn(rd_bool_t send_offsets, rd_bool_t do_commit) {
test_conf_set(c_conf, "enable.auto.commit", "false");
c = test_create_consumer(topic, NULL, c_conf, NULL);
test_consumer_subscribe(c, topic);
test_consumer_wait_assignment(c, rd_false);
test_consumer_wait_assignment(c, rd_false, 1000);

TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1));

Expand Down
2 changes: 1 addition & 1 deletion tests/0107-topic_recreate.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ static void do_test_create_delete_create(int part_cnt_1, int part_cnt_2) {

/* Start consumer */
test_consumer_subscribe(consumer, topic);
test_consumer_wait_assignment(consumer, rd_true);
test_consumer_wait_assignment(consumer, rd_true, 1000);

mtx_lock(&value_mtx);
value = "before";
Expand Down
6 changes: 3 additions & 3 deletions tests/0113-cooperative_rebalance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3346,7 +3346,7 @@ static void x_incremental_rebalances(void) {
/* First consumer joins group */
TEST_SAY("%s: joining\n", rd_kafka_name(c[0]));
test_consumer_subscribe(c[0], topic);
test_consumer_wait_assignment(c[0], rd_true /*poll*/);
test_consumer_wait_assignment(c[0], rd_true /*poll*/, 1000);
test_consumer_verify_assignment(c[0], rd_true /*fail immediately*/, topic, 0,
topic, 1, topic, 2, topic, 3, topic, 4, topic,
5, NULL);
Expand All @@ -3355,7 +3355,7 @@ static void x_incremental_rebalances(void) {
/* Second consumer joins group */
TEST_SAY("%s: joining\n", rd_kafka_name(c[1]));
test_consumer_subscribe(c[1], topic);
test_consumer_wait_assignment(c[1], rd_true /*poll*/);
test_consumer_wait_assignment(c[1], rd_true /*poll*/, 1000);
rd_sleep(3);
if (test_consumer_group_protocol_classic()) {
test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 3,
Expand All @@ -3372,7 +3372,7 @@ static void x_incremental_rebalances(void) {
/* Third consumer joins group */
TEST_SAY("%s: joining\n", rd_kafka_name(c[2]));
test_consumer_subscribe(c[2], topic);
test_consumer_wait_assignment(c[2], rd_true /*poll*/);
test_consumer_wait_assignment(c[2], rd_true /*poll*/, 1000);
rd_sleep(3);
if (test_consumer_group_protocol_classic()) {
test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 4,
Expand Down
2 changes: 1 addition & 1 deletion tests/0120-asymmetric_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ static void do_test_asymmetric(const char *assignor, const char *bootstraps) {

/* Await assignments for all consumers */
for (i = 0; i < _C_CNT; i++)
test_consumer_wait_assignment(c[i], rd_true);
test_consumer_wait_assignment(c[i], rd_true, 1000);

/* All have assignments, grab them. */
for (i = 0; i < _C_CNT; i++) {
Expand Down
4 changes: 2 additions & 2 deletions tests/0122-buffer_cleaning_after_rebalance.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ static void do_test_consume_batch(const char *strategy) {
c2 = test_create_consumer(topic, NULL, conf, NULL);

test_consumer_subscribe(c1, topic);
test_consumer_wait_assignment(c1, rd_false);
test_consumer_wait_assignment(c1, rd_false, 1000);

/* Create generic consume queue */
rkq1 = rd_kafka_queue_get_consumer(c1);
Expand All @@ -183,7 +183,7 @@ static void do_test_consume_batch(const char *strategy) {
TEST_FAIL("Failed to create thread for %s", "C1.PRE");

test_consumer_subscribe(c2, topic);
test_consumer_wait_assignment(c2, rd_false);
test_consumer_wait_assignment(c2, rd_false, 1000);

thrd_join(thread_id, NULL);

Expand Down
2 changes: 1 addition & 1 deletion tests/0132-strategy_ordering.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ static void do_test_strategy_ordering(const char *assignor,

/* Await assignments for all consumers */
for (i = 0; i < _C_CNT; i++) {
test_consumer_wait_assignment(c[i], rd_true);
test_consumer_wait_assignment(c[i], rd_true, 1000);
}

if (!strcmp(expected_assignor, "range"))
Expand Down
10 changes: 5 additions & 5 deletions tests/0137-barrier_batch_consume.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ static void do_test_consume_batch_with_seek(void) {
consumer = test_create_consumer(topic, NULL, conf, NULL);

test_consumer_subscribe(consumer, topic);
test_consumer_wait_assignment(consumer, rd_false);
test_consumer_wait_assignment(consumer, rd_false, 1000);

/* Create generic consume queue */
rkq = rd_kafka_queue_get_consumer(consumer);
Expand Down Expand Up @@ -236,7 +236,7 @@ static void do_test_consume_batch_with_pause_and_resume_different_batch(void) {
consumer = test_create_consumer(topic, NULL, conf, NULL);

test_consumer_subscribe(consumer, topic);
test_consumer_wait_assignment(consumer, rd_false);
test_consumer_wait_assignment(consumer, rd_false, 1000);

/* Create generic consume queue */
rkq = rd_kafka_queue_get_consumer(consumer);
Expand Down Expand Up @@ -341,7 +341,7 @@ static void do_test_consume_batch_with_pause_and_resume_same_batch(void) {
consumer = test_create_consumer(topic, NULL, conf, NULL);

test_consumer_subscribe(consumer, topic);
test_consumer_wait_assignment(consumer, rd_false);
test_consumer_wait_assignment(consumer, rd_false, 1000);

/* Create generic consume queue */
rkq = rd_kafka_queue_get_consumer(consumer);
Expand Down Expand Up @@ -439,7 +439,7 @@ static void do_test_consume_batch_store_offset(void) {
consumer = test_create_consumer(topic, NULL,
rd_kafka_conf_dup(conf), NULL);
test_consumer_subscribe(consumer, topic);
test_consumer_wait_assignment(consumer, rd_false);
test_consumer_wait_assignment(consumer, rd_false, 1000);

/* Create generic consume queue */
rkq = rd_kafka_queue_get_consumer(consumer);
Expand Down Expand Up @@ -544,7 +544,7 @@ static void do_test_consume_batch_control_msgs(void) {
consumer = test_create_consumer(topic, NULL, c_conf, NULL);

test_consumer_subscribe(consumer, topic);
test_consumer_wait_assignment(consumer, rd_false);
test_consumer_wait_assignment(consumer, rd_false, 1000);

/* Create generic consume queue */
rkq = rd_kafka_queue_get_consumer(consumer);
Expand Down
Loading