summaryrefslogtreecommitdiffstats
path: root/build/patches/congress-parallel-execution.patch
diff options
context:
space:
mode:
authorCarlos Goncalves <carlos.goncalves@neclab.eu>2017-06-28 12:02:51 +0200
committerCarlos Goncalves <carlos.goncalves@neclab.eu>2017-08-12 10:36:51 +0200
commitcdd577600fbdc42f8dbcae0242818251c963abd8 (patch)
treea0cf499fbd671ca0ddc4cb865fe56f102b5336a8 /build/patches/congress-parallel-execution.patch
parent240fb2eeb9398871e55703f9e62bc7fbb4e01162 (diff)
Backport of Congress parallel execution
JIRA: APEX-480 JIRA: DOCTOR-78 Change-Id: I6b5b3e7f2daaec7e2ead76d74f8d3713378a5200 Signed-off-by: Carlos Goncalves <carlos.goncalves@neclab.eu>
Diffstat (limited to 'build/patches/congress-parallel-execution.patch')
-rw-r--r--build/patches/congress-parallel-execution.patch86
1 files changed, 86 insertions, 0 deletions
diff --git a/build/patches/congress-parallel-execution.patch b/build/patches/congress-parallel-execution.patch
new file mode 100644
index 00000000..ca48c6f3
--- /dev/null
+++ b/build/patches/congress-parallel-execution.patch
@@ -0,0 +1,86 @@
+From 02ff94adb9bc433549f5b3483f36b2ede19b3614 Mon Sep 17 00:00:00 2001
+From: Masahito Muroi <muroi.masahito@lab.ntt.co.jp>
+Date: Tue, 18 Apr 2017 04:22:24 +0900
+Subject: [PATCH] Parallel execution in DataSource Driver
+
+Datasource driver calls datasource's API serially when Policy Engine sends
+execution requests. It could take long time number of execution targets is
+a lots.
+
+This patch changes datasource driver calls datasource's API in parallel.
+
+Closes-Bug: #1670529
+Change-Id: I065bd625004401a1bb78c6d56d929bdaf76d37f0
+---
+ congress/datasources/datasource_driver.py | 15 +++++++++------
+ congress/policy_engines/agnostic.py | 6 ++++--
+ 2 files changed, 13 insertions(+), 8 deletions(-)
+
+diff --git a/congress/datasources/datasource_driver.py b/congress/datasources/datasource_driver.py
+index eec83017..8eeb62d7 100644
+--- a/congress/datasources/datasource_driver.py
++++ b/congress/datasources/datasource_driver.py
+@@ -1176,8 +1176,8 @@ class DataSourceDriverEndpoints(data_service.DataServiceEndPoints):
+ def request_refresh(self, context, source_id):
+ return self.service.request_refresh()
+
+- def request_execute(self, context, action, action_args):
+- return self.service.request_execute(context, action, action_args)
++ def request_execute(self, context, action, action_args, wait):
++ return self.service.request_execute(context, action, action_args, wait)
+
+
+ class PushedDataSourceDriver(DataSourceDriver):
+@@ -1574,18 +1574,21 @@ class ExecutionDriver(object):
+ return {'results': actions}
+
+ # Note(thread-safety): blocking function
+- def request_execute(self, context, action, action_args):
++ def request_execute(self, context, action, action_args, wait):
+ """Accept execution requests and execute requests from leader"""
+ node_id = context.get('node_id', None)
++ th = None
+ if self._leader_node_id == node_id:
+- # Note(thread-safety): blocking call
+- self.execute(action, action_args)
++ # Note(thread-safety): blocking call
++ th = eventlet.spawn(self.execute, action, action_args)
+ elif node_id is not None:
+ if self._leader_node_id is None:
+ self._leader_node_id = node_id
+ LOG.debug('New local leader %s selected', self._leader_node_id)
+ # Note(thread-safety): blocking call
+- self.execute(action, action_args)
++ th = eventlet.spawn(self.execute, action, action_args)
++ if wait and th:
++ th.wait()
+
+ # Note(thread-safety): blocking function (in some subclasses)
+ def execute(self, action, action_args):
+diff --git a/congress/policy_engines/agnostic.py b/congress/policy_engines/agnostic.py
+index d1d67bdc..df09ed96 100644
+--- a/congress/policy_engines/agnostic.py
++++ b/congress/policy_engines/agnostic.py
+@@ -2021,7 +2021,9 @@ class DseRuntime (Runtime, data_service.DataService):
+ """Overloading the DseRuntime version of _rpc so it uses dse2."""
+ # TODO(ramineni): This is called only during execute_action, added
+ # the same function name for compatibility with old arch
+- args = {'action': action, 'action_args': args}
++
++ retry_rpc = cfg.CONF.dse.execute_action_retry
++ args = {'action': action, 'action_args': args, 'wait': retry_rpc}
+
+ def execute_once():
+ return self.rpc(service_name, 'request_execute', args,
+@@ -2045,7 +2047,7 @@ class DseRuntime (Runtime, data_service.DataService):
+ action, args['action_args'])
+
+ # long timeout for action execution because actions can take a while
+- if not cfg.CONF.dse.execute_action_retry:
++ if not retry_rpc:
+ # Note(thread-safety): blocking call
+ # Only when thread pool at capacity
+ eventlet.spawn_n(execute_once)
+--
+2.12.3
+