Skip to content
Open
Show file tree
Hide file tree
Changes from 63 commits
Commits
Show all changes
78 commits
Select commit Hold shift + click to select a range
9d0bd65
K2 testing commit
k-raina Jul 23, 2025
9a46014
K2 testing commit
k-raina Jul 23, 2025
4db0be8
K2 testing commit
k-raina Jul 23, 2025
10eafca
K2 Fix ( - 81)
Ankith-Confluent Aug 11, 2025
9135051
2.11 cherrypick fix
Ankith-Confluent Aug 11, 2025
04103da
K2 Fix
Ankith-Confluent Aug 12, 2025
ba824c7
K2 fix for 0011 and 0081 for 2.11
Ankith-Confluent Aug 14, 2025
39ba005
K2 Fix
Ankith-Confluent Aug 12, 2025
8d536dc
Cherry-pick Fix K2
Ankith-Confluent Aug 18, 2025
e7520d7
k2 11 and 81 fix
Ankith-Confluent Aug 14, 2025
9655b58
k2 Fix 2.8 in latest tests
Ankith-Confluent Aug 14, 2025
f9ab564
small fix
Ankith-Confluent Aug 18, 2025
86551ff
K2 Fix for 2.7
Ankith-Confluent Aug 19, 2025
160725f
Cherrypicked fix from 2.8 latest
Ankith-Confluent Aug 14, 2025
8117fa3
small fix
Ankith-Confluent Aug 19, 2025
b6158e9
k2 Fix
Ankith-Confluent Aug 21, 2025
e0dcd29
k2 Fix
Ankith-Confluent Aug 28, 2025
b4654c8
K2 Fix
Ankith-Confluent Aug 28, 2025
d32bede
K2 Fix
Ankith-Confluent Aug 28, 2025
24b8316
K2 Fix
Ankith-Confluent Aug 29, 2025
e041181
K2 Fix
Ankith-Confluent Sep 2, 2025
4c42662
K2 Fix
Ankith-Confluent Sep 2, 2025
37bc02b
K2 Fix
Ankith-Confluent Sep 4, 2025
c4c7f76
81 Fix
Ankith-Confluent Sep 8, 2025
3854c71
minor fix 11 and 59
Ankith-Confluent Sep 12, 2025
425e075
59 clean up
Ankith-Confluent Sep 12, 2025
4ca7d6f
81 test clean up
Ankith-Confluent Sep 12, 2025
31b3c20
fix for 107 and 113
Ankith-Confluent Sep 15, 2025
a5689d2
Fix critical K2 logic bug in 0081-admin.c - inverted condition for de…
Ankith-Confluent Sep 15, 2025
eb58f2c
merged 2.8
Ankith-Confluent Sep 15, 2025
5b15cc2
Merge branch 'k2_testing_v2.8.0' into k2_testing_final
Ankith-Confluent Sep 16, 2025
4a12f0b
Merge k2_testing_v2.8.0_latest into k2_testing_final
Ankith-Confluent Sep 16, 2025
bed0e3d
Merged 2.7
Ankith-Confluent Sep 16, 2025
931c560
Merge branch 'k2_testing_v2.5.3-Latest' into k2_testing_final
Ankith-Confluent Sep 17, 2025
09f564e
Merged 2.4
Ankith-Confluent Sep 17, 2025
c972992
Merge branch 'k2_testing_v2.3.0-Latest' into k2_testing_final
Ankith-Confluent Sep 17, 2025
78a5d24
Merge branch 'k2_testing_v2.2.1-Latest' into k2_testing_final
Ankith-Confluent Sep 17, 2025
e48166c
Merge branch 'k2_testing_v2.1.1-Latest' into k2_testing_final
Ankith-Confluent Sep 18, 2025
bb55d4a
Merge branch 'k2_testing_v2.0.2-Latest' into k2_testing_final
Ankith-Confluent Sep 18, 2025
af55b2e
fix regex tests
Ankith-Confluent Sep 23, 2025
0f29e67
ACKS Fix
Ankith-Confluent Sep 23, 2025
4a6efcf
rd_sleep Fix
Ankith-Confluent Sep 23, 2025
d2dca7c
rd_sleep second fix
Ankith-Confluent Sep 23, 2025
bd7cff0
rd_sleep Fix 3
Ankith-Confluent Sep 23, 2025
be170ed
rd_sleep Fix 4
Ankith-Confluent Sep 23, 2025
300afc2
rd_sleep Fix 5
Ankith-Confluent Sep 23, 2025
741d2db
Removing test_k2_cluster for timeout sceneriaos
Ankith-Confluent Sep 24, 2025
fbb66ef
Clean up 113 112 1
Ankith-Confluent Sep 24, 2025
e1722dc
K2 related skip fix by adding a new config
Ankith-Confluent Sep 24, 2025
863de8d
Fix couple of test , 81 mainly
Ankith-Confluent Sep 24, 2025
347539d
Delete topics (0001 - 0050)
Ankith-Confluent Sep 25, 2025
7e644ce
removed delete
Ankith-Confluent Sep 26, 2025
e93a11f
SSL skips
Ankith-Confluent Sep 26, 2025
600b387
delete topics utility
Ankith-Confluent Sep 26, 2025
2522cde
minro commit
Ankith-Confluent Sep 26, 2025
24c066c
removed delete
Ankith-Confluent Sep 26, 2025
93f9d45
delete topics utility
Ankith-Confluent Sep 26, 2025
4301f9d
minor bug
Ankith-Confluent Sep 26, 2025
6a15e96
Github Copilot reviews
Ankith-Confluent Sep 29, 2025
8cea5dd
minor spelling mistake
Ankith-Confluent Sep 29, 2025
9df1716
Removed extra lines and delete topics code
Ankith-Confluent Oct 3, 2025
21538a8
removed extra lines and fixed formatting
Ankith-Confluent Oct 3, 2025
4edf3d8
small fix
Ankith-Confluent Oct 3, 2025
21661a6
Refactor partition list printing functions to improve version safety.…
Ankith-Confluent Oct 3, 2025
566cc33
Refactor sleep function calls in tests to use sleep_for for consisten…
Ankith-Confluent Oct 6, 2025
1af4193
Remove K2 cluster mode references from tests and simplify fetch confi…
Ankith-Confluent Oct 6, 2025
a7fc114
Update fetch configuration and topic creation in tests for consistenc…
Ankith-Confluent Oct 6, 2025
b4e7851
Enhance test stability by adjusting sleep durations and handling time…
Ankith-Confluent Oct 6, 2025
6f83605
Improve test robustness by enhancing partition comparison logic and a…
Ankith-Confluent Oct 7, 2025
3156906
clang-formatted the changes
Ankith-Confluent Oct 22, 2025
ff16f52
Refactor vector syntax in cooperative rebalance tests for consistency.
Ankith-Confluent Oct 22, 2025
cdcbdd9
clang-format 18 verison
Ankith-Confluent Oct 22, 2025
2a72bf6
small styling changes
Ankith-Confluent Oct 22, 2025
05946f2
Remove unnecessary topic creation call in rkt_cache test to streamlin…
Ankith-Confluent Oct 22, 2025
1054fd9
Adjust sleep duration in fast metadata refresh test to improve timing…
Ankith-Confluent Oct 22, 2025
e3a7e68
Updated timeout duration
Ankith-Confluent Oct 22, 2025
c15bb21
increased time
Ankith-Confluent Oct 22, 2025
06148f1
Refactor variable initialization
Ankith-Confluent Oct 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions tests/0001-multiobj.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ int main_0001_multiobj(int argc, char **argv) {
if (!topic)
topic = test_mk_topic_name("0001", 0);

test_create_topic_if_auto_create_disabled(NULL, topic, -1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be done only once when we're in the if (!topic) block above?


TIMING_START(&t_full, "full create-produce-destroy cycle");
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

Expand Down Expand Up @@ -91,9 +93,9 @@ int main_0001_multiobj(int argc, char **argv) {
TIMING_STOP(&t_full);

/* Topic is created on the first iteration. */
if (i > 0)
TIMING_ASSERT(&t_full, 0, 999);
else
if (i > 0)
TIMING_ASSERT(&t_full, 0, tmout_multip(999));
else
/* Allow metadata propagation. */
rd_sleep(1);
}
Expand Down
8 changes: 7 additions & 1 deletion tests/0002-unkpart.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ static void do_test_unkpart(void) {
int i;
int fails = 0;
const struct rd_kafka_metadata *metadata;
const char* topic;

TEST_SAY(_C_BLU "%s\n" _C_CLR, __FUNCTION__);

Expand All @@ -94,7 +95,10 @@ static void do_test_unkpart(void) {
/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0002", 0), topic_conf);
topic = test_mk_topic_name("0002", 0);
test_create_topic_if_auto_create_disabled(rk, topic, 3);

rkt = rd_kafka_topic_new(rk, topic, topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
rd_kafka_err2str(rd_kafka_last_error()));
Expand Down Expand Up @@ -200,6 +204,8 @@ static void do_test_unkpart_timeout_nobroker(void) {
test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

test_create_topic_if_auto_create_disabled(NULL, topic, 3);
rkt = rd_kafka_topic_new(rk, topic, NULL);

err = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
Expand Down
6 changes: 5 additions & 1 deletion tests/0003-msgmaxsize.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ int main_0003_msgmaxsize(int argc, char **argv) {
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
char errstr[512];
const char* topic;

static const struct {
ssize_t keylen;
Expand Down Expand Up @@ -108,7 +109,10 @@ int main_0003_msgmaxsize(int argc, char **argv) {
/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0003", 0), topic_conf);
topic = test_mk_topic_name("0003", 0);
test_create_topic_if_auto_create_disabled(NULL, topic, -1);
rkt = rd_kafka_topic_new(rk, topic, topic_conf);

if (!rkt)
TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));

Expand Down
123 changes: 69 additions & 54 deletions tests/0004-conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,7 @@ int main_0004_conf(int argc, char **argv) {
"ssl.ca.certificate.stores",
"Intermediate ,, Root ,",
#endif
"client.dns.lookup",
"resolve_canonical_bootstrap_servers_only",
/* client.dns.lookup was introduced in librdkafka 2.2.0+ - skip for 2.1.x library */
NULL};
static const char *tconfs[] = {"request.required.acks",
"-1", /* int */
Expand All @@ -557,6 +556,14 @@ int main_0004_conf(int argc, char **argv) {
TEST_FAIL("%s\n", errstr);
}

/* Add client.dns.lookup if librdkafka version >= 2.2.0 */
if (rd_kafka_version() >= 0x02020000) {
if (rd_kafka_conf_set(conf, "client.dns.lookup",
"resolve_canonical_bootstrap_servers_only", errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK)
TEST_FAIL("%s\n", errstr);
}

rd_kafka_conf_set_dr_cb(conf, dr_cb);
rd_kafka_conf_set_error_cb(conf, error_cb);
/* interceptor configs are not exposed as strings or in dumps
Expand Down Expand Up @@ -721,69 +728,77 @@ int main_0004_conf(int argc, char **argv) {
}

#if WITH_OAUTHBEARER_OIDC
{
TEST_SAY(
"Verify that https.ca.location is mutually "
"exclusive with https.ca.pem\n");
/* HTTPS CA configuration tests - https.ca.pem available since librdkafka 2.2.0 */
if (rd_kafka_version() >= 0x02020000) {
{
TEST_SAY(
"Verify that https.ca.location is mutually "
"exclusive with https.ca.pem\n");

conf = rd_kafka_conf_new();
conf = rd_kafka_conf_new();

test_conf_set(conf, "https.ca.pem",
"-----BEGIN CERTIFICATE-----");
test_conf_set(conf, "https.ca.location",
"/path/to/certificate.pem");
test_conf_set(conf, "https.ca.pem",
"-----BEGIN CERTIFICATE-----");
test_conf_set(conf, "https.ca.location",
"/path/to/certificate.pem");

rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
sizeof(errstr));
TEST_ASSERT(
!rk, "Expected rd_kafka_new() to fail, but it succeeded");
TEST_ASSERT(!strcmp(errstr,
"`https.ca.location` and "
"`https.ca.pem` are mutually exclusive"),
"Expected rd_kafka_new() to fail with: "
"\"`https.ca.location` and `https.ca.pem` "
"are mutually exclusive\", got: \"%s\"",
errstr);
rd_kafka_conf_destroy(conf);
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
sizeof(errstr));
TEST_ASSERT(
!rk, "Expected rd_kafka_new() to fail, but it succeeded");
TEST_ASSERT(!strcmp(errstr,
"`https.ca.location` and "
"`https.ca.pem` are mutually exclusive"),
"Expected rd_kafka_new() to fail with: "
"\"`https.ca.location` and `https.ca.pem` "
"are mutually exclusive\", got: \"%s\"",
errstr);
rd_kafka_conf_destroy(conf);
}
}
{
TEST_SAY(
"Verify that https.ca.location gives an error when "
"set to an invalid path\n");
if (rd_kafka_version() >= 0x02020000) { /* https.ca.location available since librdkafka 2.2.0 */
{
TEST_SAY(
"Verify that https.ca.location gives an error when "
"set to an invalid path\n");

conf = rd_kafka_conf_new();
conf = rd_kafka_conf_new();

test_conf_set(conf, "https.ca.location",
"/?/does/!/not/exist!");
test_conf_set(conf, "https.ca.location",
"/?/does/!/not/exist!");

rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
sizeof(errstr));
TEST_ASSERT(
!rk, "Expected rd_kafka_new() to fail, but it succeeded");
TEST_ASSERT(!strcmp(errstr,
"`https.ca.location` must be "
"an existing file or directory"),
"Expected rd_kafka_new() to fail with: "
"\"`https.ca.location` must be "
"an existing file or directory\", got: \"%s\"",
errstr);
rd_kafka_conf_destroy(conf);
}
{
TEST_SAY(
"Verify that https.ca.location doesn't give an error when "
"set to `probe`\n");
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
sizeof(errstr));
TEST_ASSERT(
!rk, "Expected rd_kafka_new() to fail, but it succeeded");
TEST_ASSERT(!strcmp(errstr,
"`https.ca.location` must be "
"an existing file or directory"),
"Expected rd_kafka_new() to fail with: "
"\"`https.ca.location` must be "
"an existing file or directory\", got: \"%s\"",
errstr);
rd_kafka_conf_destroy(conf);
}
{
TEST_SAY(
"Verify that https.ca.location doesn't give an error when "
"set to `probe`\n");

conf = rd_kafka_conf_new();
conf = rd_kafka_conf_new();

test_conf_set(conf, "https.ca.location", "probe");
test_conf_set(conf, "https.ca.location", "probe");

rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
sizeof(errstr));
TEST_ASSERT(
rk, "Expected rd_kafka_new() not to fail, but it failed");
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr,
sizeof(errstr));
TEST_ASSERT(
rk, "Expected rd_kafka_new() not to fail, but it failed");

rd_kafka_destroy(rk);
rd_kafka_destroy(rk);
}
} else {
TEST_SAY("SKIPPING: https.ca.location tests - requires librdkafka version >= 2.2.0 (current: 0x%08x)\n",
rd_kafka_version());
}
#endif /* WITH_OAUTHBEARER_OIDC */

Expand Down
5 changes: 4 additions & 1 deletion tests/0005-order.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ int main_0005_order(int argc, char **argv) {
int msgcnt = test_quick ? 500 : 50000;
int i;
test_timing_t t_produce, t_delivery;
const char *topic;

test_conf_init(&conf, &topic_conf, 10);

Expand All @@ -89,7 +90,9 @@ int main_0005_order(int argc, char **argv) {
/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0005", 0), topic_conf);
topic = test_mk_topic_name("0005", 0);
test_create_topic_if_auto_create_disabled(rk, topic, 1);
rkt = rd_kafka_topic_new(rk, topic, topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));

Expand Down
17 changes: 10 additions & 7 deletions tests/0007-autotopic.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,25 @@ int main_0007_autotopic(int argc, char **argv) {
int msgcnt = 10;
int i;

if (!test_check_auto_create_topic()) {
TEST_SKIP(
"NOTE! This test requires "
"auto.create.topics.enable=true to be configured on "
"the broker!\n");
return 0;
}

/* Generate unique topic name */
test_conf_init(&conf, &topic_conf, 10);

TEST_SAY(
"\033[33mNOTE! This test requires "
"auto.create.topics.enable=true to be configured on "
"the broker!\033[0m\n");

/* Set delivery report callback */
rd_kafka_conf_set_dr_cb(conf, dr_cb);

/* Create kafka instance */
rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0007_autotopic", 1),
topic_conf);
const char *topic = test_mk_topic_name("0007_autotopic", 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't have new variables like this, we must declare topic at the top of the block

rkt = rd_kafka_topic_new(rk, topic, topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));

Expand Down
16 changes: 15 additions & 1 deletion tests/0008-reqacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,20 @@ int main_0008_reqacks(int argc, char **argv) {
"all brokers!\033[0m\n");

/* Try different request.required.acks settings (issue #75) */
for (reqacks = -1; reqacks <= 1; reqacks++) {
/* Test all standard acks values, but skip unsupported ones */
int start_acks = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

declare these with i at the top of the block

int end_acks = 1;

TEST_SAY("Testing acks values -1, 0, 1 (skipping unsupported ones)\n");
for (reqacks = start_acks; reqacks <= end_acks; reqacks++) {
char tmp[10];

/* Convert acks value to string and check if supported */
rd_snprintf(tmp, sizeof(tmp), "%d", reqacks);
if (!test_is_acks_supported(tmp)) {
TEST_SAY("Skipping acks=%d (not supported by cluster)\n", reqacks);
continue;
}

test_conf_init(&conf, &topic_conf, 10);

Expand Down Expand Up @@ -130,6 +142,8 @@ int main_0008_reqacks(int argc, char **argv) {
"expecting status %d\n",
rd_kafka_name(rk), reqacks, exp_status);

test_create_topic_if_auto_create_disabled(rk, topic, 1);

rkt = rd_kafka_topic_new(rk, topic, topic_conf);
if (!rkt)
TEST_FAIL("Failed to create topic: %s\n",
Expand Down
Loading