summaryrefslogtreecommitdiffstats
path: root/compass-tasks/db/validator.py
diff options
context:
space:
mode:
authorHarry Huang <huangxiangyu5@huawei.com>2017-11-01 15:05:30 +0800
committerHarry Huang <huangxiangyu5@huawei.com>2017-11-03 14:59:22 +0800
commit8120b2583c459c85116d69211038e0110d71e5b7 (patch)
tree886448deb230ba530fe8814cd5d224e0f2a6299d /compass-tasks/db/validator.py
parent3656ab7b5e3f2f26f7c98f9dcc97b3c461fa2a76 (diff)
Add compass-tasks
Registered tasks and MQ modules for Compass Change-Id: Id1569a61fe53357d53448478d5ba42cb1f386bc6 Signed-off-by: Harry Huang <huangxiangyu5@huawei.com>
Diffstat (limited to 'compass-tasks/db/validator.py')
-rw-r--r--compass-tasks/db/validator.py195
1 files changed, 195 insertions, 0 deletions
diff --git a/compass-tasks/db/validator.py b/compass-tasks/db/validator.py
new file mode 100644
index 0000000..730bb52
--- /dev/null
+++ b/compass-tasks/db/validator.py
@@ -0,0 +1,195 @@
+# Copyright 2014 Huawei Technologies Co. Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Validator methods."""
+import logging
+import netaddr
+import re
+import socket
+
+from compass.utils import setting_wrapper as setting
+from compass.utils import util
+
+
+def is_valid_ip(name, ip_addr, **kwargs):
+ """Valid the format of an IP address."""
+ if isinstance(ip_addr, list):
+ return all([
+ is_valid_ip(name, item, **kwargs) for item in ip_addr
+ ])
+ try:
+ netaddr.IPAddress(ip_addr)
+ except Exception:
+ logging.debug('%s invalid ip addr %s', name, ip_addr)
+ return False
+ return True
+
+
+def is_valid_network(name, ip_network, **kwargs):
+ """Valid the format of an Ip network."""
+ if isinstance(ip_network, list):
+ return all([
+ is_valid_network(name, item, **kwargs) for item in ip_network
+ ])
+ try:
+ netaddr.IPNetwork(ip_network)
+ except Exception:
+ logging.debug('%s invalid network %s', name, ip_network)
+ return False
+ return True
+
+
+def is_valid_netmask(name, ip_addr, **kwargs):
+ """Valid the format of a netmask."""
+ if isinstance(ip_addr, list):
+ return all([
+ is_valid_netmask(name, item, **kwargs) for item in ip_addr
+ ])
+ if not is_valid_ip(ip_addr):
+ return False
+ ip = netaddr.IPAddress(ip_addr)
+ if ip.is_netmask():
+ return True
+ logging.debug('%s invalid netmask %s', name, ip_addr)
+ return False
+
+
+def is_valid_gateway(name, ip_addr, **kwargs):
+ """Valid the format of gateway."""
+ if isinstance(ip_addr, list):
+ return all([
+ is_valid_gateway(name, item, **kwargs) for item in ip_addr
+ ])
+ if not is_valid_ip(ip_addr):
+ return False
+ ip = netaddr.IPAddress(ip_addr)
+ if ip.is_private() or ip.is_public():
+ return True
+ logging.debug('%s invalid gateway %s', name, ip_addr)
+ return False
+
+
+def is_valid_dns(name, dns, **kwargs):
+ """Valid the format of DNS."""
+ if isinstance(dns, list):
+ return all([is_valid_dns(name, item, **kwargs) for item in dns])
+ if is_valid_ip(dns):
+ return True
+ try:
+ socket.gethostbyname_ex(dns)
+ except Exception:
+ logging.debug('%s invalid dns name %s', name, dns)
+ return False
+ return True
+
+
+def is_valid_url(name, url, **kwargs):
+ """Valid the format of url."""
+ if isinstance(url, list):
+ return all([
+ is_valid_url(name, item, **kwargs) for item in url
+ ])
+ if re.match(
+ r'^(http|https|ftp)://([0-9A-Za-z_-]+)(\.[0-9a-zA-Z_-]+)*'
+ r'(:\d+)?(/[0-9a-zA-Z_-]+)*$',
+ url
+ ):
+ return True
+ logging.debug(
+ '%s invalid url %s', name, url
+ )
+ return False
+
+
+def is_valid_domain(name, domain, **kwargs):
+ """Validate the format of domain."""
+ if isinstance(domain, list):
+ return all([
+ is_valid_domain(name, item, **kwargs) for item in domain
+ ])
+ if re.match(
+ r'^([0-9a-zA-Z_-]+)(\.[0-9a-zA-Z_-]+)*$',
+ domain
+ ):
+ return True
+ logging.debug(
+ '%s invalid domain %s', name, domain
+ )
+ return False
+
+
+def is_valid_username(name, username, **kwargs):
+ """Valid the format of username."""
+ if bool(username):
+ return True
+ logging.debug(
+ '%s username is empty', name
+ )
+
+
+def is_valid_password(name, password, **kwargs):
+ """Valid the format of password."""
+ if bool(password):
+ return True
+ logging.debug('%s password is empty', name)
+ return False
+
+
+def is_valid_partition(name, partition, **kwargs):
+ """Valid the format of partition name."""
+ if name != 'swap' and not name.startswith('/'):
+ logging.debug(
+ '%s is not started with / or swap', name
+ )
+ return False
+ if 'size' not in partition and 'percentage' not in partition:
+ logging.debug(
+ '%s partition does not contain sie or percentage',
+ name
+ )
+ return False
+ return True
+
+
+def is_valid_percentage(name, percentage, **kwargs):
+ """Valid the percentage."""
+ if 0 <= percentage <= 100:
+ return True
+ logging.debug('%s invalid percentage %s', name, percentage)
+
+
+def is_valid_port(name, port, **kwargs):
+ """Valid the format of port."""
+ if 0 < port < 65536:
+ return True
+ logging.debug('%s invalid port %s', name, port)
+
+
+def is_valid_size(name, size, **kwargs):
+ if re.match(r'^(\d+)(K|M|G|T)$', size):
+ return True
+ logging.debug('%s invalid size %s', name, size)
+ return False
+
+
+VALIDATOR_GLOBALS = globals()
+VALIDATOR_LOCALS = locals()
+VALIDATOR_CONFIGS = util.load_configs(
+ setting.VALIDATOR_DIR,
+ config_name_suffix='.py',
+ env_globals=VALIDATOR_GLOBALS,
+ env_locals=VALIDATOR_LOCALS
+)
+for validator_config in VALIDATOR_CONFIGS:
+ VALIDATOR_LOCALS.update(validator_config)