diff options
Diffstat (limited to 'networking_sfc/tests/unit/db')
-rw-r--r-- | networking_sfc/tests/unit/db/__init__.py | 0 | ||||
-rw-r--r-- | networking_sfc/tests/unit/db/test_flowclassifier_db.py | 677 | ||||
-rw-r--r-- | networking_sfc/tests/unit/db/test_sfc_db.py | 1490 |
3 files changed, 2167 insertions, 0 deletions
diff --git a/networking_sfc/tests/unit/db/__init__.py b/networking_sfc/tests/unit/db/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/networking_sfc/tests/unit/db/__init__.py diff --git a/networking_sfc/tests/unit/db/test_flowclassifier_db.py b/networking_sfc/tests/unit/db/test_flowclassifier_db.py new file mode 100644 index 0000000..36c9af8 --- /dev/null +++ b/networking_sfc/tests/unit/db/test_flowclassifier_db.py @@ -0,0 +1,677 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import mock +import six +import webob.exc + +from oslo_config import cfg +from oslo_utils import importutils +from oslo_utils import uuidutils + +from neutron.api import extensions as api_ext +from neutron.common import config +from neutron.common import constants as const +import neutron.extensions as nextensions + +from networking_sfc.db import flowclassifier_db as fdb +from networking_sfc import extensions +from networking_sfc.extensions import flowclassifier as fc_ext +from networking_sfc.tests import base + + +DB_FLOWCLASSIFIER_PLUGIN_CLASS = ( + "networking_sfc.db.flowclassifier_db.FlowClassifierDbPlugin" +) +extensions_path = ':'.join(extensions.__path__ + nextensions.__path__) + + +class FlowClassifierDbPluginTestCaseBase(base.BaseTestCase): + def _create_flow_classifier( + self, fmt, flow_classifier=None, expected_res_status=None, **kwargs + ): + ctx = kwargs.get('context', None) + tenant_id = kwargs.get('tenant_id', self._tenant_id) + data = {'flow_classifier': flow_classifier or {}} + if ctx is None: + data['flow_classifier'].update({'tenant_id': tenant_id}) + req = self.new_create_request( + 'flow_classifiers', data, fmt, context=ctx + ) + res = req.get_response(self.ext_api) + if expected_res_status: + self.assertEqual(res.status_int, expected_res_status) + return res + + @contextlib.contextmanager + def flow_classifier( + self, fmt=None, flow_classifier=None, do_delete=True, **kwargs + ): + if not fmt: + fmt = self.fmt + res = self._create_flow_classifier(fmt, flow_classifier, **kwargs) + if res.status_int >= 400: + raise webob.exc.HTTPClientError(code=res.status_int) + flow_classifier = self.deserialize(fmt or self.fmt, res) + yield flow_classifier + if do_delete: + self._delete('flow_classifiers', + flow_classifier['flow_classifier']['id']) + + def _get_expected_flow_classifier(self, flow_classifier): + expected_flow_classifier = { + 'name': flow_classifier.get('name') or '', + 'description': flow_classifier.get('description') or '', + 'source_port_range_min': flow_classifier.get( + 'source_port_range_min'), + 'source_port_range_max': flow_classifier.get( + 'source_port_range_max'), + 'destination_port_range_min': flow_classifier.get( + 'destination_port_range_min'), + 'destination_port_range_max': flow_classifier.get( + 'destination_port_range_max'), + 'ethertype': flow_classifier.get( + 'ethertype') or 'IPv4', + 'protocol': flow_classifier.get( + 'protocol'), + 'l7_parameters': flow_classifier.get( + 'l7_parameters') or {} + } + if ( + 'source_ip_prefix' in flow_classifier and + flow_classifier['source_ip_prefix'] + ): + expected_flow_classifier['source_ip_prefix'] = ( + flow_classifier['source_ip_prefix']) + if ( + 'destination_ip_prefix' in flow_classifier and + flow_classifier['destination_ip_prefix'] + ): + expected_flow_classifier['destination_ip_prefix'] = ( + flow_classifier['destination_ip_prefix']) + return expected_flow_classifier + + def _test_create_flow_classifier( + self, flow_classifier, expected_flow_classifier=None + ): + if expected_flow_classifier is None: + expected_flow_classifier = self._get_expected_flow_classifier( + flow_classifier) + with self.flow_classifier(flow_classifier=flow_classifier) as fc: + for k, v in six.iteritems(expected_flow_classifier): + self.assertIn(k, fc['flow_classifier']) + self.assertEqual(fc['flow_classifier'][k], v) + + +class FlowClassifierDbPluginTestCase( + base.NeutronDbPluginV2TestCase, + FlowClassifierDbPluginTestCaseBase +): + resource_prefix_map = dict( + (k, fc_ext.FLOW_CLASSIFIER_PREFIX) + for k in fc_ext.RESOURCE_ATTRIBUTE_MAP.keys() + ) + + def setUp(self, core_plugin=None, flowclassifier_plugin=None, + ext_mgr=None): + mock_log_p = mock.patch.object(fdb, 'LOG') + self.mock_log = mock_log_p.start() + cfg.CONF.register_opts(fc_ext.flow_classifier_quota_opts, 'QUOTAS') + if not flowclassifier_plugin: + flowclassifier_plugin = DB_FLOWCLASSIFIER_PLUGIN_CLASS + service_plugins = { + fc_ext.FLOW_CLASSIFIER_EXT: flowclassifier_plugin + } + fdb.FlowClassifierDbPlugin.supported_extension_aliases = [ + fc_ext.FLOW_CLASSIFIER_EXT] + fdb.FlowClassifierDbPlugin.path_prefix = ( + fc_ext.FLOW_CLASSIFIER_PREFIX + ) + super(FlowClassifierDbPluginTestCase, self).setUp( + ext_mgr=ext_mgr, + plugin=core_plugin, + service_plugins=service_plugins + ) + if not ext_mgr: + self.flowclassifier_plugin = importutils.import_object( + flowclassifier_plugin) + ext_mgr = api_ext.PluginAwareExtensionManager( + extensions_path, + {fc_ext.FLOW_CLASSIFIER_EXT: self.flowclassifier_plugin} + ) + app = config.load_paste_app('extensions_test_app') + self.ext_api = api_ext.ExtensionMiddleware(app, ext_mgr=ext_mgr) + + def test_create_flow_classifier(self): + self._test_create_flow_classifier({}) + + def test_quota_create_flow_classifier(self): + cfg.CONF.set_override('quota_flow_classifier', 3, group='QUOTAS') + self._create_flow_classifier(self.fmt, {}, expected_res_status=201) + self._create_flow_classifier(self.fmt, {}, expected_res_status=201) + self._create_flow_classifier(self.fmt, {}, expected_res_status=201) + self._create_flow_classifier(self.fmt, {}, expected_res_status=409) + + def test_create_flow_classifier_with_all_fields(self): + self._test_create_flow_classifier({ + 'name': 'test1', + 'ethertype': const.IPv4, + 'protocol': const.PROTO_NAME_TCP, + 'source_port_range_min': 100, + 'source_port_range_max': 200, + 'destination_port_range_min': 101, + 'destination_port_range_max': 201, + 'source_ip_prefix': '10.100.0.0/16', + 'destination_ip_prefix': '10.200.0.0/16', + 'logical_source_port': None, + 'logical_destination_port': None, + 'l7_parameters': {} + }) + + def test_create_flow_classifier_with_all_supported_ethertype(self): + self._test_create_flow_classifier({ + 'ethertype': None + }) + self._test_create_flow_classifier({ + 'ethertype': 'IPv4' + }) + self._test_create_flow_classifier({ + 'ethertype': 'IPv6' + }) + + def test_create_flow_classifier_with_invalid_ethertype(self): + self._create_flow_classifier( + self.fmt, { + 'ethertype': 'unsupported', + }, + expected_res_status=400 + ) + + def test_create_flow_classifier_with_all_supported_protocol(self): + self._test_create_flow_classifier({ + 'protocol': None + }) + self._test_create_flow_classifier({ + 'protocol': const.PROTO_NAME_TCP + }) + self._test_create_flow_classifier({ + 'protocol': const.PROTO_NAME_UDP + }) + self._test_create_flow_classifier({ + 'protocol': const.PROTO_NAME_ICMP + }) + + def test_create_flow_classifier_with_invalid_protocol(self): + self._create_flow_classifier( + self.fmt, { + 'protocol': 'unsupported', + }, + expected_res_status=400 + ) + + def test_create_flow_classifier_with_all_supported_port_protocol(self): + self._test_create_flow_classifier({ + 'source_port_range_min': None, + 'source_port_range_max': None, + 'destination_port_range_min': None, + 'destination_port_range_max': None + }) + self._test_create_flow_classifier({ + 'source_port_range_min': 100, + 'source_port_range_max': 200, + 'destination_port_range_min': 100, + 'destination_port_range_max': 200, + 'protocol': const.PROTO_NAME_TCP + }) + self._test_create_flow_classifier({ + 'source_port_range_min': 100, + 'source_port_range_max': 100, + 'destination_port_range_min': 100, + 'destination_port_range_max': 100, + 'protocol': const.PROTO_NAME_TCP + }) + self._test_create_flow_classifier({ + 'source_port_range_min': '100', + 'source_port_range_max': '200', + 'destination_port_range_min': '100', + 'destination_port_range_max': '200', + 'protocol': const.PROTO_NAME_UDP + }, { + 'source_port_range_min': 100, + 'source_port_range_max': 200, + 'destination_port_range_min': 100, + 'destination_port_range_max': 200, + 'protocol': const.PROTO_NAME_UDP + }) + + def test_create_flow_classifier_with_invalid__port_protocol(self): + self._create_flow_classifier( + self.fmt, { + 'source_port_range_min': 'abc', + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_port_range_max': 'abc', + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_port_range_min': 100, + 'source_port_range_max': 99, + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_port_range_min': 65536, + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_port_range_max': 65536, + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_port_range_min': -1, + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_port_range_max': -1, + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_port_range_min': 'abc', + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_port_range_max': 'abc', + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_port_range_min': 100, + 'destination_port_range_max': 99, + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_port_range_min': 65536, + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_port_range_max': 65536, + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_port_range_min': -1, + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_port_range_max': -1, + 'protocol': const.PROTO_NAME_TCP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_port_range_min': 100 + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_port_range_max': 100 + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_port_range_min': 100, + 'source_port_range_max': 200 + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_port_range_min': 100, + 'source_port_range_max': 200, + 'protocol': const.PROTO_NAME_ICMP + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_port_range_min': 100 + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_port_range_max': 100 + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_port_range_min': 100, + 'destination_port_range_max': 200 + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_port_range_min': 100, + 'destination_port_range_max': 200, + 'protocol': const.PROTO_NAME_ICMP + }, + expected_res_status=400 + ) + + def test_create_flow_classifier_with_all_supported_ip_prefix(self): + self._test_create_flow_classifier({ + 'source_ip_prefix': None, + 'destination_ip_prefix': None + }) + self._test_create_flow_classifier({ + 'source_ip_prefix': '10.0.0.0/8', + 'destination_ip_prefix': '10.0.0.0/8' + }) + + def test_create_flow_classifier_with_invalid_ip_prefix(self): + self._create_flow_classifier( + self.fmt, { + 'source_ip_prefix': '10.0.0.0/34' + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_ip_prefix': '10.0.0.0.0/8' + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_ip_prefix': '256.0.0.0/8' + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'source_ip_prefix': '10.0.0.0' + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_ip_prefix': '10.0.0.0/34' + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_ip_prefix': '10.0.0.0.0/8' + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_ip_prefix': '256.0.0.0/8' + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'destination_ip_prefix': '10.0.0.0' + }, + expected_res_status=400 + ) + + def test_create_flow_classifier_with_all_supported_l7_parameters(self): + self._test_create_flow_classifier({ + 'l7_parameters': None + }) + self._test_create_flow_classifier({ + 'l7_parameters': {} + }) + + def test_create_flow_classifier_with_invalid_l7_parameters(self): + self._create_flow_classifier( + self.fmt, { + 'l7_parameters': {'abc': 'def'} + }, + expected_res_status=400 + ) + + def test_create_flow_classifier_with_port_id(self): + self._test_create_flow_classifier({ + 'logical_source_port': None, + 'logical_destination_port': None, + }) + with self.port( + name='test1' + ) as port: + self._test_create_flow_classifier({ + 'logical_source_port': port['port']['id'], + 'logical_destination_port': port['port']['id'], + }) + + def test_create_flow_classifier_with_nouuid_port_id(self): + self._create_flow_classifier( + self.fmt, { + 'logical_source_port': 'abc' + }, + expected_res_status=400 + ) + self._create_flow_classifier( + self.fmt, { + 'logical_destination_port': 'abc' + }, + expected_res_status=400 + ) + + def test_create_flow_classifier_with_unknown_port_id(self): + self._create_flow_classifier( + self.fmt, { + 'logical_source_port': uuidutils.generate_uuid() + }, + expected_res_status=404 + ) + self._create_flow_classifier( + self.fmt, { + 'logical_destination_port': uuidutils.generate_uuid() + }, + expected_res_status=404 + ) + + def test_list_flow_classifiers(self): + with self.flow_classifier(flow_classifier={ + 'name': 'test1' + }) as fc1, self.flow_classifier(flow_classifier={ + 'name': 'test2', + }) as fc2: + fcs = [fc1, fc2] + self._test_list_resources( + 'flow_classifier', fcs + ) + + def test_list_flow_classifiers_with_params(self): + with self.flow_classifier(flow_classifier={ + 'name': 'test1' + }) as fc1, self.flow_classifier(flow_classifier={ + 'name': 'test2', + }) as fc2: + self._test_list_resources( + 'flow_classifier', [fc1], + query_params='name=test1' + ) + self._test_list_resources( + 'flow_classifier', [fc2], + query_params='name=test2' + ) + self._test_list_resources( + 'flow_classifier', [], + query_params='name=test3' + ) + + def test_list_flow_classifiers_with_unknown_params(self): + with self.flow_classifier(flow_classifier={ + 'name': 'test1' + }) as fc1, self.flow_classifier(flow_classifier={ + 'name': 'test2', + }) as fc2: + self._test_list_resources( + 'flow_classifier', [fc1, fc2], + query_params='hello=test3' + ) + + def test_show_flow_classifier(self): + with self.flow_classifier(flow_classifier={ + 'name': 'test1' + }) as fc: + req = self.new_show_request( + 'flow_classifiers', fc['flow_classifier']['id'] + ) + res = self.deserialize( + self.fmt, req.get_response(self.ext_api) + ) + for k, v in six.iteritems(fc['flow_classifier']): + self.assertEqual(res['flow_classifier'][k], v) + + def test_show_flow_classifier_noexist(self): + req = self.new_show_request( + 'flow_classifiers', '1' + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) + + def test_update_flow_classifier(self): + with self.flow_classifier(flow_classifier={ + 'name': 'test1', + 'description': 'desc1' + }) as fc: + updates = { + 'name': 'test2', + 'description': 'desc2', + } + req = self.new_update_request( + 'flow_classifiers', {'flow_classifier': updates}, + fc['flow_classifier']['id'] + ) + res = self.deserialize( + self.fmt, + req.get_response(self.ext_api) + ) + expected = fc['flow_classifier'] + expected.update(updates) + for k, v in six.iteritems(expected): + self.assertEqual(res['flow_classifier'][k], v) + req = self.new_show_request( + 'flow_classifiers', fc['flow_classifier']['id'] + ) + res = self.deserialize( + self.fmt, req.get_response(self.ext_api) + ) + for k, v in six.iteritems(expected): + self.assertEqual(res['flow_classifier'][k], v) + + def _test_update_with_field( + self, fc, updates, expected_status_code + ): + req = self.new_update_request( + 'flow_classifiers', {'flow_classifier': updates}, + fc['flow_classifier']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, expected_status_code) + + def test_update_flow_classifer_unsupported_fields(self): + with self.flow_classifier(flow_classifier={ + 'name': 'test1', + 'description': 'desc1' + }) as fc: + self._test_update_with_field( + fc, {'ethertype': None}, 400) + self._test_update_with_field( + fc, {'protocol': None}, 400) + self._test_update_with_field( + fc, {'source_port_range_min': None}, 400) + self._test_update_with_field( + fc, {'source_port_range_max': None}, 400) + self._test_update_with_field( + fc, {'destination_port_range_min': None}, 400) + self._test_update_with_field( + fc, {'destination_port_range_max': None}, 400) + self._test_update_with_field( + fc, {'source_ip_prefix': None}, 400) + self._test_update_with_field( + fc, {'destination_ip_prefix': None}, 400) + self._test_update_with_field( + fc, {'l7_parameters': None}, 400) + + def test_delete_flow_classifier(self): + with self.flow_classifier(flow_classifier={ + 'name': 'test1' + }, do_delete=False) as fc: + req = self.new_delete_request( + 'flow_classifiers', fc['flow_classifier']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + req = self.new_show_request( + 'flow_classifiers', fc['flow_classifier']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) + + def test_delete_flow_classifier_noexist(self): + req = self.new_delete_request( + 'flow_classifiers', '1' + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) diff --git a/networking_sfc/tests/unit/db/test_sfc_db.py b/networking_sfc/tests/unit/db/test_sfc_db.py new file mode 100644 index 0000000..70d57e3 --- /dev/null +++ b/networking_sfc/tests/unit/db/test_sfc_db.py @@ -0,0 +1,1490 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import mock +import six +import webob.exc + +from oslo_config import cfg +from oslo_utils import importutils +from oslo_utils import uuidutils + +from neutron.api import extensions as api_ext +from neutron.common import config +import neutron.extensions as nextensions + +from networking_sfc.db import flowclassifier_db as fdb +from networking_sfc.db import sfc_db +from networking_sfc import extensions +from networking_sfc.extensions import flowclassifier as fc_ext +from networking_sfc.extensions import sfc +from networking_sfc.tests import base +from networking_sfc.tests.unit.db import test_flowclassifier_db + + +DB_SFC_PLUGIN_CLASS = ( + "networking_sfc.db.sfc_db.SfcDbPlugin" +) +extensions_path = ':'.join(extensions.__path__ + nextensions.__path__) + + +class SfcDbPluginTestCaseBase( + base.BaseTestCase +): + def _create_port_chain( + self, fmt, port_chain=None, expected_res_status=None, **kwargs + ): + ctx = kwargs.get('context', None) + tenant_id = kwargs.get('tenant_id', self._tenant_id) + data = {'port_chain': port_chain or {}} + if ctx is None: + data['port_chain'].update({'tenant_id': tenant_id}) + req = self.new_create_request( + 'port_chains', data, fmt, context=ctx + ) + res = req.get_response(self.ext_api) + if expected_res_status: + self.assertEqual(res.status_int, expected_res_status) + return res + + @contextlib.contextmanager + def port_chain(self, fmt=None, port_chain=None, do_delete=True, **kwargs): + if not fmt: + fmt = self.fmt + res = self._create_port_chain(fmt, port_chain, **kwargs) + if res.status_int >= 400: + raise webob.exc.HTTPClientError(code=res.status_int) + port_chain = self.deserialize(fmt or self.fmt, res) + yield port_chain + if do_delete: + self._delete('port_chains', port_chain['port_chain']['id']) + + def _create_port_pair_group( + self, fmt, port_pair_group=None, expected_res_status=None, **kwargs + ): + ctx = kwargs.get('context', None) + tenant_id = kwargs.get('tenant_id', self._tenant_id) + data = {'port_pair_group': port_pair_group or {}} + if ctx is None: + data['port_pair_group'].update({'tenant_id': tenant_id}) + req = self.new_create_request( + 'port_pair_groups', data, fmt, context=ctx + ) + res = req.get_response(self.ext_api) + if expected_res_status: + self.assertEqual(res.status_int, expected_res_status) + return res + + @contextlib.contextmanager + def port_pair_group( + self, fmt=None, port_pair_group=None, do_delete=True, **kwargs + ): + if not fmt: + fmt = self.fmt + res = self._create_port_pair_group(fmt, port_pair_group, **kwargs) + if res.status_int >= 400: + raise webob.exc.HTTPClientError(code=res.status_int) + port_pair_group = self.deserialize(fmt or self.fmt, res) + yield port_pair_group + if do_delete: + self._delete( + 'port_pair_groups', + port_pair_group['port_pair_group']['id']) + + def _create_port_pair( + self, fmt, port_pair=None, expected_res_status=None, **kwargs + ): + ctx = kwargs.get('context', None) + tenant_id = kwargs.get('tenant_id', self._tenant_id) + data = {'port_pair': port_pair or {}} + if ctx is None: + data['port_pair'].update({'tenant_id': tenant_id}) + req = self.new_create_request( + 'port_pairs', data, fmt, context=ctx + ) + res = req.get_response(self.ext_api) + if expected_res_status: + self.assertEqual(res.status_int, expected_res_status) + return res + + @contextlib.contextmanager + def port_pair(self, fmt=None, port_pair=None, do_delete=True, **kwargs): + if not fmt: + fmt = self.fmt + res = self._create_port_pair(fmt, port_pair, **kwargs) + if res.status_int >= 400: + raise webob.exc.HTTPClientError(code=res.status_int) + port_pair = self.deserialize(fmt or self.fmt, res) + yield port_pair + if do_delete: + self._delete('port_pairs', port_pair['port_pair']['id']) + + def _get_expected_port_pair(self, port_pair): + return { + 'name': port_pair.get('name') or '', + 'description': port_pair.get('description') or '', + 'egress': port_pair.get('egress'), + 'ingress': port_pair.get('ingress'), + 'service_function_parameters': port_pair.get( + 'service_function_parameters') or {'correlation': None} + } + + def _test_create_port_pair(self, port_pair, expected_port_pair=None): + if expected_port_pair is None: + expected_port_pair = self._get_expected_port_pair(port_pair) + with self.port_pair(port_pair=port_pair) as pp: + for k, v in six.iteritems(expected_port_pair): + self.assertEqual(pp['port_pair'][k], v) + + def _test_create_port_pairs( + self, port_pairs, expected_port_pairs=None + ): + if port_pairs: + port_pair = port_pairs.pop() + if expected_port_pairs: + expected_port_pair = expected_port_pairs.pop() + else: + expected_port_pair = self._get_expected_port_pair(port_pair) + with self.port_pair(port_pair=port_pair) as pp: + for k, v in six.iteritems(expected_port_pair): + self.assertEqual(pp['port_pair'][k], v) + + def _get_expected_port_pair_group(self, port_pair_group): + return { + 'name': port_pair_group.get('name') or '', + 'description': port_pair_group.get('description') or '', + 'port_pairs': port_pair_group.get('port_pairs') or [] + } + + def _test_create_port_pair_group( + self, port_pair_group, expected_port_pair_group=None + ): + if expected_port_pair_group is None: + expected_port_pair_group = self._get_expected_port_pair_group( + port_pair_group) + with self.port_pair_group(port_pair_group=port_pair_group) as pg: + for k, v in six.iteritems(expected_port_pair_group): + self.assertEqual(pg['port_pair_group'][k], v) + + def _test_create_port_pair_groups( + self, port_pair_groups, expected_port_pair_groups=None + ): + if port_pair_groups: + port_pair_group = port_pair_groups.pop() + if expected_port_pair_groups: + expected_port_pair_group = expected_port_pair_groups.pop() + else: + expected_port_pair_group = self._get_expected_port_pair_group( + port_pair_group) + with self.port_pair_group(port_pair_group=port_pair_group) as pg: + for k, v in six.iteritems(expected_port_pair_group): + self.assertEqual(pg['port_pair_group'][k], v) + + def _get_expected_port_chain(self, port_chain): + return { + 'name': port_chain.get('name') or '', + 'description': port_chain.get('description') or '', + 'port_pair_groups': port_chain['port_pair_groups'], + 'flow_classifiers': port_chain.get('flow_classifiers') or [], + 'chain_parameters': port_chain.get( + 'chain_parameters') or {'correlation': 'mpls'} + } + + def _test_create_port_chain(self, port_chain, expected_port_chain=None): + if expected_port_chain is None: + expected_port_chain = self._get_expected_port_chain(port_chain) + with self.port_chain(port_chain=port_chain) as pc: + for k, v in six.iteritems(expected_port_chain): + self.assertEqual(pc['port_chain'][k], v) + + def _test_create_port_chains( + self, port_chains, expected_port_chains=None + ): + if port_chains: + port_chain = port_chains.pop() + if expected_port_chains: + expected_port_chain = expected_port_chains.pop() + else: + expected_port_chain = self._get_expected_port_chain( + port_chain) + with self.port_chain(port_chain=port_chain) as pc: + for k, v in six.iteritems(expected_port_chain): + self.assertEqual(pc['port_chain'][k], v) + + +class SfcDbPluginTestCase( + base.NeutronDbPluginV2TestCase, + test_flowclassifier_db.FlowClassifierDbPluginTestCaseBase, + SfcDbPluginTestCaseBase +): + resource_prefix_map = dict([ + (k, sfc.SFC_PREFIX) + for k in sfc.RESOURCE_ATTRIBUTE_MAP.keys() + ] + [ + (k, fc_ext.FLOW_CLASSIFIER_PREFIX) + for k in fc_ext.RESOURCE_ATTRIBUTE_MAP.keys() + ]) + + def setUp(self, core_plugin=None, sfc_plugin=None, + flowclassifier_plugin=None, ext_mgr=None): + mock_log_p = mock.patch.object(sfc_db, 'LOG') + self.mock_log = mock_log_p.start() + cfg.CONF.register_opts(sfc.sfc_quota_opts, 'QUOTAS') + if not sfc_plugin: + sfc_plugin = DB_SFC_PLUGIN_CLASS + if not flowclassifier_plugin: + flowclassifier_plugin = ( + test_flowclassifier_db.DB_FLOWCLASSIFIER_PLUGIN_CLASS) + + service_plugins = { + sfc.SFC_EXT: sfc_plugin, + fc_ext.FLOW_CLASSIFIER_EXT: flowclassifier_plugin + } + sfc_db.SfcDbPlugin.supported_extension_aliases = [ + "sfc"] + sfc_db.SfcDbPlugin.path_prefix = sfc.SFC_PREFIX + fdb.FlowClassifierDbPlugin.supported_extension_aliases = [ + "flow_classifier"] + fdb.FlowClassifierDbPlugin.path_prefix = ( + fc_ext.FLOW_CLASSIFIER_PREFIX + ) + super(SfcDbPluginTestCase, self).setUp( + ext_mgr=ext_mgr, + plugin=core_plugin, + service_plugins=service_plugins + ) + if not ext_mgr: + self.sfc_plugin = importutils.import_object(sfc_plugin) + self.flowclassifier_plugin = importutils.import_object( + flowclassifier_plugin) + ext_mgr = api_ext.PluginAwareExtensionManager( + extensions_path, + { + sfc.SFC_EXT: self.sfc_plugin, + fc_ext.FLOW_CLASSIFIER_EXT: self.flowclassifier_plugin + } + ) + app = config.load_paste_app('extensions_test_app') + self.ext_api = api_ext.ExtensionMiddleware(app, ext_mgr=ext_mgr) + + def test_create_port_chain(self): + with self.port_pair_group(port_pair_group={}) as pg: + self._test_create_port_chain({ + 'port_pair_groups': [pg['port_pair_group']['id']]}) + + def test_quota_create_port_chain(self): + cfg.CONF.set_override('quota_port_chain', 3, group='QUOTAS') + with self.port_pair_group( + port_pair_group={}, do_delete=False + ) as pg1, self.port_pair_group( + port_pair_group={}, do_delete=False + ) as pg2, self.port_pair_group( + port_pair_group={}, do_delete=False + ) as pg3, self.port_pair_group( + port_pair_group={}, do_delete=False + ) as pg4: + self._create_port_chain( + self.fmt, { + 'port_pair_groups': [pg1['port_pair_group']['id']] + }, expected_res_status=201) + self._create_port_chain( + self.fmt, { + 'port_pair_groups': [pg2['port_pair_group']['id']] + }, expected_res_status=201) + self._create_port_chain( + self.fmt, { + 'port_pair_groups': [pg3['port_pair_group']['id']] + }, expected_res_status=201) + self._create_port_chain( + self.fmt, { + 'port_pair_groups': [pg4['port_pair_group']['id']] + }, expected_res_status=409) + + def test_create_port_chain_all_fields(self): + with self.port_pair_group(port_pair_group={}) as pg: + self._test_create_port_chain({ + 'port_pair_groups': [pg['port_pair_group']['id']], + 'flow_classifiers': [], + 'name': 'abc', + 'description': 'def', + 'chain_parameters': {'correlation': 'mpls'} + }) + + def test_create_port_chain_multi_port_pair_groups(self): + with self.port_pair_group( + port_pair_group={} + ) as pg1, self.port_pair_group( + port_pair_group={} + ) as pg2: + self._test_create_port_chain({ + 'port_pair_groups': [ + pg1['port_pair_group']['id'], + pg2['port_pair_group']['id'] + ] + }) + + def test_create_port_chain_shared_port_pair_groups(self): + with self.port_pair_group( + port_pair_group={} + ) as pg1, self.port_pair_group( + port_pair_group={} + ) as pg2, self.port_pair_group( + port_pair_group={} + ) as pg3: + self._test_create_port_chains([{ + 'port_pair_groups': [ + pg1['port_pair_group']['id'], + pg2['port_pair_group']['id'] + ] + }, { + 'port_pair_groups': [ + pg1['port_pair_group']['id'], + pg3['port_pair_group']['id'] + ] + }]) + + def test_create_port_chain_shared_port_pair_groups_different_order(self): + with self.port_pair_group( + port_pair_group={} + ) as pg1, self.port_pair_group( + port_pair_group={} + ) as pg2: + self._test_create_port_chains([{ + 'port_pair_groups': [ + pg1['port_pair_group']['id'], + pg2['port_pair_group']['id'] + ] + }, { + 'port_pair_groups': [ + pg2['port_pair_group']['id'], + pg1['port_pair_group']['id'] + ] + }]) + + def test_create_port_chain_with_empty_chain_parameters(self): + with self.port_pair_group(port_pair_group={}) as pg: + self._test_create_port_chain({ + 'chain_parameters': {}, + 'port_pair_groups': [pg['port_pair_group']['id']] + }) + + def test_create_port_chain_with_none_chain_parameters(self): + with self.port_pair_group(port_pair_group={}) as pg: + self._test_create_port_chain({ + 'chain_parameters': None, + 'port_pair_groups': [pg['port_pair_group']['id']] + }) + + def test_create_port_chain_with_default_chain_parameters(self): + with self.port_pair_group(port_pair_group={}) as pg: + self._test_create_port_chain({ + 'chain_parameters': {'correlation': 'mpls'}, + 'port_pair_groups': [pg['port_pair_group']['id']] + }) + + def test_create_port_chain_with_none_flow_classifiers(self): + with self.port_pair_group(port_pair_group={}) as pg: + self._test_create_port_chain({ + 'flow_classifiers': None, + 'port_pair_groups': [pg['port_pair_group']['id']] + }) + + def test_create_port_chain_with_empty_flow_classifiers(self): + with self.port_pair_group(port_pair_group={}) as pg: + self._test_create_port_chain({ + 'flow_classifiers': [], + 'port_pair_groups': [pg['port_pair_group']['id']] + }) + + def test_create_port_chain_with_flow_classifiers(self): + with self.flow_classifier(flow_classifier={}) as fc: + with self.port_pair_group(port_pair_group={}) as pg: + self._test_create_port_chain({ + 'flow_classifiers': [fc['flow_classifier']['id']], + 'port_pair_groups': [pg['port_pair_group']['id']] + }) + + def test_create_port_chain_with_multi_flow_classifiers(self): + with self.flow_classifier( + flow_classifier={} + ) as fc1, self.flow_classifier( + flow_classifier={} + ) as fc2: + with self.port_pair_group(port_pair_group={}) as pg: + self._test_create_port_chain({ + 'flow_classifiers': [ + fc1['flow_classifier']['id'], + fc2['flow_classifier']['id'] + ], + 'port_pair_groups': [pg['port_pair_group']['id']] + }) + + def test_create_port_chain_with_port_pairs(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pp1, self.port_pair(port_pair={ + 'ingress': dst_port['port']['id'], + 'egress': src_port['port']['id'] + }) as pp2: + with self.port_pair_group(port_pair_group={ + 'port_pairs': [ + pp1['port_pair']['id'] + ] + }) as pg1, self.port_pair_group(port_pair_group={ + 'port_pairs': [ + pp2['port_pair']['id'] + ] + }) as pg2: + self._test_create_port_chain({ + 'port_pair_groups': [ + pg1['port_pair_group']['id'], + pg2['port_pair_group']['id'] + ] + }) + + def test_create_port_chain_with_empty_port_pair_groups(self): + self._create_port_chain( + self.fmt, {'port_pair_groups': []}, + expected_res_status=400 + ) + + def test_create_port_chain_with_nonuuid_port_pair_group_id(self): + self._create_port_chain( + self.fmt, {'port_pair_groups': ['unknown']}, + expected_res_status=400 + ) + + def test_create_port_chain_with_unknown_port_pair_group_id(self): + self._create_port_chain( + self.fmt, {'port_pair_groups': [uuidutils.generate_uuid()]}, + expected_res_status=404 + ) + + def test_create_port_chain_with_same_port_pair_groups(self): + with self.port_pair_group( + port_pair_group={} + ) as pg: + with self.port_chain( + port_chain={ + 'port_pair_groups': [pg['port_pair_group']['id']] + } + ): + self._create_port_chain( + self.fmt, { + 'port_pair_groups': [pg['port_pair_group']['id']] + }, expected_res_status=409 + ) + + def test_create_port_chain_with_no_port_pair_groups(self): + self._create_port_chain( + self.fmt, {}, expected_res_status=400 + ) + + def test_create_port_chain_with_invalid_chain_parameters(self): + with self.port_pair_group(port_pair_group={}) as pg: + self._create_port_chain( + self.fmt, { + 'chain_parameters': {'correlation': 'unknown'}, + 'port_pair_groups': [pg['port_pair_group']['id']] + }, expected_res_status=400 + ) + + def test_create_port_chain_unknown_flow_classifiers(self): + with self.port_pair_group(port_pair_group={}) as pg: + self._create_port_chain( + self.fmt, { + 'flow_classifiers': [uuidutils.generate_uuid()], + 'port_pair_groups': [pg['port_pair_group']['id']] + }, expected_res_status=404 + ) + + def test_create_port_chain_nouuid_flow_classifiers(self): + with self.port_pair_group(port_pair_group={}) as pg: + self._create_port_chain( + self.fmt, { + 'flow_classifiers': ['unknown'], + 'port_pair_groups': [pg['port_pair_group']['id']] + }, expected_res_status=400 + ) + + def test_list_port_chains(self): + with self.port_pair_group( + port_pair_group={} + ) as pg1, self.port_pair_group( + port_pair_group={} + ) as pg2: + with self.port_chain(port_chain={ + 'port_pair_groups': [pg1['port_pair_group']['id']] + }) as pc1, self.port_chain(port_chain={ + 'port_pair_groups': [pg2['port_pair_group']['id']] + }) as pc2: + port_chains = [pc1, pc2] + self._test_list_resources( + 'port_chain', port_chains + ) + + def test_list_port_chains_with_params(self): + with self.port_pair_group( + port_pair_group={} + ) as pg1, self.port_pair_group( + port_pair_group={} + ) as pg2: + with self.port_chain(port_chain={ + 'name': 'test1', + 'port_pair_groups': [pg1['port_pair_group']['id']] + }) as pc1, self.port_chain(port_chain={ + 'name': 'test2', + 'port_pair_groups': [pg2['port_pair_group']['id']] + }) as pc2: + self._test_list_resources( + 'port_chain', [pc1], + query_params='name=test1' + ) + self._test_list_resources( + 'port_chain', [pc2], + query_params='name=test2' + ) + self._test_list_resources( + 'port_chain', [], + query_params='name=test3' + ) + + def test_list_port_chains_with_unknown_params(self): + with self.port_pair_group( + port_pair_group={} + ) as pg1, self.port_pair_group( + port_pair_group={} + ) as pg2: + with self.port_chain(port_chain={ + 'name': 'test1', + 'port_pair_groups': [pg1['port_pair_group']['id']] + }) as pc1, self.port_chain(port_chain={ + 'name': 'test2', + 'port_pair_groups': [pg2['port_pair_group']['id']] + }) as pc2: + self._test_list_resources( + 'port_chain', [pc1, pc2], + query_params='hello=test3' + ) + + def test_show_port_chain(self): + with self.port_pair_group( + port_pair_group={} + ) as pg: + with self.port_chain(port_chain={ + 'name': 'test1', + 'description': 'portchain', + 'port_pair_groups': [pg['port_pair_group']['id']] + }) as pc: + req = self.new_show_request( + 'port_chains', pc['port_chain']['id'] + ) + res = self.deserialize( + self.fmt, req.get_response(self.ext_api) + ) + expected = self._get_expected_port_chain(pc['port_chain']) + for k, v in six.iteritems(expected): + self.assertEqual(res['port_chain'][k], v) + + def test_show_port_chain_noexist(self): + req = self.new_show_request( + 'port_chains', '1' + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) + + def test_update_port_chain(self): + with self.flow_classifier( + flow_classifier={} + ) as fc1, self.flow_classifier( + flow_classifier={} + ) as fc2: + with self.port_pair_group( + port_pair_group={} + ) as pg: + with self.port_chain(port_chain={ + 'name': 'test1', + 'description': 'desc1', + 'port_pair_groups': [pg['port_pair_group']['id']], + 'flow_classifiers': [fc1['flow_classifier']['id']] + }) as pc: + updates = { + 'name': 'test2', + 'description': 'desc2', + 'flow_classifiers': [fc2['flow_classifier']['id']] + } + req = self.new_update_request( + 'port_chains', {'port_chain': updates}, + pc['port_chain']['id'] + ) + res = self.deserialize( + self.fmt, + req.get_response(self.ext_api) + ) + expected = pc['port_chain'] + expected.update(updates) + for k, v in six.iteritems(expected): + self.assertEqual(res['port_chain'][k], v) + req = self.new_show_request( + 'port_chains', pc['port_chain']['id'] + ) + res = self.deserialize( + self.fmt, req.get_response(self.ext_api) + ) + for k, v in six.iteritems(expected): + self.assertEqual(res['port_chain'][k], v) + + def test_update_port_chain_port_pair_groups(self): + with self.port_pair_group( + port_pair_group={} + ) as pg1, self.port_pair_group( + port_pair_group={} + ) as pg2: + with self.port_chain(port_chain={ + 'port_pair_groups': [pg1['port_pair_group']['id']], + }) as pc: + updates = { + 'port_pair_groups': [pg2['port_pair_group']['id']] + } + req = self.new_update_request( + 'port_chains', {'port_chain': updates}, + pc['port_chain']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 400) + + def test_update_port_chain_chain_parameters(self): + with self.port_pair_group( + port_pair_group={} + ) as pg: + with self.port_chain(port_chain={ + 'port_pair_groups': [pg['port_pair_group']['id']], + }) as pc: + updates = { + 'chain_parameters': {'correlation': 'mpls'} + } + req = self.new_update_request( + 'port_chains', {'port_chain': updates}, + pc['port_chain']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 400) + + def test_delete_port_chain(self): + with self.port_pair_group( + port_pair_group={} + ) as pg: + with self.port_chain(port_chain={ + 'port_pair_groups': [pg['port_pair_group']['id']] + }, do_delete=False) as pc: + req = self.new_delete_request( + 'port_chains', pc['port_chain']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + req = self.new_show_request( + 'port_chains', pc['port_chain']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) + req = self.new_show_request( + 'port_pair_groups', pg['port_pair_group']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 200) + + def test_delete_port_chain_noexist(self): + req = self.new_delete_request( + 'port_chains', '1' + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) + + def test_delete_flow_classifier_port_chain_exist(self): + with self.flow_classifier(flow_classifier={ + }) as fc: + with self.port_pair_group(port_pair_group={ + }) as pg: + with self.port_chain(port_chain={ + 'port_pair_groups': [pg['port_pair_group']['id']], + 'flow_classifiers': [fc['flow_classifier']['id']] + }): + req = self.new_delete_request( + 'flow_classifiers', fc['flow_classifier']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 409) + + def test_create_port_pair_group(self): + self._test_create_port_pair_group({}) + + def test_quota_create_port_pair_group_quota(self): + cfg.CONF.set_override('quota_port_pair_group', 3, group='QUOTAS') + self._create_port_pair_group( + self.fmt, {'port_pairs': []}, expected_res_status=201 + ) + self._create_port_pair_group( + self.fmt, {'port_pairs': []}, expected_res_status=201 + ) + self._create_port_pair_group( + self.fmt, {'port_pairs': []}, expected_res_status=201 + ) + self._create_port_pair_group( + self.fmt, {'port_pairs': []}, expected_res_status=409 + ) + + def test_create_port_pair_group_all_fields(self): + self._test_create_port_pair_group({ + 'name': 'test1', + 'description': 'desc1', + 'port_pairs': [] + }) + + def test_create_port_pair_group_with_port_pairs(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pp1, self.port_pair(port_pair={ + 'ingress': dst_port['port']['id'], + 'egress': src_port['port']['id'] + }) as pp2: + self._test_create_port_pair_group({ + 'port_pairs': [ + pp1['port_pair']['id'], + pp2['port_pair']['id'] + ] + }) + + def test_create_port_pair_group_with_nouuid_port_pair_id(self): + self._create_port_pair_group( + self.fmt, {'port_pairs': ['unknown']}, + expected_res_status=400 + ) + + def test_create_port_pair_group_with_unknown_port_pair_id(self): + self._create_port_pair_group( + self.fmt, {'port_pairs': [uuidutils.generate_uuid()]}, + expected_res_status=404 + ) + + def test_create_port_pair_group_share_port_pair_id(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pp: + with self.port_pair_group(port_pair_group={ + 'port_pairs': [pp['port_pair']['id']] + }): + self._create_port_pair_group( + self.fmt, {'port_pairs': [pp['port_pair']['id']]}, + expected_res_status=409 + ) + + def test_list_port_pair_groups(self): + with self.port_pair_group(port_pair_group={ + 'name': 'test1' + }) as pc1, self.port_pair_group(port_pair_group={ + 'name': 'test2' + }) as pc2: + port_pair_groups = [pc1, pc2] + self._test_list_resources( + 'port_pair_group', port_pair_groups + ) + + def test_list_port_pair_groups_with_params(self): + with self.port_pair_group(port_pair_group={ + 'name': 'test1' + }) as pc1, self.port_pair_group(port_pair_group={ + 'name': 'test2' + }) as pc2: + self._test_list_resources( + 'port_pair_group', [pc1], + query_params='name=test1' + ) + self._test_list_resources( + 'port_pair_group', [pc2], + query_params='name=test2' + ) + self._test_list_resources( + 'port_pair_group', [], + query_params='name=test3' + ) + + def test_list_port_pair_groups_with_unknown_params(self): + with self.port_pair_group(port_pair_group={ + 'name': 'test1' + }) as pc1, self.port_pair_group(port_pair_group={ + 'name': 'test2' + }) as pc2: + self._test_list_resources( + 'port_pair_group', [pc1, pc2], + query_params='hello=test3' + ) + + def test_show_port_pair_group(self): + with self.port_pair_group(port_pair_group={ + 'name': 'test1' + }) as pc: + req = self.new_show_request( + 'port_pair_groups', pc['port_pair_group']['id'] + ) + res = self.deserialize( + self.fmt, req.get_response(self.ext_api) + ) + for k, v in six.iteritems(pc['port_pair_group']): + self.assertEqual(res['port_pair_group'][k], v) + + def test_show_port_pair_group_noexist(self): + req = self.new_show_request( + 'port_pair_groups', '1' + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) + + def test_update_port_pair_group(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pp1, self.port_pair(port_pair={ + 'ingress': dst_port['port']['id'], + 'egress': src_port['port']['id'] + }) as pp2: + with self.port_pair_group(port_pair_group={ + 'name': 'test1', + 'description': 'desc1', + 'port_pairs': [pp1['port_pair']['id']] + }) as pg: + updates = { + 'name': 'test2', + 'description': 'desc2', + 'port_pairs': [pp2['port_pair']['id']] + } + req = self.new_update_request( + 'port_pair_groups', {'port_pair_group': updates}, + pg['port_pair_group']['id'] + ) + res = self.deserialize( + self.fmt, + req.get_response(self.ext_api) + ) + expected = pg['port_pair_group'] + expected.update(updates) + for k, v in six.iteritems(expected): + self.assertEqual(res['port_pair_group'][k], v) + req = self.new_show_request( + 'port_pair_groups', pg['port_pair_group']['id'] + ) + res = self.deserialize( + self.fmt, req.get_response(self.ext_api) + ) + for k, v in six.iteritems(expected): + self.assertEqual(res['port_pair_group'][k], v) + + def test_delete_port_pair_group(self): + with self.port_pair_group(port_pair_group={ + 'name': 'test1' + }, do_delete=False) as pc: + req = self.new_delete_request( + 'port_pair_groups', pc['port_pair_group']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + req = self.new_show_request( + 'port_pair_groups', pc['port_pair_group']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) + + def test_delete_port_pair_group_port_chain_exist(self): + with self.port_pair_group(port_pair_group={ + 'name': 'test1' + }) as pg: + with self.port_chain(port_chain={ + 'port_pair_groups': [pg['port_pair_group']['id']] + }): + req = self.new_delete_request( + 'port_pair_groups', pg['port_pair_group']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 409) + + def test_delete_port_pair_group_noexist(self): + req = self.new_delete_request( + 'port_pair_groups', '1' + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) + + def test_create_port_pair(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + self._test_create_port_pair({ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) + + def test_quota_create_port_pair_quota(self): + cfg.CONF.set_override('quota_port_pair', 3, group='QUOTAS') + with self.port( + name='port1', + device_id='default' + ) as src_port1, self.port( + name='port2', + device_id='default' + ) as dst_port1, self.port( + name='port3', + device_id='default' + ) as src_port2, self.port( + name='port4', + device_id='default' + ) as dst_port2, self.port( + name='port5', + device_id='default' + ) as src_port3, self.port( + name='port6', + device_id='default' + ) as dst_port3, self.port( + name='port7', + device_id='default' + ) as src_port4, self.port( + name='port8', + device_id='default' + ) as dst_port4: + self._create_port_pair( + self.fmt, { + 'ingress': src_port1['port']['id'], + 'egress': dst_port1['port']['id'] + }, expected_res_status=201) + self._create_port_pair( + self.fmt, { + 'ingress': src_port2['port']['id'], + 'egress': dst_port2['port']['id'] + }, expected_res_status=201) + self._create_port_pair( + self.fmt, { + 'ingress': src_port3['port']['id'], + 'egress': dst_port3['port']['id'] + }, expected_res_status=201) + self._create_port_pair( + self.fmt, { + 'ingress': src_port4['port']['id'], + 'egress': dst_port4['port']['id'] + }, expected_res_status=409) + + def test_create_port_pair_all_fields(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + self._test_create_port_pair({ + 'name': 'test1', + 'description': 'desc1', + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'], + 'service_function_parameters': {'correlation': None} + }) + + def test_create_port_pair_none_service_function_parameters(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + self._test_create_port_pair({ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'], + 'service_function_parameters': None + }) + + def test_create_port_pair_empty_service_function_parameters(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + self._test_create_port_pair({ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'], + 'service_function_parameters': {} + }) + + def test_create_port_pair_with_src_dst_same_port(self): + with self.port( + name='port1', + device_id='default' + ) as src_dst_port: + self._test_create_port_pair({ + 'ingress': src_dst_port['port']['id'], + 'egress': src_dst_port['port']['id'] + }) + + def test_create_port_pair_empty_input(self): + self._create_port_pair(self.fmt, {}, expected_res_status=400) + + def test_create_port_pair_with_no_ingress(self): + with self.port( + name='port1', + device_id='default' + ) as dst_port: + self._create_port_pair( + self.fmt, + { + 'egress': dst_port['port']['id'] + }, + expected_res_status=400 + ) + + def test_create_port_pair_with_no_egress(self): + with self.port( + name='port1', + device_id='default' + ) as src_port: + self._create_port_pair( + self.fmt, + { + 'ingress': src_port['port']['id'] + }, + expected_res_status=400 + ) + + def test_create_port_pair_with_nouuid_ingress(self): + with self.port( + name='port1', + device_id='default' + ) as dst_port: + self._create_port_pair( + self.fmt, + { + 'ingress': '1', + 'egress': dst_port['port']['id'] + }, + expected_res_status=400 + ) + + def test_create_port_pair_with_unknown_ingress(self): + with self.port( + name='port1', + device_id='default' + ) as dst_port: + self._create_port_pair( + self.fmt, + { + 'ingress': uuidutils.generate_uuid(), + 'egress': dst_port['port']['id'] + }, + expected_res_status=404 + ) + + def test_create_port_pair_with_nouuid_egress(self): + with self.port( + name='port1', + device_id='default' + ) as src_port: + self._create_port_pair( + self.fmt, + { + 'ingress': src_port['port']['id'], + 'egress': '1' + }, + expected_res_status=400 + ) + + def test_create_port_pair_with_unkown_egress(self): + with self.port( + name='port1', + device_id='default' + ) as src_port: + self._create_port_pair( + self.fmt, + { + 'ingress': src_port['port']['id'], + 'egress': uuidutils.generate_uuid() + }, + expected_res_status=404 + ) + + def test_create_port_pair_ingress_egress_different_hosts(self): + with self.port( + name='port1', + device_id='device1' + ) as src_port, self.port( + name='port2', + device_id='device2' + ) as dst_port: + self._create_port_pair( + self.fmt, + { + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }, + expected_res_status=400 + ) + + def test_create_port_pair_with_invalid_service_function_parameters(self): + with self.port( + name='port1', + device_id='default' + ) as src_dst_port: + self._create_port_pair( + self.fmt, + { + 'ingress': src_dst_port['port']['id'], + 'egress': src_dst_port['port']['id'], + 'service_function_parameters': {'abc': 'def'} + }, + expected_res_status=400 + ) + + def test_list_port_pairs(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pc1, self.port_pair(port_pair={ + 'ingress': dst_port['port']['id'], + 'egress': src_port['port']['id'] + }) as pc2: + port_pairs = [pc1, pc2] + self._test_list_resources( + 'port_pair', port_pairs + ) + + def test_list_port_pairs_with_params(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'name': 'test1', + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pc1, self.port_pair(port_pair={ + 'name': 'test2', + 'ingress': dst_port['port']['id'], + 'egress': src_port['port']['id'] + }) as pc2: + self._test_list_resources( + 'port_pair', [pc1], + query_params='name=test1' + ) + self._test_list_resources( + 'port_pair', [pc2], + query_params='name=test2' + ) + self._test_list_resources( + 'port_pair', [], + query_params='name=test3' + ) + + def test_list_port_pairs_with_unknown_params(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'name': 'test1', + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pc1, self.port_pair(port_pair={ + 'name': 'test2', + 'ingress': dst_port['port']['id'], + 'egress': src_port['port']['id'] + }) as pc2: + port_pairs = [pc1, pc2] + self._test_list_resources( + 'port_pair', port_pairs, + query_params='hello=test3' + ) + + def test_show_port_pair(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pc: + req = self.new_show_request( + 'port_pairs', pc['port_pair']['id'] + ) + res = self.deserialize( + self.fmt, req.get_response(self.ext_api) + ) + for k, v in six.iteritems(pc['port_pair']): + self.assertEqual(res['port_pair'][k], v) + + def test_show_port_pair_noexist(self): + req = self.new_show_request( + 'port_pairs', '1' + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) + + def test_update_port_pair(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'name': 'test1', + 'description': 'desc1', + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pc: + updates = { + 'name': 'test2', + 'description': 'desc2' + } + req = self.new_update_request( + 'port_pairs', {'port_pair': updates}, + pc['port_pair']['id'] + ) + res = self.deserialize( + self.fmt, + req.get_response(self.ext_api) + ) + expected = pc['port_pair'] + expected.update(updates) + for k, v in six.iteritems(expected): + self.assertEqual(res['port_pair'][k], v) + req = self.new_show_request( + 'port_pairs', pc['port_pair']['id'] + ) + res = self.deserialize( + self.fmt, req.get_response(self.ext_api) + ) + for k, v in six.iteritems(expected): + self.assertEqual(res['port_pair'][k], v) + + def test_update_port_pair_service_function_parameters(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'name': 'test1', + 'description': 'desc1', + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pc: + updates = { + 'service_function_parameters': { + 'correlation': None + } + } + req = self.new_update_request( + 'port_pairs', {'port_pair': updates}, + pc['port_pair']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 400) + + def test_update_port_pair_ingress(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'name': 'test1', + 'description': 'desc1', + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pc: + updates = { + 'ingress': dst_port['port']['id'] + } + req = self.new_update_request( + 'port_pairs', {'port_pair': updates}, + pc['port_pair']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 400) + + def test_update_port_pair_egress(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'name': 'test1', + 'description': 'desc1', + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pc: + updates = { + 'egress': src_port['port']['id'] + } + req = self.new_update_request( + 'port_pairs', {'port_pair': updates}, + pc['port_pair']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 400) + + def test_delete_port_pair(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }, do_delete=False) as pc: + req = self.new_delete_request( + 'port_pairs', pc['port_pair']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 204) + req = self.new_show_request( + 'port_pairs', pc['port_pair']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) + + def test_delete_port_pair_noexist(self): + req = self.new_delete_request( + 'port_pairs', '1' + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 404) + + def test_delete_port_pair_port_pair_group_exist(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }) as pp: + with self.port_pair_group(port_pair_group={ + 'port_pairs': [pp['port_pair']['id']] + }): + req = self.new_delete_request( + 'port_pairs', pp['port_pair']['id'] + ) + res = req.get_response(self.ext_api) + self.assertEqual(res.status_int, 409) + + def test_delete_ingress_port_pair_exist(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }): + req = self.new_delete_request( + 'ports', src_port['port']['id'] + ) + res = req.get_response(self.api) + self.assertEqual(res.status_int, 500) + + def test_delete_egress_port_pair_exist(self): + with self.port( + name='port1', + device_id='default' + ) as src_port, self.port( + name='port2', + device_id='default' + ) as dst_port: + with self.port_pair(port_pair={ + 'ingress': src_port['port']['id'], + 'egress': dst_port['port']['id'] + }): + req = self.new_delete_request( + 'ports', dst_port['port']['id'] + ) + res = req.get_response(self.api) + self.assertEqual(res.status_int, 500) |