|
6 | 6 |
|
7 | 7 | import pytest
|
8 | 8 |
|
9 |
| -from redis import Redis |
| 9 | +from redis import Redis, exceptions |
10 | 10 | from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff
|
11 | 11 | from redis.cluster import (
|
12 | 12 | PRIMARY,
|
|
43 | 43 | skip_unless_arch_bits,
|
44 | 44 | wait_for_command,
|
45 | 45 | )
|
| 46 | +from .test_credentials import init_acl_user, init_required_pass |
46 | 47 |
|
47 | 48 | default_host = "127.0.0.1"
|
48 | 49 | default_port = 7000
|
@@ -813,8 +814,6 @@ def raise_connection_error():
|
813 | 814 | nodes = r.cluster_nodes()
|
814 | 815 | assert "myself" not in nodes.get(curr_default_node.name).get("flags")
|
815 | 816 | assert r.get_default_node() != curr_default_node
|
816 |
| - # Rollback to the old default node |
817 |
| - r.replace_default_node(curr_default_node) |
818 | 817 |
|
819 | 818 |
|
820 | 819 | @pytest.mark.onlycluster
|
@@ -2138,6 +2137,39 @@ def teardown():
|
2138 | 2137 | assert "client-info" in r.acl_log(count=1, target_nodes=node)[0]
|
2139 | 2138 | assert r.acl_log_reset(target_nodes=node)
|
2140 | 2139 |
|
| 2140 | + @skip_if_redis_enterprise() |
| 2141 | + def test_auth_requirepass(self, request): |
| 2142 | + r = _get_client(RedisCluster, request, flushdb=False) |
| 2143 | + # Test for default user (`username` is supposed to be optional) |
| 2144 | + default_username = "default" |
| 2145 | + temp_pass = "temp_pass" |
| 2146 | + init_required_pass(r, request, temp_pass) |
| 2147 | + r2 = _get_client( |
| 2148 | + RedisCluster, |
| 2149 | + request, |
| 2150 | + flushdb=False, |
| 2151 | + username=default_username, |
| 2152 | + password=temp_pass, |
| 2153 | + ) |
| 2154 | + assert r2.auth(temp_pass, default_username) is True |
| 2155 | + assert r2.auth(temp_pass) is True |
| 2156 | + |
| 2157 | + @skip_if_redis_enterprise() |
| 2158 | + def test_auth_acl(self, request): |
| 2159 | + r = _get_client(RedisCluster, request, flushdb=False) |
| 2160 | + # test for ACL users |
| 2161 | + username = "redis-py-auth" |
| 2162 | + password = "strong_password" |
| 2163 | + |
| 2164 | + init_acl_user(r, request, username, password) |
| 2165 | + r2 = _get_client( |
| 2166 | + RedisCluster, request, flushdb=False, username=username, password=password |
| 2167 | + ) |
| 2168 | + assert r2.auth(username=username, password="strong_password") is True |
| 2169 | + |
| 2170 | + with pytest.raises(exceptions.AuthenticationError): |
| 2171 | + r2.auth(username=username, password="wrong_password") |
| 2172 | + |
2141 | 2173 |
|
2142 | 2174 | @pytest.mark.onlycluster
|
2143 | 2175 | class TestNodesManager:
|
|
0 commit comments