From 02ff94adb9bc433549f5b3483f36b2ede19b3614 Mon Sep 17 00:00:00 2001 From: Masahito Muroi Date: Tue, 18 Apr 2017 04:22:24 +0900 Subject: [PATCH] Parallel execution in DataSource Driver Datasource driver calls datasource's API serially when Policy Engine sends execution requests. It could take long time number of execution targets is a lots. This patch changes datasource driver calls datasource's API in parallel. Closes-Bug: #1670529 Change-Id: I065bd625004401a1bb78c6d56d929bdaf76d37f0 --- congress/datasources/datasource_driver.py | 15 +++++++++------ congress/policy_engines/agnostic.py | 6 ++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/congress/datasources/datasource_driver.py b/congress/datasources/datasource_driver.py index eec83017..8eeb62d7 100644 --- a/congress/datasources/datasource_driver.py +++ b/congress/datasources/datasource_driver.py @@ -1176,8 +1176,8 @@ class DataSourceDriverEndpoints(data_service.DataServiceEndPoints): def request_refresh(self, context, source_id): return self.service.request_refresh() - def request_execute(self, context, action, action_args): - return self.service.request_execute(context, action, action_args) + def request_execute(self, context, action, action_args, wait): + return self.service.request_execute(context, action, action_args, wait) class PushedDataSourceDriver(DataSourceDriver): @@ -1574,18 +1574,21 @@ class ExecutionDriver(object): return {'results': actions} # Note(thread-safety): blocking function - def request_execute(self, context, action, action_args): + def request_execute(self, context, action, action_args, wait): """Accept execution requests and execute requests from leader""" node_id = context.get('node_id', None) + th = None if self._leader_node_id == node_id: - # Note(thread-safety): blocking call - self.execute(action, action_args) + # Note(thread-safety): blocking call + th = eventlet.spawn(self.execute, action, action_args) elif node_id is not None: if self._leader_node_id is None: self._leader_node_id = node_id LOG.debug('New local leader %s selected', self._leader_node_id) # Note(thread-safety): blocking call - self.execute(action, action_args) + th = eventlet.spawn(self.execute, action, action_args) + if wait and th: + th.wait() # Note(thread-safety): blocking function (in some subclasses) def execute(self, action, action_args): diff --git a/congress/policy_engines/agnostic.py b/congress/policy_engines/agnostic.py index d1d67bdc..df09ed96 100644 --- a/congress/policy_engines/agnostic.py +++ b/congress/policy_engines/agnostic.py @@ -2021,7 +2021,9 @@ class DseRuntime (Runtime, data_service.DataService): """Overloading the DseRuntime version of _rpc so it uses dse2.""" # TODO(ramineni): This is called only during execute_action, added # the same function name for compatibility with old arch - args = {'action': action, 'action_args': args} + + retry_rpc = cfg.CONF.dse.execute_action_retry + args = {'action': action, 'action_args': args, 'wait': retry_rpc} def execute_once(): return self.rpc(service_name, 'request_execute', args, @@ -2045,7 +2047,7 @@ class DseRuntime (Runtime, data_service.DataService): action, args['action_args']) # long timeout for action execution because actions can take a while - if not cfg.CONF.dse.execute_action_retry: + if not retry_rpc: # Note(thread-safety): blocking call # Only when thread pool at capacity eventlet.spawn_n(execute_once) -- 2.12.3