summaryrefslogtreecommitdiffstats
path: root/src/ceph/qa/tasks/cephfs/fuse_mount.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ceph/qa/tasks/cephfs/fuse_mount.py')
-rw-r--r--src/ceph/qa/tasks/cephfs/fuse_mount.py428
1 files changed, 0 insertions, 428 deletions
diff --git a/src/ceph/qa/tasks/cephfs/fuse_mount.py b/src/ceph/qa/tasks/cephfs/fuse_mount.py
deleted file mode 100644
index 8d8410c..0000000
--- a/src/ceph/qa/tasks/cephfs/fuse_mount.py
+++ /dev/null
@@ -1,428 +0,0 @@
-
-from StringIO import StringIO
-import json
-import time
-import logging
-from textwrap import dedent
-
-from teuthology import misc
-from teuthology.contextutil import MaxWhileTries
-from teuthology.orchestra import run
-from teuthology.orchestra.run import CommandFailedError
-from .mount import CephFSMount
-
-log = logging.getLogger(__name__)
-
-
-class FuseMount(CephFSMount):
- def __init__(self, client_config, test_dir, client_id, client_remote):
- super(FuseMount, self).__init__(test_dir, client_id, client_remote)
-
- self.client_config = client_config if client_config else {}
- self.fuse_daemon = None
- self._fuse_conn = None
-
- def mount(self, mount_path=None, mount_fs_name=None):
- try:
- return self._mount(mount_path, mount_fs_name)
- except RuntimeError:
- # Catch exceptions by the mount() logic (i.e. not remote command
- # failures) and ensure the mount is not left half-up.
- # Otherwise we might leave a zombie mount point that causes
- # anyone traversing cephtest/ to get hung up on.
- log.warn("Trying to clean up after failed mount")
- self.umount_wait(force=True)
- raise
-
- def _mount(self, mount_path, mount_fs_name):
- log.info("Client client.%s config is %s" % (self.client_id, self.client_config))
-
- daemon_signal = 'kill'
- if self.client_config.get('coverage') or self.client_config.get('valgrind') is not None:
- daemon_signal = 'term'
-
- log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
- id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))
-
- self.client_remote.run(
- args=[
- 'mkdir',
- '--',
- self.mountpoint,
- ],
- )
-
- run_cmd = [
- 'sudo',
- 'adjust-ulimits',
- 'ceph-coverage',
- '{tdir}/archive/coverage'.format(tdir=self.test_dir),
- 'daemon-helper',
- daemon_signal,
- ]
-
- fuse_cmd = ['ceph-fuse', "-f"]
-
- if mount_path is not None:
- fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]
-
- if mount_fs_name is not None:
- fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]
-
- fuse_cmd += [
- '--name', 'client.{id}'.format(id=self.client_id),
- # TODO ceph-fuse doesn't understand dash dash '--',
- self.mountpoint,
- ]
-
- if self.client_config.get('valgrind') is not None:
- run_cmd = misc.get_valgrind_args(
- self.test_dir,
- 'client.{id}'.format(id=self.client_id),
- run_cmd,
- self.client_config.get('valgrind'),
- )
-
- run_cmd.extend(fuse_cmd)
-
- def list_connections():
- self.client_remote.run(
- args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
- check_status=False
- )
- p = self.client_remote.run(
- args=["ls", "/sys/fs/fuse/connections"],
- stdout=StringIO(),
- check_status=False
- )
- if p.exitstatus != 0:
- return []
-
- ls_str = p.stdout.getvalue().strip()
- if ls_str:
- return [int(n) for n in ls_str.split("\n")]
- else:
- return []
-
- # Before starting ceph-fuse process, note the contents of
- # /sys/fs/fuse/connections
- pre_mount_conns = list_connections()
- log.info("Pre-mount connections: {0}".format(pre_mount_conns))
-
- proc = self.client_remote.run(
- args=run_cmd,
- logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
- stdin=run.PIPE,
- wait=False,
- )
- self.fuse_daemon = proc
-
- # Wait for the connection reference to appear in /sys
- mount_wait = self.client_config.get('mount_wait', 0)
- if mount_wait > 0:
- log.info("Fuse mount waits {0} seconds before checking /sys/".format(mount_wait))
- time.sleep(mount_wait)
- timeout = int(self.client_config.get('mount_timeout', 30))
- waited = 0
-
- post_mount_conns = list_connections()
- while len(post_mount_conns) <= len(pre_mount_conns):
- if self.fuse_daemon.finished:
- # Did mount fail? Raise the CommandFailedError instead of
- # hitting the "failed to populate /sys/" timeout
- self.fuse_daemon.wait()
- time.sleep(1)
- waited += 1
- if waited > timeout:
- raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
- waited
- ))
- else:
- post_mount_conns = list_connections()
-
- log.info("Post-mount connections: {0}".format(post_mount_conns))
-
- # Record our fuse connection number so that we can use it when
- # forcing an unmount
- new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
- if len(new_conns) == 0:
- raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
- elif len(new_conns) > 1:
- raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
- else:
- self._fuse_conn = new_conns[0]
-
- def is_mounted(self):
- proc = self.client_remote.run(
- args=[
- 'stat',
- '--file-system',
- '--printf=%T\n',
- '--',
- self.mountpoint,
- ],
- stdout=StringIO(),
- stderr=StringIO(),
- wait=False
- )
- try:
- proc.wait()
- except CommandFailedError:
- if ("endpoint is not connected" in proc.stderr.getvalue()
- or "Software caused connection abort" in proc.stderr.getvalue()):
- # This happens is fuse is killed without unmount
- log.warn("Found stale moutn point at {0}".format(self.mountpoint))
- return True
- else:
- # This happens if the mount directory doesn't exist
- log.info('mount point does not exist: %s', self.mountpoint)
- return False
-
- fstype = proc.stdout.getvalue().rstrip('\n')
- if fstype == 'fuseblk':
- log.info('ceph-fuse is mounted on %s', self.mountpoint)
- return True
- else:
- log.debug('ceph-fuse not mounted, got fs type {fstype!r}'.format(
- fstype=fstype))
- return False
-
- def wait_until_mounted(self):
- """
- Check to make sure that fuse is mounted on mountpoint. If not,
- sleep for 5 seconds and check again.
- """
-
- while not self.is_mounted():
- # Even if it's not mounted, it should at least
- # be running: catch simple failures where it has terminated.
- assert not self.fuse_daemon.poll()
-
- time.sleep(5)
-
- # Now that we're mounted, set permissions so that the rest of the test will have
- # unrestricted access to the filesystem mount.
- self.client_remote.run(
- args=['sudo', 'chmod', '1777', self.mountpoint])
-
- def _mountpoint_exists(self):
- return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False).exitstatus == 0
-
- def umount(self):
- try:
- log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
- self.client_remote.run(
- args=[
- 'sudo',
- 'fusermount',
- '-u',
- self.mountpoint,
- ],
- )
- except run.CommandFailedError:
- log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))
-
- self.client_remote.run(args=[
- 'sudo',
- run.Raw('PATH=/usr/sbin:$PATH'),
- 'lsof',
- run.Raw(';'),
- 'ps',
- 'auxf',
- ])
-
- # abort the fuse mount, killing all hung processes
- if self._fuse_conn:
- self.run_python(dedent("""
- import os
- path = "/sys/fs/fuse/connections/{0}/abort"
- if os.path.exists(path):
- open(path, "w").write("1")
- """).format(self._fuse_conn))
- self._fuse_conn = None
-
- stderr = StringIO()
- try:
- # make sure its unmounted
- self.client_remote.run(
- args=[
- 'sudo',
- 'umount',
- '-l',
- '-f',
- self.mountpoint,
- ],
- stderr=stderr
- )
- except CommandFailedError:
- if self.is_mounted():
- raise
-
- assert not self.is_mounted()
- self._fuse_conn = None
-
- def umount_wait(self, force=False, require_clean=False):
- """
- :param force: Complete cleanly even if the MDS is offline
- """
- if force:
- assert not require_clean # mutually exclusive
-
- # When we expect to be forcing, kill the ceph-fuse process directly.
- # This should avoid hitting the more aggressive fallback killing
- # in umount() which can affect other mounts too.
- self.fuse_daemon.stdin.close()
-
- # However, we will still hit the aggressive wait if there is an ongoing
- # mount -o remount (especially if the remount is stuck because MDSs
- # are unavailable)
-
- self.umount()
-
- try:
- if self.fuse_daemon:
- # Permit a timeout, so that we do not block forever
- run.wait([self.fuse_daemon], 900)
- except MaxWhileTries:
- log.error("process failed to terminate after unmount. This probably"
- "indicates a bug within ceph-fuse.")
- raise
- except CommandFailedError:
- if require_clean:
- raise
-
- self.cleanup()
-
- def cleanup(self):
- """
- Remove the mount point.
-
- Prerequisite: the client is not mounted.
- """
- stderr = StringIO()
- try:
- self.client_remote.run(
- args=[
- 'rmdir',
- '--',
- self.mountpoint,
- ],
- stderr=stderr
- )
- except CommandFailedError:
- if "No such file or directory" in stderr.getvalue():
- pass
- else:
- raise
-
- def kill(self):
- """
- Terminate the client without removing the mount point.
- """
- self.fuse_daemon.stdin.close()
- try:
- self.fuse_daemon.wait()
- except CommandFailedError:
- pass
-
- def kill_cleanup(self):
- """
- Follow up ``kill`` to get to a clean unmounted state.
- """
- self.umount()
- self.cleanup()
-
- def teardown(self):
- """
- Whatever the state of the mount, get it gone.
- """
- super(FuseMount, self).teardown()
-
- self.umount()
-
- if self.fuse_daemon and not self.fuse_daemon.finished:
- self.fuse_daemon.stdin.close()
- try:
- self.fuse_daemon.wait()
- except CommandFailedError:
- pass
-
- # Indiscriminate, unlike the touchier cleanup()
- self.client_remote.run(
- args=[
- 'rm',
- '-rf',
- self.mountpoint,
- ],
- )
-
- def _asok_path(self):
- return "/var/run/ceph/ceph-client.{0}.*.asok".format(self.client_id)
-
- @property
- def _prefix(self):
- return ""
-
- def admin_socket(self, args):
- pyscript = """
-import glob
-import re
-import os
-import subprocess
-
-def find_socket(client_name):
- asok_path = "{asok_path}"
- files = glob.glob(asok_path)
-
- # Given a non-glob path, it better be there
- if "*" not in asok_path:
- assert(len(files) == 1)
- return files[0]
-
- for f in files:
- pid = re.match(".*\.(\d+)\.asok$", f).group(1)
- if os.path.exists("/proc/{{0}}".format(pid)):
- return f
- raise RuntimeError("Client socket {{0}} not found".format(client_name))
-
-print find_socket("{client_name}")
-""".format(
- asok_path=self._asok_path(),
- client_name="client.{0}".format(self.client_id))
-
- # Find the admin socket
- p = self.client_remote.run(args=[
- 'python', '-c', pyscript
- ], stdout=StringIO())
- asok_path = p.stdout.getvalue().strip()
- log.info("Found client admin socket at {0}".format(asok_path))
-
- # Query client ID from admin socket
- p = self.client_remote.run(
- args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
- stdout=StringIO())
- return json.loads(p.stdout.getvalue())
-
- def get_global_id(self):
- """
- Look up the CephFS client ID for this mount
- """
-
- return self.admin_socket(['mds_sessions'])['id']
-
- def get_osd_epoch(self):
- """
- Return 2-tuple of osd_epoch, osd_epoch_barrier
- """
- status = self.admin_socket(['status'])
- return status['osd_epoch'], status['osd_epoch_barrier']
-
- def get_dentry_count(self):
- """
- Return 2-tuple of dentry_count, dentry_pinned_count
- """
- status = self.admin_socket(['status'])
- return status['dentry_count'], status['dentry_pinned_count']
-
- def set_cache_size(self, size):
- return self.admin_socket(['config', 'set', 'client_cache_size', str(size)])