File tree 1 file changed +15
-2
lines changed
1 file changed +15
-2
lines changed Original file line number Diff line number Diff line change 1
1
from __future__ import absolute_import
2
2
3
3
import abc
4
+ import logging
4
5
5
6
from kafka .sasl .abc import SaslMechanism
6
7
7
8
9
+ log = logging .getLogger (__name__ )
10
+
11
+
8
12
class SaslMechanismOAuth (SaslMechanism ):
9
13
10
14
def __init__ (self , ** config ):
11
15
assert 'sasl_oauth_token_provider' in config , 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
12
16
assert isinstance (config ['sasl_oauth_token_provider' ], AbstractTokenProvider ), \
13
17
'sasl_oauth_token_provider must implement kafka.sasl.oauth.AbstractTokenProvider'
14
18
self .token_provider = config ['sasl_oauth_token_provider' ]
19
+ self ._error = None
15
20
self ._is_done = False
16
21
self ._is_authenticated = False
17
22
18
23
def auth_bytes (self ):
24
+ if self ._error :
25
+ # Server should respond to this with SaslAuthenticate failure, which ends the auth process
26
+ return self ._error
19
27
token = self .token_provider .token ()
20
28
extensions = self ._token_extensions ()
21
29
return "n,,\x01 auth=Bearer {}{}\x01 \x01 " .format (token , extensions ).encode ('utf-8' )
22
30
23
31
def receive (self , auth_bytes ):
24
- self ._is_done = True
25
- self ._is_authenticated = auth_bytes == b''
32
+ if auth_bytes != b'' :
33
+ error = auth_bytes .decode ('utf-8' )
34
+ log .debug ("Sending x01 response to server after receiving SASL OAuth error: %s" , error )
35
+ self ._error = b'\x01 '
36
+ else :
37
+ self ._is_done = True
38
+ self ._is_authenticated = True
26
39
27
40
def is_done (self ):
28
41
return self ._is_done
You can’t perform that action at this time.
0 commit comments