summaryrefslogtreecommitdiffstats
path: root/tests/inspector
diff options
context:
space:
mode:
Diffstat (limited to 'tests/inspector')
-rw-r--r--tests/inspector/__init__.py5
-rw-r--r--tests/inspector/congress.py94
-rw-r--r--tests/inspector/sample.py70
3 files changed, 142 insertions, 27 deletions
diff --git a/tests/inspector/__init__.py b/tests/inspector/__init__.py
index 35bdb5b9..afba4800 100644
--- a/tests/inspector/__init__.py
+++ b/tests/inspector/__init__.py
@@ -11,6 +11,7 @@ import os
from oslo_config import cfg
from oslo_utils import importutils
+
OPTS = [
cfg.StrOpt('type',
default=os.environ.get('INSPECTOR_TYPE', 'sample'),
@@ -19,7 +20,7 @@ OPTS = [
required=True),
cfg.StrOpt('ip',
default='127.0.0.1',
- help='the ip of default inspector',
+ help='the host ip of inspector',
required=False),
cfg.StrOpt('port',
default='12345',
@@ -30,8 +31,10 @@ OPTS = [
_inspector_name_class_mapping = {
'sample': 'inspector.sample.SampleInspector',
+ 'congress': 'inspector.congress.CongressInspector',
}
+
def get_inspector(conf, log):
inspector_class = _inspector_name_class_mapping[conf.inspector.type]
return importutils.import_object(inspector_class, conf, log)
diff --git a/tests/inspector/congress.py b/tests/inspector/congress.py
new file mode 100644
index 00000000..ae295852
--- /dev/null
+++ b/tests/inspector/congress.py
@@ -0,0 +1,94 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corporation and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+from identity_auth import get_identity_auth
+from identity_auth import get_session
+from os_clients import congress_client
+
+from inspector.base import BaseInspector
+
+
+class CongressInspector(BaseInspector):
+ nova_api_min_version = '2.11'
+ doctor_driver = 'doctor'
+ doctor_datasource = 'doctor'
+ policy = 'classification'
+ rules = {
+ 'host_down':
+ 'host_down(host) :- doctor:events(hostname=host, type="compute.host.down", status="down")',
+ 'active_instance_in_host':
+ 'active_instance_in_host(vmid, host) :- nova:servers(id=vmid, host_name=host, status="ACTIVE")',
+ 'host_force_down':
+ 'execute[nova:services.force_down(host, "nova-compute", "True")] :- host_down(host)',
+ 'error_vm_states':
+ 'execute[nova:servers.reset_state(vmid, "error")] :- host_down(host), active_instance_in_host(vmid, host)'
+ }
+
+ def __init__(self, conf, log):
+ super(CongressInspector, self).__init__(conf, log)
+ self.auth = get_identity_auth()
+ self.congress = congress_client(get_session(auth=self.auth))
+ self._init_driver_and_ds()
+ self.inspector_url = self.get_inspector_url()
+
+ def _init_driver_and_ds(self):
+ datasources = \
+ {ds['name']: ds for ds in self.congress.list_datasources()['results']}
+
+ # check nova_api version
+ nova_api_version = datasources['nova']['config'].get('api_version')
+ if nova_api_version and nova_api_version < self.nova_api_min_version:
+ raise Exception('Congress Nova datasource API version < nova_api_min_version(%s)'
+ % self.nova_api_min_version)
+
+ # create doctor datasource if it's not exist
+ if self.doctor_datasource not in datasources:
+ self.congress.create_datasource(
+ body={'driver': self.doctor_driver,
+ 'name': self.doctor_datasource})
+
+ # check whether doctor driver exist
+ drivers = \
+ {driver['id']: driver for driver in self.congress.list_drivers()['results']}
+ if self.doctor_driver not in drivers:
+ raise Exception('Do not support doctor driver in congress')
+
+ self.policy_rules = \
+ {rule['name']: rule for rule in
+ self.congress.list_policy_rules(self.policy)['results']}
+
+ def get_inspector_url(self):
+ ds = self.congress.list_datasources()['results']
+ doctor_ds = next((item for item in ds if item['driver'] == 'doctor'),
+ None)
+ congress_endpoint = self.congress.httpclient.get_endpoint(auth=self.auth)
+ return ('%s/v1/data-sources/%s/tables/events/rows' %
+ (congress_endpoint, doctor_ds['id']))
+
+ def start(self):
+ self.log.info('congress inspector start......')
+
+ for rule_name, rule in self.rules.items():
+ self._add_rule(rule_name, rule)
+
+ def stop(self):
+ self.log.info('congress inspector stop......')
+
+ for rule_name in self.rules.keys():
+ self._del_rule(rule_name)
+
+ def _add_rule(self, rule_name, rule):
+ if rule_name not in self.policy_rules:
+ self.congress.create_policy_rule(self.policy,
+ body={'name': rule_name,
+ 'rule': rule})
+
+ def _del_rule(self, rule_name):
+ if rule_name in self.policy_rules:
+ rule_id = self.policy_rules[rule_name]['id']
+ self.congress.delete_policy_rule(self.policy, rule_id)
diff --git a/tests/inspector/sample.py b/tests/inspector/sample.py
index db477de9..1c05cede 100644
--- a/tests/inspector/sample.py
+++ b/tests/inspector/sample.py
@@ -14,9 +14,11 @@ import time
from threading import Thread
import requests
+from common import utils
from identity_auth import get_identity_auth
from identity_auth import get_session
from os_clients import nova_client
+from os_clients import neutron_client
from inspector.base import BaseInspector
@@ -31,6 +33,10 @@ class SampleInspector(BaseInspector):
# Normally we use this client for non redundant API calls
self.nova = self.novaclients[0]
+ auth = get_identity_auth(project=self.conf.doctor_project)
+ session = get_session(auth=auth)
+ self.neutron = neutron_client(session)
+
self.servers = collections.defaultdict(list)
self.hostnames = list()
self.app = None
@@ -86,37 +92,49 @@ class SampleInspector(BaseInspector):
event_type = event['type']
if event_type == self.event_type:
self.hostnames.append(hostname)
- self.disable_compute_host(hostname)
-
- def disable_compute_host(self, hostname):
- threads = []
- if len(self.servers[hostname]) > self.NUMBER_OF_CLIENTS:
- # TODO(tojuvone): This could be enhanced in future with dynamic
- # reuse of self.novaclients when all threads in use
- self.log.error('%d servers in %s. Can handle only %d'%(
- self.servers[hostname], hostname, self.NUMBER_OF_CLIENTS))
- for nova, server in zip(self.novaclients, self.servers[hostname]):
- t = ThreadedResetState(nova, "error", server, self.log)
- t.start()
- threads.append(t)
- for t in threads:
- t.join()
+ thr1 = self._disable_compute_host(hostname)
+ thr2 = self._vms_reset_state('error', hostname)
+ thr3 = self._set_ports_data_plane_status('DOWN', hostname)
+ thr1.join()
+ thr2.join()
+ thr3.join()
+
+ @utils.run_async
+ def _disable_compute_host(self, hostname):
self.nova.services.force_down(hostname, 'nova-compute', True)
self.log.info('doctor mark host(%s) down at %s' % (hostname, time.time()))
+ @utils.run_async
+ def _vms_reset_state(self, state, hostname):
-class ThreadedResetState(Thread):
+ @utils.run_async
+ def _vm_reset_state(nova, server, state):
+ nova.servers.reset_state(server, state)
+ self.log.info('doctor mark vm(%s) error at %s' % (server, time.time()))
- def __init__(self, nova, state, server, log):
- Thread.__init__(self)
- self.nova = nova
- self.state = state
- self.server = server
- self.log = log
+ thrs = []
+ for nova, server in zip(self.novaclients, self.servers[hostname]):
+ t = _vm_reset_state(nova, server, state)
+ thrs.append(t)
+ for t in thrs:
+ t.join()
- def run(self):
- self.nova.servers.reset_state(self.server, self.state)
- self.log.info('doctor mark vm(%s) error at %s' % (self.server, time.time()))
+ @utils.run_async
+ def _set_ports_data_plane_status(self, status, hostname):
+ body = {'data_plane_status': status}
+
+ @utils.run_async
+ def _set_port_data_plane_status(port_id):
+ self.neutron.update_port(port_id, body)
+ self.log.info('doctor set data plane status %s on port %s' % (status, port_id))
+
+ thrs = []
+ params = {'binding:host_id': hostname}
+ for port_id in self.neutron.list_ports(**params):
+ t = _set_port_data_plane_status(port_id)
+ thrs.append(t)
+ for t in thrs:
+ t.join()
class InspectorApp(Thread):
@@ -135,7 +153,7 @@ class InspectorApp(Thread):
self.log.info('event posted in sample inspector at %s' % time.time())
self.log.info('sample inspector = %s' % self.inspector)
self.log.info('sample inspector received data = %s' % request.data)
- events = json.loads(request.data)
+ events = json.loads(request.data.decode('utf8'))
self.inspector.handle_events(events)
return "OK"