diff --git a/examples/rdkafka_performance.c b/examples/rdkafka_performance.c index dab0b06b8f..290828bcd1 100644 --- a/examples/rdkafka_performance.c +++ b/examples/rdkafka_performance.c @@ -67,15 +67,16 @@ static int exit_after = 0; static int exit_eof = 0; static FILE *stats_fp; static int dr_disp_div; -static int verbosity = 1; -static int latency_mode = 0; -static FILE *latency_fp = NULL; -static int msgcnt = -1; -static int incremental_mode = 0; -static int partition_cnt = 0; -static int eof_cnt = 0; -static int with_dr = 1; -static int read_hdrs = 0; +static int verbosity = 1; +static int latency_mode = 0; +static FILE *latency_fp = NULL; +static int msgcnt = -1; +static int incremental_mode = 0; +static int partition_cnt = 0; +static int eof_cnt = 0; +static int with_dr = 1; +static int read_hdrs = 0; +static int is_group_protocol_classic = 1; static void stop(int sig) { @@ -887,14 +888,13 @@ int main(int argc, char **argv) { * Try other values with: ... -X queued.min.messages=1000 */ rd_kafka_conf_set(conf, "queued.min.messages", "1000000", NULL, 0); - rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0); rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", NULL, 0); topics = rd_kafka_topic_partition_list_new(1); while ((opt = getopt(argc, argv, "PCG:t:p:b:s:k:c:fi:MDd:m:S:x:" - "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) { + "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:g:")) != -1) { switch (opt) { case 'G': if (rd_kafka_conf_set(conf, "group.id", optarg, errstr, @@ -922,6 +922,16 @@ int main(int argc, char **argv) { case 'b': brokers = optarg; break; + case 'g': + if (rd_kafka_conf_set(conf, "group.protocol", optarg, + errstr, sizeof(errstr)) != + RD_KAFKA_CONF_OK) { + fprintf(stderr, "%% %s\n", errstr); + exit(1); + } + is_group_protocol_classic = + strcmp(optarg, "classic") == 0; + break; case 's': msgsize = atoi(optarg); break; @@ -1142,6 +1152,8 @@ int main(int argc, char **argv) { " -p Partition (defaults to random). 
" "Multiple partitions are allowed in -C consumer mode.\n" " -M Print consumer interval stats\n" + " -g Group rebalance protocol to use. classic " + "or consumer (defaults to classic)\n" " -b Broker address list (host[:port],..)\n" " -s Message size (producer)\n" " -k Message key (producer)\n" @@ -1201,6 +1213,11 @@ int main(int argc, char **argv) { exit(1); } + if (is_group_protocol_classic) { + /** Session timeout is moved to broker side on the new consumer + * group rebalance protocols */ + rd_kafka_conf_set(conf, "session.timeout.ms", "6000", NULL, 0); + } dispintvl *= 1000; /* us */ diff --git a/tests/0045-subscribe_update.c b/tests/0045-subscribe_update.c index a520ba369e..dbc045ba3b 100644 --- a/tests/0045-subscribe_update.c +++ b/tests/0045-subscribe_update.c @@ -474,7 +474,8 @@ static void do_test_regex_many_mock(const char *assignment_strategy, /* Wait for an assignment to let the consumer catch up on * all rebalancing. */ if (i % await_assignment_every == await_assignment_every - 1) - test_consumer_wait_assignment(rk, rd_true /*poll*/); + test_consumer_wait_assignment(rk, rd_true /*poll*/, + 1000); else if (!lots_of_topics) rd_usleep(100 * 1000, NULL); } @@ -783,7 +784,7 @@ static void do_test_resubscribe_with_regex() { /* Subscribe to regex ^.*topic_regex.* and topic_a literal */ TEST_SAY("Subscribing to regex ^.*topic_regex.* and topic_a\n"); - test_consumer_subscribe_multi(rk, 2, "^.*topic_regex.*", topic_a); + test_consumer_subscribe_multi_va(rk, 2, "^.*topic_regex.*", topic_a); /* Wait for assignment */ if (test_consumer_group_protocol_classic()) { await_assignment("Assignment for topic1, topic2 and topic_a", diff --git a/tests/0081-admin.c b/tests/0081-admin.c index f16f958e58..06a1256ea2 100644 --- a/tests/0081-admin.c +++ b/tests/0081-admin.c @@ -4212,7 +4212,7 @@ static void do_test_DeleteConsumerGroupOffsets(const char *what, if (sub_consumer) { TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription)); - 
test_consumer_wait_assignment(consumer, rd_true); + test_consumer_wait_assignment(consumer, rd_true, 1000); } /* Commit some offsets */ @@ -4488,7 +4488,7 @@ static void do_test_AlterConsumerGroupOffsets(const char *what, if (sub_consumer) { TEST_CALL_ERR__( rd_kafka_subscribe(consumer, subscription)); - test_consumer_wait_assignment(consumer, rd_true); + test_consumer_wait_assignment(consumer, rd_true, 1000); } } @@ -4769,7 +4769,7 @@ static void do_test_ListConsumerGroupOffsets(const char *what, if (sub_consumer) { TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription)); - test_consumer_wait_assignment(consumer, rd_true); + test_consumer_wait_assignment(consumer, rd_true, 1000); } /* Commit some offsets */ diff --git a/tests/0102-static_group_rebalance.c b/tests/0102-static_group_rebalance.c index 8f6c2a90c9..679344b7b3 100644 --- a/tests/0102-static_group_rebalance.c +++ b/tests/0102-static_group_rebalance.c @@ -759,7 +759,7 @@ static void do_test_static_membership_mock( TEST_SAY("Subscribing consumer 1 again\n"); test_consumer_subscribe(consumer1, topic); - test_consumer_wait_assignment(consumer1, rd_false); + test_consumer_wait_assignment(consumer1, rd_false, 1000); next_generation_id1 = consumer_generation_id(consumer1); next_generation_id2 = consumer_generation_id(consumer2); diff --git a/tests/0103-transactions.c b/tests/0103-transactions.c index 0bc1664d83..3fce5eac71 100644 --- a/tests/0103-transactions.c +++ b/tests/0103-transactions.c @@ -165,7 +165,7 @@ static void do_test_basic_producer_txn(rd_bool_t enable_compression) { /* Wait for assignment to make sure consumer is fetching messages * below, so we can use the poll_no_msgs() timeout to * determine that messages were indeed aborted. 
*/ - test_consumer_wait_assignment(c, rd_true); + test_consumer_wait_assignment(c, rd_true, 1000); /* Init transactions */ TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000)); @@ -1039,7 +1039,7 @@ static void do_test_empty_txn(rd_bool_t send_offsets, rd_bool_t do_commit) { test_conf_set(c_conf, "enable.auto.commit", "false"); c = test_create_consumer(topic, NULL, c_conf, NULL); test_consumer_subscribe(c, topic); - test_consumer_wait_assignment(c, rd_false); + test_consumer_wait_assignment(c, rd_false, 1000); TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1)); diff --git a/tests/0107-topic_recreate.c b/tests/0107-topic_recreate.c index 68b9784796..105d71105e 100644 --- a/tests/0107-topic_recreate.c +++ b/tests/0107-topic_recreate.c @@ -193,7 +193,7 @@ static void do_test_create_delete_create(int part_cnt_1, int part_cnt_2) { /* Start consumer */ test_consumer_subscribe(consumer, topic); - test_consumer_wait_assignment(consumer, rd_true); + test_consumer_wait_assignment(consumer, rd_true, 1000); mtx_lock(&value_mtx); value = "before"; diff --git a/tests/0113-cooperative_rebalance.cpp b/tests/0113-cooperative_rebalance.cpp index c9b068cfd6..edf0dda202 100644 --- a/tests/0113-cooperative_rebalance.cpp +++ b/tests/0113-cooperative_rebalance.cpp @@ -3346,7 +3346,7 @@ static void x_incremental_rebalances(void) { /* First consumer joins group */ TEST_SAY("%s: joining\n", rd_kafka_name(c[0])); test_consumer_subscribe(c[0], topic); - test_consumer_wait_assignment(c[0], rd_true /*poll*/); + test_consumer_wait_assignment(c[0], rd_true /*poll*/, 1000); test_consumer_verify_assignment(c[0], rd_true /*fail immediately*/, topic, 0, topic, 1, topic, 2, topic, 3, topic, 4, topic, 5, NULL); @@ -3355,7 +3355,7 @@ static void x_incremental_rebalances(void) { /* Second consumer joins group */ TEST_SAY("%s: joining\n", rd_kafka_name(c[1])); test_consumer_subscribe(c[1], topic); - test_consumer_wait_assignment(c[1], rd_true /*poll*/); + test_consumer_wait_assignment(c[1], 
rd_true /*poll*/, 1000); rd_sleep(3); if (test_consumer_group_protocol_classic()) { test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 3, @@ -3372,7 +3372,7 @@ static void x_incremental_rebalances(void) { /* Third consumer joins group */ TEST_SAY("%s: joining\n", rd_kafka_name(c[2])); test_consumer_subscribe(c[2], topic); - test_consumer_wait_assignment(c[2], rd_true /*poll*/); + test_consumer_wait_assignment(c[2], rd_true /*poll*/, 1000); rd_sleep(3); if (test_consumer_group_protocol_classic()) { test_consumer_verify_assignment(c[0], rd_false /*fail later*/, topic, 4, diff --git a/tests/0120-asymmetric_subscription.c b/tests/0120-asymmetric_subscription.c index 4439d5e51d..a89e6c3c27 100644 --- a/tests/0120-asymmetric_subscription.c +++ b/tests/0120-asymmetric_subscription.c @@ -100,7 +100,7 @@ static void do_test_asymmetric(const char *assignor, const char *bootstraps) { /* Await assignments for all consumers */ for (i = 0; i < _C_CNT; i++) - test_consumer_wait_assignment(c[i], rd_true); + test_consumer_wait_assignment(c[i], rd_true, 1000); /* All have assignments, grab them. 
*/ for (i = 0; i < _C_CNT; i++) { diff --git a/tests/0122-buffer_cleaning_after_rebalance.c b/tests/0122-buffer_cleaning_after_rebalance.c index 9778391e89..23d657850b 100644 --- a/tests/0122-buffer_cleaning_after_rebalance.c +++ b/tests/0122-buffer_cleaning_after_rebalance.c @@ -164,7 +164,7 @@ static void do_test_consume_batch(const char *strategy) { c2 = test_create_consumer(topic, NULL, conf, NULL); test_consumer_subscribe(c1, topic); - test_consumer_wait_assignment(c1, rd_false); + test_consumer_wait_assignment(c1, rd_false, 1000); /* Create generic consume queue */ rkq1 = rd_kafka_queue_get_consumer(c1); @@ -183,7 +183,7 @@ static void do_test_consume_batch(const char *strategy) { TEST_FAIL("Failed to create thread for %s", "C1.PRE"); test_consumer_subscribe(c2, topic); - test_consumer_wait_assignment(c2, rd_false); + test_consumer_wait_assignment(c2, rd_false, 1000); thrd_join(thread_id, NULL); diff --git a/tests/0132-strategy_ordering.c b/tests/0132-strategy_ordering.c index 379bed8c18..0479d9f09d 100644 --- a/tests/0132-strategy_ordering.c +++ b/tests/0132-strategy_ordering.c @@ -147,7 +147,7 @@ static void do_test_strategy_ordering(const char *assignor, /* Await assignments for all consumers */ for (i = 0; i < _C_CNT; i++) { - test_consumer_wait_assignment(c[i], rd_true); + test_consumer_wait_assignment(c[i], rd_true, 1000); } if (!strcmp(expected_assignor, "range")) diff --git a/tests/0137-barrier_batch_consume.c b/tests/0137-barrier_batch_consume.c index 19bec387db..03bf4a8f7a 100644 --- a/tests/0137-barrier_batch_consume.c +++ b/tests/0137-barrier_batch_consume.c @@ -146,7 +146,7 @@ static void do_test_consume_batch_with_seek(void) { consumer = test_create_consumer(topic, NULL, conf, NULL); test_consumer_subscribe(consumer, topic); - test_consumer_wait_assignment(consumer, rd_false); + test_consumer_wait_assignment(consumer, rd_false, 1000); /* Create generic consume queue */ rkq = rd_kafka_queue_get_consumer(consumer); @@ -236,7 +236,7 @@ static void 
do_test_consume_batch_with_pause_and_resume_different_batch(void) { consumer = test_create_consumer(topic, NULL, conf, NULL); test_consumer_subscribe(consumer, topic); - test_consumer_wait_assignment(consumer, rd_false); + test_consumer_wait_assignment(consumer, rd_false, 1000); /* Create generic consume queue */ rkq = rd_kafka_queue_get_consumer(consumer); @@ -341,7 +341,7 @@ static void do_test_consume_batch_with_pause_and_resume_same_batch(void) { consumer = test_create_consumer(topic, NULL, conf, NULL); test_consumer_subscribe(consumer, topic); - test_consumer_wait_assignment(consumer, rd_false); + test_consumer_wait_assignment(consumer, rd_false, 1000); /* Create generic consume queue */ rkq = rd_kafka_queue_get_consumer(consumer); @@ -439,7 +439,7 @@ static void do_test_consume_batch_store_offset(void) { consumer = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL); test_consumer_subscribe(consumer, topic); - test_consumer_wait_assignment(consumer, rd_false); + test_consumer_wait_assignment(consumer, rd_false, 1000); /* Create generic consume queue */ rkq = rd_kafka_queue_get_consumer(consumer); @@ -544,7 +544,7 @@ static void do_test_consume_batch_control_msgs(void) { consumer = test_create_consumer(topic, NULL, c_conf, NULL); test_consumer_subscribe(consumer, topic); - test_consumer_wait_assignment(consumer, rd_false); + test_consumer_wait_assignment(consumer, rd_false, 1000); /* Create generic consume queue */ rkq = rd_kafka_queue_get_consumer(consumer); diff --git a/tests/8002-rebalance-performance.c b/tests/8002-rebalance-performance.c new file mode 100644 index 0000000000..07a78234b9 --- /dev/null +++ b/tests/8002-rebalance-performance.c @@ -0,0 +1,468 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2022, Magnus Edenhill + * 2023, Confluent Inc. + * All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#include "test.h" +/* Typical include path would be , but this program + * is built from within the librdkafka source tree and thus differs. 
*/ +#include "rdkafka.h" /* for Kafka driver */ +#include "rdtime.h" + +#if defined(__linux__) +#define SET_THREAD_NAME(name) \ + do { \ + char pthread_thread_name[32]; \ + snprintf(pthread_thread_name, sizeof(pthread_thread_name), \ + "%s", name); \ + pthread_setname_np(pthread_self(), pthread_thread_name); \ + } while (0) +#else +#define SET_THREAD_NAME(name) \ + do { \ + } while (0) +#endif + +static int number_of_test_runs = 1; +static int partition_cnt = 30; +static int topic_cnt = 2; +static int consumer_cnt = 60; +static int batch_size = 60; +static atomic_int run = 0; + +typedef struct consumer_s { + int consumer_id; + int expected_assignment_cnt; + char *group_id; + const char **subscriptions; + atomic_llong end_time; + rd_kafka_t *consumer; + rd_kafka_topic_partition_list_t *prev_assignment; +} consumer_t; + +typedef struct producer_s { + int producer_id; + char *topic; +} producer_t; + +static long long int max(long long int a, long long int b) { + return (a > b) ? a : b; +} + +static int producer_thread(void *arg) { + producer_t *producer_args = arg; + rd_kafka_t *rk; + rd_kafka_topic_t *rkt; + char buf[50]; + char payload[64]; + int i = 0; + + rk = test_create_handle(RD_KAFKA_PRODUCER, NULL); + rkt = rd_kafka_topic_new(rk, producer_args->topic, NULL); + + TEST_SAY("Producer %d started for topic %s\n", + producer_args->producer_id, producer_args->topic); + + while (run) { + snprintf(buf, sizeof(buf), "Producer %d message %d", + producer_args->producer_id, i); + snprintf(payload, sizeof(payload), "Payload %s", buf); + rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_COPY, payload, strlen(payload), + NULL, 0, NULL); + rd_usleep(5000, NULL); /* 5ms */ + i++; + i %= 2147483643; + } + + // TEST_SAY_WHITE("Producer %d finished producing messages for topic + // %s\n", producer_args->producer_id, producer_args->topic); + rd_kafka_flush( + rk, 10000); // Wait for all messages to be sent before exiting + rd_kafka_topic_destroy(rkt); + 
rd_kafka_destroy(rk); + rd_free( + producer_args->topic); // Free the topic string allocated in main + + return 0; +} + +static rd_kafka_topic_partition_list_t * +list_diff(rd_kafka_topic_partition_list_t *a, + rd_kafka_topic_partition_list_t *b) { + rd_kafka_topic_partition_list_t *result = + rd_kafka_topic_partition_list_new(0); + int i; + + for (i = 0; i < a->cnt; i++) { + if (!rd_kafka_topic_partition_list_find( + b, a->elems[i].topic, a->elems[i].partition)) { + rd_kafka_topic_partition_list_add( + result, a->elems[i].topic, a->elems[i].partition); + } + } + + return result; +} + +static int consumer_thread(void *arg) { + rd_kafka_conf_t *conf; + rd_kafka_t *consumer; + rd_kafka_topic_partition_list_t *assignment = NULL; + consumer_t *consumer_args = arg; + rd_kafka_topic_partition_list_t *diff = + rd_kafka_topic_partition_list_new(0); + int inside_list_diff = 0; + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "enable.auto.commit", "false"); + if (test_consumer_group_protocol_classic()) { + test_conf_set(conf, "heartbeat.interval.ms", "500"); + test_conf_set(conf, "partition.assignment.strategy", + "cooperative-sticky"); + } + // test_conf_set(conf, "debug", "conf"); + /* Create consumers */ + consumer = + test_create_consumer(consumer_args->group_id, NULL, conf, NULL); + consumer_args->consumer = consumer; + test_consumer_subscribe_multi(consumer, consumer_args->subscriptions, + topic_cnt); + + while (consumer_args->prev_assignment->cnt < + consumer_args->expected_assignment_cnt) { + // TEST_SAY_WHITE("Consumer %d waiting for assignment: %d < + // %d\n", consumer_args->consumer_id, + // consumer_args->prev_assignment->cnt, + // consumer_args->expected_assignment_cnt); + rd_kafka_assignment(consumer, &assignment); + if (assignment->cnt != consumer_args->prev_assignment->cnt) { + // TEST_SAY("Consumer %d assignment changed: %d -> + // %d\n", + // consumer_args->consumer_id, + // 
consumer_args->prev_assignment->cnt, + // assignment->cnt); + rd_kafka_topic_partition_list_destroy(diff); + inside_list_diff++; + // TEST_SAY_GREEN("Consumer %d inside list_diff: %d\n", + // consumer_args->consumer_id, + // inside_list_diff); + diff = list_diff(assignment, + consumer_args->prev_assignment); + rd_kafka_topic_partition_list_destroy( + consumer_args->prev_assignment); + consumer_args->prev_assignment = + rd_kafka_topic_partition_list_copy(assignment); + } + rd_kafka_topic_partition_list_destroy(assignment); + rd_usleep(100000, NULL); /* 10ms */ + } + + while (!(consumer_args->end_time)) { + // TEST_SAY_WHITE("Consumer %d waiting for end_time to be + // set\n", consumer_args->consumer_id); + rd_kafka_message_t *rkmessage; + rkmessage = rd_kafka_consumer_poll(consumer, 1); + + if (rkmessage) { + int64_t batch_end_time = rd_uclock(); + // TEST_SAY_WHITE("Consumer %d received message from + // topic %s partition %d at %ld\n", + // consumer_args->consumer_id, + // rd_kafka_topic_name(rkmessage->rkt), + // rkmessage->partition, batch_end_time); + if (rd_kafka_topic_partition_list_find( + diff, rd_kafka_topic_name(rkmessage->rkt), + rkmessage->partition)) { + consumer_args->end_time = batch_end_time; + // TEST_SAY_WHITE("Consumer %d setting end_time + // to %lld\n", consumer_args->consumer_id, + // consumer_args->end_time); + } + rd_kafka_message_destroy(rkmessage); + } + } + + rd_kafka_topic_partition_list_destroy(diff); + + while (run) { + rd_kafka_message_t *rkmessage; + rkmessage = rd_kafka_consumer_poll(consumer, 1000); + if (rkmessage) { + rd_kafka_message_destroy(rkmessage); + } + rd_usleep(50000, NULL); /* Sleep for 50 milli seconds to avoid + busy waiting */ + } + + test_consumer_close(consumer); + + // TEST_SAY_WHITE("Consumer closed\n"); + + rd_kafka_destroy(consumer); + + // TEST_SAY_WHITE("Consumer destroyed\n"); + return 0; +} + +int do_test_performance_multiple_consumer() { + char **topics; + uint64_t testid; + producer_t 
producer_args[topic_cnt]; + consumer_t consumer_args[consumer_cnt]; + test_msgver_t mv; + thrd_t producer_thread_ids[topic_cnt], + consumer_thread_ids[consumer_cnt]; + const int timeout_ms = 10000; + int i; + int number_of_batches = consumer_cnt / batch_size; + int current_batch; + int batch_sleep_wait_time_us = 10000; + long long int start_time; + long long int end_time; + long long int batch_start_time; + long long int batch_end_time; + long long int elapsed_time_ms; + long long int individual_batch_elapsed_time_ms[number_of_batches]; + long long int total_batch_elapsed_time_ms[number_of_batches]; + const char *topics_prefix = + test_mk_topic_name("8002-rebalance_performance", 1); + + test_timeout_set(450); + + testid = test_id_generate(); + test_msgver_init(&mv, testid); + + run = 1; + + // Topic creation + topics = rd_malloc(topic_cnt * sizeof(*topics)); + for (i = 0; i < topic_cnt; i++) { + topics[i] = rd_malloc(64); + rd_snprintf(topics[i], 64, "%s-%d", topics_prefix, i); + + /* + * TODO: Improve the topic creation logic to use multiple topics + * creation API instead of creating topics one by one. 
+ */ + test_create_topic(NULL, topics[i], partition_cnt, 1); + } + // Wait for topics to be created and propogated to all the brokers + test_wait_topic_exists(NULL, topics[topic_cnt - 1], timeout_ms); + rd_sleep(5); + + // Producer thread creation + for (i = 0; i < topic_cnt; i++) { + producer_args[i].producer_id = i; + producer_args[i].topic = strdup(topics[i]); + if (thrd_create(&producer_thread_ids[i], producer_thread, + &producer_args[i]) != thrd_success) { + fprintf(stderr, "Failed to create producer thread\n"); + return 1; + } + char thread_name[32]; + snprintf(thread_name, sizeof(thread_name), "producer-%d", i); + SET_THREAD_NAME(thread_name); + } + + start_time = rd_uclock(); + + for (current_batch = 0; current_batch < number_of_batches; + current_batch++) { + TEST_SAY("Starting batch %d of %d\n", current_batch + 1, + number_of_batches); + int batch_start_index = current_batch * batch_size; + int expected_assignment_cnt = + (partition_cnt * topic_cnt) / + ((current_batch + 1) * batch_size); + batch_start_time = rd_uclock(); + for (i = 0; i < batch_size; i++) { + int consumer_index = batch_start_index + i; + consumer_args[consumer_index].consumer_id = + consumer_index; + consumer_args[consumer_index].group_id = topics[0]; + consumer_args[consumer_index].subscriptions = + (const char **)topics; + consumer_args[consumer_index].prev_assignment = + rd_kafka_topic_partition_list_new(0); + consumer_args[consumer_index].expected_assignment_cnt = + expected_assignment_cnt; + consumer_args[consumer_index].end_time = 0; + TEST_SAY("Consumer %d started\n", + consumer_args[consumer_index].consumer_id); + if (thrd_create(&consumer_thread_ids[consumer_index], + consumer_thread, + &consumer_args[consumer_index]) != + thrd_success) { + fprintf(stderr, + "Failed to create consumer thread\n"); + return 1; + } + char thread_name[32]; + snprintf(thread_name, sizeof(thread_name), + "consumer-%d", consumer_index); + SET_THREAD_NAME(thread_name); + } + batch_end_time = 0; + int 
didnt_find_end_time = 1; + while (didnt_find_end_time) { + TEST_SAY_WHITE("Waiting for batch %d to complete...\n", + current_batch + 1); + rd_sleep(1); + // Since there is no revocation, we can just rely on the + // end_time of the new consumers in the batch as there + // are going to be assignments for the new consumers and + // old consumers will only have revocations. + didnt_find_end_time = 0; + for (i = 0; i < batch_size; i++) { + int consumer_index = batch_start_index + i; + rd_kafka_topic_partition_list_t *assignment; + rd_kafka_assignment( + consumer_args[consumer_index].consumer, + &assignment); + // TEST_SAY_WHITE("Checking consumer %d (%s) + // end_time: %lld. Number of assignments are + // %d\n", + // consumer_index, + // rd_kafka_memberid(consumer_args[consumer_index].consumer), + // consumer_args[consumer_index].end_time, + // assignment->cnt); + if (!(consumer_args[consumer_index].end_time)) { + didnt_find_end_time = 1; + for (int j = 0; j < assignment->cnt; + j++) { + TEST_SAY_YELLOW( + "Consumer %d (%s) has " + "assignment for topic %s " + "partition %d\n", + consumer_index, + rd_kafka_memberid( + consumer_args + [consumer_index] + .consumer), + assignment->elems[j].topic, + assignment->elems[j] + .partition); + } + } + rd_kafka_topic_partition_list_destroy( + assignment); + batch_end_time = + max(batch_end_time, + consumer_args[consumer_index].end_time); + } + } + + TEST_SAY("Batch %d started at %lld\n", current_batch + 1, + batch_start_time); + TEST_SAY("Batch %d completed with end time %lld\n", + current_batch + 1, batch_end_time); + individual_batch_elapsed_time_ms[current_batch] = + (batch_end_time - batch_start_time) / 1000; + TEST_SAY_RED( + "Batch %d took %llds and %lldms\n", current_batch + 1, + individual_batch_elapsed_time_ms[current_batch] / 1000, + (individual_batch_elapsed_time_ms[current_batch] % 1000)); + + total_batch_elapsed_time_ms[current_batch] = + (batch_end_time - start_time) / 1000; + TEST_SAY_RED( + "Total time after batch 
%d: %llds and %lldms\n", + current_batch + 1, + total_batch_elapsed_time_ms[current_batch] / 1000, + (total_batch_elapsed_time_ms[current_batch] % 1000)); + + // if(individual_batch_elapsed_time_ms[current_batch] > 3000) { + // TEST_SAY_RED("Batch %d took too long: %llds and + // %lldms\n", + // current_batch + 1, + // individual_batch_elapsed_time_ms[current_batch] + // / 1000, + // (individual_batch_elapsed_time_ms[current_batch] + // % 1000)); + // break; + // } + + rd_usleep( + batch_sleep_wait_time_us, + NULL); // Sleep for 10ms before starting the next batch + } + + end_time = rd_uclock(); + run = 0; + elapsed_time_ms = (end_time - start_time) / 1000; + TEST_SAY_RED("All rebalances took %llds and %lldms\n", + elapsed_time_ms / 1000, (elapsed_time_ms % 1000)); + + // Clean ups + + // destroy all the previous assignments + for (i = 0; i < consumer_cnt; i++) { + rd_kafka_topic_partition_list_destroy( + consumer_args[i].prev_assignment); + } + + // TEST_SAY_WHITE("Waiting for all producer threads to finish...\n"); + for (i = 0; i < topic_cnt; i++) + thrd_join(producer_thread_ids[i], NULL); + + // TEST_SAY_WHITE("Waiting for all consumer threads to finish...\n"); + + for (i = 0; i < consumer_cnt; i++) + thrd_join(consumer_thread_ids[i], NULL); + + for (i = 0; i < topic_cnt; i++) { + rd_free(topics[i]); + } + rd_free(topics); + + // TEST_SAY_WHITE("All consumer threads finished\n"); + test_delete_all_test_topics(timeout_ms); + + // TEST_SAY_WHITE("All topics deleted\n"); + + return elapsed_time_ms; +} + +int main_8002_rebalance_performance(int argc, char **argv) { + int avg_rebalance_time_ms = 0; + int current_run = 1; + SET_THREAD_NAME("main-8002"); + for (current_run = 1; current_run <= number_of_test_runs; + current_run++) { + TEST_SAY_RED("Starting run %d of %d\n", current_run, + number_of_test_runs); + avg_rebalance_time_ms += + do_test_performance_multiple_consumer(); + } + avg_rebalance_time_ms /= number_of_test_runs; + TEST_SAY_RED("Average rebalance time: 
%d ms\n", avg_rebalance_time_ms); + return 0; +} diff --git a/tests/8003-chaos-testing-consumer-group.c b/tests/8003-chaos-testing-consumer-group.c new file mode 100644 index 0000000000..b463a8fc74 --- /dev/null +++ b/tests/8003-chaos-testing-consumer-group.c @@ -0,0 +1,469 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2022, Magnus Edenhill + * 2023, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#include "test.h" +/* Typical include path would be , but this program + * is built from within the librdkafka source tree and thus differs. 
*/ +#include "rdkafka.h" /* for Kafka driver */ +#include "rdtime.h" + +#define PARTITION_CNT 120 +#define CONSUMER_CNT 60 +#define TOPIC_CNT 1 // Currently working only for a single topic +#define ITERATIONS 5 +#define CONSUMER_POOL_SIZE (CONSUMER_CNT * ITERATIONS) +#define ITERATION_TIME_IN_US 15000000 /* 15 second */ + +static atomic_int run = 0; +static int64_t + offsets[PARTITION_CNT]; // Currently working only for a single topic +static int assigned_partitions[PARTITION_CNT]; +const int64_t assign_check_interval_us = ITERATION_TIME_IN_US / 30; +static atomic_int calculate_assignments = 1; + +typedef struct consumer_s { + int consumer_id; + char *group_id; + const char **subscriptions; + atomic_int run; + rd_kafka_t *consumer; + rd_kafka_topic_partition_list_t *prev_assignment; + rwlock_t prev_assignment_lock; +} consumer_t; + +typedef struct producer_s { + int producer_id; + char *topic; +} producer_t; + +static int producer_thread(void *arg) { + producer_t *producer_args = arg; + rd_kafka_t *rk; + rd_kafka_topic_t *rkt; + char buf[50]; + char payload[64]; + int i = 0; + + rk = test_create_handle(RD_KAFKA_PRODUCER, NULL); + rkt = rd_kafka_topic_new(rk, producer_args->topic, NULL); + + TEST_SAY_MAGENTA("Producer %d started for topic %s\n", + producer_args->producer_id, producer_args->topic); + + while (run) { + snprintf(buf, sizeof(buf), "Producer %d message %d", + producer_args->producer_id, i); + snprintf(payload, sizeof(payload), "Payload %s", buf); + rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_COPY, payload, strlen(payload), + NULL, 0, NULL); + rd_usleep(50000, NULL); /* 50ms */ + i++; + i %= 2147483643; + } + + // printf("Producer %d finished producing messages for topic %s\n", + // producer_args->producer_id, producer_args->topic); + rd_kafka_flush( + rk, 10000); // Wait for all messages to be sent before exiting + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(rk); + rd_free( + producer_args->topic); // Free the topic string allocated 
in main + + return 0; +} + +static int consumer_thread(void *arg) { + rd_kafka_conf_t *conf; + rd_kafka_t *consumer; + consumer_t *consumer_args = arg; + rd_kafka_topic_partition_list_t *current_assignment = NULL; + rd_kafka_message_t *rkmessage; + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "auto.commit.interval.ms", "10000"); + + + TEST_SAY_GREEN("Consumer %d started with group id %s\n", + consumer_args->consumer_id, consumer_args->group_id); + + /* Create consumers */ + consumer = + test_create_consumer(consumer_args->group_id, NULL, conf, NULL); + consumer_args->consumer = consumer; + test_consumer_subscribe_multi(consumer, consumer_args->subscriptions, + TOPIC_CNT); + + TEST_SAY_GREEN("Consumer %d created with name %s\n", + consumer_args->consumer_id, rd_kafka_name(consumer)); + + TEST_SAY_GREEN("Consumer %d subscribed to topics\n", + consumer_args->consumer_id); + while (consumer_args->run && run) { + if (calculate_assignments) { + if (rd_kafka_assignment(consumer, + ¤t_assignment) == + RD_KAFKA_RESP_ERR_NO_ERROR && + current_assignment) { + int different = 0; + + rwlock_rdlock( + &consumer_args->prev_assignment_lock); + if (consumer_args->prev_assignment->cnt != + current_assignment->cnt) { + different = 1; + } else { + for (int i = 0; + i < current_assignment->cnt; i++) { + if (!rd_kafka_topic_partition_list_find( + consumer_args + ->prev_assignment, + current_assignment + ->elems[i] + .topic, + current_assignment + ->elems[i] + .partition)) { + different = 1; + break; + } + } + } + rwlock_rdunlock( + &consumer_args->prev_assignment_lock); + + if (different) { + rwlock_wrlock( + &consumer_args + ->prev_assignment_lock); + TEST_SAY_YELLOW( + "Consumer %d assignment changed: " + "prev_cnt=%d, curr_cnt=%d\n", + consumer_args->consumer_id, + consumer_args->prev_assignment->cnt, + current_assignment->cnt); + rd_kafka_topic_partition_list_destroy( + consumer_args->prev_assignment); + 
consumer_args->prev_assignment = + rd_kafka_topic_partition_list_copy( + current_assignment); + rwlock_wrunlock( + &consumer_args + ->prev_assignment_lock); + } + + rd_kafka_topic_partition_list_destroy( + current_assignment); + } + } + + rkmessage = rd_kafka_consumer_poll(consumer, 1000); + if (rkmessage) { + TEST_SAY_GREEN( + "Consumer %d received message with offset %ld for " + "partition %d\n", + consumer_args->consumer_id, rkmessage->offset, + rkmessage->partition); + TEST_ASSERT(offsets[rkmessage->partition] + 1 == + rkmessage->offset, + "Consumer %d received message with offset " + "%ld for partition %d, expected %ld", + consumer_args->consumer_id, + rkmessage->offset, rkmessage->partition, + offsets[rkmessage->partition] + 1); + // rd_kafka_commit_message(consumer, rkmessage, 0); + offsets[rkmessage->partition] = rkmessage->offset; + rd_kafka_message_destroy(rkmessage); + } + } + + TEST_SAY_GREEN("Consumer %d finished consuming messages for group %s\n", + consumer_args->consumer_id, consumer_args->group_id); + + test_consumer_close(consumer); + + TEST_SAY_GREEN("Consumer %d closed\n", consumer_args->consumer_id); + + rd_kafka_destroy(consumer); + + TEST_SAY_GREEN("Consumer %d destroyed\n", consumer_args->consumer_id); + + return 0; +} + +static void +wait_for_iteration_and_validate_assignments(consumer_t *consumer_pool, + int first_running, + int last_running) { + int no_of_assigned_partitions = 0; + rd_usleep(ITERATION_TIME_IN_US, + NULL); /* Sleep for the iterations to complete */ + calculate_assignments = 0; + TEST_SAY_MAGENTA("Validating consumers from %d to %d (including)\n", + first_running, last_running); + for (int s = first_running; s <= last_running; s++) { + rwlock_rdlock(&consumer_pool[s].prev_assignment_lock); + rd_kafka_topic_partition_list_t *current_assignment = + consumer_pool[s].prev_assignment; + if (current_assignment && current_assignment->cnt > 0) { + for (int p = 0; p < current_assignment->cnt; p++) { + int partition = + 
current_assignment->elems[p].partition; + TEST_ASSERT( + assigned_partitions[partition] == -1, + "Partition %d assigned to multiple " + "consumers: %d and %d", + partition, assigned_partitions[partition], + consumer_pool[s].consumer_id); + assigned_partitions[partition] = s; + no_of_assigned_partitions++; + } + } + rwlock_rdunlock(&consumer_pool[s].prev_assignment_lock); + } + TEST_ASSERT(no_of_assigned_partitions == PARTITION_CNT, + "Expected %d assigned partitions, but got %d", + PARTITION_CNT, no_of_assigned_partitions); + for (int i = 0; i < PARTITION_CNT; i++) { + assigned_partitions[i] = -1; + } + calculate_assignments = 1; +} + +int do_test_chaos_testing_consumer_group() { + char **topics; + uint64_t testid; + producer_t producer_args[TOPIC_CNT]; + consumer_t consumer_args_pool[CONSUMER_POOL_SIZE]; + thrd_t consumer_thread_ids_pool[CONSUMER_POOL_SIZE] = {0}; + test_msgver_t mv; + thrd_t producer_thread_ids[TOPIC_CNT]; + const int timeout_ms = 10000; + int i; + long long int start_time; + long long int end_time; + long long int elapsed_time_ms; + const char *topics_prefix = + test_mk_topic_name("8003-chaos-testing-consumer-group", 1); + int first_running = 0; + int last_running = (CONSUMER_CNT / 2) - 1; + int pool_next = 0; + + testid = test_id_generate(); + test_msgver_init(&mv, testid); + + for (i = 0; i < PARTITION_CNT; i++) { + offsets[i] = -1; // Initialize offsets to -1 + assigned_partitions[i] = -1; + } + + run = 1; + + // Topic creation + topics = rd_malloc(TOPIC_CNT * sizeof(*topics)); + for (i = 0; i < TOPIC_CNT; i++) { + topics[i] = rd_malloc(64); + rd_snprintf(topics[i], 64, "%s-%d", topics_prefix, i); + + /* + * TODO: Improve the topic creation logic to use multiple topics + * creation API instead of creating topics one by one. 
+ */ + test_create_topic(NULL, topics[i], PARTITION_CNT, 1); + } + // Wait for topics to be created and propogated to all the brokers + test_wait_topic_exists(NULL, topics[TOPIC_CNT - 1], timeout_ms); + rd_sleep(5); + + // Producer thread creation + for (i = 0; i < TOPIC_CNT; i++) { + producer_args[i].producer_id = i; + producer_args[i].topic = strdup(topics[i]); + if (thrd_create(&producer_thread_ids[i], producer_thread, + &producer_args[i]) != thrd_success) { + fprintf(stderr, "Failed to create producer thread\n"); + return 1; + } + } + + start_time = rd_uclock(); + + srand((unsigned int)time(NULL)); + + // Initialize the consumer_args_pool + for (i = 0; i < CONSUMER_POOL_SIZE; i++) { + consumer_args_pool[i].consumer_id = i; + consumer_args_pool[i].group_id = topics[0]; + consumer_args_pool[i].subscriptions = (const char **)topics; + consumer_args_pool[i].prev_assignment = + rd_kafka_topic_partition_list_new(0); + consumer_args_pool[i].run = 1; + consumer_args_pool[i].consumer = NULL; + rwlock_init(&consumer_args_pool[i].prev_assignment_lock); + } + + // Start initial consumers + for (i = 0; i < CONSUMER_CNT / 2; i++) { + int idx = pool_next++; + TEST_SAY_YELLOW("Consumer %d started\n", + consumer_args_pool[idx].consumer_id); + if (thrd_create(&consumer_thread_ids_pool[idx], consumer_thread, + &consumer_args_pool[idx]) != thrd_success) { + fprintf(stderr, "Failed to create consumer thread\n"); + return 1; + } + } + + for (int iter = 0; iter < ITERATIONS; iter++) { + wait_for_iteration_and_validate_assignments( + consumer_args_pool, first_running, last_running); + + TEST_SAY_RED("Iteration %d: Rebalancing consumers...\n", iter); + // Calculate current running consumers + int running_count = last_running - first_running + 1; + if (running_count >= 1) { + // Randomly decide how many consumers to stop (at least + // 1, at most running_count) + int to_stop = (running_count > 1) + ? 
(rand() % running_count) + 1 + : 1; + TEST_SAY_MAGENTA( + "Stopping %d consumers out of %d running " + "consumers\n", + to_stop, running_count); + for (int s = 0; + s < to_stop && first_running <= last_running; + s++, first_running++) { + int idx = first_running; + TEST_SAY_YELLOW( + "Stopping consumer %d\n", + consumer_args_pool[idx].consumer_id); + consumer_args_pool[idx].run = 0; + } + } + + // Update running_count after stopping + running_count = last_running - first_running + 1; + if (running_count < 0) + running_count = 0; + + // Calculate how many can be started + int can_start = CONSUMER_CNT - running_count; + if (can_start < 1) + continue; // No more consumers can be started + + // Randomly decide how many consumers to start (at least 1, at + // most can_start) + int to_start = (can_start > 1) ? (rand() % can_start) + 1 : 1; + TEST_SAY_MAGENTA( + "Starting %d consumers out of %d available slots\n", + to_start, can_start); + for (int s = 0; s < to_start; s++) { + if (pool_next >= CONSUMER_POOL_SIZE) + break; // No more available in pool + int idx = pool_next++; + TEST_SAY_YELLOW("Starting consumer %d\n", + consumer_args_pool[idx].consumer_id); + consumer_args_pool[idx].run = 1; + consumer_args_pool[idx].consumer = NULL; + + if (thrd_create( + &consumer_thread_ids_pool[idx], consumer_thread, + &consumer_args_pool[idx]) != thrd_success) { + fprintf(stderr, + "Failed to create consumer thread\n"); + // Don't increment last_running, just skip + } else { + last_running = idx; + } + } + } + + TEST_SAY_MAGENTA( + "All iterations completed, waiting for consumers to finish...\n"); + + wait_for_iteration_and_validate_assignments( + consumer_args_pool, first_running, last_running); + + TEST_SAY_MAGENTA("Stopping all consumers...\n"); + + run = 0; // Stop all producers and consumers + end_time = rd_uclock(); + elapsed_time_ms = (end_time - start_time) / 1000; + TEST_SAY_YELLOW("All rebalances took %llds and %lldms\n", + elapsed_time_ms / 1000, (elapsed_time_ms % 
1000)); + + // Clean ups + + TEST_SAY_YELLOW("Destroying all the previous assignments...\n"); + // destroy all the previous assignments + for (i = 0; i < CONSUMER_POOL_SIZE; i++) { + rwlock_wrlock(&consumer_args_pool[i].prev_assignment_lock); + rd_kafka_topic_partition_list_destroy( + consumer_args_pool[i].prev_assignment); + rwlock_wrunlock(&consumer_args_pool[i].prev_assignment_lock); + rwlock_destroy(&consumer_args_pool[i].prev_assignment_lock); + } + + TEST_SAY_YELLOW("Waiting for all producer threads to finish...\n"); + for (i = 0; i < TOPIC_CNT; i++) + thrd_join(producer_thread_ids[i], NULL); + + TEST_SAY_YELLOW("Waiting for all consumer threads to finish...\n"); + for (i = 0; i < CONSUMER_POOL_SIZE; i++) { + if (consumer_thread_ids_pool[i]) { + thrd_join(consumer_thread_ids_pool[i], NULL); + TEST_SAY_YELLOW("Consumer %d finished\n", + consumer_args_pool[i].consumer_id); + } + } + + for (i = 0; i < TOPIC_CNT; i++) { + rd_free(topics[i]); + } + rd_free(topics); + + test_delete_all_test_topics(timeout_ms); + + return elapsed_time_ms; +} + +int main_8003_chaos_testing_consumer_group(int argc, char **argv) { + test_timeout_set(450); + // if(test_consumer_group_protocol_classic()) { + // TEST_SKIP("Skipping test for classic consumer group + // protocol\n"); return 0; + // } + do_test_chaos_testing_consumer_group(); + return 0; +} diff --git a/tests/8004-rebalance-performance-single-consumer.c b/tests/8004-rebalance-performance-single-consumer.c new file mode 100644 index 0000000000..869ee6ab36 --- /dev/null +++ b/tests/8004-rebalance-performance-single-consumer.c @@ -0,0 +1,164 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2022, Magnus Edenhill + * 2023, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. 
Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#include "test.h" +/* Typical include path would be , but this program + * is built from within the librdkafka source tree and thus differs. 
*/ +#include "rdkafka.h" /* for Kafka driver */ +#include "rdtime.h" + +static int number_of_test_runs = 20; +static int partition_cnt = 6; +static int topic_cnt = 1; +// static int consumer_cnt = 6; +static atomic_int run = 0; + +typedef struct consumer_s { + int consumer_id; +} consumer_t; + +typedef struct producer_s { + int producer_id; + char *topic; +} producer_t; + +static int producer_thread(void *arg) { + producer_t *producer_args = arg; + rd_kafka_t *rk; + rd_kafka_topic_t *rkt; + char buf[50]; + char payload[64]; + int i = 0; + + rk = test_create_handle(RD_KAFKA_PRODUCER, NULL); + rkt = rd_kafka_topic_new(rk, producer_args->topic, NULL); + + TEST_SAY("Producer %d started for topic %s\n", + producer_args->producer_id, producer_args->topic); + + while (run) { + snprintf(buf, sizeof(buf), "Producer %d message %d", + producer_args->producer_id, i); + snprintf(payload, sizeof(payload), "Payload %s", buf); + rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_COPY, payload, strlen(payload), + NULL, 0, NULL); + rd_usleep(5000, NULL); /* 5ms */ + i++; + i %= 2147483643; + } + + rd_kafka_flush( + rk, 10000); // Wait for all messages to be sent before exiting + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(rk); + rd_free( + producer_args->topic); // Free the topic string allocated in main + + return 0; +} + +int do_test() { + const char *topics[topic_cnt]; + rd_kafka_t *consumer; + uint64_t testid; + rd_kafka_conf_t *conf; + producer_t producer_args = RD_ZERO_INIT; + test_msgver_t mv; + thrd_t thread_id[topic_cnt]; + const int timeout_ms = 10000; + int i; + long long int start_time, end_time, elapsed_time_ms; + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "enable.auto.commit", "false"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "heartbeat.interval.ms", "5000"); + + testid = test_id_generate(); + test_msgver_init(&mv, testid); + + run = 1; + + for (i = 0; i < topic_cnt; i++) { + topics[i] = 
test_mk_topic_name( + "8004_rebalance-performance-single-consumer", 1); + test_create_topic_wait_exists(NULL, topics[i], partition_cnt, 1, + 5000); + + producer_args.producer_id = i; + producer_args.topic = strdup(topics[i]); + if (thrd_create(&thread_id[i], producer_thread, + &producer_args) != thrd_success) { + fprintf(stderr, "Failed to create producer thread\n"); + return 1; + } + } + + start_time = rd_uclock(); + + /* Create consumers */ + consumer = test_create_consumer(topics[0], NULL, conf, NULL); + test_consumer_subscribe_multi(consumer, topics, topic_cnt); + test_consumer_wait_assignment(consumer, rd_false, 0); + while (test_consumer_poll_once(consumer, NULL, timeout_ms) != 1) + ; + end_time = rd_uclock(); + run = 0; + elapsed_time_ms = (end_time - start_time) / 1000; + TEST_SAY("Rebalance took %llds and %lld ms\n", elapsed_time_ms / 1000, + (elapsed_time_ms % 1000)); + + for (i = 0; i < topic_cnt; i++) { + thrd_join(thread_id[i], NULL); + } + + test_consumer_close(consumer); + rd_kafka_destroy(consumer); + + test_delete_all_test_topics(timeout_ms); + + return elapsed_time_ms; +} + +int main_8004_rebalance_performance_single_consumer(int argc, char **argv) { + int avg_rebalance_time_ms = 0; + int current_run = 1; + for (current_run = 1; current_run <= number_of_test_runs; + current_run++) { + TEST_SAY("Starting run %d of %d\n", current_run, + number_of_test_runs); + avg_rebalance_time_ms += do_test(); + } + avg_rebalance_time_ms /= number_of_test_runs; + TEST_SAY("Average rebalance time: %d ms\n", avg_rebalance_time_ms); + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e509092873..32987d2d15 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -145,6 +145,9 @@ set( 0153-memberid.c 8000-idle.cpp 8001-fetch_from_follower_mock_manual.c + 8002-rebalance-performance.c + 8003-chaos-testing-consumer-group.c + 8004-rebalance-performance-single-consumer.c test.c testcpp.cpp rusage.c diff --git a/tests/test.c b/tests/test.c 
index 86205dd5de..b01583589e 100644 --- a/tests/test.c +++ b/tests/test.c @@ -274,7 +274,9 @@ _TEST_DECL(0153_memberid); /* Manual tests */ _TEST_DECL(8000_idle); _TEST_DECL(8001_fetch_from_follower_mock_manual); - +_TEST_DECL(8002_rebalance_performance); +_TEST_DECL(8003_chaos_testing_consumer_group); +_TEST_DECL(8004_rebalance_performance_single_consumer); /* Define test resource usage thresholds if the default limits * are not tolerable. @@ -541,6 +543,9 @@ struct test tests[] = { /* Manual tests */ _TEST(8000_idle, TEST_F_MANUAL), _TEST(8001_fetch_from_follower_mock_manual, TEST_F_MANUAL), + _TEST(8002_rebalance_performance, 0, TEST_BRKVER(4, 0, 0, 0)), + _TEST(8003_chaos_testing_consumer_group, 0, TEST_BRKVER(4, 0, 0, 0)), + _TEST(8004_rebalance_performance_single_consumer, 0, TEST_BRKVER(4, 0, 0, 0)), {NULL}}; @@ -3022,7 +3027,9 @@ void test_consume_txn_msgs_easy(const char *group_id, * @warning This method will poll the consumer and might thus read messages. * Set \p do_poll to false to use a sleep rather than poll. */ -void test_consumer_wait_assignment(rd_kafka_t *rk, rd_bool_t do_poll) { +void test_consumer_wait_assignment(rd_kafka_t *rk, + rd_bool_t do_poll, + int wait_interval_ms) { rd_kafka_topic_partition_list_t *assignment = NULL; int i; @@ -3039,9 +3046,9 @@ void test_consumer_wait_assignment(rd_kafka_t *rk, rd_bool_t do_poll) { rd_kafka_topic_partition_list_destroy(assignment); if (do_poll) - test_consumer_poll_once(rk, NULL, 1000); + test_consumer_poll_once(rk, NULL, wait_interval_ms); else - rd_usleep(1000 * 1000, NULL); + rd_usleep(wait_interval_ms * 1000, NULL); } TEST_SAY("%s: Assignment (%d partition(s)): ", rd_kafka_name(rk), @@ -3133,32 +3140,61 @@ void test_consumer_subscribe(rd_kafka_t *rk, const char *topic) { rd_kafka_topic_partition_list_destroy(topics); } - /** * @brief Start subscribing for multiple topics */ -void test_consumer_subscribe_multi(rd_kafka_t *rk, int topic_count, ...) 
{ - rd_kafka_topic_partition_list_t *topics; +static void test_consumer_subscribe_multi0(rd_kafka_t *rk, + const char **topics, + int topic_count) { + rd_kafka_topic_partition_list_t *topics_list; rd_kafka_resp_err_t err; - va_list ap; int i; - topics = rd_kafka_topic_partition_list_new(topic_count); + TEST_SAY("%s: Subscribing to %d topic(s)\n", rd_kafka_name(rk), + topic_count); + + topics_list = rd_kafka_topic_partition_list_new(topic_count); - va_start(ap, topic_count); for (i = 0; i < topic_count; i++) { - const char *topic = va_arg(ap, const char *); - rd_kafka_topic_partition_list_add(topics, topic, + TEST_SAY(" %s\n", topics[i]); + rd_kafka_topic_partition_list_add(topics_list, topics[i], RD_KAFKA_PARTITION_UA); } - va_end(ap); - err = rd_kafka_subscribe(rk, topics); + err = rd_kafka_subscribe(rk, topics_list); if (err) TEST_FAIL("%s: Failed to subscribe to topics: %s\n", rd_kafka_name(rk), rd_kafka_err2str(err)); - rd_kafka_topic_partition_list_destroy(topics); + rd_kafka_topic_partition_list_destroy(topics_list); +} + +/** + * @brief Start subscribing for multiple topics. + */ +void test_consumer_subscribe_multi(rd_kafka_t *rk, + const char **topics, + int topic_count) { + test_consumer_subscribe_multi0(rk, topics, topic_count); +} + + +/** + * @brief Start subscribing for multiple topics using variable arguments. + */ +void test_consumer_subscribe_multi_va(rd_kafka_t *rk, int topic_count, ...) { + va_list ap; + int i; + const char **topics = rd_calloc(topic_count, sizeof(*topics)); + va_start(ap, topic_count); + for (i = 0; i < topic_count; i++) { + const char *topic = va_arg(ap, const char *); + TEST_SAY0(" %s\n", topic); + topics[i] = topic; + } + va_end(ap); + test_consumer_subscribe_multi0(rk, topics, topic_count); + rd_free(topics); } diff --git a/tests/test.h b/tests/test.h index cc462ec293..6151aff3ad 100644 --- a/tests/test.h +++ b/tests/test.h @@ -189,21 +189,33 @@ struct test { #define TEST_SAY0(...) 
fprintf(stderr, __VA_ARGS__) -#define TEST_SAYL(LVL, ...) \ +#define TEST_SAY_COLOR(LVL, COLOR, ...) \ do { \ if (test_level >= LVL) { \ + char thread_name[32] = {0}; \ + pthread_getname_np(pthread_self(), thread_name, \ + sizeof(thread_name)); \ + fprintf(stderr, "\033[%sm", COLOR); \ fprintf( \ - stderr, "\033[36m[%-28s/%7.3fs] ", \ - test_curr->name, \ + stderr, "[%-28s/%7.3fs/%s] ", test_curr->name, \ test_curr->start \ ? ((float)(test_clock() - test_curr->start) / \ 1000000.0f) \ - : 0); \ + : 0, \ + thread_name); \ fprintf(stderr, __VA_ARGS__); \ fprintf(stderr, "\033[0m"); \ } \ } while (0) -#define TEST_SAY(...) TEST_SAYL(2, __VA_ARGS__) +#define TEST_SAYL(LVL, ...) TEST_SAY_COLOR(LVL, "36", __VA_ARGS__) +#define TEST_SAY(...) TEST_SAYL(2, __VA_ARGS__) +#define TEST_SAY_RED(...) TEST_SAY_COLOR(2, "31", __VA_ARGS__) +#define TEST_SAY_GREEN(...) TEST_SAY_COLOR(2, "32", __VA_ARGS__) +#define TEST_SAY_YELLOW(...) TEST_SAY_COLOR(2, "33", __VA_ARGS__) +#define TEST_SAY_BLUE(...) TEST_SAY_COLOR(2, "34", __VA_ARGS__) +#define TEST_SAY_MAGENTA(...) TEST_SAY_COLOR(2, "35", __VA_ARGS__) +#define TEST_SAY_CYAN(...) TEST_SAY_COLOR(2, "36", __VA_ARGS__) +#define TEST_SAY_WHITE(...) TEST_SAY_COLOR(2, "37", __VA_ARGS__) /** * Append JSON object (as string) to this tests' report array. 
@@ -592,7 +604,11 @@ void test_verify_rkmessage0(const char *func, void test_consumer_subscribe(rd_kafka_t *rk, const char *topic); -void test_consumer_subscribe_multi(rd_kafka_t *rk, int topic_count, ...); +void test_consumer_subscribe_multi(rd_kafka_t *rk, + const char **topics, + int topic_count); + +void test_consumer_subscribe_multi_va(rd_kafka_t *rk, int topic_count, ...); void test_consume_msgs_easy_mv0(const char *group_id, const char *topic, @@ -666,7 +682,9 @@ int test_consumer_poll_timeout(const char *what, test_msgver_t *mv, int timeout_ms); -void test_consumer_wait_assignment(rd_kafka_t *rk, rd_bool_t do_poll); +void test_consumer_wait_assignment(rd_kafka_t *rk, + rd_bool_t do_poll, + int wait_interval_ms); void test_consumer_verify_assignment0(const char *func, int line, rd_kafka_t *rk, diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index c00bc84418..c395d8c3a3 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -235,6 +235,9 @@ + + +