Skip to content

Commit f7c234d

Browse files
hnousiainendpkp
authored andcommittedMar 14, 2025
Support connections through SOCKS5 proxies
Implement support for SOCKS5 proxies. Implement a new proxy wrapper that handles SOCKS5 connection, authentication and requesting connections to the actual Kafka broker endpoints. The proxy can be configured via a new keyword argument `socks5_proxy` to consumers, producers or admin client. The value is URL with optional username and password. E.g. `socks5://user:secret@proxy.example.com:10800` The implementation is done in state machine that makes progress on repeated calls to connect_ex. The rationale with this bit strange design is to minimize amount of changes on the actual BrokerConnection object.
1 parent a25ffae commit f7c234d

File tree

6 files changed

+271
-5
lines changed

6 files changed

+271
-5
lines changed
 

‎kafka/admin/client.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,8 @@ class KafkaAdminClient(object):
151151
sasl mechanism handshake. Default: one of bootstrap servers
152152
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
153153
token provider instance. Default: None
154+
socks5_proxy (str): Socks5 proxy url. Default: None
154155
kafka_client (callable): Custom class / callable for creating KafkaClient instances
155-
156156
"""
157157
DEFAULT_CONFIG = {
158158
# client configs
@@ -188,6 +188,7 @@ class KafkaAdminClient(object):
188188
'sasl_kerberos_service_name': 'kafka',
189189
'sasl_kerberos_domain_name': None,
190190
'sasl_oauth_token_provider': None,
191+
'socks5_proxy': None,
191192

192193
# metrics configs
193194
'metric_reporters': [],

‎kafka/client_async.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ class KafkaClient(object):
173173
sasl mechanism handshake. Default: one of bootstrap servers
174174
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
175175
token provider instance. Default: None
176+
socks5_proxy (str): Socks5 proxy URL. Default: None
176177
"""
177178

178179
DEFAULT_CONFIG = {
@@ -213,7 +214,8 @@ class KafkaClient(object):
213214
'sasl_kerberos_name': None,
214215
'sasl_kerberos_service_name': 'kafka',
215216
'sasl_kerberos_domain_name': None,
216-
'sasl_oauth_token_provider': None
217+
'sasl_oauth_token_provider': None,
218+
'socks5_proxy': None,
217219
}
218220

219221
def __init__(self, **configs):

‎kafka/conn.py

+14-3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
from kafka.protocol.sasl_handshake import SaslHandshakeRequest
3737
from kafka.protocol.types import Int32
3838
from kafka.sasl import get_sasl_mechanism
39+
from kafka.socks5_wrapper import Socks5Wrapper
3940
from kafka.version import __version__
4041

4142

