aboutsummaryrefslogtreecommitdiffstats
path: root/networking_sfc/tests/unit/db
diff options
context:
space:
mode:
Diffstat (limited to 'networking_sfc/tests/unit/db')
-rw-r--r--networking_sfc/tests/unit/db/__init__.py0
-rw-r--r--networking_sfc/tests/unit/db/test_flowclassifier_db.py677
-rw-r--r--networking_sfc/tests/unit/db/test_sfc_db.py1490
3 files changed, 2167 insertions, 0 deletions
diff --git a/networking_sfc/tests/unit/db/__init__.py b/networking_sfc/tests/unit/db/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/networking_sfc/tests/unit/db/__init__.py
diff --git a/networking_sfc/tests/unit/db/test_flowclassifier_db.py b/networking_sfc/tests/unit/db/test_flowclassifier_db.py
new file mode 100644
index 0000000..36c9af8
--- /dev/null
+++ b/networking_sfc/tests/unit/db/test_flowclassifier_db.py
@@ -0,0 +1,677 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import mock
+import six
+import webob.exc
+
+from oslo_config import cfg
+from oslo_utils import importutils
+from oslo_utils import uuidutils
+
+from neutron.api import extensions as api_ext
+from neutron.common import config
+from neutron.common import constants as const
+import neutron.extensions as nextensions
+
+from networking_sfc.db import flowclassifier_db as fdb
+from networking_sfc import extensions
+from networking_sfc.extensions import flowclassifier as fc_ext
+from networking_sfc.tests import base
+
+
+DB_FLOWCLASSIFIER_PLUGIN_CLASS = (
+ "networking_sfc.db.flowclassifier_db.FlowClassifierDbPlugin"
+)
+extensions_path = ':'.join(extensions.__path__ + nextensions.__path__)
+
+
+class FlowClassifierDbPluginTestCaseBase(base.BaseTestCase):
+ def _create_flow_classifier(
+ self, fmt, flow_classifier=None, expected_res_status=None, **kwargs
+ ):
+ ctx = kwargs.get('context', None)
+ tenant_id = kwargs.get('tenant_id', self._tenant_id)
+ data = {'flow_classifier': flow_classifier or {}}
+ if ctx is None:
+ data['flow_classifier'].update({'tenant_id': tenant_id})
+ req = self.new_create_request(
+ 'flow_classifiers', data, fmt, context=ctx
+ )
+ res = req.get_response(self.ext_api)
+ if expected_res_status:
+ self.assertEqual(res.status_int, expected_res_status)
+ return res
+
+ @contextlib.contextmanager
+ def flow_classifier(
+ self, fmt=None, flow_classifier=None, do_delete=True, **kwargs
+ ):
+ if not fmt:
+ fmt = self.fmt
+ res = self._create_flow_classifier(fmt, flow_classifier, **kwargs)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ flow_classifier = self.deserialize(fmt or self.fmt, res)
+ yield flow_classifier
+ if do_delete:
+ self._delete('flow_classifiers',
+ flow_classifier['flow_classifier']['id'])
+
+ def _get_expected_flow_classifier(self, flow_classifier):
+ expected_flow_classifier = {
+ 'name': flow_classifier.get('name') or '',
+ 'description': flow_classifier.get('description') or '',
+ 'source_port_range_min': flow_classifier.get(
+ 'source_port_range_min'),
+ 'source_port_range_max': flow_classifier.get(
+ 'source_port_range_max'),
+ 'destination_port_range_min': flow_classifier.get(
+ 'destination_port_range_min'),
+ 'destination_port_range_max': flow_classifier.get(
+ 'destination_port_range_max'),
+ 'ethertype': flow_classifier.get(
+ 'ethertype') or 'IPv4',
+ 'protocol': flow_classifier.get(
+ 'protocol'),
+ 'l7_parameters': flow_classifier.get(
+ 'l7_parameters') or {}
+ }
+ if (
+ 'source_ip_prefix' in flow_classifier and
+ flow_classifier['source_ip_prefix']
+ ):
+ expected_flow_classifier['source_ip_prefix'] = (
+ flow_classifier['source_ip_prefix'])
+ if (
+ 'destination_ip_prefix' in flow_classifier and
+ flow_classifier['destination_ip_prefix']
+ ):
+ expected_flow_classifier['destination_ip_prefix'] = (
+ flow_classifier['destination_ip_prefix'])
+ return expected_flow_classifier
+
+ def _test_create_flow_classifier(
+ self, flow_classifier, expected_flow_classifier=None
+ ):
+ if expected_flow_classifier is None:
+ expected_flow_classifier = self._get_expected_flow_classifier(
+ flow_classifier)
+ with self.flow_classifier(flow_classifier=flow_classifier) as fc:
+ for k, v in six.iteritems(expected_flow_classifier):
+ self.assertIn(k, fc['flow_classifier'])
+ self.assertEqual(fc['flow_classifier'][k], v)
+
+
+class FlowClassifierDbPluginTestCase(
+ base.NeutronDbPluginV2TestCase,
+ FlowClassifierDbPluginTestCaseBase
+):
+ resource_prefix_map = dict(
+ (k, fc_ext.FLOW_CLASSIFIER_PREFIX)
+ for k in fc_ext.RESOURCE_ATTRIBUTE_MAP.keys()
+ )
+
+ def setUp(self, core_plugin=None, flowclassifier_plugin=None,
+ ext_mgr=None):
+ mock_log_p = mock.patch.object(fdb, 'LOG')
+ self.mock_log = mock_log_p.start()
+ cfg.CONF.register_opts(fc_ext.flow_classifier_quota_opts, 'QUOTAS')
+ if not flowclassifier_plugin:
+ flowclassifier_plugin = DB_FLOWCLASSIFIER_PLUGIN_CLASS
+ service_plugins = {
+ fc_ext.FLOW_CLASSIFIER_EXT: flowclassifier_plugin
+ }
+ fdb.FlowClassifierDbPlugin.supported_extension_aliases = [
+ fc_ext.FLOW_CLASSIFIER_EXT]
+ fdb.FlowClassifierDbPlugin.path_prefix = (
+ fc_ext.FLOW_CLASSIFIER_PREFIX
+ )
+ super(FlowClassifierDbPluginTestCase, self).setUp(
+ ext_mgr=ext_mgr,
+ plugin=core_plugin,
+ service_plugins=service_plugins
+ )
+ if not ext_mgr:
+ self.flowclassifier_plugin = importutils.import_object(
+ flowclassifier_plugin)
+ ext_mgr = api_ext.PluginAwareExtensionManager(
+ extensions_path,
+ {fc_ext.FLOW_CLASSIFIER_EXT: self.flowclassifier_plugin}
+ )
+ app = config.load_paste_app('extensions_test_app')
+ self.ext_api = api_ext.ExtensionMiddleware(app, ext_mgr=ext_mgr)
+
+ def test_create_flow_classifier(self):
+ self._test_create_flow_classifier({})
+
+ def test_quota_create_flow_classifier(self):
+ cfg.CONF.set_override('quota_flow_classifier', 3, group='QUOTAS')
+ self._create_flow_classifier(self.fmt, {}, expected_res_status=201)
+ self._create_flow_classifier(self.fmt, {}, expected_res_status=201)
+ self._create_flow_classifier(self.fmt, {}, expected_res_status=201)
+ self._create_flow_classifier(self.fmt, {}, expected_res_status=409)
+
+ def test_create_flow_classifier_with_all_fields(self):
+ self._test_create_flow_classifier({
+ 'name': 'test1',
+ 'ethertype': const.IPv4,
+ 'protocol': const.PROTO_NAME_TCP,
+ 'source_port_range_min': 100,
+ 'source_port_range_max': 200,
+ 'destination_port_range_min': 101,
+ 'destination_port_range_max': 201,
+ 'source_ip_prefix': '10.100.0.0/16',
+ 'destination_ip_prefix': '10.200.0.0/16',
+ 'logical_source_port': None,
+ 'logical_destination_port': None,
+ 'l7_parameters': {}
+ })
+
+ def test_create_flow_classifier_with_all_supported_ethertype(self):
+ self._test_create_flow_classifier({
+ 'ethertype': None
+ })
+ self._test_create_flow_classifier({
+ 'ethertype': 'IPv4'
+ })
+ self._test_create_flow_classifier({
+ 'ethertype': 'IPv6'
+ })
+
+ def test_create_flow_classifier_with_invalid_ethertype(self):
+ self._create_flow_classifier(
+ self.fmt, {
+ 'ethertype': 'unsupported',
+ },
+ expected_res_status=400
+ )
+
+ def test_create_flow_classifier_with_all_supported_protocol(self):
+ self._test_create_flow_classifier({
+ 'protocol': None
+ })
+ self._test_create_flow_classifier({
+ 'protocol': const.PROTO_NAME_TCP
+ })
+ self._test_create_flow_classifier({
+ 'protocol': const.PROTO_NAME_UDP
+ })
+ self._test_create_flow_classifier({
+ 'protocol': const.PROTO_NAME_ICMP
+ })
+
+ def test_create_flow_classifier_with_invalid_protocol(self):
+ self._create_flow_classifier(
+ self.fmt, {
+ 'protocol': 'unsupported',
+ },
+ expected_res_status=400
+ )
+
+ def test_create_flow_classifier_with_all_supported_port_protocol(self):
+ self._test_create_flow_classifier({
+ 'source_port_range_min': None,
+ 'source_port_range_max': None,
+ 'destination_port_range_min': None,
+ 'destination_port_range_max': None
+ })
+ self._test_create_flow_classifier({
+ 'source_port_range_min': 100,
+ 'source_port_range_max': 200,
+ 'destination_port_range_min': 100,
+ 'destination_port_range_max': 200,
+ 'protocol': const.PROTO_NAME_TCP
+ })
+ self._test_create_flow_classifier({
+ 'source_port_range_min': 100,
+ 'source_port_range_max': 100,
+ 'destination_port_range_min': 100,
+ 'destination_port_range_max': 100,
+ 'protocol': const.PROTO_NAME_TCP
+ })
+ self._test_create_flow_classifier({
+ 'source_port_range_min': '100',
+ 'source_port_range_max': '200',
+ 'destination_port_range_min': '100',
+ 'destination_port_range_max': '200',
+ 'protocol': const.PROTO_NAME_UDP
+ }, {
+ 'source_port_range_min': 100,
+ 'source_port_range_max': 200,
+ 'destination_port_range_min': 100,
+ 'destination_port_range_max': 200,
+ 'protocol': const.PROTO_NAME_UDP
+ })
+
+ def test_create_flow_classifier_with_invalid__port_protocol(self):
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_port_range_min': 'abc',
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_port_range_max': 'abc',
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_port_range_min': 100,
+ 'source_port_range_max': 99,
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_port_range_min': 65536,
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_port_range_max': 65536,
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_port_range_min': -1,
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_port_range_max': -1,
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_port_range_min': 'abc',
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_port_range_max': 'abc',
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_port_range_min': 100,
+ 'destination_port_range_max': 99,
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_port_range_min': 65536,
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_port_range_max': 65536,
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_port_range_min': -1,
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_port_range_max': -1,
+ 'protocol': const.PROTO_NAME_TCP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_port_range_min': 100
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_port_range_max': 100
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_port_range_min': 100,
+ 'source_port_range_max': 200
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_port_range_min': 100,
+ 'source_port_range_max': 200,
+ 'protocol': const.PROTO_NAME_ICMP
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_port_range_min': 100
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_port_range_max': 100
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_port_range_min': 100,
+ 'destination_port_range_max': 200
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_port_range_min': 100,
+ 'destination_port_range_max': 200,
+ 'protocol': const.PROTO_NAME_ICMP
+ },
+ expected_res_status=400
+ )
+
+ def test_create_flow_classifier_with_all_supported_ip_prefix(self):
+ self._test_create_flow_classifier({
+ 'source_ip_prefix': None,
+ 'destination_ip_prefix': None
+ })
+ self._test_create_flow_classifier({
+ 'source_ip_prefix': '10.0.0.0/8',
+ 'destination_ip_prefix': '10.0.0.0/8'
+ })
+
+ def test_create_flow_classifier_with_invalid_ip_prefix(self):
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_ip_prefix': '10.0.0.0/34'
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_ip_prefix': '10.0.0.0.0/8'
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_ip_prefix': '256.0.0.0/8'
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'source_ip_prefix': '10.0.0.0'
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_ip_prefix': '10.0.0.0/34'
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_ip_prefix': '10.0.0.0.0/8'
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_ip_prefix': '256.0.0.0/8'
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'destination_ip_prefix': '10.0.0.0'
+ },
+ expected_res_status=400
+ )
+
+ def test_create_flow_classifier_with_all_supported_l7_parameters(self):
+ self._test_create_flow_classifier({
+ 'l7_parameters': None
+ })
+ self._test_create_flow_classifier({
+ 'l7_parameters': {}
+ })
+
+ def test_create_flow_classifier_with_invalid_l7_parameters(self):
+ self._create_flow_classifier(
+ self.fmt, {
+ 'l7_parameters': {'abc': 'def'}
+ },
+ expected_res_status=400
+ )
+
+ def test_create_flow_classifier_with_port_id(self):
+ self._test_create_flow_classifier({
+ 'logical_source_port': None,
+ 'logical_destination_port': None,
+ })
+ with self.port(
+ name='test1'
+ ) as port:
+ self._test_create_flow_classifier({
+ 'logical_source_port': port['port']['id'],
+ 'logical_destination_port': port['port']['id'],
+ })
+
+ def test_create_flow_classifier_with_nouuid_port_id(self):
+ self._create_flow_classifier(
+ self.fmt, {
+ 'logical_source_port': 'abc'
+ },
+ expected_res_status=400
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'logical_destination_port': 'abc'
+ },
+ expected_res_status=400
+ )
+
+ def test_create_flow_classifier_with_unknown_port_id(self):
+ self._create_flow_classifier(
+ self.fmt, {
+ 'logical_source_port': uuidutils.generate_uuid()
+ },
+ expected_res_status=404
+ )
+ self._create_flow_classifier(
+ self.fmt, {
+ 'logical_destination_port': uuidutils.generate_uuid()
+ },
+ expected_res_status=404
+ )
+
+ def test_list_flow_classifiers(self):
+ with self.flow_classifier(flow_classifier={
+ 'name': 'test1'
+ }) as fc1, self.flow_classifier(flow_classifier={
+ 'name': 'test2',
+ }) as fc2:
+ fcs = [fc1, fc2]
+ self._test_list_resources(
+ 'flow_classifier', fcs
+ )
+
+ def test_list_flow_classifiers_with_params(self):
+ with self.flow_classifier(flow_classifier={
+ 'name': 'test1'
+ }) as fc1, self.flow_classifier(flow_classifier={
+ 'name': 'test2',
+ }) as fc2:
+ self._test_list_resources(
+ 'flow_classifier', [fc1],
+ query_params='name=test1'
+ )
+ self._test_list_resources(
+ 'flow_classifier', [fc2],
+ query_params='name=test2'
+ )
+ self._test_list_resources(
+ 'flow_classifier', [],
+ query_params='name=test3'
+ )
+
+ def test_list_flow_classifiers_with_unknown_params(self):
+ with self.flow_classifier(flow_classifier={
+ 'name': 'test1'
+ }) as fc1, self.flow_classifier(flow_classifier={
+ 'name': 'test2',
+ }) as fc2:
+ self._test_list_resources(
+ 'flow_classifier', [fc1, fc2],
+ query_params='hello=test3'
+ )
+
+ def test_show_flow_classifier(self):
+ with self.flow_classifier(flow_classifier={
+ 'name': 'test1'
+ }) as fc:
+ req = self.new_show_request(
+ 'flow_classifiers', fc['flow_classifier']['id']
+ )
+ res = self.deserialize(
+ self.fmt, req.get_response(self.ext_api)
+ )
+ for k, v in six.iteritems(fc['flow_classifier']):
+ self.assertEqual(res['flow_classifier'][k], v)
+
+ def test_show_flow_classifier_noexist(self):
+ req = self.new_show_request(
+ 'flow_classifiers', '1'
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
+
+ def test_update_flow_classifier(self):
+ with self.flow_classifier(flow_classifier={
+ 'name': 'test1',
+ 'description': 'desc1'
+ }) as fc:
+ updates = {
+ 'name': 'test2',
+ 'description': 'desc2',
+ }
+ req = self.new_update_request(
+ 'flow_classifiers', {'flow_classifier': updates},
+ fc['flow_classifier']['id']
+ )
+ res = self.deserialize(
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
+ expected = fc['flow_classifier']
+ expected.update(updates)
+ for k, v in six.iteritems(expected):
+ self.assertEqual(res['flow_classifier'][k], v)
+ req = self.new_show_request(
+ 'flow_classifiers', fc['flow_classifier']['id']
+ )
+ res = self.deserialize(
+ self.fmt, req.get_response(self.ext_api)
+ )
+ for k, v in six.iteritems(expected):
+ self.assertEqual(res['flow_classifier'][k], v)
+
+ def _test_update_with_field(
+ self, fc, updates, expected_status_code
+ ):
+ req = self.new_update_request(
+ 'flow_classifiers', {'flow_classifier': updates},
+ fc['flow_classifier']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, expected_status_code)
+
+ def test_update_flow_classifer_unsupported_fields(self):
+ with self.flow_classifier(flow_classifier={
+ 'name': 'test1',
+ 'description': 'desc1'
+ }) as fc:
+ self._test_update_with_field(
+ fc, {'ethertype': None}, 400)
+ self._test_update_with_field(
+ fc, {'protocol': None}, 400)
+ self._test_update_with_field(
+ fc, {'source_port_range_min': None}, 400)
+ self._test_update_with_field(
+ fc, {'source_port_range_max': None}, 400)
+ self._test_update_with_field(
+ fc, {'destination_port_range_min': None}, 400)
+ self._test_update_with_field(
+ fc, {'destination_port_range_max': None}, 400)
+ self._test_update_with_field(
+ fc, {'source_ip_prefix': None}, 400)
+ self._test_update_with_field(
+ fc, {'destination_ip_prefix': None}, 400)
+ self._test_update_with_field(
+ fc, {'l7_parameters': None}, 400)
+
+ def test_delete_flow_classifier(self):
+ with self.flow_classifier(flow_classifier={
+ 'name': 'test1'
+ }, do_delete=False) as fc:
+ req = self.new_delete_request(
+ 'flow_classifiers', fc['flow_classifier']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+ req = self.new_show_request(
+ 'flow_classifiers', fc['flow_classifier']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
+
+ def test_delete_flow_classifier_noexist(self):
+ req = self.new_delete_request(
+ 'flow_classifiers', '1'
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
diff --git a/networking_sfc/tests/unit/db/test_sfc_db.py b/networking_sfc/tests/unit/db/test_sfc_db.py
new file mode 100644
index 0000000..70d57e3
--- /dev/null
+++ b/networking_sfc/tests/unit/db/test_sfc_db.py
@@ -0,0 +1,1490 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import mock
+import six
+import webob.exc
+
+from oslo_config import cfg
+from oslo_utils import importutils
+from oslo_utils import uuidutils
+
+from neutron.api import extensions as api_ext
+from neutron.common import config
+import neutron.extensions as nextensions
+
+from networking_sfc.db import flowclassifier_db as fdb
+from networking_sfc.db import sfc_db
+from networking_sfc import extensions
+from networking_sfc.extensions import flowclassifier as fc_ext
+from networking_sfc.extensions import sfc
+from networking_sfc.tests import base
+from networking_sfc.tests.unit.db import test_flowclassifier_db
+
+
+DB_SFC_PLUGIN_CLASS = (
+ "networking_sfc.db.sfc_db.SfcDbPlugin"
+)
+extensions_path = ':'.join(extensions.__path__ + nextensions.__path__)
+
+
+class SfcDbPluginTestCaseBase(
+ base.BaseTestCase
+):
+ def _create_port_chain(
+ self, fmt, port_chain=None, expected_res_status=None, **kwargs
+ ):
+ ctx = kwargs.get('context', None)
+ tenant_id = kwargs.get('tenant_id', self._tenant_id)
+ data = {'port_chain': port_chain or {}}
+ if ctx is None:
+ data['port_chain'].update({'tenant_id': tenant_id})
+ req = self.new_create_request(
+ 'port_chains', data, fmt, context=ctx
+ )
+ res = req.get_response(self.ext_api)
+ if expected_res_status:
+ self.assertEqual(res.status_int, expected_res_status)
+ return res
+
+ @contextlib.contextmanager
+ def port_chain(self, fmt=None, port_chain=None, do_delete=True, **kwargs):
+ if not fmt:
+ fmt = self.fmt
+ res = self._create_port_chain(fmt, port_chain, **kwargs)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ port_chain = self.deserialize(fmt or self.fmt, res)
+ yield port_chain
+ if do_delete:
+ self._delete('port_chains', port_chain['port_chain']['id'])
+
+ def _create_port_pair_group(
+ self, fmt, port_pair_group=None, expected_res_status=None, **kwargs
+ ):
+ ctx = kwargs.get('context', None)
+ tenant_id = kwargs.get('tenant_id', self._tenant_id)
+ data = {'port_pair_group': port_pair_group or {}}
+ if ctx is None:
+ data['port_pair_group'].update({'tenant_id': tenant_id})
+ req = self.new_create_request(
+ 'port_pair_groups', data, fmt, context=ctx
+ )
+ res = req.get_response(self.ext_api)
+ if expected_res_status:
+ self.assertEqual(res.status_int, expected_res_status)
+ return res
+
+ @contextlib.contextmanager
+ def port_pair_group(
+ self, fmt=None, port_pair_group=None, do_delete=True, **kwargs
+ ):
+ if not fmt:
+ fmt = self.fmt
+ res = self._create_port_pair_group(fmt, port_pair_group, **kwargs)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ port_pair_group = self.deserialize(fmt or self.fmt, res)
+ yield port_pair_group
+ if do_delete:
+ self._delete(
+ 'port_pair_groups',
+ port_pair_group['port_pair_group']['id'])
+
+ def _create_port_pair(
+ self, fmt, port_pair=None, expected_res_status=None, **kwargs
+ ):
+ ctx = kwargs.get('context', None)
+ tenant_id = kwargs.get('tenant_id', self._tenant_id)
+ data = {'port_pair': port_pair or {}}
+ if ctx is None:
+ data['port_pair'].update({'tenant_id': tenant_id})
+ req = self.new_create_request(
+ 'port_pairs', data, fmt, context=ctx
+ )
+ res = req.get_response(self.ext_api)
+ if expected_res_status:
+ self.assertEqual(res.status_int, expected_res_status)
+ return res
+
+ @contextlib.contextmanager
+ def port_pair(self, fmt=None, port_pair=None, do_delete=True, **kwargs):
+ if not fmt:
+ fmt = self.fmt
+ res = self._create_port_pair(fmt, port_pair, **kwargs)
+ if res.status_int >= 400:
+ raise webob.exc.HTTPClientError(code=res.status_int)
+ port_pair = self.deserialize(fmt or self.fmt, res)
+ yield port_pair
+ if do_delete:
+ self._delete('port_pairs', port_pair['port_pair']['id'])
+
+ def _get_expected_port_pair(self, port_pair):
+ return {
+ 'name': port_pair.get('name') or '',
+ 'description': port_pair.get('description') or '',
+ 'egress': port_pair.get('egress'),
+ 'ingress': port_pair.get('ingress'),
+ 'service_function_parameters': port_pair.get(
+ 'service_function_parameters') or {'correlation': None}
+ }
+
+ def _test_create_port_pair(self, port_pair, expected_port_pair=None):
+ if expected_port_pair is None:
+ expected_port_pair = self._get_expected_port_pair(port_pair)
+ with self.port_pair(port_pair=port_pair) as pp:
+ for k, v in six.iteritems(expected_port_pair):
+ self.assertEqual(pp['port_pair'][k], v)
+
+ def _test_create_port_pairs(
+ self, port_pairs, expected_port_pairs=None
+ ):
+ if port_pairs:
+ port_pair = port_pairs.pop()
+ if expected_port_pairs:
+ expected_port_pair = expected_port_pairs.pop()
+ else:
+ expected_port_pair = self._get_expected_port_pair(port_pair)
+ with self.port_pair(port_pair=port_pair) as pp:
+ for k, v in six.iteritems(expected_port_pair):
+ self.assertEqual(pp['port_pair'][k], v)
+
+ def _get_expected_port_pair_group(self, port_pair_group):
+ return {
+ 'name': port_pair_group.get('name') or '',
+ 'description': port_pair_group.get('description') or '',
+ 'port_pairs': port_pair_group.get('port_pairs') or []
+ }
+
+ def _test_create_port_pair_group(
+ self, port_pair_group, expected_port_pair_group=None
+ ):
+ if expected_port_pair_group is None:
+ expected_port_pair_group = self._get_expected_port_pair_group(
+ port_pair_group)
+ with self.port_pair_group(port_pair_group=port_pair_group) as pg:
+ for k, v in six.iteritems(expected_port_pair_group):
+ self.assertEqual(pg['port_pair_group'][k], v)
+
+ def _test_create_port_pair_groups(
+ self, port_pair_groups, expected_port_pair_groups=None
+ ):
+ if port_pair_groups:
+ port_pair_group = port_pair_groups.pop()
+ if expected_port_pair_groups:
+ expected_port_pair_group = expected_port_pair_groups.pop()
+ else:
+ expected_port_pair_group = self._get_expected_port_pair_group(
+ port_pair_group)
+ with self.port_pair_group(port_pair_group=port_pair_group) as pg:
+ for k, v in six.iteritems(expected_port_pair_group):
+ self.assertEqual(pg['port_pair_group'][k], v)
+
+ def _get_expected_port_chain(self, port_chain):
+ return {
+ 'name': port_chain.get('name') or '',
+ 'description': port_chain.get('description') or '',
+ 'port_pair_groups': port_chain['port_pair_groups'],
+ 'flow_classifiers': port_chain.get('flow_classifiers') or [],
+ 'chain_parameters': port_chain.get(
+ 'chain_parameters') or {'correlation': 'mpls'}
+ }
+
+ def _test_create_port_chain(self, port_chain, expected_port_chain=None):
+ if expected_port_chain is None:
+ expected_port_chain = self._get_expected_port_chain(port_chain)
+ with self.port_chain(port_chain=port_chain) as pc:
+ for k, v in six.iteritems(expected_port_chain):
+ self.assertEqual(pc['port_chain'][k], v)
+
+ def _test_create_port_chains(
+ self, port_chains, expected_port_chains=None
+ ):
+ if port_chains:
+ port_chain = port_chains.pop()
+ if expected_port_chains:
+ expected_port_chain = expected_port_chains.pop()
+ else:
+ expected_port_chain = self._get_expected_port_chain(
+ port_chain)
+ with self.port_chain(port_chain=port_chain) as pc:
+ for k, v in six.iteritems(expected_port_chain):
+ self.assertEqual(pc['port_chain'][k], v)
+
+
+class SfcDbPluginTestCase(
+ base.NeutronDbPluginV2TestCase,
+ test_flowclassifier_db.FlowClassifierDbPluginTestCaseBase,
+ SfcDbPluginTestCaseBase
+):
+ resource_prefix_map = dict([
+ (k, sfc.SFC_PREFIX)
+ for k in sfc.RESOURCE_ATTRIBUTE_MAP.keys()
+ ] + [
+ (k, fc_ext.FLOW_CLASSIFIER_PREFIX)
+ for k in fc_ext.RESOURCE_ATTRIBUTE_MAP.keys()
+ ])
+
+ def setUp(self, core_plugin=None, sfc_plugin=None,
+ flowclassifier_plugin=None, ext_mgr=None):
+ mock_log_p = mock.patch.object(sfc_db, 'LOG')
+ self.mock_log = mock_log_p.start()
+ cfg.CONF.register_opts(sfc.sfc_quota_opts, 'QUOTAS')
+ if not sfc_plugin:
+ sfc_plugin = DB_SFC_PLUGIN_CLASS
+ if not flowclassifier_plugin:
+ flowclassifier_plugin = (
+ test_flowclassifier_db.DB_FLOWCLASSIFIER_PLUGIN_CLASS)
+
+ service_plugins = {
+ sfc.SFC_EXT: sfc_plugin,
+ fc_ext.FLOW_CLASSIFIER_EXT: flowclassifier_plugin
+ }
+ sfc_db.SfcDbPlugin.supported_extension_aliases = [
+ "sfc"]
+ sfc_db.SfcDbPlugin.path_prefix = sfc.SFC_PREFIX
+ fdb.FlowClassifierDbPlugin.supported_extension_aliases = [
+ "flow_classifier"]
+ fdb.FlowClassifierDbPlugin.path_prefix = (
+ fc_ext.FLOW_CLASSIFIER_PREFIX
+ )
+ super(SfcDbPluginTestCase, self).setUp(
+ ext_mgr=ext_mgr,
+ plugin=core_plugin,
+ service_plugins=service_plugins
+ )
+ if not ext_mgr:
+ self.sfc_plugin = importutils.import_object(sfc_plugin)
+ self.flowclassifier_plugin = importutils.import_object(
+ flowclassifier_plugin)
+ ext_mgr = api_ext.PluginAwareExtensionManager(
+ extensions_path,
+ {
+ sfc.SFC_EXT: self.sfc_plugin,
+ fc_ext.FLOW_CLASSIFIER_EXT: self.flowclassifier_plugin
+ }
+ )
+ app = config.load_paste_app('extensions_test_app')
+ self.ext_api = api_ext.ExtensionMiddleware(app, ext_mgr=ext_mgr)
+
+ def test_create_port_chain(self):
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._test_create_port_chain({
+ 'port_pair_groups': [pg['port_pair_group']['id']]})
+
+ def test_quota_create_port_chain(self):
+ cfg.CONF.set_override('quota_port_chain', 3, group='QUOTAS')
+ with self.port_pair_group(
+ port_pair_group={}, do_delete=False
+ ) as pg1, self.port_pair_group(
+ port_pair_group={}, do_delete=False
+ ) as pg2, self.port_pair_group(
+ port_pair_group={}, do_delete=False
+ ) as pg3, self.port_pair_group(
+ port_pair_group={}, do_delete=False
+ ) as pg4:
+ self._create_port_chain(
+ self.fmt, {
+ 'port_pair_groups': [pg1['port_pair_group']['id']]
+ }, expected_res_status=201)
+ self._create_port_chain(
+ self.fmt, {
+ 'port_pair_groups': [pg2['port_pair_group']['id']]
+ }, expected_res_status=201)
+ self._create_port_chain(
+ self.fmt, {
+ 'port_pair_groups': [pg3['port_pair_group']['id']]
+ }, expected_res_status=201)
+ self._create_port_chain(
+ self.fmt, {
+ 'port_pair_groups': [pg4['port_pair_group']['id']]
+ }, expected_res_status=409)
+
+ def test_create_port_chain_all_fields(self):
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._test_create_port_chain({
+ 'port_pair_groups': [pg['port_pair_group']['id']],
+ 'flow_classifiers': [],
+ 'name': 'abc',
+ 'description': 'def',
+ 'chain_parameters': {'correlation': 'mpls'}
+ })
+
+ def test_create_port_chain_multi_port_pair_groups(self):
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg1, self.port_pair_group(
+ port_pair_group={}
+ ) as pg2:
+ self._test_create_port_chain({
+ 'port_pair_groups': [
+ pg1['port_pair_group']['id'],
+ pg2['port_pair_group']['id']
+ ]
+ })
+
+ def test_create_port_chain_shared_port_pair_groups(self):
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg1, self.port_pair_group(
+ port_pair_group={}
+ ) as pg2, self.port_pair_group(
+ port_pair_group={}
+ ) as pg3:
+ self._test_create_port_chains([{
+ 'port_pair_groups': [
+ pg1['port_pair_group']['id'],
+ pg2['port_pair_group']['id']
+ ]
+ }, {
+ 'port_pair_groups': [
+ pg1['port_pair_group']['id'],
+ pg3['port_pair_group']['id']
+ ]
+ }])
+
+ def test_create_port_chain_shared_port_pair_groups_different_order(self):
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg1, self.port_pair_group(
+ port_pair_group={}
+ ) as pg2:
+ self._test_create_port_chains([{
+ 'port_pair_groups': [
+ pg1['port_pair_group']['id'],
+ pg2['port_pair_group']['id']
+ ]
+ }, {
+ 'port_pair_groups': [
+ pg2['port_pair_group']['id'],
+ pg1['port_pair_group']['id']
+ ]
+ }])
+
+ def test_create_port_chain_with_empty_chain_parameters(self):
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._test_create_port_chain({
+ 'chain_parameters': {},
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ })
+
+ def test_create_port_chain_with_none_chain_parameters(self):
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._test_create_port_chain({
+ 'chain_parameters': None,
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ })
+
+ def test_create_port_chain_with_default_chain_parameters(self):
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._test_create_port_chain({
+ 'chain_parameters': {'correlation': 'mpls'},
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ })
+
+ def test_create_port_chain_with_none_flow_classifiers(self):
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._test_create_port_chain({
+ 'flow_classifiers': None,
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ })
+
+ def test_create_port_chain_with_empty_flow_classifiers(self):
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._test_create_port_chain({
+ 'flow_classifiers': [],
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ })
+
+ def test_create_port_chain_with_flow_classifiers(self):
+ with self.flow_classifier(flow_classifier={}) as fc:
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._test_create_port_chain({
+ 'flow_classifiers': [fc['flow_classifier']['id']],
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ })
+
+ def test_create_port_chain_with_multi_flow_classifiers(self):
+ with self.flow_classifier(
+ flow_classifier={}
+ ) as fc1, self.flow_classifier(
+ flow_classifier={}
+ ) as fc2:
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._test_create_port_chain({
+ 'flow_classifiers': [
+ fc1['flow_classifier']['id'],
+ fc2['flow_classifier']['id']
+ ],
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ })
+
+ def test_create_port_chain_with_port_pairs(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pp1, self.port_pair(port_pair={
+ 'ingress': dst_port['port']['id'],
+ 'egress': src_port['port']['id']
+ }) as pp2:
+ with self.port_pair_group(port_pair_group={
+ 'port_pairs': [
+ pp1['port_pair']['id']
+ ]
+ }) as pg1, self.port_pair_group(port_pair_group={
+ 'port_pairs': [
+ pp2['port_pair']['id']
+ ]
+ }) as pg2:
+ self._test_create_port_chain({
+ 'port_pair_groups': [
+ pg1['port_pair_group']['id'],
+ pg2['port_pair_group']['id']
+ ]
+ })
+
+ def test_create_port_chain_with_empty_port_pair_groups(self):
+ self._create_port_chain(
+ self.fmt, {'port_pair_groups': []},
+ expected_res_status=400
+ )
+
+ def test_create_port_chain_with_nonuuid_port_pair_group_id(self):
+ self._create_port_chain(
+ self.fmt, {'port_pair_groups': ['unknown']},
+ expected_res_status=400
+ )
+
+ def test_create_port_chain_with_unknown_port_pair_group_id(self):
+ self._create_port_chain(
+ self.fmt, {'port_pair_groups': [uuidutils.generate_uuid()]},
+ expected_res_status=404
+ )
+
+ def test_create_port_chain_with_same_port_pair_groups(self):
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg:
+ with self.port_chain(
+ port_chain={
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ }
+ ):
+ self._create_port_chain(
+ self.fmt, {
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ }, expected_res_status=409
+ )
+
+ def test_create_port_chain_with_no_port_pair_groups(self):
+ self._create_port_chain(
+ self.fmt, {}, expected_res_status=400
+ )
+
+ def test_create_port_chain_with_invalid_chain_parameters(self):
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._create_port_chain(
+ self.fmt, {
+ 'chain_parameters': {'correlation': 'unknown'},
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ }, expected_res_status=400
+ )
+
+ def test_create_port_chain_unknown_flow_classifiers(self):
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._create_port_chain(
+ self.fmt, {
+ 'flow_classifiers': [uuidutils.generate_uuid()],
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ }, expected_res_status=404
+ )
+
+ def test_create_port_chain_nouuid_flow_classifiers(self):
+ with self.port_pair_group(port_pair_group={}) as pg:
+ self._create_port_chain(
+ self.fmt, {
+ 'flow_classifiers': ['unknown'],
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ }, expected_res_status=400
+ )
+
+ def test_list_port_chains(self):
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg1, self.port_pair_group(
+ port_pair_group={}
+ ) as pg2:
+ with self.port_chain(port_chain={
+ 'port_pair_groups': [pg1['port_pair_group']['id']]
+ }) as pc1, self.port_chain(port_chain={
+ 'port_pair_groups': [pg2['port_pair_group']['id']]
+ }) as pc2:
+ port_chains = [pc1, pc2]
+ self._test_list_resources(
+ 'port_chain', port_chains
+ )
+
+ def test_list_port_chains_with_params(self):
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg1, self.port_pair_group(
+ port_pair_group={}
+ ) as pg2:
+ with self.port_chain(port_chain={
+ 'name': 'test1',
+ 'port_pair_groups': [pg1['port_pair_group']['id']]
+ }) as pc1, self.port_chain(port_chain={
+ 'name': 'test2',
+ 'port_pair_groups': [pg2['port_pair_group']['id']]
+ }) as pc2:
+ self._test_list_resources(
+ 'port_chain', [pc1],
+ query_params='name=test1'
+ )
+ self._test_list_resources(
+ 'port_chain', [pc2],
+ query_params='name=test2'
+ )
+ self._test_list_resources(
+ 'port_chain', [],
+ query_params='name=test3'
+ )
+
+ def test_list_port_chains_with_unknown_params(self):
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg1, self.port_pair_group(
+ port_pair_group={}
+ ) as pg2:
+ with self.port_chain(port_chain={
+ 'name': 'test1',
+ 'port_pair_groups': [pg1['port_pair_group']['id']]
+ }) as pc1, self.port_chain(port_chain={
+ 'name': 'test2',
+ 'port_pair_groups': [pg2['port_pair_group']['id']]
+ }) as pc2:
+ self._test_list_resources(
+ 'port_chain', [pc1, pc2],
+ query_params='hello=test3'
+ )
+
+ def test_show_port_chain(self):
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg:
+ with self.port_chain(port_chain={
+ 'name': 'test1',
+ 'description': 'portchain',
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ }) as pc:
+ req = self.new_show_request(
+ 'port_chains', pc['port_chain']['id']
+ )
+ res = self.deserialize(
+ self.fmt, req.get_response(self.ext_api)
+ )
+ expected = self._get_expected_port_chain(pc['port_chain'])
+ for k, v in six.iteritems(expected):
+ self.assertEqual(res['port_chain'][k], v)
+
+ def test_show_port_chain_noexist(self):
+ req = self.new_show_request(
+ 'port_chains', '1'
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
+
+ def test_update_port_chain(self):
+ with self.flow_classifier(
+ flow_classifier={}
+ ) as fc1, self.flow_classifier(
+ flow_classifier={}
+ ) as fc2:
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg:
+ with self.port_chain(port_chain={
+ 'name': 'test1',
+ 'description': 'desc1',
+ 'port_pair_groups': [pg['port_pair_group']['id']],
+ 'flow_classifiers': [fc1['flow_classifier']['id']]
+ }) as pc:
+ updates = {
+ 'name': 'test2',
+ 'description': 'desc2',
+ 'flow_classifiers': [fc2['flow_classifier']['id']]
+ }
+ req = self.new_update_request(
+ 'port_chains', {'port_chain': updates},
+ pc['port_chain']['id']
+ )
+ res = self.deserialize(
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
+ expected = pc['port_chain']
+ expected.update(updates)
+ for k, v in six.iteritems(expected):
+ self.assertEqual(res['port_chain'][k], v)
+ req = self.new_show_request(
+ 'port_chains', pc['port_chain']['id']
+ )
+ res = self.deserialize(
+ self.fmt, req.get_response(self.ext_api)
+ )
+ for k, v in six.iteritems(expected):
+ self.assertEqual(res['port_chain'][k], v)
+
+ def test_update_port_chain_port_pair_groups(self):
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg1, self.port_pair_group(
+ port_pair_group={}
+ ) as pg2:
+ with self.port_chain(port_chain={
+ 'port_pair_groups': [pg1['port_pair_group']['id']],
+ }) as pc:
+ updates = {
+ 'port_pair_groups': [pg2['port_pair_group']['id']]
+ }
+ req = self.new_update_request(
+ 'port_chains', {'port_chain': updates},
+ pc['port_chain']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 400)
+
+ def test_update_port_chain_chain_parameters(self):
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg:
+ with self.port_chain(port_chain={
+ 'port_pair_groups': [pg['port_pair_group']['id']],
+ }) as pc:
+ updates = {
+ 'chain_parameters': {'correlation': 'mpls'}
+ }
+ req = self.new_update_request(
+ 'port_chains', {'port_chain': updates},
+ pc['port_chain']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 400)
+
+ def test_delete_port_chain(self):
+ with self.port_pair_group(
+ port_pair_group={}
+ ) as pg:
+ with self.port_chain(port_chain={
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ }, do_delete=False) as pc:
+ req = self.new_delete_request(
+ 'port_chains', pc['port_chain']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+ req = self.new_show_request(
+ 'port_chains', pc['port_chain']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
+ req = self.new_show_request(
+ 'port_pair_groups', pg['port_pair_group']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 200)
+
+ def test_delete_port_chain_noexist(self):
+ req = self.new_delete_request(
+ 'port_chains', '1'
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
+
+ def test_delete_flow_classifier_port_chain_exist(self):
+ with self.flow_classifier(flow_classifier={
+ }) as fc:
+ with self.port_pair_group(port_pair_group={
+ }) as pg:
+ with self.port_chain(port_chain={
+ 'port_pair_groups': [pg['port_pair_group']['id']],
+ 'flow_classifiers': [fc['flow_classifier']['id']]
+ }):
+ req = self.new_delete_request(
+ 'flow_classifiers', fc['flow_classifier']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 409)
+
+ def test_create_port_pair_group(self):
+ self._test_create_port_pair_group({})
+
+ def test_quota_create_port_pair_group_quota(self):
+ cfg.CONF.set_override('quota_port_pair_group', 3, group='QUOTAS')
+ self._create_port_pair_group(
+ self.fmt, {'port_pairs': []}, expected_res_status=201
+ )
+ self._create_port_pair_group(
+ self.fmt, {'port_pairs': []}, expected_res_status=201
+ )
+ self._create_port_pair_group(
+ self.fmt, {'port_pairs': []}, expected_res_status=201
+ )
+ self._create_port_pair_group(
+ self.fmt, {'port_pairs': []}, expected_res_status=409
+ )
+
+ def test_create_port_pair_group_all_fields(self):
+ self._test_create_port_pair_group({
+ 'name': 'test1',
+ 'description': 'desc1',
+ 'port_pairs': []
+ })
+
+ def test_create_port_pair_group_with_port_pairs(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pp1, self.port_pair(port_pair={
+ 'ingress': dst_port['port']['id'],
+ 'egress': src_port['port']['id']
+ }) as pp2:
+ self._test_create_port_pair_group({
+ 'port_pairs': [
+ pp1['port_pair']['id'],
+ pp2['port_pair']['id']
+ ]
+ })
+
+ def test_create_port_pair_group_with_nouuid_port_pair_id(self):
+ self._create_port_pair_group(
+ self.fmt, {'port_pairs': ['unknown']},
+ expected_res_status=400
+ )
+
+ def test_create_port_pair_group_with_unknown_port_pair_id(self):
+ self._create_port_pair_group(
+ self.fmt, {'port_pairs': [uuidutils.generate_uuid()]},
+ expected_res_status=404
+ )
+
+ def test_create_port_pair_group_share_port_pair_id(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pp:
+ with self.port_pair_group(port_pair_group={
+ 'port_pairs': [pp['port_pair']['id']]
+ }):
+ self._create_port_pair_group(
+ self.fmt, {'port_pairs': [pp['port_pair']['id']]},
+ expected_res_status=409
+ )
+
+ def test_list_port_pair_groups(self):
+ with self.port_pair_group(port_pair_group={
+ 'name': 'test1'
+ }) as pc1, self.port_pair_group(port_pair_group={
+ 'name': 'test2'
+ }) as pc2:
+ port_pair_groups = [pc1, pc2]
+ self._test_list_resources(
+ 'port_pair_group', port_pair_groups
+ )
+
+ def test_list_port_pair_groups_with_params(self):
+ with self.port_pair_group(port_pair_group={
+ 'name': 'test1'
+ }) as pc1, self.port_pair_group(port_pair_group={
+ 'name': 'test2'
+ }) as pc2:
+ self._test_list_resources(
+ 'port_pair_group', [pc1],
+ query_params='name=test1'
+ )
+ self._test_list_resources(
+ 'port_pair_group', [pc2],
+ query_params='name=test2'
+ )
+ self._test_list_resources(
+ 'port_pair_group', [],
+ query_params='name=test3'
+ )
+
+ def test_list_port_pair_groups_with_unknown_params(self):
+ with self.port_pair_group(port_pair_group={
+ 'name': 'test1'
+ }) as pc1, self.port_pair_group(port_pair_group={
+ 'name': 'test2'
+ }) as pc2:
+ self._test_list_resources(
+ 'port_pair_group', [pc1, pc2],
+ query_params='hello=test3'
+ )
+
+ def test_show_port_pair_group(self):
+ with self.port_pair_group(port_pair_group={
+ 'name': 'test1'
+ }) as pc:
+ req = self.new_show_request(
+ 'port_pair_groups', pc['port_pair_group']['id']
+ )
+ res = self.deserialize(
+ self.fmt, req.get_response(self.ext_api)
+ )
+ for k, v in six.iteritems(pc['port_pair_group']):
+ self.assertEqual(res['port_pair_group'][k], v)
+
+ def test_show_port_pair_group_noexist(self):
+ req = self.new_show_request(
+ 'port_pair_groups', '1'
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
+
+ def test_update_port_pair_group(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pp1, self.port_pair(port_pair={
+ 'ingress': dst_port['port']['id'],
+ 'egress': src_port['port']['id']
+ }) as pp2:
+ with self.port_pair_group(port_pair_group={
+ 'name': 'test1',
+ 'description': 'desc1',
+ 'port_pairs': [pp1['port_pair']['id']]
+ }) as pg:
+ updates = {
+ 'name': 'test2',
+ 'description': 'desc2',
+ 'port_pairs': [pp2['port_pair']['id']]
+ }
+ req = self.new_update_request(
+ 'port_pair_groups', {'port_pair_group': updates},
+ pg['port_pair_group']['id']
+ )
+ res = self.deserialize(
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
+ expected = pg['port_pair_group']
+ expected.update(updates)
+ for k, v in six.iteritems(expected):
+ self.assertEqual(res['port_pair_group'][k], v)
+ req = self.new_show_request(
+ 'port_pair_groups', pg['port_pair_group']['id']
+ )
+ res = self.deserialize(
+ self.fmt, req.get_response(self.ext_api)
+ )
+ for k, v in six.iteritems(expected):
+ self.assertEqual(res['port_pair_group'][k], v)
+
+ def test_delete_port_pair_group(self):
+ with self.port_pair_group(port_pair_group={
+ 'name': 'test1'
+ }, do_delete=False) as pc:
+ req = self.new_delete_request(
+ 'port_pair_groups', pc['port_pair_group']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+ req = self.new_show_request(
+ 'port_pair_groups', pc['port_pair_group']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
+
+ def test_delete_port_pair_group_port_chain_exist(self):
+ with self.port_pair_group(port_pair_group={
+ 'name': 'test1'
+ }) as pg:
+ with self.port_chain(port_chain={
+ 'port_pair_groups': [pg['port_pair_group']['id']]
+ }):
+ req = self.new_delete_request(
+ 'port_pair_groups', pg['port_pair_group']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 409)
+
+ def test_delete_port_pair_group_noexist(self):
+ req = self.new_delete_request(
+ 'port_pair_groups', '1'
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
+
+ def test_create_port_pair(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ self._test_create_port_pair({
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ })
+
+ def test_quota_create_port_pair_quota(self):
+ cfg.CONF.set_override('quota_port_pair', 3, group='QUOTAS')
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port1, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port1, self.port(
+ name='port3',
+ device_id='default'
+ ) as src_port2, self.port(
+ name='port4',
+ device_id='default'
+ ) as dst_port2, self.port(
+ name='port5',
+ device_id='default'
+ ) as src_port3, self.port(
+ name='port6',
+ device_id='default'
+ ) as dst_port3, self.port(
+ name='port7',
+ device_id='default'
+ ) as src_port4, self.port(
+ name='port8',
+ device_id='default'
+ ) as dst_port4:
+ self._create_port_pair(
+ self.fmt, {
+ 'ingress': src_port1['port']['id'],
+ 'egress': dst_port1['port']['id']
+ }, expected_res_status=201)
+ self._create_port_pair(
+ self.fmt, {
+ 'ingress': src_port2['port']['id'],
+ 'egress': dst_port2['port']['id']
+ }, expected_res_status=201)
+ self._create_port_pair(
+ self.fmt, {
+ 'ingress': src_port3['port']['id'],
+ 'egress': dst_port3['port']['id']
+ }, expected_res_status=201)
+ self._create_port_pair(
+ self.fmt, {
+ 'ingress': src_port4['port']['id'],
+ 'egress': dst_port4['port']['id']
+ }, expected_res_status=409)
+
+ def test_create_port_pair_all_fields(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ self._test_create_port_pair({
+ 'name': 'test1',
+ 'description': 'desc1',
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id'],
+ 'service_function_parameters': {'correlation': None}
+ })
+
+ def test_create_port_pair_none_service_function_parameters(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ self._test_create_port_pair({
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id'],
+ 'service_function_parameters': None
+ })
+
+ def test_create_port_pair_empty_service_function_parameters(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ self._test_create_port_pair({
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id'],
+ 'service_function_parameters': {}
+ })
+
+ def test_create_port_pair_with_src_dst_same_port(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_dst_port:
+ self._test_create_port_pair({
+ 'ingress': src_dst_port['port']['id'],
+ 'egress': src_dst_port['port']['id']
+ })
+
+ def test_create_port_pair_empty_input(self):
+ self._create_port_pair(self.fmt, {}, expected_res_status=400)
+
+ def test_create_port_pair_with_no_ingress(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as dst_port:
+ self._create_port_pair(
+ self.fmt,
+ {
+ 'egress': dst_port['port']['id']
+ },
+ expected_res_status=400
+ )
+
+ def test_create_port_pair_with_no_egress(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port:
+ self._create_port_pair(
+ self.fmt,
+ {
+ 'ingress': src_port['port']['id']
+ },
+ expected_res_status=400
+ )
+
+ def test_create_port_pair_with_nouuid_ingress(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as dst_port:
+ self._create_port_pair(
+ self.fmt,
+ {
+ 'ingress': '1',
+ 'egress': dst_port['port']['id']
+ },
+ expected_res_status=400
+ )
+
+ def test_create_port_pair_with_unknown_ingress(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as dst_port:
+ self._create_port_pair(
+ self.fmt,
+ {
+ 'ingress': uuidutils.generate_uuid(),
+ 'egress': dst_port['port']['id']
+ },
+ expected_res_status=404
+ )
+
+ def test_create_port_pair_with_nouuid_egress(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port:
+ self._create_port_pair(
+ self.fmt,
+ {
+ 'ingress': src_port['port']['id'],
+ 'egress': '1'
+ },
+ expected_res_status=400
+ )
+
+ def test_create_port_pair_with_unkown_egress(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port:
+ self._create_port_pair(
+ self.fmt,
+ {
+ 'ingress': src_port['port']['id'],
+ 'egress': uuidutils.generate_uuid()
+ },
+ expected_res_status=404
+ )
+
+ def test_create_port_pair_ingress_egress_different_hosts(self):
+ with self.port(
+ name='port1',
+ device_id='device1'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='device2'
+ ) as dst_port:
+ self._create_port_pair(
+ self.fmt,
+ {
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ },
+ expected_res_status=400
+ )
+
+ def test_create_port_pair_with_invalid_service_function_parameters(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_dst_port:
+ self._create_port_pair(
+ self.fmt,
+ {
+ 'ingress': src_dst_port['port']['id'],
+ 'egress': src_dst_port['port']['id'],
+ 'service_function_parameters': {'abc': 'def'}
+ },
+ expected_res_status=400
+ )
+
+ def test_list_port_pairs(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pc1, self.port_pair(port_pair={
+ 'ingress': dst_port['port']['id'],
+ 'egress': src_port['port']['id']
+ }) as pc2:
+ port_pairs = [pc1, pc2]
+ self._test_list_resources(
+ 'port_pair', port_pairs
+ )
+
+ def test_list_port_pairs_with_params(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'name': 'test1',
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pc1, self.port_pair(port_pair={
+ 'name': 'test2',
+ 'ingress': dst_port['port']['id'],
+ 'egress': src_port['port']['id']
+ }) as pc2:
+ self._test_list_resources(
+ 'port_pair', [pc1],
+ query_params='name=test1'
+ )
+ self._test_list_resources(
+ 'port_pair', [pc2],
+ query_params='name=test2'
+ )
+ self._test_list_resources(
+ 'port_pair', [],
+ query_params='name=test3'
+ )
+
+ def test_list_port_pairs_with_unknown_params(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'name': 'test1',
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pc1, self.port_pair(port_pair={
+ 'name': 'test2',
+ 'ingress': dst_port['port']['id'],
+ 'egress': src_port['port']['id']
+ }) as pc2:
+ port_pairs = [pc1, pc2]
+ self._test_list_resources(
+ 'port_pair', port_pairs,
+ query_params='hello=test3'
+ )
+
+ def test_show_port_pair(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pc:
+ req = self.new_show_request(
+ 'port_pairs', pc['port_pair']['id']
+ )
+ res = self.deserialize(
+ self.fmt, req.get_response(self.ext_api)
+ )
+ for k, v in six.iteritems(pc['port_pair']):
+ self.assertEqual(res['port_pair'][k], v)
+
+ def test_show_port_pair_noexist(self):
+ req = self.new_show_request(
+ 'port_pairs', '1'
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
+
+ def test_update_port_pair(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'name': 'test1',
+ 'description': 'desc1',
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pc:
+ updates = {
+ 'name': 'test2',
+ 'description': 'desc2'
+ }
+ req = self.new_update_request(
+ 'port_pairs', {'port_pair': updates},
+ pc['port_pair']['id']
+ )
+ res = self.deserialize(
+ self.fmt,
+ req.get_response(self.ext_api)
+ )
+ expected = pc['port_pair']
+ expected.update(updates)
+ for k, v in six.iteritems(expected):
+ self.assertEqual(res['port_pair'][k], v)
+ req = self.new_show_request(
+ 'port_pairs', pc['port_pair']['id']
+ )
+ res = self.deserialize(
+ self.fmt, req.get_response(self.ext_api)
+ )
+ for k, v in six.iteritems(expected):
+ self.assertEqual(res['port_pair'][k], v)
+
+ def test_update_port_pair_service_function_parameters(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'name': 'test1',
+ 'description': 'desc1',
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pc:
+ updates = {
+ 'service_function_parameters': {
+ 'correlation': None
+ }
+ }
+ req = self.new_update_request(
+ 'port_pairs', {'port_pair': updates},
+ pc['port_pair']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 400)
+
+ def test_update_port_pair_ingress(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'name': 'test1',
+ 'description': 'desc1',
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pc:
+ updates = {
+ 'ingress': dst_port['port']['id']
+ }
+ req = self.new_update_request(
+ 'port_pairs', {'port_pair': updates},
+ pc['port_pair']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 400)
+
+ def test_update_port_pair_egress(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'name': 'test1',
+ 'description': 'desc1',
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pc:
+ updates = {
+ 'egress': src_port['port']['id']
+ }
+ req = self.new_update_request(
+ 'port_pairs', {'port_pair': updates},
+ pc['port_pair']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 400)
+
+ def test_delete_port_pair(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }, do_delete=False) as pc:
+ req = self.new_delete_request(
+ 'port_pairs', pc['port_pair']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 204)
+ req = self.new_show_request(
+ 'port_pairs', pc['port_pair']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
+
+ def test_delete_port_pair_noexist(self):
+ req = self.new_delete_request(
+ 'port_pairs', '1'
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 404)
+
+ def test_delete_port_pair_port_pair_group_exist(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }) as pp:
+ with self.port_pair_group(port_pair_group={
+ 'port_pairs': [pp['port_pair']['id']]
+ }):
+ req = self.new_delete_request(
+ 'port_pairs', pp['port_pair']['id']
+ )
+ res = req.get_response(self.ext_api)
+ self.assertEqual(res.status_int, 409)
+
+ def test_delete_ingress_port_pair_exist(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }):
+ req = self.new_delete_request(
+ 'ports', src_port['port']['id']
+ )
+ res = req.get_response(self.api)
+ self.assertEqual(res.status_int, 500)
+
+ def test_delete_egress_port_pair_exist(self):
+ with self.port(
+ name='port1',
+ device_id='default'
+ ) as src_port, self.port(
+ name='port2',
+ device_id='default'
+ ) as dst_port:
+ with self.port_pair(port_pair={
+ 'ingress': src_port['port']['id'],
+ 'egress': dst_port['port']['id']
+ }):
+ req = self.new_delete_request(
+ 'ports', dst_port['port']['id']
+ )
+ res = req.get_response(self.api)
+ self.assertEqual(res.status_int, 500)