diff options
author | bryan <bryan.sullivan@att.com> | 2017-03-06 20:29:55 -0800 |
---|---|---|
committer | bryan <bryan.sullivan@att.com> | 2017-03-06 21:08:12 -0800 |
commit | 33547549793608d1fef82d758abe4aecbc874f05 (patch) | |
tree | bda7e190d1ac42e0245027ce28b927da341a1d33 /tests/blueprints/tosca-vnfd-hello-ves | |
parent | f6526c8dab68fb8c848ce8d99d390011e8d5f6b8 (diff) |
Rebase on VES V3
JIRA: VES-1
Rebase monitor.py on latest collector.py
Rebase evel_demo.c on latest evel_demo.c
Update start.sh to align.
Change-Id: I11495c9f9b170de5598186f2c5781df0c04d7cba
Signed-off-by: bryan <bryan.sullivan@att.com>
Diffstat (limited to 'tests/blueprints/tosca-vnfd-hello-ves')
-rw-r--r-- | tests/blueprints/tosca-vnfd-hello-ves/evel_demo.c | 1080 | ||||
-rw-r--r-- | tests/blueprints/tosca-vnfd-hello-ves/monitor.py | 839 | ||||
-rwxr-xr-x | tests/blueprints/tosca-vnfd-hello-ves/start.sh | 7 |
3 files changed, 1636 insertions, 290 deletions
diff --git a/tests/blueprints/tosca-vnfd-hello-ves/evel_demo.c b/tests/blueprints/tosca-vnfd-hello-ves/evel_demo.c index 49ad3dc..fc244d8 100644 --- a/tests/blueprints/tosca-vnfd-hello-ves/evel_demo.c +++ b/tests/blueprints/tosca-vnfd-hello-ves/evel_demo.c @@ -2,7 +2,7 @@ * @file * Agent for the OPNFV VNF Event Stream (VES) vHello_VES test * - * Copyright 2016 AT&T Intellectual Property, Inc + * Copyright 2016-2017 AT&T Intellectual Property, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,16 +20,16 @@ #include <stdio.h> #include <stdlib.h> -#include <string.h> #include <unistd.h> #include <getopt.h> #include <sys/signal.h> #include <pthread.h> #include <mcheck.h> -#include <time.h> +#include <sys/time.h> #include "evel.h" #include "evel_demo.h" +#include "evel_test_control.h" /**************************************************************************//** * Definition of long options to the program. @@ -41,16 +41,21 @@ static const struct option long_options[] = { {"id", required_argument, 0, 'i'}, {"fqdn", required_argument, 0, 'f'}, {"port", required_argument, 0, 'n'}, - {"username", required_argument, 0, 'u'}, - {"password", required_argument, 0, 'p'}, + {"path", required_argument, 0, 'p'}, + {"topic", required_argument, 0, 't'}, + {"https", no_argument, 0, 's'}, {"verbose", no_argument, 0, 'v'}, + {"cycles", required_argument, 0, 'c'}, + {"username", required_argument, 0, 'u'}, + {"password", required_argument, 0, 'w'}, + {"nothrott", no_argument, 0, 'x'}, {0, 0, 0, 0} }; /**************************************************************************//** * Definition of short options to the program. *****************************************************************************/ -static const char* short_options = "h:i:f:n:u:p:v:"; +static const char* short_options = "hi:f:n:p:t:sc:u:w:vx"; /**************************************************************************//** * Basic user help text describing the usage of the application. @@ -60,11 +65,15 @@ static const char* usage_text = " --id <Agent host ID>\n" " --fqdn <domain>\n" " --port <port_number>\n" -" --username <username>\n" -" --password <password>\n" -" [--verbose]\n" +" [--path <path>]\n" +" [--topic <topic>]\n" +" [--username <username>]\n" +" [--password <password>]\n" +" [--https]\n" +" [--cycles <cycles>]\n" +" [--nothrott]\n" "\n" -"Agent for the OPNFV VNF Event Stream (VES) vHello_VES test.\n" +"Demonstrate use of the ECOMP Vendor Event Listener API.\n" "\n" " -h Display this usage message.\n" " --help\n" @@ -78,30 +87,62 @@ static const char* usage_text = " -n The port number the RESTful API.\n" " --port\n" "\n" -" -u Username for authentication to the RESTful API.\n" +" -p The optional path prefix to the RESTful API.\n" +" --path\n" +"\n" +" -t The optional topic part of the RESTful API.\n" +" --topic\n" +"\n" +" -u The optional username for basic authentication of requests.\n" " --username\n" "\n" -" -p Password for authentication to the RESTful API.\n" +" -w The optional password for basic authentication of requests.\n" " --password\n" "\n" +" -s Use HTTPS rather than HTTP for the transport.\n" +" --https\n" +"\n" +" -c Loop <cycles> times round the main loop. Default = 1.\n" +" --cycles\n" +"\n" " -v Generate much chattier logs.\n" -" --verbose\n"; +" --verbose\n" +"\n" +" -x Exclude throttling commands from demonstration.\n" +" --nothrott\n"; -/**************************************************************************//** - * Global flags related the applicaton. - *****************************************************************************/ +#define DEFAULT_SLEEP_SECONDS 3 +#define MINIMUM_SLEEP_SECONDS 1 -char *app_prevstate = "Stopped"; +unsigned long long epoch_start = 0; + +typedef enum { + SERVICE_CODEC, + SERVICE_TRANSCODING, + SERVICE_RTCP, + SERVICE_EOC_VQM, + SERVICE_MARKER +} SERVICE_EVENT; + +/*****************************************************************************/ +/* Local prototypes. */ +/*****************************************************************************/ +static void demo_heartbeat(void); +static void demo_fault(void); +static void demo_measurement(const int interval); +static void demo_mobile_flow(void); +static void demo_service(void); +static void demo_service_event(const SERVICE_EVENT service_event); +static void demo_signaling(void); +static void demo_state_change(void); +static void demo_syslog(void); +static void demo_other(void); /**************************************************************************//** - * Global flag to initiate shutdown. + * Global flags related the applicaton. *****************************************************************************/ -static int glob_exit_now = 0; -static void show_usage(FILE* fp) -{ - fputs(usage_text, fp); -} +char *app_prevstate = "Stopped"; /**************************************************************************//** * Report app state change fault. @@ -245,13 +286,12 @@ void measure_traffic() { if (fgets(count, 10, fp) != NULL) { request_rate = atoi(count); printf("Reporting request rate for second: %s as %d\n", period, request_rate); - measurement = evel_new_measurement(concurrent_sessions, - configured_entities, mean_request_latency, measurement_interval, - memory_configured, memory_used, request_rate); + measurement = evel_new_measurement(measurement_interval); if (measurement != NULL) { cpu(); evel_measurement_type_set(measurement, "HTTP request rate"); + evel_measurement_request_rate_set(measurement, request_rate); // evel_measurement_agg_cpu_use_set(measurement, loadavg); // evel_measurement_cpu_use_add(measurement, "cpu0", loadavg); @@ -274,6 +314,20 @@ void measure_traffic() { } /**************************************************************************//** + * Global flag to initiate shutdown. + *****************************************************************************/ +static int glob_exit_now = 0; + +static char * api_fqdn = NULL; +static int api_port = 0; +static int api_secure = 0; + +static void show_usage(FILE* fp) +{ + fputs(usage_text, fp); +} + +/**************************************************************************//** * Main function. * * Parses the command-line then ... @@ -288,19 +342,17 @@ int main(int argc, char ** argv) int option_index = 0; int param = 0; char * api_vmid = NULL; - char * api_fqdn = NULL; - int api_port = 0; - char * api_username = NULL; - char * api_password = NULL; char * api_path = NULL; char * api_topic = NULL; - int api_secure = 0; + char * api_username = ""; + char * api_password = ""; int verbose_mode = 0; - EVEL_ERR_CODES evel_rc = EVEL_SUCCESS; + int exclude_throttling = 0; + int cycles = 2147483647; + int cycle; + int measurement_interval = EVEL_MEASUREMENT_INTERVAL_UKNOWN; EVENT_HEADER * heartbeat = NULL; -// EVENT_FAULT * fault = NULL; -// EVENT_MEASUREMENT * measurement = NULL; -// EVENT_REPORT * report = NULL; + EVEL_ERR_CODES evel_rc = EVEL_SUCCESS; /***************************************************************************/ /* We're very interested in memory management problems so check behavior. */ @@ -337,18 +389,38 @@ int main(int argc, char ** argv) api_port = atoi(optarg); break; + case 'p': + api_path = optarg; + break; + + case 't': + api_topic = optarg; + break; + case 'u': api_username = optarg; break; - case 'p': + case 'w': api_password = optarg; break; + case 's': + api_secure = 1; + break; + + case 'c': + cycles = atoi(optarg); + break; + case 'v': verbose_mode = 1; break; + case 'x': + exclude_throttling = 1; + break; + case '?': /*********************************************************************/ /* Unrecognized parameter - getopt_long already printed an error */ @@ -387,6 +459,12 @@ int main(int argc, char ** argv) "specified between 1 and 65535.\n"); exit(1); } + if (cycles <= 0) + { + fprintf(stderr, "Number of cycles around the main loop must be an" + "integer greater than zero.\n"); + exit(1); + } /***************************************************************************/ /* Set up default signal behaviour. Block all signals we trap explicitly */ @@ -423,7 +501,7 @@ int main(int argc, char ** argv) api_username, api_password, EVEL_SOURCE_VIRTUAL_MACHINE, - "vHello_VES agent", + "EVEL demo client", verbose_mode)) { fprintf(stderr, "Failed to initialize the EVEL library!!!"); @@ -435,12 +513,140 @@ int main(int argc, char ** argv) } /***************************************************************************/ + /* Work out a start time for measurements, and sleep for initial period. */ + /***************************************************************************/ + struct timeval tv_start; + gettimeofday(&tv_start, NULL); + epoch_start = tv_start.tv_usec + 1000000 * tv_start.tv_sec; + sleep(DEFAULT_SLEEP_SECONDS); + + /***************************************************************************/ /* MAIN LOOP */ /***************************************************************************/ - while (1) + printf("Starting %d loops...\n", cycles); + cycle = 0; + while (cycle++ < cycles) { EVEL_INFO("MAI: Starting main loop"); -// printf("Starting main loop\n"); + printf("\nStarting main loop %d\n", cycle); + + /*************************************************************************/ + /* A 20s-long repeating cycle of behaviour. */ + /*************************************************************************/ + if (exclude_throttling == 0) + { + switch (cycle % 20) + { + case 1: + printf(" 1 - Resetting throttle specification for all domains\n"); + evel_test_control_scenario(TC_RESET_ALL_DOMAINS, + api_secure, + api_fqdn, + api_port); + break; + + case 2: + printf(" 2 - Switching measurement interval to 2s\n"); + evel_test_control_meas_interval(2, + api_secure, + api_fqdn, + api_port); + break; + + case 3: + printf(" 3 - Suppressing fault domain\n"); + evel_test_control_scenario(TC_FAULT_SUPPRESS_FIELDS_AND_PAIRS, + api_secure, + api_fqdn, + api_port); + break; + + case 4: + printf(" 4 - Suppressing measurement domain\n"); + evel_test_control_scenario(TC_MEAS_SUPPRESS_FIELDS_AND_PAIRS, + api_secure, + api_fqdn, + api_port); + break; + + case 5: + printf(" 5 - Switching measurement interval to 5s\n"); + evel_test_control_meas_interval(5, + api_secure, + api_fqdn, + api_port); + break; + + case 6: + printf(" 6 - Suppressing mobile flow domain\n"); + evel_test_control_scenario(TC_MOBILE_SUPPRESS_FIELDS_AND_PAIRS, + api_secure, + api_fqdn, + api_port); + break; + + case 7: + printf(" 7 - Suppressing state change domain\n"); + evel_test_control_scenario(TC_STATE_SUPPRESS_FIELDS_AND_PAIRS, + api_secure, + api_fqdn, + api_port); + break; + + case 8: + printf(" 8 - Suppressing signaling domain\n"); + evel_test_control_scenario(TC_SIGNALING_SUPPRESS_FIELDS, + api_secure, + api_fqdn, + api_port); + break; + + case 9: + printf(" 9 - Suppressing service event domain\n"); + evel_test_control_scenario(TC_SERVICE_SUPPRESS_FIELDS_AND_PAIRS, + api_secure, + api_fqdn, + api_port); + break; + + case 10: + printf(" 10 - Switching measurement interval to 20s\n"); + evel_test_control_meas_interval(20, + api_secure, + api_fqdn, + api_port); + break; + + case 11: + printf(" 11 - Suppressing syslog domain\n"); + evel_test_control_scenario(TC_SYSLOG_SUPPRESS_FIELDS_AND_PAIRS, + api_secure, + api_fqdn, + api_port); + break; + + case 12: + printf(" 12 - Switching measurement interval to 10s\n"); + evel_test_control_meas_interval(10, + api_secure, + api_fqdn, + api_port); + break; + + case 15: + printf(" Requesting provide throttling spec\n"); + evel_test_control_scenario(TC_PROVIDE_THROTTLING_SPEC, + api_secure, + api_fqdn, + api_port); + break; + } + } + fflush(stdout); + + /*************************************************************************/ + /* Send a bunch of events. */ + /*************************************************************************/ printf("Sending heartbeat\n"); heartbeat = evel_new_heartbeat(); @@ -460,19 +666,53 @@ int main(int argc, char ** argv) check_app_container_state(); measure_traffic(); +// demo_heartbeat(); +// demo_fault(); +// demo_measurement((measurement_interval == +// EVEL_MEASUREMENT_INTERVAL_UKNOWN) ? +// DEFAULT_SLEEP_SECONDS : measurement_interval); +// demo_mobile_flow(); +// demo_service(); +// demo_signaling(); +// demo_state_change(); +// demo_syslog(); +// demo_other(); + /*************************************************************************/ - /* MAIN RETRY LOOP. Loop every 10 secs. */ - /* TODO: Listener for throttling back scheduled reports. */ + /* MAIN RETRY LOOP. Check and implement the measurement interval. */ /*************************************************************************/ - // printf("End of main loop, sleeping for 10 seconds\n"); - fflush(stdout); - sleep(10); - } + if (cycle <= cycles) + { + int sleep_time; + + /***********************************************************************/ + /* We have a minimum loop time. */ + /***********************************************************************/ + sleep(MINIMUM_SLEEP_SECONDS); + + /***********************************************************************/ + /* Get the latest measurement interval and sleep for the remainder. */ + /***********************************************************************/ + measurement_interval = evel_get_measurement_interval(); + printf("Measurement Interval = %d\n", measurement_interval); + + if (measurement_interval == EVEL_MEASUREMENT_INTERVAL_UKNOWN) + { + sleep_time = DEFAULT_SLEEP_SECONDS - MINIMUM_SLEEP_SECONDS; + } + else + { + sleep_time = measurement_interval - MINIMUM_SLEEP_SECONDS; + } + sleep(sleep_time); + } + } + /***************************************************************************/ /* We are exiting, but allow the final set of events to be dispatched */ /* properly first. */ /***************************************************************************/ - sleep(1); + sleep(2); printf("All done - exiting!\n"); return 0; } @@ -526,3 +766,751 @@ void *signal_watcher(void *void_sig_set) exit(0); return(NULL); } + +/**************************************************************************//** + * Create and send a heartbeat event. + *****************************************************************************/ +void demo_heartbeat(void) +{ + EVENT_HEADER * heartbeat = NULL; + EVEL_ERR_CODES evel_rc = EVEL_SUCCESS; + + /***************************************************************************/ + /* Heartbeat */ + /***************************************************************************/ + heartbeat = evel_new_heartbeat(); + if (heartbeat != NULL) + { + evel_rc = evel_post_event(heartbeat); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New Heartbeat failed"); + } + printf(" Processed Heartbeat\n"); +} + +/**************************************************************************//** + * Create and send three fault events. + *****************************************************************************/ +void demo_fault(void) +{ + EVENT_FAULT * fault = NULL; + EVEL_ERR_CODES evel_rc = EVEL_SUCCESS; + + /***************************************************************************/ + /* Fault */ + /***************************************************************************/ + fault = evel_new_fault("An alarm condition", + "Things are broken", + EVEL_PRIORITY_NORMAL, + EVEL_SEVERITY_MAJOR); + if (fault != NULL) + { + evel_rc = evel_post_event((EVENT_HEADER *)fault); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New Fault failed"); + } + printf(" Processed empty Fault\n"); + + fault = evel_new_fault("Another alarm condition", + "It broke badly", + EVEL_PRIORITY_NORMAL, + EVEL_SEVERITY_MAJOR); + if (fault != NULL) + { + evel_fault_type_set(fault, "Bad things happening"); + evel_fault_interface_set(fault, "An Interface Card"); + evel_rc = evel_post_event((EVENT_HEADER *)fault); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New Fault failed"); + } + printf(" Processed partial Fault\n"); + + fault = evel_new_fault("My alarm condition", + "It broke very badly", + EVEL_PRIORITY_NORMAL, + EVEL_SEVERITY_MAJOR); + if (fault != NULL) + { + evel_fault_type_set(fault, "Bad things happen..."); + evel_fault_interface_set(fault, "My Interface Card"); + evel_fault_addl_info_add(fault, "name1", "value1"); + evel_fault_addl_info_add(fault, "name2", "value2"); + evel_rc = evel_post_event((EVENT_HEADER *)fault); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New Fault failed"); + } + printf(" Processed full Fault\n"); +} + +/**************************************************************************//** + * Create and send a measurement event. + *****************************************************************************/ +void demo_measurement(const int interval) +{ + EVENT_MEASUREMENT * measurement = NULL; + MEASUREMENT_LATENCY_BUCKET * bucket = NULL; + MEASUREMENT_VNIC_USE * vnic_use = NULL; + EVEL_ERR_CODES evel_rc = EVEL_SUCCESS; + + /***************************************************************************/ + /* Measurement */ + /***************************************************************************/ + measurement = evel_new_measurement(interval); + if (measurement != NULL) + { + evel_measurement_type_set(measurement, "Perf management..."); + evel_measurement_conc_sess_set(measurement, 1); + evel_measurement_cfg_ents_set(measurement, 2); + evel_measurement_mean_req_lat_set(measurement, 4.4); + evel_measurement_mem_cfg_set(measurement, 6.6); + evel_measurement_mem_used_set(measurement, 3.3); + evel_measurement_request_rate_set(measurement, 6); + evel_measurement_agg_cpu_use_set(measurement, 8.8); + evel_measurement_cpu_use_add(measurement, "cpu1", 11.11); + evel_measurement_cpu_use_add(measurement, "cpu2", 22.22); + evel_measurement_fsys_use_add(measurement,"00-11-22",100.11, 100.22, 33, + 200.11, 200.22, 44); + evel_measurement_fsys_use_add(measurement,"33-44-55",300.11, 300.22, 55, + 400.11, 400.22, 66); + + bucket = evel_new_meas_latency_bucket(20); + evel_meas_latency_bucket_low_end_set(bucket, 0.0); + evel_meas_latency_bucket_high_end_set(bucket, 10.0); + evel_meas_latency_bucket_add(measurement, bucket); + + bucket = evel_new_meas_latency_bucket(30); + evel_meas_latency_bucket_low_end_set(bucket, 10.0); + evel_meas_latency_bucket_high_end_set(bucket, 20.0); + evel_meas_latency_bucket_add(measurement, bucket); + + vnic_use = evel_new_measurement_vnic_use("eth0", 100, 200, 3, 4); + evel_vnic_use_bcast_pkt_in_set(vnic_use, 1); + evel_vnic_use_bcast_pkt_out_set(vnic_use, 2); + evel_vnic_use_mcast_pkt_in_set(vnic_use, 5); + evel_vnic_use_mcast_pkt_out_set(vnic_use, 6); + evel_vnic_use_ucast_pkt_in_set(vnic_use, 7); + evel_vnic_use_ucast_pkt_out_set(vnic_use, 8); + evel_meas_vnic_use_add(measurement, vnic_use); + + vnic_use = evel_new_measurement_vnic_use("eth1", 110, 240, 13, 14); + evel_vnic_use_bcast_pkt_in_set(vnic_use, 11); + evel_vnic_use_bcast_pkt_out_set(vnic_use, 12); + evel_vnic_use_mcast_pkt_in_set(vnic_use, 15); + evel_vnic_use_mcast_pkt_out_set(vnic_use, 16); + evel_vnic_use_ucast_pkt_in_set(vnic_use, 17); + evel_vnic_use_ucast_pkt_out_set(vnic_use, 18); + evel_meas_vnic_use_add(measurement, vnic_use); + + evel_measurement_errors_set(measurement, 1, 0, 2, 1); + + evel_measurement_feature_use_add(measurement, "FeatureA", 123); + evel_measurement_feature_use_add(measurement, "FeatureB", 567); + + evel_measurement_codec_use_add(measurement, "G711a", 91); + evel_measurement_codec_use_add(measurement, "G729ab", 92); + + evel_measurement_media_port_use_set(measurement, 1234); + + evel_measurement_vnfc_scaling_metric_set(measurement, 1234.5678); + + evel_measurement_custom_measurement_add(measurement, + "Group1", "Name1", "Value1"); + evel_measurement_custom_measurement_add(measurement, + "Group2", "Name1", "Value1"); + evel_measurement_custom_measurement_add(measurement, + "Group2", "Name2", "Value2"); + + /*************************************************************************/ + /* Work out the time, to use as end of measurement period. */ + /*************************************************************************/ + struct timeval tv_now; + gettimeofday(&tv_now, NULL); + unsigned long long epoch_now = tv_now.tv_usec + 1000000 * tv_now.tv_sec; + evel_start_epoch_set(&measurement->header, epoch_start); + evel_last_epoch_set(&measurement->header, epoch_now); + epoch_start = epoch_now; + evel_reporting_entity_name_set(&measurement->header, "measurer"); + evel_reporting_entity_id_set(&measurement->header, "measurer_id"); + + evel_rc = evel_post_event((EVENT_HEADER *)measurement); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post Measurement failed %d (%s)", + evel_rc, + evel_error_string()); + } + } + else + { + EVEL_ERROR("New Measurement failed"); + } + printf(" Processed Measurement\n"); +} + +/**************************************************************************//** + * Create and send three mobile flow events. + *****************************************************************************/ +void demo_mobile_flow(void) +{ + MOBILE_GTP_PER_FLOW_METRICS * metrics = NULL; + EVENT_MOBILE_FLOW * mobile_flow = NULL; + EVEL_ERR_CODES evel_rc = EVEL_SUCCESS; + + /***************************************************************************/ + /* Mobile Flow */ + /***************************************************************************/ + metrics = evel_new_mobile_gtp_flow_metrics(12.3, + 3.12, + 100, + 2100, + 500, + 1470409421, + 987, + 1470409431, + 11, + (time_t)1470409431, + "Working", + 87, + 3, + 17, + 123654, + 4561, + 0, + 12, + 10, + 1, + 3, + 7, + 899, + 901, + 302, + 6, + 2, + 0, + 110, + 225); + if (metrics != NULL) + { + mobile_flow = evel_new_mobile_flow("Outbound", + metrics, + "TCP", + "IPv4", + "2.3.4.1", + 2341, + "4.2.3.1", + 4321); + if (mobile_flow != NULL) + { + evel_rc = evel_post_event((EVENT_HEADER *)mobile_flow); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post Mobile Flow failed %d (%s)", + evel_rc, + evel_error_string()); + } + } + else + { + EVEL_ERROR("New Mobile Flow failed"); + } + printf(" Processed empty Mobile Flow\n"); + } + else + { + EVEL_ERROR("New GTP Per Flow Metrics failed - skipping Mobile Flow"); + printf(" Skipped empty Mobile Flow\n"); + } + + metrics = evel_new_mobile_gtp_flow_metrics(132.0001, + 31.2, + 101, + 2101, + 501, + 1470409422, + 988, + 1470409432, + 12, + (time_t)1470409432, + "Inactive", + 88, + 4, + 18, + 123655, + 4562, + 1, + 13, + 11, + 2, + 4, + 8, + 900, + 902, + 303, + 7, + 3, + 1, + 111, + 226); + if (metrics != NULL) + { + mobile_flow = evel_new_mobile_flow("Inbound", + metrics, + "UDP", + "IPv6", + "2.3.4.2", + 2342, + "4.2.3.2", + 4322); + if (mobile_flow != NULL) + { + evel_mobile_flow_app_type_set(mobile_flow, "Demo application"); + evel_mobile_flow_app_prot_type_set(mobile_flow, "GSM"); + evel_mobile_flow_app_prot_ver_set(mobile_flow, "1"); + evel_mobile_flow_cid_set(mobile_flow, "65535"); + evel_mobile_flow_con_type_set(mobile_flow, "S1-U"); + evel_mobile_flow_ecgi_set(mobile_flow, "e65535"); + evel_mobile_flow_gtp_prot_type_set(mobile_flow, "GTP-U"); + evel_mobile_flow_gtp_prot_ver_set(mobile_flow, "1"); + evel_mobile_flow_http_header_set(mobile_flow, + "http://www.something.com"); + evel_mobile_flow_imei_set(mobile_flow, "209917614823"); + evel_mobile_flow_imsi_set(mobile_flow, "355251/05/850925/8"); + evel_mobile_flow_lac_set(mobile_flow, "1"); + evel_mobile_flow_mcc_set(mobile_flow, "410"); + evel_mobile_flow_mnc_set(mobile_flow, "04"); + evel_mobile_flow_msisdn_set(mobile_flow, "6017123456789"); + evel_mobile_flow_other_func_role_set(mobile_flow, "MME"); + evel_mobile_flow_rac_set(mobile_flow, "514"); + evel_mobile_flow_radio_acc_tech_set(mobile_flow, "LTE"); + evel_mobile_flow_sac_set(mobile_flow, "1"); + evel_mobile_flow_samp_alg_set(mobile_flow, 1); + evel_mobile_flow_tac_set(mobile_flow, "2099"); + evel_mobile_flow_tunnel_id_set(mobile_flow, "Tunnel 1"); + evel_mobile_flow_vlan_id_set(mobile_flow, "15"); + + evel_rc = evel_post_event((EVENT_HEADER *)mobile_flow); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post Mobile Flow failed %d (%s)", + evel_rc, + evel_error_string()); + } + } + else + { + EVEL_ERROR("New Mobile Flow failed"); + } + printf(" Processed partial Mobile Flow\n"); + } + else + { + EVEL_ERROR("New GTP Per Flow Metrics failed - skipping Mobile Flow"); + printf(" Skipped partial Mobile Flow\n"); + } + + metrics = evel_new_mobile_gtp_flow_metrics(12.32, + 3.122, + 1002, + 21002, + 5002, + 1470409423, + 9872, + 1470409433, + 112, + (time_t)1470409433, + "Failed", + 872, + 32, + 172, + 1236542, + 45612, + 2, + 122, + 102, + 12, + 32, + 72, + 8992, + 9012, + 3022, + 62, + 22, + 2, + 1102, + 2252); + if (metrics != NULL) + { + evel_mobile_gtp_metrics_dur_con_fail_set(metrics, 12); + evel_mobile_gtp_metrics_dur_tun_fail_set(metrics, 13); + evel_mobile_gtp_metrics_act_by_set(metrics, "Remote"); + evel_mobile_gtp_metrics_act_time_set(metrics, (time_t)1470409423); + evel_mobile_gtp_metrics_deact_by_set(metrics, "Remote"); + evel_mobile_gtp_metrics_con_status_set(metrics, "Connected"); + evel_mobile_gtp_metrics_tun_status_set(metrics, "Not tunneling"); + evel_mobile_gtp_metrics_iptos_set(metrics, 1, 13); + evel_mobile_gtp_metrics_iptos_set(metrics, 17, 1); + evel_mobile_gtp_metrics_iptos_set(metrics, 4, 99); + evel_mobile_gtp_metrics_large_pkt_rtt_set(metrics, 80); + evel_mobile_gtp_metrics_large_pkt_thresh_set(metrics, 600.0); + evel_mobile_gtp_metrics_max_rcv_bit_rate_set(metrics, 1357924680); + evel_mobile_gtp_metrics_max_trx_bit_rate_set(metrics, 235711); + evel_mobile_gtp_metrics_num_echo_fail_set(metrics, 1); + evel_mobile_gtp_metrics_num_tun_fail_set(metrics, 4); + evel_mobile_gtp_metrics_num_http_errors_set(metrics, 2); + evel_mobile_gtp_metrics_tcp_flag_count_add(metrics, EVEL_TCP_CWR, 10); + evel_mobile_gtp_metrics_tcp_flag_count_add(metrics, EVEL_TCP_URG, 121); + evel_mobile_gtp_metrics_qci_cos_count_add( + metrics, EVEL_QCI_COS_UMTS_CONVERSATIONAL, 11); + evel_mobile_gtp_metrics_qci_cos_count_add( + metrics, EVEL_QCI_COS_LTE_65, 122); + + mobile_flow = evel_new_mobile_flow("Outbound", + metrics, + "RTP", + "IPv8", + "2.3.4.3", + 2343, + "4.2.3.3", + 4323); + if (mobile_flow != NULL) + { + evel_mobile_flow_app_type_set(mobile_flow, "Demo application 2"); + evel_mobile_flow_app_prot_type_set(mobile_flow, "GSM"); + evel_mobile_flow_app_prot_ver_set(mobile_flow, "2"); + evel_mobile_flow_cid_set(mobile_flow, "1"); + evel_mobile_flow_con_type_set(mobile_flow, "S1-U"); + evel_mobile_flow_ecgi_set(mobile_flow, "e1"); + evel_mobile_flow_gtp_prot_type_set(mobile_flow, "GTP-U"); + evel_mobile_flow_gtp_prot_ver_set(mobile_flow, "1"); + evel_mobile_flow_http_header_set(mobile_flow, "http://www.google.com"); + evel_mobile_flow_imei_set(mobile_flow, "209917614823"); + evel_mobile_flow_imsi_set(mobile_flow, "355251/05/850925/8"); + evel_mobile_flow_lac_set(mobile_flow, "1"); + evel_mobile_flow_mcc_set(mobile_flow, "410"); + evel_mobile_flow_mnc_set(mobile_flow, "04"); + evel_mobile_flow_msisdn_set(mobile_flow, "6017123456789"); + evel_mobile_flow_other_func_role_set(mobile_flow, "MMF"); + evel_mobile_flow_rac_set(mobile_flow, "514"); + evel_mobile_flow_radio_acc_tech_set(mobile_flow, "3G"); + evel_mobile_flow_sac_set(mobile_flow, "1"); + evel_mobile_flow_samp_alg_set(mobile_flow, 2); + evel_mobile_flow_tac_set(mobile_flow, "2099"); + evel_mobile_flow_tunnel_id_set(mobile_flow, "Tunnel 2"); + evel_mobile_flow_vlan_id_set(mobile_flow, "4096"); + + evel_rc = evel_post_event((EVENT_HEADER *)mobile_flow); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post Mobile Flow failed %d (%s)", + evel_rc, + evel_error_string()); + } + } + else + { + EVEL_ERROR("New Mobile Flow failed"); + } + printf(" Processed full Mobile Flow\n"); + } + else + { + EVEL_ERROR("New GTP Per Flow Metrics failed - skipping Mobile Flow"); + printf(" Skipped full Mobile Flow\n"); + } +} + +/**************************************************************************//** + * Create and send a Service event. + *****************************************************************************/ +void demo_service() +{ + demo_service_event(SERVICE_CODEC); + demo_service_event(SERVICE_TRANSCODING); + demo_service_event(SERVICE_RTCP); + demo_service_event(SERVICE_EOC_VQM); + demo_service_event(SERVICE_MARKER); +} + +void demo_service_event(const SERVICE_EVENT service_event) +{ + EVENT_SERVICE * event = NULL; + EVEL_ERR_CODES evel_rc = EVEL_SUCCESS; + + event = evel_new_service("vendor_x_id", "vendor_x_event_id"); + if (event != NULL) + { + evel_service_type_set(event, "Service Event"); + evel_service_product_id_set(event, "vendor_x_product_id"); + evel_service_subsystem_id_set(event, "vendor_x_subsystem_id"); + evel_service_friendly_name_set(event, "vendor_x_frieldly_name"); + evel_service_correlator_set(event, "vendor_x_correlator"); + evel_service_addl_field_add(event, "Name1", "Value1"); + evel_service_addl_field_add(event, "Name2", "Value2"); + + switch (service_event) + { + case SERVICE_CODEC: + evel_service_codec_set(event, "PCMA"); + break; + case SERVICE_TRANSCODING: + evel_service_callee_codec_set(event, "PCMA"); + evel_service_caller_codec_set(event, "G729A"); + break; + case SERVICE_RTCP: + evel_service_rtcp_data_set(event, "some_rtcp_data"); + break; + case SERVICE_EOC_VQM: + evel_service_adjacency_name_set(event, "vendor_x_adjacency"); + evel_service_endpoint_desc_set(event, EVEL_SERVICE_ENDPOINT_CALLER); + evel_service_endpoint_jitter_set(event, 66); + evel_service_endpoint_rtp_oct_disc_set(event, 100); + evel_service_endpoint_rtp_oct_recv_set(event, 200); + evel_service_endpoint_rtp_oct_sent_set(event, 300); + evel_service_endpoint_rtp_pkt_disc_set(event, 400); + evel_service_endpoint_rtp_pkt_recv_set(event, 500); + evel_service_endpoint_rtp_pkt_sent_set(event, 600); + evel_service_local_jitter_set(event, 99); + evel_service_local_rtp_oct_disc_set(event, 150); + evel_service_local_rtp_oct_recv_set(event, 250); + evel_service_local_rtp_oct_sent_set(event, 350); + evel_service_local_rtp_pkt_disc_set(event, 450); + evel_service_local_rtp_pkt_recv_set(event, 550); + evel_service_local_rtp_pkt_sent_set(event, 650); + evel_service_mos_cqe_set(event, 12.255); + evel_service_packets_lost_set(event, 157); + evel_service_packet_loss_percent_set(event, 0.232); + evel_service_r_factor_set(event, 11); + evel_service_round_trip_delay_set(event, 15); + break; + case SERVICE_MARKER: + evel_service_phone_number_set(event, "0888888888"); + break; + } + + evel_rc = evel_post_event((EVENT_HEADER *) event); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New Service failed"); + } + printf(" Processed Service Events\n"); +} + +/**************************************************************************//** + * Create and send a Signaling event. + *****************************************************************************/ +void demo_signaling(void) +{ + EVENT_SIGNALING * event = NULL; + EVEL_ERR_CODES evel_rc = EVEL_SUCCESS; + + event = evel_new_signaling("vendor_x_id", "vendor_x_event_id"); + if (event != NULL) + { + evel_signaling_type_set(event, "Signaling"); + evel_signaling_product_id_set(event, "vendor_x_product_id"); + evel_signaling_subsystem_id_set(event, "vendor_x_subsystem_id"); + evel_signaling_friendly_name_set(event, "vendor_x_frieldly_name"); + evel_signaling_correlator_set(event, "vendor_x_correlator"); + evel_signaling_local_ip_address_set(event, "1.0.3.1"); + evel_signaling_local_port_set(event, "1031"); + evel_signaling_remote_ip_address_set(event, "5.3.3.0"); + evel_signaling_remote_port_set(event, "5330"); + evel_signaling_compressed_sip_set(event, "compressed_sip"); + evel_signaling_summary_sip_set(event, "summary_sip"); + evel_rc = evel_post_event((EVENT_HEADER *) event); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New Signaling failed"); + } + printf(" Processed Signaling\n"); +} + +/**************************************************************************//** + * Create and send a state change event. + *****************************************************************************/ +void demo_state_change(void) +{ + EVENT_STATE_CHANGE * state_change = NULL; + EVEL_ERR_CODES evel_rc = EVEL_SUCCESS; + + /***************************************************************************/ + /* State Change */ + /***************************************************************************/ + state_change = evel_new_state_change(EVEL_ENTITY_STATE_IN_SERVICE, + EVEL_ENTITY_STATE_OUT_OF_SERVICE, + "Interface"); + if (state_change != NULL) + { + evel_state_change_type_set(state_change, "State Change"); + evel_state_change_addl_field_add(state_change, "Name1", "Value1"); + evel_state_change_addl_field_add(state_change, "Name2", "Value2"); + evel_rc = evel_post_event((EVENT_HEADER *)state_change); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New State Change failed"); + } + printf(" Processed State Change\n"); +} + +/**************************************************************************//** + * Create and send two syslog events. + *****************************************************************************/ +void demo_syslog(void) +{ + EVENT_SYSLOG * syslog = NULL; + EVEL_ERR_CODES evel_rc = EVEL_SUCCESS; + + /***************************************************************************/ + /* Syslog */ + /***************************************************************************/ + syslog = evel_new_syslog(EVEL_SOURCE_VIRTUAL_NETWORK_FUNCTION, + "EVEL library message", + "EVEL"); + if (syslog != NULL) + { + evel_rc = evel_post_event((EVENT_HEADER *)syslog); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New Syslog failed"); + } + printf(" Processed empty Syslog\n"); + + syslog = evel_new_syslog(EVEL_SOURCE_VIRTUAL_MACHINE, + "EVEL library message", + "EVEL"); + if (syslog != NULL) + { + evel_syslog_event_source_host_set(syslog, "Virtual host"); + evel_syslog_facility_set(syslog, EVEL_SYSLOG_FACILITY_LOCAL0); + evel_syslog_proc_set(syslog, "vnf_process"); + evel_syslog_proc_id_set(syslog, 1423); + evel_syslog_version_set(syslog, 1); + evel_syslog_addl_field_add(syslog, "Name1", "Value1"); + evel_syslog_addl_field_add(syslog, "Name2", "Value2"); + evel_syslog_addl_field_add(syslog, "Name3", "Value3"); + evel_syslog_addl_field_add(syslog, "Name4", "Value4"); + evel_rc = evel_post_event((EVENT_HEADER *)syslog); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New Syslog failed"); + } + printf(" Processed full Syslog\n"); +} + +/**************************************************************************//** + * Create and send two other events. + *****************************************************************************/ +void demo_other(void) +{ + EVENT_OTHER * other = NULL; + EVEL_ERR_CODES evel_rc = EVEL_SUCCESS; + + /***************************************************************************/ + /* Other */ + /***************************************************************************/ + other = evel_new_other(); + if (other != NULL) + { + evel_rc = evel_post_event((EVENT_HEADER *)other); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New Other failed"); + } + printf(" Processed empty Other\n"); + + other = evel_new_other(); + if (other != NULL) + { + evel_other_field_add(other, + "Other field 1", + "Other value 1"); + evel_rc = evel_post_event((EVENT_HEADER *)other); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New Other failed"); + } + printf(" Processed small Other\n"); + + other = evel_new_other(); + if (other != NULL) + { + evel_other_field_add(other, + "Other field A", + "Other value A"); + evel_other_field_add(other, + "Other field B", + "Other value B"); + evel_other_field_add(other, + "Other field C", + "Other value C"); + evel_rc = evel_post_event((EVENT_HEADER *)other); + if (evel_rc != EVEL_SUCCESS) + { + EVEL_ERROR("Post failed %d (%s)", evel_rc, evel_error_string()); + } + } + else + { + EVEL_ERROR("New Other failed"); + } + printf(" Processed large Other\n"); +} diff --git a/tests/blueprints/tosca-vnfd-hello-ves/monitor.py b/tests/blueprints/tosca-vnfd-hello-ves/monitor.py index b34bed9..9b16af3 100644 --- a/tests/blueprints/tosca-vnfd-hello-ves/monitor.py +++ b/tests/blueprints/tosca-vnfd-hello-ves/monitor.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2016 AT&T Intellectual Property, Inc +# Copyright 2016-2017 AT&T Intellectual Property, Inc # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,7 +19,8 @@ # # Status: this is a work in progress, under test. -from wsgiref.simple_server import make_server, WSGIRequestHandler +from rest_dispatcher import PathDispatcher, set_404_content +from wsgiref.simple_server import make_server import sys import os import platform @@ -32,7 +33,7 @@ from base64 import b64decode import string import json import jsonschema -import select +from functools import partial monitor_mode = "f" vdu_id = ['','','',''] @@ -48,53 +49,209 @@ class JSONObject: def __init__(self, d): self.__dict__ = d -class NoLoggingWSGIRequestHandler(WSGIRequestHandler): - def log_message(self, format, *args): - pass +_hello_resp = '''\ +<html> + <head> + <title>Hello {name}</title> + </head> + <body> + <h1>Hello {name}!</h1> + </body> +</html>''' + +_localtime_resp = '''\ +<?xml version="1.0"?> +<time> + <year>{t.tm_year}</year> + <month>{t.tm_mon}</month> + <day>{t.tm_mday}</day> + <hour>{t.tm_hour}</hour> + <minute>{t.tm_min}</minute> + <second>{t.tm_sec}</second> +</time>''' + +__all__ = [] +__version__ = 0.1 +__date__ = '2015-12-04' +__updated__ = '2015-12-04' + +TESTRUN = False +DEBUG = False +PROFILE = False -base_url = '' -template_404 = b'''POST {0}''' +#------------------------------------------------------------------------------ +# Credentials we expect clients to authenticate themselves with. +#------------------------------------------------------------------------------ +vel_username = '' +vel_password = '' + +#------------------------------------------------------------------------------ +# The JSON schema which we will use to validate events. +#------------------------------------------------------------------------------ +vel_schema = None + +#------------------------------------------------------------------------------ +# The JSON schema which we will use to validate client throttle state. +#------------------------------------------------------------------------------ +throttle_schema = None + +#------------------------------------------------------------------------------ +# The JSON schema which we will use to provoke throttling commands for testing. +#------------------------------------------------------------------------------ +test_control_schema = None + +#------------------------------------------------------------------------------ +# Pending command list from the testControl API +# This is sent as a response commandList to the next received event. +#------------------------------------------------------------------------------ +pending_command_list = None + +#------------------------------------------------------------------------------ +# Logger for this module. +#------------------------------------------------------------------------------ +logger = None + +def listener(environ, start_response, schema): + ''' + Handler for the Vendor Event Listener REST API. + + Extract headers and the body and check that: + + 1) The client authenticated themselves correctly. + 2) The body validates against the provided schema for the API. + + ''' + logger.info('Got a Vendor Event request') + print('==== ' + time.asctime() + ' ' + '=' * 49) -def notfound_404(environ, start_response): - print('Unexpected URL/Method: {0} {1}'.format( - environ['REQUEST_METHOD'].upper(), - environ['PATH_INFO'])) - start_response('404 Not Found', [ ('Content-type', 'text/plain') ]) - return [template_404.format(base_url)] - -class PathDispatcher: - def __init__(self): - self.pathmap = { } - - def __call__(self, environ, start_response): - #---------------------------------------------------------------------- - # Extract the method and path from the environment. - #---------------------------------------------------------------------- - method = environ['REQUEST_METHOD'].lower() - path = environ['PATH_INFO'] - - #---------------------------------------------------------------------- - # See if we have a handler for this path, and if so invoke it. - # Otherwise, return a 404. - #---------------------------------------------------------------------- - handler = self.pathmap.get((method, path), notfound_404) - return handler(environ, start_response) - - def register(self, method, path, function): - print('Registering for {0} at {1}'.format(method, path)) - self.pathmap[method.lower(), path] = function - return function + #-------------------------------------------------------------------------- + # Extract the content from the request. + #-------------------------------------------------------------------------- + length = int(environ.get('CONTENT_LENGTH', '0')) + logger.debug('Content Length: {0}'.format(length)) + body = environ['wsgi.input'].read(length) + logger.debug('Content Body: {0}'.format(body)) + + mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION', + 'None None')) + # logger.debug('Auth. Mode: {0} Credentials: {1}'.format(mode, + # b64_credentials)) + logger.debug('Auth. Mode: {0} Credentials: ****'.format(mode)) + if (b64_credentials != 'None'): + credentials = b64decode(b64_credentials) + else: + credentials = None + + # logger.debug('Credentials: {0}'.format(credentials)) + logger.debug('Credentials: ****') + + #-------------------------------------------------------------------------- + # If we have a schema file then check that the event matches that expected. + #-------------------------------------------------------------------------- + if (schema is not None): + logger.debug('Attempting to validate data: {0}\n' + 'Against schema: {1}'.format(body, schema)) + try: + decoded_body = json.loads(body) + jsonschema.validate(decoded_body, schema) + logger.info('Event is valid!') + print('Valid body decoded & checked against schema OK:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + + except jsonschema.SchemaError as e: + logger.error('Schema is not valid! {0}'.format(e)) + print('Schema is not valid! {0}'.format(e)) + + except jsonschema.ValidationError as e: + logger.warn('Event is not valid against schema! {0}'.format(e)) + print('Event is not valid against schema! {0}'.format(e)) + print('Bad JSON body decoded:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + + except Exception as e: + logger.error('Event invalid for unexpected reason! {0}'.format(e)) + print('Schema is not valid for unexpected reason! {0}'.format(e)) + else: + logger.debug('No schema so just decode JSON: {0}'.format(body)) + try: + decoded_body = json.loads(body) + print('Valid JSON body (no schema checking) decoded:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + logger.info('Event is valid JSON but not checked against schema!') + + except Exception as e: + logger.error('Event invalid for unexpected reason! {0}'.format(e)) + print('JSON body not valid for unexpected reason! {0}'.format(e)) + + #-------------------------------------------------------------------------- + # See whether the user authenticated themselves correctly. + #-------------------------------------------------------------------------- + if (credentials == (vel_username + ':' + vel_password)): + logger.debug('Authenticated OK') + print('Authenticated OK') + + #---------------------------------------------------------------------- + # Respond to the caller. If we have a pending commandList from the + # testControl API, send it in response. + #---------------------------------------------------------------------- + global pending_command_list + if pending_command_list is not None: + start_response('202 Accepted', + [('Content-type', 'application/json')]) + response = pending_command_list + pending_command_list = None + + print('\n'+ '='*80) + print('Sending pending commandList in the response:\n' + '{0}'.format(json.dumps(response, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + print('='*80 + '\n') + yield json.dumps(response) + else: + start_response('202 Accepted', []) + yield '' + else: + logger.warn('Failed to authenticate OK') + print('Failed to authenticate agent credentials: ', credentials) + + #---------------------------------------------------------------------- + # Respond to the caller. + #---------------------------------------------------------------------- + start_response('401 Unauthorized', [ ('Content-type', + 'application/json')]) + req_error = { 'requestError': { + 'policyException': { + 'messageId': 'POL0001', + 'text': 'Failed to authenticate' + } + } + } + yield json.dumps(req_error) + + process_event(body) #-------------------------------------------------------------------------- # Event processing #-------------------------------------------------------------------------- -def process_event(e): +def process_event(body): global status global summary_e global summary_c global vdu_id vdu = 0 + e = json.loads(body, object_hook=JSONObject) epoch = e.event.commonEventHeader.lastEpochMicrosec sourceId = e.event.commonEventHeader.sourceId @@ -111,7 +268,7 @@ def process_event(e): if domain == 'measurementsForVfScaling': if vdu >= 1: - requestRate = e.event.measurementsForVfScaling.requestRate + requestRate = e.event.measurementsForVfScalingFields.requestRate summary_e[vdu] = host + ": state=" + status[vdu] + ", tps=" + str(requestRate) else: aggregateCpuUsage = e.event.measurementsForVfScalingFields.aggregateCpuUsage @@ -142,217 +299,415 @@ def process_event(e): # status[vdu] = e.event.faultFields.vfStatus status[vdu] = e.event.faultFields.specificProblem -#-------------------------------------------------------------------------- -# Main monitoring and logging procedure -#-------------------------------------------------------------------------- -def ves_monitor(environ, start_response): +def test_listener(environ, start_response, schema): + ''' + Handler for the Test Collector Test Control API. + + There is no authentication on this interface. + + This simply stores a commandList which will be sent in response to the next + incoming event on the EVEL interface. + ''' + global pending_command_list + logger.info('Got a Test Control input') + print('============================') + print('==== TEST CONTROL INPUT ====') + + #-------------------------------------------------------------------------- + # GET allows us to get the current pending request. + #-------------------------------------------------------------------------- + if environ.get('REQUEST_METHOD') == 'GET': + start_response('200 OK', [('Content-type', 'application/json')]) + yield json.dumps(pending_command_list) + return + + #-------------------------------------------------------------------------- + # Extract the content from the request. + #-------------------------------------------------------------------------- + length = int(environ.get('CONTENT_LENGTH', '0')) + logger.debug('TestControl Content Length: {0}'.format(length)) + body = environ['wsgi.input'].read(length) + logger.debug('TestControl Content Body: {0}'.format(body)) + + #-------------------------------------------------------------------------- + # If we have a schema file then check that the event matches that expected. + #-------------------------------------------------------------------------- + if (schema is not None): + logger.debug('Attempting to validate data: {0}\n' + 'Against schema: {1}'.format(body, schema)) + try: + decoded_body = json.loads(body) + jsonschema.validate(decoded_body, schema) + logger.info('TestControl is valid!') + print('TestControl:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + + except jsonschema.SchemaError as e: + logger.error('TestControl Schema is not valid: {0}'.format(e)) + print('TestControl Schema is not valid: {0}'.format(e)) + + except jsonschema.ValidationError as e: + logger.warn('TestControl input not valid: {0}'.format(e)) + print('TestControl input not valid: {0}'.format(e)) + print('Bad JSON body decoded:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + + except Exception as e: + logger.error('TestControl input not valid: {0}'.format(e)) + print('TestControl Schema not valid: {0}'.format(e)) + else: + logger.debug('Missing schema just decode JSON: {0}'.format(body)) + try: + decoded_body = json.loads(body) + print('Valid JSON body (no schema checking) decoded:\n' + '{0}'.format(json.dumps(decoded_body, + sort_keys=True, + indent=4, + separators=(',', ': ')))) + logger.info('TestControl input not checked against schema!') + + except Exception as e: + logger.error('TestControl input not valid: {0}'.format(e)) + print('TestControl input not valid: {0}'.format(e)) + + #-------------------------------------------------------------------------- + # Respond to the caller. If we received otherField 'ThrottleRequest', + # generate the appropriate canned response. + #-------------------------------------------------------------------------- + pending_command_list = decoded_body + print('===== TEST CONTROL END =====') + print('============================') + start_response('202 Accepted', []) + yield '' - # Check for keyboard input - if sys.stdin in select.select([sys.stdin], [], [], 0)[0]: - line = sys.stdin.readline() - if "f" in line: monitor_mode = "f" - if "c" in line: monitor_mode = "c" +def main(argv=None): + ''' + Main function for the collector start-up. - print('==== ' + time.asctime() + ' ' + '=' * 49) + Called with command-line arguments: + * --config *<file>* + * --section *<section>* + * --verbose - #-------------------------------------------------------------------------- - # Extract the content from the request. - #-------------------------------------------------------------------------- - length = int(environ.get('CONTENT_LENGTH', '0')) - body = environ['wsgi.input'].read(length) + Where: - mode, b64_credentials = string.split(environ.get('HTTP_AUTHORIZATION', - 'None None')) - if (b64_credentials != 'None'): - credentials = b64decode(b64_credentials) - else: - credentials = None - - #-------------------------------------------------------------------------- - # See whether the user authenticated themselves correctly. - #-------------------------------------------------------------------------- - if (credentials == (vel_username + ':' + vel_password)): - start_response('204 No Content', []) - yield '' - else: - print('Failed to authenticate agent') - start_response('401 Unauthorized', [ ('Content-type', - 'application/json')]) - req_error = { 'requestError': { - 'policyException': { - 'messageId': 'POL0001', - 'text': 'Failed to authenticate' - } - } - } - yield json.dumps(req_error) - - #-------------------------------------------------------------------------- - # Decode the JSON body - #-------------------------------------------------------------------------- - - try: - decoded_body = json.loads(body) - print('{0}'.format(json.dumps(decoded_body, - sort_keys=True, - indent=4, - separators=(',', ': ')))) - decoded_body = json.loads(body, object_hook=JSONObject) - process_event(decoded_body) - - except Exception as e: - print('JSON body is not valid for unexpected reason! {0}'.format(e)) + *<file>* specifies the path to the configuration file. -def main(argv=None): - global columns - global rows - - a,b = os.popen('stty size', 'r').read().split() - rows = int(a) - columns = int(b) - - if argv is None: - argv = sys.argv - else: - sys.argv.extend(argv) - - try: - #---------------------------------------------------------------------- - # Setup argument parser so we can parse the command-line. - #---------------------------------------------------------------------- - parser = ArgumentParser(description='', - formatter_class=ArgumentDefaultsHelpFormatter) - parser.add_argument('-v', '--verbose', - dest='verbose', - action='count', - help='set verbosity level') - parser.add_argument('-V', '--version', - action='version', - version='1.0', - help='Display version information') - parser.add_argument('-c', '--config', - dest='config', - default='/etc/opt/att/collector.conf', - help='Use this config file.', - metavar='<file>') - parser.add_argument('-s', '--section', - dest='section', - default='default', - metavar='<section>', - help='section to use in the config file') - - #---------------------------------------------------------------------- - # Process arguments received. - #---------------------------------------------------------------------- - args = parser.parse_args() - verbose = args.verbose - config_file = args.config - config_section = args.section - #---------------------------------------------------------------------- - # Now read the config file, using command-line supplied values as - # overrides. - #---------------------------------------------------------------------- - defaults = {'log_file': 'ves.log', - 'vel_port': '30000', - 'vel_path': '', - 'vel_topic_name': '' - } - overrides = {} - config = ConfigParser.SafeConfigParser(defaults) - config.read(config_file) - - #---------------------------------------------------------------------- - # extract the values we want. - #---------------------------------------------------------------------- - log_file = config.get(config_section, 'log_file', vars=overrides) - vel_port = config.get(config_section, 'vel_port', vars=overrides) - vel_path = config.get(config_section, 'vel_path', vars=overrides) - vel_topic_name = config.get(config_section, - 'vel_topic_name', - vars=overrides) - global vel_username - global vel_password - global vdu_id - vel_username = config.get(config_section, - 'vel_username', - vars=overrides) - vel_password = config.get(config_section, - 'vel_password', - vars=overrides) - vel_schema_file = config.get(config_section, - 'schema_file', - vars=overrides) - base_schema_file = config.get(config_section, - 'base_schema_file', - vars=overrides) - vdu_id[1] = config.get(config_section, - 'vdu1_id', - vars=overrides) - vdu_id[2] = config.get(config_section, - 'vdu2_id', - vars=overrides) - vdu_id[3] = config.get(config_section, - 'vdu3_id', - vars=overrides) - base_schema_file = config.get(config_section, - 'base_schema_file', - vars=overrides) - - #---------------------------------------------------------------------- - # Perform some basic error checking on the config. - #---------------------------------------------------------------------- - if (int(vel_port) < 1024 or int(vel_port) > 65535): - raise RuntimeError('Invalid Vendor Event Listener port ({0}) ' - 'specified'.format(vel_port)) + *<section>* specifies the section within that config file. - if (len(vel_path) > 0 and vel_path[-1] != '/'): - vel_path += '/' - - #---------------------------------------------------------------------- - # Load up the vel_schema and base_schema, if they exist. - #---------------------------------------------------------------------- - if (os.path.exists(vel_schema_file)): - global vel_schema - vel_schema = json.load(open(vel_schema_file, 'r')) - if (os.path.exists(base_schema_file)): - base_schema = json.load(open(base_schema_file, 'r')) - vel_schema.update(base_schema) - - #---------------------------------------------------------------------- - # We are now ready to get started with processing. Start-up the various - # components of the system in order: - # - # 1) Create the dispatcher. - # 2) Register the functions for the URLs of interest. - # 3) Run the webserver. - #---------------------------------------------------------------------- - root_url = '/{0}eventListener/v{1}{2}'.format(vel_path, - '1', - '/' + vel_topic_name - if len(vel_topic_name) > 0 - else '') - - base_url = root_url - dispatcher = PathDispatcher() - dispatcher.register('GET', root_url, ves_monitor) - dispatcher.register('POST', root_url, ves_monitor) - httpd = make_server('', 30000, dispatcher, handler_class=NoLoggingWSGIRequestHandler) - httpd.serve_forever() - - return 0 - - except Exception as e: - #---------------------------------------------------------------------- - # Handle unexpected exceptions. - #---------------------------------------------------------------------- - indent = len('VES Monitor') * ' ' - sys.stderr.write('VES Monitor: ' + repr(e) + '\n') - sys.stderr.write(indent + ' for help use --help\n') - sys.stderr.write(traceback.format_exc()) - return 2 + *verbose* generates more information in the log files. + + The process listens for REST API invocations and checks them. Errors are + displayed to stdout and logged. + ''' + + if argv is None: + argv = sys.argv + else: + sys.argv.extend(argv) + + program_name = os.path.basename(sys.argv[0]) + program_version = 'v{0}'.format(__version__) + program_build_date = str(__updated__) + program_version_message = '%%(prog)s {0} ({1})'.format(program_version, + program_build_date) + if (__import__('__main__').__doc__ is not None): + program_shortdesc = __import__('__main__').__doc__.split('\n')[1] + else: + program_shortdesc = 'Running in test harness' + program_license = '''{0} + + Created on {1}. + Copyright 2015 Metaswitch Networks Ltd. All rights reserved. + + Distributed on an "AS IS" basis without warranties + or conditions of any kind, either express or implied. + +USAGE +'''.format(program_shortdesc, str(__date__)) + + try: + #---------------------------------------------------------------------- + # Setup argument parser so we can parse the command-line. + #---------------------------------------------------------------------- + parser = ArgumentParser(description=program_license, + formatter_class=ArgumentDefaultsHelpFormatter) + parser.add_argument('-v', '--verbose', + dest='verbose', + action='count', + help='set verbosity level') + parser.add_argument('-V', '--version', + action='version', + version=program_version_message, + help='Display version information') + parser.add_argument('-a', '--api-version', + dest='api_version', + default='3', + help='set API version') + parser.add_argument('-c', '--config', + dest='config', + default='/etc/opt/att/collector.conf', + help='Use this config file.', + metavar='<file>') + parser.add_argument('-s', '--section', + dest='section', + default='default', + metavar='<section>', + help='section to use in the config file') + + #---------------------------------------------------------------------- + # Process arguments received. + #---------------------------------------------------------------------- + args = parser.parse_args() + verbose = args.verbose + api_version = args.api_version + config_file = args.config + config_section = args.section + + #---------------------------------------------------------------------- + # Now read the config file, using command-line supplied values as + # overrides. + #---------------------------------------------------------------------- + defaults = {'log_file': 'collector.log', + 'vel_port': '12233', + 'vel_path': '', + 'vel_topic_name': '' + } + overrides = {} + config = ConfigParser.SafeConfigParser(defaults) + config.read(config_file) + + #---------------------------------------------------------------------- + # extract the values we want. + #---------------------------------------------------------------------- + log_file = config.get(config_section, 'log_file', vars=overrides) + vel_port = config.get(config_section, 'vel_port', vars=overrides) + vel_path = config.get(config_section, 'vel_path', vars=overrides) + vel_topic_name = config.get(config_section, + 'vel_topic_name', + vars=overrides) + global vel_username + global vel_password + vel_username = config.get(config_section, + 'vel_username', + vars=overrides) + vel_password = config.get(config_section, + 'vel_password', + vars=overrides) + vel_schema_file = config.get(config_section, + 'schema_file', + vars=overrides) + base_schema_file = config.get(config_section, + 'base_schema_file', + vars=overrides) + throttle_schema_file = config.get(config_section, + 'throttle_schema_file', + vars=overrides) + test_control_schema_file = config.get(config_section, + 'test_control_schema_file', + vars=overrides) + + #---------------------------------------------------------------------- + # Finally we have enough info to start a proper flow trace. + #---------------------------------------------------------------------- + global logger + print('Logfile: {0}'.format(log_file)) + logger = logging.getLogger('collector') + if verbose > 0: + print('Verbose mode on') + logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.INFO) + handler = logging.handlers.RotatingFileHandler(log_file, + maxBytes=1000000, + backupCount=10) + if (platform.system() == 'Windows'): + date_format = '%Y-%m-%d %H:%M:%S' + else: + date_format = '%Y-%m-%d %H:%M:%S.%f %z' + formatter = logging.Formatter('%(asctime)s %(name)s - ' + '%(levelname)s - %(message)s', + date_format) + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.info('Started') + + #---------------------------------------------------------------------- + # Log the details of the configuration. + #---------------------------------------------------------------------- + logger.debug('Log file = {0}'.format(log_file)) + logger.debug('Event Listener Port = {0}'.format(vel_port)) + logger.debug('Event Listener Path = {0}'.format(vel_path)) + logger.debug('Event Listener Topic = {0}'.format(vel_topic_name)) + logger.debug('Event Listener Username = {0}'.format(vel_username)) + # logger.debug('Event Listener Password = {0}'.format(vel_password)) + logger.debug('Event Listener JSON Schema File = {0}'.format( + vel_schema_file)) + logger.debug('Base JSON Schema File = {0}'.format(base_schema_file)) + logger.debug('Throttle JSON Schema File = {0}'.format( + throttle_schema_file)) + logger.debug('Test Control JSON Schema File = {0}'.format( + test_control_schema_file)) + + #---------------------------------------------------------------------- + # Perform some basic error checking on the config. + #---------------------------------------------------------------------- + if (int(vel_port) < 1024 or int(vel_port) > 65535): + logger.error('Invalid Vendor Event Listener port ({0}) ' + 'specified'.format(vel_port)) + raise RuntimeError('Invalid Vendor Event Listener port ({0}) ' + 'specified'.format(vel_port)) + + if (len(vel_path) > 0 and vel_path[-1] != '/'): + logger.warning('Event Listener Path ({0}) should have terminating ' + '"/"! Adding one on to configured string.'.format( + vel_path)) + vel_path += '/' + + #---------------------------------------------------------------------- + # Load up the vel_schema, if it exists. + #---------------------------------------------------------------------- + if not os.path.exists(vel_schema_file): + logger.warning('Event Listener Schema File ({0}) not found. ' + 'No validation will be undertaken.'.format( + vel_schema_file)) + else: + global vel_schema + global throttle_schema + global test_control_schema + vel_schema = json.load(open(vel_schema_file, 'r')) + logger.debug('Loaded the JSON schema file') + + #------------------------------------------------------------------ + # Load up the throttle_schema, if it exists. + #------------------------------------------------------------------ + if (os.path.exists(throttle_schema_file)): + logger.debug('Loading throttle schema') + throttle_fragment = json.load(open(throttle_schema_file, 'r')) + throttle_schema = {} + throttle_schema.update(vel_schema) + throttle_schema.update(throttle_fragment) + logger.debug('Loaded the throttle schema') + + #------------------------------------------------------------------ + # Load up the test control _schema, if it exists. + #------------------------------------------------------------------ + if (os.path.exists(test_control_schema_file)): + logger.debug('Loading test control schema') + test_control_fragment = json.load( + open(test_control_schema_file, 'r')) + test_control_schema = {} + test_control_schema.update(vel_schema) + test_control_schema.update(test_control_fragment) + logger.debug('Loaded the test control schema') + + #------------------------------------------------------------------ + # Load up the base_schema, if it exists. + #------------------------------------------------------------------ + if (os.path.exists(base_schema_file)): + logger.debug('Updating the schema with base definition') + base_schema = json.load(open(base_schema_file, 'r')) + vel_schema.update(base_schema) + logger.debug('Updated the JSON schema file') + + #---------------------------------------------------------------------- + # We are now ready to get started with processing. Start-up the various + # components of the system in order: + # + # 1) Create the dispatcher. + # 2) Register the functions for the URLs of interest. + # 3) Run the webserver. + #---------------------------------------------------------------------- + root_url = '/{0}eventListener/v{1}{2}'.\ + format(vel_path, + api_version, + '/' + vel_topic_name + if len(vel_topic_name) > 0 + else '') + throttle_url = '/{0}eventListener/v{1}/clientThrottlingState'.\ + format(vel_path, api_version) + set_404_content(root_url) + dispatcher = PathDispatcher() + vendor_event_listener = partial(listener, schema = vel_schema) + dispatcher.register('GET', root_url, vendor_event_listener) + dispatcher.register('POST', root_url, vendor_event_listener) + vendor_throttle_listener = partial(listener, schema = throttle_schema) + dispatcher.register('GET', throttle_url, vendor_throttle_listener) + dispatcher.register('POST', throttle_url, vendor_throttle_listener) + + #---------------------------------------------------------------------- + # We also add a POST-only mechanism for test control, so that we can + # send commands to a single attached client. + #---------------------------------------------------------------------- + test_control_url = '/testControl/v{0}/commandList'.format(api_version) + test_control_listener = partial(test_listener, + schema = test_control_schema) + dispatcher.register('POST', test_control_url, test_control_listener) + dispatcher.register('GET', test_control_url, test_control_listener) + + httpd = make_server('', int(vel_port), dispatcher) + print('Serving on port {0}...'.format(vel_port)) + httpd.serve_forever() + + logger.error('Main loop exited unexpectedly!') + return 0 + + except KeyboardInterrupt: + #---------------------------------------------------------------------- + # handle keyboard interrupt + #---------------------------------------------------------------------- + logger.info('Exiting on keyboard interrupt!') + return 0 + + except Exception as e: + #---------------------------------------------------------------------- + # Handle unexpected exceptions. + #---------------------------------------------------------------------- + if DEBUG or TESTRUN: + raise(e) + indent = len(program_name) * ' ' + sys.stderr.write(program_name + ': ' + repr(e) + '\n') + sys.stderr.write(indent + ' for help use --help\n') + sys.stderr.write(traceback.format_exc()) + logger.critical('Exiting because of exception: {0}'.format(e)) + logger.critical(traceback.format_exc()) + return 2 #------------------------------------------------------------------------------ # MAIN SCRIPT ENTRY POINT. #------------------------------------------------------------------------------ if __name__ == '__main__': - sys.exit(main()) + if TESTRUN: + #---------------------------------------------------------------------- + # Running tests - note that doctest comments haven't been included so + # this is a hook for future improvements. + #---------------------------------------------------------------------- + import doctest + doctest.testmod() + + if PROFILE: + #---------------------------------------------------------------------- + # Profiling performance. Performance isn't expected to be a major + # issue, but this should all work as expected. + #---------------------------------------------------------------------- + import cProfile + import pstats + profile_filename = 'collector_profile.txt' + cProfile.run('main()', profile_filename) + statsfile = open('collector_profile_stats.txt', 'wb') + p = pstats.Stats(profile_filename, stream=statsfile) + stats = p.strip_dirs().sort_stats('cumulative') + stats.print_stats() + statsfile.close() + sys.exit(0) + + #-------------------------------------------------------------------------- + # Normal operation - call through to the main function. + #-------------------------------------------------------------------------- + sys.exit(main()) diff --git a/tests/blueprints/tosca-vnfd-hello-ves/start.sh b/tests/blueprints/tosca-vnfd-hello-ves/start.sh index e5f8797..b7e74bc 100755 --- a/tests/blueprints/tosca-vnfd-hello-ves/start.sh +++ b/tests/blueprints/tosca-vnfd-hello-ves/start.sh @@ -166,6 +166,7 @@ setup_agent () { echo "$0: Update parameters and build agent demo" # This sed command will add a line after the search line + sed -i -- "s/api_port,/30000,/" evel-library/code/evel_demo/evel_demo.c sed -i -- "/api_secure,/{n;s/.*/ \"$username\",/}" evel-library/code/evel_demo/evel_demo.c sed -i -- "/\"$username\",/{n;s/.*/ \"$password\",/}" evel-library/code/evel_demo/evel_demo.c @@ -175,7 +176,8 @@ setup_agent () { export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/ubuntu/evel-library/libs/x86_64 echo "$0: Start evel_demo agent" - nohup ../output/x86_64/evel_demo --id $vm_id --fqdn $collector_ip --port 30000 --username $username --password $password > /dev/null 2>&1 & + id=$(cut -d ',' -f 3 /mnt/openstack/latest/meta_data.json | cut -d '"' -f 4) + nohup ../output/x86_64/evel_demo --id $id --fqdn $collector_ip --port 30000 --username $username --password $password > /dev/null 2>&1 & echo "$0: Start collectd agent running in the VM" setup_collectd true @@ -204,7 +206,8 @@ setup_monitor () { sed -i -- "/vel_topic_name = /a vdu2_id = $vdu2_id" evel-test-collector/config/collector.conf sed -i -- "/vel_topic_name = /a vdu1_id = $vdu1_id" evel-test-collector/config/collector.conf -# python monitor.py --config evel-test-collector/config/collector.conf --section default + cp monitor.py evel-test-collector/code/collector/monitor.py +# python evel-test-collector/code/collector/monitor.py --config evel-test-collector/config/collector.conf --section default } type=$1 |