aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/benchmark/runners/base.py
blob: 08117c6253d9648c37fbcdff98675b6964b0d682 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
##############################################################################
# Copyright (c) 2015 Ericsson AB and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################

import importlib
import multiprocessing
import json
import logging

# module-level logger, named after this module
log = logging.getLogger(__name__)

import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario


def _output_serializer_main(filename, queue):
    '''entrypoint for the singleton subprocess writing to outfile
    Use of this process enables multiple instances of a scenario without
    messing up the output file.
    '''
    with open(filename, 'w') as outfile:
        while True:
            # blocks until data becomes available
            record = queue.get()
            if record == '_TERMINATE_':
                outfile.close()
                break
            else:
                json.dump(record, outfile)
                outfile.write('\n')


class Runner(object):
    '''Base class and registry for scenario runners.

    Concrete runner types are subclasses of Runner. They are discovered
    through utils.itersubclasses(Runner) and selected by comparing their
    __execution_type__ attribute. run() delegates to self._run_benchmark and
    join() reads self.process; neither is defined in this class, so
    subclasses are expected to provide them.

    All live runners share one result queue, which is drained by a single
    "Dumper" subprocess running _output_serializer_main. That subprocess is
    started by get() when no runner is registered yet and is stopped by
    release() when the last registered runner is removed.
    '''
    # result queue shared by all runners; consumed by the dump process
    queue = None
    # the "Dumper" subprocess; None until get() has started it
    dump_process = None
    # every Runner instance that was created and has not been released yet
    runners = []

    @staticmethod
    def get_cls(runner_type):
        '''Return the Runner subclass whose __execution_type__ matches.

        Raises RuntimeError if no subclass declares runner_type.
        '''
        for runner in utils.itersubclasses(Runner):
            if runner_type == runner.__execution_type__:
                return runner
        raise RuntimeError("No such runner_type %s" % runner_type)

    @staticmethod
    def get_types():
        '''Return a list of the known runner classes (Runner subclasses).'''
        return list(utils.itersubclasses(Runner))

    @staticmethod
    def get(config):
        """Return a new runner instance for the execution type in config.

        config["type"] selects the runner class (see get_cls). When no
        runner is registered yet, config["output_filename"] is also read
        and the output serializer subprocess is started on that file.

        Raises RuntimeError (from get_cls) for an unknown runner type.
        """
        # Resolve the class first: an unknown or missing type has to fail
        # before the dump process is spawned, otherwise that process would
        # be left blocking on the queue with nothing to terminate it.
        runner_cls = Runner.get_cls(config["type"])

        # if there is no runner, start the output serializer subprocess
        if not Runner.runners:
            log.debug("Starting dump process file '%s'",
                      config["output_filename"])
            Runner.queue = multiprocessing.Queue()
            Runner.dump_process = multiprocessing.Process(
                target=_output_serializer_main,
                name="Dumper",
                args=(config["output_filename"], Runner.queue))
            Runner.dump_process.start()

        return runner_cls(config, Runner.queue)

    @staticmethod
    def release(runner):
        '''Release the runner.

        Removes runner from the registry; list.remove raises ValueError if
        it is not registered. When the registry becomes empty the dump
        process is sent '_TERMINATE_' and joined.
        '''
        Runner.runners.remove(runner)
        # if this was the last runner, stop the output serializer subprocess
        if not Runner.runners:
            log.debug("Stopping dump process")
            Runner.queue.put('_TERMINATE_')
            Runner.dump_process.join()

    @staticmethod
    def terminate_all():
        '''Terminate all runners (subprocesses) and release them.'''
        log.debug("Terminating all runners")
        # Iterate over a snapshot: release() removes entries from
        # Runner.runners, and mutating a list while iterating over it makes
        # the loop skip every other runner.
        for runner in list(Runner.runners):
            # __init__ does not assign `process`, so a runner that was
            # created but never started may not have one; it still has to
            # be released so the dump process can be stopped.
            process = getattr(runner, "process", None)
            if process is not None:
                process.terminate()
                process.join()
            Runner.release(runner)

    def __init__(self, config, queue):
        '''Store the configuration and result queue, then register self.

        config -- runner configuration mapping (run() stores the scenario
                  class path in it under 'object')
        queue  -- queue the runner puts its result records on
        '''
        self.context = {}
        self.config = config
        self.result_queue = queue
        Runner.runners.append(self)

    def run(self, scenario_type, scenario_args):
        '''Import the scenario class for scenario_type and start it.

        base_scenario.Scenario.get() is expected to return a dotted
        "package.module.ClassName" path. The module part is imported, the
        class is looked up in it, and both are handed to
        self._run_benchmark together with the method name "run".
        '''
        class_name = base_scenario.Scenario.get(scenario_type)
        # split "pkg.mod.Class" into module path and bare class name
        module_path, _, scenario_name = class_name.rpartition(".")
        module = importlib.import_module(module_path)
        scenario_cls = getattr(module, scenario_name)

        self.config['object'] = class_name
        self._run_benchmark(scenario_cls, "run", scenario_args)

    def join(self):
        '''Wait for the runner process to finish and return its exit code.'''
        self.process.join()
        return self.process.exitcode