diff options
Diffstat (limited to 'tests/inspector')
-rw-r--r-- | tests/inspector/__init__.py | 5 | ||||
-rw-r--r-- | tests/inspector/congress.py | 94 | ||||
-rw-r--r-- | tests/inspector/sample.py | 70 |
3 files changed, 142 insertions, 27 deletions
diff --git a/tests/inspector/__init__.py b/tests/inspector/__init__.py index 35bdb5b9..afba4800 100644 --- a/tests/inspector/__init__.py +++ b/tests/inspector/__init__.py @@ -11,6 +11,7 @@ import os from oslo_config import cfg from oslo_utils import importutils + OPTS = [ cfg.StrOpt('type', default=os.environ.get('INSPECTOR_TYPE', 'sample'), @@ -19,7 +20,7 @@ OPTS = [ required=True), cfg.StrOpt('ip', default='127.0.0.1', - help='the ip of default inspector', + help='the host ip of inspector', required=False), cfg.StrOpt('port', default='12345', @@ -30,8 +31,10 @@ OPTS = [ _inspector_name_class_mapping = { 'sample': 'inspector.sample.SampleInspector', + 'congress': 'inspector.congress.CongressInspector', } + def get_inspector(conf, log): inspector_class = _inspector_name_class_mapping[conf.inspector.type] return importutils.import_object(inspector_class, conf, log) diff --git a/tests/inspector/congress.py b/tests/inspector/congress.py new file mode 100644 index 00000000..ae295852 --- /dev/null +++ b/tests/inspector/congress.py @@ -0,0 +1,94 @@ +############################################################################## +# Copyright (c) 2017 ZTE Corporation and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +from identity_auth import get_identity_auth +from identity_auth import get_session +from os_clients import congress_client + +from inspector.base import BaseInspector + + +class CongressInspector(BaseInspector): + nova_api_min_version = '2.11' + doctor_driver = 'doctor' + doctor_datasource = 'doctor' + policy = 'classification' + rules = { + 'host_down': + 'host_down(host) :- doctor:events(hostname=host, type="compute.host.down", status="down")', + 'active_instance_in_host': + 'active_instance_in_host(vmid, host) :- nova:servers(id=vmid, host_name=host, status="ACTIVE")', + 'host_force_down': + 'execute[nova:services.force_down(host, "nova-compute", "True")] :- host_down(host)', + 'error_vm_states': + 'execute[nova:servers.reset_state(vmid, "error")] :- host_down(host), active_instance_in_host(vmid, host)' + } + + def __init__(self, conf, log): + super(CongressInspector, self).__init__(conf, log) + self.auth = get_identity_auth() + self.congress = congress_client(get_session(auth=self.auth)) + self._init_driver_and_ds() + self.inspector_url = self.get_inspector_url() + + def _init_driver_and_ds(self): + datasources = \ + {ds['name']: ds for ds in self.congress.list_datasources()['results']} + + # check nova_api version + nova_api_version = datasources['nova']['config'].get('api_version') + if nova_api_version and nova_api_version < self.nova_api_min_version: + raise Exception('Congress Nova datasource API version < nova_api_min_version(%s)' + % self.nova_api_min_version) + + # create doctor datasource if it's not exist + if self.doctor_datasource not in datasources: + self.congress.create_datasource( + body={'driver': self.doctor_driver, + 'name': self.doctor_datasource}) + + # check whether doctor driver exist + drivers = \ + {driver['id']: driver for driver in self.congress.list_drivers()['results']} + if self.doctor_driver not in drivers: + raise Exception('Do not support doctor driver in congress') + + self.policy_rules = \ + {rule['name']: rule for rule in + self.congress.list_policy_rules(self.policy)['results']} + + def get_inspector_url(self): + ds = self.congress.list_datasources()['results'] + doctor_ds = next((item for item in ds if item['driver'] == 'doctor'), + None) + congress_endpoint = self.congress.httpclient.get_endpoint(auth=self.auth) + return ('%s/v1/data-sources/%s/tables/events/rows' % + (congress_endpoint, doctor_ds['id'])) + + def start(self): + self.log.info('congress inspector start......') + + for rule_name, rule in self.rules.items(): + self._add_rule(rule_name, rule) + + def stop(self): + self.log.info('congress inspector stop......') + + for rule_name in self.rules.keys(): + self._del_rule(rule_name) + + def _add_rule(self, rule_name, rule): + if rule_name not in self.policy_rules: + self.congress.create_policy_rule(self.policy, + body={'name': rule_name, + 'rule': rule}) + + def _del_rule(self, rule_name): + if rule_name in self.policy_rules: + rule_id = self.policy_rules[rule_name]['id'] + self.congress.delete_policy_rule(self.policy, rule_id) diff --git a/tests/inspector/sample.py b/tests/inspector/sample.py index db477de9..1c05cede 100644 --- a/tests/inspector/sample.py +++ b/tests/inspector/sample.py @@ -14,9 +14,11 @@ import time from threading import Thread import requests +from common import utils from identity_auth import get_identity_auth from identity_auth import get_session from os_clients import nova_client +from os_clients import neutron_client from inspector.base import BaseInspector @@ -31,6 +33,10 @@ class SampleInspector(BaseInspector): # Normally we use this client for non redundant API calls self.nova = self.novaclients[0] + auth = get_identity_auth(project=self.conf.doctor_project) + session = get_session(auth=auth) + self.neutron = neutron_client(session) + self.servers = collections.defaultdict(list) self.hostnames = list() self.app = None @@ -86,37 +92,49 @@ class SampleInspector(BaseInspector): event_type = event['type'] if event_type == self.event_type: self.hostnames.append(hostname) - self.disable_compute_host(hostname) - - def disable_compute_host(self, hostname): - threads = [] - if len(self.servers[hostname]) > self.NUMBER_OF_CLIENTS: - # TODO(tojuvone): This could be enhanced in future with dynamic - # reuse of self.novaclients when all threads in use - self.log.error('%d servers in %s. Can handle only %d'%( - self.servers[hostname], hostname, self.NUMBER_OF_CLIENTS)) - for nova, server in zip(self.novaclients, self.servers[hostname]): - t = ThreadedResetState(nova, "error", server, self.log) - t.start() - threads.append(t) - for t in threads: - t.join() + thr1 = self._disable_compute_host(hostname) + thr2 = self._vms_reset_state('error', hostname) + thr3 = self._set_ports_data_plane_status('DOWN', hostname) + thr1.join() + thr2.join() + thr3.join() + + @utils.run_async + def _disable_compute_host(self, hostname): self.nova.services.force_down(hostname, 'nova-compute', True) self.log.info('doctor mark host(%s) down at %s' % (hostname, time.time())) + @utils.run_async + def _vms_reset_state(self, state, hostname): -class ThreadedResetState(Thread): + @utils.run_async + def _vm_reset_state(nova, server, state): + nova.servers.reset_state(server, state) + self.log.info('doctor mark vm(%s) error at %s' % (server, time.time())) - def __init__(self, nova, state, server, log): - Thread.__init__(self) - self.nova = nova - self.state = state - self.server = server - self.log = log + thrs = [] + for nova, server in zip(self.novaclients, self.servers[hostname]): + t = _vm_reset_state(nova, server, state) + thrs.append(t) + for t in thrs: + t.join() - def run(self): - self.nova.servers.reset_state(self.server, self.state) - self.log.info('doctor mark vm(%s) error at %s' % (self.server, time.time())) + @utils.run_async + def _set_ports_data_plane_status(self, status, hostname): + body = {'data_plane_status': status} + + @utils.run_async + def _set_port_data_plane_status(port_id): + self.neutron.update_port(port_id, body) + self.log.info('doctor set data plane status %s on port %s' % (status, port_id)) + + thrs = [] + params = {'binding:host_id': hostname} + for port_id in self.neutron.list_ports(**params): + t = _set_port_data_plane_status(port_id) + thrs.append(t) + for t in thrs: + t.join() class InspectorApp(Thread): @@ -135,7 +153,7 @@ class InspectorApp(Thread): self.log.info('event posted in sample inspector at %s' % time.time()) self.log.info('sample inspector = %s' % self.inspector) self.log.info('sample inspector received data = %s' % request.data) - events = json.loads(request.data) + events = json.loads(request.data.decode('utf8')) self.inspector.handle_events(events) return "OK" |