@@ -185,6 +186,7 @@ class BrokerConnection(object):
185186
sasl mechanism handshake. Default: one of bootstrap servers
186187
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
187188
token provider instance. Default: None
189+
socks5_proxy (str): Socks5 proxy url. Default: None
188190
"""
189191

190192
DEFAULT_CONFIG = {
@@ -220,7 +222,8 @@ class BrokerConnection(object):
220222
'sasl_kerberos_name': None,
221223
'sasl_kerberos_service_name': 'kafka',
222224
'sasl_kerberos_domain_name': None,
223-
'sasl_oauth_token_provider': None
225+
'sasl_oauth_token_provider': None,
226+
'socks5_proxy': None,
224227
}
225228
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
226229
VERSION_CHECKS = (
@@ -241,6 +244,7 @@ def __init__(self, host, port, afi, **configs):
241244
self._check_version_idx = None
242245
self._api_versions_idx = 2
243246
self._throttle_time = None
247+
self._socks5_proxy = None
244248

245249
self.config = copy.copy(self.DEFAULT_CONFIG)
246250
for key in self.config:
@@ -362,7 +366,11 @@ def connect(self):
362366
assert self._sock is None
363367
self._sock_afi, self._sock_addr = next_lookup
364368
try:
365-
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
369+
if self.config["socks5_proxy"] is not None:
370+
self._socks5_proxy = Socks5Wrapper(self.config["socks5_proxy"], self.afi)
371+
self._sock = self._socks5_proxy.socket(self._sock_afi, socket.SOCK_STREAM)
372+
else:
373+
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
366374
except (socket.error, OSError) as e:
367375
self.close(e)
368376
return self.state
@@ -382,7 +390,10 @@ def connect(self):
382390
# to check connection status
383391
ret = None
384392
try:
385-
ret = self._sock.connect_ex(self._sock_addr)
393+
if self._socks5_proxy:
394+
ret = self._socks5_proxy.connect_ex(self._sock_addr)
395+
else:
396+
ret = self._sock.connect_ex(self._sock_addr)
386397
except socket.error as err:
387398
ret = err.errno
388399

‎kafka/consumer/group.py

+2
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ class KafkaConsumer(six.Iterator):
260260
sasl mechanism handshake. Default: one of bootstrap servers
261261
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
262262
token provider instance. Default: None
263+
socks5_proxy (str): Socks5 proxy URL. Default: None
263264
kafka_client (callable): Custom class / callable for creating KafkaClient instances
264265
265266
Note:
@@ -325,6 +326,7 @@ class KafkaConsumer(six.Iterator):
325326
'sasl_kerberos_service_name': 'kafka',
326327
'sasl_kerberos_domain_name': None,
327328
'sasl_oauth_token_provider': None,
329+
'socks5_proxy': None,
328330
'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
329331
'kafka_client': KafkaClient,
330332
}

‎kafka/producer/kafka.py

+2
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ class KafkaProducer(object):
299299
sasl mechanism handshake. Default: one of bootstrap servers
300300
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider): OAuthBearer
301301
token provider instance. Default: None
302+
socks5_proxy (str): Socks5 proxy URL. Default: None
302303
kafka_client (callable): Custom class / callable for creating KafkaClient instances
303304
304305
Note:
@@ -355,6 +356,7 @@ class KafkaProducer(object):
355356
'sasl_kerberos_service_name': 'kafka',
356357
'sasl_kerberos_domain_name': None,
357358
'sasl_oauth_token_provider': None,
359+
'socks5_proxy': None,
358360
'kafka_client': KafkaClient,
359361
}
360362

‎kafka/socks5_wrapper.py

+248
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
try:
2+
from urllib.parse import urlparse
3+
except ImportError:
4+
from urlparse import urlparse
5+
6+
import errno
7+
import logging
8+
import random
9+
import socket
10+
import struct
11+
12+
log = logging.getLogger(__name__)
13+
14+
15+
class ProxyConnectionStates:
16+
DISCONNECTED = '<disconnected>'
17+
CONNECTING = '<connecting>'
18+
NEGOTIATE_PROPOSE = '<negotiate_propose>'
19+
NEGOTIATING = '<negotiating>'
20+
AUTHENTICATING = '<authenticating>'
21+
REQUEST_SUBMIT = '<request_submit>'
22+
REQUESTING = '<requesting>'
23+
READ_ADDRESS = '<read_address>'
24+
COMPLETE = '<complete>'
25+
26+
27+
class Socks5Wrapper:
28+
"""Socks5 proxy wrapper
29+
30+
Manages connection through socks5 proxy with support for username/password
31+
authentication.
32+
"""
33+
34+
def __init__(self, proxy_url, afi):
35+
self._buffer_in = b''
36+
self._buffer_out = b''
37+
self._proxy_url = urlparse(proxy_url)
38+
self._sock = None
39+
self._state = ProxyConnectionStates.DISCONNECTED
40+
self._target_afi = socket.AF_UNSPEC
41+
42+
proxy_addrs = self.dns_lookup(self._proxy_url.hostname, self._proxy_url.port, afi)
43+
# TODO raise error on lookup failure
44+
self._proxy_addr = random.choice(proxy_addrs)
45+
46+
@classmethod
47+
def is_inet_4_or_6(cls, gai):
48+
"""Given a getaddrinfo struct, return True iff ipv4 or ipv6"""
49+
return gai[0] in (socket.AF_INET, socket.AF_INET6)
50+
51+
@classmethod
52+
def dns_lookup(cls, host, port, afi=socket.AF_UNSPEC):
53+
"""Returns a list of getaddrinfo structs, optionally filtered to an afi (ipv4 / ipv6)"""
54+
# XXX: all DNS functions in Python are blocking. If we really
55+
# want to be non-blocking here, we need to use a 3rd-party
56+
# library like python-adns, or move resolution onto its
57+
# own thread. This will be subject to the default libc
58+
# name resolution timeout (5s on most Linux boxes)
59+
try:
60+
return list(filter(cls.is_inet_4_or_6,
61+
socket.getaddrinfo(host, port, afi,
62+
socket.SOCK_STREAM)))
63+
except socket.gaierror as ex:
64+
log.warning("DNS lookup failed for proxy %s:%d, %r", host, port, ex)
65+
return []
66+
67+
def socket(self, family, sock_type):
68+
"""Open and record a socket.
69+
70+
Returns the actual underlying socket
71+
object to ensure e.g. selects and ssl wrapping works as expected.
72+
"""
73+
self._target_afi = family # Store the address family of the target
74+
afi, _, _, _, _ = self._proxy_addr
75+
self._sock = socket.socket(afi, sock_type)
76+
return self._sock
77+
78+
def _flush_buf(self):
79+
"""Send out all data that is stored in the outgoing buffer.
80+
81+
It is expected that the caller handles error handling, including non-blocking
82+
as well as connection failure exceptions.
83+
"""
84+
while self._buffer_out:
85+
sent_bytes = self._sock.send(self._buffer_out)
86+
self._buffer_out = self._buffer_out[sent_bytes:]
87+
88+
def _peek_buf(self, datalen):
89+
"""Ensure local inbound buffer has enough data, and return that data without
90+
consuming the local buffer
91+
92+
It's expected that the caller handles e.g. blocking exceptions"""
93+
while True:
94+
bytes_remaining = datalen - len(self._buffer_in)
95+
if bytes_remaining <= 0:
96+
break
97+
data = self._sock.recv(bytes_remaining)
98+
if not data:
99+
break
100+
self._buffer_in = self._buffer_in + data
101+
102+
return self._buffer_in[:datalen]
103+
104+
def _read_buf(self, datalen):
105+
"""Read and consume bytes from socket connection
106+
107+
It's expected that the caller handles e.g. blocking exceptions"""
108+
buf = self._peek_buf(datalen)
109+
if buf:
110+
self._buffer_in = self._buffer_in[len(buf):]
111+
return buf
112+
113+
def connect_ex(self, addr):
114+
"""Runs a state machine through connection to authentication to
115+
proxy connection request.
116+
117+
The somewhat strange setup is to facilitate non-intrusive use from
118+
BrokerConnection state machine.
119+
120+
This function is called with a socket in non-blocking mode. Both
121+
send and receive calls can return in EWOULDBLOCK/EAGAIN which we
122+
specifically avoid handling here. These are handled in main
123+
BrokerConnection connection loop, which then would retry calls
124+
to this function."""
125+
126+
if self._state == ProxyConnectionStates.DISCONNECTED:
127+
self._state = ProxyConnectionStates.CONNECTING
128+
129+
if self._state == ProxyConnectionStates.CONNECTING:
130+
_, _, _, _, sockaddr = self._proxy_addr
131+
ret = self._sock.connect_ex(sockaddr)
132+
if not ret or ret == errno.EISCONN:
133+
self._state = ProxyConnectionStates.NEGOTIATE_PROPOSE
134+
else:
135+
return ret
136+
137+
if self._state == ProxyConnectionStates.NEGOTIATE_PROPOSE:
138+
if self._proxy_url.username and self._proxy_url.password:
139+
# Propose username/password
140+
self._buffer_out = b"\x05\x01\x02"
141+
else:
142+
# Propose no auth
143+
self._buffer_out = b"\x05\x01\x00"
144+
self._state = ProxyConnectionStates.NEGOTIATING
145+
146+
if self._state == ProxyConnectionStates.NEGOTIATING:
147+
self._flush_buf()
148+
buf = self._read_buf(2)
149+
if buf[0:1] != b"\x05":
150+
log.error("Unrecognized SOCKS version")
151+
self._state = ProxyConnectionStates.DISCONNECTED
152+
self._sock.close()
153+
return errno.ECONNREFUSED
154+
155+
if buf[1:2] == b"\x00":
156+
# No authentication required
157+
self._state = ProxyConnectionStates.REQUEST_SUBMIT
158+
elif buf[1:2] == b"\x02":
159+
# Username/password authentication selected
160+
userlen = len(self._proxy_url.username)
161+
passlen = len(self._proxy_url.password)
162+
self._buffer_out = struct.pack(
163+
"!bb{}sb{}s".format(userlen, passlen),
164+
1, # version
165+
userlen,
166+
self._proxy_url.username.encode(),
167+
passlen,
168+
self._proxy_url.password.encode(),
169+
)
170+
self._state = ProxyConnectionStates.AUTHENTICATING
171+
else:
172+
log.error("Unrecognized SOCKS authentication method")
173+
self._state = ProxyConnectionStates.DISCONNECTED
174+
self._sock.close()
175+
return errno.ECONNREFUSED
176+
177+
if self._state == ProxyConnectionStates.AUTHENTICATING:
178+
self._flush_buf()
179+
buf = self._read_buf(2)
180+
if buf == b"\x01\x00":
181+
# Authentication succesful
182+
self._state = ProxyConnectionStates.REQUEST_SUBMIT
183+
else:
184+
log.error("Socks5 proxy authentication failure")
185+
self._state = ProxyConnectionStates.DISCONNECTED
186+
self._sock.close()
187+
return errno.ECONNREFUSED
188+
189+
if self._state == ProxyConnectionStates.REQUEST_SUBMIT:
190+
if self._target_afi == socket.AF_INET:
191+
addr_type = 1
192+
addr_len = 4
193+
elif self._target_afi == socket.AF_INET6:
194+
addr_type = 4
195+
addr_len = 16
196+
else:
197+
log.error("Unknown address family, %r", self._target_afi)
198+
self._state = ProxyConnectionStates.DISCONNECTED
199+
self._sock.close()
200+
return errno.ECONNREFUSED
201+
202+
self._buffer_out = struct.pack(
203+
"!bbbb{}sh".format(addr_len),
204+
5, # version
205+
1, # command: connect
206+
0, # reserved
207+
addr_type, # 1 for ipv4, 4 for ipv6 address
208+
socket.inet_pton(self._target_afi, addr[0]), # either 4 or 16 bytes of actual address
209+
addr[1], # port
210+
)
211+
self._state = ProxyConnectionStates.REQUESTING
212+
213+
if self._state == ProxyConnectionStates.REQUESTING:
214+
self._flush_buf()
215+
buf = self._read_buf(2)
216+
if buf[0:2] == b"\x05\x00":
217+
self._state = ProxyConnectionStates.READ_ADDRESS
218+
else:
219+
log.error("Proxy request failed: %r", buf[1:2])
220+
self._state = ProxyConnectionStates.DISCONNECTED
221+
self._sock.close()
222+
return errno.ECONNREFUSED
223+
224+
if self._state == ProxyConnectionStates.READ_ADDRESS:
225+
# we don't really care about the remote endpoint address, but need to clear the stream
226+
buf = self._peek_buf(2)
227+
if buf[0:2] == b"\x00\x01":
228+
_ = self._read_buf(2 + 4 + 2) # ipv4 address + port
229+
elif buf[0:2] == b"\x00\x05":
230+
_ = self._read_buf(2 + 16 + 2) # ipv6 address + port
231+
else:
232+
log.error("Unrecognized remote address type %r", buf[1:2])
233+
self._state = ProxyConnectionStates.DISCONNECTED
234+
self._sock.close()
235+
return errno.ECONNREFUSED
236+
self._state = ProxyConnectionStates.COMPLETE
237+
238+
if self._state == ProxyConnectionStates.COMPLETE:
239+
return 0
240+
241+
# not reached;
242+
# Send and recv will raise socket error on EWOULDBLOCK/EAGAIN that is assumed to be handled by
243+
# the caller. The caller re-enters this state machine from retry logic with timer or via select & family
244+
log.error("Internal error, state %r not handled correctly", self._state)
245+
self._state = ProxyConnectionStates.DISCONNECTED
246+
if self._sock:
247+
self._sock.close()
248+
return errno.ECONNREFUSED

0 commit comments

Comments
 (0)