aboutsummaryrefslogtreecommitdiffstats
path: root/charms/trusty/contrail-control/hooks/charmhelpers/contrib/openstack/amulet/utils.py
blob: b1397419c749e95215ddf0e30a2f476baaac5281 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
# Copyright 2014-2015 Canonical Limited.
#
# This file is part of charm-helpers.
#
# charm-helpers is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License version 3 as
# published by the Free Software Foundation.
#
# charm-helpers is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with charm-helpers.  If not, see <http://www.gnu.org/licenses/>.

import amulet
import json
import logging
import os
import six
import time
import urllib

import cinderclient.v1.client as cinder_client
import glanceclient.v1.client as glance_client
import heatclient.v1.client as heat_client
import keystoneclient.v2_0 as keystone_client
import novaclient.v1_1.client as nova_client
import pika
import swiftclient

from charmhelpers.contrib.amulet.utils import (
    AmuletUtils
)

DEBUG = logging.DEBUG
ERROR = logging.ERROR


class OpenStackAmuletUtils(AmuletUtils):
    """OpenStack amulet utilities.

       This class inherits from AmuletUtils and has additional support
       that is specifically for use by OpenStack charm tests.
       """

    def __init__(self, log_level=ERROR):
        """Initialize the deployment environment."""
        super(OpenStackAmuletUtils, self).__init__(log_level)

    def validate_endpoint_data(self, endpoints, admin_port, internal_port,
                               public_port, expected):
        """Validate endpoint data.

           Validate actual endpoint data vs expected endpoint data. The ports
           are used to find the matching endpoint.
           """
        self.log.debug('Validating endpoint data...')
        self.log.debug('actual: {}'.format(repr(endpoints)))
        found = False
        for ep in endpoints:
            self.log.debug('endpoint: {}'.format(repr(ep)))
            if (admin_port in ep.adminurl and
                    internal_port in ep.internalurl and
                    public_port in ep.publicurl):
                found = True
                actual = {'id': ep.id,
                          'region': ep.region,
                          'adminurl': ep.adminurl,
                          'internalurl': ep.internalurl,
                          'publicurl': ep.publicurl,
                          'service_id': ep.service_id}
                ret = self._validate_dict_data(expected, actual)
                if ret:
                    return 'unexpected endpoint data - {}'.format(ret)

        if not found:
            return 'endpoint not found'

    def validate_svc_catalog_endpoint_data(self, expected, actual):
        """Validate service catalog endpoint data.

           Validate a list of actual service catalog endpoints vs a list of
           expected service catalog endpoints.
           """
        self.log.debug('Validating service catalog endpoint data...')
        self.log.debug('actual: {}'.format(repr(actual)))
        for k, v in six.iteritems(expected):
            if k in actual:
                ret = self._validate_dict_data(expected[k][0], actual[k][0])
                if ret:
                    return self.endpoint_error(k, ret)
            else:
                return "endpoint {} does not exist".format(k)
        return ret

    def validate_tenant_data(self, expected, actual):
        """Validate tenant data.

           Validate a list of actual tenant data vs list of expected tenant
           data.
           """
        self.log.debug('Validating tenant data...')
        self.log.debug('actual: {}'.format(repr(actual)))
        for e in expected:
            found = False
            for act in actual:
                a = {'enabled': act.enabled, 'description': act.description,
                     'name': act.name, 'id': act.id}
                if e['name'] == a['name']:
                    found = True
                    ret = self._validate_dict_data(e, a)
                    if ret:
                        return "unexpected tenant data - {}".format(ret)
            if not found:
                return "tenant {} does not exist".format(e['name'])
        return ret

    def validate_role_data(self, expected, actual):
        """Validate role data.

           Validate a list of actual role data vs a list of expected role
           data.
           """
        self.log.debug('Validating role data...')
        self.log.debug('actual: {}'.format(repr(actual)))
        for e in expected:
            found = False
            for act in actual:
                a = {'name': act.name, 'id': act.id}
                if e['name'] == a['name']:
                    found = True
                    ret = self._validate_dict_data(e, a)
                    if ret:
                        return "unexpected role data - {}".format(ret)
            if not found:
                return "role {} does not exist".format(e['name'])
        return ret

    def validate_user_data(self, expected, actual):
        """Validate user data.

           Validate a list of actual user data vs a list of expected user
           data.
           """
        self.log.debug('Validating user data...')
        self.log.debug('actual: {}'.format(repr(actual)))
        for e in expected:
            found = False
            for act in actual:
                a = {'enabled': act.enabled, 'name': act.name,
                     'email': act.email, 'tenantId': act.tenantId,
                     'id': act.id}
                if e['name'] == a['name']:
                    found = True
                    ret = self._validate_dict_data(e, a)
                    if ret:
                        return "unexpected user data - {}".format(ret)
            if not found:
                return "user {} does not exist".format(e['name'])
        return ret

    def validate_flavor_data(self, expected, actual):
        """Validate flavor data.

           Validate a list of actual flavors vs a list of expected flavors.
           """
        self.log.debug('Validating flavor data...')
        self.log.debug('actual: {}'.format(repr(actual)))
        act = [a.name for a in actual]
        return self._validate_list_data(expected, act)

    def tenant_exists(self, keystone, tenant):
        """Return True if tenant exists."""
        self.log.debug('Checking if tenant exists ({})...'.format(tenant))
        return tenant in [t.name for t in keystone.tenants.list()]

    def authenticate_cinder_admin(self, keystone_sentry, username,
                                  password, tenant):
        """Authenticates admin user with cinder."""
        # NOTE(beisner): cinder python client doesn't accept tokens.
        service_ip = \
            keystone_sentry.relation('shared-db',
                                     'mysql:shared-db')['private-address']
        ept = "http://{}:5000/v2.0".format(service_ip.strip().decode('utf-8'))
        return cinder_client.Client(username, password, tenant, ept)

    def authenticate_keystone_admin(self, keystone_sentry, user, password,
                                    tenant):
        """Authenticates admin user with the keystone admin endpoint."""
        self.log.debug('Authenticating keystone admin...')
        unit = keystone_sentry
        service_ip = unit.relation('shared-db',
                                   'mysql:shared-db')['private-address']
        ep = "http://{}:35357/v2.0".format(service_ip.strip().decode('utf-8'))
        return keystone_client.Client(username=user, password=password,
                                      tenant_name=tenant, auth_url=ep)

    def authenticate_keystone_user(self, keystone, user, password, tenant):
        """Authenticates a regular user with the keystone public endpoint."""
        self.log.debug('Authenticating keystone user ({})...'.format(user))
        ep = keystone.service_catalog.url_for(service_type='identity',
                                              endpoint_type='publicURL')
        return keystone_client.Client(username=user, password=password,
                                      tenant_name=tenant, auth_url=ep)

    def authenticate_glance_admin(self, keystone):
        """Authenticates admin user with glance."""
        self.log.debug('Authenticating glance admin...')
        ep = keystone.service_catalog.url_for(service_type='image',
                                              endpoint_type='adminURL')
        return glance_client.Client(ep, token=keystone.auth_token)

    def authenticate_heat_admin(self, keystone):
        """Authenticates the admin user with heat."""
        self.log.debug('Authenticating heat admin...')
        ep = keystone.service_catalog.url_for(service_type='orchestration',
                                              endpoint_type='publicURL')
        return heat_client.Client(endpoint=ep, token=keystone.auth_token)

    def authenticate_nova_user(self, keystone, user, password, tenant):
        """Authenticates a regular user with nova-api."""
        self.log.debug('Authenticating nova user ({})...'.format(user))
        ep = keystone.service_catalog.url_for(service_type='identity',
                                              endpoint_type='publicURL')
        return nova_client.Client(username=user, api_key=password,
                                  project_id=tenant, auth_url=ep)

    def authenticate_swift_user(self, keystone, user, password, tenant):
        """Authenticates a regular user with swift api."""
        self.log.debug('Authenticating swift user ({})...'.format(user))
        ep = keystone.service_catalog.url_for(service_type='identity',
                                              endpoint_type='publicURL')
        return swiftclient.Connection(authurl=ep,
                                      user=user,
                                      key=password,
                                      tenant_name=tenant,
                                      auth_version='2.0')

    def create_cirros_image(self, glance, image_name):
        """Download the latest cirros image and upload it to glance,
        validate and return a resource pointer.

        :param glance: pointer to authenticated glance connection
        :param image_name: display name for new image
        :returns: glance image pointer
        """
        self.log.debug('Creating glance cirros image '
                       '({})...'.format(image_name))

        # Download cirros image
        http_proxy = os.getenv('AMULET_HTTP_PROXY')
        self.log.debug('AMULET_HTTP_PROXY: {}'.format(http_proxy))
        if http_proxy:
            proxies = {'http': http_proxy}
            opener = urllib.FancyURLopener(proxies)
        else:
            opener = urllib.FancyURLopener()

        f = opener.open('http://download.cirros-cloud.net/version/released')
        version = f.read().strip()
        cirros_img = 'cirros-{}-x86_64-disk.img'.format(version)
        local_path = os.path.join('tests', cirros_img)

        if not os.path.exists(local_path):
            cirros_url = 'http://{}/{}/{}'.format('download.cirros-cloud.net',
                                                  version, cirros_img)
            opener.retrieve(cirros_url, local_path)
        f.close()

        # Create glance image
        with open(local_path) as f:
            image = glance.images.create(name=image_name, is_public=True,
                                         disk_format='qcow2',
                                         container_format='bare', data=f)

        # Wait for image to reach active status
        img_id = image.id
        ret = self.resource_reaches_status(glance.images, img_id,
                                           expected_stat='active',
                                           msg='Image status wait')
        if not ret:
            msg = 'Glance image failed to reach expected state.'
            amulet.raise_status(amulet.FAIL, msg=msg)

        # Re-validate new image
        self.log.debug('Validating image attributes...')
        val_img_name = glance.images.get(img_id).name
        val_img_stat = glance.images.get(img_id).status
        val_img_pub = glance.images.get(img_id).is_public
        val_img_cfmt = glance.images.get(img_id).container_format
        val_img_dfmt = glance.images.get(img_id).disk_format
        msg_attr = ('Image attributes - name:{} public:{} id:{} stat:{} '
                    'container fmt:{} disk fmt:{}'.format(
                        val_img_name, val_img_pub, img_id,
                        val_img_stat, val_img_cfmt, val_img_dfmt))

        if val_img_name == image_name and val_img_stat == 'active' \
                and val_img_pub is True and val_img_cfmt == 'bare' \
                and val_img_dfmt == 'qcow2':
            self.log.debug(msg_attr)
        else:
            msg = ('Volume validation failed, {}'.format(msg_attr))
            amulet.raise_status(amulet.FAIL, msg=msg)

        return image

    def delete_image(self, glance, image):
        """Delete the specified image."""

        # /!\ DEPRECATION WARNING
        self.log.warn('/!\\ DEPRECATION WARNING:  use '
                      'delete_resource instead of delete_image.')
        self.log.debug('Deleting glance image ({})...'.format(image))
        return self.delete_resource(glance.images, image, msg='glance image')

    def create_instance(self, nova, image_name, instance_name, flavor):
        """Create the specified instance."""
        self.log.debug('Creating instance '
                       '({}|{}|{})'.format(instance_name, image_name, flavor))
        image = nova.images.find(name=image_name)
        flavor = nova.flavors.find(name=flavor)
        instance = nova.servers.create(name=instance_name, image=image,
                                       flavor=flavor)

        count = 1
        status = instance.status
        while status != 'ACTIVE' and count < 60:
            time.sleep(3)
            instance = nova.servers.get(instance.id)
            status = instance.status
            self.log.debug('instance status: {}'.format(status))
            count += 1

        if status != 'ACTIVE':
            self.log.error('instance creation timed out')
            return None

        return instance

    def delete_instance(self, nova, instance):
        """Delete the specified instance."""

        # /!\ DEPRECATION WARNING
        self.log.warn('/!\\ DEPRECATION WARNING:  use '
                      'delete_resource instead of delete_instance.')
        self.log.debug('Deleting instance ({})...'.format(instance))
        return self.delete_resource(nova.servers, instance,
                                    msg='nova instance')

    def create_or_get_keypair(self, nova, keypair_name="testkey"):
        """Create a new keypair, or return pointer if it already exists."""
        try:
            _keypair = nova.keypairs.get(keypair_name)
            self.log.debug('Keypair ({}) already exists, '
                           'using it.'.format(keypair_name))
            return _keypair
        except:
            self.log.debug('Keypair ({}) does not exist, '
                           'creating it.'.format(keypair_name))

        _keypair = nova.keypairs.create(name=keypair_name)
        return _keypair

    def create_cinder_volume(self, cinder, vol_name="demo-vol", vol_size=1,
                             img_id=None, src_vol_id=None, snap_id=None):
        """Create cinder volume, optionally from a glance image, OR
        optionally as a clone of an existing volume, OR optionally
        from a snapshot.  Wait for the new volume status to reach
        the expected status, validate and return a resource pointer.

        :param vol_name: cinder volume display name
        :param vol_size: size in gigabytes
        :param img_id: optional glance image id
        :param src_vol_id: optional source volume id to clone
        :param snap_id: optional snapshot id to use
        :returns: cinder volume pointer
        """
        # Handle parameter input and avoid impossible combinations
        if img_id and not src_vol_id and not snap_id:
            # Create volume from image
            self.log.debug('Creating cinder volume from glance image...')
            bootable = 'true'
        elif src_vol_id and not img_id and not snap_id:
            # Clone an existing volume
            self.log.debug('Cloning cinder volume...')
            bootable = cinder.volumes.get(src_vol_id).bootable
        elif snap_id and not src_vol_id and not img_id:
            # Create volume from snapshot
            self.log.debug('Creating cinder volume from snapshot...')
            snap = cinder.volume_snapshots.find(id=snap_id)
            vol_size = snap.size
            snap_vol_id = cinder.volume_snapshots.get(snap_id).volume_id
            bootable = cinder.volumes.get(snap_vol_id).bootable
        elif not img_id and not src_vol_id and not snap_id:
            # Create volume
            self.log.debug('Creating cinder volume...')
            bootable = 'false'
        else:
            # Impossible combination of parameters
            msg = ('Invalid method use - name:{} size:{} img_id:{} '
                   'src_vol_id:{} snap_id:{}'.format(vol_name, vol_size,
                                                     img_id, src_vol_id,
                                                     snap_id))
            amulet.raise_status(amulet.FAIL, msg=msg)

        # Create new volume
        try:
            vol_new = cinder.volumes.create(display_name=vol_name,
                                            imageRef=img_id,
                                            size=vol_size,
                                            source_volid=src_vol_id,
                                            snapshot_id=snap_id)
            vol_id = vol_new.id
        except Exception as e:
            msg = 'Failed to create volume: {}'.format(e)
            amulet.raise_status(amulet.FAIL, msg=msg)

        # Wait for volume to reach available status
        ret = self.resource_reaches_status(cinder.volumes, vol_id,
                                           expected_stat="available",
                                           msg="Volume status wait")
        if not ret:
            msg = 'Cinder volume failed to reach expected state.'
            amulet.raise_status(amulet.FAIL, msg=msg)

        # Re-validate new volume
        self.log.debug('Validating volume attributes...')
        val_vol_name = cinder.volumes.get(vol_id).display_name
        val_vol_boot = cinder.volumes.get(vol_id).bootable
        val_vol_stat = cinder.volumes.get(vol_id).status
        val_vol_size = cinder.volumes.get(vol_id).size
        msg_attr = ('Volume attributes - name:{} id:{} stat:{} boot:'
                    '{} size:{}'.format(val_vol_name, vol_id,
                                        val_vol_stat, val_vol_boot,
                                        val_vol_size))

        if val_vol_boot == bootable and val_vol_stat == 'available' \
                and val_vol_name == vol_name and val_vol_size == vol_size:
            self.log.debug(msg_attr)
        else:
            msg = ('Volume validation failed, {}'.format(msg_attr))
            amulet.raise_status(amulet.FAIL, msg=msg)

        return vol_new

    def delete_resource(self, resource, resource_id,
                        msg="resource", max_wait=120):
        """Delete one openstack resource, such as one instance, keypair,
        image, volume, stack, etc., and confirm deletion within max wait time.

        :param resource: pointer to os resource type, ex:glance_client.images
        :param resource_id: unique name or id for the openstack resource
        :param msg: text to identify purpose in logging
        :param max_wait: maximum wait time in seconds
        :returns: True if successful, otherwise False
        """
        self.log.debug('Deleting OpenStack resource '
                       '{} ({})'.format(resource_id, msg))
        num_before = len(list(resource.list()))
        resource.delete(resource_id)

        tries = 0
        num_after = len(list(resource.list()))
        while num_after != (num_before - 1) and tries < (max_wait / 4):
            self.log.debug('{} delete check: '
                           '{} [{}:{}] {}'.format(msg, tries,
                                                  num_before,
                                                  num_after,
                                                  resource_id))
            time.sleep(4)
            num_after = len(list(resource.list()))
            tries += 1

        self.log.debug('{}:  expected, actual count = {}, '
                       '{}'.format(msg, num_before - 1, num_after))

        if num_after == (num_before - 1):
            return True
        else:
            self.log.error('{} delete timed out'.format(msg))
            return False

    def resource_reaches_status(self, resource, resource_id,
                                expected_stat='available',
                                msg='resource', max_wait=120):
        """Wait for an openstack resources status to reach an
           expected status within a specified time.  Useful to confirm that
           nova instances, cinder vols, snapshots, glance images, heat stacks
           and other resources eventually reach the expected status.

        :param resource: pointer to os resource type, ex: heat_client.stacks
        :param resource_id: unique id for the openstack resource
        :param expected_stat: status to expect resource to reach
        :param msg: text to identify purpose in logging
        :param max_wait: maximum wait time in seconds
        :returns: True if successful, False if status is not reached
        """

        tries = 0
        resource_stat = resource.get(resource_id).status
        while resource_stat != expected_stat and tries < (max_wait / 4):
            self.log.debug('{} status check: '
                           '{} [{}:{}] {}'.format(msg, tries,
                                                  resource_stat,
                                                  expected_stat,
                                                  resource_id))
            time.sleep(4)
            resource_stat = resource.get(resource_id).status
            tries += 1

        self.log.debug('{}:  expected, actual status = {}, '
                       '{}'.format(msg, resource_stat, expected_stat))

        if resource_stat == expected_stat:
            return True
        else:
            self.log.debug('{} never reached expected status: '
                           '{}'.format(resource_id, expected_stat))
            return False

    def get_ceph_osd_id_cmd(self, index):
        """Produce a shell command that will return a ceph-osd id."""
        return ("`initctl list | grep 'ceph-osd ' | "
                "awk 'NR=={} {{ print $2 }}' | "
                "grep -o '[0-9]*'`".format(index + 1))

    def get_ceph_pools(self, sentry_unit):
        """Return a dict of ceph pools from a single ceph unit, with
        pool name as keys, pool id as vals."""
        pools = {}
        cmd = 'sudo ceph osd lspools'
        output, code = sentry_unit.run(cmd)
        if code != 0:
            msg = ('{} `{}` returned {} '
                   '{}'.format(sentry_unit.info['unit_name'],
                               cmd, code, output))
            amulet.raise_status(amulet.FAIL, msg=msg)

        # Example output: 0 data,1 metadata,2 rbd,3 cinder,4 glance,
        for pool in str(output).split(','):
            pool_id_name = pool.split(' ')
            if len(pool_id_name) == 2:
                pool_id = pool_id_name[0]
                pool_name = pool_id_name[1]
                pools[pool_name] = int(pool_id)

        self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
                                                pools))
        return pools

    def get_ceph_df(self, sentry_unit):
        """Return dict of ceph df json output, including ceph pool state.

        :param sentry_unit: Pointer to amulet sentry instance (juju unit)
        :returns: Dict of ceph df output
        """
        cmd = 'sudo ceph df --format=json'
        output, code = sentry_unit.run(cmd)
        if code != 0:
            msg = ('{} `{}` returned {} '
                   '{}'.format(sentry_unit.info['unit_name'],
                               cmd, code, output))
            amulet.raise_status(amulet.FAIL, msg=msg)
        return json.loads(output)

    def get_ceph_pool_sample(self, sentry_unit, pool_id=0):
        """Take a sample of attributes of a ceph pool, returning ceph
        pool name, object count and disk space used for the specified
        pool ID number.

        :param sentry_unit: Pointer to amulet sentry instance (juju unit)
        :param pool_id: Ceph pool ID
        :returns: List of pool name, object count, kb disk space used
        """
        df = self.get_ceph_df(sentry_unit)
        pool_name = df['pools'][pool_id]['name']
        obj_count = df['pools'][pool_id]['stats']['objects']
        kb_used = df['pools'][pool_id]['stats']['kb_used']
        self.log.debug('Ceph {} pool (ID {}): {} objects, '
                       '{} kb used'.format(pool_name, pool_id,
                                           obj_count, kb_used))
        return pool_name, obj_count, kb_used

    def validate_ceph_pool_samples(self, samples, sample_type="resource pool"):
        """Validate ceph pool samples taken over time, such as pool
        object counts or pool kb used, before adding, after adding, and
        after deleting items which affect those pool attributes.  The
        2nd element is expected to be greater than the 1st; 3rd is expected
        to be less than the 2nd.

        :param samples: List containing 3 data samples
        :param sample_type: String for logging and usage context
        :returns: None if successful, Failure message otherwise
        """
        original, created, deleted = range(3)
        if samples[created] <= samples[original] or \
                samples[deleted] >= samples[created]:
            return ('Ceph {} samples ({}) '
                    'unexpected.'.format(sample_type, samples))
        else:
            self.log.debug('Ceph {} samples (OK): '
                           '{}'.format(sample_type, samples))
            return None

