diff options
author | LeoQi <QibinZheng2014@tongji.edu.cn> | 2018-07-20 02:36:15 +0800 |
---|---|---|
committer | LeoQi <QibinZheng2014@tongji.edu.cn> | 2018-08-30 18:49:51 +0800 |
commit | 47b9e06a3f5018baae6091e88fb8388d2c4b6827 (patch) | |
tree | 846717c47dd1273d88a8fd40fbabe08868279f67 /testing-scheduler | |
parent | 480c3c51a0a2bcc67ac5ca1d96cb63153bf9fff8 (diff) |
function of generating workflow config file in server part of testing-scheduler
JIRA BOTTLENECK-233
A module to help generating the config files which are sent to conductor server.
Based on these config files, conductor can start a workflow.
Change-Id: Ibdd71be6454fe2bd2b8718da0c300827897959e0
Signed-off-by: Zheng Qibin <QibinZheng2014@tongji.edu.cn>
Diffstat (limited to 'testing-scheduler')
5 files changed, 312 insertions, 0 deletions
diff --git a/testing-scheduler/server/src/conductor_processor/__init__.py b/testing-scheduler/server/src/conductor_processor/__init__.py new file mode 100644 index 00000000..bb02be17 --- /dev/null +++ b/testing-scheduler/server/src/conductor_processor/__init__.py @@ -0,0 +1,8 @@ +##############################################################################
+# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/testing-scheduler/server/src/conductor_processor/defaultTaskFile.json b/testing-scheduler/server/src/conductor_processor/defaultTaskFile.json new file mode 100644 index 00000000..a98a5819 --- /dev/null +++ b/testing-scheduler/server/src/conductor_processor/defaultTaskFile.json @@ -0,0 +1,9 @@ +{
+ "name": "",
+ "retryCount": 6,
+ "timeOutSeconds": 1200,
+ "timeOutPolicy": "TIME_OUT_WF",
+ "retryLogic": "FIXED",
+ "retryDelaySeconds": 3,
+ "responseTimeOutSeconds": 3600
+}
\ No newline at end of file diff --git a/testing-scheduler/server/src/conductor_processor/defaultWorkflowFile.json b/testing-scheduler/server/src/conductor_processor/defaultWorkflowFile.json new file mode 100644 index 00000000..8f6251c0 --- /dev/null +++ b/testing-scheduler/server/src/conductor_processor/defaultWorkflowFile.json @@ -0,0 +1,24 @@ +{
+ "name": "workflow_demo_05",
+ "description": "run a workflow of yardstick test service",
+ "version": 1,
+ "tasks": [
+ {
+ "name": "http_yardstick_test",
+ "taskReferenceName": "ping_test",
+ "inputParameters": {
+ "http_request": {
+ "uri": "http://192.168.199.105:8080/greet",
+ "method": "GET"
+ }
+ },
+ "type": "HTTP"
+ }
+ ],
+ "outputParameters": {
+ "header": "${ping_test.output.response.headers}",
+ "response": "${ping_test.output.response.body}",
+ "status": "${ping_test.output.response.statusCode}"
+ },
+ "schemaVersion": 2
+}
\ No newline at end of file diff --git a/testing-scheduler/server/src/conductor_processor/task.py b/testing-scheduler/server/src/conductor_processor/task.py new file mode 100644 index 00000000..6f25aef8 --- /dev/null +++ b/testing-scheduler/server/src/conductor_processor/task.py @@ -0,0 +1,28 @@ +##############################################################################
+# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import json
+import os
+
+
+class TaskFile(object):
+ def __init__(self, taskName='task_0'):
+ self._defaultConfFile = self._getFilePath("defaultTaskFile.json")
+ with open(self._defaultConfFile) as defaultConf:
+ self._jsonObj = json.load(defaultConf)
+ self._jsonObj['name'] = taskName
+
+ def generateFromStep(self, stepObject):
+ self._jsonObj['name'] = stepObject.getName()
+ print "taskFile:", self._jsonObj['name']
+ return self._jsonObj
+
+ def _getFilePath(self, fileName):
+ dirPath = os.path.dirname(os.path.realpath(__file__))
+ return os.path.join(dirPath, fileName)
diff --git a/testing-scheduler/server/src/conductor_processor/workflow.py b/testing-scheduler/server/src/conductor_processor/workflow.py new file mode 100644 index 00000000..19f0896c --- /dev/null +++ b/testing-scheduler/server/src/conductor_processor/workflow.py @@ -0,0 +1,243 @@ +##############################################################################
+# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import random
+import collections
+import re
+from src.conductor_processor.task import TaskFile
+
+
+class WorkflowFile(object):
+ def __init__(self, name):
+ self._name = "workflow_" + name + "(%s)" % getRandString(10)
+ self._description = ''
+ self._version = 1
+ self._schemaVersion = 2
+ self._tasks = []
+ self._outputParameters = {}
+
+ def getDict(self):
+ d = collections.OrderedDict()
+ d['name'] = self._name
+ d['description'] = self._description
+ d['version'] = self._version
+ d['schemaVersion'] = self._schemaVersion
+ d['tasks'] = self._tasks
+ d['outputParameters'] = self._outputParameters
+
+ return d
+
+ def generateMetaData(self, flowList, stepObjArr):
+ flowParser = FlowParser(flowList, stepObjArr)
+ self._tasks, taskMetaList = flowParser.parseMainFlow()
+ normalTasks = flowParser.getNormalTaskList()
+ for normalTask in normalTasks:
+ taskName = normalTask['name']
+ referenceName = normalTask['taskReferenceName']
+ self._outputParameters["%s(%s)" % (taskName, referenceName)] = \
+ "${%s.output.response.body}" % referenceName
+ return self.getDict(), taskMetaList
+
+
+class FlowParser(object):
+ def __init__(self, flowList, stepObjArr):
+ self._mainFlow = {}
+ self._subFlowDict = {}
+ self._stepObjArr = stepObjArr
+ self._normalTasks = []
+ for flow in flowList:
+ if flow['name'] == "main":
+ self._mainFlow = flow
+ else:
+ self._subFlowDict[flow['name']] = flow
+
+ def parseMainFlow(self):
+ return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr)
+
+ def parse(self, obj, stepObjArr):
+ if isinstance(obj, str):
+ return self.parseFlow(obj, stepObjArr)
+ else:
+ return self.parseOrderList(obj, stepObjArr)
+
+ def parseFlow(self, flowName, stepObjArr):
+ orderList = self._subFlowDict[flowName]['orders']
+ return self.parseOrderList(orderList, stepObjArr)
+
+ def parseOrderList(self, orderList, stepObjArr):
+ tasks = []
+ taskMetaAllList = []
+ for order in orderList:
+ if order['type'] == "normal":
+ genTask = NormalTask(order, stepObjArr, self)
+ self._normalTasks.append(genTask)
+ elif order['type'] == "switch":
+ genTask = SwitchTask(order, stepObjArr, self)
+ elif order['type'] == "parallel":
+ genTask = ParallelTask(order, stepObjArr, self)
+ tasks.append(genTask.getDict())
+
+ if order['type'] == "parallel":
+ joinTask = genTask.getJoinTask()
+ tasks.append(joinTask.getDict())
+
+ taskMetaList = genTask.getTaskMetaList()
+ if taskMetaList is not None:
+ taskMetaAllList.extend(taskMetaList)
+ return tasks, taskMetaAllList
+
+ def getNormalTaskList(self):
+ normalTasksDict = []
+ for normalTask in self._normalTasks:
+ normalTasksDict.append(normalTask.getDict())
+ return normalTasksDict
+
+ def getNormalTask(self, stepId):
+ for normalTask in self._normalTasks:
+ if normalTask.getStepId() == stepId:
+ return normalTask
+ return None
+
+
+class BaseWorkflowTask(object):
+ def __init__(self, name):
+ self._name = name
+ self._taskReferenceName = self._name + "_task_%s" % getRandString(10)
+ self._type = ''
+ self._args = {}
+
+ def __str__(self):
+ dictObj = self.getDict()
+ return str(dictObj)
+
+ def getDict(self):
+ d1 = {
+ "name": self._name,
+ "taskReferenceName": self._taskReferenceName,
+ "type": self._type
+ }
+ return dict(d1, **self._args)
+
+ def getName(self):
+ return self._name
+
+ def getReferenceName(self):
+ return self._taskReferenceName
+
+ def getTaskMetaList(self):
+ taskFile = TaskFile()
+ return [taskFile.generateFromStep(self)]
+
+
+class NormalTask(BaseWorkflowTask):
+ def __init__(self, order, stepObjArr, flowParser):
+ relatedStepObj = stepObjArr[order['step'] - 1]
+ super(NormalTask, self).__init__(relatedStepObj.getName())
+ self._taskReferenceName = "task_%s" % getRandString(10)
+ self._stepId = relatedStepObj.getId()
+ self._type = "HTTP"
+ self._args['inputParameters'] = relatedStepObj.getArgs()
+ self._paramTransform(self._args['inputParameters'], flowParser)
+ print "NormalTask:----------------------\n", relatedStepObj.getArgs()
+
+ def _paramTransform(self, argsDict, flowParser):
+ for (k, v) in argsDict.items():
+ if isinstance(v, str):
+ if re.match("^\(\(\d+\..*\)\)", v):
+ v = v[2:-2]
+ stepId, outputParam = v.split(".")
+ stepId = int(stepId)
+ normalTask = flowParser.getNormalTask(stepId)
+ if normalTask is None:
+ continue
+ argsDict[k] = "${%s.output.response.body.%s}" % \
+ (normalTask.getReferenceName(), outputParam)
+ elif isinstance(v, dict):
+ self._paramTransform(v, flowParser)
+
+ def getStepId(self):
+ return self._stepId
+
+
+class SwitchTask(BaseWorkflowTask):
+ seqNumber = 0
+
+ def __init__(self, order, stepObjArr, flowParser):
+ super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber))
+ SwitchTask.seqNumber = SwitchTask.seqNumber + 1
+ if 'name' in order:
+ self._name = order['name']
+ self._type = "DECISION"
+ caseValueParam = 'value'
+ order['value'] = order['value'][2:-2]
+ stepId, outputParam = order['value'].split(".")
+ stepId = int(stepId)
+ normalTask = flowParser.getNormalTask(stepId)
+ caseValue = "${%s.output.response.body.%s}" % \
+ (normalTask.getReferenceName(), outputParam)
+ self._args['inputParameters'] = {caseValueParam: caseValue}
+ self._args['caseValueParam'] = caseValueParam
+ self._args['decisionCases'] = {}
+ self._childTaskMetaList = []
+ for case, caseOrders in order['cases'].items():
+ self._args['decisionCases'][case], taskMetaList = \
+ flowParser.parse(caseOrders, stepObjArr)
+ if taskMetaList is not None:
+ self._childTaskMetaList.extend(taskMetaList)
+
+ def getTaskMetaList(self):
+ selfTaskMetaList = super(SwitchTask, self).getTaskMetaList()
+ selfTaskMetaList.extend(self._childTaskMetaList)
+ return selfTaskMetaList
+
+
+class ParallelTask(BaseWorkflowTask):
+ seqNumber = 0
+
+ def __init__(self, order, stepObjArr, flowParser):
+ InstSeqNumber = ParallelTask.seqNumber
+ super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber))
+ ParallelTask.seqNumber = ParallelTask.seqNumber + 1
+ if 'name' in order:
+ self._name = order['name']
+ self._type = "FORK_JOIN"
+ self._args['forkTasks'] = []
+ self._childTaskMetaList = []
+ lastTasksNameList = []
+ parallelList = order['parallel'].items()
+ parallelList.sort()
+ for key, orderList in parallelList:
+ print orderList
+ taskList, taskMetaList = flowParser.parse(orderList, stepObjArr)
+ self._args['forkTasks'].append(taskList)
+ lastTasksNameList.append(taskList[-1]['taskReferenceName'])
+ if taskMetaList is not None:
+ self._childTaskMetaList.extend(taskMetaList)
+ self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList)
+
+ def getTaskMetaList(self):
+ selfTaskMetaList = super(ParallelTask, self).getTaskMetaList()
+ selfTaskMetaList.extend(self._childTaskMetaList)
+ selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList())
+ return selfTaskMetaList
+
+ def getJoinTask(self):
+ return self._joinTaskObj
+
+
+class ParallelJoinTask(BaseWorkflowTask):
+ def __init__(self, seqNumber, joinOnList):
+ super(ParallelJoinTask, self).__init__(
+ "paralleljoin_" + str(seqNumber))
+ self._type = "JOIN"
+ self._args['joinOn'] = joinOnList
+
+
+def getRandString(length):
+ return "".join(random.choice(str("0123456789")) for i in range(length))
|