aboutsummaryrefslogtreecommitdiffstats
path: root/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/file.py
diff options
context:
space:
mode:
Diffstat (limited to 'charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/file.py')
-rw-r--r--charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/file.py552
1 files changed, 552 insertions, 0 deletions
diff --git a/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/file.py b/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/file.py
new file mode 100644
index 0000000..0fb545a
--- /dev/null
+++ b/charms/trusty/ceilometer/charmhelpers/contrib/hardening/audits/file.py
@@ -0,0 +1,552 @@
+# Copyright 2016 Canonical Limited.
+#
+# This file is part of charm-helpers.
+#
+# charm-helpers is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License version 3 as
+# published by the Free Software Foundation.
+#
+# charm-helpers is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
+
+import grp
+import os
+import pwd
+import re
+
+from subprocess import (
+ CalledProcessError,
+ check_output,
+ check_call,
+)
+from traceback import format_exc
+from six import string_types
+from stat import (
+ S_ISGID,
+ S_ISUID
+)
+
+from charmhelpers.core.hookenv import (
+ log,
+ DEBUG,
+ INFO,
+ WARNING,
+ ERROR,
+)
+from charmhelpers.core import unitdata
+from charmhelpers.core.host import file_hash
+from charmhelpers.contrib.hardening.audits import BaseAudit
+from charmhelpers.contrib.hardening.templating import (
+ get_template_path,
+ render_and_write,
+)
+from charmhelpers.contrib.hardening import utils
+
+
+class BaseFileAudit(BaseAudit):
+ """Base class for file audits.
+
+ Provides api stubs for compliance check flow that must be used by any class
+ that implemented this one.
+ """
+
+ def __init__(self, paths, always_comply=False, *args, **kwargs):
+ """
+ :param paths: string path of list of paths of files we want to apply
+ compliance checks are criteria to.
+ :param always_comply: if true compliance criteria is always applied
+ else compliance is skipped for non-existent
+ paths.
+ """
+ super(BaseFileAudit, self).__init__(*args, **kwargs)
+ self.always_comply = always_comply
+ if isinstance(paths, string_types) or not hasattr(paths, '__iter__'):
+ self.paths = [paths]
+ else:
+ self.paths = paths
+
+ def ensure_compliance(self):
+ """Ensure that the all registered files comply to registered criteria.
+ """
+ for p in self.paths:
+ if os.path.exists(p):
+ if self.is_compliant(p):
+ continue
+
+ log('File %s is not in compliance.' % p, level=INFO)
+ else:
+ if not self.always_comply:
+ log("Non-existent path '%s' - skipping compliance check"
+ % (p), level=INFO)
+ continue
+
+ if self._take_action():
+ log("Applying compliance criteria to '%s'" % (p), level=INFO)
+ self.comply(p)
+
+ def is_compliant(self, path):
+ """Audits the path to see if it is compliance.
+
+ :param path: the path to the file that should be checked.
+ """
+ raise NotImplementedError
+
+ def comply(self, path):
+ """Enforces the compliance of a path.
+
+ :param path: the path to the file that should be enforced.
+ """
+ raise NotImplementedError
+
+ @classmethod
+ def _get_stat(cls, path):
+ """Returns the Posix st_stat information for the specified file path.
+
+ :param path: the path to get the st_stat information for.
+ :returns: an st_stat object for the path or None if the path doesn't
+ exist.
+ """
+ return os.stat(path)
+
+
+class FilePermissionAudit(BaseFileAudit):
+ """Implements an audit for file permissions and ownership for a user.
+
+ This class implements functionality that ensures that a specific user/group
+ will own the file(s) specified and that the permissions specified are
+ applied properly to the file.
+ """
+ def __init__(self, paths, user, group=None, mode=0o600, **kwargs):
+ self.user = user
+ self.group = group
+ self.mode = mode
+ super(FilePermissionAudit, self).__init__(paths, user, group, mode,
+ **kwargs)
+
+ @property
+ def user(self):
+ return self._user
+
+ @user.setter
+ def user(self, name):
+ try:
+ user = pwd.getpwnam(name)
+ except KeyError:
+ log('Unknown user %s' % name, level=ERROR)
+ user = None
+ self._user = user
+
+ @property
+ def group(self):
+ return self._group
+
+ @group.setter
+ def group(self, name):
+ try:
+ group = None
+ if name:
+ group = grp.getgrnam(name)
+ else:
+ group = grp.getgrgid(self.user.pw_gid)
+ except KeyError:
+ log('Unknown group %s' % name, level=ERROR)
+ self._group = group
+
+ def is_compliant(self, path):
+ """Checks if the path is in compliance.
+
+ Used to determine if the path specified meets the necessary
+ requirements to be in compliance with the check itself.
+
+ :param path: the file path to check
+ :returns: True if the path is compliant, False otherwise.
+ """
+ stat = self._get_stat(path)
+ user = self.user
+ group = self.group
+
+ compliant = True
+ if stat.st_uid != user.pw_uid or stat.st_gid != group.gr_gid:
+ log('File %s is not owned by %s:%s.' % (path, user.pw_name,
+ group.gr_name),
+ level=INFO)
+ compliant = False
+
+ # POSIX refers to the st_mode bits as corresponding to both the
+ # file type and file permission bits, where the least significant 12
+ # bits (o7777) are the suid (11), sgid (10), sticky bits (9), and the
+ # file permission bits (8-0)
+ perms = stat.st_mode & 0o7777
+ if perms != self.mode:
+ log('File %s has incorrect permissions, currently set to %s' %
+ (path, oct(stat.st_mode & 0o7777)), level=INFO)
+ compliant = False
+
+ return compliant
+
+ def comply(self, path):
+ """Issues a chown and chmod to the file paths specified."""
+ utils.ensure_permissions(path, self.user.pw_name, self.group.gr_name,
+ self.mode)
+
+
+class DirectoryPermissionAudit(FilePermissionAudit):
+ """Performs a permission check for the specified directory path."""
+
+ def __init__(self, paths, user, group=None, mode=0o600,
+ recursive=True, **kwargs):
+ super(DirectoryPermissionAudit, self).__init__(paths, user, group,
+ mode, **kwargs)
+ self.recursive = recursive
+
+ def is_compliant(self, path):
+ """Checks if the directory is compliant.
+
+ Used to determine if the path specified and all of its children
+ directories are in compliance with the check itself.
+
+ :param path: the directory path to check
+ :returns: True if the directory tree is compliant, otherwise False.
+ """
+ if not os.path.isdir(path):
+ log('Path specified %s is not a directory.' % path, level=ERROR)
+ raise ValueError("%s is not a directory." % path)
+
+ if not self.recursive:
+ return super(DirectoryPermissionAudit, self).is_compliant(path)
+
+ compliant = True
+ for root, dirs, _ in os.walk(path):
+ if len(dirs) > 0:
+ continue
+
+ if not super(DirectoryPermissionAudit, self).is_compliant(root):
+ compliant = False
+ continue
+
+ return compliant
+
+ def comply(self, path):
+ for root, dirs, _ in os.walk(path):
+ if len(dirs) > 0:
+ super(DirectoryPermissionAudit, self).comply(root)
+
+
+class ReadOnly(BaseFileAudit):
+ """Audits that files and folders are read only."""
+ def __init__(self, paths, *args, **kwargs):
+ super(ReadOnly, self).__init__(paths=paths, *args, **kwargs)
+
+ def is_compliant(self, path):
+ try:
+ output = check_output(['find', path, '-perm', '-go+w',
+ '-type', 'f']).strip()
+
+ # The find above will find any files which have permission sets
+ # which allow too broad of write access. As such, the path is
+ # compliant if there is no output.
+ if output:
+ return False
+
+ return True
+ except CalledProcessError as e:
+ log('Error occurred checking finding writable files for %s. '
+ 'Error information is: command %s failed with returncode '
+ '%d and output %s.\n%s' % (path, e.cmd, e.returncode, e.output,
+ format_exc(e)), level=ERROR)
+ return False
+
+ def comply(self, path):
+ try:
+ check_output(['chmod', 'go-w', '-R', path])
+ except CalledProcessError as e:
+ log('Error occurred removing writeable permissions for %s. '
+ 'Error information is: command %s failed with returncode '
+ '%d and output %s.\n%s' % (path, e.cmd, e.returncode, e.output,
+ format_exc(e)), level=ERROR)
+
+
+class NoReadWriteForOther(BaseFileAudit):
+ """Ensures that the files found under the base path are readable or
+ writable by anyone other than the owner or the group.
+ """
+ def __init__(self, paths):
+ super(NoReadWriteForOther, self).__init__(paths)
+
+ def is_compliant(self, path):
+ try:
+ cmd = ['find', path, '-perm', '-o+r', '-type', 'f', '-o',
+ '-perm', '-o+w', '-type', 'f']
+ output = check_output(cmd).strip()
+
+ # The find above here will find any files which have read or
+ # write permissions for other, meaning there is too broad of access
+ # to read/write the file. As such, the path is compliant if there's
+ # no output.
+ if output:
+ return False
+
+ return True
+ except CalledProcessError as e:
+ log('Error occurred while finding files which are readable or '
+ 'writable to the world in %s. '
+ 'Command output is: %s.' % (path, e.output), level=ERROR)
+
+ def comply(self, path):
+ try:
+ check_output(['chmod', '-R', 'o-rw', path])
+ except CalledProcessError as e:
+ log('Error occurred attempting to change modes of files under '
+ 'path %s. Output of command is: %s' % (path, e.output))
+
+
+class NoSUIDSGIDAudit(BaseFileAudit):
+ """Audits that specified files do not have SUID/SGID bits set."""
+ def __init__(self, paths, *args, **kwargs):
+ super(NoSUIDSGIDAudit, self).__init__(paths=paths, *args, **kwargs)
+
+ def is_compliant(self, path):
+ stat = self._get_stat(path)
+ if (stat.st_mode & (S_ISGID | S_ISUID)) != 0:
+ return False
+
+ return True
+
+ def comply(self, path):
+ try:
+ log('Removing suid/sgid from %s.' % path, level=DEBUG)
+ check_output(['chmod', '-s', path])
+ except CalledProcessError as e:
+ log('Error occurred removing suid/sgid from %s.'
+ 'Error information is: command %s failed with returncode '
+ '%d and output %s.\n%s' % (path, e.cmd, e.returncode, e.output,
+ format_exc(e)), level=ERROR)
+
+
+class TemplatedFile(BaseFileAudit):
+ """The TemplatedFileAudit audits the contents of a templated file.
+
+ This audit renders a file from a template, sets the appropriate file
+ permissions, then generates a hashsum with which to check the content
+ changed.
+ """
+ def __init__(self, path, context, template_dir, mode, user='root',
+ group='root', service_actions=None, **kwargs):
+ self.context = context
+ self.user = user
+ self.group = group
+ self.mode = mode
+ self.template_dir = template_dir
+ self.service_actions = service_actions
+ super(TemplatedFile, self).__init__(paths=path, always_comply=True,
+ **kwargs)
+
+ def is_compliant(self, path):
+ """Determines if the templated file is compliant.
+
+ A templated file is only compliant if it has not changed (as
+ determined by its sha256 hashsum) AND its file permissions are set
+ appropriately.
+
+ :param path: the path to check compliance.
+ """
+ same_templates = self.templates_match(path)
+ same_content = self.contents_match(path)
+ same_permissions = self.permissions_match(path)
+
+ if same_content and same_permissions and same_templates:
+ return True
+
+ return False
+
+ def run_service_actions(self):
+ """Run any actions on services requested."""
+ if not self.service_actions:
+ return
+
+ for svc_action in self.service_actions:
+ name = svc_action['service']
+ actions = svc_action['actions']
+ log("Running service '%s' actions '%s'" % (name, actions),
+ level=DEBUG)
+ for action in actions:
+ cmd = ['service', name, action]
+ try:
+ check_call(cmd)
+ except CalledProcessError as exc:
+ log("Service name='%s' action='%s' failed - %s" %
+ (name, action, exc), level=WARNING)
+
+ def comply(self, path):
+ """Ensures the contents and the permissions of the file.
+
+ :param path: the path to correct
+ """
+ dirname = os.path.dirname(path)
+ if not os.path.exists(dirname):
+ os.makedirs(dirname)
+
+ self.pre_write()
+ render_and_write(self.template_dir, path, self.context())
+ utils.ensure_permissions(path, self.user, self.group, self.mode)
+ self.run_service_actions()
+ self.save_checksum(path)
+ self.post_write()
+
+ def pre_write(self):
+ """Invoked prior to writing the template."""
+ pass
+
+ def post_write(self):
+ """Invoked after writing the template."""
+ pass
+
+ def templates_match(self, path):
+ """Determines if the template files are the same.
+
+ The template file equality is determined by the hashsum of the
+ template files themselves. If there is no hashsum, then the content
+ cannot be sure to be the same so treat it as if they changed.
+ Otherwise, return whether or not the hashsums are the same.
+
+ :param path: the path to check
+ :returns: boolean
+ """
+ template_path = get_template_path(self.template_dir, path)
+ key = 'hardening:template:%s' % template_path
+ template_checksum = file_hash(template_path)
+ kv = unitdata.kv()
+ stored_tmplt_checksum = kv.get(key)
+ if not stored_tmplt_checksum:
+ kv.set(key, template_checksum)
+ kv.flush()
+ log('Saved template checksum for %s.' % template_path,
+ level=DEBUG)
+ # Since we don't have a template checksum, then assume it doesn't
+ # match and return that the template is different.
+ return False
+ elif stored_tmplt_checksum != template_checksum:
+ kv.set(key, template_checksum)
+ kv.flush()
+ log('Updated template checksum for %s.' % template_path,
+ level=DEBUG)
+ return False
+
+ # Here the template hasn't changed based upon the calculated
+ # checksum of the template and what was previously stored.
+ return True
+
+ def contents_match(self, path):
+ """Determines if the file content is the same.
+
+ This is determined by comparing hashsum of the file contents and
+ the saved hashsum. If there is no hashsum, then the content cannot
+ be sure to be the same so treat them as if they are not the same.
+ Otherwise, return True if the hashsums are the same, False if they
+ are not the same.
+
+ :param path: the file to check.
+ """
+ checksum = file_hash(path)
+
+ kv = unitdata.kv()
+ stored_checksum = kv.get('hardening:%s' % path)
+ if not stored_checksum:
+ # If the checksum hasn't been generated, return False to ensure
+ # the file is written and the checksum stored.
+ log('Checksum for %s has not been calculated.' % path, level=DEBUG)
+ return False
+ elif stored_checksum != checksum:
+ log('Checksum mismatch for %s.' % path, level=DEBUG)
+ return False
+
+ return True
+
+ def permissions_match(self, path):
+ """Determines if the file owner and permissions match.
+
+ :param path: the path to check.
+ """
+ audit = FilePermissionAudit(path, self.user, self.group, self.mode)
+ return audit.is_compliant(path)
+
+ def save_checksum(self, path):
+ """Calculates and saves the checksum for the path specified.
+
+ :param path: the path of the file to save the checksum.
+ """
+ checksum = file_hash(path)
+ kv = unitdata.kv()
+ kv.set('hardening:%s' % path, checksum)
+ kv.flush()
+
+
+class DeletedFile(BaseFileAudit):
+ """Audit to ensure that a file is deleted."""
+ def __init__(self, paths):
+ super(DeletedFile, self).__init__(paths)
+
+ def is_compliant(self, path):
+ return not os.path.exists(path)
+
+ def comply(self, path):
+ os.remove(path)
+
+
+class FileContentAudit(BaseFileAudit):
+ """Audit the contents of a file."""
+ def __init__(self, paths, cases, **kwargs):
+ # Cases we expect to pass
+ self.pass_cases = cases.get('pass', [])
+ # Cases we expect to fail
+ self.fail_cases = cases.get('fail', [])
+ super(FileContentAudit, self).__init__(paths, **kwargs)
+
+ def is_compliant(self, path):
+ """
+ Given a set of content matching cases i.e. tuple(regex, bool) where
+ bool value denotes whether or not regex is expected to match, check that
+ all cases match as expected with the contents of the file. Cases can be
+ expected to pass of fail.
+
+ :param path: Path of file to check.
+ :returns: Boolean value representing whether or not all cases are
+ found to be compliant.
+ """
+ log("Auditing contents of file '%s'" % (path), level=DEBUG)
+ with open(path, 'r') as fd:
+ contents = fd.read()
+
+ matches = 0
+ for pattern in self.pass_cases:
+ key = re.compile(pattern, flags=re.MULTILINE)
+ results = re.search(key, contents)
+ if results:
+ matches += 1
+ else:
+ log("Pattern '%s' was expected to pass but instead it failed"
+ % (pattern), level=WARNING)
+
+ for pattern in self.fail_cases:
+ key = re.compile(pattern, flags=re.MULTILINE)
+ results = re.search(key, contents)
+ if not results:
+ matches += 1
+ else:
+ log("Pattern '%s' was expected to fail but instead it passed"
+ % (pattern), level=WARNING)
+
+ total = len(self.pass_cases) + len(self.fail_cases)
+ log("Checked %s cases and %s passed" % (total, matches), level=DEBUG)
+ return matches == total
+
+ def comply(self, *args, **kwargs):
+ """NOOP since we just issue warnings. This is to avoid the
+ NotImplememtedError.
+ """
+ log("Not applying any compliance criteria, only checks.", level=INFO)