diff options
Diffstat (limited to 'test-scheduler/server/src/conductor_processor')
5 files changed, 312 insertions, 0 deletions
diff --git a/test-scheduler/server/src/conductor_processor/__init__.py b/test-scheduler/server/src/conductor_processor/__init__.py new file mode 100644 index 00000000..02647edc --- /dev/null +++ b/test-scheduler/server/src/conductor_processor/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/test-scheduler/server/src/conductor_processor/defaultTaskFile.json b/test-scheduler/server/src/conductor_processor/defaultTaskFile.json new file mode 100644 index 00000000..544ce530 --- /dev/null +++ b/test-scheduler/server/src/conductor_processor/defaultTaskFile.json @@ -0,0 +1,9 @@ +{ + "name": "", + "retryCount": 6, + "timeOutSeconds": 1200, + "timeOutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 3, + "responseTimeOutSeconds": 3600 +} diff --git a/test-scheduler/server/src/conductor_processor/defaultWorkflowFile.json b/test-scheduler/server/src/conductor_processor/defaultWorkflowFile.json new file mode 100644 index 00000000..f70573f3 --- /dev/null +++ b/test-scheduler/server/src/conductor_processor/defaultWorkflowFile.json @@ -0,0 +1,24 @@ +{ + "name": "workflow_demo_05", + "description": "run a workflow of yardstick test service", + "version": 1, + "tasks": [ + { + "name": "http_yardstick_test", + "taskReferenceName": "ping_test", + "inputParameters": { + "http_request": { + "uri": "http://192.168.199.105:8080/greet", + "method": "GET" + } + }, + "type": "HTTP" + } + ], + "outputParameters": { + "header": "${ping_test.output.response.headers}", + "response": "${ping_test.output.response.body}", + "status": "${ping_test.output.response.statusCode}" + }, + "schemaVersion": 2 +} diff --git a/test-scheduler/server/src/conductor_processor/task.py b/test-scheduler/server/src/conductor_processor/task.py new file mode 100644 index 00000000..04616ade --- /dev/null +++ b/test-scheduler/server/src/conductor_processor/task.py @@ -0,0 +1,28 @@ +############################################################################## +# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import json +import os + + +class TaskFile(object): + def __init__(self, taskName='task_0'): + self._defaultConfFile = self._getFilePath("defaultTaskFile.json") + with open(self._defaultConfFile) as defaultConf: + self._jsonObj = json.load(defaultConf) + self._jsonObj['name'] = taskName + + def generateFromStep(self, stepObject): + self._jsonObj['name'] = stepObject.getName() + print "taskFile:", self._jsonObj['name'] + return self._jsonObj + + def _getFilePath(self, fileName): + dirPath = os.path.dirname(os.path.realpath(__file__)) + return os.path.join(dirPath, fileName) diff --git a/test-scheduler/server/src/conductor_processor/workflow.py b/test-scheduler/server/src/conductor_processor/workflow.py new file mode 100644 index 00000000..da67e013 --- /dev/null +++ b/test-scheduler/server/src/conductor_processor/workflow.py @@ -0,0 +1,243 @@ +############################################################################## +# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import random +import collections +import re +from src.conductor_processor.task import TaskFile + + +class WorkflowFile(object): + def __init__(self, name): + self._name = "workflow_" + name + "(%s)" % getRandString(10) + self._description = '' + self._version = 1 + self._schemaVersion = 2 + self._tasks = [] + self._outputParameters = {} + + def getDict(self): + d = collections.OrderedDict() + d['name'] = self._name + d['description'] = self._description + d['version'] = self._version + d['schemaVersion'] = self._schemaVersion + d['tasks'] = self._tasks + d['outputParameters'] = self._outputParameters + + return d + + def generateMetaData(self, flowList, stepObjArr): + flowParser = FlowParser(flowList, stepObjArr) + self._tasks, taskMetaList = flowParser.parseMainFlow() + normalTasks = flowParser.getNormalTaskList() + for normalTask in normalTasks: + taskName = normalTask['name'] + referenceName = normalTask['taskReferenceName'] + self._outputParameters["%s(%s)" % (taskName, referenceName)] = \ + "${%s.output.response.body}" % referenceName + return self.getDict(), taskMetaList + + +class FlowParser(object): + def __init__(self, flowList, stepObjArr): + self._mainFlow = {} + self._subFlowDict = {} + self._stepObjArr = stepObjArr + self._normalTasks = [] + for flow in flowList: + if flow['name'] == "main": + self._mainFlow = flow + else: + self._subFlowDict[flow['name']] = flow + + def parseMainFlow(self): + return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr) + + def parse(self, obj, stepObjArr): + if isinstance(obj, str): + return self.parseFlow(obj, stepObjArr) + else: + return self.parseOrderList(obj, stepObjArr) + + def parseFlow(self, flowName, stepObjArr): + orderList = self._subFlowDict[flowName]['orders'] + return self.parseOrderList(orderList, stepObjArr) + + def parseOrderList(self, orderList, stepObjArr): + tasks = [] + taskMetaAllList = [] + for order in orderList: + if order['type'] == "normal": + genTask = NormalTask(order, stepObjArr, self) + self._normalTasks.append(genTask) + elif order['type'] == "switch": + genTask = SwitchTask(order, stepObjArr, self) + elif order['type'] == "parallel": + genTask = ParallelTask(order, stepObjArr, self) + tasks.append(genTask.getDict()) + + if order['type'] == "parallel": + joinTask = genTask.getJoinTask() + tasks.append(joinTask.getDict()) + + taskMetaList = genTask.getTaskMetaList() + if taskMetaList is not None: + taskMetaAllList.extend(taskMetaList) + return tasks, taskMetaAllList + + def getNormalTaskList(self): + normalTasksDict = [] + for normalTask in self._normalTasks: + normalTasksDict.append(normalTask.getDict()) + return normalTasksDict + + def getNormalTask(self, stepId): + for normalTask in self._normalTasks: + if normalTask.getStepId() == stepId: + return normalTask + return None + + +class BaseWorkflowTask(object): + def __init__(self, name): + self._name = name + self._taskReferenceName = self._name + "_task_%s" % getRandString(10) + self._type = '' + self._args = {} + + def __str__(self): + dictObj = self.getDict() + return str(dictObj) + + def getDict(self): + d1 = { + "name": self._name, + "taskReferenceName": self._taskReferenceName, + "type": self._type + } + return dict(d1, **self._args) + + def getName(self): + return self._name + + def getReferenceName(self): + return self._taskReferenceName + + def getTaskMetaList(self): + taskFile = TaskFile() + return [taskFile.generateFromStep(self)] + + +class NormalTask(BaseWorkflowTask): + def __init__(self, order, stepObjArr, flowParser): + relatedStepObj = stepObjArr[order['step'] - 1] + super(NormalTask, self).__init__(relatedStepObj.getName()) + self._taskReferenceName = "task_%s" % getRandString(10) + self._stepId = relatedStepObj.getId() + self._type = "HTTP" + self._args['inputParameters'] = relatedStepObj.getArgs() + self._paramTransform(self._args['inputParameters'], flowParser) + print "NormalTask:----------------------\n", relatedStepObj.getArgs() + + def _paramTransform(self, argsDict, flowParser): + for (k, v) in argsDict.items(): + if isinstance(v, str): + if re.match("^\(\(\d+\..*\)\)", v): + v = v[2:-2] + stepId, outputParam = v.split(".") + stepId = int(stepId) + normalTask = flowParser.getNormalTask(stepId) + if normalTask is None: + continue + argsDict[k] = "${%s.output.response.body.%s}" % \ + (normalTask.getReferenceName(), outputParam) + elif isinstance(v, dict): + self._paramTransform(v, flowParser) + + def getStepId(self): + return self._stepId + + +class SwitchTask(BaseWorkflowTask): + seqNumber = 0 + + def __init__(self, order, stepObjArr, flowParser): + super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber)) + SwitchTask.seqNumber = SwitchTask.seqNumber + 1 + if 'name' in order: + self._name = order['name'] + self._type = "DECISION" + caseValueParam = 'value' + order['value'] = order['value'][2:-2] + stepId, outputParam = order['value'].split(".") + stepId = int(stepId) + normalTask = flowParser.getNormalTask(stepId) + caseValue = "${%s.output.response.body.%s}" % \ + (normalTask.getReferenceName(), outputParam) + self._args['inputParameters'] = {caseValueParam: caseValue} + self._args['caseValueParam'] = caseValueParam + self._args['decisionCases'] = {} + self._childTaskMetaList = [] + for case, caseOrders in order['cases'].items(): + self._args['decisionCases'][case], taskMetaList = \ + flowParser.parse(caseOrders, stepObjArr) + if taskMetaList is not None: + self._childTaskMetaList.extend(taskMetaList) + + def getTaskMetaList(self): + selfTaskMetaList = super(SwitchTask, self).getTaskMetaList() + selfTaskMetaList.extend(self._childTaskMetaList) + return selfTaskMetaList + + +class ParallelTask(BaseWorkflowTask): + seqNumber = 0 + + def __init__(self, order, stepObjArr, flowParser): + InstSeqNumber = ParallelTask.seqNumber + super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber)) + ParallelTask.seqNumber = ParallelTask.seqNumber + 1 + if 'name' in order: + self._name = order['name'] + self._type = "FORK_JOIN" + self._args['forkTasks'] = [] + self._childTaskMetaList = [] + lastTasksNameList = [] + parallelList = order['parallel'].items() + parallelList.sort() + for key, orderList in parallelList: + print orderList + taskList, taskMetaList = flowParser.parse(orderList, stepObjArr) + self._args['forkTasks'].append(taskList) + lastTasksNameList.append(taskList[-1]['taskReferenceName']) + if taskMetaList is not None: + self._childTaskMetaList.extend(taskMetaList) + self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList) + + def getTaskMetaList(self): + selfTaskMetaList = super(ParallelTask, self).getTaskMetaList() + selfTaskMetaList.extend(self._childTaskMetaList) + selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList()) + return selfTaskMetaList + + def getJoinTask(self): + return self._joinTaskObj + + +class ParallelJoinTask(BaseWorkflowTask): + def __init__(self, seqNumber, joinOnList): + super(ParallelJoinTask, self).__init__( + "paralleljoin_" + str(seqNumber)) + self._type = "JOIN" + self._args['joinOn'] = joinOnList + + +def getRandString(length): + return "".join(random.choice(str("0123456789")) for i in range(length)) |