Skip to content

Ignore leading SECURITY_PROTOCOL:// in bootstrap_servers #2608

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import collections
import copy
import logging
import random
import re
import threading
import time

from kafka.vendor import six

from kafka import errors as Errors
from kafka.conn import collect_hosts
from kafka.conn import get_ip_port_afi
from kafka.future import Future
from kafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition

Expand Down Expand Up @@ -422,3 +424,24 @@ def with_partitions(self, partitions_to_add):
def __str__(self):
return 'ClusterMetadata(brokers: %d, topics: %d, coordinators: %d)' % \
(len(self._brokers), len(self._partitions), len(self._coordinators))


def collect_hosts(hosts, randomize=True):
"""
Collects a comma-separated set of hosts (host:port) and optionally
randomize the returned list.
"""

if isinstance(hosts, six.string_types):
hosts = hosts.strip().split(',')

result = []
for host_port in hosts:
# ignore leading SECURITY_PROTOCOL:// to mimic java client
host_port = re.sub('^.*://', '', host_port)
host, port, afi = get_ip_port_afi(host_port)
result.append((host, port, afi))

if randomize:
random.shuffle(result)
return result
28 changes: 1 addition & 27 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import errno
import io
import logging
from random import shuffle, uniform
from random import uniform

# selectors in stdlib as of py3.4
try:
Expand Down Expand Up @@ -1496,32 +1496,6 @@ def get_ip_port_afi(host_and_port_str):
return host, port, af


def collect_hosts(hosts, randomize=True):
"""
Collects a comma-separated set of hosts (host:port) and optionally
randomize the returned list.
"""

if isinstance(hosts, six.string_types):
hosts = hosts.strip().split(',')

result = []
afi = socket.AF_INET
for host_port in hosts:

host, port, afi = get_ip_port_afi(host_port)

if port < 0:
port = DEFAULT_KAFKA_PORT

result.append((host, port, afi))

if randomize:
shuffle(result)

return result


def is_inet_4_or_6(gai):
"""Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
return gai[0] in (socket.AF_INET, socket.AF_INET6)
Expand Down
61 changes: 60 additions & 1 deletion test/test_cluster.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# pylint: skip-file
from __future__ import absolute_import

from kafka.cluster import ClusterMetadata
import socket

from kafka.cluster import ClusterMetadata, collect_hosts
from kafka.protocol.metadata import MetadataResponse


Expand Down Expand Up @@ -132,3 +134,60 @@ def test_metadata_v7():
assert cluster.cluster_id == 'cluster-foo'
assert cluster._partitions['topic-1'][0].offline_replicas == [12]
assert cluster._partitions['topic-1'][0].leader_epoch == 0


def test_collect_hosts__happy_path():
hosts = "127.0.0.1:1234,127.0.0.1"
results = collect_hosts(hosts)
assert set(results) == set([
('127.0.0.1', 1234, socket.AF_INET),
('127.0.0.1', 9092, socket.AF_INET),
])


def test_collect_hosts__ipv6():
hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234"
results = collect_hosts(hosts)
assert set(results) == set([
('localhost', 1234, socket.AF_INET6),
('2001:1000:2000::1', 9092, socket.AF_INET6),
('2001:1000:2000::1', 1234, socket.AF_INET6),
])


def test_collect_hosts__string_list():
hosts = [
'localhost:1234',
'localhost',
'[localhost]',
'2001::1',
'[2001::1]',
'[2001::1]:1234',
]
results = collect_hosts(hosts)
assert set(results) == set([
('localhost', 1234, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_INET6),
('2001::1', 9092, socket.AF_INET6),
('2001::1', 9092, socket.AF_INET6),
('2001::1', 1234, socket.AF_INET6),
])


def test_collect_hosts__with_spaces():
hosts = "localhost:1234, localhost"
results = collect_hosts(hosts)
assert set(results) == set([
('localhost', 1234, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_UNSPEC),
])


def test_collect_hosts__protocol():
hosts = "SASL_SSL://foo.bar:1234,SASL_SSL://fizz.buzz:5678"
results = collect_hosts(hosts)
assert set(results) == set([
('foo.bar', 1234, socket.AF_UNSPEC),
('fizz.buzz', 5678, socket.AF_UNSPEC),
])
50 changes: 1 addition & 49 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import mock
import pytest

from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
from kafka.conn import BrokerConnection, ConnectionStates
from kafka.future import Future
from kafka.protocol.api import RequestHeader
from kafka.protocol.group import HeartbeatResponse
Expand Down Expand Up @@ -280,54 +280,6 @@ def test_close(conn):
pass # TODO


def test_collect_hosts__happy_path():
hosts = "127.0.0.1:1234,127.0.0.1"
results = collect_hosts(hosts)
assert set(results) == set([
('127.0.0.1', 1234, socket.AF_INET),
('127.0.0.1', 9092, socket.AF_INET),
])


def test_collect_hosts__ipv6():
hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234"
results = collect_hosts(hosts)
assert set(results) == set([
('localhost', 1234, socket.AF_INET6),
('2001:1000:2000::1', 9092, socket.AF_INET6),
('2001:1000:2000::1', 1234, socket.AF_INET6),
])


def test_collect_hosts__string_list():
hosts = [
'localhost:1234',
'localhost',
'[localhost]',
'2001::1',
'[2001::1]',
'[2001::1]:1234',
]
results = collect_hosts(hosts)
assert set(results) == set([
('localhost', 1234, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_INET6),
('2001::1', 9092, socket.AF_INET6),
('2001::1', 9092, socket.AF_INET6),
('2001::1', 1234, socket.AF_INET6),
])


def test_collect_hosts__with_spaces():
hosts = "localhost:1234, localhost"
results = collect_hosts(hosts)
assert set(results) == set([
('localhost', 1234, socket.AF_UNSPEC),
('localhost', 9092, socket.AF_UNSPEC),
])


def test_lookup_on_connect():
hostname = 'example.org'
port = 9092
Expand Down
Loading