1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
From 02ff94adb9bc433549f5b3483f36b2ede19b3614 Mon Sep 17 00:00:00 2001
From: Masahito Muroi <muroi.masahito@lab.ntt.co.jp>
Date: Tue, 18 Apr 2017 04:22:24 +0900
Subject: [PATCH] Parallel execution in DataSource Driver
Datasource driver calls datasource's API serially when Policy Engine sends
execution requests. It could take long time number of execution targets is
a lots.
This patch changes datasource driver calls datasource's API in parallel.
Closes-Bug: #1670529
Change-Id: I065bd625004401a1bb78c6d56d929bdaf76d37f0
---
congress/datasources/datasource_driver.py | 15 +++++++++------
congress/policy_engines/agnostic.py | 6 ++++--
2 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/congress/datasources/datasource_driver.py b/congress/datasources/datasource_driver.py
index eec83017..8eeb62d7 100644
--- a/congress/datasources/datasource_driver.py
+++ b/congress/datasources/datasource_driver.py
@@ -1176,8 +1176,8 @@ class DataSourceDriverEndpoints(data_service.DataServiceEndPoints):
def request_refresh(self, context, source_id):
return self.service.request_refresh()
- def request_execute(self, context, action, action_args):
- return self.service.request_execute(context, action, action_args)
+ def request_execute(self, context, action, action_args, wait):
+ return self.service.request_execute(context, action, action_args, wait)
class PushedDataSourceDriver(DataSourceDriver):
@@ -1574,18 +1574,21 @@ class ExecutionDriver(object):
return {'results': actions}
# Note(thread-safety): blocking function
- def request_execute(self, context, action, action_args):
+ def request_execute(self, context, action, action_args, wait):
"""Accept execution requests and execute requests from leader"""
node_id = context.get('node_id', None)
+ th = None
if self._leader_node_id == node_id:
- # Note(thread-safety): blocking call
- self.execute(action, action_args)
+ # Note(thread-safety): blocking call
+ th = eventlet.spawn(self.execute, action, action_args)
elif node_id is not None:
if self._leader_node_id is None:
self._leader_node_id = node_id
LOG.debug('New local leader %s selected', self._leader_node_id)
# Note(thread-safety): blocking call
- self.execute(action, action_args)
+ th = eventlet.spawn(self.execute, action, action_args)
+ if wait and th:
+ th.wait()
# Note(thread-safety): blocking function (in some subclasses)
def execute(self, action, action_args):
diff --git a/congress/policy_engines/agnostic.py b/congress/policy_engines/agnostic.py
index d1d67bdc..df09ed96 100644
--- a/congress/policy_engines/agnostic.py
+++ b/congress/policy_engines/agnostic.py
@@ -2021,7 +2021,9 @@ class DseRuntime (Runtime, data_service.DataService):
"""Overloading the DseRuntime version of _rpc so it uses dse2."""
# TODO(ramineni): This is called only during execute_action, added
# the same function name for compatibility with old arch
- args = {'action': action, 'action_args': args}
+
+ retry_rpc = cfg.CONF.dse.execute_action_retry
+ args = {'action': action, 'action_args': args, 'wait': retry_rpc}
def execute_once():
return self.rpc(service_name, 'request_execute', args,
@@ -2045,7 +2047,7 @@ class DseRuntime (Runtime, data_service.DataService):
action, args['action_args'])
# long timeout for action execution because actions can take a while
- if not cfg.CONF.dse.execute_action_retry:
+ if not retry_rpc:
# Note(thread-safety): blocking call
# Only when thread pool at capacity
eventlet.spawn_n(execute_once)
--
2.12.3
|