diff options
Diffstat (limited to 'build/patches/congress-parallel-execution.patch')
-rw-r--r-- | build/patches/congress-parallel-execution.patch | 86 |
1 files changed, 86 insertions, 0 deletions
diff --git a/build/patches/congress-parallel-execution.patch b/build/patches/congress-parallel-execution.patch new file mode 100644 index 00000000..ca48c6f3 --- /dev/null +++ b/build/patches/congress-parallel-execution.patch @@ -0,0 +1,86 @@ +From 02ff94adb9bc433549f5b3483f36b2ede19b3614 Mon Sep 17 00:00:00 2001 +From: Masahito Muroi <muroi.masahito@lab.ntt.co.jp> +Date: Tue, 18 Apr 2017 04:22:24 +0900 +Subject: [PATCH] Parallel execution in DataSource Driver + +Datasource driver calls datasource's API serially when Policy Engine sends +execution requests. It could take long time number of execution targets is +a lots. + +This patch changes datasource driver calls datasource's API in parallel. + +Closes-Bug: #1670529 +Change-Id: I065bd625004401a1bb78c6d56d929bdaf76d37f0 +--- + congress/datasources/datasource_driver.py | 15 +++++++++------ + congress/policy_engines/agnostic.py | 6 ++++-- + 2 files changed, 13 insertions(+), 8 deletions(-) + +diff --git a/congress/datasources/datasource_driver.py b/congress/datasources/datasource_driver.py +index eec83017..8eeb62d7 100644 +--- a/congress/datasources/datasource_driver.py ++++ b/congress/datasources/datasource_driver.py +@@ -1176,8 +1176,8 @@ class DataSourceDriverEndpoints(data_service.DataServiceEndPoints): + def request_refresh(self, context, source_id): + return self.service.request_refresh() + +- def request_execute(self, context, action, action_args): +- return self.service.request_execute(context, action, action_args) ++ def request_execute(self, context, action, action_args, wait): ++ return self.service.request_execute(context, action, action_args, wait) + + + class PushedDataSourceDriver(DataSourceDriver): +@@ -1574,18 +1574,21 @@ class ExecutionDriver(object): + return {'results': actions} + + # Note(thread-safety): blocking function +- def request_execute(self, context, action, action_args): ++ def request_execute(self, context, action, action_args, wait): + """Accept execution requests and execute requests from leader""" + node_id = context.get('node_id', None) ++ th = None + if self._leader_node_id == node_id: +- # Note(thread-safety): blocking call +- self.execute(action, action_args) ++ # Note(thread-safety): blocking call ++ th = eventlet.spawn(self.execute, action, action_args) + elif node_id is not None: + if self._leader_node_id is None: + self._leader_node_id = node_id + LOG.debug('New local leader %s selected', self._leader_node_id) + # Note(thread-safety): blocking call +- self.execute(action, action_args) ++ th = eventlet.spawn(self.execute, action, action_args) ++ if wait and th: ++ th.wait() + + # Note(thread-safety): blocking function (in some subclasses) + def execute(self, action, action_args): +diff --git a/congress/policy_engines/agnostic.py b/congress/policy_engines/agnostic.py +index d1d67bdc..df09ed96 100644 +--- a/congress/policy_engines/agnostic.py ++++ b/congress/policy_engines/agnostic.py +@@ -2021,7 +2021,9 @@ class DseRuntime (Runtime, data_service.DataService): + """Overloading the DseRuntime version of _rpc so it uses dse2.""" + # TODO(ramineni): This is called only during execute_action, added + # the same function name for compatibility with old arch +- args = {'action': action, 'action_args': args} ++ ++ retry_rpc = cfg.CONF.dse.execute_action_retry ++ args = {'action': action, 'action_args': args, 'wait': retry_rpc} + + def execute_once(): + return self.rpc(service_name, 'request_execute', args, +@@ -2045,7 +2047,7 @@ class DseRuntime (Runtime, data_service.DataService): + action, args['action_args']) + + # long timeout for action execution because actions can take a while +- if not cfg.CONF.dse.execute_action_retry: ++ if not retry_rpc: + # Note(thread-safety): blocking call + # Only when thread pool at capacity + eventlet.spawn_n(execute_once) +-- +2.12.3 + |