Skip to content

Deal with topic recreation for producer and consumer (using metadata) #5022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
31 changes: 31 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,34 @@
# librdkafka v2.10.1

librdkafka v2.10.1 is a maintenance release:

* Fix producer being unable to resume production after topic recreation
followed by leader change (#5022).
* Fix consumer being unable to resume production after topic recreation, both
in case of a leader change and otherwise (#5022)

## Fixes

### Producer fixes

* Issues: #4898
When a topic is recreated, the producer may be unable to produce in cases of
a leader change, because the stored leader epoch may exceed the epoch of the
recreated topic. Solved by resetting the leader epoch if the topic is
recreated. Additionally, for idempotent producers, the queues are drained and
the epoch is bumped.
Happens since v2.1.0.

### Consumer fixes

* When a topic is recreated, the consumer may be unable to resume consumption
both in case of a leader change and otherwise, if the new topic partition has
more messages than the old topic.
Solved by resetting the leader epoch if the topic is recreated, and by
resetting the offsets for owned partitions.
Happens since v2.1.0.


# librdkafka v2.10.0

librdkafka v2.10.0 is a feature release:
Expand Down
42 changes: 38 additions & 4 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -683,14 +683,32 @@ rd_kafka_mock_topic_new(rd_kafka_mock_cluster_t *mcluster,
TAILQ_INSERT_TAIL(&mcluster->topics, mtopic, link);
mcluster->topic_cnt++;

rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
"Created topic \"%s\" with %d partition(s) and "
"replication-factor %d",
mtopic->name, mtopic->partition_cnt, replication_factor);
rd_kafka_dbg(
mcluster->rk, MOCK, "MOCK",
"Created topic \"%s\" with topic id \"%s\", %d partition(s) "
"and replication-factor %d",
mtopic->name, rd_kafka_Uuid_base64str(&mtopic->id),
mtopic->partition_cnt, replication_factor);

return mtopic;
}

static void rd_kafka_mock_topic_remove(rd_kafka_mock_cluster_t *mcluster,
const char *topic) {
rd_kafka_mock_topic_t *mtopic;

mtopic = rd_kafka_mock_topic_find(mcluster, topic);
if (!mtopic)
return;

TAILQ_REMOVE(&mcluster->topics, mtopic, link);

rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
"Deleted topic \"%s\" with topic id \"%s\"", mtopic->name,
rd_kafka_Uuid_base64str(&mtopic->id));

rd_kafka_mock_topic_destroy(mtopic);
}

rd_kafka_mock_topic_t *
rd_kafka_mock_topic_find(const rd_kafka_mock_cluster_t *mcluster,
Expand Down Expand Up @@ -2165,6 +2183,18 @@ rd_kafka_mock_topic_create(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}

rd_kafka_resp_err_t
rd_kafka_mock_topic_delete(rd_kafka_mock_cluster_t *mcluster,
const char *topic) {
rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);

rko->rko_u.mock.name = rd_strdup(topic);
rko->rko_u.mock.cmd = RD_KAFKA_MOCK_CMD_TOPIC_DELETE;

return rd_kafka_op_err_destroy(
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}

rd_kafka_resp_err_t
rd_kafka_mock_partition_set_leader(rd_kafka_mock_cluster_t *mcluster,
const char *topic,
Expand Down Expand Up @@ -2506,6 +2536,10 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
return RD_KAFKA_RESP_ERR_TOPIC_EXCEPTION;
break;

case RD_KAFKA_MOCK_CMD_TOPIC_DELETE:
rd_kafka_mock_topic_remove(mcluster, rko->rko_u.mock.name);
break;

case RD_KAFKA_MOCK_CMD_TOPIC_SET_ERROR:
mtopic =
rd_kafka_mock_topic_get(mcluster, rko->rko_u.mock.name, -1);
Expand Down
12 changes: 12 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,18 @@ rd_kafka_mock_topic_create(rd_kafka_mock_cluster_t *mcluster,
int partition_cnt,
int replication_factor);

/**
* @brief Delete a topic.
*
* This is an alternative to automatic topic deletion as performed by
* the client itself.
*
* @remark The Topic Admin API (DeleteTopics) is not supported by the
* mock broker.
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_mock_topic_delete(rd_kafka_mock_cluster_t *mcluster,
const char *topic);

/**
* @brief Sets the partition leader.
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ struct rd_kafka_op_s {
RD_KAFKA_MOCK_CMD_APIVERSION_SET,
RD_KAFKA_MOCK_CMD_REQUESTED_METRICS_SET,
RD_KAFKA_MOCK_CMD_TELEMETRY_PUSH_INTERVAL_SET,
RD_KAFKA_MOCK_CMD_TOPIC_DELETE,
} cmd;

rd_kafka_resp_err_t err; /**< Error for:
Expand Down
Loading