aboutsummaryrefslogtreecommitdiffstats
path: root/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits
diff options
context:
space:
mode:
Diffstat (limited to 'charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits')
-rw-r--r--charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/__init__.py63
-rw-r--r--charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/apache.py100
-rw-r--r--charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/apt.py105
-rw-r--r--charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/file.py552
4 files changed, 0 insertions, 820 deletions
diff --git a/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/__init__.py b/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/__init__.py
deleted file mode 100644
index 6a7057b..0000000
--- a/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/__init__.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Copyright 2016 Canonical Limited.
-#
-# This file is part of charm-helpers.
-#
-# charm-helpers is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3 as
-# published by the Free Software Foundation.
-#
-# charm-helpers is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
-
-
-class BaseAudit(object): # NO-QA
- """Base class for hardening checks.
-
- The lifecycle of a hardening check is to first check to see if the system
- is in compliance for the specified check. If it is not in compliance, the
- check method will return a value which will be supplied to the.
- """
- def __init__(self, *args, **kwargs):
- self.unless = kwargs.get('unless', None)
- super(BaseAudit, self).__init__()
-
- def ensure_compliance(self):
- """Checks to see if the current hardening check is in compliance or
- not.
-
- If the check that is performed is not in compliance, then an exception
- should be raised.
- """
- pass
-
- def _take_action(self):
- """Determines whether to perform the action or not.
-
- Checks whether or not an action should be taken. This is determined by
- the truthy value for the unless parameter. If unless is a callback
- method, it will be invoked with no parameters in order to determine
- whether or not the action should be taken. Otherwise, the truthy value
- of the unless attribute will determine if the action should be
- performed.
- """
- # Do the action if there isn't an unless override.
- if self.unless is None:
- return True
-
- # Invoke the callback if there is one.
- if hasattr(self.unless, '__call__'):
- results = self.unless()
- if results:
- return False
- else:
- return True
-
- if self.unless:
- return False
- else:
- return True
diff --git a/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/apache.py b/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/apache.py
deleted file mode 100644
index cf3c987..0000000
--- a/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/apache.py
+++ /dev/null
@@ -1,100 +0,0 @@
-# Copyright 2016 Canonical Limited.
-#
-# This file is part of charm-helpers.
-#
-# charm-helpers is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3 as
-# published by the Free Software Foundation.
-#
-# charm-helpers is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
-
-import re
-import subprocess
-
-from six import string_types
-
-from charmhelpers.core.hookenv import (
- log,
- INFO,
- ERROR,
-)
-
-from charmhelpers.contrib.hardening.audits import BaseAudit
-
-
-class DisabledModuleAudit(BaseAudit):
- """Audits Apache2 modules.
-
- Determines if the apache2 modules are enabled. If the modules are enabled
- then they are removed in the ensure_compliance.
- """
- def __init__(self, modules):
- if modules is None:
- self.modules = []
- elif isinstance(modules, string_types):
- self.modules = [modules]
- else:
- self.modules = modules
-
- def ensure_compliance(self):
- """Ensures that the modules are not loaded."""
- if not self.modules:
- return
-
- try:
- loaded_modules = self._get_loaded_modules()
- non_compliant_modules = []
- for module in self.modules:
- if module in loaded_modules:
- log("Module '%s' is enabled but should not be." %
- (module), level=INFO)
- non_compliant_modules.append(module)
-
- if len(non_compliant_modules) == 0:
- return
-
- for module in non_compliant_modules:
- self._disable_module(module)
- self._restart_apache()
- except subprocess.CalledProcessError as e:
- log('Error occurred auditing apache module compliance. '
- 'This may have been already reported. '
- 'Output is: %s' % e.output, level=ERROR)
-
- @staticmethod
- def _get_loaded_modules():
- """Returns the modules which are enabled in Apache."""
- output = subprocess.check_output(['apache2ctl', '-M'])
- modules = []
- for line in output.strip().split():
- # Each line of the enabled module output looks like:
- # module_name (static|shared)
- # Plus a header line at the top of the output which is stripped
- # out by the regex.
- matcher = re.search(r'^ (\S*)', line)
- if matcher:
- modules.append(matcher.group(1))
- return modules
-
- @staticmethod
- def _disable_module(module):
- """Disables the specified module in Apache."""
- try:
- subprocess.check_call(['a2dismod', module])
- except subprocess.CalledProcessError as e:
- # Note: catch error here to allow the attempt of disabling
- # multiple modules in one go rather than failing after the
- # first module fails.
- log('Error occurred disabling module %s. '
- 'Output is: %s' % (module, e.output), level=ERROR)
-
- @staticmethod
- def _restart_apache():
- """Restarts the apache process"""
- subprocess.check_output(['service', 'apache2', 'restart'])
diff --git a/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/apt.py b/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/apt.py
deleted file mode 100644
index e94af03..0000000
--- a/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/apt.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright 2016 Canonical Limited.
-#
-# This file is part of charm-helpers.
-#
-# charm-helpers is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3 as
-# published by the Free Software Foundation.
-#
-# charm-helpers is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
-
-from __future__ import absolute_import # required for external apt import
-from apt import apt_pkg
-from six import string_types
-
-from charmhelpers.fetch import (
- apt_cache,
- apt_purge
-)
-from charmhelpers.core.hookenv import (
- log,
- DEBUG,
- WARNING,
-)
-from charmhelpers.contrib.hardening.audits import BaseAudit
-
-
-class AptConfig(BaseAudit):
-
- def __init__(self, config, **kwargs):
- self.config = config
-
- def verify_config(self):
- apt_pkg.init()
- for cfg in self.config:
- value = apt_pkg.config.get(cfg['key'], cfg.get('default', ''))
- if value and value != cfg['expected']:
- log("APT config '%s' has unexpected value '%s' "
- "(expected='%s')" %
- (cfg['key'], value, cfg['expected']), level=WARNING)
-
- def ensure_compliance(self):
- self.verify_config()
-
-
-class RestrictedPackages(BaseAudit):
- """Class used to audit restricted packages on the system."""
-
- def __init__(self, pkgs, **kwargs):
- super(RestrictedPackages, self).__init__(**kwargs)
- if isinstance(pkgs, string_types) or not hasattr(pkgs, '__iter__'):
- self.pkgs = [pkgs]
- else:
- self.pkgs = pkgs
-
- def ensure_compliance(self):
- cache = apt_cache()
-
- for p in self.pkgs:
- if p not in cache:
- continue
-
- pkg = cache[p]
- if not self.is_virtual_package(pkg):
- if not pkg.current_ver:
- log("Package '%s' is not installed." % pkg.name,
- level=DEBUG)
- continue
- else:
- log("Restricted package '%s' is installed" % pkg.name,
- level=WARNING)
- self.delete_package(cache, pkg)
- else:
- log("Checking restricted virtual package '%s' provides" %
- pkg.name, level=DEBUG)
- self.delete_package(cache, pkg)
-
- def delete_package(self, cache, pkg):
- """Deletes the package from the system.
-
- Deletes the package form the system, properly handling virtual
- packages.
-
- :param cache: the apt cache
- :param pkg: the package to remove
- """
- if self.is_virtual_package(pkg):
- log("Package '%s' appears to be virtual - purging provides" %
- pkg.name, level=DEBUG)
- for _p in pkg.provides_list:
- self.delete_package(cache, _p[2].parent_pkg)
- elif not pkg.current_ver:
- log("Package '%s' not installed" % pkg.name, level=DEBUG)
- return
- else:
- log("Purging package '%s'" % pkg.name, level=DEBUG)
- apt_purge(pkg.name)
-
- def is_virtual_package(self, pkg):
- return pkg.has_provides and not pkg.has_versions
diff --git a/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/file.py b/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/file.py
deleted file mode 100644
index 0fb545a..0000000
--- a/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/file.py
+++ /dev/null
@@ -1,552 +0,0 @@
-# Copyright 2016 Canonical Limited.
-#
-# This file is part of charm-helpers.
-#
-# charm-helpers is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Lesser General Public License version 3 as
-# published by the Free Software Foundation.
-#
-# charm-helpers is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public License
-# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
-
-import grp
-import os
-import pwd
-import re
-
-from subprocess import (
- CalledProcessError,
- check_output,
- check_call,
-)
-from traceback import format_exc
-from six import string_types
-from stat import (
- S_ISGID,
- S_ISUID
-)
-
-from charmhelpers.core.hookenv import (
- log,
- DEBUG,
- INFO,
- WARNING,
- ERROR,
-)
-from charmhelpers.core import unitdata
-from charmhelpers.core.host import file_hash
-from charmhelpers.contrib.hardening.audits import BaseAudit
-from charmhelpers.contrib.hardening.templating import (
- get_template_path,
- render_and_write,
-)
-from charmhelpers.contrib.hardening import utils
-
-
-class BaseFileAudit(BaseAudit):
- """Base class for file audits.
-
- Provides api stubs for compliance check flow that must be used by any class
- that implemented this one.
- """
-
- def __init__(self, paths, always_comply=False, *args, **kwargs):
- """
- :param paths: string path of list of paths of files we want to apply
- compliance checks are criteria to.
- :param always_comply: if true compliance criteria is always applied
- else compliance is skipped for non-existent
- paths.
- """
- super(BaseFileAudit, self).__init__(*args, **kwargs)
- self.always_comply = always_comply
- if isinstance(paths, string_types) or not hasattr(paths, '__iter__'):
- self.paths = [paths]
- else:
- self.paths = paths
-
- def ensure_compliance(self):
- """Ensure that the all registered files comply to registered criteria.
- """
- for p in self.paths:
- if os.path.exists(p):
- if self.is_compliant(p):
- continue
-
- log('File %s is not in compliance.' % p, level=INFO)
- else:
- if not self.always_comply:
- log("Non-existent path '%s' - skipping compliance check"
- % (p), level=INFO)
- continue
-
- if self._take_action():
- log("Applying compliance criteria to '%s'" % (p), level=INFO)
- self.comply(p)
-
- def is_compliant(self, path):
- """Audits the path to see if it is compliance.
-
- :param path: the path to the file that should be checked.
- """
- raise NotImplementedError
-
- def comply(self, path):
- """Enforces the compliance of a path.
-
- :param path: the path to the file that should be enforced.
- """
- raise NotImplementedError
-
- @classmethod
- def _get_stat(cls, path):
- """Returns the Posix st_stat information for the specified file path.
-
- :param path: the path to get the st_stat information for.
- :returns: an st_stat object for the path or None if the path doesn't
- exist.
- """
- return os.stat(path)
-
-
-class FilePermissionAudit(BaseFileAudit):
- """Implements an audit for file permissions and ownership for a user.
-
- This class implements functionality that ensures that a specific user/group
- will own the file(s) specified and that the permissions specified are
- applied properly to the file.
- """
- def __init__(self, paths, user, group=None, mode=0o600, **kwargs):
- self.user = user
- self.group = group
- self.mode = mode
- super(FilePermissionAudit, self).__init__(paths, user, group, mode,
- **kwargs)
-
- @property
- def user(self):
- return self._user
-
- @user.setter
- def user(self, name):
- try:
- user = pwd.getpwnam(name)
- except KeyError:
- log('Unknown user %s' % name, level=ERROR)
- user = None
- self._user = user
-
- @property
- def group(self):
- return self._group
-
- @group.setter
- def group(self, name):
- try:
- group = None
- if name:
- group = grp.getgrnam(name)
- else:
- group = grp.getgrgid(self.user.pw_gid)
- except KeyError:
- log('Unknown group %s' % name, level=ERROR)
- self._group = group
-
- def is_compliant(self, path):
- """Checks if the path is in compliance.
-
- Used to determine if the path specified meets the necessary
- requirements to be in compliance with the check itself.
-
- :param path: the file path to check
- :returns: True if the path is compliant, False otherwise.
- """
- stat = self._get_stat(path)
- user = self.user
- group = self.group
-
- compliant = True
- if stat.st_uid != user.pw_uid or stat.st_gid != group.gr_gid:
- log('File %s is not owned by %s:%s.' % (path, user.pw_name,
- group.gr_name),
- level=INFO)
- compliant = False
-
- # POSIX refers to the st_mode bits as corresponding to both the
- # file type and file permission bits, where the least significant 12
- # bits (o7777) are the suid (11), sgid (10), sticky bits (9), and the
- # file permission bits (8-0)
- perms = stat.st_mode & 0o7777
- if perms != self.mode:
- log('File %s has incorrect permissions, currently set to %s' %
- (path, oct(stat.st_mode & 0o7777)), level=INFO)
- compliant = False
-
- return compliant
-
- def comply(self, path):
- """Issues a chown and chmod to the file paths specified."""
- utils.ensure_permissions(path, self.user.pw_name, self.group.gr_name,
- self.mode)
-
-
-class DirectoryPermissionAudit(FilePermissionAudit):
- """Performs a permission check for the specified directory path."""
-
- def __init__(self, paths, user, group=None, mode=0o600,
- recursive=True, **kwargs):
- super(DirectoryPermissionAudit, self).__init__(paths, user, group,
- mode, **kwargs)
- self.recursive = recursive
-
- def is_compliant(self, path):
- """Checks if the directory is compliant.
-
- Used to determine if the path specified and all of its children
- directories are in compliance with the check itself.
-
- :param path: the directory path to check
- :returns: True if the directory tree is compliant, otherwise False.
- """
- if not os.path.isdir(path):
- log('Path specified %s is not a directory.' % path, level=ERROR)
- raise ValueError("%s is not a directory." % path)
-
- if not self.recursive:
- return super(DirectoryPermissionAudit, self).is_compliant(path)
-
- compliant = True
- for root, dirs, _ in os.walk(path):
- if len(dirs) > 0:
- continue
-
- if not super(DirectoryPermissionAudit, self).is_compliant(root):
- compliant = False
- continue
-
- return compliant
-
- def comply(self, path):
- for root, dirs, _ in os.walk(path):
- if len(dirs) > 0:
- super(DirectoryPermissionAudit, self).comply(root)
-
-
-class ReadOnly(BaseFileAudit):
- """Audits that files and folders are read only."""
- def __init__(self, paths, *args, **kwargs):
- super(ReadOnly, self).__init__(paths=paths, *args, **kwargs)
-
- def is_compliant(self, path):
- try:
- output = check_output(['find', path, '-perm', '-go+w',
- '-type', 'f']).strip()
-
- # The find above will find any files which have permission sets
- # which allow too broad of write access. As such, the path is
- # compliant if there is no output.
- if output:
- return False
-
- return True
- except CalledProcessError as e:
- log('Error occurred checking finding writable files for %s. '
- 'Error information is: command %s failed with returncode '
- '%d and output %s.\n%s' % (path, e.cmd, e.returncode, e.output,
- format_exc(e)), level=ERROR)
- return False
-
- def comply(self, path):
- try:
- check_output(['chmod', 'go-w', '-R', path])
- except CalledProcessError as e:
- log('Error occurred removing writeable permissions for %s. '
- 'Error information is: command %s failed with returncode '
- '%d and output %s.\n%s' % (path, e.cmd, e.returncode, e.output,
- format_exc(e)), level=ERROR)
-
-
-class NoReadWriteForOther(BaseFileAudit):
- """Ensures that the files found under the base path are readable or
- writable by anyone other than the owner or the group.
- """
- def __init__(self, paths):
- super(NoReadWriteForOther, self).__init__(paths)
-
- def is_compliant(self, path):
- try:
- cmd = ['find', path, '-perm', '-o+r', '-type', 'f', '-o',
- '-perm', '-o+w', '-type', 'f']
- output = check_output(cmd).strip()
-
- # The find above here will find any files which have read or
- # write permissions for other, meaning there is too broad of access
- # to read/write the file. As such, the path is compliant if there's
- # no output.
- if output:
- return False
-
- return True
- except CalledProcessError as e:
- log('Error occurred while finding files which are readable or '
- 'writable to the world in %s. '
- 'Command output is: %s.' % (path, e.output), level=ERROR)
-
- def comply(self, path):
- try:
- check_output(['chmod', '-R', 'o-rw', path])
- except CalledProcessError as e:
- log('Error occurred attempting to change modes of files under '
- 'path %s. Output of command is: %s' % (path, e.output))
-
-
-class NoSUIDSGIDAudit(BaseFileAudit):
- """Audits that specified files do not have SUID/SGID bits set."""
- def __init__(self, paths, *args, **kwargs):
- super(NoSUIDSGIDAudit, self).__init__(paths=paths, *args, **kwargs)
-
- def is_compliant(self, path):
- stat = self._get_stat(path)
- if (stat.st_mode & (S_ISGID | S_ISUID)) != 0:
- return False
-
- return True
-
- def comply(self, path):
- try:
- log('Removing suid/sgid from %s.' % path, level=DEBUG)
- check_output(['chmod', '-s', path])
- except CalledProcessError as e:
- log('Error occurred removing suid/sgid from %s.'
- 'Error information is: command %s failed with returncode '
- '%d and output %s.\n%s' % (path, e.cmd, e.returncode, e.output,
- format_exc(e)), level=ERROR)
-
-
-class TemplatedFile(BaseFileAudit):
- """The TemplatedFileAudit audits the contents of a templated file.
-
- This audit renders a file from a template, sets the appropriate file
- permissions, then generates a hashsum with which to check the content
- changed.
- """
- def __init__(self, path, context, template_dir, mode, user='root',
- group='root', service_actions=None, **kwargs):
- self.context = context
- self.user = user
- self.group = group
- self.mode = mode
- self.template_dir = template_dir
- self.service_actions = service_actions
- super(TemplatedFile, self).__init__(paths=path, always_comply=True,
- **kwargs)
-
- def is_compliant(self, path):
- """Determines if the templated file is compliant.
-
- A templated file is only compliant if it has not changed (as
- determined by its sha256 hashsum) AND its file permissions are set
- appropriately.
-
- :param path: the path to check compliance.
- """
- same_templates = self.templates_match(path)
- same_content = self.contents_match(path)
- same_permissions = self.permissions_match(path)
-
- if same_content and same_permissions and same_templates:
- return True
-
- return False
-
- def run_service_actions(self):
- """Run any actions on services requested."""
- if not self.service_actions:
- return
-
- for svc_action in self.service_actions:
- name = svc_action['service']
- actions = svc_action['actions']
- log("Running service '%s' actions '%s'" % (name, actions),
- level=DEBUG)
- for action in actions:
- cmd = ['service', name, action]
- try:
- check_call(cmd)
- except CalledProcessError as exc:
- log("Service name='%s' action='%s' failed - %s" %
- (name, action, exc), level=WARNING)
-
- def comply(self, path):
- """Ensures the contents and the permissions of the file.
-
- :param path: the path to correct
- """
- dirname = os.path.dirname(path)
- if not os.path.exists(dirname):
- os.makedirs(dirname)
-
- self.pre_write()
- render_and_write(self.template_dir, path, self.context())
- utils.ensure_permissions(path, self.user, self.group, self.mode)
- self.run_service_actions()
- self.save_checksum(path)
- self.post_write()
-
- def pre_write(self):
- """Invoked prior to writing the template."""
- pass
-
- def post_write(self):
- """Invoked after writing the template."""
- pass
-
- def templates_match(self, path):
- """Determines if the template files are the same.
-
- The template file equality is determined by the hashsum of the
- template files themselves. If there is no hashsum, then the content
- cannot be sure to be the same so treat it as if they changed.
- Otherwise, return whether or not the hashsums are the same.
-
- :param path: the path to check
- :returns: boolean
- """
- template_path = get_template_path(self.template_dir, path)
- key = 'hardening:template:%s' % template_path
- template_checksum = file_hash(template_path)
- kv = unitdata.kv()
- stored_tmplt_checksum = kv.get(key)
- if not stored_tmplt_checksum:
- kv.set(key, template_checksum)
- kv.flush()
- log('Saved template checksum for %s.' % template_path,
- level=DEBUG)
- # Since we don't have a template checksum, then assume it doesn't
- # match and return that the template is different.
- return False
- elif stored_tmplt_checksum != template_checksum:
- kv.set(key, template_checksum)
- kv.flush()
- log('Updated template checksum for %s.' % template_path,
- level=DEBUG)
- return False
-
- # Here the template hasn't changed based upon the calculated
- # checksum of the template and what was previously stored.
- return True
-
- def contents_match(self, path):
- """Determines if the file content is the same.
-
- This is determined by comparing hashsum of the file contents and
- the saved hashsum. If there is no hashsum, then the content cannot
- be sure to be the same so treat them as if they are not the same.
- Otherwise, return True if the hashsums are the same, False if they
- are not the same.
-
- :param path: the file to check.
- """
- checksum = file_hash(path)
-
- kv = unitdata.kv()
- stored_checksum = kv.get('hardening:%s' % path)
- if not stored_checksum:
- # If the checksum hasn't been generated, return False to ensure
- # the file is written and the checksum stored.
- log('Checksum for %s has not been calculated.' % path, level=DEBUG)
- return False
- elif stored_checksum != checksum:
- log('Checksum mismatch for %s.' % path, level=DEBUG)
- return False
-
- return True
-
- def permissions_match(self, path):
- """Determines if the file owner and permissions match.
-
- :param path: the path to check.
- """
- audit = FilePermissionAudit(path, self.user, self.group, self.mode)
- return audit.is_compliant(path)
-
- def save_checksum(self, path):
- """Calculates and saves the checksum for the path specified.
-
- :param path: the path of the file to save the checksum.
- """
- checksum = file_hash(path)
- kv = unitdata.kv()
- kv.set('hardening:%s' % path, checksum)
- kv.flush()
-
-
-class DeletedFile(BaseFileAudit):
- """Audit to ensure that a file is deleted."""
- def __init__(self, paths):
- super(DeletedFile, self).__init__(paths)
-
- def is_compliant(self, path):
- return not os.path.exists(path)
-
- def comply(self, path):
- os.remove(path)
-
-
-class FileContentAudit(BaseFileAudit):
- """Audit the contents of a file."""
- def __init__(self, paths, cases, **kwargs):
- # Cases we expect to pass
- self.pass_cases = cases.get('pass', [])
- # Cases we expect to fail
- self.fail_cases = cases.get('fail', [])
- super(FileContentAudit, self).__init__(paths, **kwargs)
-
- def is_compliant(self, path):
- """
- Given a set of content matching cases i.e. tuple(regex, bool) where
- bool value denotes whether or not regex is expected to match, check that
- all cases match as expected with the contents of the file. Cases can be
- expected to pass of fail.
-
- :param path: Path of file to check.
- :returns: Boolean value representing whether or not all cases are
- found to be compliant.
- """
- log("Auditing contents of file '%s'" % (path), level=DEBUG)
- with open(path, 'r') as fd:
- contents = fd.read()
-
- matches = 0
- for pattern in self.pass_cases:
- key = re.compile(pattern, flags=re.MULTILINE)
- results = re.search(key, contents)
- if results:
- matches += 1
- else:
- log("Pattern '%s' was expected to pass but instead it failed"
- % (pattern), level=WARNING)
-
- for pattern in self.fail_cases:
- key = re.compile(pattern, flags=re.MULTILINE)
- results = re.search(key, contents)
- if not results:
- matches += 1
- else:
- log("Pattern '%s' was expected to fail but instead it passed"
- % (pattern), level=WARNING)
-
- total = len(self.pass_cases) + len(self.fail_cases)
- log("Checked %s cases and %s passed" % (total, matches), level=DEBUG)
- return matches == total
-
- def comply(self, *args, **kwargs):
- """NOOP since we just issue warnings. This is to avoid the
- NotImplememtedError.
- """
- log("Not applying any compliance criteria, only checks.", level=INFO)