From a755a829ff0e047f36648cdcd79a84b312f157c3 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Thu, 1 May 2025 12:48:56 -0700
Subject: [PATCH] Ignore leading SECURITY_PROTOCOL:// in bootstrap_servers

---
Review notes (commentary after the three-dash line; not part of the commit
message):
  * collect_hosts() is removed from kafka/conn.py and re-added in
    kafka/cluster.py; kafka/cluster.py now imports get_ip_port_afi from
    kafka.conn instead of collect_hosts.
  * The new collect_hosts() runs re.sub('^.*://', '', host_port) on every
    entry before handing it to get_ip_port_afi(). The `.*` is greedy, so
    everything up to and including the LAST "://" in an entry is dropped.
  * The old `if port < 0: port = DEFAULT_KAFKA_PORT` fallback and the
    `afi = socket.AF_INET` pre-initialisation are not carried over. The moved
    tests still expect port 9092 for entries without a port; presumably
    get_ip_port_afi() supplies that default -- it is not visible in this
    patch, confirm against kafka/conn.py.
  * The four collect_hosts tests move unchanged from test/test_conn.py to
    test/test_cluster.py, and test_collect_hosts__protocol is added to cover
    the "SASL_SSL://host:port" form.
  * Leading whitespace on context lines was reconstructed from Python syntax;
    if a hunk fails to apply, retry with whitespace-insensitive matching.

 kafka/cluster.py     | 25 +++++++++++++++++-
 kafka/conn.py        | 28 +-------------------
 test/test_cluster.py | 61 +++++++++++++++++++++++++++++++++++++++++++-
 test/test_conn.py    | 50 +-----------------------------------
 4 files changed, 86 insertions(+), 78 deletions(-)

diff --git a/kafka/cluster.py b/kafka/cluster.py
index ae822a401..d6ec82dba 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -3,13 +3,15 @@
 import collections
 import copy
 import logging
+import random
+import re
 import threading
 import time
 
 from kafka.vendor import six
 
 from kafka import errors as Errors
-from kafka.conn import collect_hosts
+from kafka.conn import get_ip_port_afi
 from kafka.future import Future
 from kafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition
 
@@ -422,3 +424,24 @@ def with_partitions(self, partitions_to_add):
     def __str__(self):
         return 'ClusterMetadata(brokers: %d, topics: %d, coordinators: %d)' % \
                (len(self._brokers), len(self._partitions), len(self._coordinators))
+
+
+def collect_hosts(hosts, randomize=True):
+    """
+    Collects a comma-separated set of hosts (host:port) and optionally
+    randomize the returned list.
+    """
+
+    if isinstance(hosts, six.string_types):
+        hosts = hosts.strip().split(',')
+
+    result = []
+    for host_port in hosts:
+        # ignore leading SECURITY_PROTOCOL:// to mimic java client
+        host_port = re.sub('^.*://', '', host_port)
+        host, port, afi = get_ip_port_afi(host_port)
+        result.append((host, port, afi))
+
+    if randomize:
+        random.shuffle(result)
+    return result
diff --git a/kafka/conn.py b/kafka/conn.py
index 8dd65c1c0..c9cdd595f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -4,7 +4,7 @@
 import errno
 import io
 import logging
-from random import shuffle, uniform
+from random import uniform
 
 # selectors in stdlib as of py3.4
 try:
@@ -1496,32 +1496,6 @@ def get_ip_port_afi(host_and_port_str):
             return host, port, af
 
 
