aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_policy.py
diff options
context:
space:
mode:
authorWuKong <rebirthmonkey@gmail.com>2015-06-30 18:47:29 +0200
committerWuKong <rebirthmonkey@gmail.com>2015-06-30 18:47:29 +0200
commitb8c756ecdd7cced1db4300935484e8c83701c82e (patch)
tree87e51107d82b217ede145de9d9d59e2100725bd7 /keystone-moon/keystone/tests/unit/test_policy.py
parentc304c773bae68fb854ed9eab8fb35c4ef17cf136 (diff)
migrate moon code from github to opnfv
Change-Id: Ice53e368fd1114d56a75271aa9f2e598e3eba604 Signed-off-by: WuKong <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_policy.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_policy.py228
1 files changed, 228 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_policy.py b/keystone-moon/keystone/tests/unit/test_policy.py
new file mode 100644
index 00000000..2c0c3995
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/test_policy.py
@@ -0,0 +1,228 @@
+# Copyright 2011 Piston Cloud Computing, Inc.
+# All Rights Reserved.
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import mock
+from oslo_policy import policy as common_policy
+import six
+from six.moves.urllib import request as urlrequest
+from testtools import matchers
+
+from keystone import exception
+from keystone.policy.backends import rules
+from keystone.tests import unit as tests
+from keystone.tests.unit.ksfixtures import temporaryfile
+
+
+class BasePolicyTestCase(tests.TestCase):
+ def setUp(self):
+ super(BasePolicyTestCase, self).setUp()
+ rules.reset()
+ self.addCleanup(rules.reset)
+ self.addCleanup(self.clear_cache_safely)
+
+ def clear_cache_safely(self):
+ if rules._ENFORCER:
+ rules._ENFORCER.clear()
+
+
+class PolicyFileTestCase(BasePolicyTestCase):
+ def setUp(self):
+ # self.tmpfilename should exist before setUp super is called
+ # this is to ensure it is available for the config_fixture in
+ # the config_overrides call.
+ self.tempfile = self.useFixture(temporaryfile.SecureTempFile())
+ self.tmpfilename = self.tempfile.file_name
+ super(PolicyFileTestCase, self).setUp()
+ self.target = {}
+
+ def config_overrides(self):
+ super(PolicyFileTestCase, self).config_overrides()
+ self.config_fixture.config(group='oslo_policy',
+ policy_file=self.tmpfilename)
+
+ def test_modified_policy_reloads(self):
+ action = "example:test"
+ empty_credentials = {}
+ with open(self.tmpfilename, "w") as policyfile:
+ policyfile.write("""{"example:test": []}""")
+ rules.enforce(empty_credentials, action, self.target)
+ with open(self.tmpfilename, "w") as policyfile:
+ policyfile.write("""{"example:test": ["false:false"]}""")
+ rules._ENFORCER.clear()
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
+ empty_credentials, action, self.target)
+
+ def test_invalid_policy_raises_error(self):
+ action = "example:test"
+ empty_credentials = {}
+ invalid_json = '{"example:test": [],}'
+ with open(self.tmpfilename, "w") as policyfile:
+ policyfile.write(invalid_json)
+ self.assertRaises(ValueError, rules.enforce,
+ empty_credentials, action, self.target)
+
+
+class PolicyTestCase(BasePolicyTestCase):
+ def setUp(self):
+ super(PolicyTestCase, self).setUp()
+ # NOTE(vish): preload rules to circumvent reloading from file
+ rules.init()
+ self.rules = {
+ "true": [],
+ "example:allowed": [],
+ "example:denied": [["false:false"]],
+ "example:get_http": [["http:http://www.example.com"]],
+ "example:my_file": [["role:compute_admin"],
+ ["project_id:%(project_id)s"]],
+ "example:early_and_fail": [["false:false", "rule:true"]],
+ "example:early_or_success": [["rule:true"], ["false:false"]],
+ "example:lowercase_admin": [["role:admin"], ["role:sysadmin"]],
+ "example:uppercase_admin": [["role:ADMIN"], ["role:sysadmin"]],
+ }
+
+ # NOTE(vish): then overload underlying policy engine
+ self._set_rules()
+ self.credentials = {}
+ self.target = {}
+
+ def _set_rules(self):
+ these_rules = common_policy.Rules.from_dict(self.rules)
+ rules._ENFORCER.set_rules(these_rules)
+
+ def test_enforce_nonexistent_action_throws(self):
+ action = "example:noexist"
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
+ self.credentials, action, self.target)
+
+ def test_enforce_bad_action_throws(self):
+ action = "example:denied"
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
+ self.credentials, action, self.target)
+
+ def test_enforce_good_action(self):
+ action = "example:allowed"
+ rules.enforce(self.credentials, action, self.target)
+
+ def test_enforce_http_true(self):
+
+ def fakeurlopen(url, post_data):
+ return six.StringIO("True")
+
+ action = "example:get_http"
+ target = {}
+ with mock.patch.object(urlrequest, 'urlopen', fakeurlopen):
+ result = rules.enforce(self.credentials, action, target)
+ self.assertTrue(result)
+
+ def test_enforce_http_false(self):
+
+ def fakeurlopen(url, post_data):
+ return six.StringIO("False")
+
+ action = "example:get_http"
+ target = {}
+ with mock.patch.object(urlrequest, 'urlopen', fakeurlopen):
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
+ self.credentials, action, target)
+
+ def test_templatized_enforcement(self):
+ target_mine = {'project_id': 'fake'}
+ target_not_mine = {'project_id': 'another'}
+ credentials = {'project_id': 'fake', 'roles': []}
+ action = "example:my_file"
+ rules.enforce(credentials, action, target_mine)
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
+ credentials, action, target_not_mine)
+
+ def test_early_AND_enforcement(self):
+ action = "example:early_and_fail"
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
+ self.credentials, action, self.target)
+
+ def test_early_OR_enforcement(self):
+ action = "example:early_or_success"
+ rules.enforce(self.credentials, action, self.target)
+
+ def test_ignore_case_role_check(self):
+ lowercase_action = "example:lowercase_admin"
+ uppercase_action = "example:uppercase_admin"
+ # NOTE(dprince) we mix case in the Admin role here to ensure
+ # case is ignored
+ admin_credentials = {'roles': ['AdMiN']}
+ rules.enforce(admin_credentials, lowercase_action, self.target)
+ rules.enforce(admin_credentials, uppercase_action, self.target)
+
+
+class DefaultPolicyTestCase(BasePolicyTestCase):
+ def setUp(self):
+ super(DefaultPolicyTestCase, self).setUp()
+ rules.init()
+
+ self.rules = {
+ "default": [],
+ "example:exist": [["false:false"]]
+ }
+ self._set_rules('default')
+ self.credentials = {}
+
+ # FIXME(gyee): latest Oslo policy Enforcer class reloads the rules in
+ # its enforce() method even though rules has been initialized via
+ # set_rules(). To make it easier to do our tests, we're going to
+ # monkeypatch load_roles() so it does nothing. This seem like a bug in
+ # Oslo policy as we shoudn't have to reload the rules if they have
+ # already been set using set_rules().
+ self._old_load_rules = rules._ENFORCER.load_rules
+ self.addCleanup(setattr, rules._ENFORCER, 'load_rules',
+ self._old_load_rules)
+ rules._ENFORCER.load_rules = lambda *args, **kwargs: None
+
+ def _set_rules(self, default_rule):
+ these_rules = common_policy.Rules.from_dict(self.rules, default_rule)
+ rules._ENFORCER.set_rules(these_rules)
+
+ def test_policy_called(self):
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
+ self.credentials, "example:exist", {})
+
+ def test_not_found_policy_calls_default(self):
+ rules.enforce(self.credentials, "example:noexist", {})
+
+ def test_default_not_found(self):
+ new_default_rule = "default_noexist"
+ # FIXME(gyee): need to overwrite the Enforcer's default_rule first
+ # as it is recreating the rules with its own default_rule instead
+ # of the default_rule passed in from set_rules(). I think this is a
+ # bug in Oslo policy.
+ rules._ENFORCER.default_rule = new_default_rule
+ self._set_rules(new_default_rule)
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
+ self.credentials, "example:noexist", {})
+
+
+class PolicyJsonTestCase(tests.TestCase):
+
+ def _load_entries(self, filename):
+ return set(json.load(open(filename)))
+
+ def test_json_examples_have_matching_entries(self):
+ policy_keys = self._load_entries(tests.dirs.etc('policy.json'))
+ cloud_policy_keys = self._load_entries(
+ tests.dirs.etc('policy.v3cloudsample.json'))
+
+ diffs = set(policy_keys).difference(set(cloud_policy_keys))
+
+ self.assertThat(diffs, matchers.Equals(set()))