diff options
Diffstat (limited to 'tests/inspector/sample.py')
-rw-r--r-- | tests/inspector/sample.py | 70 |
1 files changed, 44 insertions, 26 deletions
diff --git a/tests/inspector/sample.py b/tests/inspector/sample.py index db477de9..1c05cede 100644 --- a/tests/inspector/sample.py +++ b/tests/inspector/sample.py @@ -14,9 +14,11 @@ import time from threading import Thread import requests +from common import utils from identity_auth import get_identity_auth from identity_auth import get_session from os_clients import nova_client +from os_clients import neutron_client from inspector.base import BaseInspector @@ -31,6 +33,10 @@ class SampleInspector(BaseInspector): # Normally we use this client for non redundant API calls self.nova = self.novaclients[0] + auth = get_identity_auth(project=self.conf.doctor_project) + session = get_session(auth=auth) + self.neutron = neutron_client(session) + self.servers = collections.defaultdict(list) self.hostnames = list() self.app = None @@ -86,37 +92,49 @@ class SampleInspector(BaseInspector): event_type = event['type'] if event_type == self.event_type: self.hostnames.append(hostname) - self.disable_compute_host(hostname) - - def disable_compute_host(self, hostname): - threads = [] - if len(self.servers[hostname]) > self.NUMBER_OF_CLIENTS: - # TODO(tojuvone): This could be enhanced in future with dynamic - # reuse of self.novaclients when all threads in use - self.log.error('%d servers in %s. Can handle only %d'%( - self.servers[hostname], hostname, self.NUMBER_OF_CLIENTS)) - for nova, server in zip(self.novaclients, self.servers[hostname]): - t = ThreadedResetState(nova, "error", server, self.log) - t.start() - threads.append(t) - for t in threads: - t.join() + thr1 = self._disable_compute_host(hostname) + thr2 = self._vms_reset_state('error', hostname) + thr3 = self._set_ports_data_plane_status('DOWN', hostname) + thr1.join() + thr2.join() + thr3.join() + + @utils.run_async + def _disable_compute_host(self, hostname): self.nova.services.force_down(hostname, 'nova-compute', True) self.log.info('doctor mark host(%s) down at %s' % (hostname, time.time())) + @utils.run_async + def _vms_reset_state(self, state, hostname): -class ThreadedResetState(Thread): + @utils.run_async + def _vm_reset_state(nova, server, state): + nova.servers.reset_state(server, state) + self.log.info('doctor mark vm(%s) error at %s' % (server, time.time())) - def __init__(self, nova, state, server, log): - Thread.__init__(self) - self.nova = nova - self.state = state - self.server = server - self.log = log + thrs = [] + for nova, server in zip(self.novaclients, self.servers[hostname]): + t = _vm_reset_state(nova, server, state) + thrs.append(t) + for t in thrs: + t.join() - def run(self): - self.nova.servers.reset_state(self.server, self.state) - self.log.info('doctor mark vm(%s) error at %s' % (self.server, time.time())) + @utils.run_async + def _set_ports_data_plane_status(self, status, hostname): + body = {'data_plane_status': status} + + @utils.run_async + def _set_port_data_plane_status(port_id): + self.neutron.update_port(port_id, body) + self.log.info('doctor set data plane status %s on port %s' % (status, port_id)) + + thrs = [] + params = {'binding:host_id': hostname} + for port_id in self.neutron.list_ports(**params): + t = _set_port_data_plane_status(port_id) + thrs.append(t) + for t in thrs: + t.join() class InspectorApp(Thread): @@ -135,7 +153,7 @@ class InspectorApp(Thread): self.log.info('event posted in sample inspector at %s' % time.time()) self.log.info('sample inspector = %s' % self.inspector) self.log.info('sample inspector received data = %s' % request.data) - events = json.loads(request.data) + events = json.loads(request.data.decode('utf8')) self.inspector.handle_events(events) return "OK" |