aboutsummaryrefslogtreecommitdiffstats
path: root/tools/module_manager.py
blob: 943399baf6fa34b924f200a11d876efca3e97a1e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# Copyright 2015-2017 Intel Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Simple kernel module manager implementation.
"""
import os
import subprocess
import logging

from tools import tasks

class ModuleManager(object):
    """Simple module manager which acts as system wrapper for Kernel Modules.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self):
        """Initializes data
        """
        self._modules = []

    def insert_module(self, module, auto_remove=True):
        """Method inserts given module.

        In case that module name ends with .ko suffix then insmod will
        be used for its insertion. Otherwise modprobe will be called.

        :param module: a name of kernel module
        :param auto_remove: if True (by default), then module will be
            automatically removed by remove_modules() method
        """
        module_base_name = os.path.basename(os.path.splitext(module)[0])

        if self.is_module_inserted(module):
            self._logger.info('Module already loaded \'%s\'.', module_base_name)
            # add it to internal list, so we can try to remove it at the end
            if auto_remove:
                self._modules.append(module)
            return

        try:
            if module.endswith('.ko'):
                # load module dependecies first, but suppress automatic
                # module removal at the end; Just for case, that module
                # depends on generic module
                for depmod in self.get_module_dependecies(module):
                    self.insert_module(depmod, auto_remove=False)

                tasks.run_task(['sudo', 'insmod', module], self._logger,
                               'Insmod module \'%s\'...' % module_base_name, True)
            else:
                tasks.run_task(['sudo', 'modprobe', module], self._logger,
                               'Modprobe module \'%s\'...' % module_base_name, True)
            if auto_remove:
                self._modules.append(module)
        except subprocess.CalledProcessError:
            # in case of error, show full module name
            self._logger.error('Unable to insert module \'%s\'.', module)
            raise  # fail catastrophically

    def insert_modules(self, modules):
        """Method inserts list of modules.

        :param modules: a list of modules to be inserted
        """
        for module in modules:
            self.insert_module(module)

    def remove_module(self, module):
        """Removes a single module.

        :param module: a name of kernel module
        """
        if self.is_module_inserted(module):
            # get module base name, i.e strip path and .ko suffix if possible
            module_base_name = os.path.basename(os.path.splitext(module)[0])

            try:
                self._logger.info('Removing module \'%s\'...', module_base_name)
                subprocess.check_call('sudo rmmod {}'.format(module_base_name),
                                      shell=True, stderr=subprocess.DEVNULL)
                # in case that module was loaded automatically by modprobe
                # to solve dependecies, then it is not in internal list of modules
                if module in self._modules:
                    self._modules.remove(module)
            except subprocess.CalledProcessError:
                # in case of error, show full module name...
                self._logger.info('Unable to remove module \'%s\'.', module)
                # ...and list of dependend modules, if there are any
                module_details = self.get_module_details(module_base_name)
                if module_details:
                    mod_dep = module_details.split(' ')[3].rstrip(',')
                    if mod_dep[0] != '-':
                        self._logger.debug('Module \'%s\' is used by module(s) \'%s\'.',
                                           module_base_name, mod_dep)

    def remove_modules(self):
        """Removes all modules that have been previously inserted.
        """
        # remove modules in reverse order to respect their dependencies
        for module in reversed(self._modules):
            self.remove_module(module)

    def is_module_inserted(self, module):
        """Check if a module is inserted on system.

        :param module: a name of kernel module
        """
        module_base_name = os.path.basename(os.path.splitext(module)[0])

        return self.get_module_details(module_base_name) != None

    @staticmethod
    def get_module_details(module):
        """Return details about given module

        :param module: a name of kernel module
        :returns: In case that module is loaded in OS, then corresponding
            line from /proc/modules will be returned. Otherwise it returns None.
        """
        # get list of modules from kernel
        with open('/proc/modules') as mod_file:
            loaded_mods = mod_file.readlines()

        # check if module is loaded
        for line in loaded_mods:
            # underscores '_' and dashes '-' in module names are interchangeable, so we
            # have to normalize module names before comparision
            if line.split(' ')[0].replace('-', '_') == module.replace('-', '_'):
                return line

        return None

    def get_module_dependecies(self, module):
        """Return list of modules, which must be loaded before module itself

        :param module: a name of kernel module
        :returns: In case that module has any dependencies, then list of module
            names will be returned. Otherwise it returns empty list, i.e. [].
        """
        deps = ''
        try:
            # get list of module dependecies from kernel
            deps = subprocess.check_output('modinfo -F depends {}'.format(module),
                                           shell=True).decode().rstrip('\n')
        except subprocess.CalledProcessError:
            # in case of error, show full module name...
            self._logger.info('Unable to get list of dependecies for module \'%s\'.', module)
            # ...and try to continue, just for case that dependecies are already loaded

        if deps:
            return deps.split(',')
        else:
            return []