From 02ff94adb9bc433549f5b3483f36b2ede19b3614 Mon Sep 17 00:00:00 2001
From: Masahito Muroi <muroi.masahito@lab.ntt.co.jp>
Date: Tue, 18 Apr 2017 04:22:24 +0900
Subject: [PATCH] Parallel execution in DataSource Driver

Datasource driver calls datasource's API serially when Policy Engine sends
execution requests.  It could take long time number of execution targets is
a lots.

This patch changes datasource driver calls datasource's API in parallel.

Closes-Bug: #1670529
Change-Id: I065bd625004401a1bb78c6d56d929bdaf76d37f0
---
 congress/datasources/datasource_driver.py | 15 +++++++++------
 congress/policy_engines/agnostic.py       |  6 ++++--
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/congress/datasources/datasource_driver.py b/congress/datasources/datasource_driver.py
index eec83017..8eeb62d7 100644
--- a/congress/datasources/datasource_driver.py
+++ b/congress/datasources/datasource_driver.py
@@ -1176,8 +1176,8 @@ class DataSourceDriverEndpoints(data_service.DataServiceEndPoints):
     def request_refresh(self, context, source_id):
         return self.service.request_refresh()
 
-    def request_execute(self, context, action, action_args):
-        return self.service.request_execute(context, action, action_args)
+    def request_execute(self, context, action, action_args, wait):
+        return self.service.request_execute(context, action, action_args, wait)
 
 
 class PushedDataSourceDriver(DataSourceDriver):
@@ -1574,18 +1574,21 @@ class ExecutionDriver(object):
         return {'results': actions}
 
     # Note(thread-safety): blocking function
-    def request_execute(self, context, action, action_args):
+    def request_execute(self, context, action, action_args, wait):
         """Accept execution requests and execute requests from leader"""
         node_id = context.get('node_id', None)
+        th = None
         if self._leader_node_id == node_id:
-                # Note(thread-safety): blocking call
-                self.execute(action, action_args)
+            # Note(thread-safety): blocking call
+            th = eventlet.spawn(self.execute, action, action_args)
         elif node_id is not None:
             if self._leader_node_id is None:
                 self._leader_node_id = node_id
                 LOG.debug('New local leader %s selected', self._leader_node_id)
                 # Note(thread-safety): blocking call
-                self.execute(action, action_args)
+                th = eventlet.spawn(self.execute, action, action_args)
+        if wait and th:
+            th.wait()
 
     # Note(thread-safety): blocking function (in some subclasses)
     def execute(self, action, action_args):
diff --git a/congress/policy_engines/agnostic.py b/congress/policy_engines/agnostic.py
index d1d67bdc..df09ed96 100644
--- a/congress/policy_engines/agnostic.py
+++ b/congress/policy_engines/agnostic.py
@@ -2021,7 +2021,9 @@ class DseRuntime (Runtime, data_service.DataService):
         """Overloading the DseRuntime version of _rpc so it uses dse2."""
         # TODO(ramineni): This is called only during execute_action, added
         # the same function name for compatibility with old arch
-        args = {'action': action, 'action_args': args}
+
+        retry_rpc = cfg.CONF.dse.execute_action_retry
+        args = {'action': action, 'action_args': args, 'wait': retry_rpc}
 
         def execute_once():
             return self.rpc(service_name, 'request_execute', args,
@@ -2045,7 +2047,7 @@ class DseRuntime (Runtime, data_service.DataService):
                       action, args['action_args'])
 
         # long timeout for action execution because actions can take a while
-        if not cfg.CONF.dse.execute_action_retry:
+        if not retry_rpc:
             # Note(thread-safety): blocking call
             #   Only when thread pool at capacity
             eventlet.spawn_n(execute_once)
-- 
2.12.3