-def collect_hosts(hosts, randomize=True):
-    """
-    Collects a comma-separated set of hosts (host:port) and optionally
-    randomize the returned list.
-    """
-
-    if isinstance(hosts, six.string_types):
-        hosts = hosts.strip().split(',')
-
-    result = []
-    afi = socket.AF_INET
-    for host_port in hosts:
-
-        host, port, afi = get_ip_port_afi(host_port)
-
-        if port < 0:
-            port = DEFAULT_KAFKA_PORT
-
-        result.append((host, port, afi))
-
-    if randomize:
-        shuffle(result)
-
-    return result
-
-
 def is_inet_4_or_6(gai):
     """Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
     return gai[0] in (socket.AF_INET, socket.AF_INET6)
diff --git a/test/test_cluster.py b/test/test_cluster.py
index f0a2f83d6..c57bd8f9f 100644
--- a/test/test_cluster.py
+++ b/test/test_cluster.py
@@ -1,7 +1,9 @@
 # pylint: skip-file
 from __future__ import absolute_import
 
-from kafka.cluster import ClusterMetadata
+import socket
+
+from kafka.cluster import ClusterMetadata, collect_hosts
 from kafka.protocol.metadata import MetadataResponse
 
 
@@ -132,3 +134,60 @@ def test_metadata_v7():
     assert cluster.cluster_id == 'cluster-foo'
     assert cluster._partitions['topic-1'][0].offline_replicas == [12]
     assert cluster._partitions['topic-1'][0].leader_epoch == 0
+
+
+def test_collect_hosts__happy_path():
+    hosts = "127.0.0.1:1234,127.0.0.1"
+    results = collect_hosts(hosts)
+    assert set(results) == set([
+        ('127.0.0.1', 1234, socket.AF_INET),
+        ('127.0.0.1', 9092, socket.AF_INET),
+    ])
+
+
+def test_collect_hosts__ipv6():
+    hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234"
+    results = collect_hosts(hosts)
+    assert set(results) == set([
+        ('localhost', 1234, socket.AF_INET6),
+        ('2001:1000:2000::1', 9092, socket.AF_INET6),
+        ('2001:1000:2000::1', 1234, socket.AF_INET6),
+    ])
+
+
+def test_collect_hosts__string_list():
+    hosts = [
+        'localhost:1234',
+        'localhost',
+        '[localhost]',
+        '2001::1',
+        '[2001::1]',
+        '[2001::1]:1234',
+    ]
+    results = collect_hosts(hosts)
+    assert set(results) == set([
+        ('localhost', 1234, socket.AF_UNSPEC),
+        ('localhost', 9092, socket.AF_UNSPEC),
+        ('localhost', 9092, socket.AF_INET6),
+        ('2001::1', 9092, socket.AF_INET6),
+        ('2001::1', 9092, socket.AF_INET6),
+        ('2001::1', 1234, socket.AF_INET6),
+    ])
+
+
+def test_collect_hosts__with_spaces():
+    hosts = "localhost:1234, localhost"
+    results = collect_hosts(hosts)
+    assert set(results) == set([
+        ('localhost', 1234, socket.AF_UNSPEC),
+        ('localhost', 9092, socket.AF_UNSPEC),
+    ])
+
+
+def test_collect_hosts__protocol():
+    hosts = "SASL_SSL://foo.bar:1234,SASL_SSL://fizz.buzz:5678"
+    results = collect_hosts(hosts)
+    assert set(results) == set([
+        ('foo.bar', 1234, socket.AF_UNSPEC),
+        ('fizz.buzz', 5678, socket.AF_UNSPEC),
+    ])
diff --git a/test/test_conn.py b/test/test_conn.py
index b5deb748c..037cd015e 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -10,7 +10,7 @@
 import mock
 import pytest
 
-from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts
+from kafka.conn import BrokerConnection, ConnectionStates
 from kafka.future import Future
 from kafka.protocol.api import RequestHeader
 from kafka.protocol.group import HeartbeatResponse
@@ -280,54 +280,6 @@ def test_close(conn):
     pass # TODO
 
 
-def test_collect_hosts__happy_path():
-    hosts = "127.0.0.1:1234,127.0.0.1"
-    results = collect_hosts(hosts)
-    assert set(results) == set([
-        ('127.0.0.1', 1234, socket.AF_INET),
-        ('127.0.0.1', 9092, socket.AF_INET),
-    ])
-
-
-def test_collect_hosts__ipv6():
-    hosts = "[localhost]:1234,[2001:1000:2000::1],[2001:1000:2000::1]:1234"
-    results = collect_hosts(hosts)
-    assert set(results) == set([
-        ('localhost', 1234, socket.AF_INET6),
-        ('2001:1000:2000::1', 9092, socket.AF_INET6),
-        ('2001:1000:2000::1', 1234, socket.AF_INET6),
-    ])
-
-
-def test_collect_hosts__string_list():
-    hosts = [
-        'localhost:1234',
-        'localhost',
-        '[localhost]',
-        '2001::1',
-        '[2001::1]',
-        '[2001::1]:1234',
-    ]
-    results = collect_hosts(hosts)
-    assert set(results) == set([
-        ('localhost', 1234, socket.AF_UNSPEC),
-        ('localhost', 9092, socket.AF_UNSPEC),
-        ('localhost', 9092, socket.AF_INET6),
-        ('2001::1', 9092, socket.AF_INET6),
-        ('2001::1', 9092, socket.AF_INET6),
-        ('2001::1', 1234, socket.AF_INET6),
-    ])
-
-
-def test_collect_hosts__with_spaces():
-    hosts = "localhost:1234, localhost"
-    results = collect_hosts(hosts)
-    assert set(results) == set([
-        ('localhost', 1234, socket.AF_UNSPEC),
-        ('localhost', 9092, socket.AF_UNSPEC),
-    ])
-
-
 def test_lookup_on_connect():
     hostname = 'example.org'
     port = 9092