aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCédric Ollivier <cedric.ollivier@orange.com>2019-06-14 11:14:56 +0200
committerCédric Ollivier <cedric.ollivier@orange.com>2019-06-14 15:53:16 +0200
commite90fd3ce7b45c1a8c2de645884d09caa21323193 (patch)
treeef0ee400e73f1957e1753dd19e1bda2d1f94a0be
parentc659caccbf1f55db4e6e3cb31bf088ac57751e86 (diff)
Remove energy modules
The public recorder api has been down for a while. Change-Id: Ib879ef3b9ef56338c10cfcdeb733451c6f7573a6 Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
-rw-r--r--api/apidoc/xtesting.energy.energy.rst7
-rw-r--r--api/apidoc/xtesting.energy.rst15
-rw-r--r--api/apidoc/xtesting.rst1
-rw-r--r--tox.ini2
-rw-r--r--xtesting/energy/__init__.py0
-rw-r--r--xtesting/energy/energy.py334
-rw-r--r--xtesting/tests/unit/energy/__init__.py0
-rw-r--r--xtesting/tests/unit/energy/test_functest_energy.py371
-rw-r--r--xtesting/utils/env.py5
9 files changed, 1 insertions, 734 deletions
diff --git a/api/apidoc/xtesting.energy.energy.rst b/api/apidoc/xtesting.energy.energy.rst
deleted file mode 100644
index 05e42381..00000000
--- a/api/apidoc/xtesting.energy.energy.rst
+++ /dev/null
@@ -1,7 +0,0 @@
-xtesting\.energy\.energy module
-===============================
-
-.. automodule:: xtesting.energy.energy
- :members:
- :undoc-members:
- :show-inheritance:
diff --git a/api/apidoc/xtesting.energy.rst b/api/apidoc/xtesting.energy.rst
deleted file mode 100644
index efd6ed16..00000000
--- a/api/apidoc/xtesting.energy.rst
+++ /dev/null
@@ -1,15 +0,0 @@
-xtesting\.energy package
-========================
-
-.. automodule:: xtesting.energy
- :members:
- :undoc-members:
- :show-inheritance:
-
-Submodules
-----------
-
-.. toctree::
-
- xtesting.energy.energy
-
diff --git a/api/apidoc/xtesting.rst b/api/apidoc/xtesting.rst
index 7f7ee676..46907394 100644
--- a/api/apidoc/xtesting.rst
+++ b/api/apidoc/xtesting.rst
@@ -13,6 +13,5 @@ Subpackages
xtesting.ci
xtesting.core
- xtesting.energy
xtesting.utils
diff --git a/tox.ini b/tox.ini
index 2193fc61..c80bcb48 100644
--- a/tox.ini
+++ b/tox.ini
@@ -66,11 +66,9 @@ basepython = python2.7
dirs =
xtesting/tests/unit/ci
xtesting/tests/unit/core
- xtesting/tests/unit/energy
xtesting/tests/unit/utils/test_decorators.py
commands = nosetests --with-coverage --cover-tests \
--cover-package xtesting.core \
- --cover-package xtesting.energy \
--cover-package xtesting.tests.unit \
--cover-package xtesting.utils.decorators \
--cover-min-percentage 100 {[testenv:cover]dirs}
diff --git a/xtesting/energy/__init__.py b/xtesting/energy/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/xtesting/energy/__init__.py
+++ /dev/null
diff --git a/xtesting/energy/energy.py b/xtesting/energy/energy.py
deleted file mode 100644
index 76e4873c..00000000
--- a/xtesting/energy/energy.py
+++ /dev/null
@@ -1,334 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: UTF-8 -*-
-
-# Copyright (c) 2017 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""This module manages calls to Energy recording API."""
-
-import json
-import logging
-import traceback
-
-from functools import wraps
-import requests
-from six.moves import urllib
-
-from xtesting.utils import env
-
-
-def finish_session(current_scenario):
- """Finish a recording session."""
- if current_scenario is None:
- EnergyRecorder.stop()
- else:
- EnergyRecorder.logger.debug("Restoring previous scenario (%s/%s)",
- current_scenario["scenario"],
- current_scenario["step"])
- EnergyRecorder.submit_scenario(
- current_scenario["scenario"],
- current_scenario["step"]
- )
-
-
-def enable_recording(method):
- """
- Record energy during method execution.
-
- Decorator to record energy during "method" exection.
-
- param method: Method to suround with start and stop
- :type method: function
-
- .. note:: "method" should belong to a class having a "case_name"
- attribute
- """
- @wraps(method)
- def wrapper(*args):
- """
- Record energy during method execution (implementation).
-
- Wrapper for decorator to handle method arguments.
- """
- current_scenario = EnergyRecorder.get_current_scenario()
- EnergyRecorder.start(args[0].case_name)
- try:
- return_value = method(*args)
- finish_session(current_scenario)
- except Exception as exc: # pylint: disable=broad-except
- EnergyRecorder.logger.exception(exc)
- finish_session(current_scenario)
- raise exc
- return return_value
- return wrapper
-
-
-# Class to manage energy recording sessions
-class EnergyRecorder(object):
- """Manage Energy recording session."""
-
- logger = logging.getLogger(__name__)
- # Energy recording API connectivity settings
- # see load_config method
- energy_recorder_api = None
-
- # Default initial step
- INITIAL_STEP = "running"
-
- # Default connection timeout
- CONNECTION_TIMEOUT = 4
-
- @staticmethod
- def load_config():
- """
- Load connectivity settings from yaml.
-
- Load connectivity settings to Energy recording API
- """
- # Singleton pattern for energy_recorder_api static member
- # Load only if not previouly done
- if EnergyRecorder.energy_recorder_api is None:
- assert env.get('NODE_NAME')
- assert env.get('ENERGY_RECORDER_API_URL')
- environment = env.get('NODE_NAME')
- energy_recorder_uri = env.get(
- 'ENERGY_RECORDER_API_URL')
-
- # Creds
- creds_usr = env.get("ENERGY_RECORDER_API_USER")
- creds_pass = env.get("ENERGY_RECORDER_API_PASSWORD")
-
- uri_comp = "/recorders/environment/"
- uri_comp += urllib.parse.quote_plus(environment)
-
- if creds_usr and creds_pass:
- energy_recorder_api_auth = (creds_usr, creds_pass)
- else:
- energy_recorder_api_auth = None
-
- try:
- resp = requests.get(energy_recorder_uri + "/monitoring/ping",
- auth=energy_recorder_api_auth,
- headers={
- 'content-type': 'application/json'
- },
- timeout=EnergyRecorder.CONNECTION_TIMEOUT)
- api_available = json.loads(resp.text)["status"] == "OK"
- EnergyRecorder.logger.info(
- "API recorder available at : %s",
- energy_recorder_uri + uri_comp)
- except Exception as exc: # pylint: disable=broad-except
- EnergyRecorder.logger.info(
- "Energy recorder API is not available, cause=%s",
- str(exc))
- api_available = False
- # Final config
- EnergyRecorder.energy_recorder_api = {
- "uri": energy_recorder_uri + uri_comp,
- "auth": energy_recorder_api_auth,
- "available": api_available
- }
- return EnergyRecorder.energy_recorder_api["available"]
-
- @staticmethod
- def submit_scenario(scenario, step):
- """
- Submit a complet scenario definition to Energy recorder API.
-
- param scenario: Scenario name
- :type scenario: string
- param step: Step name
- :type step: string
- """
- try:
- return_status = True
- # Ensure that connectyvity settings are loaded
- if EnergyRecorder.load_config():
- EnergyRecorder.logger.debug("Submitting scenario (%s/%s)",
- scenario, step)
-
- # Create API payload
- payload = {
- "step": step,
- "scenario": scenario
- }
- # Call API to start energy recording
- response = requests.post(
- EnergyRecorder.energy_recorder_api["uri"],
- data=json.dumps(payload),
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers={
- 'content-type': 'application/json'
- },
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
- if response.status_code != 200:
- EnergyRecorder.logger.error(
- "Error while submitting scenario\n%s",
- response.text)
- return_status = False
- except requests.exceptions.ConnectionError:
- EnergyRecorder.logger.warning(
- "submit_scenario: Unable to connect energy recorder API")
- return_status = False
- except Exception: # pylint: disable=broad-except
- # Default exception handler to ensure that method
- # is safe for caller
- EnergyRecorder.logger.info(
- "Error while submitting scenarion to energy recorder API\n%s",
- traceback.format_exc()
- )
- return_status = False
- return return_status
-
- @staticmethod
- def start(scenario):
- """
- Start a recording session for scenario.
-
- param scenario: Starting scenario
- :type scenario: string
- """
- return_status = True
- try:
- if EnergyRecorder.load_config():
- EnergyRecorder.logger.debug("Starting recording")
- return_status = EnergyRecorder.submit_scenario(
- scenario,
- EnergyRecorder.INITIAL_STEP
- )
-
- except Exception: # pylint: disable=broad-except
- # Default exception handler to ensure that method
- # is safe for caller
- EnergyRecorder.logger.info(
- "Error while starting energy recorder API\n%s",
- traceback.format_exc()
- )
- return_status = False
- return return_status
-
- @staticmethod
- def stop():
- """Stop current recording session."""
- return_status = True
- try:
- # Ensure that connectyvity settings are loaded
- if EnergyRecorder.load_config():
- EnergyRecorder.logger.debug("Stopping recording")
-
- # Call API to stop energy recording
- response = requests.delete(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers={
- 'content-type': 'application/json'
- },
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
- if response.status_code != 200:
- EnergyRecorder.logger.error(
- "Error while stating energy recording session\n%s",
- response.text)
- return_status = False
- except requests.exceptions.ConnectionError:
- EnergyRecorder.logger.warning(
- "stop: Unable to connect energy recorder API")
- return_status = False
- except Exception: # pylint: disable=broad-except
- # Default exception handler to ensure that method
- # is safe for caller
- EnergyRecorder.logger.info(
- "Error while stoping energy recorder API\n%s",
- traceback.format_exc()
- )
- return_status = False
- return return_status
-
- @staticmethod
- def set_step(step):
- """Notify energy recording service of current step of the testcase."""
- return_status = True
- try:
- # Ensure that connectyvity settings are loaded
- if EnergyRecorder.load_config():
- EnergyRecorder.logger.debug("Setting step")
-
- # Create API payload
- payload = {
- "step": step,
- }
-
- # Call API to define step
- response = requests.post(
- EnergyRecorder.energy_recorder_api["uri"] + "/step",
- data=json.dumps(payload),
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers={
- 'content-type': 'application/json'
- },
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
- if response.status_code != 200:
- EnergyRecorder.logger.error(
- "Error while setting current step of testcase\n%s",
- response.text)
- return_status = False
- except requests.exceptions.ConnectionError:
- EnergyRecorder.logger.warning(
- "set_step: Unable to connect energy recorder API")
- return_status = False
- except Exception: # pylint: disable=broad-except
- # Default exception handler to ensure that method
- # is safe for caller
- EnergyRecorder.logger.info(
- "Error while setting step on energy recorder API\n%s",
- traceback.format_exc()
- )
- return_status = False
- return return_status
-
- @staticmethod
- def get_current_scenario():
- """Get current running scenario (if any, None else)."""
- return_value = None
- try:
- # Ensure that connectyvity settings are loaded
- if EnergyRecorder.load_config():
- EnergyRecorder.logger.debug("Getting current scenario")
-
- # Call API get running scenario
- response = requests.get(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
- if response.status_code == 200:
- return_value = json.loads(response.text)
- elif response.status_code == 404:
- EnergyRecorder.logger.info(
- "No current running scenario at %s",
- EnergyRecorder.energy_recorder_api["uri"])
- return_value = None
- else:
- EnergyRecorder.logger.error(
- "Error while getting current scenario\n%s",
- response.text)
- return_value = None
- except requests.exceptions.ConnectionError:
- EnergyRecorder.logger.warning(
- "get_currernt_sceario: Unable to connect energy recorder API")
- return_value = None
- except Exception: # pylint: disable=broad-except
- # Default exception handler to ensure that method
- # is safe for caller
- EnergyRecorder.logger.info(
- "Error while getting current scenario from energy recorder API"
- "\n%s", traceback.format_exc()
- )
- return_value = None
- return return_value
diff --git a/xtesting/tests/unit/energy/__init__.py b/xtesting/tests/unit/energy/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/xtesting/tests/unit/energy/__init__.py
+++ /dev/null
diff --git a/xtesting/tests/unit/energy/test_functest_energy.py b/xtesting/tests/unit/energy/test_functest_energy.py
deleted file mode 100644
index ea83c1ea..00000000
--- a/xtesting/tests/unit/energy/test_functest_energy.py
+++ /dev/null
@@ -1,371 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: UTF-8 -*-
-
-# Copyright (c) 2017 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-"""Unitary test for energy module."""
-# pylint: disable=unused-argument
-import logging
-import os
-import unittest
-
-import mock
-import requests
-
-from xtesting.energy.energy import EnergyRecorder
-import xtesting.energy.energy as energy
-
-CASE_NAME = "UNIT_TEST_CASE"
-STEP_NAME = "UNIT_TEST_STEP"
-
-PREVIOUS_SCENARIO = "previous_scenario"
-PREVIOUS_STEP = "previous_step"
-
-
-class MockHttpResponse(object): # pylint: disable=too-few-public-methods
- """Mock response for Energy recorder API."""
-
- def __init__(self, text, status_code):
- """Create an instance of MockHttpResponse."""
- self.text = text
- self.status_code = status_code
-
-
-API_OK = MockHttpResponse(
- '{"status": "OK"}',
- 200
-)
-API_KO = MockHttpResponse(
- '{"message": "API-KO"}',
- 500
-)
-
-RECORDER_OK = MockHttpResponse(
- '{"environment": "UNIT_TEST",'
- ' "step": "string",'
- ' "scenario": "' + CASE_NAME + '"}',
- 200
-)
-RECORDER_KO = MockHttpResponse(
- '{"message": "An unhandled API exception occurred (MOCK)"}',
- 500
-)
-RECORDER_NOT_FOUND = MockHttpResponse(
- '{"message": "Recorder not found (MOCK)"}',
- 404
-)
-
-
-# pylint: disable=too-many-public-methods
-class EnergyRecorderTest(unittest.TestCase):
- """Energy module unitary test suite."""
-
- case_name = CASE_NAME
- request_headers = {'content-type': 'application/json'}
- returned_value_to_preserve = "value"
- exception_message_to_preserve = "exception_message"
-
- @staticmethod
- def _set_env_creds():
- """Set config values."""
- os.environ["ENERGY_RECORDER_API_URL"] = "http://pod-uri:8888"
- os.environ["ENERGY_RECORDER_API_USER"] = "user"
- os.environ["ENERGY_RECORDER_API_PASSWORD"] = "password"
-
- @staticmethod
- def _set_env_nocreds():
- """Set config values."""
- os.environ["ENERGY_RECORDER_API_URL"] = "http://pod-uri:8888"
- del os.environ["ENERGY_RECORDER_API_USER"]
- del os.environ["ENERGY_RECORDER_API_PASSWORD"]
-
- @mock.patch('xtesting.energy.energy.requests.post',
- return_value=RECORDER_OK)
- def test_start(self, post_mock=None, get_mock=None):
- """EnergyRecorder.start method (regular case)."""
- self.test_load_config()
- self.assertTrue(EnergyRecorder.start(self.case_name))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('xtesting.energy.energy.requests.post',
- side_effect=Exception("Internal execution error (MOCK)"))
- def test_start_error(self, post_mock=None):
- """EnergyRecorder.start method (error in method)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.start(self.case_name))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('xtesting.energy.energy.EnergyRecorder.load_config',
- side_effect=Exception("Internal execution error (MOCK)"))
- def test_start_exception(self, conf_loader_mock=None):
- """EnergyRecorder.start test with exception during execution."""
- start_status = EnergyRecorder.start(CASE_NAME)
- self.assertFalse(start_status)
-
- @mock.patch('xtesting.energy.energy.requests.post',
- return_value=RECORDER_KO)
- def test_start_api_error(self, post_mock=None):
- """EnergyRecorder.start method (API error)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.start(self.case_name))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('xtesting.energy.energy.requests.post',
- return_value=RECORDER_OK)
- def test_set_step(self, post_mock=None):
- """EnergyRecorder.set_step method (regular case)."""
- self.test_load_config()
- self.assertTrue(EnergyRecorder.set_step(STEP_NAME))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"] + "/step",
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('xtesting.energy.energy.requests.post',
- return_value=RECORDER_KO)
- def test_set_step_api_error(self, post_mock=None):
- """EnergyRecorder.set_step method (API error)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.set_step(STEP_NAME))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"] + "/step",
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('xtesting.energy.energy.requests.post',
- side_effect=Exception("Internal execution error (MOCK)"))
- def test_set_step_error(self, post_mock=None):
- """EnergyRecorder.set_step method (method error)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.set_step(STEP_NAME))
- post_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"] + "/step",
- auth=EnergyRecorder.energy_recorder_api["auth"],
- data=mock.ANY,
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('xtesting.energy.energy.EnergyRecorder.load_config',
- side_effect=requests.exceptions.ConnectionError())
- def test_set_step_connection_error(self, conf_loader_mock=None):
- """EnergyRecorder.start test with exception during execution."""
- step_status = EnergyRecorder.set_step(STEP_NAME)
- self.assertFalse(step_status)
-
- @mock.patch('xtesting.energy.energy.requests.delete',
- return_value=RECORDER_OK)
- def test_stop(self, delete_mock=None):
- """EnergyRecorder.stop method (regular case)."""
- self.test_load_config()
- self.assertTrue(EnergyRecorder.stop())
- delete_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('xtesting.energy.energy.requests.delete',
- return_value=RECORDER_KO)
- def test_stop_api_error(self, delete_mock=None):
- """EnergyRecorder.stop method (API Error)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.stop())
- delete_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @mock.patch('xtesting.energy.energy.requests.delete',
- side_effect=Exception("Internal execution error (MOCK)"))
- def test_stop_error(self, delete_mock=None):
- """EnergyRecorder.stop method (method error)."""
- self.test_load_config()
- self.assertFalse(EnergyRecorder.stop())
- delete_mock.assert_called_once_with(
- EnergyRecorder.energy_recorder_api["uri"],
- auth=EnergyRecorder.energy_recorder_api["auth"],
- headers=self.request_headers,
- timeout=EnergyRecorder.CONNECTION_TIMEOUT
- )
-
- @energy.enable_recording
- def __decorated_method(self):
- """Call with to energy recorder decorators."""
- return self.returned_value_to_preserve
-
- @energy.enable_recording
- def __decorated_method_with_ex(self):
- """Call with to energy recorder decorators."""
- raise Exception(self.exception_message_to_preserve)
-
- @mock.patch("xtesting.energy.energy.EnergyRecorder.get_current_scenario",
- return_value=None)
- @mock.patch("xtesting.energy.energy.EnergyRecorder")
- def test_decorators(self,
- recorder_mock=None,
- cur_scenario_mock=None):
- """Test energy module decorators."""
- self.__decorated_method()
- calls = [mock.call.start(self.case_name),
- mock.call.stop()]
- recorder_mock.assert_has_calls(calls)
-
- @mock.patch("xtesting.energy.energy.EnergyRecorder.get_current_scenario",
- return_value={"scenario": PREVIOUS_SCENARIO,
- "step": PREVIOUS_STEP})
- @mock.patch("xtesting.energy.energy.EnergyRecorder")
- def test_decorators_with_previous(self,
- recorder_mock=None,
- cur_scenario_mock=None):
- """Test energy module decorators."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self._set_env_creds()
- self.__decorated_method()
- calls = [mock.call.start(self.case_name),
- mock.call.submit_scenario(PREVIOUS_SCENARIO,
- PREVIOUS_STEP)]
- recorder_mock.assert_has_calls(calls, True)
-
- def test_decorator_preserve_return(self):
- """Test that decorator preserve method returned value."""
- self.test_load_config()
- self.assertTrue(
- self.__decorated_method() == self.returned_value_to_preserve
- )
-
- @mock.patch(
- "xtesting.energy.energy.finish_session")
- def test_decorator_preserve_ex(self, finish_mock=None):
- """Test that decorator preserve method exceptions."""
- self.test_load_config()
- with self.assertRaises(Exception) as context:
- self.__decorated_method_with_ex()
- self.assertTrue(
- self.exception_message_to_preserve in str(context.exception)
- )
- self.assertTrue(finish_mock.called)
-
- @mock.patch("xtesting.energy.energy.requests.get",
- return_value=API_OK)
- def test_load_config(self, loader_mock=None, get_mock=None):
- """Test load config."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self._set_env_creds()
- EnergyRecorder.energy_recorder_api = None
- EnergyRecorder.load_config()
-
- self.assertEquals(
- EnergyRecorder.energy_recorder_api["auth"],
- ("user", "password")
- )
- self.assertEquals(
- EnergyRecorder.energy_recorder_api["uri"],
- "http://pod-uri:8888/recorders/environment/MOCK_POD"
- )
-
- @mock.patch("xtesting.energy.energy.requests.get",
- return_value=API_OK)
- def test_load_config_no_creds(self, loader_mock=None, get_mock=None):
- """Test load config without creds."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self._set_env_nocreds()
- EnergyRecorder.energy_recorder_api = None
- EnergyRecorder.load_config()
- self.assertEquals(EnergyRecorder.energy_recorder_api["auth"], None)
- self.assertEquals(
- EnergyRecorder.energy_recorder_api["uri"],
- "http://pod-uri:8888/recorders/environment/MOCK_POD"
- )
-
- @mock.patch("xtesting.energy.energy.requests.get",
- return_value=API_OK)
- def test_load_config_ex(self, loader_mock=None, get_mock=None):
- """Test load config with exception."""
- for key in ['NODE_NAME', 'ENERGY_RECORDER_API_URL']:
- os.environ[key] = ''
- with self.assertRaises(AssertionError):
- EnergyRecorder.energy_recorder_api = None
- EnergyRecorder.load_config()
- self.assertEquals(EnergyRecorder.energy_recorder_api, None)
-
- @mock.patch("xtesting.energy.energy.requests.get",
- return_value=API_KO)
- def test_load_config_api_ko(self, loader_mock=None, get_mock=None):
- """Test load config with API unavailable."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self._set_env_creds()
- EnergyRecorder.energy_recorder_api = None
- EnergyRecorder.load_config()
- self.assertEquals(EnergyRecorder.energy_recorder_api["available"],
- False)
-
- @mock.patch('xtesting.energy.energy.requests.get',
- return_value=RECORDER_OK)
- def test_get_current_scenario(self, loader_mock=None, get_mock=None):
- """Test get_current_scenario."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self.test_load_config()
- scenario = EnergyRecorder.get_current_scenario()
- self.assertTrue(scenario is not None)
-
- @mock.patch('xtesting.energy.energy.requests.get',
- return_value=RECORDER_NOT_FOUND)
- def test_current_scenario_not_found(self, get_mock=None):
- """Test get current scenario not existing."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self.test_load_config()
- scenario = EnergyRecorder.get_current_scenario()
- self.assertTrue(scenario is None)
-
- @mock.patch('xtesting.energy.energy.requests.get',
- return_value=RECORDER_KO)
- def test_current_scenario_api_error(self, get_mock=None):
- """Test get current scenario with API error."""
- os.environ['NODE_NAME'] = 'MOCK_POD'
- self.test_load_config()
- scenario = EnergyRecorder.get_current_scenario()
- self.assertTrue(scenario is None)
-
- @mock.patch('xtesting.energy.energy.EnergyRecorder.load_config',
- side_effect=Exception("Internal execution error (MOCK)"))
- def test_current_scenario_exception(self, get_mock=None):
- """Test get current scenario with exception."""
- scenario = EnergyRecorder.get_current_scenario()
- self.assertTrue(scenario is None)
-
-if __name__ == "__main__":
- logging.disable(logging.CRITICAL)
- unittest.main(verbosity=2)
diff --git a/xtesting/utils/env.py b/xtesting/utils/env.py
index df06cb7b..8bf6da81 100644
--- a/xtesting/utils/env.py
+++ b/xtesting/utils/env.py
@@ -20,10 +20,7 @@ INPUTS = {
'INSTALLER_TYPE': 'unknown',
'BUILD_TAG': None,
'NODE_NAME': None,
- 'TEST_DB_URL': 'http://testresults.opnfv.org/test/api/v1/results',
- 'ENERGY_RECORDER_API_URL': 'http://energy.opnfv.fr/resources',
- 'ENERGY_RECORDER_API_USER': None,
- 'ENERGY_RECORDER_API_PASSWORD': None
+ 'TEST_DB_URL': 'http://testresults.opnfv.org/test/api/v1/results'
}