Skip to content

Commit e92ea0e

Browse files
committed
Refresh broker list even if no topics to refresh (confluentinc#2466)
1 parent dda6cf2 commit e92ea0e

5 files changed

+42
-13
lines changed

CONFIGURATION.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ receive.message.max.bytes | * | 1000 .. 2147483647 | 100000
1313
max.in.flight.requests.per.connection | * | 1 .. 1000000 | 1000000 | low | Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
1414
max.in.flight | * | 1 .. 1000000 | 1000000 | low | Alias for `max.in.flight.requests.per.connection`: Maximum number of in-flight requests per broker connection. This is a generic property applied to all broker communication, however it is primarily relevant to produce requests. In particular, note that other mechanisms limit the number of outstanding consumer fetch request per broker to one. <br>*Type: integer*
1515
metadata.request.timeout.ms | * | 10 .. 900000 | 60000 | low | Non-topic request timeout in milliseconds. This is for metadata requests, etc. <br>*Type: integer*
16-
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Topic metadata refresh interval in milliseconds. The metadata is automatically refreshed on error and connect. Use -1 to disable the intervalled refresh. <br>*Type: integer*
16+
topic.metadata.refresh.interval.ms | * | -1 .. 3600000 | 300000 | low | Period of time in milliseconds at which topic and broker metadata is refreshed in order to proactively discover any new brokers, topics, partitions or partition leader changes. Use -1 to disable the intervalled refresh (not recommended). If there are no locally referenced topics (no topic objects created, no messages produced, no subscription or no assignment) then only the broker list will be refreshed every interval but no more often than every 10s. <br>*Type: integer*
1717
metadata.max.age.ms | * | 1 .. 86400000 | 900000 | low | Metadata cache max age. Defaults to topic.metadata.refresh.interval.ms * 3 <br>*Type: integer*
1818
topic.metadata.refresh.fast.interval.ms | * | 1 .. 60000 | 250 | low | When a topic loses its leader a new metadata request will be enqueued with this initial interval, exponentially increasing until the topic metadata has been refreshed. This is used to recover quickly from transitioning leader brokers. <br>*Type: integer*
1919
topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10 | low | **DEPRECATED** No longer used. <br>*Type: integer*

src/rdkafka.c

+18-4
Original file line numberDiff line numberDiff line change
@@ -1758,10 +1758,23 @@ static void rd_kafka_metadata_refresh_cb (rd_kafka_timers_t *rkts, void *arg) {
17581758
rk->rk_cgrp->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
17591759
sparse = 0;
17601760

1761-
if (sparse)
1762-
rd_kafka_metadata_refresh_known_topics(
1763-
rk, NULL, 1/*force*/, "periodic refresh");
1764-
else
1761+
if (sparse) {
1762+
if (rd_kafka_metadata_refresh_known_topics(
1763+
rk, NULL, 1/*force*/,
1764+
"periodic topic and broker list refresh") ==
1765+
RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC &&
1766+
rd_interval(&rk->rk_suppress.broker_metadata_refresh,
1767+
10*1000*1000 /*10s*/, 0) > 0) {
1768+
/* If there are no (locally referenced) topics
1769+
* to query, refresh the broker list.
1770+
* This avoids getting idle-disconnected for clients
1771+
* that have not yet referenced a topic and makes
1772+
* sure such a client has an up to date broker list. */
1773+
rd_kafka_metadata_refresh_brokers(
1774+
rk, NULL,
1775+
"periodic broker list refresh");
1776+
}
1777+
} else
17651778
rd_kafka_metadata_refresh_all(rk, NULL, "periodic refresh");
17661779
}
17671780

@@ -1961,6 +1974,7 @@ rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
19611974
mtx_init(&rk->rk_init_lock, mtx_plain);
19621975

19631976
rd_interval_init(&rk->rk_suppress.no_idemp_brokers);
1977+
rd_interval_init(&rk->rk_suppress.broker_metadata_refresh);
19641978
rd_interval_init(&rk->rk_suppress.sparse_connect_random);
19651979
mtx_init(&rk->rk_suppress.sparse_connect_lock, mtx_plain);
19661980

src/rdkafka_conf.c

+11-6
Original file line numberDiff line numberDiff line change
@@ -304,12 +304,17 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
304304
"Non-topic request timeout in milliseconds. "
305305
"This is for metadata requests, etc.",
306306
10, 900*1000, 60*1000},
307-
{ _RK_GLOBAL, "topic.metadata.refresh.interval.ms", _RK_C_INT,
308-
_RK(metadata_refresh_interval_ms),
309-
"Topic metadata refresh interval in milliseconds. "
310-
"The metadata is automatically refreshed on error and connect. "
311-
"Use -1 to disable the intervalled refresh.",
312-
-1, 3600*1000, 5*60*1000 },
307+
{ _RK_GLOBAL, "topic.metadata.refresh.interval.ms", _RK_C_INT,
308+
_RK(metadata_refresh_interval_ms),
309+
"Period of time in milliseconds at which topic and broker "
310+
"metadata is refreshed in order to proactively discover any new "
311+
"brokers, topics, partitions or partition leader changes. "
312+
"Use -1 to disable the intervalled refresh (not recommended). "
313+
"If there are no locally referenced topics "
314+
"(no topic objects created, no messages produced, "
315+
"no subscription or no assignment) then only the broker list will "
316+
"be refreshed every interval but no more often than every 10s.",
317+
-1, 3600*1000, 5*60*1000 },
313318
{ _RK_GLOBAL, "metadata.max.age.ms", _RK_C_INT,
314319
_RK(metadata_max_age_ms),
315320
"Metadata cache max age. "

src/rdkafka_int.h

+9
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,15 @@ struct rd_kafka_s {
355355
rd_interval_t sparse_connect_random;
356356
/**< Lock for sparse_connect_random */
357357
mtx_t sparse_connect_lock;
358+
359+
/**< Broker metadata refresh interval:
360+
* this is rate-limiting the number of topic-less
361+
* broker/cluster metadata refreshes when there are no
362+
* topics to refresh.
363+
* Will be refreshed every topic.metadata.refresh.interval.ms
364+
* but no more often than every 10s.
365+
* No locks: only accessed by rdkafka main thread. */
366+
rd_interval_t broker_metadata_refresh;
358367
} rk_suppress;
359368

360369
struct {

src/rdkafka_metadata.c

+3-2
Original file line numberDiff line numberDiff line change
@@ -583,8 +583,9 @@ rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
583583
}
584584

585585
/* Check if cgrp effective subscription is affected by
586-
* new metadata. */
587-
if (rkb->rkb_rk->rk_cgrp)
586+
* new topic metadata.
587+
* Ignore if this was a broker-only refresh (no topics) */
588+
if ((requested_topics || all_topics) && rkb->rkb_rk->rk_cgrp)
588589
rd_kafka_cgrp_metadata_update_check(
589590
rkb->rkb_rk->rk_cgrp, 1/*do join*/);
590591

0 commit comments

Comments
 (0)