From 163323cd8f9eec2390099ca4834827a28c46c6db Mon Sep 17 00:00:00 2001 From: Luc Provoost Date: Thu, 17 Sep 2020 17:17:36 +0200 Subject: Using python concurrent futures Different PROX instances are now started in parallel. The script is starting multiple threads. Change-Id: Ia8785a792240d4e9b5d5d98174bc4c5d7ae5657c Signed-off-by: Luc Provoost --- VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py | 57 ++-------------------- .../helper-scripts/rapid/rapid_machine.py | 24 ++++++--- VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py | 11 ++++- 3 files changed, 29 insertions(+), 63 deletions(-) (limited to 'VNFs') diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py index 6f005845..a6dd4bcb 100644 --- a/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py +++ b/VNFs/DPPD-PROX/helper-scripts/rapid/prox_ctrl.py @@ -32,7 +32,6 @@ class prox_ctrl(object): self._ip = ip self._key = key self._user = user - self._children = [] self._proxsock = [] def __del__(self): @@ -50,7 +49,7 @@ class prox_ctrl(object): def connect(self): attempts = 1 - RapidLog.debug("Trying to connect to instance which was just launched \ + RapidLog.debug("Trying to connect to machine \ on %s, attempt: %d" % (self._ip, attempts)) while True: try: @@ -64,9 +63,9 @@ class prox_ctrl(object): raise Exception("Failed to connect to instance after %d \ attempts:\n%s" % (attempts, ex)) time.sleep(2) - RapidLog.debug("Trying to connect to instance which was just \ - launched on %s, attempt: %d" % (self._ip, attempts)) - RapidLog.debug("Connected to instance on %s" % self._ip) + RapidLog.debug("Trying to connect to machine \ + on %s, attempt: %d" % (self._ip, attempts)) + RapidLog.debug("Connected to machine on %s" % self._ip) def connect_socket(self): attempts = 1 @@ -93,31 +92,6 @@ class prox_ctrl(object): """Must be called before program termination.""" for sock in self._proxsock: sock.quit() - children = len(self._children) - if children == 0: - return - if children > 1: - print('Waiting for %d child processes to complete ...' % children) - for child in self._children: - ret = os.waitpid(child[0], os.WNOHANG) - if ret[0] == 0: - print("Waiting for child process '%s' to complete ..." - % child[1]) - ret = os.waitpid(child[0], 0) - rc = ret[1] - if os.WIFEXITED(rc): - if os.WEXITSTATUS(rc) == 0: - print("Child process '%s' completed successfully" - % child[1]) - else: - print("Child process '%s' returned exit status %d" % ( - child[1], os.WEXITSTATUS(rc))) - elif os.WIFSIGNALED(rc): - print("Child process '%s' exited on signal %d" % ( - child[1], os.WTERMSIG(rc))) - else: - print("Wait status for child process '%s' is 0x%04x" % ( - child[1], rc)) def run_cmd(self, command, _connect=False): """Execute command over ssh on remote system. @@ -135,29 +109,6 @@ class prox_ctrl(object): raise RuntimeError('ssh returned exit status %d:\n%s' % (ex.returncode, ex.output.strip())) - def fork_cmd(self, command, name=None): - """Execute command over ssh on remote system, in a child process. - Do not wait for remote command completion. - Return child process id. - """ - if name is None: - name = command - cmd = self._build_ssh(command) - pid = os.fork() - if (pid != 0): - # In the parent process - self._children.append((pid, name)) - return pid - # In the child process: use os._exit to terminate - try: - # Actually ignore output on success, but capture stderr on failure - subprocess.check_output(cmd, stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as ex: - raise RuntimeError("Child process '%s' failed:\n" - 'ssh returned exit status %d:\n%s' - % (name, ex.returncode, ex.output.strip())) - os._exit(0) - def prox_sock(self, port=8474): """Connect to the PROX instance on remote system. Return a prox_sock object on success, None on failure. diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_machine.py b/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_machine.py index b89c0383..a9f53083 100644 --- a/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_machine.py +++ b/VNFs/DPPD-PROX/helper-scripts/rapid/rapid_machine.py @@ -45,12 +45,7 @@ class RapidMachine(object): break self.rundir = rundir self.machine_params = machine_params - self._client = prox_ctrl(self.ip, self.key, self.user) - self._client.connect() - if vim in ['OpenStack']: - self.devbind() - self.generate_lua(vim) - self._client.scp_put(self.machine_params['config_file'], '{}/{}'.format(self.rundir, machine_params['config_file'])) + self.vim = vim def __del__(self): self._client.scp_get('/prox.log', './{}.prox.log'.format(self.name)) @@ -99,10 +94,23 @@ class RapidMachine(object): self._client.scp_put('helper.lua', self.rundir + '/helper.lua') def start_prox(self, autostart=''): + self._client = prox_ctrl(self.ip, self.key, self.user) + self._client.connect() + if self.vim in ['OpenStack']: + self.devbind() + self.generate_lua(self.vim) + self._client.scp_put(self.machine_params['config_file'], '{}/{}'.format(self.rundir, self.machine_params['config_file'])) if self.machine_params['prox_launch_exit']: cmd = 'sudo {}/prox {} -t -o cli -f {}/{}'.format(self.rundir, autostart, self.rundir, self.machine_params['config_file']) - result = self._client.fork_cmd(cmd, 'PROX Testing on {}'.format(self.name)) - RapidLog.debug("Starting PROX on {}: {}, {}".format(self.name, cmd, result)) + RapidLog.debug("Starting PROX on {}: {}".format(self.name, cmd)) + result = self._client.run_cmd(cmd, 'PROX Testing on {}'.format(self.name)) + #RapidLog.debug("Finished PROX on {}: {}, {}".format(self.name, cmd, result)) + RapidLog.debug("Finished PROX on {}: {}".format(self.name, cmd)) + + def close_prox(self): + self.socket.quit() + + def connect_prox(self): self.socket = self._client.connect_socket() def start(self): diff --git a/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py b/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py index 023b4bc3..2c18b232 100755 --- a/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py +++ b/VNFs/DPPD-PROX/helper-scripts/rapid/runrapid.py @@ -23,6 +23,8 @@ from future import standard_library standard_library.install_aliases() from builtins import object import sys +import concurrent.futures +from concurrent.futures import ALL_COMPLETED from rapid_cli import RapidCli from rapid_log import RapidLog from rapid_parser import RapidConfigParser @@ -81,8 +83,11 @@ class RapidTestManager(object): machines.append(machine) if test_params['configonly']: sys.exit() - for machine in machines: - machine.start_prox() + prox_executor = concurrent.futures.ThreadPoolExecutor(max_workers=len(machines)) + future_to_prox = {prox_executor.submit(machine.start_prox): machine for machine in machines} + with concurrent.futures.ThreadPoolExecutor(max_workers=len(machines)) as executor: + future_to_connect_prox = {executor.submit(machine.connect_prox): machine for machine in machines} + concurrent.futures.wait(future_to_connect_prox,return_when=ALL_COMPLETED) result = True for test_param in test_params['tests']: RapidLog.info(test_param['test']) @@ -119,6 +124,8 @@ class RapidTestManager(object): single_test_result = test.run() if not single_test_result: result = False + for machine in machines: + machine.close_prox() return (result) def main(): -- cgit 1.2.3-korg