summaryrefslogtreecommitdiffstats
path: root/testing-scheduler/server/src/conductor_processor
diff options
context:
space:
mode:
Diffstat (limited to 'testing-scheduler/server/src/conductor_processor')
-rw-r--r--testing-scheduler/server/src/conductor_processor/__init__.py8
-rw-r--r--testing-scheduler/server/src/conductor_processor/defaultTaskFile.json9
-rw-r--r--testing-scheduler/server/src/conductor_processor/defaultWorkflowFile.json24
-rw-r--r--testing-scheduler/server/src/conductor_processor/task.py28
-rw-r--r--testing-scheduler/server/src/conductor_processor/workflow.py243
5 files changed, 0 insertions, 312 deletions
diff --git a/testing-scheduler/server/src/conductor_processor/__init__.py b/testing-scheduler/server/src/conductor_processor/__init__.py
deleted file mode 100644
index bb02be17..00000000
--- a/testing-scheduler/server/src/conductor_processor/__init__.py
+++ /dev/null
@@ -1,8 +0,0 @@
-##############################################################################
-# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
diff --git a/testing-scheduler/server/src/conductor_processor/defaultTaskFile.json b/testing-scheduler/server/src/conductor_processor/defaultTaskFile.json
deleted file mode 100644
index a98a5819..00000000
--- a/testing-scheduler/server/src/conductor_processor/defaultTaskFile.json
+++ /dev/null
@@ -1,9 +0,0 @@
-{
- "name": "",
- "retryCount": 6,
- "timeOutSeconds": 1200,
- "timeOutPolicy": "TIME_OUT_WF",
- "retryLogic": "FIXED",
- "retryDelaySeconds": 3,
- "responseTimeOutSeconds": 3600
-} \ No newline at end of file
diff --git a/testing-scheduler/server/src/conductor_processor/defaultWorkflowFile.json b/testing-scheduler/server/src/conductor_processor/defaultWorkflowFile.json
deleted file mode 100644
index 8f6251c0..00000000
--- a/testing-scheduler/server/src/conductor_processor/defaultWorkflowFile.json
+++ /dev/null
@@ -1,24 +0,0 @@
-{
- "name": "workflow_demo_05",
- "description": "run a workflow of yardstick test service",
- "version": 1,
- "tasks": [
- {
- "name": "http_yardstick_test",
- "taskReferenceName": "ping_test",
- "inputParameters": {
- "http_request": {
- "uri": "http://192.168.199.105:8080/greet",
- "method": "GET"
- }
- },
- "type": "HTTP"
- }
- ],
- "outputParameters": {
- "header": "${ping_test.output.response.headers}",
- "response": "${ping_test.output.response.body}",
- "status": "${ping_test.output.response.statusCode}"
- },
- "schemaVersion": 2
-} \ No newline at end of file
diff --git a/testing-scheduler/server/src/conductor_processor/task.py b/testing-scheduler/server/src/conductor_processor/task.py
deleted file mode 100644
index 6f25aef8..00000000
--- a/testing-scheduler/server/src/conductor_processor/task.py
+++ /dev/null
@@ -1,28 +0,0 @@
-##############################################################################
-# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-import json
-import os
-
-
-class TaskFile(object):
- def __init__(self, taskName='task_0'):
- self._defaultConfFile = self._getFilePath("defaultTaskFile.json")
- with open(self._defaultConfFile) as defaultConf:
- self._jsonObj = json.load(defaultConf)
- self._jsonObj['name'] = taskName
-
- def generateFromStep(self, stepObject):
- self._jsonObj['name'] = stepObject.getName()
- print "taskFile:", self._jsonObj['name']
- return self._jsonObj
-
- def _getFilePath(self, fileName):
- dirPath = os.path.dirname(os.path.realpath(__file__))
- return os.path.join(dirPath, fileName)
diff --git a/testing-scheduler/server/src/conductor_processor/workflow.py b/testing-scheduler/server/src/conductor_processor/workflow.py
deleted file mode 100644
index 19f0896c..00000000
--- a/testing-scheduler/server/src/conductor_processor/workflow.py
+++ /dev/null
@@ -1,243 +0,0 @@
-##############################################################################
-# Copyright (c) 2018 Huawei Technologies Co.,Ltd. and others
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-import random
-import collections
-import re
-from src.conductor_processor.task import TaskFile
-
-
-class WorkflowFile(object):
- def __init__(self, name):
- self._name = "workflow_" + name + "(%s)" % getRandString(10)
- self._description = ''
- self._version = 1
- self._schemaVersion = 2
- self._tasks = []
- self._outputParameters = {}
-
- def getDict(self):
- d = collections.OrderedDict()
- d['name'] = self._name
- d['description'] = self._description
- d['version'] = self._version
- d['schemaVersion'] = self._schemaVersion
- d['tasks'] = self._tasks
- d['outputParameters'] = self._outputParameters
-
- return d
-
- def generateMetaData(self, flowList, stepObjArr):
- flowParser = FlowParser(flowList, stepObjArr)
- self._tasks, taskMetaList = flowParser.parseMainFlow()
- normalTasks = flowParser.getNormalTaskList()
- for normalTask in normalTasks:
- taskName = normalTask['name']
- referenceName = normalTask['taskReferenceName']
- self._outputParameters["%s(%s)" % (taskName, referenceName)] = \
- "${%s.output.response.body}" % referenceName
- return self.getDict(), taskMetaList
-
-
-class FlowParser(object):
- def __init__(self, flowList, stepObjArr):
- self._mainFlow = {}
- self._subFlowDict = {}
- self._stepObjArr = stepObjArr
- self._normalTasks = []
- for flow in flowList:
- if flow['name'] == "main":
- self._mainFlow = flow
- else:
- self._subFlowDict[flow['name']] = flow
-
- def parseMainFlow(self):
- return self.parseOrderList(self._mainFlow['orders'], self._stepObjArr)
-
- def parse(self, obj, stepObjArr):
- if isinstance(obj, str):
- return self.parseFlow(obj, stepObjArr)
- else:
- return self.parseOrderList(obj, stepObjArr)
-
- def parseFlow(self, flowName, stepObjArr):
- orderList = self._subFlowDict[flowName]['orders']
- return self.parseOrderList(orderList, stepObjArr)
-
- def parseOrderList(self, orderList, stepObjArr):
- tasks = []
- taskMetaAllList = []
- for order in orderList:
- if order['type'] == "normal":
- genTask = NormalTask(order, stepObjArr, self)
- self._normalTasks.append(genTask)
- elif order['type'] == "switch":
- genTask = SwitchTask(order, stepObjArr, self)
- elif order['type'] == "parallel":
- genTask = ParallelTask(order, stepObjArr, self)
- tasks.append(genTask.getDict())
-
- if order['type'] == "parallel":
- joinTask = genTask.getJoinTask()
- tasks.append(joinTask.getDict())
-
- taskMetaList = genTask.getTaskMetaList()
- if taskMetaList is not None:
- taskMetaAllList.extend(taskMetaList)
- return tasks, taskMetaAllList
-
- def getNormalTaskList(self):
- normalTasksDict = []
- for normalTask in self._normalTasks:
- normalTasksDict.append(normalTask.getDict())
- return normalTasksDict
-
- def getNormalTask(self, stepId):
- for normalTask in self._normalTasks:
- if normalTask.getStepId() == stepId:
- return normalTask
- return None
-
-
-class BaseWorkflowTask(object):
- def __init__(self, name):
- self._name = name
- self._taskReferenceName = self._name + "_task_%s" % getRandString(10)
- self._type = ''
- self._args = {}
-
- def __str__(self):
- dictObj = self.getDict()
- return str(dictObj)
-
- def getDict(self):
- d1 = {
- "name": self._name,
- "taskReferenceName": self._taskReferenceName,
- "type": self._type
- }
- return dict(d1, **self._args)
-
- def getName(self):
- return self._name
-
- def getReferenceName(self):
- return self._taskReferenceName
-
- def getTaskMetaList(self):
- taskFile = TaskFile()
- return [taskFile.generateFromStep(self)]
-
-
-class NormalTask(BaseWorkflowTask):
- def __init__(self, order, stepObjArr, flowParser):
- relatedStepObj = stepObjArr[order['step'] - 1]
- super(NormalTask, self).__init__(relatedStepObj.getName())
- self._taskReferenceName = "task_%s" % getRandString(10)
- self._stepId = relatedStepObj.getId()
- self._type = "HTTP"
- self._args['inputParameters'] = relatedStepObj.getArgs()
- self._paramTransform(self._args['inputParameters'], flowParser)
- print "NormalTask:----------------------\n", relatedStepObj.getArgs()
-
- def _paramTransform(self, argsDict, flowParser):
- for (k, v) in argsDict.items():
- if isinstance(v, str):
- if re.match("^\(\(\d+\..*\)\)", v):
- v = v[2:-2]
- stepId, outputParam = v.split(".")
- stepId = int(stepId)
- normalTask = flowParser.getNormalTask(stepId)
- if normalTask is None:
- continue
- argsDict[k] = "${%s.output.response.body.%s}" % \
- (normalTask.getReferenceName(), outputParam)
- elif isinstance(v, dict):
- self._paramTransform(v, flowParser)
-
- def getStepId(self):
- return self._stepId
-
-
-class SwitchTask(BaseWorkflowTask):
- seqNumber = 0
-
- def __init__(self, order, stepObjArr, flowParser):
- super(SwitchTask, self).__init__("switch_" + str(SwitchTask.seqNumber))
- SwitchTask.seqNumber = SwitchTask.seqNumber + 1
- if 'name' in order:
- self._name = order['name']
- self._type = "DECISION"
- caseValueParam = 'value'
- order['value'] = order['value'][2:-2]
- stepId, outputParam = order['value'].split(".")
- stepId = int(stepId)
- normalTask = flowParser.getNormalTask(stepId)
- caseValue = "${%s.output.response.body.%s}" % \
- (normalTask.getReferenceName(), outputParam)
- self._args['inputParameters'] = {caseValueParam: caseValue}
- self._args['caseValueParam'] = caseValueParam
- self._args['decisionCases'] = {}
- self._childTaskMetaList = []
- for case, caseOrders in order['cases'].items():
- self._args['decisionCases'][case], taskMetaList = \
- flowParser.parse(caseOrders, stepObjArr)
- if taskMetaList is not None:
- self._childTaskMetaList.extend(taskMetaList)
-
- def getTaskMetaList(self):
- selfTaskMetaList = super(SwitchTask, self).getTaskMetaList()
- selfTaskMetaList.extend(self._childTaskMetaList)
- return selfTaskMetaList
-
-
-class ParallelTask(BaseWorkflowTask):
- seqNumber = 0
-
- def __init__(self, order, stepObjArr, flowParser):
- InstSeqNumber = ParallelTask.seqNumber
- super(ParallelTask, self).__init__("parallel_" + str(InstSeqNumber))
- ParallelTask.seqNumber = ParallelTask.seqNumber + 1
- if 'name' in order:
- self._name = order['name']
- self._type = "FORK_JOIN"
- self._args['forkTasks'] = []
- self._childTaskMetaList = []
- lastTasksNameList = []
- parallelList = order['parallel'].items()
- parallelList.sort()
- for key, orderList in parallelList:
- print orderList
- taskList, taskMetaList = flowParser.parse(orderList, stepObjArr)
- self._args['forkTasks'].append(taskList)
- lastTasksNameList.append(taskList[-1]['taskReferenceName'])
- if taskMetaList is not None:
- self._childTaskMetaList.extend(taskMetaList)
- self._joinTaskObj = ParallelJoinTask(InstSeqNumber, lastTasksNameList)
-
- def getTaskMetaList(self):
- selfTaskMetaList = super(ParallelTask, self).getTaskMetaList()
- selfTaskMetaList.extend(self._childTaskMetaList)
- selfTaskMetaList.extend(self._joinTaskObj.getTaskMetaList())
- return selfTaskMetaList
-
- def getJoinTask(self):
- return self._joinTaskObj
-
-
-class ParallelJoinTask(BaseWorkflowTask):
- def __init__(self, seqNumber, joinOnList):
- super(ParallelJoinTask, self).__init__(
- "paralleljoin_" + str(seqNumber))
- self._type = "JOIN"
- self._args['joinOn'] = joinOnList
-
-
-def getRandString(length):
- return "".join(random.choice(str("0123456789")) for i in range(length))