summaryrefslogtreecommitdiffstats
path: root/nfvbench/nfvbenchd.py
diff options
context:
space:
mode:
Diffstat (limited to 'nfvbench/nfvbenchd.py')
-rw-r--r--nfvbench/nfvbenchd.py251
1 files changed, 251 insertions, 0 deletions
diff --git a/nfvbench/nfvbenchd.py b/nfvbench/nfvbenchd.py
new file mode 100644
index 0000000..aef896a
--- /dev/null
+++ b/nfvbench/nfvbenchd.py
@@ -0,0 +1,251 @@
+#!/usr/bin/env python
+# Copyright 2017 Cisco Systems, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from flask import Flask
+from flask import jsonify
+from flask import render_template
+from flask import request
+
+from flask_socketio import emit
+from flask_socketio import SocketIO
+
+import json
+import Queue
+import traceback
+from utils import byteify
+from utils import RunLock
+import uuid
+
+# this global cannot reside in Ctx because of the @app and @socketio decorators
+app = None
+socketio = None
+
+STATUS_OK = 'OK'
+STATUS_ERROR = 'ERROR'
+STATUS_PENDING = 'PENDING'
+STATUS_NOT_FOUND = 'NOT_FOUND'
+
+
+def result_json(status, message, request_id=None):
+ body = {
+ 'status': status,
+ 'error_message': message
+ }
+
+ if request_id is not None:
+ body['request_id'] = request_id
+
+ return body
+
+
+def load_json(data):
+ return json.loads(json.dumps(data), object_hook=byteify)
+
+
+def get_uuid():
+ return uuid.uuid4().hex
+
+
+class Ctx(object):
+ MAXLEN = 5
+ run_queue = Queue.Queue()
+ busy = False
+ result = None
+ request_from_socketio = False
+ results = {}
+ ids = []
+ current_id = None
+
+ @staticmethod
+ def enqueue(config, request_id, from_socketio=False):
+ Ctx.busy = True
+ Ctx.request_from_socketio = from_socketio
+ config['request_id'] = request_id
+ Ctx.run_queue.put(config)
+
+ if len(Ctx.ids) >= Ctx.MAXLEN:
+ try:
+ del Ctx.results[Ctx.ids.pop(0)]
+ except KeyError:
+ pass
+ Ctx.ids.append(request_id)
+
+ @staticmethod
+ def dequeue():
+ config = Ctx.run_queue.get()
+ Ctx.current_id = config['request_id']
+ return config
+
+ @staticmethod
+ def release():
+ Ctx.current_id = None
+ Ctx.busy = False
+
+ @staticmethod
+ def set_result(res):
+ res['request_id'] = Ctx.current_id
+ Ctx.results[Ctx.current_id] = res
+ Ctx.result = res
+
+ @staticmethod
+ def get_result(request_id=None):
+ if request_id:
+ try:
+ res = Ctx.results[request_id]
+ except KeyError:
+ return None
+
+ if Ctx.result and request_id == Ctx.result['request_id']:
+ Ctx.result = None
+
+ return res
+ else:
+ res = Ctx.result
+ if res:
+ Ctx.result = None
+ return res
+
+ @staticmethod
+ def is_busy():
+ return Ctx.busy
+
+ @staticmethod
+ def get_current_request_id():
+ return Ctx.current_id
+
+
+def setup_flask(root_path):
+ global socketio
+ global app
+ app = Flask(__name__)
+ app.root_path = root_path
+ socketio = SocketIO(app, async_mode='threading')
+ busy_json = result_json(STATUS_ERROR, 'there is already an NFVbench request running')
+ not_busy_json = result_json(STATUS_ERROR, 'no pending NFVbench run')
+ not_found_msg = 'results not found'
+ pending_msg = 'NFVbench run still pending'
+
+ # --------- socketio requests ------------
+
+ @socketio.on('start_run')
+ def socketio_start_run(config):
+ if not Ctx.is_busy():
+ Ctx.enqueue(config, get_uuid(), from_socketio=True)
+ else:
+ emit('error', {'reason': 'there is already an NFVbench request running'})
+
+ @socketio.on('echo')
+ def socketio_echo(config):
+ emit('echo', config)
+
+ # --------- HTTP requests ------------
+
+ @app.route('/')
+ def index():
+ return render_template('index.html')
+
+ @app.route('/echo', methods=['GET'])
+ def echo():
+ config = request.json
+ return jsonify(config)
+
+ @app.route('/start_run', methods=['POST'])
+ def start_run():
+ config = load_json(request.json)
+ if Ctx.is_busy():
+ return jsonify(busy_json)
+ else:
+ request_id = get_uuid()
+ Ctx.enqueue(config, request_id)
+ return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
+
+ @app.route('/status', defaults={'request_id': None}, methods=['GET'])
+ @app.route('/status/<request_id>', methods=['GET'])
+ def get_status(request_id):
+ if request_id:
+ if Ctx.is_busy() and request_id == Ctx.get_current_request_id():
+ # task with request_id still pending
+ return jsonify(result_json(STATUS_PENDING, pending_msg, request_id))
+
+ res = Ctx.get_result(request_id)
+ if res:
+ # found result for given request_id
+ return jsonify(res)
+ else:
+ # result for given request_id not found
+ return jsonify(result_json(STATUS_NOT_FOUND, not_found_msg, request_id))
+ else:
+ if Ctx.is_busy():
+ # task still pending, return with request_id
+ return jsonify(result_json(STATUS_PENDING,
+ pending_msg,
+ Ctx.get_current_request_id()))
+
+ res = Ctx.get_result()
+ if res:
+ return jsonify(res)
+ else:
+ return jsonify(not_busy_json)
+
+
+class WebSocketIoServer(object):
+ """This class takes care of the web socketio server, accepts websocket events, and sends back
+ notifications using websocket events (send_ methods). Caller should simply create an instance
+ of this class and pass a runner object then invoke the run method
+ """
+ def __init__(self, http_root, runner):
+ self.nfvbench_runner = runner
+ setup_flask(http_root)
+
+ def run(self, host='127.0.0.1', port=7556):
+
+ # socketio.run will not return so we need to run it in a background thread so that
+ # the calling thread (main thread) can keep doing work
+ socketio.start_background_task(target=socketio.run, app=app, host=host, port=port)
+
+ # wait for run requests
+ # the runner must be executed from the main thread (Trex client library requirement)
+ while True:
+ # print 'main thread waiting for requests...'
+ config = Ctx.dequeue()
+ # print 'main thread processing request...'
+ print config
+ try:
+ # remove unfilled values as we do not want them to override default values with None
+ config = {k: v for k, v in config.items() if v is not None}
+ with RunLock():
+ results = self.nfvbench_runner.run(config)
+ except Exception as exc:
+ print 'NFVbench runner exception:'
+ traceback.print_exc()
+ results = result_json(STATUS_ERROR, str(exc))
+
+ if Ctx.request_from_socketio:
+ socketio.emit('run_end', results)
+ else:
+ # this might overwrite a previously unfetched result
+ Ctx.set_result(results)
+ Ctx.release()
+
+ def send_interval_stats(self, time_ms, tx_pps, rx_pps, drop_pct):
+ stats = {'time_ms': time_ms, 'tx_pps': tx_pps, 'rx_pps': rx_pps, 'drop_pct': drop_pct}
+ socketio.emit('run_interval_stats', stats)
+
+ def send_ndr_found(self, ndr_pps):
+ socketio.emit('ndr_found', {'rate_pps': ndr_pps})
+
+ def send_pdr_found(self, pdr_pps):
+ socketio.emit('pdr_found', {'rate_pps': pdr_pps})