From 44d1e135eced7afe13b8772a610ae5cdae310b68 Mon Sep 17 00:00:00 2001 From: Ryota MIBU Date: Mon, 11 Sep 2017 13:45:47 +0000 Subject: fix package path and move files under doctor_tests Change-Id: Ibde6a36c43064e5fbea1a0b7a9b49349c343e42f Signed-off-by: Ryota MIBU --- doctor_tests/inspector/__init__.py | 40 +++++++++ doctor_tests/inspector/base.py | 30 +++++++ doctor_tests/inspector/congress.py | 94 +++++++++++++++++++++ doctor_tests/inspector/sample.py | 169 +++++++++++++++++++++++++++++++++++++ 4 files changed, 333 insertions(+) create mode 100644 doctor_tests/inspector/__init__.py create mode 100644 doctor_tests/inspector/base.py create mode 100644 doctor_tests/inspector/congress.py create mode 100644 doctor_tests/inspector/sample.py (limited to 'doctor_tests/inspector') diff --git a/doctor_tests/inspector/__init__.py b/doctor_tests/inspector/__init__.py new file mode 100644 index 00000000..3be79e57 --- /dev/null +++ b/doctor_tests/inspector/__init__.py @@ -0,0 +1,40 @@ +############################################################################# +# Copyright (c) 2017 ZTE Corporation and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import os + +from oslo_config import cfg +from oslo_utils import importutils + + +OPTS = [ + cfg.StrOpt('type', + default=os.environ.get('INSPECTOR_TYPE', 'sample'), + choices=['sample', 'congress', 'vitrage'], + help='the component of doctor inspector', + required=True), + cfg.StrOpt('ip', + default='127.0.0.1', + help='the host ip of inspector', + required=False), + cfg.StrOpt('port', + default='12345', + help='the port of default for inspector', + required=False), +] + + +_inspector_name_class_mapping = { + 'sample': 'doctor_tests.inspector.sample.SampleInspector', + 'congress': 'doctor_tests.inspector.congress.CongressInspector', +} + + +def get_inspector(conf, log): + inspector_class = _inspector_name_class_mapping[conf.inspector.type] + return importutils.import_object(inspector_class, conf, log) diff --git a/doctor_tests/inspector/base.py b/doctor_tests/inspector/base.py new file mode 100644 index 00000000..854f0695 --- /dev/null +++ b/doctor_tests/inspector/base.py @@ -0,0 +1,30 @@ +############################################################################## +# Copyright (c) 2017 ZTE Corporation and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) +class BaseInspector(object): + + def __init__(self, conf, log): + self.conf = conf + self.log = log + + @abc.abstractmethod + def get_inspector_url(self): + pass + + @abc.abstractmethod + def start(self): + pass + + @abc.abstractmethod + def stop(self): + pass \ No newline at end of file diff --git a/doctor_tests/inspector/congress.py b/doctor_tests/inspector/congress.py new file mode 100644 index 00000000..c89a41bd --- /dev/null +++ b/doctor_tests/inspector/congress.py @@ -0,0 +1,94 @@ +############################################################################## +# Copyright (c) 2017 ZTE Corporation and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from doctor_tests.identity_auth import get_identity_auth +from doctor_tests.identity_auth import get_session +from doctor_tests.os_clients import congress_client + +from doctor_tests.inspector.base import BaseInspector + + +class CongressInspector(BaseInspector): + nova_api_min_version = '2.11' + doctor_driver = 'doctor' + doctor_datasource = 'doctor' + policy = 'classification' + rules = { + 'host_down': + 'host_down(host) :- doctor:events(hostname=host, type="compute.host.down", status="down")', + 'active_instance_in_host': + 'active_instance_in_host(vmid, host) :- nova:servers(id=vmid, host_name=host, status="ACTIVE")', + 'host_force_down': + 'execute[nova:services.force_down(host, "nova-compute", "True")] :- host_down(host)', + 'error_vm_states': + 'execute[nova:servers.reset_state(vmid, "error")] :- host_down(host), active_instance_in_host(vmid, host)' + } + + def __init__(self, conf, log): + super(CongressInspector, self).__init__(conf, log) + self.auth = get_identity_auth() + self.congress = congress_client(get_session(auth=self.auth)) + self._init_driver_and_ds() + self.inspector_url = self.get_inspector_url() + + def _init_driver_and_ds(self): + datasources = \ + {ds['name']: ds for ds in self.congress.list_datasources()['results']} + + # check nova_api version + nova_api_version = datasources['nova']['config'].get('api_version') + if nova_api_version and nova_api_version < self.nova_api_min_version: + raise Exception('Congress Nova datasource API version < nova_api_min_version(%s)' + % self.nova_api_min_version) + + # create doctor datasource if it's not exist + if self.doctor_datasource not in datasources: + self.congress.create_datasource( + body={'driver': self.doctor_driver, + 'name': self.doctor_datasource}) + + # check whether doctor driver exist + drivers = \ + {driver['id']: driver for driver in self.congress.list_drivers()['results']} + if self.doctor_driver not in drivers: + raise Exception('Do not support doctor driver in congress') + + self.policy_rules = \ + {rule['name']: rule for rule in + self.congress.list_policy_rules(self.policy)['results']} + + def get_inspector_url(self): + ds = self.congress.list_datasources()['results'] + doctor_ds = next((item for item in ds if item['driver'] == 'doctor'), + None) + congress_endpoint = self.congress.httpclient.get_endpoint(auth=self.auth) + return ('%s/v1/data-sources/%s/tables/events/rows' % + (congress_endpoint, doctor_ds['id'])) + + def start(self): + self.log.info('congress inspector start......') + + for rule_name, rule in self.rules.items(): + self._add_rule(rule_name, rule) + + def stop(self): + self.log.info('congress inspector stop......') + + for rule_name in self.rules.keys(): + self._del_rule(rule_name) + + def _add_rule(self, rule_name, rule): + if rule_name not in self.policy_rules: + self.congress.create_policy_rule(self.policy, + body={'name': rule_name, + 'rule': rule}) + + def _del_rule(self, rule_name): + if rule_name in self.policy_rules: + rule_id = self.policy_rules[rule_name]['id'] + self.congress.delete_policy_rule(self.policy, rule_id) diff --git a/doctor_tests/inspector/sample.py b/doctor_tests/inspector/sample.py new file mode 100644 index 00000000..114e4ebd --- /dev/null +++ b/doctor_tests/inspector/sample.py @@ -0,0 +1,169 @@ +############################################################################## +# Copyright (c) 2017 ZTE Corporation and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import collections +from flask import Flask +from flask import request +import json +import time +from threading import Thread +import requests + +from doctor_tests.common import utils +from doctor_tests.identity_auth import get_identity_auth +from doctor_tests.identity_auth import get_session +from doctor_tests.os_clients import nova_client +from doctor_tests.os_clients import neutron_client +from doctor_tests.inspector.base import BaseInspector + + +class SampleInspector(BaseInspector): + event_type = 'compute.host.down' + + def __init__(self, conf, log): + super(SampleInspector, self).__init__(conf, log) + self.inspector_url = self.get_inspector_url() + self.novaclients = list() + self._init_novaclients() + # Normally we use this client for non redundant API calls + self.nova = self.novaclients[0] + + auth = get_identity_auth(project=self.conf.doctor_project) + session = get_session(auth=auth) + self.neutron = neutron_client(session) + + self.servers = collections.defaultdict(list) + self.hostnames = list() + self.app = None + + def _init_novaclients(self): + self.NUMBER_OF_CLIENTS = self.conf.instance_count + auth = get_identity_auth(project=self.conf.doctor_project) + session = get_session(auth=auth) + for i in range(self.NUMBER_OF_CLIENTS): + self.novaclients.append( + nova_client(self.conf.nova_version, session)) + + def _init_servers_list(self): + self.servers.clear() + opts = {'all_tenants': True} + servers = self.nova.servers.list(search_opts=opts) + for server in servers: + try: + host = server.__dict__.get('OS-EXT-SRV-ATTR:host') + self.servers[host].append(server) + self.log.debug('get hostname=%s from server=%s' % (host, server)) + except Exception as e: + self.log.info('can not get hostname from server=%s' % server) + + def get_inspector_url(self): + return 'http://%s:%s' % (self.conf.inspector.ip, self.conf.inspector.port) + + def start(self): + self.log.info('sample inspector start......') + self._init_servers_list() + self.app = InspectorApp(self.conf.inspector.port, self, self.log) + self.app.start() + + def stop(self): + self.log.info('sample inspector stop......') + if not self.app: + return + for hostname in self.hostnames: + self.nova.services.force_down(hostname, 'nova-compute', False) + + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + } + url = '%s%s' % (self.inspector_url, 'shutdown') \ + if self.inspector_url.endswith('/') else \ + '%s%s' % (self.inspector_url, '/shutdown') + requests.post(url, data='', headers=headers) + + def handle_events(self, events): + for event in events: + hostname = event['details']['hostname'] + event_type = event['type'] + if event_type == self.event_type: + self.hostnames.append(hostname) + thr1 = self._disable_compute_host(hostname) + thr2 = self._vms_reset_state('error', hostname) + thr3 = self._set_ports_data_plane_status('DOWN', hostname) + thr1.join() + thr2.join() + thr3.join() + + @utils.run_async + def _disable_compute_host(self, hostname): + self.nova.services.force_down(hostname, 'nova-compute', True) + self.log.info('doctor mark host(%s) down at %s' % (hostname, time.time())) + + @utils.run_async + def _vms_reset_state(self, state, hostname): + + @utils.run_async + def _vm_reset_state(nova, server, state): + nova.servers.reset_state(server, state) + self.log.info('doctor mark vm(%s) error at %s' % (server, time.time())) + + thrs = [] + for nova, server in zip(self.novaclients, self.servers[hostname]): + t = _vm_reset_state(nova, server, state) + thrs.append(t) + for t in thrs: + t.join() + + @utils.run_async + def _set_ports_data_plane_status(self, status, hostname): + body = {'data_plane_status': status} + + @utils.run_async + def _set_port_data_plane_status(port_id): + self.neutron.update_port(port_id, body) + self.log.info('doctor set data plane status %s on port %s' % (status, port_id)) + + thrs = [] + params = {'binding:host_id': hostname} + for port_id in self.neutron.list_ports(**params): + t = _set_port_data_plane_status(port_id) + thrs.append(t) + for t in thrs: + t.join() + + +class InspectorApp(Thread): + + def __init__(self, port, inspector, log): + Thread.__init__(self) + self.port = port + self.inspector = inspector + self.log = log + + def run(self): + app = Flask('inspector') + + @app.route('/events', methods=['PUT']) + def event_posted(): + self.log.info('event posted in sample inspector at %s' % time.time()) + self.log.info('sample inspector = %s' % self.inspector) + self.log.info('sample inspector received data = %s' % request.data) + events = json.loads(request.data.decode('utf8')) + self.inspector.handle_events(events) + return "OK" + + @app.route('/shutdown', methods=['POST']) + def shutdown(): + self.log.info('shutdown inspector app server at %s' % time.time()) + func = request.environ.get('werkzeug.server.shutdown') + if func is None: + raise RuntimeError('Not running with the Werkzeug Server') + func() + return 'inspector app shutting down...' + + app.run(host="0.0.0.0", port=self.port) -- cgit 1.2.3-korg