# rabbitmq/amqp specific helpers:
    def add_rmq_test_user(self, sentry_units,
                          username="testuser1", password="changeme"):
        """Add a test user via the first rmq juju unit, check connection as
        the new user against all sentry units.

        :param sentry_units: list of sentry unit pointers
        :param username: amqp user name, default to testuser1
        :param password: amqp user password
        :returns: None if successful.  Raise on error.
        """
        self.log.debug('Adding rmq user ({})...'.format(username))

        # Check that user does not already exist
        cmd_user_list = 'rabbitmqctl list_users'
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
        if username in output:
            self.log.warning('User ({}) already exists, returning '
                             'gracefully.'.format(username))
            return

        perms = '".*" ".*" ".*"'
        cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
                'rabbitmqctl set_permissions {} {}'.format(username, perms)]

        # Add user via first unit
        for cmd in cmds:
            output, _ = self.run_cmd_unit(sentry_units[0], cmd)

        # Check connection against the other sentry_units
        self.log.debug('Checking user connect against units...')
        for sentry_unit in sentry_units:
            connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
                                                   username=username,
                                                   password=password)
            connection.close()

    def delete_rmq_test_user(self, sentry_units, username="testuser1"):
        """Delete a rabbitmq user via the first rmq juju unit.

        :param sentry_units: list of sentry unit pointers
        :param username: amqp user name, default to testuser1
        :param password: amqp user password
        :returns: None if successful or no such user.
        """
        self.log.debug('Deleting rmq user ({})...'.format(username))

        # Check that the user exists
        cmd_user_list = 'rabbitmqctl list_users'
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)

        if username not in output:
            self.log.warning('User ({}) does not exist, returning '
                             'gracefully.'.format(username))
            return

        # Delete the user
        cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
        output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)

    def get_rmq_cluster_status(self, sentry_unit):
        """Execute rabbitmq cluster status command on a unit and return
        the full output.

        :param unit: sentry unit
        :returns: String containing console output of cluster status command
        """
        cmd = 'rabbitmqctl cluster_status'
        output, _ = self.run_cmd_unit(sentry_unit, cmd)
        self.log.debug('{} cluster_status:\n{}'.format(
            sentry_unit.info['unit_name'], output))
        return str(output)

    def get_rmq_cluster_running_nodes(self, sentry_unit):
        """Parse rabbitmqctl cluster_status output string, return list of
        running rabbitmq cluster nodes.

        :param unit: sentry unit
        :returns: List containing node names of running nodes
        """
        # NOTE(beisner): rabbitmqctl cluster_status output is not
        # json-parsable, do string chop foo, then json.loads that.
        str_stat = self.get_rmq_cluster_status(sentry_unit)
        if 'running_nodes' in str_stat:
            pos_start = str_stat.find("{running_nodes,") + 15
            pos_end = str_stat.find("]},", pos_start) + 1
            str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
            run_nodes = json.loads(str_run_nodes)
            return run_nodes
        else:
            return []

    def validate_rmq_cluster_running_nodes(self, sentry_units):
        """Check that all rmq unit hostnames are represented in the
        cluster_status output of all units.

        :param host_names: dict of juju unit names to host names
        :param units: list of sentry unit pointers (all rmq units)
        :returns: None if successful, otherwise return error message
        """
        host_names = self.get_unit_hostnames(sentry_units)
        errors = []

        # Query every unit for cluster_status running nodes
        for query_unit in sentry_units:
            query_unit_name = query_unit.info['unit_name']
            running_nodes = self.get_rmq_cluster_running_nodes(query_unit)

            # Confirm that every unit is represented in the queried unit's
            # cluster_status running nodes output.
            for validate_unit in sentry_units:
                val_host_name = host_names[validate_unit.info['unit_name']]
                val_node_name = 'rabbit@{}'.format(val_host_name)

                if val_node_name not in running_nodes:
                    errors.append('Cluster member check failed on {}: {} not '
                                  'in {}\n'.format(query_unit_name,
                                                   val_node_name,
                                                   running_nodes))
        if errors:
            return ''.join(errors)

    def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
        """Check a single juju rmq unit for ssl and port in the config file."""
        host = sentry_unit.info['public-address']
        unit_name = sentry_unit.info['unit_name']

        conf_file = '/etc/rabbitmq/rabbitmq.config'
        conf_contents = str(self.file_contents_safe(sentry_unit,
                                                    conf_file, max_wait=16))
        # Checks
        conf_ssl = 'ssl' in conf_contents
        conf_port = str(port) in conf_contents

        # Port explicitly checked in config
        if port and conf_port and conf_ssl:
            self.log.debug('SSL is enabled  @{}:{} '
                           '({})'.format(host, port, unit_name))
            return True
        elif port and not conf_port and conf_ssl:
            self.log.debug('SSL is enabled @{} but not on port {} '
                           '({})'.format(host, port, unit_name))
            return False
        # Port not checked (useful when checking that ssl is disabled)
        elif not port and conf_ssl:
            self.log.debug('SSL is enabled  @{}:{} '
                           '({})'.format(host, port, unit_name))
            return True
        elif not port and not conf_ssl:
            self.log.debug('SSL not enabled @{}:{} '
                           '({})'.format(host, port, unit_name))
            return False
        else:
            msg = ('Unknown condition when checking SSL status @{}:{} '
                   '({})'.format(host, port, unit_name))
            amulet.raise_status(amulet.FAIL, msg)

    def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
        """Check that ssl is enabled on rmq juju sentry units.

        :param sentry_units: list of all rmq sentry units
        :param port: optional ssl port override to validate
        :returns: None if successful, otherwise return error message
        """
        for sentry_unit in sentry_units:
            if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
                return ('Unexpected condition:  ssl is disabled on unit '
                        '({})'.format(sentry_unit.info['unit_name']))
        return None

    def validate_rmq_ssl_disabled_units(self, sentry_units):
        """Check that ssl is enabled on listed rmq juju sentry units.

        :param sentry_units: list of all rmq sentry units
        :returns: True if successful.  Raise on error.
        """
        for sentry_unit in sentry_units:
            if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
                return ('Unexpected condition:  ssl is enabled on unit '
                        '({})'.format(sentry_unit.info['unit_name']))
        return None

    def configure_rmq_ssl_on(self, sentry_units, deployment,
                             port=None, max_wait=60):
        """Turn ssl charm config option on, with optional non-default
        ssl port specification.  Confirm that it is enabled on every
        unit.

        :param sentry_units: list of sentry units
        :param deployment: amulet deployment object pointer
        :param port: amqp port, use defaults if None
        :param max_wait: maximum time to wait in seconds to confirm
        :returns: None if successful.  Raise on error.
        """
        self.log.debug('Setting ssl charm config option:  on')

        # Enable RMQ SSL
        config = {'ssl': 'on'}
        if port:
            config['ssl_port'] = port

        deployment.configure('rabbitmq-server', config)

        # Confirm
        tries = 0
        ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
        while ret and tries < (max_wait / 4):
            time.sleep(4)
            self.log.debug('Attempt {}: {}'.format(tries, ret))
            ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
            tries += 1

        if ret:
            amulet.raise_status(amulet.FAIL, ret)

    def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
        """Turn ssl charm config option off, confirm that it is disabled
        on every unit.

        :param sentry_units: list of sentry units
        :param deployment: amulet deployment object pointer
        :param max_wait: maximum time to wait in seconds to confirm
        :returns: None if successful.  Raise on error.
        """
        self.log.debug('Setting ssl charm config option:  off')

        # Disable RMQ SSL
        config = {'ssl': 'off'}
        deployment.configure('rabbitmq-server', config)

        # Confirm
        tries = 0
        ret = self.validate_rmq_ssl_disabled_units(sentry_units)
        while ret and tries < (max_wait / 4):
            time.sleep(4)
            self.log.debug('Attempt {}: {}'.format(tries, ret))
            ret = self.validate_rmq_ssl_disabled_units(sentry_units)
            tries += 1

        if ret:
            amulet.raise_status(amulet.FAIL, ret)

    def connect_amqp_by_unit(self, sentry_unit, ssl=False,
                             port=None, fatal=True,
                             username="testuser1", password="changeme"):
        """Establish and return a pika amqp connection to the rabbitmq service
        running on a rmq juju unit.

        :param sentry_unit: sentry unit pointer
        :param ssl: boolean, default to False
        :param port: amqp port, use defaults if None
        :param fatal: boolean, default to True (raises on connect error)
        :param username: amqp user name, default to testuser1
        :param password: amqp user password
        :returns: pika amqp connection pointer or None if failed and non-fatal
        """
        host = sentry_unit.info['public-address']
        unit_name = sentry_unit.info['unit_name']

        # Default port logic if port is not specified
        if ssl and not port:
            port = 5671
        elif not ssl and not port:
            port = 5672

        self.log.debug('Connecting to amqp on {}:{} ({}) as '
                       '{}...'.format(host, port, unit_name, username))

        try:
            credentials = pika.PlainCredentials(username, password)
            parameters = pika.ConnectionParameters(host=host, port=port,
                                                   credentials=credentials,
                                                   ssl=ssl,
                                                   connection_attempts=3,
                                                   retry_delay=5,
                                                   socket_timeout=1)
            connection = pika.BlockingConnection(parameters)
            assert connection.server_properties['product'] == 'RabbitMQ'
            self.log.debug('Connect OK')
            return connection
        except Exception as e:
            msg = ('amqp connection failed to {}:{} as '
                   '{} ({})'.format(host, port, username, str(e)))
            if fatal:
                amulet.raise_status(amulet.FAIL, msg)
            else:
                self.log.warn(msg)
                return None

    def publish_amqp_message_by_unit(self, sentry_unit, message,
                                     queue="test", ssl=False,
                                     username="testuser1",
                                     password="changeme",
                                     port=None):
        """Publish an amqp message to a rmq juju unit.

        :param sentry_unit: sentry unit pointer
        :param message: amqp message string
        :param queue: message queue, default to test
        :param username: amqp user name, default to testuser1
        :param password: amqp user password
        :param ssl: boolean, default to False
        :param port: amqp port, use defaults if None
        :returns: None.  Raises exception if publish failed.
        """
        self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
                                                                    message))
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
                                               port=port,
                                               username=username,
                                               password=password)

        # NOTE(beisner): extra debug here re: pika hang potential:
        #   https://github.com/pika/pika/issues/297
        #   https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
        self.log.debug('Defining channel...')
        channel = connection.channel()
        self.log.debug('Declaring queue...')
        channel.queue_declare(queue=queue, auto_delete=False, durable=True)
        self.log.debug('Publishing message...')
        channel.basic_publish(exchange='', routing_key=queue, body=message)
        self.log.debug('Closing channel...')
        channel.close()
        self.log.debug('Closing connection...')
        connection.close()

    def get_amqp_message_by_unit(self, sentry_unit, queue="test",
                                 username="testuser1",
                                 password="changeme",
                                 ssl=False, port=None):
        """Get an amqp message from a rmq juju unit.

        :param sentry_unit: sentry unit pointer
        :param queue: message queue, default to test
        :param username: amqp user name, default to testuser1
        :param password: amqp user password
        :param ssl: boolean, default to False
        :param port: amqp port, use defaults if None
        :returns: amqp message body as string.  Raise if get fails.
        """
        connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
                                               port=port,
                                               username=username,
                                               password=password)
        channel = connection.channel()
        method_frame, _, body = channel.basic_get(queue)

        if method_frame:
            self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
                                                                         body))
            channel.basic_ack(method_frame.delivery_tag)
            channel.close()
            connection.close()
            return body
        else:
            msg = 'No message retrieved.'
            amulet.raise_status(amulet.FAIL, msg)