summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/config.py8
-rw-r--r--tests/image.py43
-rw-r--r--tests/main.py33
-rw-r--r--tests/os_clients.py13
-rw-r--r--tests/user.py163
5 files changed, 221 insertions, 39 deletions
diff --git a/tests/config.py b/tests/config.py
index 2288d36e..7a0bef2d 100644
--- a/tests/config.py
+++ b/tests/config.py
@@ -6,16 +6,20 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+import itertools
from oslo_config import cfg
import image
import os_clients
+import user
def list_opts():
return [
- ('os_clients', os_clients.OPTS),
- ('image', image.IMAGE_OPTS),
+ ('DEFAULT', itertools.chain(
+ os_clients.OPTS,
+ image.OPTS,
+ user.OPTS))
]
diff --git a/tests/image.py b/tests/image.py
index 0b4a3d72..5226c789 100644
--- a/tests/image.py
+++ b/tests/image.py
@@ -13,65 +13,62 @@ from oslo_config import cfg
from identity_auth import get_session
from os_clients import glance_client
-import logger as doctor_log
-IMAGE_OPTS = [
- cfg.StrOpt('name',
+OPTS = [
+ cfg.StrOpt('image_name',
default=os.environ.get('IMAGE_NAME', 'cirros'),
help='the name of test image',
required=True),
- cfg.StrOpt('format',
+ cfg.StrOpt('image_format',
default='qcow2',
help='the format of test image',
required=True),
- cfg.StrOpt('file_name',
+ cfg.StrOpt('image_filename',
default='cirros.img',
help='the name of image file',
required=True),
- cfg.StrOpt('url',
+ cfg.StrOpt('image_download_url',
default='https://launchpad.net/cirros/trunk/0.3.0/+download/cirros-0.3.0-x86_64-disk.img',
help='the url where to get the image',
required=True),
]
-LOG = doctor_log.Logger('doctor').getLogger()
-
class Image(object):
- def __init__(self, conf):
+ def __init__(self, conf, log):
self.conf = conf
+ self.log = log
self.glance = \
- glance_client(conf.os_clients.glance_version,
- get_session())
+ glance_client(conf.glance_version, get_session())
self.use_existing_image = False
self.image = None
def create(self):
- LOG.info('image create start......')
+ self.log.info('image create start......')
images = {image.name: image for image in self.glance.images.list()}
- if self.conf.image.name not in images:
- if not os.path.exists(self.conf.image.file_name):
- resp = urllib2.urlopen(self.conf.image.url)
- with open(self.conf.image.file_name, "wb") as file:
+ if self.conf.image_name not in images:
+ if not os.path.exists(self.conf.image_filename):
+ resp = urllib2.urlopen(self.conf.image_download_url)
+ with open(self.conf.image_filename, "wb") as file:
file.write(resp.read())
- self.image = self.glance.images.create(name=self.conf.image.name,
- disk_format=self.conf.image.format,
+ self.image = self.glance.images.create(name=self.conf.image_name,
+ disk_format=self.conf.image_format,
container_format="bare",
visibility="public")
self.glance.images.upload(self.image['id'],
- open(self.conf.image.file_name, 'rb'))
+ open(self.conf.image_filename, 'rb'))
else:
self.use_existing_image = True
- self.image = images[self.conf.image.name]
+ self.image = images[self.conf.image_name]
- LOG.info('image create end......')
+ self.log.info('image create end......')
def delete(self):
- LOG.info('image delete start.......')
+ self.log.info('image delete start.......')
if not self.use_existing_image and self.image:
self.glance.images.delete(self.image['id'])
- LOG.info('image delete end.......')
+ self.log.info('image delete end.......')
diff --git a/tests/main.py b/tests/main.py
index 50e0821b..46f0c894 100644
--- a/tests/main.py
+++ b/tests/main.py
@@ -11,6 +11,7 @@ import sys
import config
from image import Image
import logger as doctor_log
+from user import User
LOG = doctor_log.Logger('doctor').getLogger()
@@ -20,33 +21,39 @@ class DoctorTest(object):
def __init__(self, conf):
self.conf = conf
- self.image = Image(self.conf)
+ self.image = Image(self.conf, LOG)
+ self.user = User(self.conf, LOG)
+
+ def setup(self):
+ # prepare the cloud env
+
+ # preparing VM image...
+ self.image.create()
+
+ # creating test user...
+ self.user.create()
+ self.user.update_quota()
def run(self):
"""run doctor test"""
try:
LOG.info('doctor test starting.......')
- # prepare the cloud env
-
- # preparing VM image...
- self.image.create()
-
- # creating test user...
- # creating VM...
-
- # creating alarm...
-
- # starting doctor sample components...
+ self.setup()
# injecting host failure...
# verify the test results
+
except Exception as e:
LOG.error('doctor test failed, Exception=%s' % e)
sys.exit(1)
finally:
- self.image.delete()
+ self.cleanup()
+
+ def cleanup(self):
+ self.image.delete()
+ self.user.delete()
def main():
diff --git a/tests/os_clients.py b/tests/os_clients.py
index 2eb406e0..c9eb0b1d 100644
--- a/tests/os_clients.py
+++ b/tests/os_clients.py
@@ -9,13 +9,24 @@
from oslo_config import cfg
import glanceclient.client as glanceclient
-
+from keystoneclient.v2_0 import client as ks_client
+import novaclient.client as novaclient
OPTS = [
cfg.StrOpt('glance_version', default='2', help='glance version'),
+ cfg.StrOpt('nova_version', default='2.34', help='Nova version'),
]
def glance_client(version, session):
return glanceclient.Client(version=version,
session=session)
+
+
+def keystone_client(session):
+ return ks_client.Client(session=session)
+
+
+def nova_client(version, session):
+ return novaclient.Client(version=version,
+ session=session)
diff --git a/tests/user.py b/tests/user.py
new file mode 100644
index 00000000..b21bd1a8
--- /dev/null
+++ b/tests/user.py
@@ -0,0 +1,163 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import os
+
+from oslo_config import cfg
+
+from identity_auth import get_session
+from os_clients import keystone_client
+from os_clients import nova_client
+
+
+OPTS = [
+ cfg.StrOpt('doctor_user',
+ default='doctor',
+ help='the name of test user',
+ required=True),
+ cfg.StrOpt('doctor_passwd',
+ default='doctor',
+ help='the password of test user',
+ required=True),
+ cfg.StrOpt('doctor_project',
+ default='doctor',
+ help='the name of test project',
+ required=True),
+ cfg.StrOpt('doctor_role',
+ default='_member_',
+ help='the role of test user',
+ required=True),
+ cfg.IntOpt('quota_instances',
+ default=os.environ.get('VM_COUNT', 1),
+ help='the quota of instances in test user',
+ required=True),
+ cfg.IntOpt('quota_cores',
+ default=os.environ.get('VM_COUNT', 1),
+ help='the quota of cores in test user',
+ required=True),
+]
+
+
+class User(object):
+
+ def __init__(self, conf, log):
+ self.conf = conf
+ self.log = log
+ self.keystone = \
+ keystone_client(get_session())
+ self.nova = \
+ nova_client(conf.nova_version, get_session())
+ self.users = {}
+ self.projects = {}
+ self.roles = {}
+ self.roles_for_user = {}
+ self.roles_for_admin = {}
+
+ def create(self):
+ """create test user, project and etc"""
+ self.log.info('user create start......')
+
+ self._create_project()
+ self._create_user()
+ self._create_role()
+ self._add_user_role_in_project(is_admin=False)
+ self._add_user_role_in_project(is_admin=True)
+
+ self.log.info('user create end......')
+
+ def _create_project(self):
+ """create test project"""
+ self.projects = {project.name: project
+ for project in self.keystone.tenants.list()}
+ if self.conf.doctor_project not in self.projects:
+ test_project = \
+ self.keystone.tenants.create(self.conf.doctor_project)
+ self.projects[test_project.name] = test_project
+
+ def _create_user(self):
+ """create test user"""
+ project = self.projects.get(self.conf.doctor_project)
+ self.users = {user.name: user for user in self.keystone.users.list()}
+ if self.conf.doctor_user not in self.users:
+ test_user = self.keystone.users.create(
+ self.conf.doctor_user,
+ password=self.conf.doctor_passwd,
+ tenant_id=project.id)
+ self.users[test_user.name] = test_user
+
+ def _create_role(self):
+ """create test role"""
+ self.roles = {role.name: role for role in self.keystone.roles.list()}
+ if self.conf.doctor_role not in self.roles:
+ test_role = self.keystone.roles.create(self.conf.doctor_role)
+ self.roles[test_role.name] = test_role
+
+ def _add_user_role_in_project(self, is_admin=False):
+ """add test user with test role in test project"""
+ project = self.projects.get(self.conf.doctor_project)
+
+ user_name = 'admin' if is_admin else self.conf.doctor_user
+ user = self.users.get(user_name)
+
+ role_name = 'admin' if is_admin else self.conf.doctor_role
+ role = self.roles.get(role_name)
+
+ roles_for_user = self.roles_for_admin \
+ if is_admin else self.roles_for_user
+
+ roles_for_user = \
+ {role.name: role for role in
+ self.keystone.roles.roles_for_user(user, tenant=project)}
+ if role_name not in roles_for_user:
+ self.keystone.roles.add_user_role(user, role, tenant=project)
+ roles_for_user[role_name] = role
+
+ def delete(self):
+ """delete the test user, project and role"""
+ self.log.info('user delete start......')
+
+ project = self.projects.get(self.conf.doctor_project)
+ user = self.users.get(self.conf.doctor_user)
+ role = self.roles.get(self.conf.doctor_role)
+
+ if project:
+ if 'admin' in self.roles_for_admin:
+ self.keystone.roles.remove_user_role(
+ self.users['admin'],
+ self.roles['admin'],
+ tenant=project)
+
+ if user:
+ if role and self.conf.doctor_role in self.roles_for_user:
+ self.keystone.roles.remove_user_role(
+ user, role, tenant=project)
+ self.keystone.roles.delete(role)
+ self.keystone.users.delete(user)
+
+ self.keystone.tenants.delete(project)
+ self.log.info('user delete end......')
+
+ def update_quota(self):
+ self.log.info('user quota update start......')
+ project = self.projects.get(self.conf.doctor_project)
+ user = self.users.get(self.conf.doctor_user)
+
+ if project and user:
+ self.quota = self.nova.quotas.get(project.id,
+ user_id=user.id)
+ if self.conf.quota_instances > self.quota.instances:
+ self.nova.quotas.update(project.id,
+ instances=self.conf.quota_instances,
+ user_id=user.id)
+ if self.conf.quota_cores > self.quota.cores:
+ self.nova.quotas.update(project.id,
+ cores=self.conf.quota_cores,
+ user_id=user.id)
+ self.log.info('user quota update end......')
+ else:
+ raise Exception('No project or role for update quota')