path: root/vstf/vstf/controller/api_server.py
import uuid
import time
import os
import sys
import logging
import signal
import json

from vstf.common import unix, message, cliutil, excepts
from vstf.common.vstfcli import VstfParser
from vstf.common.log import setup_logging
from vstf.common import daemon
from vstf.rpc_frame_work import rpc_producer
from vstf.controller.fabricant import Fabricant
from vstf.agent.env.basic.commandline import CommandLine
from vstf.controller.env_build.env_build import EnvBuildApi as Builder
from vstf.controller.env_build.env_collect import EnvCollectApi
from vstf.controller.database.dbinterface import DbManage
import vstf.controller.sw_perf.performance as pf
from vstf.controller.settings.tester_settings import TesterSettings
from vstf.controller.settings.device_settings import DeviceSettings
from vstf.controller.settings.flows_settings import FlowsSettings
from vstf.controller.settings.mail_settings import MailSettings
from vstf.controller.settings.tool_settings import ToolSettings
from vstf.controller.settings.perf_settings import PerfSettings
from vstf.controller.sw_perf.perf_provider import PerfProvider
from vstf.controller.sw_perf.flow_producer import FlowsProducer
import vstf.controller.reporters.reporter as rp
import vstf.common.constants as cst
import vstf.common.check as chk

LOG = logging.getLogger(__name__)
cmd = CommandLine()
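
# The manager daemon accepts commands from the vstfadm CLI over a local unix
# socket (see Manager.run) and dispatches them to the OpsChains methods,
# which reach the test agents through the rabbitmq proxy.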


class OpsChains(object):
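    """Operation chains of the vstf manager.

    Each public method corresponds to a command issued by vstfadm;
    Manager.run() dispatches incoming messages to these methods by name.
    """
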
    def __init__(self, monitor, port):
        """The ops chains will setup the proxy to rabbitmq
        and setup a thread to watch the queues of rabbitmq
        
        """
        super(OpsChains, self).__init__()
        if not os.path.exists(cst.VSTFCPATH):
            os.mkdir(cst.VSTFCPATH)

        LOG.info("VSTF Manager start to listen to %s", monitor)
        self.chanl = rpc_producer.Server(host=monitor, port=port)
        self.dbconn = DbManage()
        self.collection = EnvCollectApi(self.chanl)

    def list_devs(self, **kwargs):
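        """List the NIC devices of the given host ('host' in kwargs) by
        asking the agent on that host."""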
        target = kwargs.get('host')
        if not target:
            respond = "the target is empty, not support now."
        else:
            respond = self.chanl.call(self.chanl.make_msg("list_nic_devices"), target)
        return respond

    def src_install(self, host, config_file):
        if not os.path.exists(config_file):
            raise Exception("Cannot find the config file: %s" % config_file)
        with open(config_file) as cfg_file:
            cfg = json.load(cfg_file)
        msg = self.chanl.make_msg("src_install", cfg=cfg)
        return self.chanl.call(msg, host, timeout=1000)

    def create_images(self, host, config_file):
        if not os.path.exists(config_file):
            raise Exception("Cannot find the config file: %s" % config_file)
        with open(config_file) as cfg_file:
            cfg = json.load(cfg_file)
        msg = self.chanl.make_msg("create_images", cfg=cfg)
        return self.chanl.call(msg, host, timeout=1000)

    def clean_images(self, host, config_file):
        if not os.path.exists(config_file):
            raise Exception("Cannot find the config file: %s" % config_file)
        with open(config_file) as cfg_file:
            cfg = json.load(cfg_file)
        msg = self.chanl.make_msg("clean_images", cfg=cfg)
        return self.chanl.call(msg, host, timeout=1000)

    def apply_model(self, host, model=None, config_file=None):
        if config_file is None:
            config_file = "/etc/vstf/env/%s.json" % model
        if not os.path.exists(config_file):
            raise Exception("Can not found the config file.")
        env = Builder(self.chanl, config_file)
        ret = env.build()
        return ret

    def disapply_model(self, host, model=None, config_file=None):
        if config_file is None:
            config_file = "/etc/vstf/env/%s.json" % model
        if not os.path.exists(config_file):
            raise Exception("Can not found the config file.")
        env = Builder(self.chanl, config_file)
        ret = env.clean()
        return ret

    def list_tasks(self):
        ret = self.dbconn.query_tasks()
        head = [["Task ID", "Task Name", "Task Date", "Task Remarks"]]
        if ret:
            ret = head + ret
        return ret

    def affctl_list(self, host):
        if not host:
            return "Need input the host"
        return Fabricant(host, self.chanl).affctl_list()

    def _create_task(self, scenario):
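        """Create a task record in the database and attach the hardware,
        cpu, memory, nic and os information of both the target host and
        the tester to it."""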
        taskid = self.dbconn.create_task(str(uuid.uuid4()), time.strftime(cst.TIME_FORMAT),
                                         desc=scenario + "Test")
        LOG.info("new Task id:%s" % taskid)
        if -1 == taskid:
            raise Exception("DB create task failed.")

        device = DeviceSettings().settings
        hosts = [device["host"], device["tester"]]
        for host in hosts:
            LOG.info(host)

            devs = host["devs"][0]
            keys = ["bdf", "iface", "mac"]
            key = devs.keys()[0]
            if key in keys:
                name = devs[key]
            else:
                raise Exception("error devs :%s", devs)

            query = Fabricant(host["agent"], self.chanl)
            nic_info = query.get_device_detail(identity=name)

            LOG.info(nic_info)

            os_info, cpu_info, mem_info, hw_info = self.collection.collect_host_info(host["agent"])
            LOG.info(os_info)
            LOG.info(cpu_info)
            LOG.info(mem_info)
            LOG.info(hw_info)

            self.dbconn.add_host_2task(taskid,
                                       host["agent"],
                                       json.dumps(hw_info[cst.HW_INFO]),
                                       json.dumps(cpu_info[cst.CPU_INFO]),
                                       json.dumps(mem_info[cst.MEMORY_INFO]),
                                       nic_info["desc"],
                                       json.dumps(os_info[cst.OS_INFO]))

        self.dbconn.add_extent_2task(taskid, "CETH", "driver", "version 2.0")
        self.dbconn.add_extent_2task(taskid, "EVS", "switch", "version 3.0")
        return taskid

    def settings(self, mail=False, perf=False):
        LOG.info("mail:%s, perf:%s" % (mail, perf))
        if mail:
            MailSettings().input()
        if perf:
            PerfSettings().input()

    def report(self, rpath='./', mail_off=False, taskid=-1):
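        """Generate the report for taskid (the last task when taskid is -1)
        under rpath; mail_off disables sending the report by mail."""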
        report = rp.Report(self.dbconn, rpath)
        if taskid == -1:
            taskid = self.dbconn.get_last_taskid()
        report.report(taskid, mail_off)
        info_str = "do report over"
        return info_str

    def run_perf_cmd(self, case, rpath='./', affctl=False, build_on=False,
                     save_on=False, report_on=False, mail_on=False):
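        """Run a single perf case described by the case dict (keys: case,
        tool, protocol, profile, type, sizes), optionally building the
        environment and saving/reporting the results."""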
        LOG.info(case)
        LOG.info("build_on:%s report_on:%s mail_on:%s" % (build_on, report_on, mail_on))
        casetag = case['case']
        tool = case['tool']
        protocol = case['protocol']
        profile = case['profile']
        ttype = case['type']
        sizes = case['sizes']

        ret, ret_str = chk.check_case_params(protocol, ttype, tool)
        if not ret:
            return ret_str

        scenario = self.dbconn.query_scenario(casetag)
        LOG.info(scenario)
        if not scenario:
            LOG.warn("not support the case:%s", casetag)
            return

        config_file = os.path.join("/etc/vstf/env", scenario + ".json")

        LOG.info(config_file)
        env = Builder(self.chanl, config_file)
        if build_on:
            env.build()
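        # compose the flow/tool/tester settings into a PerfProvider; the
        # FlowsProducer fills the flow settings for this scenario/case
        # before the Performance runner executes the test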
        flows_settings = FlowsSettings()
        tool_settings = ToolSettings()
        tester_settings = TesterSettings()
        flow_producer = FlowsProducer(self.chanl, flows_settings)
        provider = PerfProvider(flows_settings.settings, tool_settings.settings, tester_settings.settings)

        perf = pf.Performance(self.chanl, provider)
        flow_producer.create(scenario, casetag)
        result = perf.run(tool, protocol, ttype, sizes, affctl)
        LOG.info(flows_settings.settings)
        LOG.info(result)
        if save_on:
            taskid = self._create_task(scenario)
            testid = self.dbconn.add_test_2task(taskid, casetag, protocol, profile, ttype, tool)
            LOG.info(testid)
            self.dbconn.add_data_2test(testid, result)
            if report_on:
                self.report(rpath, not mail_on, taskid)
        return result

    def run_perf_file(self, rpath='./', affctl=False, report_on=True, mail_on=True):
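        """Run the batch of perf cases defined in PerfSettings: build the
        environment for each scenario, run every case, and store the
        results (and optionally the report) per task."""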
        perf_settings = PerfSettings()
        flows_settings = FlowsSettings()
        tool_settings = ToolSettings()
        tester_settings = TesterSettings()
        flow_producer = FlowsProducer(self.chanl, flows_settings)
        provider = PerfProvider(flows_settings.settings, tool_settings.settings, tester_settings.settings)
        perf = pf.Performance(self.chanl, provider)
        tests = perf_settings.settings

        for scenario, cases in tests.items():
            LOG.info(scenario)
            if not cases:
                continue

            config_file = os.path.join("/etc/vstf/env", scenario + ".json")

            LOG.info(config_file)
            env = Builder(self.chanl, config_file)
            env.build()

            taskid = self._create_task(scenario)

            for case in cases:
                LOG.info(case)
                casetag = case['case']
                tool = case['tool']
                protocol = case['protocol']
                profile = case['profile']
                ttype = case['type']
                sizes = case['sizes']

                ret, ret_str = chk.check_case_params(protocol, ttype, tool)
                if not ret:
                    LOG.warn(ret_str)
                    continue

                flow_producer.create(scenario, casetag)
                result = perf.run(tool, protocol, ttype, sizes, affctl)
                LOG.info(result)

                testid = self.dbconn.add_test_2task(taskid, casetag, protocol, profile, ttype, tool)
                LOG.info(testid)

                self.dbconn.add_data_2test(testid, result)

            if report_on:
                self.report(rpath, not mail_on, taskid)

        info_str = "do batch perf test successfully"
        return info_str

    def collect_host_info(self, target):
        if self.collection is not None:
            return self.collection.collect_host_info(target)
        else:
            return "collection is None"


class Manager(daemon.Daemon):
    def __init__(self):
        """
        The manager will create a socket for vstfadm.
        It also owns an OpsChains instance.
        """
        super(Manager, self).__init__(cst.vstf_pid)
        # the socket connection
        self.conn = None
        # the operations of manager
        self.ops = None
        # record the daemon run flag
        self.run_flag = True

    def deal_unknown_obj(self, obj):
        return "unknown response %s" % obj

    def run(self):
        signal.signal(signal.SIGTERM, self.daemon_die)
        # set up the socket server for communicating with vstfadm
        self.conn = unix.UdpServer()
        self.conn.bind()
        self.conn.listen()

        # accept the connection of vstfadm and recv the command
        # run the command from vstfadm and return the response
        while self.run_flag:
            conn, addr = self.conn.accept()
            LOG.debug("accept the conn: %(conn)s", {'conn': conn})

            # recv the msg until the conn breaks.

            while True:
                try:
                    data = message.recv(conn.recv)
                    LOG.debug("Manager recv the msg: %(msg)s", {'msg': data})
                    msg = message.decode(data)
                    body = message.get_body(msg)
                    context = message.get_context(msg)
                except RuntimeError:
                    LOG.debug("manage catch the connection close!")
                    break
                except Exception as e:
                    LOG.error("Manager recv message from socket failed.")
                    self.daemon_die()
                    raise e

                try:
                    func = getattr(self.ops, body.get('method'))
                    LOG.info("Call function:%s, args:%s",
                             func.__name__, body.get('args'))
                    response = func(**body.get('args'))
                    LOG.info("response: %s", response)
                except excepts.UnsolvableExit as e:
                    msg = "The manager opps, exit now"
                    LOG.error(msg)
                    # the manager has no need to be continue, just return
                    # this msg and exit normal
                    self.daemon_die()
                    raise e
                except Exception as e:
                    # here only the function call failed, no need to exit; just return the msg
                    msg = "Run function failed. [ %s ]" % (e)
                    response = msg
                    LOG.error(msg)
                try:
                    response = message.add_context(response, **context)
                    LOG.debug("Manager send the response: <%(r)s", {'r': response})
                    message.send(conn.send, message.encode(response))
                except Exception as e:
                    self.daemon_die()
                    raise e
            # close the connection when the conn goes down
            conn.close()

    def daemon_die(self, signum=None, frame=None):
        """Overwrite daemon.Daemon.daemon_die(self).

        Also called directly, without arguments, when the manager must exit.
        """
        LOG.info("manager caught signal %s, exiting.", signum)
        if self.conn:
            # we cannot close the conn directly, just tell the manager to stop accepting
            self.run_flag = False

        if self.ops:
            # stop the ops's proxy
            # an AttributeError may happen: 'BlockingConnection' object has no attribute 'disconnect';
            # this is a known bug in pika, fixed in the 0.9.14 release
            try:
                self.ops.chanl.close()
            except AttributeError:
                LOG.warning("The connection close happens attribute error")

    def start_manage(self, monitor="localhost", port=5672):
        # creating the manager's ops chains here also creates the proxy to rabbitmq
        self.ops = OpsChains(monitor, port)
        self.start()

    def stop_manage(self):
        self.stop()


@cliutil.arg("--monitor",
             dest="monitor",
             default="localhost",
             action="store",
             help="which ip to be monitored")
@cliutil.arg("--port",
             dest="port",
             default="5672",
             action="store",
             help="rabbitmq conn server")
def do_start(args):
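    """Start the vstf-manager daemon.

    Hypothetical example invocation (assuming the console script is
    installed as vstf-manager):
        vstf-manager start --monitor 192.168.1.10 --port 5672
    """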
    Manager().start_manage(args.monitor, args.port)


def do_stop(args):
    Manager().stop_manage()


def main():
    """this is for vstfctl"""
    setup_logging(level=logging.INFO, log_file="/var/log/vstf/vstf-manager.log", clevel=logging.INFO)
    parser = VstfParser(prog="vstf-manager", description="vstf manager command line")
    parser.set_subcommand_parser(target=sys.modules[__name__])
    args = parser.parse_args()
    args.func(args)
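

# Hypothetical convenience guard: lets the module be run directly for testing;
# the packaged console script is expected to call main() itself.
if __name__ == "__main__":
    main()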