aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests
diff options
context:
space:
mode:
authorasteroide <thomas.duval@orange.com>2015-07-07 14:11:51 +0200
committerasteroide <thomas.duval@orange.com>2015-07-07 14:11:51 +0200
commit899c3f48242aedd890b2a9281ad5013ea7cc2d77 (patch)
treef62223555d9242e98f1f5cd736f94b8796d2cb5d /keystone-moon/keystone/tests
parent35641a3050f91e149cc1388340fbb3fdfc43310f (diff)
Add unit tests for LogManager and fix some bugs.
Change-Id: I42dfbdf8f83928e6437dba7026d1a2905d46ab44
Diffstat (limited to 'keystone-moon/keystone/tests')
-rw-r--r--keystone-moon/keystone/tests/moon/unit/test_unit_core_log.py167
1 files changed, 167 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_log.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_log.py
index 1b678d53..49f6812c 100644
--- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_log.py
+++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_log.py
@@ -2,3 +2,170 @@
# This software is distributed under the terms and conditions of the 'Apache-2.0'
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+"""Unit tests for LogManager"""
+
+import json
+import os
+import uuid
+import time
+from oslo_config import cfg
+from keystone.tests import unit as tests
+from keystone.contrib.moon.core import IntraExtensionAdminManager
+from keystone.tests.unit.ksfixtures import database
+from keystone import resource
+from keystone.contrib.moon.exception import *
+from keystone.tests.unit import default_fixtures
+from keystone.contrib.moon.core import LogManager, TenantManager
+
+CONF = cfg.CONF
+
+USER_ADMIN = {
+ 'name': 'admin',
+ 'domain_id': "default",
+ 'password': 'admin'
+}
+
+IE = {
+ "name": "test IE",
+ "policymodel": "policy_rbac_authz",
+ "description": "a simple description."
+}
+
+TIME_FORMAT = '%Y-%m-%d-%H:%M:%S'
+
+class TestIntraExtensionAdminManager(tests.TestCase):
+
+ def setUp(self):
+ self.useFixture(database.Database())
+ super(TestIntraExtensionAdminManager, self).setUp()
+ self.load_backends()
+ self.load_fixtures(default_fixtures)
+ self.manager = IntraExtensionAdminManager()
+
+ def __get_key_from_value(self, value, values_dict):
+ return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0]
+
+ def load_extra_backends(self):
+ return {
+ "moonlog_api": LogManager(),
+ "tenant_api": TenantManager(),
+ # "resource_api": resource.Manager(),
+ }
+
+ def config_overrides(self):
+ super(TestIntraExtensionAdminManager, self).config_overrides()
+ self.policy_directory = 'examples/moon/policies'
+ self.config_fixture.config(
+ group='moon',
+ intraextension_driver='keystone.contrib.moon.backends.sql.IntraExtensionConnector')
+ self.config_fixture.config(
+ group='moon',
+ policy_directory=self.policy_directory)
+
+ def create_intra_extension(self, policy_model="policy_rbac_admin"):
+ # Create the admin user because IntraExtension needs it
+ self.admin = self.identity_api.create_user(USER_ADMIN)
+ IE["policymodel"] = policy_model
+ self.ref = self.manager.load_intra_extension(IE)
+ self.assertIsInstance(self.ref, dict)
+ self.create_tenant(self.ref["id"])
+
+ def create_tenant(self, authz_uuid):
+ tenant = {
+ "id": uuid.uuid4().hex,
+ "name": "TestAuthzIntraExtensionManager",
+ "enabled": True,
+ "description": "",
+ "domain_id": "default"
+ }
+ project = self.resource_api.create_project(tenant["id"], tenant)
+ mapping = self.tenant_api.set_tenant_dict(project["id"], project["name"], authz_uuid, None)
+ self.assertIsInstance(mapping, dict)
+ self.assertIn("authz", mapping)
+ self.assertEqual(mapping["authz"], authz_uuid)
+ return mapping
+
+ def create_user(self, username="TestAdminIntraExtensionManagerUser"):
+ user = {
+ "id": uuid.uuid4().hex,
+ "name": username,
+ "enabled": True,
+ "description": "",
+ "domain_id": "default"
+ }
+ _user = self.identity_api.create_user(user)
+ return _user
+
+ def delete_admin_intra_extension(self):
+ self.manager.delete_intra_extension(self.ref["id"])
+
+ def send_logs(self):
+ log_authz = "Test for authz " + uuid.uuid4().hex
+ logs = []
+ self.moonlog_api.authz(log_authz)
+ logs.append("Test for critical " + uuid.uuid4().hex)
+ self.moonlog_api.critical(logs[-1])
+ logs.append("Test for error " + uuid.uuid4().hex)
+ self.moonlog_api.error(logs[-1])
+ logs.append("Test for warning " + uuid.uuid4().hex)
+ self.moonlog_api.warning(logs[-1])
+ logs.append("Test for info " + uuid.uuid4().hex)
+ self.moonlog_api.info(logs[-1])
+ logs.append("Test for debug " + uuid.uuid4().hex)
+ self.moonlog_api.debug(logs[-1])
+ return log_authz, logs
+
+ def test_get_set_logs(self):
+ previous_authz_logs = self.moonlog_api.get_logs(logger="authz")
+ previous_sys_logs = self.moonlog_api.get_logs(logger="sys")
+
+ log_authz, logs = self.send_logs()
+ time.sleep(1)
+
+ authz_logs = self.moonlog_api.get_logs(logger="authz")
+ sys_logs = self.moonlog_api.get_logs(logger="sys")
+
+ self.assertIsInstance(authz_logs, list)
+ self.assertIsInstance(sys_logs, list)
+
+ self.assertIn(log_authz, " ".join(authz_logs))
+
+ self.assertEqual(len(authz_logs), len(previous_authz_logs)+1)
+ self.assertTrue(len(sys_logs) >= len(previous_sys_logs)+5)
+ for log in logs:
+ self.assertIn(log, " ".join(sys_logs))
+
+ def test_get_syslogger_with_options(self):
+
+ all_logs = self.moonlog_api.get_logs(logger="sys")
+
+ time_1 = time.strftime(TIME_FORMAT)
+ time.sleep(1)
+
+ log_authz, logs = self.send_logs()
+
+ NUMBER_OF_LOG = 5
+ sys_logs = self.moonlog_api.get_logs(logger="sys", event_number=NUMBER_OF_LOG)
+ self.assertIsInstance(sys_logs, list)
+ self.assertEqual(len(sys_logs), NUMBER_OF_LOG)
+
+ sys_logs = self.moonlog_api.get_logs(logger="sys", time_from=time_1)
+ self.assertIsInstance(sys_logs, list)
+ self.assertEqual(len(sys_logs), NUMBER_OF_LOG)
+
+ log_authz, logs = self.send_logs()
+
+ time.sleep(1)
+ time_2 = time.strftime(TIME_FORMAT)
+
+ log_authz, logs = self.send_logs()
+
+ sys_logs = self.moonlog_api.get_logs(logger="sys", time_to=time_2)
+ self.assertIsInstance(sys_logs, list)
+ self.assertEqual(len(sys_logs), len(all_logs)+3*NUMBER_OF_LOG)
+
+ sys_logs = self.moonlog_api.get_logs(logger="sys", time_from=time_1, time_to=time_2)
+ self.assertIsInstance(sys_logs, list)
+ self.assertEqual(len(sys_logs), 3*NUMBER_OF_LOG)
+