diff options
Diffstat (limited to 'nfvbench/nfvbenchd.py')
-rw-r--r-- | nfvbench/nfvbenchd.py | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py new file mode 100644 index 0000000..aef896a --- /dev/null +++ b/nfvbench/nfvbenchd.py @@ -0,0 +1,251 @@ +#!/usr/bin/env python +# Copyright 2017 Cisco Systems, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from flask import Flask +from flask import jsonify +from flask import render_template +from flask import request + +from flask_socketio import emit +from flask_socketio import SocketIO + +import json +import Queue +import traceback +from utils import byteify +from utils import RunLock +import uuid + +# this global cannot reside in Ctx because of the @app and @socketio decorators +app = None +socketio = None + +STATUS_OK = 'OK' +STATUS_ERROR = 'ERROR' +STATUS_PENDING = 'PENDING' +STATUS_NOT_FOUND = 'NOT_FOUND' + + +def result_json(status, message, request_id=None): + body = { + 'status': status, + 'error_message': message + } + + if request_id is not None: + body['request_id'] = request_id + + return body + + +def load_json(data): + return json.loads(json.dumps(data), object_hook=byteify) + + +def get_uuid(): + return uuid.uuid4().hex + + +class Ctx(object): + MAXLEN = 5 + run_queue = Queue.Queue() + busy = False + result = None + request_from_socketio = False + results = {} + ids = [] + current_id = None + + @staticmethod + def enqueue(config, request_id, from_socketio=False): + Ctx.busy = True + Ctx.request_from_socketio = from_socketio + config['request_id'] = request_id + Ctx.run_queue.put(config) + + if len(Ctx.ids) >= Ctx.MAXLEN: + try: + del Ctx.results[Ctx.ids.pop(0)] + except KeyError: + pass + Ctx.ids.append(request_id) + + @staticmethod + def dequeue(): + config = Ctx.run_queue.get() + Ctx.current_id = config['request_id'] + return config + + @staticmethod + def release(): + Ctx.current_id = None + Ctx.busy = False + + @staticmethod + def set_result(res): + res['request_id'] = Ctx.current_id + Ctx.results[Ctx.current_id] = res + Ctx.result = res + + @staticmethod + def get_result(request_id=None): + if request_id: + try: + res = Ctx.results[request_id] + except KeyError: + return None + + if Ctx.result and request_id == Ctx.result['request_id']: + Ctx.result = None + + return res + else: + res = Ctx.result + if res: + Ctx.result = None + return res + + @staticmethod + def is_busy(): + return Ctx.busy + + @staticmethod + def get_current_request_id(): + return Ctx.current_id + + +def setup_flask(root_path): + global socketio + global app + app = Flask(__name__) + app.root_path = root_path + socketio = SocketIO(app, async_mode='threading') + busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running') + not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run') + not_found_msg = 'results not found' + pending_msg = 'NFVbench run still pending' + + # --------- socketio requests ------------ + + @socketio.on('start_run') + def socketio_start_run(config): + if not Ctx.is_busy(): + Ctx.enqueue(config, get_uuid(), from_socketio=True) + else: + emit('error', {'reason': 'there is already an NFVbench request running'}) + + @socketio.on('echo') + def socketio_echo(config): + emit('echo', config) + + # --------- HTTP requests ------------ + + @app.route('/') + def index(): + return render_template('index.html') + + @app.route('/echo', methods=['GET']) + def echo(): + config = request.json + return jsonify(config) + + @app.route('/start_run', methods=['POST']) + def start_run(): + config = load_json(request.json) + if Ctx.is_busy(): + return jsonify(busy_json) + else: + request_id = get_uuid() + Ctx.enqueue(config, request_id) + return jsonify(result_json(STATUS_PENDING, pending_msg, request_id)) + + @app.route('/status', defaults={'request_id': None}, methods=['GET']) + @app.route('/status/<request_id>', methods=['GET']) + def get_status(request_id): + if request_id: + if Ctx.is_busy() and request_id == Ctx.get_current_request_id(): + # task with request_id still pending + return jsonify(result_json(STATUS_PENDING, pending_msg, request_id)) + + res = Ctx.get_result(request_id) + if res: + # found result for given request_id + return jsonify(res) + else: + # result for given request_id not found + return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id)) + else: + if Ctx.is_busy(): + # task still pending, return with request_id + return jsonify(result_json(STATUS_PENDING, + pending_msg, + Ctx.get_current_request_id())) + + res = Ctx.get_result() + if res: + return jsonify(res) + else: + return jsonify(not_busy_json) + + +class WebSocketIoServer(object): + """This class takes care of the web socketio server, accepts websocket events, and sends back + notifications using websocket events (send_ methods). Caller should simply create an instance + of this class and pass a runner object then invoke the run method + """ + def __init__(self, http_root, runner): + self.nfvbench_runner = runner + setup_flask(http_root) + + def run(self, host='127.0.0.1', port=7556): + + # socketio.run will not return so we need to run it in a background thread so that + # the calling thread (main thread) can keep doing work + socketio.start_background_task(target=socketio.run, app=app, host=host, port=port) + + # wait for run requests + # the runner must be executed from the main thread (Trex client library requirement) + while True: + # print 'main thread waiting for requests...' + config = Ctx.dequeue() + # print 'main thread processing request...' + print config + try: + # remove unfilled values as we do not want them to override default values with None + config = {k: v for k, v in config.items() if v is not None} + with RunLock(): + results = self.nfvbench_runner.run(config) + except Exception as exc: + print 'NFVbench runner exception:' + traceback.print_exc() + results = result_json(STATUS_ERROR, str(exc)) + + if Ctx.request_from_socketio: + socketio.emit('run_end', results) + else: + # this might overwrite a previously unfetched result + Ctx.set_result(results) + Ctx.release() + + def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct): + stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct} + socketio.emit('run_interval_stats', stats) + + def send_ndr_found(self, ndr_pps): + socketio.emit('ndr_found', {'rate_pps': ndr_pps}) + + def send_pdr_found(self, pdr_pps): + socketio.emit('pdr_found', {'rate_pps': pdr_pps}) |