diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6d2aca9fc..c9eba7728 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,34 @@
+# librdkafka v2.10.1
+
+librdkafka v2.10.1 is a maintenance release:
+
+ * Fix producer being unable to resume production after topic recreation
+ followed by a leader change (#5022).
+ * Fix consumer being unable to resume consumption after topic recreation,
+ both in case of a leader change and otherwise (#5022).
+
+## Fixes
+
+### Producer fixes
+
+* Issues: #4898
+ When a topic is recreated, the producer may be unable to produce in case of
+ a leader change, because the stored leader epoch may exceed the epoch of the
+ recreated topic. Solved by resetting the leader epoch if the topic is
+ recreated. Additionally, for idempotent producers, the queues are drained and
+ the epoch is bumped.
+ Happens since v2.1.0.
+
+### Consumer fixes
+
+* When a topic is recreated, the consumer may be unable to resume consumption,
+ both in case of a leader change and otherwise, if the new topic partition has
+ more messages than the old one.
+ Solved by resetting the leader epoch if the topic is recreated, and by
+ resetting the offsets for owned partitions.
+ Happens since v2.1.0.
+
+
# librdkafka v2.10.0
librdkafka v2.10.0 is a feature release:
diff --git a/src/rdkafka_mock.c b/src/rdkafka_mock.c
index f73945ccd..2e6bdf6a6 100644
--- a/src/rdkafka_mock.c
+++ b/src/rdkafka_mock.c
@@ -683,14 +683,32 @@ rd_kafka_mock_topic_new(rd_kafka_mock_cluster_t *mcluster,
TAILQ_INSERT_TAIL(&mcluster->topics, mtopic, link);
mcluster->topic_cnt++;
- rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
- "Created topic \"%s\" with %d partition(s) and "
- "replication-factor %d",
- mtopic->name, mtopic->partition_cnt, replication_factor);
+ rd_kafka_dbg(
+ mcluster->rk, MOCK, "MOCK",
+ "Created topic \"%s\" with topic id \"%s\", %d partition(s) "
+ "and replication-factor %d",
+ mtopic->name, rd_kafka_Uuid_base64str(&mtopic->id),
+ mtopic->partition_cnt, replication_factor);
return mtopic;
}
+static void rd_kafka_mock_topic_remove(rd_kafka_mock_cluster_t *mcluster,
+ const char *topic) {
+ rd_kafka_mock_topic_t *mtopic;
+
+ mtopic = rd_kafka_mock_topic_find(mcluster, topic);
+ if (!mtopic)
+ return;
+
+ TAILQ_REMOVE(&mcluster->topics, mtopic, link);
+
+ rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
+ "Deleted topic \"%s\" with topic id \"%s\"", mtopic->name,
+ rd_kafka_Uuid_base64str(&mtopic->id));
+
+ rd_kafka_mock_topic_destroy(mtopic);
+}
rd_kafka_mock_topic_t *
rd_kafka_mock_topic_find(const rd_kafka_mock_cluster_t *mcluster,
@@ -2165,6 +2183,18 @@ rd_kafka_mock_topic_create(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}
+rd_kafka_resp_err_t
+rd_kafka_mock_topic_delete(rd_kafka_mock_cluster_t *mcluster,
+ const char *topic) {
+ rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);
+
+ rko->rko_u.mock.name = rd_strdup(topic);
+ rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_TOPIC_DELETE;
+
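+ /* The op is served by rd_kafka_mock_cluster_cmd() on the mock
+ * cluster thread (see RD_KAFKA_MOCK_CMD_TOPIC_DELETE). */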
+ return rd_kafka_op_err_destroy(
+ rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
+}
+
rd_kafka_resp_err_t
rd_kafka_mock_partition_set_leader(rd_kafka_mock_cluster_t *mcluster,
const char *topic,
@@ -2506,6 +2536,10 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
return RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION;
break;
+ case RD_KAFKA_MOCK_CMD_TOPIC_DELETE:
+ rd_kafka_mock_topic_remove(mcluster, rko->rko_u.mock.name);
+ break;
+
case RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR:
mtopic =
rd_kafka_mock_topic_get(mcluster, rko->rko_u.mock.name, -1);
diff --git a/src/rdkafka_mock.h b/src/rdkafka_mock.h
index 0d528812b..caa1a11b0 100644
--- a/src/rdkafka_mock.h
+++ b/src/rdkafka_mock.h
@@ -248,6 +248,18 @@ rd_kafka_mock_topic_create(rd_kafka_mock_cluster_t *mcluster,
int partition_cnt,
int replication_factor);
+/**
+ * @brief Delete a topic.
+ *
+ * This can be used to simulate topic deletion, e.g. prior to recreating
+ * a topic so that it gets a new topic id.
+ *
+ * @remark The Topic Admin API (DeleteTopics) is not supported by the
+ * mock broker.
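+ *
+ * Example (an illustrative sketch; assumes \p mcluster was created with
+ * rd_kafka_mock_cluster_new() and that "mytopic" already exists):
+ * @code
+ * rd_kafka_resp_err_t err =
+ *         rd_kafka_mock_topic_delete(mcluster, "mytopic");
+ * if (err)
+ *         fprintf(stderr, "Failed to delete topic: %s\n",
+ *                 rd_kafka_err2str(err));
+ * @endcode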
+ */
+RD_EXPORT rd_kafka_resp_err_t
+rd_kafka_mock_topic_delete(rd_kafka_mock_cluster_t *mcluster,
+ const char *topic);
/**
* @brief Sets the partition leader.
diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h
index a4f9b93be..00cfb45f1 100644
--- a/src/rdkafka_op.h
+++ b/src/rdkafka_op.h
@@ -586,6 +586,7 @@ struct rd_kafka_op_s {
RD_KAFKA_MOCK_CMD_APIVERSION_SET,
RD_KAFKA_MOCK_CMD_REQUESTED_METRICS_SET,
RD_KAFKA_MOCK_CMD_TELEMETRY_PUSH_INTERVAL_SET,
+ RD_KAFKA_MOCK_CMD_TOPIC_DELETE,
} cmd;
rd_kafka_resp_err_t err; /**< Error for:
diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c
index d345b2722..13f1da9b1 100644
--- a/src/rdkafka_topic.c
+++ b/src/rdkafka_topic.c
@@ -2,7 +2,7 @@
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2022, Magnus Edenhill
- * 2023, Confluent Inc.
+ * 2023-2025, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,6 +34,7 @@
#include "rdkafka_partition.h"
#include "rdkafka_broker.h"
#include "rdkafka_cgrp.h"
+#include "rdkafka_idempotence.h"
#include "rdkafka_metadata.h"
#include "rdkafka_offset.h"
#include "rdlog.h"
@@ -55,7 +56,8 @@ static int
rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
const struct rd_kafka_metadata_topic *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
- rd_ts_t ts_age);
+ rd_ts_t ts_age,
+ rd_bool_t *update_epoch_bump);
/**
@@ -494,13 +496,15 @@ rd_kafka_topic_t *rd_kafka_topic_new0(rd_kafka_t *rk,
/* Populate from metadata cache. */
if ((rkmce = rd_kafka_metadata_cache_find(rk, topic, 1 /*valid*/)) &&
!rkmce->rkmce_mtopic.err) {
+ rd_bool_t update_epoch_bump =
+ rd_false; /* ignored for new topic anyway. */
if (existing)
*existing = 1;
rd_kafka_topic_metadata_update(
rkt, &rkmce->rkmce_mtopic,
&rkmce->rkmce_metadata_internal_topic,
- rkmce->rkmce_ts_insert);
+ rkmce->rkmce_ts_insert, &update_epoch_bump);
}
if (do_lock)
@@ -927,7 +931,8 @@ static void rd_kafka_toppar_idemp_msgid_restore(rd_kafka_topic_t *rkt,
* @locks rd_kafka_topic_wrlock(rkt) MUST be held.
*/
static int rd_kafka_topic_partition_cnt_update(rd_kafka_topic_t *rkt,
- int32_t partition_cnt) {
+ int32_t partition_cnt,
+ rd_bool_t topic_id_change) {
rd_kafka_t *rk = rkt->rkt_rk;
rd_kafka_toppar_t **rktps;
rd_kafka_toppar_t *rktp;
@@ -1036,8 +1041,14 @@ static int rd_kafka_topic_partition_cnt_update(rd_kafka_topic_t *rkt,
* topic as non-existent, triggering the removal of partitions
* on the producer client. When metadata is eventually correct
* again and the topic is "re-created" on the producer, it
- * must continue with the next msgid/baseseq. */
- if (is_idempodent && rd_kafka_pid_valid(rktp->rktp_eos.pid))
+ * must continue with the next msgid/baseseq.
+ *
+ * This isn't applicable if the topic id changes (because it's
+ * an entirely new topic). This only skips saving the msgid; we
+ * will still need to call rd_kafka_topic_recreated_partition_reset
+ * on this topic. */
+ if (is_idempodent && rd_kafka_pid_valid(rktp->rktp_eos.pid) &&
+ !topic_id_change)
rd_kafka_toppar_idemp_msgid_save(rkt, rktp);
rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_UNKNOWN;
@@ -1085,7 +1096,65 @@ static int rd_kafka_topic_partition_cnt_update(rd_kafka_topic_t *rkt,
return 1;
}
+/**
+ * @brief Update partitions of a topic after it has been recreated.
+ *
+ * @param rkt The topic to update.
+ * @param mdt The metadata of the topic.
+ * @param mdit The internal metadata of the topic.
+ */
+static void rd_kafka_topic_recreated_partition_reset(
+ rd_kafka_topic_t *rkt,
+ const struct rd_kafka_metadata_topic *mdt,
+ const rd_kafka_metadata_topic_internal_t *mdit) {
+ rd_kafka_t *rk = rkt->rkt_rk;
+ rd_kafka_toppar_t *rktp;
+ int32_t i;
+ rd_kafka_fetch_pos_t next_pos = {
+ .offset = RD_KAFKA_OFFSET_INVALID,
+ .leader_epoch = -1,
+ };
+
+ for (i = 0; i < rkt->rkt_partition_cnt; i++) {
+ rktp = rkt->rkt_p[i];
+
+ /* Common */
+ rd_kafka_toppar_lock(rktp);
+ /* By setting partition's leader epoch to -1, we make sure to
+ * always pick up whatever is in the metadata. Even if the
+ * metadata is stale, it's better than what we have because the
+ * topic has been recreated. We don't need to set any other
+ * field: if the leader is the same, we can just continue, and
+ * if we're fetching from a follower, we'll just get a not
+ * follower error if applicable. */
+ rktp->rktp_leader_epoch = -1;
+ if (rk->rk_type == RD_KAFKA_PRODUCER) {
+ /* Producer: No op - we bump epoch and drain on rk-level
+ * for an idempotent producer. */
+ } else if (rk->rk_type == RD_KAFKA_CONSUMER) {
+ /* Consumer: set each partition's offset to invalid so
+ * it can go through a reset.
+ *
+ * Note that this can lead to re-consumption in particular
+ * cases, for example:
+ * 1. This consumer unsubscribes from the topic.
+ * 2. The topic is recreated; a new consumer is created,
+ * subscribes to the topic, consumes messages and commits
+ * offsets.
+ * 3. This consumer resubscribes to the topic. Ideally it would
+ * consume from the new consumer's committed offsets, but
+ * since the topic was recreated, it consumes from whatever
+ * auto.offset.reset specifies. This can't be helped, because
+ * the broker stores offsets by topic name, not by topic id. */
+ rd_kafka_offset_reset(
+ rktp, RD_KAFKA_NODEID_UA, next_pos,
+ RD_KAFKA_RESP_ERR_NO_ERROR, "topic recreated");
+ }
+ rd_kafka_toppar_unlock(rktp);
+ }
+}
/**
* Topic 'rkt' does not exist: propagate to interested parties.
@@ -1252,7 +1321,8 @@ rd_bool_t rd_kafka_topic_set_notexists(rd_kafka_topic_t *rkt,
rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;
/* Update number of partitions */
- rd_kafka_topic_partition_cnt_update(rkt, 0);
+ rd_kafka_topic_partition_cnt_update(rkt, 0,
+ rd_false /* topic_id_change */);
/* Purge messages with forced partition */
rd_kafka_topic_assign_uas(rkt, err);
@@ -1337,7 +1407,8 @@ rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt,
rkt->rkt_err = err;
/* Update number of partitions */
- rd_kafka_topic_partition_cnt_update(rkt, 0);
+ rd_kafka_topic_partition_cnt_update(rkt, 0,
+ rd_false /* topic_id_change */);
/* Purge messages with forced partition */
rd_kafka_topic_assign_uas(rkt, err);
@@ -1353,6 +1424,9 @@ rd_bool_t rd_kafka_topic_set_error(rd_kafka_topic_t *rkt,
* @param mdt Topic metadata.
* @param mdit Topic internal metadata.
* @param ts_age absolute age (timestamp) of metadata.
+ * @param update_epoch_bump out parameter: set to true if the caller
+ * should run rd_kafka_idemp_drain_epoch_bump().
+ *
* @returns 1 if the number of partitions changed, 0 if not, and -1 if the
* topic is unknown.
@@ -1363,7 +1437,8 @@ static int
rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
const struct rd_kafka_metadata_topic *mdt,
const rd_kafka_metadata_topic_internal_t *mdit,
- rd_ts_t ts_age) {
+ rd_ts_t ts_age,
+ rd_bool_t *update_epoch_bump) {
rd_kafka_t *rk = rkt->rkt_rk;
int upd = 0;
int j;
@@ -1372,6 +1447,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
int old_state;
rd_bool_t partition_exists_with_no_leader_epoch = rd_false;
rd_bool_t partition_exists_with_stale_leader_epoch = rd_false;
+ rd_bool_t different_topic_id = rd_false;
if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR)
rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA",
@@ -1422,12 +1498,17 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
/* Update number of partitions, but not if there are
* (possibly intermittent) errors (e.g., "Leader not available"). */
if (mdt->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
- rd_bool_t different_topic_id =
+ /* Ignore id changes where either id is zero, i.e. when we're
+ * creating the topic or deleting it during termination. */
+ rd_bool_t both_topic_ids_non_zero =
+ !RD_KAFKA_UUID_IS_ZERO(mdit->topic_id) &&
+ !RD_KAFKA_UUID_IS_ZERO(rkt->rkt_topic_id);
+ different_topic_id =
rd_kafka_Uuid_cmp(mdit->topic_id, rkt->rkt_topic_id) != 0;
if (different_topic_id ||
mdt->partition_cnt > rkt->rkt_partition_cnt)
upd += rd_kafka_topic_partition_cnt_update(
- rkt, mdt->partition_cnt);
+ rkt, mdt->partition_cnt, different_topic_id);
if (different_topic_id) {
/* FIXME: an offset reset must be triggered.
* when rkt_topic_id wasn't zero.
@@ -1443,7 +1524,15 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
rkt->rkt_topic->str,
rd_kafka_Uuid_base64str(&rkt->rkt_topic_id),
rd_kafka_Uuid_base64str(&mdit->topic_id));
+
rkt->rkt_topic_id = mdit->topic_id;
+
+ if (both_topic_ids_non_zero) {
+ rd_kafka_topic_recreated_partition_reset(
+ rkt, mdt, mdit);
+ if (rd_kafka_is_idempotent(rk))
+ *update_epoch_bump = rd_true;
+ }
}
/* If the metadata times out for a topic (because all brokers
* are down) the state will transition to S_UNKNOWN.
@@ -1542,6 +1631,7 @@ int rd_kafka_topic_metadata_update2(
const rd_kafka_metadata_topic_internal_t *mdit) {
rd_kafka_topic_t *rkt;
int r;
+ rd_bool_t update_epoch_bump = rd_false;
rd_kafka_wrlock(rkb->rkb_rk);
@@ -1557,10 +1647,28 @@ int rd_kafka_topic_metadata_update2(
return -1; /* Ignore topics that we dont have locally. */
}
- r = rd_kafka_topic_metadata_update(rkt, mdt, mdit, rd_clock());
+ r = rd_kafka_topic_metadata_update(rkt, mdt, mdit, rd_clock(),
+ &update_epoch_bump);
rd_kafka_wrunlock(rkb->rkb_rk);
+ /* This is true in case of topic recreation and when we're using an
+ * idempotent/transactional producer. */
+ if (update_epoch_bump) {
+ /* Why do we bump the epoch here rather than, say, at rk-level
+ * after checking this for all topics?
+ * 1. We need to make sure a leader change (and the subsequent
+ * broker delegation change) does not run before this, as
+ * the producer might start sending to the new broker, which
+ * would give us out of sequence errors.
+ * 2. rd_kafka_idemp_drain_epoch_bump() sets state in an
+ * idempotent way (though the function isn't entirely
+ * idempotent), so it's okay to call it repeatedly.
+ */
+ rd_kafka_idemp_drain_epoch_bump(
+ rkb->rkb_rk, RD_KAFKA_RESP_ERR_NO_ERROR, "topic recreated");
+ }
+
rd_kafka_topic_destroy0(rkt); /* from find() */
return r;
@@ -1628,7 +1736,8 @@ void rd_kafka_topic_partitions_remove(rd_kafka_topic_t *rkt) {
/* Setting the partition count to 0 moves all partitions to
* the desired list (rktp_desp). */
- rd_kafka_topic_partition_cnt_update(rkt, 0);
+ rd_kafka_topic_partition_cnt_update(rkt, 0,
+ rd_false /* topic_id_change */);
/* Now clean out the desired partitions list.
* Use reverse traversal to avoid excessive memory shuffling
@@ -2143,6 +2252,7 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt,
.partition_cnt = partition_cnt};
rd_kafka_metadata_topic_internal_t mdit = {.partitions = partitions};
int i;
+ rd_bool_t update_epoch_bump;
mdt.partitions = rd_alloca(sizeof(*mdt.partitions) * partition_cnt);
@@ -2155,7 +2265,9 @@ void rd_ut_kafka_topic_set_topic_exists(rd_kafka_topic_t *rkt,
rd_kafka_wrlock(rkt->rkt_rk);
rd_kafka_metadata_cache_topic_update(rkt->rkt_rk, &mdt, &mdit, rd_true,
rd_false, rd_true);
- rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock());
+ update_epoch_bump = rd_false;
+ rd_kafka_topic_metadata_update(rkt, &mdt, &mdit, rd_clock(),
+ &update_epoch_bump);
rd_kafka_wrunlock(rkt->rkt_rk);
rd_free(partitions);
}
diff --git a/tests/0107-topic_recreate.c b/tests/0107-topic_recreate.c
index 68b978479..b58c5b325 100644
--- a/tests/0107-topic_recreate.c
+++ b/tests/0107-topic_recreate.c
@@ -242,6 +242,9 @@ static void do_test_create_delete_create(int part_cnt_1, int part_cnt_2) {
}
+/**
+ * @remark This is run only when scenario="noautocreate".
+ */
int main_0107_topic_recreate(int argc, char **argv) {
this_test = test_curr; /* Need to expose current test struct (in TLS)
* to producer thread. */
@@ -257,3 +260,93 @@ int main_0107_topic_recreate(int argc, char **argv) {
return 0;
}
+
+
+/**
+ * @brief Test topic create + delete + create with consumer.
+ *
+ * Details:
+ * 1. Create producer and produce 10 messages.
+ * 2. Create consumer (auto offset reset to earliest) and subscribe to topic,
+ * consume the first 10 messages.
+ * 3. Recreate topic and produce 20 messages.
+ * 4. Consume remaining 20 messages - it should get all 20 because there should
+ * be an offset reset when Fetch returns an UNKNOWN_TOPIC_ID error.
+ */
+static void do_test_topic_recreated_consumer_unknown_topic_id(void) {
+ rd_kafka_t *rk_producer;
+ rd_kafka_t *rk_consumer;
+ rd_kafka_conf_t *conf;
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ rd_kafka_topic_t *rkt;
+
+ SUB_TEST("Testing topic recreation with consumer\n");
+
+ /* Create producer. */
+ test_conf_init(&conf, NULL, 30);
+ rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+ rk_producer = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ /* Create topic using admin API */
+ test_create_topic(rk_producer, topic, 1, 1);
+
+ /* Write 10 msgs */
+ test_produce_msgs2(rk_producer, topic, 0, 0, 0, 10, "test", 10);
+
+ /* Destroy producer; it is recreated after the topic is recreated. */
+ rd_kafka_destroy(rk_producer);
+ rk_producer = NULL;
+
+ /* Create consumer. */
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "group.id", topic);
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ /* Changing these because we don't want an accidental metadata refresh:
+ * we want it to be triggered by the UNKNOWN_TOPIC_ID error. */
+ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "90000");
+ test_conf_set(conf, "metadata.max.age.ms", "180000");
+ rk_consumer = test_create_handle(RD_KAFKA_CONSUMER, conf);
+
+ /* Subscribe to topic. */
+ test_consumer_subscribe(rk_consumer, topic);
+ test_consumer_wait_assignment(rk_consumer, rd_false);
+
+ /* Consume messages. */
+ rkt = test_create_consumer_topic(rk_consumer, topic);
+ test_consume_msgs("consume_from_topic", rkt, 0, 0, TEST_NO_SEEK, 0, 10,
+ 0);
+
+ /* Pause consumer so there are no fetches in the temporary state where
+ * there is no topic on the broker */
+ test_consumer_pause_resume_partition(rk_consumer, topic, 0, rd_true);
+ rd_sleep(2);
+
+ /* Recreate topic. */
+ test_delete_topic(rk_producer, topic);
+ rd_sleep(5);
+ test_create_topic(rk_producer, topic, 1, 1);
+
+ /* Produce 2x the number of messages. */
+ test_conf_init(&conf, NULL, 30);
+ rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+ rk_producer = test_create_handle(RD_KAFKA_PRODUCER, conf);
+ test_produce_msgs2(rk_producer, topic, 0, 0, 0, 20, "test", 10);
+
+ /* Resume consumer. */
+ test_consumer_pause_resume_partition(rk_consumer, topic, 0, rd_false);
+ rd_sleep(2);
+
+ /* Consume messages. */
+ test_consume_msgs("consume_from_topic", rkt, 0, 0, TEST_NO_SEEK, 0, 20,
+ 0);
+
+ /* Destroy consumer. */
+ rd_kafka_destroy(rk_consumer);
+ rd_kafka_destroy(rk_producer);
+ SUB_TEST_PASS();
+}
+
+int main_0107_topic_recreate_unknown_topic_id(int argc, char **argv) {
+ do_test_topic_recreated_consumer_unknown_topic_id();
+ return 0;
+}
diff --git a/tests/0152-topic_recreate_mock.c b/tests/0152-topic_recreate_mock.c
new file mode 100644
index 000000000..72d3d333f
--- /dev/null
+++ b/tests/0152-topic_recreate_mock.c
@@ -0,0 +1,396 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2025, Confluent Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "../src/rdkafka_proto.h"
+
+/**
+ * @name Verify that producer and consumer resume operation after
+ * a topic has been deleted and recreated (with topic id change) using
+ * the mock broker.
+ *
+ * @note These tests should be revised after the implementation of KIP-516 is
+ * added to the actual broker for Produce.
+ */
+
+/**
+ * Test topic recreation for the producer. There are two configurable flags:
+ * 1. is_idempotent: decides whether the producer is idempotent.
+ * 2. leader_change: if set to true, changes the leader of the initial topic
+ * to push the leader epoch beyond that of the recreated topic.
+ */
+static void do_test_topic_recreated_producer(rd_bool_t is_idempotent,
+ rd_bool_t leader_change) {
+ rd_kafka_mock_cluster_t *mcluster;
+ const char *bootstrap_servers;
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+
+ SUB_TEST("Testing topic recreation with %s producer",
+ is_idempotent ? "idempotent" : "normal");
+
+ /* Create mock cluster */
+ mcluster = test_mock_cluster_new(3, &bootstrap_servers);
+
+ /* Create topic and optionally change leader to bump the leader epoch */
+ rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1);
+ if (leader_change)
+ rd_kafka_mock_partition_set_leader(mcluster, "test_topic", 0,
+ 2);
+
+ /* Create and init a producer */
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", bootstrap_servers);
+ test_conf_set(conf, "enable.idempotence",
+ is_idempotent ? "true" : "false");
+ rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+
+ /* Produce 10 messages */
+ test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10);
+
+ /* Delete topic */
+ rd_kafka_mock_topic_delete(mcluster, "test_topic");
+
+ /* Re-create topic */
+ rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1);
+
+ /* Propagate topic change in metadata. */
+ rd_sleep(2);
+
+ /* Produce messages to recreated topic - it should be seamless. */
+ test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10);
+
+ rd_kafka_destroy(rk);
+ test_mock_cluster_destroy(mcluster);
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * Test two topics' recreation (at the same time) with normal and idempotent
+ * producer.
+ */
+static void do_test_two_topics_recreated_producer(rd_bool_t is_idempotent) {
+ rd_kafka_mock_cluster_t *mcluster;
+ const char *bootstrap_servers;
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+
+ SUB_TEST("Testing two topics' recreation with %s producer",
+ is_idempotent ? "idempotent" : "normal");
+
+ /* Create mock cluster */
+ mcluster = test_mock_cluster_new(3, &bootstrap_servers);
+
+ /* Create topics and change leaders to bump leader epochs */
+ rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1);
+ rd_kafka_mock_topic_create(mcluster, "test_topic2", 3, 1);
+ rd_kafka_mock_partition_set_leader(mcluster, "test_topic", 0, 2);
+ rd_kafka_mock_partition_set_leader(mcluster, "test_topic2", 0, 2);
+
+ /* Create and init a producer */
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", bootstrap_servers);
+ test_conf_set(conf, "enable.idempotence",
+ is_idempotent ? "true" : "false");
+ rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+
+ /* Produce 10 messages */
+ test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10);
+ test_produce_msgs2(rk, "test_topic2", 0, 0, 0, 10, "test", 10);
+
+ /* Delete topic */
+ rd_kafka_mock_topic_delete(mcluster, "test_topic");
+ rd_kafka_mock_topic_delete(mcluster, "test_topic2");
+ /* Re-create topic */
+ rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1);
+ rd_kafka_mock_topic_create(mcluster, "test_topic2", 3, 1);
+
+ /* Propagate topic change in metadata. */
+ rd_sleep(2);
+
+ /* Produce messages to recreated topic - it should be seamless. */
+ test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10);
+ test_produce_msgs2(rk, "test_topic2", 0, 0, 0, 10, "test", 10);
+ rd_kafka_destroy(rk);
+ test_mock_cluster_destroy(mcluster);
+
+ SUB_TEST_PASS();
+}
+
+/* Test topic recreation with transactional producer */
+static void do_test_topic_recreated_transactional_producer(void) {
+ rd_kafka_mock_cluster_t *mcluster;
+ const char *bootstrap_servers;
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ rd_kafka_resp_err_t err;
+
+ SUB_TEST("Testing topic recreation with transactional producer");
+
+ /* Create mock cluster */
+ mcluster = test_mock_cluster_new(3, &bootstrap_servers);
+
+ /* Create topic */
+ rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1);
+
+ /* Note that a leader change is NECESSARY for testing a transactional
+ * producer: a NOT_LEADER error is what triggers a metadata request in
+ * the first place. Otherwise it just fails with a fatal error (out of
+ * sequence) because the Produce RPC isn't aware of topic ids, and thus
+ * the client has no way to detect the recreation. */
+ rd_kafka_mock_partition_set_leader(mcluster, "test_topic", 0, 2);
+
+ /* Create and init a transactional producer */
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", bootstrap_servers);
+ test_conf_set(conf, "enable.idempotence", "true");
+ test_conf_set(conf, "transactional.id", "test_tx");
+ test_conf_set(conf, "max.in.flight", "1");
+ rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+ rd_kafka_init_transactions(rk, 5000);
+
+
+ /* Produce 10 messages */
+ rd_kafka_begin_transaction(rk);
+ test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10);
+ rd_kafka_commit_transaction(rk, 5000);
+
+ /* Delete topic */
+ rd_kafka_mock_topic_delete(mcluster, "test_topic");
+
+ /* Re-create topic */
+ rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1);
+
+ /* Propagate topic change in metadata. */
+ rd_sleep(2);
+
+ /* Produce messages to recreated topic. */
+ rd_kafka_begin_transaction(rk);
+ /* First message should queue without any problems. */
+ err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("test_topic"),
+ RD_KAFKA_V_VALUE("test", 4), RD_KAFKA_V_END);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR,
+ "Expected NO_ERROR, not %s", rd_kafka_err2str(err));
+
+ /* We might get Success or Purged from Queue, depending on when exactly
+ * the metadata request is made. There's nothing we can do about it
+ * until AK implements produce by topic id. So turn off error checking.
+ */
+ test_curr->ignore_dr_err = rd_true;
+
+ /* Some Nth message should refuse to queue because we're in ERR__STATE
+ * and we need an abort. We don't know exactly at what point it starts
+ * to complain because we're not tracking the metadata request or the
+ * time when epoch_drain_bump is called. So just rely on the test
+ * timeout. */
+ while (err == RD_KAFKA_RESP_ERR_NO_ERROR) {
+ err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("test_topic"),
+ RD_KAFKA_V_VALUE("test", 4),
+ RD_KAFKA_V_END);
+ rd_sleep(1);
+ }
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__STATE,
+ "Expected ERR__STATE error, not %s", rd_kafka_err2str(err));
+ rd_kafka_abort_transaction(rk, 5000);
+
+ /* Producer should work as normal after abort. */
+ test_curr->ignore_dr_err = rd_false;
+ rd_kafka_begin_transaction(rk);
+ test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10);
+ rd_kafka_commit_transaction(rk, 5000);
+
+ rd_kafka_destroy(rk);
+ test_mock_cluster_destroy(mcluster);
+
+ SUB_TEST_PASS();
+}
+
+
+
+/**
+ * @brief Structure to hold information needed for test by the consumer thread.
+ */
+struct consumer_thread_arg {
+ const char *topic_name;
+ int expected_messages;
+ int broadcast_msg_cnt;
+ rd_bool_t broadcast_done; /**< Set once broadcast_msg_cnt messages
+ * have been consumed; protected by
+ * broadcast_mtx. */
+ cnd_t broadcast_cnd;
+ mtx_t broadcast_mtx;
+ const char *bootstrap_servers;
+};
+
+/**
+ * @brief Consumer thread function
+ *
+ * Consumes messages from the specified topic and verifies that
+ * the expected number of messages are received.
+ */
+static int run_consumer(void *arg) {
+ struct consumer_thread_arg *tmc = (struct consumer_thread_arg *)arg;
+ rd_kafka_t *consumer;
+ rd_kafka_conf_t *conf;
+ int msg_cnt = 0;
+ int ret = 0;
+
+ TEST_SAY(
+ "Consumer thread starting for topic %s, expecting %d messages\n",
+ tmc->topic_name, tmc->expected_messages);
+
+ /* Create consumer */
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", tmc->bootstrap_servers);
+ test_conf_set(conf, "group.id", "test_group");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ consumer = test_create_handle(RD_KAFKA_CONSUMER, conf);
+
+
+ /* Subscribe to topic */
+ test_consumer_subscribe(consumer, tmc->topic_name);
+
+ /* Consume messages until we've received the expected count */
+ while (msg_cnt < tmc->expected_messages) {
+ rd_kafka_message_t *rkmessage;
+
+ rkmessage = rd_kafka_consumer_poll(consumer, 1000);
+ if (!rkmessage)
+ continue;
+
+ if (rkmessage->err)
+ TEST_SAY("Consumer error: %s\n",
+ rd_kafka_message_errstr(rkmessage));
+ else {
+ msg_cnt++;
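+ /* Wake up the main thread once the pre-recreation
+ * batch of messages has been consumed. */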
+ if (msg_cnt == tmc->broadcast_msg_cnt) {
+ mtx_lock(&tmc->broadcast_mtx);
+ tmc->broadcast_done = rd_true;
+ cnd_broadcast(&tmc->broadcast_cnd);
+ mtx_unlock(&tmc->broadcast_mtx);
+ }
+ TEST_SAYL(3, "Received message %d/%d\n", msg_cnt,
+ tmc->expected_messages);
+ }
+
+ rd_kafka_message_destroy(rkmessage);
+ }
+
+ TEST_SAY("Consumer thread for topic %s received all %d messages\n",
+ tmc->topic_name, msg_cnt);
+
+ /* Clean up */
+ rd_kafka_destroy(consumer);
+ return ret;
+}
+
+/**
+ * @brief Test topic recreation with consumer
+ *
+ * This test creates a consumer, produces 10 messages to a topic, and then
+ * recreates the topic. It then produces 20 more messages to the new topic
+ * and verifies that the consumer receives all 30 messages.
+ */
+static void do_test_topic_recreated_consumer(void) {
+ rd_kafka_mock_cluster_t *mcluster;
+ const char *bootstrap_servers;
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ struct consumer_thread_arg arg = {
+ .topic_name = "test_topic",
+ .expected_messages = 30,
+ .broadcast_msg_cnt = 10,
+ };
+ cnd_init(&arg.broadcast_cnd);
+ mtx_init(&arg.broadcast_mtx, mtx_plain);
+
+ SUB_TEST("Testing topic recreation with consumer\n");
+
+ /* Create mock cluster */
+ mcluster = test_mock_cluster_new(3, &bootstrap_servers);
+ arg.bootstrap_servers = bootstrap_servers;
+
+ /* Create topic and bump leader epochs */
+ rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1);
+ rd_kafka_mock_partition_set_leader(mcluster, "test_topic", 0, 2);
+ rd_kafka_mock_partition_set_leader(mcluster, "test_topic", 0, 1);
+
+ /* Start consumer thread and give it some time to start consuming */
+ thrd_t consumer_thread;
+ thrd_create(&consumer_thread, run_consumer, &arg);
+ rd_sleep(5);
+
+ /* Create producer and produce 10 messages */
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", bootstrap_servers);
+ rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+ test_produce_msgs2(rk, "test_topic", 0, 0, 0, 10, "test", 10);
+ rd_kafka_destroy(rk);
+
+ /* Wait for consumer to consume 10 messages */
+ mtx_lock(&arg.broadcast_mtx);
+ while (!arg.broadcast_done)
+ cnd_wait(&arg.broadcast_cnd, &arg.broadcast_mtx);
+ mtx_unlock(&arg.broadcast_mtx);
+
+ /* Re-create topic and wait for it to get into the metadata. */
+ rd_kafka_mock_topic_delete(mcluster, "test_topic");
+ rd_kafka_mock_topic_create(mcluster, "test_topic", 3, 1);
+ rd_sleep(2);
+
+ /* Create producer and produce 20 messages to new topic. */
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", bootstrap_servers);
+ rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+ test_produce_msgs2(rk, "test_topic", 0, 0, 0, 20, "test", 10);
+ rd_kafka_destroy(rk);
+
+ /* Wait for consumer to consume remaining 20 messages. */
+ thrd_join(consumer_thread, NULL);
+ test_mock_cluster_destroy(mcluster);
+ SUB_TEST_PASS();
+}
+
+int main_0152_topic_recreate_mock(int argc, char **argv) {
+ do_test_topic_recreated_producer(rd_false, rd_false);
+ do_test_topic_recreated_producer(rd_false, rd_true);
+ do_test_topic_recreated_producer(rd_true, rd_false);
+ do_test_topic_recreated_producer(rd_true, rd_true);
+
+ do_test_two_topics_recreated_producer(rd_false);
+ do_test_two_topics_recreated_producer(rd_true);
+
+ do_test_topic_recreated_transactional_producer();
+
+ /* Consumer. */
+ do_test_topic_recreated_consumer();
+ return 0;
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index bb94a5018..9c19b366e 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -140,6 +140,7 @@ set(
0149-broker-same-host-port.c
0150-telemetry_mock.c
0151-purge-brokers.c
+ 0152-topic_recreate_mock.c
8000-idle.cpp
8001-fetch_from_follower_mock_manual.c
test.c
diff --git a/tests/test.c b/tests/test.c
index 9c039c019..bc5fd756b 100644
--- a/tests/test.c
+++ b/tests/test.c
@@ -224,6 +224,7 @@ _TEST_DECL(0104_fetch_from_follower_mock);
_TEST_DECL(0105_transactions_mock);
_TEST_DECL(0106_cgrp_sess_timeout);
_TEST_DECL(0107_topic_recreate);
+_TEST_DECL(0107_topic_recreate_unknown_topic_id);
_TEST_DECL(0109_auto_create_topics);
_TEST_DECL(0110_batch_size);
_TEST_DECL(0111_delay_create_topics);
@@ -266,6 +267,7 @@ _TEST_DECL(0146_metadata_mock);
_TEST_DECL(0149_broker_same_host_port_mock);
_TEST_DECL(0150_telemetry_mock);
_TEST_DECL(0151_purge_brokers_mock);
+_TEST_DECL(0152_topic_recreate_mock);
/* Manual tests */
_TEST_DECL(8000_idle);
@@ -481,6 +483,7 @@ struct test tests[] = {
0,
TEST_BRKVER_TOPIC_ADMINAPI,
.scenario = "noautocreate"),
+ _TEST(0107_topic_recreate_unknown_topic_id, 0, TEST_BRKVER_TOPIC_ADMINAPI),
_TEST(0109_auto_create_topics, 0),
_TEST(0110_batch_size, 0),
_TEST(0111_delay_create_topics,
@@ -528,6 +531,7 @@ struct test tests[] = {
_TEST(0149_broker_same_host_port_mock, TEST_F_LOCAL),
_TEST(0150_telemetry_mock, 0),
_TEST(0151_purge_brokers_mock, TEST_F_LOCAL),
+ _TEST(0152_topic_recreate_mock, TEST_F_LOCAL),
/* Manual tests */
diff --git a/win32/tests/tests.vcxproj b/win32/tests/tests.vcxproj
index 37a049774..86b1085a1 100644
--- a/win32/tests/tests.vcxproj
+++ b/win32/tests/tests.vcxproj
@@ -230,6 +230,7 @@
+    <ClCompile Include="..\..\tests\0152-topic_recreate_mock.c" />