summaryrefslogtreecommitdiffstats
path: root/testsuites/vstf/vstf_scripts/vstf/controller/settings/settings.py
diff options
context:
space:
mode:
Diffstat (limited to 'testsuites/vstf/vstf_scripts/vstf/controller/settings/settings.py')
-rw-r--r--testsuites/vstf/vstf_scripts/vstf/controller/settings/settings.py346
1 files changed, 0 insertions, 346 deletions
diff --git a/testsuites/vstf/vstf_scripts/vstf/controller/settings/settings.py b/testsuites/vstf/vstf_scripts/vstf/controller/settings/settings.py
deleted file mode 100644
index a01689d5..00000000
--- a/testsuites/vstf/vstf_scripts/vstf/controller/settings/settings.py
+++ /dev/null
@@ -1,346 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-import json
-import re
-import os
-import copy
-import logging
-import sys
-
-LOG = logging.getLogger(__name__)
-
-
-def object2dict(obj):
- # convert object to a dict
- dic = {'__class__': obj.__class__.__name__, '__module__': obj.__module__}
- dic.update(obj.__dict__)
- return dic
-
-
-def dict2object(dic):
- # convert dict to object
- if '__class__' in dic:
- class_name = dic.pop('__class__')
- module_name = dic.pop('__module__')
- module = __import__(module_name)
- class_ = getattr(module, class_name)
- args = dict((key.encode('ascii'), value)
- for key, value in dic.items()) # get args
- inst = class_(**args) # create new instance
- else:
- inst = dic
- return inst
-
-
-def filter_comments(filename, flags="//"):
- result = []
- with open(filename, "r") as ifile:
- lines = ifile.readlines()
- for data in lines:
- data = re.sub("%s.*$" % (flags), '', data)
- data = re.sub("^\s*$", '', data)
- if data:
- result.append(data)
- LOG.debug(result)
- return ''.join(result)
-
-
-class BaseSettings(object):
-
- def _load(self, fullname):
- data = filter_comments(fullname)
- LOG.debug(fullname)
- LOG.debug(data)
- jparams = None
- if data:
- jparams = json.loads(data)
- return jparams
-
- def _sub(self, ldata, rdata):
- if isinstance(ldata, list) and isinstance(rdata, list):
- data = []
- if ldata:
- for litem in ldata:
- if rdata:
- for ritem in rdata:
- if isinstance(
- litem,
- dict) or isinstance(
- litem,
- list):
- tmp = self._sub(litem, ritem)
- else:
- tmp = ritem
- if tmp and tmp not in data:
- data.append(tmp)
- else:
- data.append(litem)
-
- else:
- data = rdata
-
- elif isinstance(ldata, dict) and isinstance(rdata, dict):
- data = {}
- rdata_bak = copy.deepcopy(rdata)
- for rkey, rvalue in rdata_bak.items():
- if rkey not in ldata:
- rdata_bak.pop(rkey)
- for lkey, lvalue in ldata.items():
- if lkey in rdata:
- if isinstance(lvalue, dict) or isinstance(lvalue, list):
- data[lkey] = self._sub(lvalue, rdata[lkey])
- else:
- data[lkey] = rdata[lkey]
- else:
- if rdata_bak:
- data[lkey] = lvalue
- else:
- data = rdata
-
- return data
-
- def _save(self, data, filename):
- if os.path.exists(filename):
- os.remove(filename)
- with open(filename, 'w') as ofile:
- content = json.dumps(
- data,
- sort_keys=True,
- indent=4,
- separators=(
- ',',
- ':'))
- ofile.write(content)
-
-
-class DefaultSettings(BaseSettings):
-
- def __init__(self, path):
- self._default = os.path.join(path, 'default')
- self._user = os.path.join(path, 'user')
-
- def load(self, filename):
- dfile = os.path.join(self._default, filename)
- if os.path.exists(dfile):
- ddata = self._load(dfile)
- data = ddata
- else:
- err = "default file is missing : %s" % (dfile)
- LOG.error(err)
- raise Exception(err)
- ufile = os.path.join(self._user, filename)
- if os.path.exists(ufile):
- udata = self._load(ufile)
- if udata:
- data = self._sub(ddata, udata)
- else:
- LOG.info("no user file :%s" % (ufile))
- return data
-
- def save(self, data, filename):
- ufile = os.path.join(self._user, filename)
- self._save(data, ufile)
-
-
-class SingleSettings(BaseSettings):
-
- def __init__(self, path):
- self._path = path
-
- def load(self, filename):
- pfile = os.path.join(self._path, filename)
- if os.path.exists(pfile):
- ddata = self._load(pfile)
- data = ddata
- else:
- err = "settings file is missing : %s" % (pfile)
- LOG.error(err)
- raise Exception(err)
- return data
-
- def save(self, data, filename):
- pfile = os.path.join(self._path, filename)
- self._save(data, pfile)
-
-SETS_DEFAULT = "Default"
-SETS_SINGLE = "Single"
-SETTINGS = [SETS_SINGLE, SETS_DEFAULT]
-
-
-class Settings(object):
-
- def __init__(self, path, filename, mode=SETS_SINGLE):
- if mode not in SETTINGS:
- raise Exception("error Settings mode : %s" % (mode))
- cls_name = mode + "Settings"
- thismodule = sys.modules[__name__]
- cls = getattr(thismodule, cls_name)
- self._settings = cls(path)
- self._filename = filename
- self._fset = self._settings.load(filename)
- self._mset = copy.deepcopy(self._fset)
- self._register_func()
-
- def reset(self):
- self._fset = self._settings.load(self._filename)
- self._mset = copy.deepcopy(self._fset)
-
- @property
- def settings(self):
- return self._mset
-
- def _setting_file(self, func_name, mset, fset, key, check=None):
- def infunc(value):
- if hasattr(check, '__call__'):
- check(value)
- if isinstance(fset, dict):
- mset[key] = copy.deepcopy(value)
- fset[key] = copy.deepcopy(value)
- elif isinstance(fset, list):
- del (mset[:])
- del (fset[:])
- mset.extend(copy.deepcopy(value))
- fset.extend(copy.deepcopy(value))
- self._settings.save(self._fset, self._filename)
- infunc.__name__ = func_name
- LOG.debug(self._mset)
- LOG.debug(self._fset)
-
- return infunc
-
- def _setting_memory(self, func_name, mset, key, check=None):
- def infunc(value):
- if hasattr(check, '__call__'):
- check(value)
- if isinstance(mset, dict):
- mset[key] = copy.deepcopy(value)
- elif isinstance(mset, list):
- for i in range(len(mset)):
- mset.pop()
- mset.extend(copy.deepcopy(value))
-
- infunc.__name__ = func_name
- LOG.debug(self._mset)
- LOG.debug(self._fset)
-
- return infunc
-
- def _adding_file(self, func_name, mset, fset, key, check=None):
- def infunc(value):
- if hasattr(check, '__call__'):
- check(value)
- if key:
- mset[key].append(copy.deepcopy(value))
- fset[key].append(copy.deepcopy(value))
- else:
- mset.append(copy.deepcopy(value))
- fset.append(copy.deepcopy(value))
-
- self._settings.save(self._fset, self._filename)
- infunc.__name__ = func_name
- LOG.debug(self._mset)
- LOG.debug(self._fset)
-
- return infunc
-
- def _adding_memory(self, func_name, mset, key, check=None):
- def infunc(value):
- if hasattr(check, '__call__'):
- check(value)
- if key:
- mset[key].append(copy.deepcopy(value))
- else:
- mset.append(copy.deepcopy(value))
- infunc.__name__ = func_name
- LOG.debug(self._mset)
- LOG.debug(self._fset)
-
- return infunc
-
- def _register_func(self):
- if isinstance(self._fset, dict):
- items = set(
- self._fset.keys()
- )
- for item in items:
- item = item.encode()
- func_name = "set_%s" % item
- setattr(
- self,
- func_name,
- self._setting_file(
- func_name,
- self._mset,
- self._fset,
- item))
- func_name = "mset_%s" % item
- setattr(
- self,
- func_name,
- self._setting_memory(
- func_name,
- self._mset,
- item))
- elif isinstance(self._fset, list):
- func_name = "set"
- setattr(
- self,
- func_name,
- self._setting_file(
- func_name,
- self._mset,
- self._fset,
- None))
- func_name = "mset"
- setattr(
- self,
- func_name,
- self._setting_memory(
- func_name,
- self._mset,
- None))
- func_name = "add"
- setattr(
- self,
- func_name,
- self._adding_file(
- func_name,
- self._mset,
- self._fset,
- None))
- func_name = "madd"
- setattr(
- self,
- func_name,
- self._adding_memory(
- func_name,
- self._mset,
- None))
-
-
-def unit_test():
- from vstf.common.log import setup_logging
- setup_logging(
- level=logging.DEBUG,
- log_file="/var/log/vstf-settings.log",
- clevel=logging.INFO)
-
- path = '/etc/vstf'
- setting = DefaultSettings(path)
- filename = 'reporters.mail.mail-settings'
- data = setting.load(filename)
-
- setting.save(data, filename)
- LOG.info(type(data))
- LOG.info(data)
-
-
-if __name__ == '__main__':
- unit_test()