summaryrefslogtreecommitdiffstats
path: root/snaps/file_utils.py
blob: 819c7074d49c4799b5cabb0e23a61581b51cf5b9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
#                    and others.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import logging
try:
    import urllib.request as urllib
except ImportError:
    import urllib2 as urllib

import yaml

__author__ = 'spisarski'

"""
Utilities for basic file handling
"""

logger = logging.getLogger('file_utils')


def file_exists(file_path):
    """
    Returns True if the image file already exists and throws an exception if the path is a directory
    :return:
    """
    if os.path.exists(file_path):
        if os.path.isdir(file_path):
            return False
        return os.path.isfile(file_path)
    return False


def download(url, dest_path, name=None):
    """
    Download a file to a destination path given a URL
    :param url: the endpoint to the file to download
    :param dest_path: the directory to save the file
    :param name: the file name (optional)
    :rtype : File object
    """
    if not name:
        name = url.rsplit('/')[-1]
    dest = dest_path + '/' + name
    logger.debug('Downloading file from - ' + url)
    # Override proxy settings to use localhost to download file
    f = None
    try:
        with open(dest, 'wb') as f:
            logger.debug('Saving file to - ' + dest)
            response = __get_url_response(url)
            f.write(response.read())
        return f
    finally:
        if f:
            f.close()


def get_content_length(url):
    """
    Returns the number of bytes to be downloaded from the given URL
    :param url: the URL to inspect
    :return: the number of bytes
    """
    response = __get_url_response(url)
    return response.headers['Content-Length']


def __get_url_response(url):
    """
    Returns a response object for a given URL
    :param url: the URL
    :return: the response
    """
    proxy_handler = urllib.ProxyHandler({})
    opener = urllib.build_opener(proxy_handler)
    urllib.install_opener(opener)
    return urllib.urlopen(url)


def read_yaml(config_file_path):
    """
    Reads the yaml file and returns a dictionary object representation
    :param config_file_path: The file path to config
    :return: a dictionary
    """
    logger.debug('Attempting to load configuration file - ' + config_file_path)
    with open(config_file_path) as config_file:
        config = yaml.safe_load(config_file)
        logger.info('Loaded configuration')
    config_file.close()
    logger.info('Closing configuration file')
    return config


def read_os_env_file(os_env_filename):
    """
    Reads the OS environment source file and returns a map of each key/value
    Will ignore lines beginning with a '#' and will replace any single or double quotes contained within the value
    :param os_env_filename: The name of the OS environment file to read
    :return: a dictionary
    """
    if os_env_filename:
        logger.info('Attempting to read OS environment file - ' + os_env_filename)
        out = {}
        for line in open(os_env_filename):
            line = line.lstrip()
            if not line.startswith('#') and line.startswith('export '):
                line = line.lstrip('export ').strip()
                tokens = line.split('=')
                if len(tokens) > 1:
                    # Remove leading and trailing ' & " characters from value
                    out[tokens[0]] = tokens[1].lstrip('\'').lstrip('\"').rstrip('\'').rstrip('\"')
        return out