From 47b9e06a3f5018baae6091e88fb8388d2c4b6827 Mon Sep 17 00:00:00 2001 From: LeoQi Date: Fri, 20 Jul 2018 02:36:15 +0800 Subject: function of generating workflow config file in server part of testing-scheduler JIRA BOTTLENECK-233 A module to help generating the config files which are sent to conductor server. Based on these config files, conductor can start a workflow. Change-Id: Ibdd71be6454fe2bd2b8718da0c300827897959e0 Signed-off-by: Zheng Qibin --- .../server/src/conductor_processor/workflow.py | 243 +++++++++++++++++++++ 1 file changed, 243 insertions(+) create mode 100644 testing-scheduler/server/src/conductor_processor/workflow.py (limited to 'testing-scheduler/server/src/conductor_processor/workflow.py') diff --git a/testing-scheduler/server/src/conductor_processor/workflow.py b/testing-scheduler/server/src/conductor_processor/workflow.py new file mode 100644 index 00000000..19f0896c --- /dev/null +++ b/testing-scheduler/server/src/conductor_processor/workflow.py @@ -0,0 +1,243 @@ +############################################################################## +# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import random +import collections +import re +from src.conductor_processor.task import TaskFile + + +class WorkflowFile(object): + def __init__(self, name): + self._name = "workflow_" + name + "(%s)" % getRandString(10) + self._description = '' + self._version = 1 + self._schemaVersion = 2 + self._tasks = [] + self._outputParameters = {} + + def getDict(self): + d = collections.OrderedDict() + d['name'] = self._name + d['description'] = self._description + d['version'] = self._version + d['schemaVersion'] = self._schemaVersion + d['tasks'] = self._tasks + d['outputParameters'] = self._outputParameters + + return d + + def generateMetaData(self, flowList, stepObjArr): + flowParser = FlowParser(flowList, stepObjArr) + self._tasks, taskMetaList = flowParser.parseMainFlow() + normalTasks = flowParser.getNormalTaskList() + for normalTask in normalTasks: + taskName = normalTask['name'] + referenceName = normalTask['taskReferenceName'] + self._outputParameters["%s(%s)" % (taskName, referenceName)] = \ + "${%s.output.response.body}" % referenceName + return self.getDict(), taskMetaList + + +class FlowParser(object): + def __init__(self, flowList, stepObjArr): + self._mainFlow = {} + self._subFlowDict = {} + self._stepObjArr = stepObjArr + self._normalTasks = [] + for flow in flowList: + if flow['name'] == "main": + self._mainFlow = flow + else: + self._subFlowDict[flow['name']] = flow + + def parseMainFlow(self): + return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr) + + def parse(self, obj, stepObjArr): + if isinstance(obj, str): + return self.parseFlow(obj, stepObjArr) + else: + return self.parseOrderList(obj, stepObjArr) + + def parseFlow(self, flowName, stepObjArr): + orderList = self._subFlowDict[flowName]['orders'] + return self.parseOrderList(orderList, stepObjArr) + + def parseOrderList(self, orderList, stepObjArr): + tasks = [] + taskMetaAllList = [] + for order in orderList: + if order['type'] == "normal": + genTask = NormalTask(order, stepObjArr, self) + self._normalTasks.append(genTask) + elif order['type'] == "switch": + genTask = SwitchTask(order, stepObjArr, self) + elif order['type'] == "parallel": + genTask = ParallelTask(order, stepObjArr, self) + tasks.append(genTask.getDict()) + + if order['type'] == "parallel": + joinTask = genTask.getJoinTask() + tasks.append(joinTask.getDict()) + + taskMetaList = genTask.getTaskMetaList() + if taskMetaList is not None: + taskMetaAllList.extend(taskMetaList) + return tasks, taskMetaAllList + + def getNormalTaskList(self): + normalTasksDict = [] + for normalTask in self._normalTasks: + normalTasksDict.append(normalTask.getDict()) + return normalTasksDict + + def getNormalTask(self, stepId): + for normalTask in self._normalTasks: + if normalTask.getStepId() == stepId: + return normalTask + return None + + +class BaseWorkflowTask(object): + def __init__(self, name): + self._name = name + self._taskReferenceName = self._name + "_task_%s" % getRandString(10) + self._type = '' + self._args = {} + + def __str__(self): + dictObj = self.getDict() + return str(dictObj) + + def getDict(self): + d1 = { + "name": self._name, + "taskReferenceName": self._taskReferenceName, + "type": self._type + } + return dict(d1, **self._args) + + def getName(self): + return self._name + + def getReferenceName(self): + return self._taskReferenceName + + def getTaskMetaList(self): + taskFile = TaskFile() + return [taskFile.generateFromStep(self)] + + +class NormalTask(BaseWorkflowTask): + def __init__(self, order, stepObjArr, flowParser): + relatedStepObj = stepObjArr[order['step'] - 1] + super(NormalTask, self).__init__(relatedStepObj.getName()) + self._taskReferenceName = "task_%s" % getRandString(10) + self._stepId = relatedStepObj.getId() + self._type = "HTTP" + self._args['inputParameters'] = relatedStepObj.getArgs() + self._paramTransform(self._args['inputParameters'], flowParser) + print "NormalTask:----------------------\n", relatedStepObj.getArgs() + + def _paramTransform(self, argsDict, flowParser): + for (k, v) in argsDict.items(): + if isinstance(v, str): + if re.match("^\(\(\d+\..*\)\)", v): + v = v[2:-2] + stepId, outputParam = v.split(".") + stepId = int(stepId) + normalTask = flowParser.getNormalTask(stepId) + if normalTask is None: + continue + argsDict[k] = "${%s.output.response.body.%s}" % \ + (normalTask.getReferenceName(), outputParam) + elif isinstance(v, dict): + self._paramTransform(v, flowParser) + + def getStepId(self): + return self._stepId + + +class SwitchTask(BaseWorkflowTask): + seqNumber = 0 + + def __init__(self, order, stepObjArr, flowParser): + super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber)) + SwitchTask.seqNumber = SwitchTask.seqNumber + 1 + if 'name' in order: + self._name = order['name'] + self._type = "DECISION" + caseValueParam = 'value' + order['value'] = order['value'][2:-2] + stepId, outputParam = order['value'].split(".") + stepId = int(stepId) + normalTask = flowParser.getNormalTask(stepId) + caseValue = "${%s.output.response.body.%s}" % \ + (normalTask.getReferenceName(), outputParam) + self._args['inputParameters'] = {caseValueParam: caseValue} + self._args['caseValueParam'] = caseValueParam + self._args['decisionCases'] = {} + self._childTaskMetaList = [] + for case, caseOrders in order['cases'].items(): + self._args['decisionCases'][case], taskMetaList = \ + flowParser.parse(caseOrders, stepObjArr) + if taskMetaList is not None: + self._childTaskMetaList.extend(taskMetaList) + + def getTaskMetaList(self): + selfTaskMetaList = super(SwitchTask, self).getTaskMetaList() + selfTaskMetaList.extend(self._childTaskMetaList) + return selfTaskMetaList + + +class ParallelTask(BaseWorkflowTask): + seqNumber = 0 + + def __init__(self, order, stepObjArr, flowParser): + InstSeqNumber = ParallelTask.seqNumber + super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber)) + ParallelTask.seqNumber = ParallelTask.seqNumber + 1 + if 'name' in order: + self._name = order['name'] + self._type = "FORK_JOIN" + self._args['forkTasks'] = [] + self._childTaskMetaList = [] + lastTasksNameList = [] + parallelList = order['parallel'].items() + parallelList.sort() + for key, orderList in parallelList: + print orderList + taskList, taskMetaList = flowParser.parse(orderList, stepObjArr) + self._args['forkTasks'].append(taskList) + lastTasksNameList.append(taskList[-1]['taskReferenceName']) + if taskMetaList is not None: + self._childTaskMetaList.extend(taskMetaList) + self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList) + + def getTaskMetaList(self): + selfTaskMetaList = super(ParallelTask, self).getTaskMetaList() + selfTaskMetaList.extend(self._childTaskMetaList) + selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList()) + return selfTaskMetaList + + def getJoinTask(self): + return self._joinTaskObj + + +class ParallelJoinTask(BaseWorkflowTask): + def __init__(self, seqNumber, joinOnList): + super(ParallelJoinTask, self).__init__( + "paralleljoin_" + str(seqNumber)) + self._type = "JOIN" + self._args['joinOn'] = joinOnList + + +def getRandString(length): + return "".join(random.choice(str("0123456789")) for i in range(length)) -- cgit 1.2.3-korg