diff options
Diffstat (limited to 'keystone-moon/keystone/tests/moon/func/test_func_api_authz.py')
-rw-r--r-- | keystone-moon/keystone/tests/moon/func/test_func_api_authz.py | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/moon/func/test_func_api_authz.py b/keystone-moon/keystone/tests/moon/func/test_func_api_authz.py new file mode 100644 index 00000000..77438e95 --- /dev/null +++ b/keystone-moon/keystone/tests/moon/func/test_func_api_authz.py @@ -0,0 +1,129 @@ +# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors +# This software is distributed under the terms and conditions of the 'Apache-2.0' +# license which can be found in the file 'LICENSE' in this package distribution +# or at 'http://www.apache.org/licenses/LICENSE-2.0'. + +import unittest +import json +import httplib + + +CREDENTIALS = { + "host": "127.0.0.1", + "port": "35357", + "login": "admin", + "password": "nomoresecrete", + "tenant_name": "demo", + "sessionid": "kxb50d9uusiywfcs2fiidmu1j5nsyckr", + "csrftoken": "", + "x-subject-token": "" +} + + +def get_url(url, post_data=None, delete_data=None, crsftoken=None, method="GET", authtoken=None): + # MOON_SERVER_IP["URL"] = url + # _url = "http://{HOST}:{PORT}".format(**MOON_SERVER_IP) + if post_data: + method = "POST" + if delete_data: + method = "DELETE" + print("\033[32m{} {}\033[m".format(method, url)) + conn = httplib.HTTPConnection(CREDENTIALS["host"], CREDENTIALS["port"]) + headers = { + "Content-type": "application/x-www-form-urlencoded", + # "Accept": "text/plain", + "Accept": "text/plain,text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", + 'Cookie': 'sessionid={}'.format(CREDENTIALS["sessionid"]), + } + if crsftoken: + headers["Cookie"] = "csrftoken={}; sessionid={}; NG_TRANSLATE_LANG_KEY:\"en\"".format(crsftoken, CREDENTIALS["sessionid"]) + CREDENTIALS["crsftoken"] = crsftoken + if authtoken: + headers["X-Auth-Token"] = CREDENTIALS["x-subject-token"] + if post_data: + method = "POST" + headers["Content-type"] = "application/json" + if crsftoken: + post_data = "&".join(map(lambda x: "=".join(x), post_data)) + elif "crsftoken" in CREDENTIALS and "sessionid" in CREDENTIALS: + post_data = json.dumps(post_data) + headers["Cookie"] = "csrftoken={}; sessionid={}; NG_TRANSLATE_LANG_KEY:\"en\"".format( + CREDENTIALS["crsftoken"], + CREDENTIALS["sessionid"]) + else: + post_data = json.dumps(post_data) + # conn.request(method, url, json.dumps(post_data), headers=headers) + conn.request(method, url, post_data, headers=headers) + elif delete_data: + method = "DELETE" + conn.request(method, url, json.dumps(delete_data), headers=headers) + else: + conn.request(method, url, headers=headers) + resp = conn.getresponse() + headers = resp.getheaders() + try: + CREDENTIALS["x-subject-token"] = dict(headers)["x-subject-token"] + except KeyError: + pass + if crsftoken: + sessionid_start = dict(headers)["set-cookie"].index("sessionid=")+len("sessionid=") + sessionid_end = dict(headers)["set-cookie"].index(";", sessionid_start) + sessionid = dict(headers)["set-cookie"][sessionid_start:sessionid_end] + CREDENTIALS["sessionid"] = sessionid + content = resp.read() + conn.close() + try: + return json.loads(content) + except ValueError: + return {"content": content} + + +class AuthTest(unittest.TestCase): + + def setUp(self): + post = { + "auth": { + "identity": { + "methods": [ + "password" + ], + "password": { + "user": { + "domain": { + "id": "Default" + }, + "name": "admin", + "password": "nomoresecrete" + } + } + }, + "scope": { + "project": { + "domain": { + "id": "Default" + }, + "name": "demo" + } + } + } + } + data = get_url("/v3/auth/tokens", post_data=post) + self.assertIn("token", data) + + def tearDown(self): + pass + + def test_authz(self): + data = get_url("/v3/OS-MOON/authz/1234567890/1111111/2222222/3333333", authtoken=True) + for key in ("authz", "subject_id", "tenant_id", "object_id", "action_id"): + self.assertIn(key, data) + print(data) + data = get_url("/v3/OS-MOON/authz/961420e0aeed4fd88e09cf4ae2ae700e/" + "4cff0936eeed42439d746e8071245235/df60c814-bafd-44a8-ad34-6c649e75295f/unpause", authtoken=True) + for key in ("authz", "subject_id", "tenant_id", "object_id", "action_id"): + self.assertIn(key, data) + print(data) + + +if __name__ == "__main__": + unittest.main() |