diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d2aca9fc..c9eba7728 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,34 @@ +# librdkafka v2.10.1 + +librdkafka v2.10.1 is a maintenance release: + + * Fix producer being unable to resume production after topic recreation + followed by leader change (#5022). + * Fix consumer being unable to resume consumption after topic recreation, both + in case of a leader change and otherwise (#5022). + +## Fixes + +### Producer fixes + +* Issues: #4898 + When a topic is recreated, the producer may be unable to resume production + in case of a leader change, because the stored leader epoch may exceed the + epoch of the recreated topic. Solved by resetting the leader epoch if the + topic is recreated. Additionally, for idempotent producers, the queues are drained and + the epoch is bumped. + Happens since v2.1.0. + +### Consumer fixes + +* When a topic is recreated, the consumer may be unable to resume consumption, + both in case of a leader change and otherwise, if the new topic partition has + more messages than the old one. + Solved by resetting the leader epoch if the topic is recreated, and by + resetting the offsets for owned partitions. + Happens since v2.1.0. + + # librdkafka v2.10.0 librdkafka v2.10.0 is a feature release: diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c index f73945ccd..2e6bdf6a6 100644 --- a/src/rdkafka_mock.c +++ b/src/rdkafka_mock.c @@ -683,14 +683,32 @@ rd_kafka_mock_topic_new(rd_kafka_mock_cluster_t *mcluster, TAILQ_INSERT_TAIL(&mcluster->topics, mtopic, link); mcluster->topic_cnt++; - rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", - "Created topic \"%s\" with %d partition(s) and " - "replication-factor %d", - mtopic->name, mtopic->partition_cnt, replication_factor); + rd_kafka_dbg( + mcluster->rk, MOCK, "MOCK", + "Created topic \"%s\" with topic id \"%s\", %d partition(s) " + "and replication-factor %d", + mtopic->name, rd_kafka_Uuid_base64str(&mtopic->id), + mtopic->partition_cnt, replication_factor); return mtopic; } +static void rd_kafka_mock_topic_remove(rd_kafka_mock_cluster_t *mcluster, + const char *topic) { + rd_kafka_mock_topic_t *mtopic; + + mtopic = rd_kafka_mock_topic_find(mcluster, topic); + if (!mtopic) + return; + + rd_kafka_dbg(mcluster->rk, MOCK, "MOCK", + "Deleted topic \"%s\" with topic id \"%s\"", mtopic->name, + rd_kafka_Uuid_base64str(&mtopic->id)); + + /* rd_kafka_mock_topic_destroy() unlinks the topic from + * mcluster->topics and decrements topic_cnt, so no explicit + * TAILQ_REMOVE() is done here (doing it twice would corrupt the + * list). */ + rd_kafka_mock_topic_destroy(mtopic); +} rd_kafka_mock_topic_t * rd_kafka_mock_topic_find(const rd_kafka_mock_cluster_t *mcluster, @@ -2165,6 +2183,18 @@ rd_kafka_mock_topic_create(rd_kafka_mock_cluster_t *mcluster, rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); } +rd_kafka_resp_err_t +rd_kafka_mock_topic_delete(rd_kafka_mock_cluster_t *mcluster, + const char *topic) { + rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK); + + rko->rko_u.mock.name = rd_strdup(topic); + rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_TOPIC_DELETE; + + return rd_kafka_op_err_destroy( + rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE)); +} + rd_kafka_resp_err_t rd_kafka_mock_partition_set_leader(rd_kafka_mock_cluster_t *mcluster, const char *topic, @@ -2506,6 +2536,10 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster, return RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION; break; + case RD_KAFKA_MOCK_CMD_TOPIC_DELETE: + rd_kafka_mock_topic_remove(mcluster, rko->rko_u.mock.name); + break; + case RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR: mtopic = rd_kafka_mock_topic_get(mcluster, rko->rko_u.mock.name, -1); diff --git 
a/src/rdkafka_mock.h b/src/rdkafka_mock.h index 0d528812b..caa1a11b0 100644 --- a/src/rdkafka_mock.h +++ b/src/rdkafka_mock.h @@ -248,6 +248,18 @@ rd_kafka_mock_topic_create(rd_kafka_mock_cluster_t *mcluster, int partition_cnt, int replication_factor); +/** + * @brief Delete a topic. + * + * Use this to simulate broker-side topic deletion, e.g. for testing + * topic recreation: deleting and re-creating a topic gives it a new + * topic id. + * + * @remark The Topic Admin API (DeleteTopics) is not supported by the + * mock broker. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_mock_topic_delete(rd_kafka_mock_cluster_t *mcluster, + const char *topic); + /** * @brief Sets the partition leader. diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index a4f9b93be..00cfb45f1 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -586,6 +586,7 @@ struct rd_kafka_op_s { RD_KAFKA_MOCK_CMD_APIVERSION_SET, RD_KAFKA_MOCK_CMD_REQUESTED_METRICS_SET, RD_KAFKA_MOCK_CMD_TELEMETRY_PUSH_INTERVAL_SET, + RD_KAFKA_MOCK_CMD_TOPIC_DELETE, } cmd; rd_kafka_resp_err_t err; /**< Error for: diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index d345b2722..13f1da9b1 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -2,7 +2,7 @@ * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2022, Magnus Edenhill - * 2023, Confluent Inc. + * 2023-2025, Confluent Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,7 @@ #include "rdkafka_partition.h" #include "rdkafka_broker.h" #include "rdkafka_cgrp.h" +#include "rdkafka_idempotence.h" #include "rdkafka_metadata.h" #include "rdkafka_offset.h" #include "rdlog.h" @@ -55,7 +56,8 @@ static int rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, const struct rd_kafka_metadata_topic *mdt, const rd_kafka_metadata_topic_internal_t *mdit, - rd_ts_t ts_age); + rd_ts_t ts_age, + rd_bool_t *update_epoch_bump); /** @@ -494,13 +496,15 @@ rd_kafka_topic_t *rd_kafka_topic_new0(rd_kafka_t *rk, /* Populate from metadata cache. */ if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1 /*valid*/)) && !rkmce->rkmce_mtopic.err) { + rd_bool_t update_epoch_bump = + rd_false; /* ignored for a new topic anyway. */ if (existing) *existing = 1; rd_kafka_topic_metadata_update( rkt, &rkmce->rkmce_mtopic, &rkmce->rkmce_metadata_internal_topic, - rkmce->rkmce_ts_insert); + rkmce->rkmce_ts_insert, &update_epoch_bump); } if (do_lock) @@ -927,7 +931,8 @@ static void rd_kafka_toppar_idemp_msgid_restore(rd_kafka_topic_t *rkt, * @locks rd_kafka_topic_wrlock(rkt) MUST be held. */ static int rd_kafka_topic_partition_cnt_update(rd_kafka_topic_t *rkt, - int32_t partition_cnt) { + int32_t partition_cnt, + rd_bool_t topic_id_change) { rd_kafka_t *rk = rkt->rkt_rk; rd_kafka_toppar_t **rktps; rd_kafka_toppar_t *rktp; @@ -1036,8 +1041,14 @@ static int rd_kafka_topic_partition_cnt_update(rd_kafka_topic_t *rkt, * topic as non-existent, triggering the removal of partitions * on the producer client. When metadata is eventually correct * again and the topic is "re-created" on the producer, it - * must continue with the next msgid/baseseq. */ - if (is_idempodent && rd_kafka_pid_valid(rktp->rktp_eos.pid) + * must continue with the next msgid/baseseq. + * + * This isn't applicable if the topic id changes (because it's + * an entirely new topic). Skipping the save is all we do here; + * we will still need to call + * rd_kafka_topic_recreated_partition_reset() on this topic. 
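+ * + * Hypothetical illustration (not from an actual trace): topic "t" with + * id A has produced up to msgid 100; "t" is deleted and re-created with + * id B. Restoring baseseq 100 for the re-created topic could cause out + * of sequence errors, so the save is skipped and the rk-level epoch + * bump re-sequences the queued messages instead. 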
*/ + if (is_idempodent && rd_kafka_pid_valid(rktp->rktp_eos.pid) && + !topic_id_change) rd_kafka_toppar_idemp_msgid_save(rkt, rktp); rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN; @@ -1085,7 +1096,65 @@ static int rd_kafka_topic_partition_cnt_update(rd_kafka_topic_t *rkt, return 1; } +/** + * @brief Reset the partitions of a topic after it has been recreated. + * + * @param rkt The topic to update. + * @param mdt The metadata of the topic. + * @param mdit The internal metadata of the topic. + */ +static void rd_kafka_topic_recreated_partition_reset( + rd_kafka_topic_t *rkt, + const struct rd_kafka_metadata_topic *mdt, + const rd_kafka_metadata_topic_internal_t *mdit) { + rd_kafka_t *rk = rkt->rkt_rk; + rd_kafka_toppar_t *rktp; + int32_t i; + rd_kafka_fetch_pos_t next_pos = { + .offset = RD_KAFKA_OFFSET_INVALID, + .leader_epoch = -1, + }; + + for (i = 0; i < rkt->rkt_partition_cnt; i++) { + rktp = rkt->rkt_p[i]; + + /* Common */ + rd_kafka_toppar_lock(rktp); + /* By setting the partition's leader epoch to -1, we make sure + * to always pick up whatever is in the metadata. Even if the + * metadata is stale, it's better than what we have because the + * topic has been recreated. We don't need to set any other + * field: if the leader is the same we can just continue, and + * if we're fetching from a follower, we'll just get a + * not-follower error if applicable. */ + rktp->rktp_leader_epoch = -1; + if (rk->rk_type == RD_KAFKA_PRODUCER) { + /* Producer: no op - we bump the epoch and drain at + * rk-level for an idempotent producer. */ + } else if (rk->rk_type == RD_KAFKA_CONSUMER) { + /* Consumer: set each partition's offset to invalid so + * it can go through a reset. + * + * Note that this can lead to re-consumption in + * particular cases, for example: + * 1. This consumer unsubscribes from the topic. + * 2. The topic is recreated, a second consumer is + * created, subscribes to the topic, consumes messages + * and commits offsets. + * 3. This consumer resubscribes to the topic and should + * ideally consume from the second consumer's committed + * offsets, but since the topic was recreated, it will + * consume from whatever auto.offset.reset dictates. We + * can't help it because the broker stores committed + * offsets by topic name, not by topic id. */ + rd_kafka_offset_reset( + rktp, RD_KAFKA_NODEID_UA, next_pos, + RD_KAFKA_RESP_ERR_NO_ERROR, "topic recreated"); + } + rd_kafka_toppar_unlock(rktp); + } +} /** * Topic 'rkt' does not exist: propagate to interested parties. @@ -1252,7 +1321,8 @@ rd_bool_t rd_kafka_topic_set_notexists(rd_kafka_topic_t *rkt, rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL; /* Update number of partitions */ - rd_kafka_topic_partition_cnt_update(rkt, 0); + rd_kafka_topic_partition_cnt_update(rkt, 0, + rd_false /* topic_id_change */); /* Purge messages with forced partition */ rd_kafka_topic_assign_uas(rkt, err); @@ -1337,7 +1407,8 @@ rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt, rkt->rkt_err = err; /* Update number of partitions */ - rd_kafka_topic_partition_cnt_update(rkt, 0); + rd_kafka_topic_partition_cnt_update(rkt, 0, + rd_false /* topic_id_change */); /* Purge messages with forced partition */ rd_kafka_topic_assign_uas(rkt, err); @@ -1353,6 +1424,9 @@ * @param mdt Topic metadata. * @param mdit Topic internal metadata. * @param ts_age absolute age (timestamp) of metadata. + * @param update_epoch_bump this is set as an out parameter. 
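+ * (It is an out parameter so that the epoch bump can run after the + * rk write lock has been released; see rd_kafka_topic_metadata_update2().) 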
If set to true, + * the caller should run rd_kafka_idemp_drain_epoch_bump(). + * * @returns 1 if the number of partitions changed, 0 if not, and -1 if the * topic is unknown. @@ -1363,7 +1437,8 @@ static int rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, const struct rd_kafka_metadata_topic *mdt, const rd_kafka_metadata_topic_internal_t *mdit, - rd_ts_t ts_age) { + rd_ts_t ts_age, + rd_bool_t *update_epoch_bump) { rd_kafka_t *rk = rkt->rkt_rk; int upd = 0; int j; @@ -1372,6 +1447,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, int old_state; rd_bool_t partition_exists_with_no_leader_epoch = rd_false; rd_bool_t partition_exists_with_stale_leader_epoch = rd_false; + rd_bool_t different_topic_id = rd_false; if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR) rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA", @@ -1422,12 +1498,17 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt, /* Update number of partitions, but not if there are * (possibly intermittent) errors (e.g., "Leader not available"). */ if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) { - rd_bool_t different_topic_id = + /* Ignore id changes that occur when we're creating the topic + * or deleting it during termination (one of the ids is zero + * then). */ + rd_bool_t both_topic_ids_non_zero = + !RD_KAFKA_UUID_IS_ZERO(mdit->topic_id) && + !RD_KAFKA_UUID_IS_ZERO(rkt->rkt_topic_id); + different_topic_id = rd_kafka_Uuid_cmp(mdit->topic_id, rkt->rkt_topic_id) != 0; if (different_topic_id || mdt->partition_cnt > rkt->rkt_partition_cnt) upd += rd_kafka_topic_partition_cnt_update( - rkt, mdt->partition_cnt); + rkt, mdt->partition_cnt, different_topic_id); if (different_topic_id) { /* FIXME: an offset reset must be triggered. * when rkt_topic_id wasn't zero. * rkt->rkt_topic->str, rd_kafka_Uuid_base64str(&rkt->rkt_topic_id), rd_kafka_Uuid_base64str(&mdit->topic_id)); + rkt->rkt_topic_id = mdit->topic_id; + + if (both_topic_ids_non_zero) { + rd_kafka_topic_recreated_partition_reset( + rkt, mdt, mdit); + if (rd_kafka_is_idempotent(rk)) + *update_epoch_bump = rd_true; + } } /* If the metadata times out for a topic (because all brokers * are down) the state will transition to S_UNKNOWN. @@ -1542,6 +1631,7 @@ int rd_kafka_topic_metadata_update2( const rd_kafka_metadata_topic_internal_t *mdit) { rd_kafka_topic_t *rkt; int r; + rd_bool_t update_epoch_bump = rd_false; rd_kafka_wrlock(rkb->rkb_rk); @@ -1557,10 +1647,28 @@ int rd_kafka_topic_metadata_update2( return -1; /* Ignore topics that we dont have locally. */ } - r = rd_kafka_topic_metadata_update(rkt, mdt, mdit, rd_clock()); + r = rd_kafka_topic_metadata_update(rkt, mdt, mdit, rd_clock(), + &update_epoch_bump); rd_kafka_wrunlock(rkb->rkb_rk); + /* This is true in case of topic recreation and when we're using an + * idempotent/transactional producer. */ + if (update_epoch_bump) { + /* Why do we bump the epoch here rather than, say, at rk-level + * after checking this for all topics? + * 1. We need to make sure leader changes (and the subsequent + * broker delegation change) do not run before this, as + * the producer might start sending to the new broker, which + * would give us out of sequence errors. + * 2. rd_kafka_idemp_drain_epoch_bump() sets state in an + * idempotent way, so even though the function isn't entirely + * idempotent, it's okay to call it repeatedly. 
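+ * + * A sketch of the intended order of events for a recreated topic + * (assuming a single topic and an idempotent producer): the metadata + * update detects the topic id change, resets partition leader epochs + * via rd_kafka_topic_recreated_partition_reset(), sets + * *update_epoch_bump, and only then are the queues drained and the + * epoch bumped here, before any produce request reaches the new + * leader. 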
+ */ + rd_kafka_idemp_drain_epoch_bump( + rkb->rkb_rk, RD_KAFKA_RESP_ERR_NO_ERROR, "topic recreated"); + } + rd_kafka_topic_destroy0(rkt); /* from find() */ return r; @@ -1628,7 +1736,8 @@ void rd_kafka_topic_partitions_remove(rd_kafka_topic_t *rkt) { /* Setting the partition count to 0 moves all partitions to * the desired list (rktp_desp). */ - rd_kafka_topic_partition_cnt_update(rkt, 0); + rd_kafka_topic_partition_cnt_update(rkt, 0, + rd_false /* topic_id_change */); /* Now clean out the desired partitions list. * Use reverse traversal to avoid excessive memory shuffling @@ -2143,6 +2252,7 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt, .partition_cnt = partition_cnt}; rd_kafka_metadata_topic_internal_t mdit = {.partitions = partitions}; int i; + rd_bool_t update_epoch_bump; mdt.partitions = rd_alloca(sizeof(*mdt.partitions) * partition_cnt); @@ -2155,7 +2265,9 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt, rd_kafka_wrlock(rkt->rkt_rk); rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, &mdit, rd_true, rd_false, rd_true); - rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock()); + update_epoch_bump = rd_false; + rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock(), + &update_epoch_bump); rd_kafka_wrunlock(rkt->rkt_rk); rd_free(partitions); } diff --git a/tests/0107-topic_recreate.c b/tests/0107-topic_recreate.c index 68b978479..b58c5b325 100644 --- a/tests/0107-topic_recreate.c +++ b/tests/0107-topic_recreate.c @@ -242,6 +242,9 @@ static void do_test_create_delete_create(int part_cnt_1, int part_cnt_2) { } +/** + * @remark This is run only when scenario="noautocreate". + */ int main_0107_topic_recreate(int argc, char **argv) { this_test = test_curr; /* Need to expose current test struct (in TLS) * to producer thread. */ @@ -257,3 +260,93 @@ int main_0107_topic_recreate(int argc, char **argv) { return 0; } + + +/** + * @brief Test topic create + delete + create with consumer. + * + * Details: + * 1. Create producer and produce 10 messages. + * 2. Create consumer (auto offset reset to earliest) and subscribe to topic, + * consume the first 10 messages. + * 3. Recreate topic and produce 20 messages. + * 4. Consume remaining 20 messages - it should get all 20 because there should + * be an offset reset when Fetch returns an UNKNOWN_TOPIC_ID error. + */ +static void do_test_topic_recreated_consumer_unknown_topic_id(void) { + rd_kafka_t *rk_producer; + rd_kafka_t *rk_consumer; + rd_kafka_conf_t *conf; + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + rd_kafka_topic_t *rkt; + + SUB_TEST("Testing topic recreation with consumer"); + + /* Create producer. */ + test_conf_init(&conf, NULL, 30); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + rk_producer = test_create_handle(RD_KAFKA_PRODUCER, conf); + + /* Create topic using admin API */ + test_create_topic(rk_producer, topic, 1, 1); + + /* Write 10 msgs */ + test_produce_msgs2(rk_producer, topic, 0, 0, 0, 10, "test", 10); + + /* Destroy producer; a new one is created after the topic is + * recreated. */ + rd_kafka_destroy(rk_producer); + rk_producer = NULL; + + /* Create consumer. */ + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "group.id", topic); + test_conf_set(conf, "auto.offset.reset", "earliest"); + /* Increase these because we don't want an accidental periodic metadata + * refresh; we want it to be triggered by the UNKNOWN_TOPIC_ID error. 
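+ * Both values merely need to exceed the expected test runtime; the + * exact numbers are otherwise arbitrary. 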
*/ + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "90000"); + test_conf_set(conf, "metadata.max.age.ms", "180000"); + rk_consumer = test_create_handle(RD_KAFKA_CONSUMER, conf); + + /* Subscribe to topic. */ + test_consumer_subscribe(rk_consumer, topic); + test_consumer_wait_assignment(rk_consumer, rd_false); + + /* Consume messages. */ + rkt = test_create_consumer_topic(rk_consumer, topic); + test_consume_msgs("consume_from_topic", rkt, 0, 0, TEST_NO_SEEK, 0, 10, + 0); + + /* Pause consumer so there are no fetches in the temporary state where + * there is no topic on the broker */ + test_consumer_pause_resume_partition(rk_consumer, topic, 0, rd_true); + rd_sleep(2); + + /* Recreate topic. rk_producer is NULL here, so the test helpers + * create a temporary client of their own. */ + test_delete_topic(rk_producer, topic); + rd_sleep(5); + test_create_topic(rk_producer, topic, 1, 1); + + /* Produce 2x the number of messages. */ + test_conf_init(&conf, NULL, 30); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + rk_producer = test_create_handle(RD_KAFKA_PRODUCER, conf); + test_produce_msgs2(rk_producer, topic, 0, 0, 0, 20, "test", 10); + + /* Resume consumer. */ + test_consumer_pause_resume_partition(rk_consumer, topic, 0, rd_false); + rd_sleep(2); + + /* Consume messages. */ + test_consume_msgs("consume_from_topic", rkt, 0, 0, TEST_NO_SEEK, 0, 20, + 0); + + /* Destroy consumer. */ + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(rk_consumer); + rd_kafka_destroy(rk_producer); + SUB_TEST_PASS(); +} + +int main_0107_topic_recreate_unknown_topic_id(int argc, char **argv) { + do_test_topic_recreated_consumer_unknown_topic_id(); + return 0; +} diff --git a/tests/0152-topic_recreate_mock.c b/tests/0152-topic_recreate_mock.c new file mode 100644 index 000000000..72d3d333f --- /dev/null +++ b/tests/0152-topic_recreate_mock.c @@ -0,0 +1,396 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2025, Confluent Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "../src/rdkafka_proto.h" + +/** + * @name Verify that producer and consumer resume operation after + * a topic has been deleted and recreated (with topic id change) using + * the mock broker. + * + * @note These tests should be revised after KIP-516 (topic ids) support + * is added to the actual broker for the Produce RPC. 
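+ * With topic ids in Produce, the broker could reject requests aimed at + * a deleted/recreated topic with UNKNOWN_TOPIC_ID instead of silently + * accepting them against the new topic. 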
+ */ + +/** + * Test topic recreation for producer. There are two configurable flags: + * 1. is_idempotent: decides whether producer is idempotent or not. + * 2. leader_change: if set to true, changes leader of the initial topic to push + * the leader epoch beyond that of the recreated topic. + */ +static void do_test_topic_recreated_producer(rd_bool_t is_idempotent, + rd_bool_t leader_change) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstrap_servers; + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + + SUB_TEST("Testing topic recreation with %s producer%s", + is_idempotent ? "idempotent" : "normal", + leader_change ? ", with leader change" : ""); + + /* Create mock cluster */ + mcluster = test_mock_cluster_new(3, &bootstrap_servers); + + /* Create topic and, if requested, change the leader to bump the + * leader epoch */ + rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1); + if (leader_change) + rd_kafka_mock_partition_set_leader(mcluster, "test_topic", 0, + 2); + + /* Create and init a producer */ + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "bootstrap.servers", bootstrap_servers); + test_conf_set(conf, "enable.idempotence", + is_idempotent ? "true" : "false"); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + + /* Produce 10 messages */ + test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10); + + /* Delete topic */ + rd_kafka_mock_topic_delete(mcluster, "test_topic"); + + /* Re-create topic */ + rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1); + + /* Propagate topic change in metadata. */ + rd_sleep(2); + + /* Produce messages to recreated topic - it should be seamless. */ + test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10); + + rd_kafka_destroy(rk); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + +/** + * Test two topics' recreation (at the same time) with normal and idempotent + * producer. + */ +static void do_test_two_topics_recreated_producer(rd_bool_t is_idempotent) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstrap_servers; + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + + SUB_TEST("Testing two topics' recreation with %s producer", + is_idempotent ? "idempotent" : "normal"); + + /* Create mock cluster */ + mcluster = test_mock_cluster_new(3, &bootstrap_servers); + + /* Create topics and change leaders to bump leader epochs */ + rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1); + rd_kafka_mock_topic_create(mcluster, "test_topic2", 3, 1); + rd_kafka_mock_partition_set_leader(mcluster, "test_topic", 0, 2); + rd_kafka_mock_partition_set_leader(mcluster, "test_topic2", 0, 2); + + /* Create and init a producer */ + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "bootstrap.servers", bootstrap_servers); + test_conf_set(conf, "enable.idempotence", + is_idempotent ? "true" : "false"); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + + /* Produce 10 messages */ + test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10); + test_produce_msgs2(rk, "test_topic2", 0, 0, 0, 10, "test", 10); + + /* Delete topics */ + rd_kafka_mock_topic_delete(mcluster, "test_topic"); + rd_kafka_mock_topic_delete(mcluster, "test_topic2"); + /* Re-create topics */ + rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1); + rd_kafka_mock_topic_create(mcluster, "test_topic2", 3, 1); + + /* Propagate topic change in metadata. */ + rd_sleep(2); + + /* Produce messages to the recreated topics - it should be seamless. 
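+ * Without the fix, the leader epoch stored for the original topics + * could exceed the recreated topics' epoch and production would stall. 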
*/ + test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10); + test_produce_msgs2(rk, "test_topic2", 0, 0, 0, 10, "test", 10); + rd_kafka_destroy(rk); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + +/* Test topic recreation with transactional producer */ +static void do_test_topic_recreated_transactional_producer(void) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstrap_servers; + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + rd_kafka_resp_err_t err; + + SUB_TEST("Testing topic recreation with transactional producer"); + + /* Create mock cluster */ + mcluster = test_mock_cluster_new(3, &bootstrap_servers); + + /* Create topic and change leader to bump leader epoch */ + rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1); + + /* Note that a leader change is NECESSARY for testing a transactional + * producer. A NOT_LEADER exception is why a metadata request is + * triggered in the first place. Otherwise it just fails with a fatal + * error (out of sequence) because the Produce RPC isn't aware of topic + * IDs, and thus the client has no way to know. */ + rd_kafka_mock_partition_set_leader(mcluster, "test_topic", 0, 2); + + /* Create and init a transactional producer */ + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "bootstrap.servers", bootstrap_servers); + test_conf_set(conf, "enable.idempotence", "true"); + test_conf_set(conf, "transactional.id", "test_tx"); + test_conf_set(conf, "max.in.flight", "1"); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000)); + + + /* Produce 10 messages */ + TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); + test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10); + TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000)); + + /* Delete topic */ + rd_kafka_mock_topic_delete(mcluster, "test_topic"); + + /* Re-create topic */ + rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1); + + /* Propagate topic change in metadata. */ + rd_sleep(2); + + /* Produce messages to recreated topic. */ + TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); + /* First message should queue without any problems. */ + err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("test_topic"), + RD_KAFKA_V_VALUE("test", 4), RD_KAFKA_V_END); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR, + "Expected NO_ERROR, not %s", rd_kafka_err2str(err)); + + /* We might get Success or Purged from Queue, depending on when exactly + * the metadata request is made. There's nothing we can do about it + * until AK implements produce by topic id. So turn off error checking. + */ + test_curr->ignore_dr_err = rd_true; + + /* Some Nth message should refuse to queue because we're in ERR__STATE + * and we need an abort. We don't know exactly at what point it starts + * to complain because we're not tracking the metadata request or the + * time when epoch_drain_bump is called. So just rely on the test + * timeout. */ + while (err == RD_KAFKA_RESP_ERR_NO_ERROR) { + err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("test_topic"), + RD_KAFKA_V_VALUE("test", 4), + RD_KAFKA_V_END); + rd_sleep(1); + } + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__STATE, + "Expected ERR__STATE error, not %s", rd_kafka_err2str(err)); + TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000)); + + /* Producer should work as normal after abort. 
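+ * (By this point the epoch bump triggered by the recreation has + * completed, so the new transaction starts with fresh sequence + * numbers.) 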
*/ + test_curr->ignore_dr_err = rd_false; + TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); + test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10); + TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000)); + + rd_kafka_destroy(rk); + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + + + +/** + * @brief Structure holding the information the consumer thread needs. + */ +struct consumer_thread_arg { + const char *topic_name; + int expected_messages; + int broadcast_msg_cnt; + rd_bool_t broadcasted; /**< Set (under broadcast_mtx) once + * broadcast_msg_cnt messages have been + * consumed. */ + cnd_t broadcast_cnd; + mtx_t broadcast_mtx; + const char *bootstrap_servers; +}; + +/** + * @brief Consumer thread function + * + * Consumes messages from the specified topic and verifies that + * the expected number of messages are received. + */ +static int run_consumer(void *arg) { + struct consumer_thread_arg *tmc = (struct consumer_thread_arg *)arg; + rd_kafka_t *consumer; + rd_kafka_conf_t *conf; + int msg_cnt = 0; + int ret = 0; + + TEST_SAY( + "Consumer thread starting for topic %s, expecting %d messages\n", + tmc->topic_name, tmc->expected_messages); + + /* Create consumer */ + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "bootstrap.servers", tmc->bootstrap_servers); + test_conf_set(conf, "group.id", "test_group"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + consumer = test_create_handle(RD_KAFKA_CONSUMER, conf); + + + /* Subscribe to topic */ + test_consumer_subscribe(consumer, tmc->topic_name); + + /* Consume messages until we've received the expected count */ + while (msg_cnt < tmc->expected_messages) { + rd_kafka_message_t *rkmessage; + + rkmessage = rd_kafka_consumer_poll(consumer, 1000); + if (!rkmessage) + continue; + + if (rkmessage->err) + TEST_SAY("Consumer error: %s\n", + rd_kafka_message_errstr(rkmessage)); + else { + msg_cnt++; + if (msg_cnt == tmc->broadcast_msg_cnt) { + mtx_lock(&tmc->broadcast_mtx); + /* Set the flag under the lock so the + * waiter cannot miss the broadcast. */ + tmc->broadcasted = rd_true; + cnd_broadcast(&tmc->broadcast_cnd); + mtx_unlock(&tmc->broadcast_mtx); + } + TEST_SAYL(3, "Received message %d/%d\n", msg_cnt, + tmc->expected_messages); + } + + rd_kafka_message_destroy(rkmessage); + } + + TEST_SAY("Consumer thread for topic %s received all %d messages\n", + tmc->topic_name, msg_cnt); + + /* Clean up */ + rd_kafka_destroy(consumer); + return ret; +} + +/** + * @brief Test topic recreation with consumer + * + * This test creates a consumer, produces 10 messages to a topic, and then + * recreates the topic. It then produces 20 more messages to the new topic + * and verifies that the consumer receives all 30 messages. 
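+ * + * The first 10 messages must be consumed before the recreation so the + * consumer holds a position on the old topic id; the remaining 20 then + * exercise the offset-reset path on the recreated topic. 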
+ */ +static void do_test_topic_recreated_consumer(void) { + rd_kafka_mock_cluster_t *mcluster; + const char *bootstrap_servers; + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + struct consumer_thread_arg arg = { + .topic_name = "test_topic", + .expected_messages = 30, + .broadcast_msg_cnt = 10, + }; + cnd_init(&arg.broadcast_cnd); + mtx_init(&arg.broadcast_mtx, mtx_plain); + + SUB_TEST("Testing topic recreation with consumer"); + + /* Create mock cluster */ + mcluster = test_mock_cluster_new(3, &bootstrap_servers); + arg.bootstrap_servers = bootstrap_servers; + + /* Create topic and bump leader epochs */ + rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1); + rd_kafka_mock_partition_set_leader(mcluster, "test_topic", 0, 2); + rd_kafka_mock_partition_set_leader(mcluster, "test_topic", 0, 1); + + /* Start consumer thread and give it some time to start consuming */ + thrd_t consumer_thread; + thrd_create(&consumer_thread, run_consumer, &arg); + rd_sleep(5); + + /* Create producer and produce 10 messages */ + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "bootstrap.servers", bootstrap_servers); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10); + rd_kafka_destroy(rk); + + /* Wait for consumer to consume 10 messages. The predicate loop guards + * against spurious wakeups and against the broadcast firing before we + * start waiting. */ + mtx_lock(&arg.broadcast_mtx); + while (!arg.broadcasted) + cnd_wait(&arg.broadcast_cnd, &arg.broadcast_mtx); + mtx_unlock(&arg.broadcast_mtx); + + /* Re-create topic and wait for it to get into the metadata. */ + rd_kafka_mock_topic_delete(mcluster, "test_topic"); + rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1); + rd_sleep(2); + + /* Create producer and produce 20 messages to new topic. */ + test_conf_init(&conf, NULL, 30); + test_conf_set(conf, "bootstrap.servers", bootstrap_servers); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + test_produce_msgs2(rk, "test_topic", 0, 0, 0, 20, "test", 10); + rd_kafka_destroy(rk); + + /* Wait for consumer to consume remaining 20 messages. */ + thrd_join(consumer_thread, NULL); + cnd_destroy(&arg.broadcast_cnd); + mtx_destroy(&arg.broadcast_mtx); + test_mock_cluster_destroy(mcluster); + SUB_TEST_PASS(); +} + +int main_0152_topic_recreate_mock(int argc, char **argv) { + do_test_topic_recreated_producer(rd_false, rd_false); + do_test_topic_recreated_producer(rd_false, rd_true); + do_test_topic_recreated_producer(rd_true, rd_false); + do_test_topic_recreated_producer(rd_true, rd_true); + + do_test_two_topics_recreated_producer(rd_false); + do_test_two_topics_recreated_producer(rd_true); + + do_test_topic_recreated_transactional_producer(); + + /* Consumer. 
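+ * Covers the consumer offset-reset path, complementing the producer + * epoch-bump tests above. 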
*/ + do_test_topic_recreated_consumer(); + return 0; +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index bb94a5018..9c19b366e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -140,6 +140,7 @@ set( 0149-broker-same-host-port.c 0150-telemetry_mock.c 0151-purge-brokers.c + 0152-topic_recreate_mock.c 8000-idle.cpp 8001-fetch_from_follower_mock_manual.c test.c diff --git a/tests/test.c b/tests/test.c index 9c039c019..bc5fd756b 100644 --- a/tests/test.c +++ b/tests/test.c @@ -224,6 +224,7 @@ _TEST_DECL(0104_fetch_from_follower_mock); _TEST_DECL(0105_transactions_mock); _TEST_DECL(0106_cgrp_sess_timeout); _TEST_DECL(0107_topic_recreate); +_TEST_DECL(0107_topic_recreate_unknown_topic_id); _TEST_DECL(0109_auto_create_topics); _TEST_DECL(0110_batch_size); _TEST_DECL(0111_delay_create_topics); @@ -266,6 +267,7 @@ _TEST_DECL(0146_metadata_mock); _TEST_DECL(0149_broker_same_host_port_mock); _TEST_DECL(0150_telemetry_mock); _TEST_DECL(0151_purge_brokers_mock); +_TEST_DECL(0152_topic_recreate_mock); /* Manual tests */ _TEST_DECL(8000_idle); @@ -481,6 +483,7 @@ struct test tests[] = { 0, TEST_BRKVER_TOPIC_ADMINAPI, .scenario = "noautocreate"), + _TEST(0107_topic_recreate_unknown_topic_id, 0, TEST_BRKVER_TOPIC_ADMINAPI), _TEST(0109_auto_create_topics, 0), _TEST(0110_batch_size, 0), _TEST(0111_delay_create_topics, @@ -528,6 +531,7 @@ struct test tests[] = { _TEST(0149_broker_same_host_port_mock, TEST_F_LOCAL), _TEST(0150_telemetry_mock, 0), _TEST(0151_purge_brokers_mock, TEST_F_LOCAL), + _TEST(0152_topic_recreate_mock, TEST_F_LOCAL), /* Manual tests */ diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj index 37a049774..86b1085a1 100644 --- a/win32/tests/tests.vcxproj +++ b/win32/tests/tests.vcxproj @@ -230,6 +230,7 @@ + <ClCompile Include="..\..\tests\0152-topic_recreate_mock.c" />