Skip to content

Commit 1d9bb7d

Browse files
author
wallisyan
authored
Merge pull request aliyun#236 from aliyun/refactor4_functional_test
refactor4_credentials_test
2 parents 4ad4b61 + 2179d80 commit 1d9bb7d

File tree

3 files changed

+439
-0
lines changed

3 files changed

+439
-0
lines changed

aliyun-python-sdk-core/tests/credentials/__init__.py

Whitespace-only changes.
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# coding=utf-8
2+
3+
import os
4+
from unittest import mock
5+
6+
from alibabacloud.credentials import AccessKeyCredentials, SecurityCredentials
7+
from alibabacloud.credentials.provider import StaticCredentialsProvider, CachedCredentialsProvider, \
8+
RotatingCredentialsProvider, CredentialsProvider, \
9+
ProfileCredentialsProvider, ChainedCredentialsProvider, EnvCredentialsProvider, \
10+
DefaultChainedCredentialsProvider
11+
from alibabacloud.exception import ClientException
12+
from tests import unittest
13+
14+
class TestProvider(unittest.TestCase):
15+
16+
def test_credentials_provider(self):
17+
credentials_provider = CredentialsProvider()
18+
try:
19+
credentials_provider.provide()
20+
except NotImplementedError as e:
21+
self.assertEqual(type(e), NotImplementedError)
22+
23+
def test_static_credentials_provider(self):
24+
ak = AccessKeyCredentials("123", "456")
25+
static_credentials = StaticCredentialsProvider(ak)
26+
self.assertEqual(static_credentials.provide(), ak)
27+
28+
def test_cached_credentials_provider(self):
29+
cached_credentials = CachedCredentialsProvider()
30+
self.assertEqual(cached_credentials.provide(), None)
31+
32+
def test_rotating_credentials_provider(self):
33+
rotating_credentials = RotatingCredentialsProvider(3600, 0.8)
34+
self.assertEqual(rotating_credentials.is_expiring, True)
35+
36+
try:
37+
rotating_credentials.rotate_credentials()
38+
except NotImplementedError as e:
39+
self.assertEqual(type(e), NotImplementedError)
40+
41+
rotating_credentials.rotate_credentials = mock.Mock(return_value=SecurityCredentials(None, None, None))
42+
self.assertEqual(type(rotating_credentials.provide()), SecurityCredentials)
43+
44+
rotating_credentials = RotatingCredentialsProvider(-36000000000, 0)
45+
rotating_credentials._cached_credentials = AccessKeyCredentials(None, None)
46+
self.assertEqual(type(rotating_credentials.provide()), AccessKeyCredentials)
47+
48+
def test_profile_credentials_provider(self):
49+
profile_credentials = ProfileCredentialsProvider("ClientConfig()", "~/.alibabacloud/credentials", "default")
50+
profile = profile_credentials._load_profile("~/.alibabacloud/credentials", "default")
51+
self.assertTrue(profile)
52+
53+
with self.assertRaises(ClientException) as e:
54+
profile_credentials._load_profile("abc", None)
55+
self.assertEqual(str(e.exception), "Failed to find config file for SDK: abc")
56+
57+
full_path = os.path.expanduser("~/.alibabacloud/credentials")
58+
with self.assertRaises(ClientException) as e:
59+
profile_credentials._load_profile("~/.alibabacloud/credentials", "abc")
60+
self.assertEqual(str(e.exception), 'The Credentials file (%s) can not find the needed param "type".'
61+
% full_path)
62+
63+
ak_dict = {'type': 'access_key', 'access_key_id': 'qwe', 'access_key_secret': 'abc'}
64+
provider_profile = profile_credentials._get_provider_by_profile(ak_dict)
65+
self.assertEqual(type(provider_profile), StaticCredentialsProvider)
66+
type_dict_error = {'type': 'type_error'}
67+
with self.assertRaises(Exception) as e:
68+
profile_credentials._get_provider_by_profile(type_dict_error)
69+
self.assertEqual(str(e.exception), "Unexpected credentials type: type_error")
70+
ak_dict_error = {'type': 'access_key'}
71+
with self.assertRaises(ClientException) as e:
72+
profile_credentials._get_provider_by_profile(ak_dict_error)
73+
self.assertEqual(str(e.exception), "access_key_id must be set for credentials type access_key.")
74+
75+
self.assertEqual(type(profile_credentials.provide()), AccessKeyCredentials)
76+
77+
def test_chained_credentials_provider(self):
78+
os.environ["ALIBABA_CLOUD_ACCESS_KEY_ID"] = "key_id"
79+
os.environ["ALIBABA_CLOUD_ACCESS_KEY_SECRET"] = "key_secret"
80+
provider_chain1 = [
81+
EnvCredentialsProvider(),
82+
ProfileCredentialsProvider("ClientConfig()", "~/.alibabacloud/credentials", "default"),
83+
]
84+
chained_credentials = ChainedCredentialsProvider(provider_chain1)
85+
self.assertEqual(chained_credentials.provide().access_key_id, "key_id")
86+
self.assertEqual(chained_credentials.provide().access_key_secret, "key_secret")
87+
88+
provider_chain2 = [
89+
ProfileCredentialsProvider("ClientConfig()", "~/.alibabacloud/credentials", "default"),
90+
]
91+
chained_credentials = ChainedCredentialsProvider(provider_chain2)
92+
self.assertEqual(type(chained_credentials.provide()), AccessKeyCredentials)
93+
94+
def test_default_chained_credentials_provider(self):
95+
default_credentials = DefaultChainedCredentialsProvider(None)
96+
self.assertEqual(default_credentials._get_config_file_name(), "~/.alibabacloud/credentials")
97+
os.environ["ALIBABA_CLOUD_CREDENTIALS_FILE"] = "abc"
98+
self.assertEqual(default_credentials._get_config_file_name(), "abc")
99+
100+
os.environ["ALIBABA_CLOUD_AK"] = "secret_key"
101+
ak = default_credentials._get_config("ak")
102+
self.assertEqual(ak, "secret_key")
103+

0 commit comments

Comments
 (0)