diff options
author | Tomi Juvonen <tomi.juvonen@nokia.com> | 2018-03-08 07:13:36 +0200 |
---|---|---|
committer | Tomi Juvonen <tomi.juvonen@nokia.com> | 2018-08-09 09:22:40 +0000 |
commit | 4639f15e6db2f1480b41f6fbfd11d70312d4e421 (patch) | |
tree | 7ba5b8755513970c39d6f4f5123f59beb77a0a41 /doctor_tests | |
parent | b54cbc5dd2d32fcb27238680b4657ed384d021c5 (diff) |
Add maintenance test code
-Add sample admin_tool
-Add sample app_manager
-Modify sample inspector
JIRA: DOCTOR-106
Change-Id: I52cffecaa88452ce5e7cc6487534c88fcfd378ad
Signed-off-by: Tomi Juvonen <tomi.juvonen@nokia.com>
Diffstat (limited to 'doctor_tests')
-rw-r--r-- | doctor_tests/admin_tool/__init__.py | 37 | ||||
-rw-r--r-- | doctor_tests/admin_tool/base.py | 26 | ||||
-rw-r--r-- | doctor_tests/admin_tool/sample.py | 726 | ||||
-rw-r--r-- | doctor_tests/app_manager/__init__.py | 38 | ||||
-rw-r--r-- | doctor_tests/app_manager/base.py | 26 | ||||
-rw-r--r-- | doctor_tests/app_manager/sample.py | 255 | ||||
-rw-r--r-- | doctor_tests/config.py | 4 | ||||
-rw-r--r-- | doctor_tests/inspector/sample.py | 39 | ||||
-rw-r--r-- | doctor_tests/installer/apex.py | 1 | ||||
-rw-r--r-- | doctor_tests/main.py | 15 | ||||
-rw-r--r-- | doctor_tests/scenario/maintenance.py | 103 |
11 files changed, 1259 insertions, 11 deletions
diff --git a/doctor_tests/admin_tool/__init__.py b/doctor_tests/admin_tool/__init__.py new file mode 100644 index 00000000..e8b12817 --- /dev/null +++ b/doctor_tests/admin_tool/__init__.py @@ -0,0 +1,37 @@ +############################################################################## +# Copyright (c) 2018 Nokia Corporation and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from oslo_config import cfg +from oslo_utils import importutils + + +OPTS = [ + cfg.StrOpt('type', + default='sample', + choices=['sample'], + help='the component of doctor admin_tool', + required=True), + cfg.StrOpt('ip', + default='127.0.0.1', + help='the ip of admin_tool', + required=True), + cfg.IntOpt('port', + default='12347', + help='the port of doctor admin_tool', + required=True), +] + + +_admin_tool_name_class_mapping = { + 'sample': 'doctor_tests.admin_tool.sample.SampleAdminTool' +} + + +def get_admin_tool(trasport_url, conf, log): + admin_tool_class = _admin_tool_name_class_mapping.get(conf.admin_tool.type) + return importutils.import_object(admin_tool_class, trasport_url, conf, log) diff --git a/doctor_tests/admin_tool/base.py b/doctor_tests/admin_tool/base.py new file mode 100644 index 00000000..0f0b2dcd --- /dev/null +++ b/doctor_tests/admin_tool/base.py @@ -0,0 +1,26 @@ +############################################################################## +# Copyright (c) 2018 Nokia Corporation and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) +class BaseAdminTool(object): + + def __init__(self, conf, log): + self.conf = conf + self.log = log + + @abc.abstractmethod + def start(self): + pass + + @abc.abstractmethod + def stop(self): + pass diff --git a/doctor_tests/admin_tool/sample.py b/doctor_tests/admin_tool/sample.py new file mode 100644 index 00000000..892a4c83 --- /dev/null +++ b/doctor_tests/admin_tool/sample.py @@ -0,0 +1,726 @@ +############################################################################## +# Copyright (c) 2018 Nokia Corporation and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import datetime +from flask import Flask +from flask import request +import json +from novaclient.exceptions import BadRequest +import oslo_messaging as messaging +import requests +import time +from threading import Thread +from traceback import format_exc +from uuid import uuid1 as generate_uuid + +from doctor_tests.admin_tool.base import BaseAdminTool +from doctor_tests.identity_auth import get_identity_auth +from doctor_tests.identity_auth import get_session +from doctor_tests.os_clients import aodh_client +from doctor_tests.os_clients import nova_client + + +class SampleAdminTool(BaseAdminTool): + + def __init__(self, trasport_url, conf, log): + super(SampleAdminTool, self).__init__(conf, log) + self.trasport_url = trasport_url + self.app = None + + def start(self): + self.log.info('sample admin tool start......') + self.app = AdminTool(self.trasport_url, self.conf, self, self.log) + self.app.start() + + def stop(self): + self.log.info('sample admin tool stop......') + if not self.app: + return + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + } + url = 'http://%s:%d/shutdown'\ + % (self.conf.admin_tool.ip, + self.conf.admin_tool.port) + requests.post(url, data='', headers=headers) + + +class AdminMain(Thread): + + def __init__(self, trasport_url, session_id, data, parent, conf, log): + Thread.__init__(self) + self.session_id = session_id + self.parent = parent + self.log = log + self.conf = conf + self.url = 'http://0.0.0.0:%s' % conf.admin_tool.port + self.projects_state = dict() # current state for each project + self.proj_server_actions = dict() # actions for each project server + self.projects_servers = dict() # servers processed in current state + self.maint_proj_servers = dict() # servers under whole maintenance + self.hosts = data['hosts'] + self.maintenance_at = data['maintenance_at'] + self.computes_disabled = list() + self.metadata = data['metadata'] + self.auth = get_identity_auth(project=self.conf.doctor_project) + self.state = data['state'] + self.aodh = aodh_client(self.conf.aodh_version, + get_session(auth=self.auth)) + self.nova = nova_client(self.conf.nova_version, + get_session(auth=self.auth)) + self.log.info('transport_url %s' % trasport_url) + transport = messaging.get_transport(self.conf, trasport_url) + self.notif_proj = messaging.Notifier(transport, + 'maintenance.planned', + driver='messaging', + topics=['notifications']) + self.notif_proj = self.notif_proj.prepare(publisher_id='admin_tool') + self.notif_admin = messaging.Notifier(transport, + 'maintenance.host', + driver='messaging', + topics=['notifications']) + self.notif_admin = self.notif_admin.prepare(publisher_id='admin_tool') + self.log.info('Admin tool session %s initialized' % self.session_id) + + def cleanup(self): + for host in self.computes_disabled: + self.log.info('enable nova-compute on %s' % host) + self.nova.services.enable(host, 'nova-compute') + + def _projects_not_in_wanted_states(self, wanted_states): + if len([v for v in self.projects_state.values() + if v not in wanted_states]): + return True + else: + return False + + def projects_not_in_state(self, state): + if len([v for v in self.projects_state.values() + if v != state]): + return True + else: + return False + + def wait_projects_state(self, wanted_states, wait_seconds): + retries = wait_seconds + while (retries > 0 and + self._projects_not_in_wanted_states(wanted_states)): + time.sleep(1) + retries = retries - 1 + if self._projects_not_in_wanted_states(wanted_states): + self.log.error('Admin tool session %s: projects in invalid states ' + '%s' % (self.session_id, self.projects_state)) + raise Exception('Admin tool session %s: not all projects in states' + ' %s' % (self.session_id, wanted_states)) + else: + self.log.info('all projects replied') + + def _project_notify(self, project_id, instance_ids, allowed_actions, + actions_at, state, metadata): + reply_url = '%s/%s/maintenance' % (self.url, project_id) + + payload = dict(project_id=project_id, + instance_ids=instance_ids, + allowed_actions=allowed_actions, + state=state, + actions_at=actions_at, + session_id=self.session_id, + metadata=metadata, + reply_url=reply_url) + + self.log.debug('Sending "maintenance.planned" to project: %s' % + payload) + + self.notif_proj.info({'some': 'context'}, 'maintenance.scheduled', + payload) + + def _admin_notify(self, project, host, state, session_id): + payload = dict(project_id=project, host=host, state=state, + session_id=session_id) + + self.log.debug('Sending "maintenance.host": %s' % payload) + + self.notif_admin.info({'some': 'context'}, 'maintenance.host', payload) + + def down_scale(self): + for project in self.projects_servers: + self.log.info('DOWN_SCALE to project %s' % project) + self.log.debug('instance_ids %s' % self.projects_servers[project]) + instance_ids = '%s/%s/maintenance' % (self.url, project) + allowed_actions = [] + wait_seconds = 120 + actions_at = (datetime.datetime.utcnow() + + datetime.timedelta(seconds=wait_seconds) + ).strftime('%Y-%m-%d %H:%M:%S') + state = self.state + metadata = self.metadata + self._project_notify(project, instance_ids, + allowed_actions, actions_at, state, + metadata) + allowed_states = ['ACK_DOWN_SCALE', 'NACK_DOWN_SCALE'] + self.wait_projects_state(allowed_states, wait_seconds) + if self.projects_not_in_state('ACK_DOWN_SCALE'): + raise Exception('Admin tool session %s: all states not ' + 'ACK_DOWN_SCALE %s' % + (self.session_id, self.projects_state)) + + def maintenance(self): + for project in self.projects_servers: + self.log.info('\nMAINTENANCE to project %s\n' % project) + self.log.debug('instance_ids %s' % self.projects_servers[project]) + instance_ids = '%s/%s/maintenance' % (self.url, project) + allowed_actions = [] + actions_at = self.maintenance_at + state = self.state + metadata = self.metadata + maint_at = self.str_to_datetime(self.maintenance_at) + td = maint_at - datetime.datetime.utcnow() + wait_seconds = int(td.total_seconds()) + if wait_seconds < 10: + raise Exception('Admin tool session %s: No time for project to' + ' answer: %s' % + (self.session_id, wait_seconds)) + self._project_notify(project, instance_ids, + allowed_actions, actions_at, state, + metadata) + allowed_states = ['ACK_MAINTENANCE', 'NACK_MAINTENANCE'] + self.wait_projects_state(allowed_states, wait_seconds) + if self.projects_not_in_state('ACK_MAINTENANCE'): + raise Exception('Admin tool session %s: all states not ' + 'ACK_MAINTENANCE %s' % + (self.session_id, self.projects_state)) + + def maintenance_complete(self): + for project in self.projects_servers: + self.log.info('MAINTENANCE_COMPLETE to project %s' % project) + instance_ids = '%s/%s/maintenance' % (self.url, project) + allowed_actions = [] + wait_seconds = 120 + actions_at = (datetime.datetime.utcnow() + + datetime.timedelta(seconds=wait_seconds) + ).strftime('%Y-%m-%d %H:%M:%S') + state = 'MAINTENANCE_COMPLETE' + metadata = self.metadata + self._project_notify(project, instance_ids, + allowed_actions, actions_at, state, + metadata) + allowed_states = ['ACK_MAINTENANCE_COMPLETE', + 'NACK_MAINTENANCE_COMPLETE'] + self.wait_projects_state(allowed_states, wait_seconds) + if self.projects_not_in_state('ACK_MAINTENANCE_COMPLETE'): + raise Exception('Admin tool session %s: all states not ' + 'ACK_MAINTENANCE_COMPLETE %s' % + (self.session_id, self.projects_state)) + + def need_down_scale(self, host_servers): + room_for_instances = 0 + for host in host_servers: + instances = 0 + for project in host_servers[host]: + for instance in host_servers[host][project]: + instances += 1 + room_for_instances += (2 - instances) + self.log.info('there is room for %d instances' % room_for_instances) + if room_for_instances > 1: + return False + else: + return True + + def find_host_to_be_empty(self, host_servers): + host_to_be_empty = None + host_nonha_instances = 0 + for host in host_servers: + ha_instances = 0 + nonha_instances = 0 + for project in host_servers[host]: + for instance in host_servers[host][project]: + if ('doctor_ha_app_' in + host_servers[host][project][instance]): + ha_instances += 1 + else: + nonha_instances += 1 + self.log.info('host %s has %d ha and %d non ha instances' % + (host, ha_instances, nonha_instances)) + if ha_instances == 0: + if host_to_be_empty: + if nonha_instances < host_nonha_instances: + host_to_be_empty = host + host_nonha_instances = nonha_instances + else: + host_to_be_empty = host + host_nonha_instances = nonha_instances + self.log.info('host %s selected to be empty' % host_to_be_empty) + return host_to_be_empty + + def make_compute_host_empty(self, host, projects_servers, statebase): + state = statebase + state_ack = 'ACK_%s' % statebase + state_nack = 'NACK_%s' % statebase + for project in projects_servers: + # self.projects_servers must have servers under action + self.projects_servers[project] = projects_servers[project].copy() + self.log.info('%s to project %s' % (state, project)) + self.project_servers_log_info(project, projects_servers) + instance_ids = '%s/%s/maintenance' % (self.url, project) + allowed_actions = ['MIGRATE', 'LIVE_MIGRATE', 'OWN_ACTION'] + wait_seconds = 120 + actions_at = (datetime.datetime.utcnow() + + datetime.timedelta(seconds=wait_seconds) + ).strftime('%Y-%m-%d %H:%M:%S') + metadata = self.metadata + self._project_notify(project, instance_ids, + allowed_actions, actions_at, state, + metadata) + allowed_states = [state_ack, state_nack] + self.wait_projects_state(allowed_states, wait_seconds) + if self.projects_not_in_state(state_ack): + raise Exception('Admin tool session %s: all states not %s %s' % + (self.session_id, state_ack, self.projects_state)) + self.actions_to_have_empty_host(host) + + def notify_action_done(self, project, instance_id): + instance_ids = instance_id + allowed_actions = [] + actions_at = None + state = "INSTANCE_ACTION_DONE" + metadata = None + self._project_notify(project, instance_ids, allowed_actions, + actions_at, state, metadata) + + def actions_to_have_empty_host(self, host): + retry = 0 + while len(self.proj_server_actions) == 0: + time.sleep(2) + if retry == 10: + raise Exception('Admin tool session %s: project server actions' + ' not set' % self.session_id) + retry += 1 + for project in self.proj_server_actions: + for server, action in self.proj_server_actions[project].items(): + self.log.info('Action %s server %s: %s' % (action, server, + self.projects_servers[project][server])) + if action == 'MIGRATE': + self.migrate_server(server) + self.notify_action_done(project, server) + elif action == 'OWN_ACTION': + pass + else: + raise Exception('Admin tool session %s: server %s action ' + '%s not supported' % + (self.session_id, server, action)) + self.proj_server_actions = dict() + self._wait_host_empty(host) + + def migrate_server(self, server_id): + server = self.nova.servers.get(server_id) + vm_state = server.__dict__.get('OS-EXT-STS:vm_state') + self.log.info('server %s state %s' % (server_id, vm_state)) + last_vm_state = vm_state + retry_migrate = 5 + while True: + try: + server.migrate() + time.sleep(5) + retries = 36 + while vm_state != 'resized' and retries > 0: + # try to confirm within 3min + server = self.nova.servers.get(server_id) + vm_state = server.__dict__.get('OS-EXT-STS:vm_state') + if vm_state == 'resized': + server.confirm_resize() + self.log.info('server %s migration confirmed' % + server_id) + return + if last_vm_state != vm_state: + self.log.info('server %s state: %s' % (server_id, + vm_state)) + if vm_state == 'error': + raise Exception('server %s migration failed, state: %s' + % (server_id, vm_state)) + time.sleep(5) + retries = retries - 1 + last_vm_state = vm_state + # Timout waiting state to change + break + + except BadRequest: + if retry_migrate == 0: + raise Exception('server %s migrate failed' % server_id) + # Might take time for scheduler to sync inconsistent instance + # list for host + retry_time = 180 - (retry_migrate * 30) + self.log.info('server %s migrate failed, retry in %s sec' + % (server_id, retry_time)) + time.sleep(retry_time) + except Exception as e: + self.log.error('server %s migration failed, Exception=%s' % + (server_id, e)) + self.log.error(format_exc()) + raise Exception('server %s migration failed, state: %s' % + (server_id, vm_state)) + finally: + retry_migrate = retry_migrate - 1 + raise Exception('server %s migration timeout, state: %s' % + (server_id, vm_state)) + + def _wait_host_empty(self, host): + hid = self.nova.hypervisors.search(host)[0].id + vcpus_used_last = 0 + # wait 4min to get host empty + for j in range(48): + hvisor = self.nova.hypervisors.get(hid) + vcpus_used = hvisor.__getattr__('vcpus_used') + if vcpus_used > 0: + if vcpus_used_last == 0: + self.log.info('%s still has %d vcpus reserved. wait...' + % (host, vcpus_used)) + elif vcpus_used != vcpus_used_last: + self.log.info('%s still has %d vcpus reserved. wait...' + % (host, vcpus_used)) + vcpus_used_last = vcpus_used + time.sleep(5) + else: + self.log.info('%s empty' % host) + return + raise Exception('%s host not empty' % host) + + def projects_listen_alarm(self, match_event): + match_projects = ([str(alarm['project_id']) for alarm in + self.aodh.alarm.list() if + str(alarm['event_rule']['event_type']) == + match_event]) + all_projects_match = True + for project in list(self.projects_state): + if project not in match_projects: + self.log.error('Admin tool session %s: project %s not ' + 'listening to %s' % + (self.session_id, project, match_event)) + all_projects_match = False + return all_projects_match + + def project_servers_log_info(self, project, host_servers): + info = 'Project servers:\n' + for server in host_servers[project]: + info += (' %s: %s\n' % + (server, host_servers[project][server])) + self.log.info('%s' % info) + + def servers_log_info(self, host_servers): + info = '\n' + for host in self.hosts: + info += '%s:\n' % host + if host in host_servers: + for project in host_servers[host]: + info += ' %s:\n' % project + for server in host_servers[host][project]: + info += (' %s: %s\n' % + (server, host_servers[host][project][server])) + self.log.info('%s' % info) + + def update_server_info(self): + opts = {'all_tenants': True} + servers = self.nova.servers.list(search_opts=opts) + self.projects_servers = dict() + host_servers = dict() + for server in servers: + try: + host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host')) + project = str(server.tenant_id) + server_name = str(server.name) + server_id = str(server.id) + except Exception: + raise Exception('can not get params from server=%s' % + server) + if host not in self.hosts: + continue + if host not in host_servers: + host_servers[host] = dict() + if project not in host_servers[host]: + host_servers[host][project] = dict() + if project not in self.projects_servers: + self.projects_servers[project] = dict() + if project not in self.projects_state: + self.projects_state[project] = None + host_servers[host][project][server_id] = server_name + self.projects_servers[project][server_id] = server_name + return host_servers + + def str_to_datetime(self, dt_str): + mdate, mtime = dt_str.split() + year, month, day = map(int, mdate.split('-')) + hours, minutes, seconds = map(int, mtime.split(':')) + return datetime.datetime(year, month, day, hours, minutes, seconds) + + def host_maintenance(self, host): + self.log.info('maintaining host %s' % host) + # no implementation to make real maintenance + time.sleep(5) + + def run(self): + while self.state != 'MAINTENANCE_COMPLETE': + self.log.info('--==session %s: processing state %s==--' % + (self.session_id, self.state)) + if self.state == 'MAINTENANCE': + host_servers = self.update_server_info() + self.servers_log_info(host_servers) + + if not self.projects_listen_alarm('maintenance.scheduled'): + raise Exception('all projects do not listen maintenance ' + 'alarm') + self.maintenance() + + maint_at = self.str_to_datetime(self.maintenance_at) + if maint_at > datetime.datetime.utcnow(): + time_now = (datetime.datetime.utcnow().strftime( + '%Y-%m-%d %H:%M:%S')) + self.log.info('Time now: %s maintenance starts: %s....' % + (time_now, self.maintenance_at)) + td = maint_at - datetime.datetime.utcnow() + time.sleep(td.total_seconds()) + time_now = (datetime.datetime.utcnow().strftime( + '%Y-%m-%d %H:%M:%S')) + self.log.info('Time to start maintenance starts: %s' % + time_now) + + # check if we have empty compute host + # True -> PLANNED_MAINTENANCE + # False -> check if we can migrate VMs to get empty host + # True -> PREPARE_MAINTENANCE + # False -> DOWN_SCALE + maintenance_empty_hosts = ([h for h in self.hosts if h not in + host_servers]) + + if len(maintenance_empty_hosts) == 0: + if self.need_down_scale(host_servers): + self.log.info('Need to down scale') + self.state = 'DOWN_SCALE' + else: + self.log.info('Free capacity, but need empty host') + self.state = 'PREPARE_MAINTENANCE' + else: + self.log.info('Free capacity, but need empty host') + self.state = 'PLANNED_MAINTENANCE' + self.log.info('--==State change from MAINTENANCE to %s==--' + % self.state) + elif self.state == 'DOWN_SCALE': + # Test case is hard coded to have all compute capacity used + # We need to down scale to have one empty compute host + self.down_scale() + self.state = 'PREPARE_MAINTENANCE' + host_servers = self.update_server_info() + self.servers_log_info(host_servers) + self.log.info('--==State change from DOWN_SCALE to' + ' %s==--' % self.state) + + elif self.state == 'PREPARE_MAINTENANCE': + # It might be down scale did not free capacity on a single + # compute host, so we need to arrange free capacity to a single + # compute host + self.maint_proj_servers = self.projects_servers.copy() + maintenance_empty_hosts = ([h for h in self.hosts if h not in + host_servers]) + if len(maintenance_empty_hosts) == 0: + self.log.info('no empty hosts for maintenance') + if self.need_down_scale(host_servers): + raise Exception('Admin tool session %s: Not enough ' + 'free capacity for maintenance' % + self.session_id) + host = self.find_host_to_be_empty(host_servers) + if host: + self.make_compute_host_empty(host, host_servers[host], + 'PREPARE_MAINTENANCE') + else: + # We do not currently support another down scale if + # first was not enough + raise Exception('Admin tool session %s: No host ' + 'candidate to be emptied' % + self.session_id) + else: + for host in maintenance_empty_hosts: + self.log.info('%s already empty ' + 'for maintenance' % host) + self.state = 'PLANNED_MAINTENANCE' + host_servers = self.update_server_info() + self.servers_log_info(host_servers) + self.log.info('--==State change from PREPARE_MAINTENANCE to %s' + '==--' % self.state) + elif self.state == 'PLANNED_MAINTENANCE': + maintenance_hosts = list() + maintenance_empty_hosts = list() + # TODO This should be admin. hack for now to have it work + admin_project = list(self.projects_state)[0] + for host in self.hosts: + self.log.info('disable nova-compute on host %s' % host) + self.nova.services.disable_log_reason(host, 'nova-compute', + 'maintenance') + self.computes_disabled.append(host) + if host in host_servers and len(host_servers[host]): + maintenance_hosts.append(host) + else: + maintenance_empty_hosts.append(host) + self.log.info('--==Start to maintain empty hosts==--\n%s' % + maintenance_empty_hosts) + for host in maintenance_empty_hosts: + # scheduler has problems, let's see if just down scaled + # host is really empty + self._wait_host_empty(host) + self.log.info('IN_MAINTENANCE host %s' % host) + self._admin_notify(admin_project, host, 'IN_MAINTENANCE', + self.session_id) + self.host_maintenance(host) + self._admin_notify(admin_project, host, + 'MAINTENANCE_COMPLETE', + self.session_id) + self.nova.services.enable(host, 'nova-compute') + self.computes_disabled.remove(host) + self.log.info('MAINTENANCE_COMPLETE host %s' % host) + self.log.info('--==Start to maintain occupied hosts==--\n%s' % + maintenance_hosts) + for host in maintenance_hosts: + self.log.info('PLANNED_MAINTENANCE host %s' % host) + self.make_compute_host_empty(host, host_servers[host], + 'PLANNED_MAINTENANCE') + self.log.info('IN_MAINTENANCE host %s' % host) + self._admin_notify(admin_project, host, 'IN_MAINTENANCE', + self.session_id) + self.host_maintenance(host) + self._admin_notify(admin_project, host, + 'MAINTENANCE_COMPLETE', + self.session_id) + self.nova.services.enable(host, 'nova-compute') + self.computes_disabled.remove(host) + self.log.info('MAINTENANCE_COMPLETE host %s' % host) + self.state = 'PLANNED_MAINTENANCE_COMPLETE' + host_servers = self.update_server_info() + self.servers_log_info(host_servers) + elif self.state == 'PLANNED_MAINTENANCE_COMPLETE': + self.log.info('Projects still need to up scale back to full ' + 'capcity') + self.maintenance_complete() + host_servers = self.update_server_info() + self.servers_log_info(host_servers) + self.state = 'MAINTENANCE_COMPLETE' + else: + raise Exception('Admin tool session %s: session in invalid ' + 'state %s' % (self.session_id, self.state)) + self.log.info('--==Maintenance session %s: ' + 'MAINTENANCE SESSION COMPLETE==--' % self.session_id) + + def project_input(self, project_id, data): + self.log.debug('Admin tool session %s: project %s input' % + (self.session_id, project_id)) + if 'instance_actions' in data: + self.proj_server_actions[project_id] = ( + data['instance_actions'].copy()) + self.projects_state[project_id] = data['state'] + + def project_get_instances(self, project_id): + ret = list(self.projects_servers[project_id]) + self.log.debug('Admin tool session %s: project %s GET return: %s' % + (self.session_id, project_id, ret)) + return ret + + def stop(self): + self.stopped = True + + +class AdminTool(Thread): + + def __init__(self, trasport_url, conf, admin_tool, log): + Thread.__init__(self) + self.admin_tool = admin_tool + self.log = log + self.conf = conf + self.port = self.conf.admin_tool.port + self.maint_sessions = {} + self.projects = {} + self.maintenance_hosts = [] + self.trasport_url = trasport_url + + def run(self): + app = Flask('admin_tool') + + @app.route('/maintenance', methods=['POST']) + def admin_maintenance_api_post(): + data = json.loads(request.data.decode('utf8')) + self.log.info('maintenance message: %s' % data) + if 'session_id' in data: + if data['state'] == 'REMOVE_MAINTENANCE_SESSION': + session_id = data['session_id'] + self.log.info('remove session %s' + % session_id) + self.maint_sessions[session_id].cleanup() + self.maint_sessions[session_id].stop() + del self.maint_sessions[session_id] + else: + session_id = str(generate_uuid()) + self.log.info('creating session: %s' % session_id) + self.maint_sessions[session_id] = ( + AdminMain(self.trasport_url, + session_id, + data, + self, + self.conf, + self.log)) + self.maint_sessions[session_id].start() + reply = json.dumps({'session_id': session_id, + 'state': 'ACK_%s' % data['state']}) + self.log.debug('reply: %s' % reply) + return reply, 200, None + + @app.route('/maintenance', methods=['GET']) + def admin_maintenance_api_get(): + data = json.loads(request.data.decode('utf8')) + self.log.debug('Admin get maintenance: %s' % data) + session_id = data['session_id'] + reply = json.dumps({'state': + self.maint_sessions[session_id].state}) + self.log.debug('reply: %s' % reply) + return reply, 200, None + + @app.route('/<projet_id>/maintenance', methods=['PUT']) + def project_maintenance_api_put(projet_id=None): + data = json.loads(request.data.decode('utf8')) + self.log.debug('%s project put: %s' % (projet_id, data)) + self.project_input(projet_id, data) + return 'OK' + + @app.route('/<projet_id>/maintenance', methods=['GET']) + def project_maintenance_api_get(projet_id=None): + data = json.loads(request.data.decode('utf8')) + self.log.debug('%s project get %s' % (projet_id, data)) + instances = self.project_get_instances(projet_id, data) + reply = json.dumps({'instance_ids': instances}) + self.log.debug('%s reply: %s' % (projet_id, reply)) + return reply, 200, None + + @app.route('/shutdown', methods=['POST']) + def shutdown(): + for session in self.maint_sessions: + self.log.info('shutdown admin tool session %s thread' % + session) + self.maint_sessions[session].cleanup() + self.maint_sessions[session].stop() + self.log.info('shutdown admin_tool server at %s' % time.time()) + func = request.environ.get('werkzeug.server.shutdown') + if func is None: + raise RuntimeError('Not running with the Werkzeug Server') + func() + return 'admin_tool app shutting down...' + + app.run(host='0.0.0.0', port=self.port) + + def project_input(self, project_id, data): + session_id = data['session_id'] + self.maint_sessions[session_id].project_input(project_id, data) + + def project_get_instances(self, project_id, data): + session_id = data['session_id'] + return self.maint_sessions[session_id].project_get_instances( + project_id) diff --git a/doctor_tests/app_manager/__init__.py b/doctor_tests/app_manager/__init__.py new file mode 100644 index 00000000..717d6587 --- /dev/null +++ b/doctor_tests/app_manager/__init__.py @@ -0,0 +1,38 @@ +############################################################################## +# Copyright (c) 2018 Nokia Corporation and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from oslo_config import cfg +from oslo_utils import importutils + + +OPTS = [ + cfg.StrOpt('type', + default='sample', + choices=['sample'], + help='the component of doctor app manager', + required=True), + cfg.StrOpt('ip', + default='127.0.0.1', + help='the ip of app manager', + required=True), + cfg.IntOpt('port', + default='12348', + help='the port of doctor app manager', + required=True), +] + + +_app_manager_name_class_mapping = { + 'sample': 'doctor_tests.app_manager.sample.SampleAppManager' +} + + +def get_app_manager(stack, conf, log): + app_manager_class = ( + _app_manager_name_class_mapping.get(conf.app_manager.type)) + return importutils.import_object(app_manager_class, stack, conf, log) diff --git a/doctor_tests/app_manager/base.py b/doctor_tests/app_manager/base.py new file mode 100644 index 00000000..0d424083 --- /dev/null +++ b/doctor_tests/app_manager/base.py @@ -0,0 +1,26 @@ +############################################################################## +# Copyright (c) 2018 Nokia Corporation and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) +class BaseAppManager(object): + + def __init__(self, conf, log): + self.conf = conf + self.log = log + + @abc.abstractmethod + def start(self): + pass + + @abc.abstractmethod + def stop(self): + pass diff --git a/doctor_tests/app_manager/sample.py b/doctor_tests/app_manager/sample.py new file mode 100644 index 00000000..94926ee2 --- /dev/null +++ b/doctor_tests/app_manager/sample.py @@ -0,0 +1,255 @@ +############################################################################## +# Copyright (c) 2018 Nokia Corporation and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from flask import Flask +from flask import request +import json +import yaml +import time +from threading import Thread +import requests + +from doctor_tests.app_manager.base import BaseAppManager +from doctor_tests.identity_auth import get_identity_auth +from doctor_tests.identity_auth import get_session +from doctor_tests.os_clients import nova_client + + +class SampleAppManager(BaseAppManager): + + def __init__(self, stack, conf, log): + super(SampleAppManager, self).__init__(conf, log) + self.stack = stack + self.app = None + + def start(self): + self.log.info('sample app manager start......') + self.app = AppManager(self.stack, self.conf, self, self.log) + self.app.start() + + def stop(self): + self.log.info('sample app manager stop......') + if not self.app: + return + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + } + url = 'http://%s:%d/shutdown'\ + % (self.conf.app_manager.ip, + self.conf.app_manager.port) + requests.post(url, data='', headers=headers) + + +class AppManager(Thread): + + def __init__(self, stack, conf, app_manager, log): + Thread.__init__(self) + self.stack = stack + self.conf = conf + self.port = self.conf.app_manager.port + self.app_manager = app_manager + self.log = log + self.intance_ids = None + self.headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json'} + self.auth = get_identity_auth(project=self.conf.doctor_project) + self.nova = nova_client(self.conf.nova_version, + get_session(auth=self.auth)) + self.orig_number_of_instances = self.number_of_instances() + self.ha_instances = self.get_ha_instances() + self.floating_ip = None + self.active_instance_id = self.active_instance_id() + + def active_instance_id(self): + for instance in self.ha_instances: + network_interfaces = next(iter(instance.addresses.values())) + for network_interface in network_interfaces: + _type = network_interface.get('OS-EXT-IPS:type') + if _type == "floating": + if not self.floating_ip: + self.floating_ip = network_interface.get('addr') + self.log.debug('active_instance: %s %s' % + (instance.name, instance.id)) + return instance.id + raise Exception("No active instance found") + + def switch_over_ha_instance(self): + for instance in self.ha_instances: + if instance.id != self.active_instance_id: + self.log.info('Switch over to: %s %s' % (instance.name, + instance.id)) + instance.add_floating_ip(self.floating_ip) + self.active_instance_id = instance.id + break + + def get_instance_ids(self): + ret = list() + for instance in self.nova.servers.list(detailed=False): + ret.append(instance.id) + return ret + + def get_ha_instances(self): + ha_instances = list() + for instance in self.nova.servers.list(detailed=True): + if "doctor_ha_app_" in instance.name: + ha_instances.append(instance) + self.log.debug('ha_instances: %s' % instance.name) + return ha_instances + + def _alarm_data_decoder(self, data): + if "[" in data or "{" in data: + # string to list or dict removing unicode + data = yaml.load(data.replace("u'", "'")) + return data + + def _alarm_traits_decoder(self, data): + return ({str(t[0]): self._alarm_data_decoder(str(t[2])) + for t in data['reason_data']['event']['traits']}) + + def get_session_instance_ids(self, url, session_id): + data = {'session_id': session_id} + ret = requests.get(url, data=json.dumps(data), headers=self.headers) + if ret.status_code != 200: + raise Exception(ret.text) + self.log.info('get_instance_ids %s' % ret.json()) + return ret.json()['instance_ids'] + + def scale_instances(self, number_of_instances): + number_of_instances_before = self.number_of_instances() + + parameters = self.stack.parameters + parameters['nonha_intances'] += number_of_instances + self.stack.update(self.stack.stack_name, + self.stack.stack_id, + self.stack.template, + parameters=parameters, + files=self.stack.files) + + number_of_instances_after = self.number_of_instances() + if (number_of_instances_before + number_of_instances != + number_of_instances_after): + self.log.error('scale_instances with: %d from: %d ends up to: %d' + % (number_of_instances, number_of_instances_before, + number_of_instances_after)) + raise Exception('scale_instances failed') + + self.log.info('scaled insances from %d to %d' % + (number_of_instances_before, + number_of_instances_after)) + + def number_of_instances(self): + return len(self.nova.servers.list(detailed=False)) + + def run(self): + app = Flask('app_manager') + + @app.route('/maintenance', methods=['POST']) + def maintenance_alarm(): + data = json.loads(request.data.decode('utf8')) + try: + payload = self._alarm_traits_decoder(data) + except: + payload = ({t[0]: t[2] for t in + data['reason_data']['event']['traits']}) + self.log.error('cannot parse alarm data: %s' % payload) + raise Exception('sample app manager cannot parse alarm.' + 'Possibly trait data over 256 char') + + self.log.info('sample app manager received data = %s' % payload) + + state = payload['state'] + reply_state = None + reply = dict() + + self.log.info('sample app manager state: %s' % state) + + if state == 'MAINTENANCE': + instance_ids = (self.get_session_instance_ids( + payload['instance_ids'], + payload['session_id'])) + reply['instance_ids'] = instance_ids + reply_state = 'ACK_MAINTENANCE' + + elif state == 'DOWN_SCALE': + # scale down 2 isntances that is VCPUS equaling to single + # compute node + self.scale_instances(-2) + reply['instance_ids'] = self.get_instance_ids() + reply_state = 'ACK_DOWN_SCALE' + + elif state == 'MAINTENANCE_COMPLETE': + # possibly need to upscale + number_of_instances = self.number_of_instances() + if self.orig_number_of_instances > number_of_instances: + scale_instances = (self.orig_number_of_instances - + number_of_instances) + self.scale_instances(scale_instances) + reply_state = 'ACK_MAINTENANCE_COMPLETE' + + elif state == 'PREPARE_MAINTENANCE': + if "MIGRATE" not in payload['allowed_actions']: + raise Exception('MIGRATE not supported') + + instance_ids = (self.get_session_instance_ids( + payload['instance_ids'], + payload['session_id'])) + self.log.info('sample app manager got instances: %s' % + instance_ids) + instance_actions = dict() + for instance_id in instance_ids: + instance_actions[instance_id] = "MIGRATE" + if instance_id == self.active_instance_id: + self.switch_over_ha_instance() + reply['instance_actions'] = instance_actions + reply_state = 'ACK_PREPARE_MAINTENANCE' + + elif state == 'PLANNED_MAINTENANCE': + if "MIGRATE" not in payload['allowed_actions']: + raise Exception('MIGRATE not supported') + + instance_ids = (self.get_session_instance_ids( + payload['instance_ids'], + payload['session_id'])) + self.log.info('sample app manager got instances: %s' % + instance_ids) + instance_actions = dict() + for instance_id in instance_ids: + instance_actions[instance_id] = "MIGRATE" + if instance_id == self.active_instance_id: + self.switch_over_ha_instance() + reply['instance_actions'] = instance_actions + reply_state = 'ACK_PLANNED_MAINTENANCE' + + elif state == 'INSTANCE_ACTION_DONE': + self.log.info('%s' % payload['instance_ids']) + + else: + raise Exception('sample app manager received event with' + ' unknown state %s' % state) + + if reply_state: + reply['session_id'] = payload['session_id'] + reply['state'] = reply_state + url = payload['reply_url'] + self.log.info('sample app manager reply: %s' % reply) + requests.put(url, data=json.dumps(reply), headers=self.headers) + + return 'OK' + + @app.route('/shutdown', methods=['POST']) + def shutdown(): + self.log.info('shutdown app manager server at %s' % time.time()) + func = request.environ.get('werkzeug.server.shutdown') + if func is None: + raise RuntimeError('Not running with the Werkzeug Server') + func() + return 'app manager shutting down...' + + app.run(host="0.0.0.0", port=self.port) diff --git a/doctor_tests/config.py b/doctor_tests/config.py index dc05c0d8..cea1f0c9 100644 --- a/doctor_tests/config.py +++ b/doctor_tests/config.py @@ -11,6 +11,8 @@ import itertools from oslo_config import cfg
from doctor_tests import alarm
+from doctor_tests import admin_tool
+from doctor_tests import app_manager
from doctor_tests import consumer
from doctor_tests import image
from doctor_tests import instance
@@ -30,6 +32,8 @@ def list_opts(): ('monitor', monitor.OPTS),
('inspector', inspector.OPTS),
('consumer', consumer.OPTS),
+ ('admin_tool', admin_tool.OPTS),
+ ('app_manager', app_manager.OPTS),
('DEFAULT', itertools.chain(
os_clients.OPTS,
image.OPTS,
diff --git a/doctor_tests/inspector/sample.py b/doctor_tests/inspector/sample.py index 7742373d..a55a12b7 100644 --- a/doctor_tests/inspector/sample.py +++ b/doctor_tests/inspector/sample.py @@ -13,6 +13,7 @@ import json import time from threading import Thread import requests +import yaml from doctor_tests.common import utils from doctor_tests.identity_auth import get_identity_auth @@ -105,6 +106,39 @@ class SampleInspector(BaseInspector): if self.conf.inspector.update_neutron_port_dp_status: thr3.join() + def _alarm_data_decoder(self, data): + if "[" in data or "{" in data: + # string to list or dict removing unicode + data = yaml.load(data.replace("u'", "'")) + return data + + def _alarm_traits_decoder(self, data): + return ({str(t[0]): self._alarm_data_decoder(str(t[2])) + for t in data['reason_data']['event']['traits']}) + + def maintenance(self, data): + try: + payload = self._alarm_traits_decoder(data) + except: + payload = ({t[0]: t[2] for t in + data['reason_data']['event']['traits']}) + self.log.error('cannot parse alarm data: %s' % payload) + raise Exception('sample inspector cannot parse alarm.' + 'Possibly trait data over 256 char') + self.log.info('sample inspector received data = %s' % payload) + + state = payload['state'] + host = payload['host'] + + if state == 'IN_MAINTENANCE': + self.log.info("sample inspector: disable %s automatic fault " + "management" % host) + elif state == 'MAINTENANCE_COMPLETE': + self.log.info("sample inspector: enable %s automatic fault " + "management" % host) + else: + raise("sample inspector couldn't handle state: %s" % state) + @utils.run_async def _disable_compute_host(self, hostname): self.nova.services.force_down(hostname, 'nova-compute', True) @@ -173,6 +207,11 @@ class InspectorApp(Thread): self.inspector.handle_events(events) return "OK" + @app.route('/maintenance', methods=['POST']) + def maintenance(): + self.inspector.maintenance(request.json) + return "OK" + @app.route('/events/shutdown', methods=['POST']) def shutdown(): self.log.info('shutdown inspector app server at %s' % time.time()) diff --git a/doctor_tests/installer/apex.py b/doctor_tests/installer/apex.py index 121767fc..bfa72d32 100644 --- a/doctor_tests/installer/apex.py +++ b/doctor_tests/installer/apex.py @@ -34,7 +34,6 @@ class ApexInstaller(BaseInstaller): def setup(self): self.log.info('Setup Apex installer start......') - self.key_file = self.get_ssh_key_from_installer() self._get_and_set_ips() self.create_flavor() diff --git a/doctor_tests/main.py b/doctor_tests/main.py index 2a8abda7..438d8324 100644 --- a/doctor_tests/main.py +++ b/doctor_tests/main.py @@ -100,11 +100,20 @@ class DoctorTest(object): return try: LOG.info('doctor maintenance test starting.......') - - maintenance = Maintenance(self.conf, LOG) + trasport_url = self.installer.get_transport_url() + maintenance = Maintenance(trasport_url, self.conf, LOG) maintenance.setup_maintenance(self.user) - # TODO (tojuvone) actual test + # wait for aodh alarms are updated in caches for event evaluator, + # sleep time should be larger than event_alarm_cache_ttl + # (default 60) + LOG.info('wait aodh for 120s.......') + time.sleep(120) + + session_id = maintenance.start_maintenance() + maintenance.wait_maintenance_complete(session_id) + + LOG.info('doctor maintenance complete.......') except Exception as e: LOG.error('doctor maintenance test failed, Exception=%s' % e) diff --git a/doctor_tests/scenario/maintenance.py b/doctor_tests/scenario/maintenance.py index bb0e943b..54244d79 100644 --- a/doctor_tests/scenario/maintenance.py +++ b/doctor_tests/scenario/maintenance.py @@ -6,9 +6,17 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +import datetime +import json +import requests +import time + +from doctor_tests.admin_tool import get_admin_tool +from doctor_tests.app_manager import get_app_manager from doctor_tests.common.utils import get_doctor_test_root_dir from doctor_tests.identity_auth import get_identity_auth from doctor_tests.identity_auth import get_session +from doctor_tests.inspector import get_inspector from doctor_tests.os_clients import keystone_client from doctor_tests.os_clients import neutron_client from doctor_tests.os_clients import nova_client @@ -17,7 +25,7 @@ from doctor_tests.stack import Stack class Maintenance(object): - def __init__(self, conf, log): + def __init__(self, trasport_url, conf, log): self.conf = conf self.log = log self.keystone = keystone_client( @@ -26,6 +34,9 @@ class Maintenance(object): auth = get_identity_auth(project=self.conf.doctor_project) self.neutron = neutron_client(get_session(auth=auth)) self.stack = Stack(self.conf, self.log) + self.admin_tool = get_admin_tool(trasport_url, self.conf, self.log) + self.app_manager = get_app_manager(self.stack, self.conf, self.log) + self.inspector = get_inspector(self.conf, self.log) def get_external_network(self): ext_net = None @@ -35,7 +46,7 @@ class Maintenance(object): ext_net = network['name'] break if ext_net is None: - raise Exception("externl network not defined") + raise Exception("external network not defined") return ext_net def setup_maintenance(self, user): @@ -43,14 +54,15 @@ class Maintenance(object): # need to be free before test hvisors = self.nova.hypervisors.list(detailed=True) prev_vcpus = 0 - prev_hostname = "" + prev_hostname = '' self.log.info('checking hypervisors.......') for hvisor in hvisors: - vcpus = hvisor.__getattr__("vcpus") - vcpus_used = hvisor.__getattr__("vcpus_used") - hostname = hvisor.__getattr__("hypervisor_hostname") + vcpus = hvisor.__getattr__('vcpus') + vcpus_used = hvisor.__getattr__('vcpus_used') + hostname = hvisor.__getattr__('hypervisor_hostname') if vcpus < 2: - raise Exception('not enough vcpus on %s' % hostname) + raise Exception('not enough vcpus (%d) on %s' % + (vcpus, hostname)) if vcpus_used > 0: raise Exception('%d vcpus used on %s' % (vcpus_used, hostname)) @@ -98,6 +110,83 @@ class Maintenance(object): parameters=parameters, files=files) + self.admin_tool.start() + self.app_manager.start() + self.inspector.start() + + def start_maintenance(self): + self.log.info('start maintenance.......') + hvisors = self.nova.hypervisors.list(detailed=True) + maintenance_hosts = list() + for hvisor in hvisors: + hostname = hvisor.__getattr__('hypervisor_hostname') + maintenance_hosts.append(hostname) + + url = 'http://0.0.0.0:%s/maintenance' % self.conf.admin_tool.port + # let's start maintenance 20sec from now, so projects will have + # time to ACK to it before that + maintenance_at = (datetime.datetime.utcnow() + + datetime.timedelta(seconds=20) + ).strftime('%Y-%m-%d %H:%M:%S') + data = {'hosts': maintenance_hosts, + 'state': 'MAINTENANCE', + 'maintenance_at': maintenance_at, + 'metadata': {'openstack_version': 'Pike'}} + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json'} + + ret = requests.post(url, data=json.dumps(data), headers=headers) + if ret.status_code != 200: + raise Exception(ret.text) + return ret.json()['session_id'] + + def remove_maintenance_session(self, session_id): + self.log.info('remove maintenance session %s.......' % session_id) + + url = 'http://0.0.0.0:%s/maintenance' % self.conf.admin_tool.port + + data = {'state': 'REMOVE_MAINTENANCE_SESSION', + 'session_id': session_id} + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json'} + + ret = requests.post(url, data=json.dumps(data), headers=headers) + if ret.status_code != 200: + raise Exception(ret.text) + + def get_maintenance_state(self, session_id): + url = 'http://0.0.0.0:%s/maintenance' % self.conf.admin_tool.port + data = {'session_id': session_id} + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json'} + ret = requests.get(url, data=json.dumps(data), headers=headers) + if ret.status_code != 200: + raise Exception(ret.text) + return ret.json()['state'] + + def wait_maintenance_complete(self, session_id): + retries = 60 + state = None + time.sleep(600) + while state != 'MAINTENANCE_COMPLETE' and retries > 0: + time.sleep(10) + state = self.get_maintenance_state(session_id) + retries = retries - 1 + if retries == 0 and state != 'MAINTENANCE_COMPLETE': + raise Exception('maintenance %s not completed within 20min, status' + ' %s' % (session_id, state)) + elif state == 'MAINTENANCE_COMPLETE': + self.log.info('maintenance %s %s' % (session_id, state)) + self.remove_maintenance_session(session_id) + elif state == 'MAINTENANCE_FAILED': + raise Exception('maintenance %s failed' % session_id) + def cleanup_maintenance(self): + self.admin_tool.stop() + self.app_manager.stop() + self.inspector.stop() self.log.info('stack delete start.......') self.stack.delete() |