|
14 | 14 | # limitations under the License.
|
15 | 15 |
|
16 | 16 | import re
|
| 17 | +from datetime import timedelta |
17 | 18 |
|
18 | 19 | import pytest
|
19 | 20 |
|
20 |
| -from couchbase.exceptions import (CouchbaseException, |
| 21 | +from couchbase.exceptions import (AuthenticationException, |
| 22 | + CouchbaseException, |
21 | 23 | FeatureUnavailableException,
|
22 | 24 | GroupNotFoundException,
|
23 | 25 | InvalidArgumentException,
|
|
29 | 31 | from couchbase.management.users import (Group,
|
30 | 32 | Role,
|
31 | 33 | User)
|
| 34 | +from couchbase.options import ClusterOptions |
| 35 | +from tests.helpers import ClusterInformation |
32 | 36 |
|
| 37 | +from ..auth import PasswordAuthenticator |
| 38 | +from ..cluster import Cluster |
33 | 39 | from ._test_utils import TestEnvironment
|
34 | 40 |
|
35 | 41 |
|
@@ -223,6 +229,50 @@ def test_user_display_name(self, cb_env):
|
223 | 229 |
|
224 | 230 | cb_env.um.drop_user(user.username, DropUserOptions(domain_name="local"))
|
225 | 231 |
|
| 232 | + def test_user_change_password(self, cb_env): |
| 233 | + username = 'change-password-user' |
| 234 | + admin_role = Role(name='admin') |
| 235 | + original_password = 'original_password' |
| 236 | + new_password = 'new_password' |
| 237 | + |
| 238 | + change_password_user = User(username=username, |
| 239 | + display_name="Change Password User", |
| 240 | + roles=admin_role, |
| 241 | + password=original_password) |
| 242 | + # Upsert user |
| 243 | + cb_env.um.upsert_user(change_password_user, UpsertUserOptions(domain_name="local")) |
| 244 | + |
| 245 | + # Authenticate as change-password-user. |
| 246 | + # Done in a while loop to emulate retry |
| 247 | + auth = PasswordAuthenticator(username, original_password) |
| 248 | + new_cluster = None |
| 249 | + while True: |
| 250 | + try: |
| 251 | + new_cluster = Cluster.connect(cb_env.cluster._connstr, ClusterOptions(auth)) |
| 252 | + except AuthenticationException: |
| 253 | + continue |
| 254 | + break |
| 255 | + |
| 256 | + # Change password |
| 257 | + new_cluster.users().change_password(new_password) |
| 258 | + |
| 259 | + # Assert can authenticate using new password |
| 260 | + success_auth = PasswordAuthenticator(username, new_password) |
| 261 | + while True: |
| 262 | + try: |
| 263 | + success_cluster = Cluster.connect(cb_env.cluster._connstr, ClusterOptions(success_auth)) |
| 264 | + except AuthenticationException: |
| 265 | + continue |
| 266 | + success_cluster.close() |
| 267 | + break |
| 268 | + |
| 269 | + # Assert cannot authenticate using old password |
| 270 | + fail_auth = PasswordAuthenticator(username, original_password) |
| 271 | + with pytest.raises(AuthenticationException): |
| 272 | + Cluster.connect(cb_env.cluster._connstr, ClusterOptions(fail_auth)) |
| 273 | + |
| 274 | + new_cluster.close() |
| 275 | + |
226 | 276 | def test_external_user(self, cb_env):
|
227 | 277 | """
|
228 | 278 | test_external_user()
|
|
0 commit comments