aboutsummaryrefslogtreecommitdiffstats
path: root/xtesting/tests/unit/energy
diff options
context:
space:
mode:
authorCédric Ollivier <cedric.ollivier@orange.com>2018-02-28 09:35:49 +0100
committerCédric Ollivier <cedric.ollivier@orange.com>2018-02-28 09:36:32 +0100
commit2aab5c48df64b044ab9bae6e883e6e0acaabbf52 (patch)
treec82294952795b3953130bf624929d6ecae3e4fcf /xtesting/tests/unit/energy
parentbaa8f2d5f67d45e5761f92cb93fe22050f08d0fe (diff)
Rename all Functest refs to Xtesting
It mainly renames python modules and then the related documentation config files. Change-Id: I186010bb88d3d39afe7b8fd1ebcef9c690cc1282 Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
Diffstat (limited to 'xtesting/tests/unit/energy')
-rw-r--r--xtesting/tests/unit/energy/__init__.py0
-rw-r--r--xtesting/tests/unit/energy/test_functest_energy.py371
2 files changed, 371 insertions, 0 deletions
diff --git a/xtesting/tests/unit/energy/__init__.py b/xtesting/tests/unit/energy/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/xtesting/tests/unit/energy/__init__.py
diff --git a/xtesting/tests/unit/energy/test_functest_energy.py b/xtesting/tests/unit/energy/test_functest_energy.py
new file mode 100644
index 00000000..ea83c1ea
--- /dev/null
+++ b/xtesting/tests/unit/energy/test_functest_energy.py
@@ -0,0 +1,371 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8 -*-
+
+# Copyright (c) 2017 Orange and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+
+"""Unitary test for energy module."""
+# pylint: disable=unused-argument
+import logging
+import os
+import unittest
+
+import mock
+import requests
+
+from xtesting.energy.energy import EnergyRecorder
+import xtesting.energy.energy as energy
+
+CASE_NAME = "UNIT_TEST_CASE"
+STEP_NAME = "UNIT_TEST_STEP"
+
+PREVIOUS_SCENARIO = "previous_scenario"
+PREVIOUS_STEP = "previous_step"
+
+
+class MockHttpResponse(object): # pylint: disable=too-few-public-methods
+ """Mock response for Energy recorder API."""
+
+ def __init__(self, text, status_code):
+ """Create an instance of MockHttpResponse."""
+ self.text = text
+ self.status_code = status_code
+
+
+API_OK = MockHttpResponse(
+ '{"status": "OK"}',
+ 200
+)
+API_KO = MockHttpResponse(
+ '{"message": "API-KO"}',
+ 500
+)
+
+RECORDER_OK = MockHttpResponse(
+ '{"environment": "UNIT_TEST",'
+ ' "step": "string",'
+ ' "scenario": "' + CASE_NAME + '"}',
+ 200
+)
+RECORDER_KO = MockHttpResponse(
+ '{"message": "An unhandled API exception occurred (MOCK)"}',
+ 500
+)
+RECORDER_NOT_FOUND = MockHttpResponse(
+ '{"message": "Recorder not found (MOCK)"}',
+ 404
+)
+
+
+# pylint: disable=too-many-public-methods
+class EnergyRecorderTest(unittest.TestCase):
+ """Energy module unitary test suite."""
+
+ case_name = CASE_NAME
+ request_headers = {'content-type': 'application/json'}
+ returned_value_to_preserve = "value"
+ exception_message_to_preserve = "exception_message"
+
+ @staticmethod
+ def _set_env_creds():
+ """Set config values."""
+ os.environ["ENERGY_RECORDER_API_URL"] = "http://pod-uri:8888"
+ os.environ["ENERGY_RECORDER_API_USER"] = "user"
+ os.environ["ENERGY_RECORDER_API_PASSWORD"] = "password"
+
+ @staticmethod
+ def _set_env_nocreds():
+ """Set config values."""
+ os.environ["ENERGY_RECORDER_API_URL"] = "http://pod-uri:8888"
+ del os.environ["ENERGY_RECORDER_API_USER"]
+ del os.environ["ENERGY_RECORDER_API_PASSWORD"]
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ return_value=RECORDER_OK)
+ def test_start(self, post_mock=None, get_mock=None):
+ """EnergyRecorder.start method (regular case)."""
+ self.test_load_config()
+ self.assertTrue(EnergyRecorder.start(self.case_name))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_start_error(self, post_mock=None):
+ """EnergyRecorder.start method (error in method)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.start(self.case_name))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.EnergyRecorder.load_config',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_start_exception(self, conf_loader_mock=None):
+ """EnergyRecorder.start test with exception during execution."""
+ start_status = EnergyRecorder.start(CASE_NAME)
+ self.assertFalse(start_status)
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ return_value=RECORDER_KO)
+ def test_start_api_error(self, post_mock=None):
+ """EnergyRecorder.start method (API error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.start(self.case_name))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ return_value=RECORDER_OK)
+ def test_set_step(self, post_mock=None):
+ """EnergyRecorder.set_step method (regular case)."""
+ self.test_load_config()
+ self.assertTrue(EnergyRecorder.set_step(STEP_NAME))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"] + "/step",
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ return_value=RECORDER_KO)
+ def test_set_step_api_error(self, post_mock=None):
+ """EnergyRecorder.set_step method (API error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.set_step(STEP_NAME))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"] + "/step",
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.post',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_set_step_error(self, post_mock=None):
+ """EnergyRecorder.set_step method (method error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.set_step(STEP_NAME))
+ post_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"] + "/step",
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ data=mock.ANY,
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.EnergyRecorder.load_config',
+ side_effect=requests.exceptions.ConnectionError())
+ def test_set_step_connection_error(self, conf_loader_mock=None):
+ """EnergyRecorder.start test with exception during execution."""
+ step_status = EnergyRecorder.set_step(STEP_NAME)
+ self.assertFalse(step_status)
+
+ @mock.patch('xtesting.energy.energy.requests.delete',
+ return_value=RECORDER_OK)
+ def test_stop(self, delete_mock=None):
+ """EnergyRecorder.stop method (regular case)."""
+ self.test_load_config()
+ self.assertTrue(EnergyRecorder.stop())
+ delete_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.delete',
+ return_value=RECORDER_KO)
+ def test_stop_api_error(self, delete_mock=None):
+ """EnergyRecorder.stop method (API Error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.stop())
+ delete_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @mock.patch('xtesting.energy.energy.requests.delete',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_stop_error(self, delete_mock=None):
+ """EnergyRecorder.stop method (method error)."""
+ self.test_load_config()
+ self.assertFalse(EnergyRecorder.stop())
+ delete_mock.assert_called_once_with(
+ EnergyRecorder.energy_recorder_api["uri"],
+ auth=EnergyRecorder.energy_recorder_api["auth"],
+ headers=self.request_headers,
+ timeout=EnergyRecorder.CONNECTION_TIMEOUT
+ )
+
+ @energy.enable_recording
+ def __decorated_method(self):
+ """Call with to energy recorder decorators."""
+ return self.returned_value_to_preserve
+
+ @energy.enable_recording
+ def __decorated_method_with_ex(self):
+ """Call with to energy recorder decorators."""
+ raise Exception(self.exception_message_to_preserve)
+
+ @mock.patch("xtesting.energy.energy.EnergyRecorder.get_current_scenario",
+ return_value=None)
+ @mock.patch("xtesting.energy.energy.EnergyRecorder")
+ def test_decorators(self,
+ recorder_mock=None,
+ cur_scenario_mock=None):
+ """Test energy module decorators."""
+ self.__decorated_method()
+ calls = [mock.call.start(self.case_name),
+ mock.call.stop()]
+ recorder_mock.assert_has_calls(calls)
+
+ @mock.patch("xtesting.energy.energy.EnergyRecorder.get_current_scenario",
+ return_value={"scenario": PREVIOUS_SCENARIO,
+ "step": PREVIOUS_STEP})
+ @mock.patch("xtesting.energy.energy.EnergyRecorder")
+ def test_decorators_with_previous(self,
+ recorder_mock=None,
+ cur_scenario_mock=None):
+ """Test energy module decorators."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self._set_env_creds()
+ self.__decorated_method()
+ calls = [mock.call.start(self.case_name),
+ mock.call.submit_scenario(PREVIOUS_SCENARIO,
+ PREVIOUS_STEP)]
+ recorder_mock.assert_has_calls(calls, True)
+
+ def test_decorator_preserve_return(self):
+ """Test that decorator preserve method returned value."""
+ self.test_load_config()
+ self.assertTrue(
+ self.__decorated_method() == self.returned_value_to_preserve
+ )
+
+ @mock.patch(
+ "xtesting.energy.energy.finish_session")
+ def test_decorator_preserve_ex(self, finish_mock=None):
+ """Test that decorator preserve method exceptions."""
+ self.test_load_config()
+ with self.assertRaises(Exception) as context:
+ self.__decorated_method_with_ex()
+ self.assertTrue(
+ self.exception_message_to_preserve in str(context.exception)
+ )
+ self.assertTrue(finish_mock.called)
+
+ @mock.patch("xtesting.energy.energy.requests.get",
+ return_value=API_OK)
+ def test_load_config(self, loader_mock=None, get_mock=None):
+ """Test load config."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self._set_env_creds()
+ EnergyRecorder.energy_recorder_api = None
+ EnergyRecorder.load_config()
+
+ self.assertEquals(
+ EnergyRecorder.energy_recorder_api["auth"],
+ ("user", "password")
+ )
+ self.assertEquals(
+ EnergyRecorder.energy_recorder_api["uri"],
+ "http://pod-uri:8888/recorders/environment/MOCK_POD"
+ )
+
+ @mock.patch("xtesting.energy.energy.requests.get",
+ return_value=API_OK)
+ def test_load_config_no_creds(self, loader_mock=None, get_mock=None):
+ """Test load config without creds."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self._set_env_nocreds()
+ EnergyRecorder.energy_recorder_api = None
+ EnergyRecorder.load_config()
+ self.assertEquals(EnergyRecorder.energy_recorder_api["auth"], None)
+ self.assertEquals(
+ EnergyRecorder.energy_recorder_api["uri"],
+ "http://pod-uri:8888/recorders/environment/MOCK_POD"
+ )
+
+ @mock.patch("xtesting.energy.energy.requests.get",
+ return_value=API_OK)
+ def test_load_config_ex(self, loader_mock=None, get_mock=None):
+ """Test load config with exception."""
+ for key in ['NODE_NAME', 'ENERGY_RECORDER_API_URL']:
+ os.environ[key] = ''
+ with self.assertRaises(AssertionError):
+ EnergyRecorder.energy_recorder_api = None
+ EnergyRecorder.load_config()
+ self.assertEquals(EnergyRecorder.energy_recorder_api, None)
+
+ @mock.patch("xtesting.energy.energy.requests.get",
+ return_value=API_KO)
+ def test_load_config_api_ko(self, loader_mock=None, get_mock=None):
+ """Test load config with API unavailable."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self._set_env_creds()
+ EnergyRecorder.energy_recorder_api = None
+ EnergyRecorder.load_config()
+ self.assertEquals(EnergyRecorder.energy_recorder_api["available"],
+ False)
+
+ @mock.patch('xtesting.energy.energy.requests.get',
+ return_value=RECORDER_OK)
+ def test_get_current_scenario(self, loader_mock=None, get_mock=None):
+ """Test get_current_scenario."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self.test_load_config()
+ scenario = EnergyRecorder.get_current_scenario()
+ self.assertTrue(scenario is not None)
+
+ @mock.patch('xtesting.energy.energy.requests.get',
+ return_value=RECORDER_NOT_FOUND)
+ def test_current_scenario_not_found(self, get_mock=None):
+ """Test get current scenario not existing."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self.test_load_config()
+ scenario = EnergyRecorder.get_current_scenario()
+ self.assertTrue(scenario is None)
+
+ @mock.patch('xtesting.energy.energy.requests.get',
+ return_value=RECORDER_KO)
+ def test_current_scenario_api_error(self, get_mock=None):
+ """Test get current scenario with API error."""
+ os.environ['NODE_NAME'] = 'MOCK_POD'
+ self.test_load_config()
+ scenario = EnergyRecorder.get_current_scenario()
+ self.assertTrue(scenario is None)
+
+ @mock.patch('xtesting.energy.energy.EnergyRecorder.load_config',
+ side_effect=Exception("Internal execution error (MOCK)"))
+ def test_current_scenario_exception(self, get_mock=None):
+ """Test get current scenario with exception."""
+ scenario = EnergyRecorder.get_current_scenario()
+ self.assertTrue(scenario is None)
+
+if __name__ == "__main__":
+ logging.disable(logging.CRITICAL)
+ unittest.main(verbosity=2)