From f2d2dcc87e67ed1ebca13aa8ed4567b8713ce5b0 Mon Sep 17 00:00:00 2001 From: Tomi Juvonen Date: Fri, 5 Oct 2018 11:48:25 +0300 Subject: Support Fenix and sample implementation accordingly Fenix has now same capability as our sample implementation. We can now support Fenix if manually installed on controllers. Sample implementation should be closer to Fenix as Fenix is the place to define the generic interfaces at the end. JIRA: DOCTOR-131 Change-Id: Ied58b8f469dbcc4bb5caa787e62c1831a211ecd6 Signed-off-by: Tomi Juvonen --- doctor_tests/admin_tool/__init__.py | 8 +- doctor_tests/admin_tool/sample.py | 185 +++++++++++++++++++----------------- 2 files changed, 103 insertions(+), 90 deletions(-) (limited to 'doctor_tests/admin_tool') diff --git a/doctor_tests/admin_tool/__init__.py b/doctor_tests/admin_tool/__init__.py index e8b12817..3417a334 100644 --- a/doctor_tests/admin_tool/__init__.py +++ b/doctor_tests/admin_tool/__init__.py @@ -8,16 +8,16 @@ ############################################################################## from oslo_config import cfg from oslo_utils import importutils - +import os OPTS = [ cfg.StrOpt('type', - default='sample', - choices=['sample'], + default=os.environ.get('ADMIN_TOOL_TYPE', 'sample'), + choices=['sample', 'fenix'], help='the component of doctor admin_tool', required=True), cfg.StrOpt('ip', - default='127.0.0.1', + default='0.0.0.0', help='the ip of admin_tool', required=True), cfg.IntOpt('port', diff --git a/doctor_tests/admin_tool/sample.py b/doctor_tests/admin_tool/sample.py index 892a4c83..a71f43a1 100644 --- a/doctor_tests/admin_tool/sample.py +++ b/doctor_tests/admin_tool/sample.py @@ -59,7 +59,7 @@ class AdminMain(Thread): self.parent = parent self.log = log self.conf = conf - self.url = 'http://0.0.0.0:%s' % conf.admin_tool.port + self.url = 'http://%s:%s' % (conf.admin_tool.ip, conf.admin_tool.port) self.projects_state = dict() # current state for each project self.proj_server_actions = dict() # actions for each project server self.projects_servers = dict() # servers processed in current state @@ -86,6 +86,7 @@ class AdminMain(Thread): driver='messaging', topics=['notifications']) self.notif_admin = self.notif_admin.prepare(publisher_id='admin_tool') + self.stopped = False self.log.info('Admin tool session %s initialized' % self.session_id) def cleanup(self): @@ -116,14 +117,15 @@ class AdminMain(Thread): if self._projects_not_in_wanted_states(wanted_states): self.log.error('Admin tool session %s: projects in invalid states ' '%s' % (self.session_id, self.projects_state)) - raise Exception('Admin tool session %s: not all projects in states' - ' %s' % (self.session_id, wanted_states)) + return False else: self.log.info('all projects replied') + return True def _project_notify(self, project_id, instance_ids, allowed_actions, actions_at, state, metadata): - reply_url = '%s/%s/maintenance' % (self.url, project_id) + reply_url = '%s/maintenance/%s/%s' % (self.url, self.session_id, + project_id) payload = dict(project_id=project_id, instance_ids=instance_ids, @@ -148,11 +150,12 @@ class AdminMain(Thread): self.notif_admin.info({'some': 'context'}, 'maintenance.host', payload) - def down_scale(self): + def in_scale(self): for project in self.projects_servers: - self.log.info('DOWN_SCALE to project %s' % project) + self.log.info('SCALE_IN to project %s' % project) self.log.debug('instance_ids %s' % self.projects_servers[project]) - instance_ids = '%s/%s/maintenance' % (self.url, project) + instance_ids = '%s/maintenance/%s/%s' % (self.url, self.session_id, + project) allowed_actions = [] wait_seconds = 120 actions_at = (datetime.datetime.utcnow() + @@ -163,18 +166,20 @@ class AdminMain(Thread): self._project_notify(project, instance_ids, allowed_actions, actions_at, state, metadata) - allowed_states = ['ACK_DOWN_SCALE', 'NACK_DOWN_SCALE'] - self.wait_projects_state(allowed_states, wait_seconds) - if self.projects_not_in_state('ACK_DOWN_SCALE'): - raise Exception('Admin tool session %s: all states not ' - 'ACK_DOWN_SCALE %s' % - (self.session_id, self.projects_state)) + allowed_states = ['ACK_SCALE_IN', 'NACK_SCALE_IN'] + if not self.wait_projects_state(allowed_states, wait_seconds): + self.state = 'MAINTENANCE_FAILED' + if self.projects_not_in_state('ACK_SCALE_IN'): + self.log.error('%s: all states not ACK_SCALE_IN' % + self.session_id) + self.state = 'MAINTENANCE_FAILED' def maintenance(self): for project in self.projects_servers: self.log.info('\nMAINTENANCE to project %s\n' % project) self.log.debug('instance_ids %s' % self.projects_servers[project]) - instance_ids = '%s/%s/maintenance' % (self.url, project) + instance_ids = '%s/maintenance/%s/%s' % (self.url, self.session_id, + project) allowed_actions = [] actions_at = self.maintenance_at state = self.state @@ -190,16 +195,18 @@ class AdminMain(Thread): allowed_actions, actions_at, state, metadata) allowed_states = ['ACK_MAINTENANCE', 'NACK_MAINTENANCE'] - self.wait_projects_state(allowed_states, wait_seconds) + if not self.wait_projects_state(allowed_states, wait_seconds): + self.state = 'MAINTENANCE_FAILED' if self.projects_not_in_state('ACK_MAINTENANCE'): - raise Exception('Admin tool session %s: all states not ' - 'ACK_MAINTENANCE %s' % - (self.session_id, self.projects_state)) + self.log.error('%s: all states not ACK_MAINTENANCE' % + self.session_id) + self.state = 'MAINTENANCE_FAILED' def maintenance_complete(self): for project in self.projects_servers: self.log.info('MAINTENANCE_COMPLETE to project %s' % project) - instance_ids = '%s/%s/maintenance' % (self.url, project) + instance_ids = '%s/maintenance/%s/%s' % (self.url, self.session_id, + project) allowed_actions = [] wait_seconds = 120 actions_at = (datetime.datetime.utcnow() + @@ -212,13 +219,14 @@ class AdminMain(Thread): metadata) allowed_states = ['ACK_MAINTENANCE_COMPLETE', 'NACK_MAINTENANCE_COMPLETE'] - self.wait_projects_state(allowed_states, wait_seconds) + if not self.wait_projects_state(allowed_states, wait_seconds): + self.state = 'MAINTENANCE_FAILED' if self.projects_not_in_state('ACK_MAINTENANCE_COMPLETE'): - raise Exception('Admin tool session %s: all states not ' - 'ACK_MAINTENANCE_COMPLETE %s' % - (self.session_id, self.projects_state)) + self.log.error('%s: all states not ACK_MAINTENANCE_COMPLETE' % + self.session_id) + self.state = 'MAINTENANCE_FAILED' - def need_down_scale(self, host_servers): + def need_in_scale(self, host_servers): room_for_instances = 0 for host in host_servers: instances = 0 @@ -267,7 +275,8 @@ class AdminMain(Thread): self.projects_servers[project] = projects_servers[project].copy() self.log.info('%s to project %s' % (state, project)) self.project_servers_log_info(project, projects_servers) - instance_ids = '%s/%s/maintenance' % (self.url, project) + instance_ids = '%s/maintenance/%s/%s' % (self.url, self.session_id, + project) allowed_actions = ['MIGRATE', 'LIVE_MIGRATE', 'OWN_ACTION'] wait_seconds = 120 actions_at = (datetime.datetime.utcnow() + @@ -278,11 +287,14 @@ class AdminMain(Thread): allowed_actions, actions_at, state, metadata) allowed_states = [state_ack, state_nack] - self.wait_projects_state(allowed_states, wait_seconds) - if self.projects_not_in_state(state_ack): - raise Exception('Admin tool session %s: all states not %s %s' % - (self.session_id, state_ack, self.projects_state)) - self.actions_to_have_empty_host(host) + if not self.wait_projects_state(allowed_states, wait_seconds): + self.state = 'MAINTENANCE_FAILED' + elif self.projects_not_in_state(state_ack): + self.log.error('%s: all states not %s' % + (self.session_id, state_ack)) + self.state = 'MAINTENANCE_FAILED' + else: + self.actions_to_have_empty_host(host) def notify_action_done(self, project, instance_id): instance_ids = instance_id @@ -463,7 +475,8 @@ class AdminMain(Thread): time.sleep(5) def run(self): - while self.state != 'MAINTENANCE_COMPLETE': + while (self.state not in ['MAINTENANCE_DONE', 'MAINTENANCE_FAILED'] and + not self.stopped): self.log.info('--==session %s: processing state %s==--' % (self.session_id, self.state)) if self.state == 'MAINTENANCE': @@ -474,7 +487,8 @@ class AdminMain(Thread): raise Exception('all projects do not listen maintenance ' 'alarm') self.maintenance() - + if self.state == 'MAINTENANCE_FAILED': + continue maint_at = self.str_to_datetime(self.maintenance_at) if maint_at > datetime.datetime.utcnow(): time_now = (datetime.datetime.utcnow().strftime( @@ -492,14 +506,14 @@ class AdminMain(Thread): # True -> PLANNED_MAINTENANCE # False -> check if we can migrate VMs to get empty host # True -> PREPARE_MAINTENANCE - # False -> DOWN_SCALE + # False -> SCALE_IN maintenance_empty_hosts = ([h for h in self.hosts if h not in host_servers]) if len(maintenance_empty_hosts) == 0: - if self.need_down_scale(host_servers): + if self.need_in_scale(host_servers): self.log.info('Need to down scale') - self.state = 'DOWN_SCALE' + self.state = 'SCALE_IN' else: self.log.info('Free capacity, but need empty host') self.state = 'PREPARE_MAINTENANCE' @@ -508,14 +522,17 @@ class AdminMain(Thread): self.state = 'PLANNED_MAINTENANCE' self.log.info('--==State change from MAINTENANCE to %s==--' % self.state) - elif self.state == 'DOWN_SCALE': + elif self.state == 'SCALE_IN': # Test case is hard coded to have all compute capacity used # We need to down scale to have one empty compute host - self.down_scale() + self.update_server_info() + self.in_scale() + if self.state == 'MAINTENANCE_FAILED': + continue self.state = 'PREPARE_MAINTENANCE' host_servers = self.update_server_info() self.servers_log_info(host_servers) - self.log.info('--==State change from DOWN_SCALE to' + self.log.info('--==State change from SCALE_IN to' ' %s==--' % self.state) elif self.state == 'PREPARE_MAINTENANCE': @@ -527,7 +544,7 @@ class AdminMain(Thread): host_servers]) if len(maintenance_empty_hosts) == 0: self.log.info('no empty hosts for maintenance') - if self.need_down_scale(host_servers): + if self.need_in_scale(host_servers): raise Exception('Admin tool session %s: Not enough ' 'free capacity for maintenance' % self.session_id) @@ -535,6 +552,8 @@ class AdminMain(Thread): if host: self.make_compute_host_empty(host, host_servers[host], 'PREPARE_MAINTENANCE') + if self.state == 'MAINTENANCE_FAILED': + continue else: # We do not currently support another down scale if # first was not enough @@ -566,6 +585,7 @@ class AdminMain(Thread): maintenance_empty_hosts.append(host) self.log.info('--==Start to maintain empty hosts==--\n%s' % maintenance_empty_hosts) + self.update_server_info() for host in maintenance_empty_hosts: # scheduler has problems, let's see if just down scaled # host is really empty @@ -586,6 +606,8 @@ class AdminMain(Thread): self.log.info('PLANNED_MAINTENANCE host %s' % host) self.make_compute_host_empty(host, host_servers[host], 'PLANNED_MAINTENANCE') + if self.state == 'MAINTENANCE_FAILED': + continue self.log.info('IN_MAINTENANCE host %s' % host) self._admin_notify(admin_project, host, 'IN_MAINTENANCE', self.session_id) @@ -603,14 +625,16 @@ class AdminMain(Thread): self.log.info('Projects still need to up scale back to full ' 'capcity') self.maintenance_complete() + if self.state == 'MAINTENANCE_FAILED': + continue host_servers = self.update_server_info() self.servers_log_info(host_servers) - self.state = 'MAINTENANCE_COMPLETE' + self.state = 'MAINTENANCE_DONE' else: raise Exception('Admin tool session %s: session in invalid ' 'state %s' % (self.session_id, self.state)) - self.log.info('--==Maintenance session %s: ' - 'MAINTENANCE SESSION COMPLETE==--' % self.session_id) + self.log.info('--==Maintenance session %s: %s==--' % + (self.session_id, self.state)) def project_input(self, project_id, data): self.log.debug('Admin tool session %s: project %s input' % @@ -637,7 +661,6 @@ class AdminTool(Thread): self.admin_tool = admin_tool self.log = log self.conf = conf - self.port = self.conf.admin_tool.port self.maint_sessions = {} self.projects = {} self.maintenance_hosts = [] @@ -650,63 +673,55 @@ class AdminTool(Thread): def admin_maintenance_api_post(): data = json.loads(request.data.decode('utf8')) self.log.info('maintenance message: %s' % data) - if 'session_id' in data: - if data['state'] == 'REMOVE_MAINTENANCE_SESSION': - session_id = data['session_id'] - self.log.info('remove session %s' - % session_id) - self.maint_sessions[session_id].cleanup() - self.maint_sessions[session_id].stop() - del self.maint_sessions[session_id] - else: - session_id = str(generate_uuid()) - self.log.info('creating session: %s' % session_id) - self.maint_sessions[session_id] = ( - AdminMain(self.trasport_url, - session_id, - data, - self, - self.conf, - self.log)) - self.maint_sessions[session_id].start() + session_id = str(generate_uuid()) + self.log.info('creating session: %s' % session_id) + self.maint_sessions[session_id] = ( + AdminMain(self.trasport_url, + session_id, + data, + self, + self.conf, + self.log)) + self.maint_sessions[session_id].start() reply = json.dumps({'session_id': session_id, 'state': 'ACK_%s' % data['state']}) self.log.debug('reply: %s' % reply) return reply, 200, None - @app.route('/maintenance', methods=['GET']) - def admin_maintenance_api_get(): - data = json.loads(request.data.decode('utf8')) - self.log.debug('Admin get maintenance: %s' % data) - session_id = data['session_id'] + @app.route('/maintenance/', methods=['GET']) + def admin_maintenance_api_get(session_id=None): + self.log.debug('Admin get maintenance') reply = json.dumps({'state': self.maint_sessions[session_id].state}) - self.log.debug('reply: %s' % reply) + self.log.info('reply: %s' % reply) return reply, 200, None - @app.route('//maintenance', methods=['PUT']) - def project_maintenance_api_put(projet_id=None): + @app.route('/maintenance//', methods=['PUT']) + def project_maintenance_api_put(session_id=None, projet_id=None): data = json.loads(request.data.decode('utf8')) self.log.debug('%s project put: %s' % (projet_id, data)) - self.project_input(projet_id, data) + self.project_input(session_id, projet_id, data) return 'OK' - @app.route('//maintenance', methods=['GET']) - def project_maintenance_api_get(projet_id=None): - data = json.loads(request.data.decode('utf8')) - self.log.debug('%s project get %s' % (projet_id, data)) - instances = self.project_get_instances(projet_id, data) + @app.route('/maintenance//', methods=['GET']) + def project_maintenance_api_get(session_id=None, projet_id=None): + self.log.debug('%s project get %s' % (projet_id, session_id)) + instances = self.project_get_instances(session_id, projet_id) reply = json.dumps({'instance_ids': instances}) self.log.debug('%s reply: %s' % (projet_id, reply)) return reply, 200, None + @app.route('/maintenance/', methods=['DELETE']) + def remove_session(session_id=None): + self.log.info('remove session %s' + % session_id) + self.maint_sessions[session_id].cleanup() + self.maint_sessions[session_id].stop() + del self.maint_sessions[session_id] + return 'OK' + @app.route('/shutdown', methods=['POST']) def shutdown(): - for session in self.maint_sessions: - self.log.info('shutdown admin tool session %s thread' % - session) - self.maint_sessions[session].cleanup() - self.maint_sessions[session].stop() self.log.info('shutdown admin_tool server at %s' % time.time()) func = request.environ.get('werkzeug.server.shutdown') if func is None: @@ -714,13 +729,11 @@ class AdminTool(Thread): func() return 'admin_tool app shutting down...' - app.run(host='0.0.0.0', port=self.port) + app.run(host=self.conf.admin_tool.ip, port=self.conf.admin_tool.port) - def project_input(self, project_id, data): - session_id = data['session_id'] + def project_input(self, session_id, project_id, data): self.maint_sessions[session_id].project_input(project_id, data) - def project_get_instances(self, project_id, data): - session_id = data['session_id'] + def project_get_instances(self, session_id, project_id): return self.maint_sessions[session_id].project_get_instances( project_id) -- cgit 1.2.3-korg