@@ -130,30 +130,22 @@ def test_server_version_02(self):
130
130
CORRECT_PASSWORD = 'correct\u1680 password'
131
131
132
132
133
- class TestAuthentication (tb .ConnectedTestCase ):
133
+ class BaseTestAuthentication (tb .ConnectedTestCase ):
134
+ USERS = []
135
+
134
136
def setUp (self ):
135
137
super ().setUp ()
136
138
137
139
if not self .cluster .is_managed ():
138
140
self .skipTest ('unmanaged cluster' )
139
141
140
- methods = [
141
- ('trust' , None ),
142
- ('reject' , None ),
143
- ('scram-sha-256' , CORRECT_PASSWORD ),
144
- ('md5' , CORRECT_PASSWORD ),
145
- ('password' , CORRECT_PASSWORD ),
146
- ]
147
-
148
142
self .cluster .reset_hba ()
149
143
150
144
create_script = []
151
- for method , password in methods :
145
+ for username , method , password in self . USERS :
152
146
if method == 'scram-sha-256' and self .server_version .major < 10 :
153
147
continue
154
148
155
- username = method .replace ('-' , '_' )
156
-
157
149
# if this is a SCRAM password, we need to set the encryption method
158
150
# to "scram-sha-256" in order to properly hash the password
159
151
if method == 'scram-sha-256' :
@@ -162,7 +154,7 @@ def setUp(self):
162
154
)
163
155
164
156
create_script .append (
165
- 'CREATE ROLE {}_user WITH LOGIN{};' .format (
157
+ 'CREATE ROLE "{}" WITH LOGIN{};' .format (
166
158
username ,
167
159
f' PASSWORD E{ (password or "" )!r} '
168
160
)
@@ -175,20 +167,20 @@ def setUp(self):
175
167
"SET password_encryption = 'md5';"
176
168
)
177
169
178
- if _system != 'Windows' :
170
+ if _system != 'Windows' and method != 'gss' :
179
171
self .cluster .add_hba_entry (
180
172
type = 'local' ,
181
- database = 'postgres' , user = '{}_user' . format ( username ) ,
173
+ database = 'postgres' , user = username ,
182
174
auth_method = method )
183
175
184
176
self .cluster .add_hba_entry (
185
177
type = 'host' , address = ipaddress .ip_network ('127.0.0.0/24' ),
186
- database = 'postgres' , user = '{}_user' . format ( username ) ,
178
+ database = 'postgres' , user = username ,
187
179
auth_method = method )
188
180
189
181
self .cluster .add_hba_entry (
190
182
type = 'host' , address = ipaddress .ip_network ('::1/128' ),
191
- database = 'postgres' , user = '{}_user' . format ( username ) ,
183
+ database = 'postgres' , user = username ,
192
184
auth_method = method )
193
185
194
186
# Put hba changes into effect
@@ -201,28 +193,28 @@ def tearDown(self):
201
193
# Reset cluster's pg_hba.conf since we've meddled with it
202
194
self .cluster .trust_local_connections ()
203
195
204
- methods = [
205
- 'trust' ,
206
- 'reject' ,
207
- 'scram-sha-256' ,
208
- 'md5' ,
209
- 'password' ,
210
- ]
211
-
212
196
drop_script = []
213
- for method in methods :
197
+ for username , method , _ in self . USERS :
214
198
if method == 'scram-sha-256' and self .server_version .major < 10 :
215
199
continue
216
200
217
- username = method .replace ('-' , '_' )
218
-
219
- drop_script .append ('DROP ROLE {}_user;' .format (username ))
201
+ drop_script .append ('DROP ROLE "{}";' .format (username ))
220
202
221
203
drop_script = '\n ' .join (drop_script )
222
204
self .loop .run_until_complete (self .con .execute (drop_script ))
223
205
224
206
super ().tearDown ()
225
207
208
+
209
+ class TestAuthentication (BaseTestAuthentication ):
210
+ USERS = [
211
+ ('trust_user' , 'trust' , None ),
212
+ ('reject_user' , 'reject' , None ),
213
+ ('scram_sha_256_user' , 'scram-sha-256' , CORRECT_PASSWORD ),
214
+ ('md5_user' , 'md5' , CORRECT_PASSWORD ),
215
+ ('password_user' , 'password' , CORRECT_PASSWORD ),
216
+ ]
217
+
226
218
async def _try_connect (self , ** kwargs ):
227
219
# On Windows the server sometimes just closes
228
220
# the connection sooner than we receive the
@@ -388,6 +380,62 @@ async def test_auth_md5_unsupported(self, _):
388
380
await self .connect (user = 'md5_user' , password = CORRECT_PASSWORD )
389
381
390
382
383
+ class TestGssAuthentication (BaseTestAuthentication ):
384
+ @classmethod
385
+ def setUpClass (cls ):
386
+ try :
387
+ from k5test .realm import K5Realm
388
+ except ModuleNotFoundError :
389
+ raise unittest .SkipTest ('k5test not installed' )
390
+
391
+ cls .realm = K5Realm ()
392
+ cls .addClassCleanup (cls .realm .stop )
393
+ # Setup environment before starting the cluster.
394
+ patch = unittest .mock .patch .dict (os .environ , cls .realm .env )
395
+ patch .start ()
396
+ cls .addClassCleanup (patch .stop )
397
+ # Add credentials.
398
+ cls .realm .addprinc ('postgres/localhost' )
399
+ cls .realm .extract_keytab ('postgres/localhost' , cls .realm .keytab )
400
+
401
+ cls .USERS = [(cls .realm .user_princ , 'gss' , None )]
402
+ super ().setUpClass ()
403
+
404
+ cls .cluster .override_connection_spec (host = 'localhost' )
405
+
406
+ @classmethod
407
+ def get_server_settings (cls ):
408
+ settings = super ().get_server_settings ()
409
+ settings ['krb_server_keyfile' ] = f'FILE:{ cls .realm .keytab } '
410
+ return settings
411
+
412
+ @classmethod
413
+ def setup_cluster (cls ):
414
+ cls .cluster = cls .new_cluster (pg_cluster .TempCluster )
415
+ cls .start_cluster (
416
+ cls .cluster , server_settings = cls .get_server_settings ())
417
+
418
+ async def test_auth_gssapi (self ):
419
+ conn = await self .connect (user = self .realm .user_princ )
420
+ await conn .close ()
421
+
422
+ # Service name mismatch.
423
+ with self .assertRaisesRegex (
424
+ exceptions .InternalClientError ,
425
+ 'Server .* not found'
426
+ ):
427
+ await self .connect (user = self .realm .user_princ , krbsrvname = 'wrong' )
428
+
429
+ # Credentials mismatch.
430
+ self .realm .addprinc ('wrong_user' , 'password' )
431
+ self .realm .kinit ('wrong_user' , 'password' )
432
+ with self .assertRaisesRegex (
433
+ exceptions .InvalidAuthorizationSpecificationError ,
434
+ 'GSSAPI authentication failed for user'
435
+ ):
436
+ await self .connect (user = self .realm .user_princ )
437
+
438
+
391
439
class TestConnectParams (tb .TestCase ):
392
440
393
441
TESTS = [
0 commit comments