diff options
Diffstat (limited to 'testsuites/vstf/vstf_scripts/vstf/controller/settings/settings.py')
-rw-r--r-- | testsuites/vstf/vstf_scripts/vstf/controller/settings/settings.py | 289 |
1 files changed, 289 insertions, 0 deletions
diff --git a/testsuites/vstf/vstf_scripts/vstf/controller/settings/settings.py b/testsuites/vstf/vstf_scripts/vstf/controller/settings/settings.py new file mode 100644 index 00000000..2c712bb2 --- /dev/null +++ b/testsuites/vstf/vstf_scripts/vstf/controller/settings/settings.py @@ -0,0 +1,289 @@ +############################################################################## +# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import json +import re +import os +import copy +import logging +import sys + +LOG = logging.getLogger(__name__) + + +def object2dict(obj): + # convert object to a dict + dic = {'__class__': obj.__class__.__name__, '__module__': obj.__module__} + dic.update(obj.__dict__) + return dic + + +def dict2object(dic): + # convert dict to object + if '__class__' in dic: + class_name = dic.pop('__class__') + module_name = dic.pop('__module__') + module = __import__(module_name) + class_ = getattr(module, class_name) + args = dict((key.encode('ascii'), value) for key, value in dic.items()) # get args + inst = class_(**args) # create new instance + else: + inst = dic + return inst + + +def filter_comments(filename, flags="//"): + result = [] + with open(filename, "r") as ifile: + lines = ifile.readlines() + for data in lines: + data = re.sub("%s.*$" % (flags), '', data) + data = re.sub("^\s*$", '', data) + if data: + result.append(data) + LOG.debug(result) + return ''.join(result) + + +class BaseSettings(object): + def _load(self, fullname): + data = filter_comments(fullname) + LOG.debug(fullname) + LOG.debug(data) + jparams = None + if data: + jparams = json.loads(data) + return jparams + + def _sub(self, ldata, rdata): + if isinstance(ldata, list) and isinstance(rdata, list): + data = [] + if ldata: + for litem in ldata: + if rdata: + for ritem in rdata: + if isinstance(litem, dict) or isinstance(litem, list): + tmp = self._sub(litem, ritem) + else: + tmp = ritem + if tmp and tmp not in data: + data.append(tmp) + else: + data.append(litem) + + else: + data = rdata + + elif isinstance(ldata, dict) and isinstance(rdata, dict): + data = {} + rdata_bak = copy.deepcopy(rdata) + for rkey, rvalue in rdata_bak.items(): + if rkey not in ldata: + rdata_bak.pop(rkey) + for lkey, lvalue in ldata.items(): + if lkey in rdata: + if isinstance(lvalue, dict) or isinstance(lvalue, list): + data[lkey] = self._sub(lvalue, rdata[lkey]) + else: + data[lkey] = rdata[lkey] + else: + if rdata_bak: + data[lkey] = lvalue + else: + data = rdata + + return data + + def _save(self, data, filename): + if os.path.exists(filename): + os.remove(filename) + with open(filename, 'w') as ofile: + content = json.dumps(data, sort_keys=True, indent=4, separators=(',', ':')) + ofile.write(content) + + +class DefaultSettings(BaseSettings): + def __init__(self, path): + self._default = os.path.join(path, 'default') + self._user = os.path.join(path, 'user') + + def load(self, filename): + dfile = os.path.join(self._default, filename) + if os.path.exists(dfile): + ddata = self._load(dfile) + data = ddata + else: + err = "default file is missing : %s" % (dfile) + LOG.error(err) + raise Exception(err) + ufile = os.path.join(self._user, filename) + if os.path.exists(ufile): + udata = self._load(ufile) + if udata: + data = self._sub(ddata, udata) + else: + LOG.info("no user file :%s" % (ufile)) + return data + + def save(self, data, filename): + ufile = os.path.join(self._user, filename) + self._save(data, ufile) + + +class SingleSettings(BaseSettings): + def __init__(self, path): + self._path = path + + def load(self, filename): + pfile = os.path.join(self._path, filename) + if os.path.exists(pfile): + ddata = self._load(pfile) + data = ddata + else: + err = "settings file is missing : %s" % (pfile) + LOG.error(err) + raise Exception(err) + return data + + def save(self, data, filename): + pfile = os.path.join(self._path, filename) + self._save(data, pfile) + +SETS_DEFAULT = "Default" +SETS_SINGLE = "Single" +SETTINGS = [SETS_SINGLE, SETS_DEFAULT] + + +class Settings(object): + def __init__(self, path, filename, mode=SETS_SINGLE): + if mode not in SETTINGS: + raise Exception("error Settings mode : %s" % (mode)) + cls_name = mode + "Settings" + thismodule = sys.modules[__name__] + cls = getattr(thismodule, cls_name) + self._settings = cls(path) + self._filename = filename + self._fset = self._settings.load(filename) + self._mset = copy.deepcopy(self._fset) + self._register_func() + + def reset(self): + self._fset = self._settings.load(self._filename) + self._mset = copy.deepcopy(self._fset) + + @property + def settings(self): + return self._mset + + def _setting_file(self, func_name, mset, fset, key, check=None): + def infunc(value): + if hasattr(check, '__call__'): + check(value) + if isinstance(fset, dict): + mset[key] = copy.deepcopy(value) + fset[key] = copy.deepcopy(value) + elif isinstance(fset, list): + del (mset[:]) + del (fset[:]) + mset.extend(copy.deepcopy(value)) + fset.extend(copy.deepcopy(value)) + self._settings.save(self._fset, self._filename) + infunc.__name__ = func_name + LOG.debug(self._mset) + LOG.debug(self._fset) + + return infunc + + def _setting_memory(self, func_name, mset, key, check=None): + def infunc(value): + if hasattr(check, '__call__'): + check(value) + if isinstance(mset, dict): + mset[key] = copy.deepcopy(value) + elif isinstance(mset, list): + for i in range(len(mset)): + mset.pop() + mset.extend(copy.deepcopy(value)) + + infunc.__name__ = func_name + LOG.debug(self._mset) + LOG.debug(self._fset) + + return infunc + + def _adding_file(self, func_name, mset, fset, key, check=None): + def infunc(value): + if hasattr(check, '__call__'): + check(value) + if key: + mset[key].append(copy.deepcopy(value)) + fset[key].append(copy.deepcopy(value)) + else: + mset.append(copy.deepcopy(value)) + fset.append(copy.deepcopy(value)) + + self._settings.save(self._fset, self._filename) + infunc.__name__ = func_name + LOG.debug(self._mset) + LOG.debug(self._fset) + + return infunc + + def _adding_memory(self, func_name, mset, key, check=None): + def infunc(value): + if hasattr(check, '__call__'): + check(value) + if key: + mset[key].append(copy.deepcopy(value)) + else: + mset.append(copy.deepcopy(value)) + infunc.__name__ = func_name + LOG.debug(self._mset) + LOG.debug(self._fset) + + return infunc + + def _register_func(self): + if isinstance(self._fset, dict): + items = set( + self._fset.keys() + ) + for item in items: + item = item.encode() + func_name = "set_%s" % item + setattr(self, func_name, self._setting_file(func_name, self._mset, self._fset, item)) + func_name = "mset_%s" % item + setattr(self, func_name, self._setting_memory(func_name, self._mset, item)) + elif isinstance(self._fset, list): + func_name = "set" + setattr(self, func_name, self._setting_file(func_name, self._mset, self._fset, None)) + func_name = "mset" + setattr(self, func_name, self._setting_memory(func_name, self._mset, None)) + func_name = "add" + setattr(self, func_name, self._adding_file(func_name, self._mset, self._fset, None)) + func_name = "madd" + setattr(self, func_name, self._adding_memory(func_name, self._mset, None)) + + +def unit_test(): + from vstf.common.log import setup_logging + setup_logging(level=logging.DEBUG, log_file="/var/log/vstf-settings.log", clevel=logging.INFO) + + path = '/etc/vstf' + setting = DefaultSettings(path) + filename = 'reporters.mail.mail-settings' + data = setting.load(filename) + + setting.save(data, filename) + LOG.info(type(data)) + LOG.info(data) + + +if __name__ == '__main__': + unit_test() |