Skip to content

Commit 42cc318

Browse files
emasab and pranavrth
committed
[KIP-848] topic_id in topic_partition_internal, some new error codes (#4404)
* KIP-848 new error codes (partial) * Add topic id to topic_partition_private * Rename rd_kafka_uuid_t to rd_kafka_Uuid_t * [KIP-848] Added new configs group.protocol and group.remote.assignor (#4414) Added new configs group.protocol and group.remote.assignor * [KIP-848] Added topic id to topic_partition_t while reading from buffer (#4416) * Address comments * Address comment * Upgrade vcpkg --------- Co-authored-by: Pranav Rathi <[email protected]>
1 parent 22cdb64 commit 42cc318

16 files changed

+316
-48
lines changed

CONFIGURATION.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ group.instance.id | C | |
109109
partition.assignment.strategy | C | | range,roundrobin | medium | The name of one or more partition assignment strategies. The elected group leader will use a strategy supported by all members of the group to assign partitions to group members. If there is more than one eligible strategy, preference is determined by the order of this list (strategies earlier in the list have higher priority). Cooperative and non-cooperative (eager) strategies must not be mixed. Available strategies: range, roundrobin, cooperative-sticky. <br>*Type: string*
110110
session.timeout.ms | C | 1 .. 3600000 | 45000 | high | Client group session and failure detection timeout. The consumer sends periodic heartbeats (heartbeat.interval.ms) to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the **broker** configuration properties `group.min.session.timeout.ms` and `group.max.session.timeout.ms`. Also see `max.poll.interval.ms`. <br>*Type: integer*
111111
heartbeat.interval.ms | C | 1 .. 3600000 | 3000 | low | Group session keepalive heartbeat interval. <br>*Type: integer*
112-
group.protocol.type | C | | consumer | low | Group protocol type. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
112+
group.protocol.type | C | | consumer | low | Group protocol type for the `generic` group protocol. NOTE: Currently, the only supported group protocol type is `consumer`. <br>*Type: string*
113113
coordinator.query.interval.ms | C | 1 .. 3600000 | 600000 | low | How often to query for the current client group coordinator. If the currently assigned coordinator is down the configured query interval will be divided by ten to more quickly recover in case of coordinator reassignment. <br>*Type: integer*
114114
max.poll.interval.ms | C | 1 .. 86400000 | 300000 | high | Maximum allowed time between calls to consume messages (e.g., rd_kafka_consumer_poll()) for high-level consumers. If this interval is exceeded the consumer is considered failed and the group will rebalance in order to reassign the partitions to another consumer group member. Warning: Offset commits may be not possible at this point. Note: It is recommended to set `enable.auto.offset.store=false` for long-time processing applications and then explicitly store offsets (using offsets_store()) *after* message processing, to make sure offsets are not auto-committed prior to processing has finished. The interval is checked two times per second. See KIP-62 for more information. <br>*Type: integer*
115115
enable.auto.commit | C | true, false | true | high | Automatically and periodically commit offsets in the background. Note: setting this to false does not prevent the consumer from fetching previously committed start offsets. To circumvent this behaviour set specific start offsets per partition in the call to assign(). <br>*Type: boolean*

src/rdkafka.c

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,10 @@ static const struct rd_kafka_err_desc rd_kafka_err_descs[] = {
700700
_ERR_DESC(RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE,
701701
"Broker: Request principal deserialization failed during "
702702
"forwarding"),
703+
_ERR_DESC(RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH,
704+
"Broker: The member epoch is fenced by the group coordinator"),
705+
_ERR_DESC(RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH,
706+
"Broker: The member epoch is stale"),
703707
_ERR_DESC(RD_KAFKA_RESP_ERR__END, NULL)};
704708

705709

@@ -2436,8 +2440,9 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
24362440

24372441
if (RD_KAFKAP_STR_LEN(rk->rk_group_id) > 0) {
24382442
/* Create consumer group handle */
2439-
rk->rk_cgrp = rd_kafka_cgrp_new(rk, rk->rk_group_id,
2440-
rk->rk_client_id);
2443+
rk->rk_cgrp = rd_kafka_cgrp_new(
2444+
rk, rk->rk_conf.group_protocol, rk->rk_group_id,
2445+
rk->rk_client_id);
24412446
rk->rk_consumer.q =
24422447
rd_kafka_q_keep(rk->rk_cgrp->rkcg_q);
24432448
} else {

src/rdkafka.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,10 @@ typedef enum {
631631
RD_KAFKA_RESP_ERR_FEATURE_UPDATE_FAILED = 96,
632632
/** Request principal deserialization failed during forwarding */
633633
RD_KAFKA_RESP_ERR_PRINCIPAL_DESERIALIZATION_FAILURE = 97,
634-
634+
/** The member epoch is fenced by the group coordinator */
635+
RD_KAFKA_RESP_ERR_FENCED_MEMBER_EPOCH = 110,
636+
/** The member epoch is stale */
637+
RD_KAFKA_RESP_ERR_STALE_MEMBER_EPOCH = 113,
635638
RD_KAFKA_RESP_ERR_END_ALL,
636639
} rd_kafka_resp_err_t;
637640

@@ -4546,6 +4549,20 @@ rd_kafka_consumer_group_metadata_new_with_genid(const char *group_id,
45464549
const char *group_instance_id);
45474550

45484551

4552+
/**
4553+
* @brief Get member id of a group metadata.
4554+
*
4555+
* @param group_metadata The group metadata
4556+
*
4557+
* @returns The member id contained in the passed \p group_metadata.
4558+
*
4559+
* @remark The returned pointer has the same lifetime as \p group_metadata.
4560+
*/
4561+
RD_EXPORT
4562+
const char *rd_kafka_consumer_group_metadata_member_id(
4563+
const rd_kafka_consumer_group_metadata_t *group_metadata);
4564+
4565+
45494566
/**
45504567
* @brief Frees the consumer group metadata object as returned by
45514568
* rd_kafka_consumer_group_metadata().

src/rdkafka_admin.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3938,7 +3938,8 @@ rd_kafka_DeleteRecordsResponse_parse(rd_kafka_op_t *rko_req,
39383938
RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET,
39393939
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
39403940
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
3941-
offsets = rd_kafka_buf_read_topic_partitions(reply, 0, fields);
3941+
offsets = rd_kafka_buf_read_topic_partitions(
3942+
reply, rd_false /* don't use topic_id */, 0, fields);
39423943
if (!offsets)
39433944
rd_kafka_buf_parse_fail(reply,
39443945
"Failed to parse topic partitions");
@@ -4924,7 +4925,8 @@ rd_kafka_OffsetDeleteResponse_parse(rd_kafka_op_t *rko_req,
49244925
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
49254926
RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
49264927
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
4927-
partitions = rd_kafka_buf_read_topic_partitions(reply, 16, fields);
4928+
partitions = rd_kafka_buf_read_topic_partitions(
4929+
reply, rd_false /* don't use topic_id */, 16, fields);
49284930
if (!partitions) {
49294931
rd_snprintf(errstr, errstr_size,
49304932
"Failed to parse OffsetDeleteResponse partitions");
@@ -8112,7 +8114,8 @@ rd_kafka_DescribeConsumerGroupsResponse_parse(rd_kafka_op_t *rko_req,
81128114
{RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
81138115
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
81148116
partitions = rd_kafka_buf_read_topic_partitions(
8115-
rkbuf, 0, fields);
8117+
rkbuf, rd_false /* don't use topic_id */, 0,
8118+
fields);
81168119
rd_kafka_buf_destroy(rkbuf);
81178120
if (!partitions)
81188121
rd_kafka_buf_parse_fail(

src/rdkafka_assignor.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ rd_kafkap_bytes_t *rd_kafka_consumer_protocol_member_metadata_new(
159159
rd_kafka_buf_write_topic_partitions(
160160
rkbuf, owned_partitions,
161161
rd_false /*don't skip invalid offsets*/,
162-
rd_false /*any offset*/, fields);
162+
rd_false /*any offset*/, rd_false /* don't use topic_id */,
163+
fields);
163164
}
164165

165166
/* Following data is ignored by consumer version < 2 */

src/rdkafka_cgrp.c

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,8 @@ void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg) {
361361
rd_kafka_cgrp_set_member_id(rkcg, NULL);
362362
if (rkcg->rkcg_group_instance_id)
363363
rd_kafkap_str_destroy(rkcg->rkcg_group_instance_id);
364+
if (rkcg->rkcg_group_remote_assignor)
365+
rd_kafkap_str_destroy(rkcg->rkcg_group_remote_assignor);
364366

365367
rd_kafka_q_destroy_owner(rkcg->rkcg_q);
366368
rd_kafka_q_destroy_owner(rkcg->rkcg_ops);
@@ -398,18 +400,20 @@ rd_kafka_cgrp_update_session_timeout(rd_kafka_cgrp_t *rkcg, rd_bool_t reset) {
398400

399401

400402
rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
403+
rd_kafka_group_protocol_t group_protocol,
401404
const rd_kafkap_str_t *group_id,
402405
const rd_kafkap_str_t *client_id) {
403406
rd_kafka_cgrp_t *rkcg;
404407

405408
rkcg = rd_calloc(1, sizeof(*rkcg));
406409

407-
rkcg->rkcg_rk = rk;
408-
rkcg->rkcg_group_id = group_id;
409-
rkcg->rkcg_client_id = client_id;
410-
rkcg->rkcg_coord_id = -1;
411-
rkcg->rkcg_generation_id = -1;
412-
rkcg->rkcg_wait_resp = -1;
410+
rkcg->rkcg_rk = rk;
411+
rkcg->rkcg_group_protocol = group_protocol;
412+
rkcg->rkcg_group_id = group_id;
413+
rkcg->rkcg_client_id = client_id;
414+
rkcg->rkcg_coord_id = -1;
415+
rkcg->rkcg_generation_id = -1;
416+
rkcg->rkcg_wait_resp = -1;
413417

414418
rkcg->rkcg_ops = rd_kafka_q_new(rk);
415419
rkcg->rkcg_ops->rkq_serve = rd_kafka_cgrp_op_serve;
@@ -420,6 +424,8 @@ rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
420424
rkcg->rkcg_q = rd_kafka_consume_q_new(rk);
421425
rkcg->rkcg_group_instance_id =
422426
rd_kafkap_str_new(rk->rk_conf.group_instance_id, -1);
427+
rkcg->rkcg_group_remote_assignor =
428+
rd_kafkap_str_new(rk->rk_conf.group_remote_assignor, -1);
423429

424430
TAILQ_INIT(&rkcg->rkcg_topics);
425431
rd_list_init(&rkcg->rkcg_toppars, 32, NULL);
@@ -1513,8 +1519,8 @@ static void rd_kafka_cgrp_handle_SyncGroup_memberstate(
15131519
const rd_kafka_topic_partition_field_t fields[] = {
15141520
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
15151521
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
1516-
if (!(assignment =
1517-
rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields)))
1522+
if (!(assignment = rd_kafka_buf_read_topic_partitions(
1523+
rkbuf, rd_false /* don't use topic_id */, 0, fields)))
15181524
goto err_parse;
15191525
rd_kafka_buf_read_kbytes(rkbuf, &UserData);
15201526

@@ -1814,8 +1820,8 @@ static int rd_kafka_group_MemberMetadata_consumer_read(
18141820
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
18151821
RD_KAFKA_TOPIC_PARTITION_FIELD_END};
18161822
if (Version >= 1 &&
1817-
!(rkgm->rkgm_owned =
1818-
rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields)))
1823+
!(rkgm->rkgm_owned = rd_kafka_buf_read_topic_partitions(
1824+
rkbuf, rd_false /* don't use topic_id */, 0, fields)))
18191825
goto err;
18201826

18211827
if (Version >= 2) {
@@ -5567,6 +5573,11 @@ rd_kafka_consumer_group_metadata(rd_kafka_t *rk) {
55675573
return cgmetadata;
55685574
}
55695575

5576+
const char *rd_kafka_consumer_group_metadata_member_id(
5577+
const rd_kafka_consumer_group_metadata_t *group_metadata) {
5578+
return group_metadata->member_id;
5579+
}
5580+
55705581
void rd_kafka_consumer_group_metadata_destroy(
55715582
rd_kafka_consumer_group_metadata_t *cgmetadata) {
55725583
rd_free(cgmetadata->group_id);

src/rdkafka_cgrp.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,12 @@ typedef struct rd_kafka_cgrp_s {
190190
int32_t rkcg_coord_id; /**< Current coordinator id,
191191
* or -1 if not known. */
192192

193+
rd_kafka_group_protocol_t
194+
rkcg_group_protocol; /**< Group protocol to use */
195+
196+
rd_kafkap_str_t *rkcg_group_remote_assignor; /**< Group remote
197+
* assignor to use */
198+
193199
rd_kafka_broker_t *rkcg_curr_coord; /**< Current coordinator
194200
* broker handle, or NULL.
195201
* rkcg_coord's nodename is
@@ -313,6 +319,7 @@ extern const char *rd_kafka_cgrp_join_state_names[];
313319

314320
void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg);
315321
rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
322+
rd_kafka_group_protocol_t group_protocol,
316323
const rd_kafkap_str_t *group_id,
317324
const rd_kafkap_str_t *client_id);
318325
void rd_kafka_cgrp_serve(rd_kafka_cgrp_t *rkcg);

src/rdkafka_conf.c

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1132,9 +1132,26 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
11321132
"Group session keepalive heartbeat interval.", 1, 3600 * 1000, 3 * 1000},
11331133
{_RK_GLOBAL | _RK_CGRP, "group.protocol.type", _RK_C_KSTR,
11341134
_RK(group_protocol_type),
1135-
"Group protocol type. NOTE: Currently, the only supported group "
1135+
"Group protocol type for the `generic` group protocol. NOTE: Currently, "
1136+
"the only supported group "
11361137
"protocol type is `consumer`.",
11371138
.sdef = "consumer"},
1139+
{_RK_GLOBAL | _RK_CGRP | _RK_HIGH | _RK_HIDDEN, "group.protocol", _RK_C_S2I,
1140+
_RK(group_protocol),
1141+
"Group protocol to use. Use `generic` for the original protocol and "
1142+
"`consumer` for the new "
1143+
"protocol introduced in KIP-848. Available protocols: generic or "
1144+
"consumer. Default is `generic`, "
1145+
"but will change to `consumer` in future releases.",
1146+
.vdef = RD_KAFKA_GROUP_PROTOCOL_GENERIC,
1147+
.s2i = {{RD_KAFKA_GROUP_PROTOCOL_GENERIC, "generic"},
1148+
{RD_KAFKA_GROUP_PROTOCOL_CONSUMER, "consumer"}}},
1149+
{_RK_GLOBAL | _RK_CGRP | _RK_MED | _RK_HIDDEN, "group.remote.assignor",
1150+
_RK_C_STR, _RK(group_remote_assignor),
1151+
"Server side assignor to use. Keep it null to make server select a "
1152+
"suitable assignor for the group. "
1153+
"Available assignors: uniform or range. Default is null",
1154+
.sdef = NULL},
11381155
{_RK_GLOBAL | _RK_CGRP, "coordinator.query.interval.ms", _RK_C_INT,
11391156
_RK(coord_query_intvl_ms),
11401157
"How often to query for the current client group coordinator. "

src/rdkafka_conf.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,11 @@ typedef enum {
163163
RD_KAFKA_RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY,
164164
} rd_kafka_client_dns_lookup_t;
165165

166+
typedef enum {
167+
RD_KAFKA_GROUP_PROTOCOL_GENERIC,
168+
RD_KAFKA_GROUP_PROTOCOL_CONSUMER,
169+
} rd_kafka_group_protocol_t;
170+
166171
/* Increase in steps of 64 as needed.
167172
* This must be larger than sizeof(rd_kafka_[topic_]conf_t) */
168173
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 33)
@@ -363,8 +368,10 @@ struct rd_kafka_conf_s {
363368
int fetch_min_bytes;
364369
int fetch_queue_backoff_ms;
365370
int fetch_error_backoff_ms;
371+
rd_kafka_group_protocol_t group_protocol;
366372
char *group_id_str;
367373
char *group_instance_id;
374+
char *group_remote_assignor;
368375
int allow_auto_create_topics;
369376

370377
rd_kafka_pattern_list_t *topic_blacklist;

0 commit comments

Comments
 (0)