summaryrefslogtreecommitdiffstats
path: root/networking-odl/networking_odl/tests/unit/ml2/test_mechanism_odl_v2.py
diff options
context:
space:
mode:
Diffstat (limited to 'networking-odl/networking_odl/tests/unit/ml2/test_mechanism_odl_v2.py')
-rw-r--r--networking-odl/networking_odl/tests/unit/ml2/test_mechanism_odl_v2.py577
1 files changed, 577 insertions, 0 deletions
diff --git a/networking-odl/networking_odl/tests/unit/ml2/test_mechanism_odl_v2.py b/networking-odl/networking_odl/tests/unit/ml2/test_mechanism_odl_v2.py
new file mode 100644
index 0000000..7e8c7fc
--- /dev/null
+++ b/networking-odl/networking_odl/tests/unit/ml2/test_mechanism_odl_v2.py
@@ -0,0 +1,577 @@
+# Copyright (c) 2015 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import datetime
+
+from networking_odl.common import callback
+from networking_odl.common import client
+from networking_odl.common import constants as odl_const
+from networking_odl.common import filters
+from networking_odl.db import db
+from networking_odl.journal import cleanup
+from networking_odl.journal import journal
+from networking_odl.ml2 import mech_driver_v2
+
+import mock
+from oslo_config import cfg
+from oslo_serialization import jsonutils
+import requests
+
+from neutron.db import api as neutron_db_api
+from neutron import manager
+from neutron.plugins.ml2 import config as config
+from neutron.plugins.ml2 import plugin
+from neutron.tests.unit.plugins.ml2 import test_plugin
+from neutron.tests.unit import testlib_api
+
+cfg.CONF.import_group('ml2_odl', 'networking_odl.common.config')
+
+SECURITY_GROUP = '2f9244b4-9bee-4e81-bc4a-3f3c2045b3d7'
+SG_FAKE_ID = 'sg_fake_uuid'
+SG_RULE_FAKE_ID = 'sg_rule_fake_uuid'
+
+
+class OpenDaylightConfigBase(test_plugin.Ml2PluginV2TestCase):
+ def setUp(self):
+ super(OpenDaylightConfigBase, self).setUp()
+ config.cfg.CONF.set_override('mechanism_drivers',
+ ['logger', 'opendaylight'], 'ml2')
+ config.cfg.CONF.set_override('url', 'http://127.0.0.1:9999', 'ml2_odl')
+ config.cfg.CONF.set_override('username', 'someuser', 'ml2_odl')
+ config.cfg.CONF.set_override('password', 'somepass', 'ml2_odl')
+
+
+class OpenDaylightTestCase(OpenDaylightConfigBase):
+ def setUp(self):
+ super(OpenDaylightTestCase, self).setUp()
+ self.port_create_status = 'DOWN'
+ self.mech = mech_driver_v2.OpenDaylightMechanismDriver()
+ mock.patch.object(journal.OpendaylightJournalThread,
+ 'start_odl_sync_thread').start()
+ self.mock_sendjson = mock.patch.object(client.OpenDaylightRestClient,
+ 'sendjson').start()
+ self.mock_sendjson.side_effect = self.check_sendjson
+
+ def check_sendjson(self, method, urlpath, obj):
+ self.assertFalse(urlpath.startswith("http://"))
+
+
+class OpenDayLightMechanismConfigTests(testlib_api.SqlTestCase):
+ def _set_config(self, url='http://127.0.0.1:9999', username='someuser',
+ password='somepass'):
+ config.cfg.CONF.set_override('mechanism_drivers',
+ ['logger', 'opendaylight'],
+ 'ml2')
+ config.cfg.CONF.set_override('url', url, 'ml2_odl')
+ config.cfg.CONF.set_override('username', username, 'ml2_odl')
+ config.cfg.CONF.set_override('password', password, 'ml2_odl')
+
+ def _test_missing_config(self, **kwargs):
+ self._set_config(**kwargs)
+ self.assertRaises(config.cfg.RequiredOptError,
+ plugin.Ml2Plugin)
+
+ def test_valid_config(self):
+ self._set_config()
+ plugin.Ml2Plugin()
+
+ def test_missing_url_raises_exception(self):
+ self._test_missing_config(url=None)
+
+ def test_missing_username_raises_exception(self):
+ self._test_missing_config(username=None)
+
+ def test_missing_password_raises_exception(self):
+ self._test_missing_config(password=None)
+
+
+class OpenDaylightMechanismTestBasicGet(test_plugin.TestMl2BasicGet,
+ OpenDaylightTestCase):
+ pass
+
+
+class OpenDaylightMechanismTestNetworksV2(test_plugin.TestMl2NetworksV2,
+ OpenDaylightTestCase):
+ pass
+
+
+class OpenDaylightMechanismTestSubnetsV2(test_plugin.TestMl2SubnetsV2,
+ OpenDaylightTestCase):
+ pass
+
+
+class OpenDaylightMechanismTestPortsV2(test_plugin.TestMl2PortsV2,
+ OpenDaylightTestCase):
+ pass
+
+
+class DataMatcher(object):
+
+ def __init__(self, operation, object_type, context):
+ if object_type in [odl_const.ODL_SG, odl_const.ODL_SG_RULE]:
+ self._data = context[object_type].copy()
+ else:
+ self._data = context.current.copy()
+ self._object_type = object_type
+ filters.filter_for_odl(object_type, operation, self._data)
+
+ def __eq__(self, s):
+ data = jsonutils.loads(s)
+ return self._data == data[self._object_type]
+
+ def __ne__(self, s):
+ return not self.__eq__(s)
+
+
+class AttributeDict(dict):
+ def __init__(self, *args, **kwargs):
+ super(AttributeDict, self).__init__(*args, **kwargs)
+ self.__dict__ = self
+
+
+class OpenDaylightMechanismDriverTestCase(OpenDaylightConfigBase):
+ def setUp(self):
+ super(OpenDaylightMechanismDriverTestCase, self).setUp()
+ self.db_session = neutron_db_api.get_session()
+ self.mech = mech_driver_v2.OpenDaylightMechanismDriver()
+ self.mock_sync_thread = mock.patch.object(
+ journal.OpendaylightJournalThread, 'start_odl_sync_thread').start()
+ self.mech.initialize()
+ self.thread = journal.OpendaylightJournalThread()
+ self.addCleanup(self._db_cleanup)
+
+ @staticmethod
+ def _get_mock_network_operation_context():
+ current = {'status': 'ACTIVE',
+ 'subnets': [],
+ 'name': 'net1',
+ 'provider:physical_network': None,
+ 'admin_state_up': True,
+ 'tenant_id': 'test-tenant',
+ 'provider:network_type': 'local',
+ 'router:external': False,
+ 'shared': False,
+ 'id': 'd897e21a-dfd6-4331-a5dd-7524fa421c3e',
+ 'provider:segmentation_id': None}
+ context = mock.Mock(current=current)
+ context._plugin_context.session = neutron_db_api.get_session()
+ return context
+
+ @staticmethod
+ def _get_mock_subnet_operation_context():
+ current = {'ipv6_ra_mode': None,
+ 'allocation_pools': [{'start': '10.0.0.2',
+ 'end': '10.0.1.254'}],
+ 'host_routes': [],
+ 'ipv6_address_mode': None,
+ 'cidr': '10.0.0.0/23',
+ 'id': '72c56c48-e9b8-4dcf-b3a7-0813bb3bd839',
+ 'name': '',
+ 'enable_dhcp': True,
+ 'network_id': 'd897e21a-dfd6-4331-a5dd-7524fa421c3e',
+ 'tenant_id': 'test-tenant',
+ 'dns_nameservers': [],
+ 'gateway_ip': '10.0.0.1',
+ 'ip_version': 4,
+ 'shared': False}
+ context = mock.Mock(current=current)
+ context._plugin_context.session = neutron_db_api.get_session()
+ return context
+
+ @staticmethod
+ def _get_mock_port_operation_context():
+ current = {'status': 'DOWN',
+ 'binding:host_id': '',
+ 'allowed_address_pairs': [],
+ 'device_owner': 'fake_owner',
+ 'binding:profile': {},
+ 'fixed_ips': [{
+ 'subnet_id': '72c56c48-e9b8-4dcf-b3a7-0813bb3bd839'}],
+ 'id': '83d56c48-e9b8-4dcf-b3a7-0813bb3bd940',
+ 'security_groups': [SECURITY_GROUP],
+ 'device_id': 'fake_device',
+ 'name': '',
+ 'admin_state_up': True,
+ 'network_id': 'd897e21a-dfd6-4331-a5dd-7524fa421c3e',
+ 'tenant_id': 'test-tenant',
+ 'binding:vif_details': {},
+ 'binding:vnic_type': 'normal',
+ 'binding:vif_type': 'unbound',
+ 'mac_address': '12:34:56:78:21:b6'}
+ _network = OpenDaylightMechanismDriverTestCase.\
+ _get_mock_network_operation_context().current
+ _plugin = manager.NeutronManager.get_plugin()
+ _plugin.get_security_group = mock.Mock(return_value=SECURITY_GROUP)
+ _plugin.get_port = mock.Mock(return_value=current)
+ _plugin.get_network = mock.Mock(return_value=_network)
+ _plugin_context_mock = {'session': neutron_db_api.get_session()}
+ _network_context_mock = {'_network': _network}
+ context = {'current': AttributeDict(current),
+ '_plugin': _plugin,
+ '_plugin_context': AttributeDict(_plugin_context_mock),
+ '_network_context': AttributeDict(_network_context_mock)}
+ return AttributeDict(context)
+
+ @staticmethod
+ def _get_mock_security_group_operation_context():
+ context = {odl_const.ODL_SG: {'name': 'test_sg',
+ 'id': SG_FAKE_ID}}
+ return context
+
+ @staticmethod
+ def _get_mock_security_group_rule_operation_context():
+ context = {odl_const.ODL_SG_RULE: {'security_group_id': SG_FAKE_ID,
+ 'id': SG_RULE_FAKE_ID}}
+ return context
+
+ @classmethod
+ def _get_mock_operation_context(cls, object_type):
+ getter = getattr(cls, '_get_mock_%s_operation_context' % object_type)
+ return getter()
+
+ _status_code_msgs = {
+ 200: '',
+ 201: '',
+ 204: '',
+ 400: '400 Client Error: Bad Request',
+ 401: '401 Client Error: Unauthorized',
+ 403: '403 Client Error: Forbidden',
+ 404: '404 Client Error: Not Found',
+ 409: '409 Client Error: Conflict',
+ 501: '501 Server Error: Not Implemented',
+ 503: '503 Server Error: Service Unavailable',
+ }
+
+ def _db_cleanup(self):
+ rows = db.get_all_db_rows(self.db_session)
+ for row in rows:
+ db.delete_row(self.db_session, row=row)
+
+ @classmethod
+ def _get_mock_request_response(cls, status_code):
+ response = mock.Mock(status_code=status_code)
+ response.raise_for_status = mock.Mock() if status_code < 400 else (
+ mock.Mock(side_effect=requests.exceptions.HTTPError(
+ cls._status_code_msgs[status_code])))
+ return response
+
+ def _test_operation(self, method, status_code, expected_calls,
+ *args, **kwargs):
+ request_response = self._get_mock_request_response(status_code)
+ with mock.patch('requests.request',
+ return_value=request_response) as mock_method:
+ method(exit_after_run=True)
+
+ if expected_calls:
+ mock_method.assert_called_with(
+ headers={'Content-Type': 'application/json'},
+ auth=(config.cfg.CONF.ml2_odl.username,
+ config.cfg.CONF.ml2_odl.password),
+ timeout=config.cfg.CONF.ml2_odl.timeout, *args, **kwargs)
+ self.assertEqual(expected_calls, mock_method.call_count)
+
+ def _call_operation_object(self, operation, object_type):
+ context = self._get_mock_operation_context(object_type)
+
+ if object_type in [odl_const.ODL_SG, odl_const.ODL_SG_RULE]:
+ res_type = [rt for rt in callback._RESOURCE_MAPPING.values()
+ if rt.singular == object_type][0]
+ self.mech.sync_from_callback(operation, res_type,
+ context[object_type]['id'], context)
+ else:
+ method = getattr(self.mech, '%s_%s_precommit' % (operation,
+ object_type))
+ method(context)
+
+ def _test_operation_object(self, operation, object_type):
+ self._call_operation_object(operation, object_type)
+
+ context = self._get_mock_operation_context(object_type)
+ row = db.get_oldest_pending_db_row_with_lock(self.db_session)
+ self.assertEqual(operation, row['operation'])
+ self.assertEqual(object_type, row['object_type'])
+ self.assertEqual(context.current['id'], row['object_uuid'])
+
+ def _test_thread_processing(self, operation, object_type,
+ expected_calls=1):
+ http_requests = {odl_const.ODL_CREATE: 'post',
+ odl_const.ODL_UPDATE: 'put',
+ odl_const.ODL_DELETE: 'delete'}
+ status_codes = {odl_const.ODL_CREATE: requests.codes.created,
+ odl_const.ODL_UPDATE: requests.codes.ok,
+ odl_const.ODL_DELETE: requests.codes.no_content}
+
+ http_request = http_requests[operation]
+ status_code = status_codes[operation]
+
+ self._call_operation_object(operation, object_type)
+
+ context = self._get_mock_operation_context(object_type)
+ url_object_type = object_type.replace('_', '-')
+ if operation in [odl_const.ODL_UPDATE, odl_const.ODL_DELETE]:
+ if object_type in [odl_const.ODL_SG, odl_const.ODL_SG_RULE]:
+ uuid = context[object_type]['id']
+ else:
+ uuid = context.current['id']
+ url = '%s/%ss/%s' % (config.cfg.CONF.ml2_odl.url, url_object_type,
+ uuid)
+ else:
+ url = '%s/%ss' % (config.cfg.CONF.ml2_odl.url, url_object_type)
+
+ if operation in [odl_const.ODL_CREATE, odl_const.ODL_UPDATE]:
+ kwargs = {
+ 'url': url,
+ 'data': DataMatcher(operation, object_type, context)}
+ else:
+ kwargs = {'url': url, 'data': None}
+ with mock.patch.object(self.thread.event, 'wait',
+ return_value=False):
+ self._test_operation(self.thread.run_sync_thread, status_code,
+ expected_calls, http_request, **kwargs)
+
+ def _test_object_type(self, object_type):
+ # Add and process create request.
+ self._test_thread_processing(odl_const.ODL_CREATE, object_type)
+ rows = db.get_all_db_rows_by_state(self.db_session,
+ odl_const.COMPLETED)
+ self.assertEqual(1, len(rows))
+
+ # Add and process update request. Adds to database.
+ self._test_thread_processing(odl_const.ODL_UPDATE, object_type)
+ rows = db.get_all_db_rows_by_state(self.db_session,
+ odl_const.COMPLETED)
+ self.assertEqual(2, len(rows))
+
+ # Add and process update request. Adds to database.
+ self._test_thread_processing(odl_const.ODL_DELETE, object_type)
+ rows = db.get_all_db_rows_by_state(self.db_session,
+ odl_const.COMPLETED)
+ self.assertEqual(3, len(rows))
+
+ def _test_object_type_pending_network(self, object_type):
+ # Create a network (creates db row in pending state).
+ self._call_operation_object(odl_const.ODL_CREATE,
+ odl_const.ODL_NETWORK)
+
+ # Create object_type database row and process. This results in both
+ # the object_type and network rows being processed.
+ self._test_thread_processing(odl_const.ODL_CREATE, object_type,
+ expected_calls=2)
+
+ # Verify both rows are now marked as completed.
+ rows = db.get_all_db_rows_by_state(self.db_session,
+ odl_const.COMPLETED)
+ self.assertEqual(2, len(rows))
+
+ def _test_object_type_processing_network(self, object_type):
+ self._test_object_operation_pending_another_object_operation(
+ object_type, odl_const.ODL_CREATE, odl_const.ODL_NETWORK,
+ odl_const.ODL_CREATE)
+
+ def _test_object_operation_pending_object_operation(
+ self, object_type, operation, pending_operation):
+ self._test_object_operation_pending_another_object_operation(
+ object_type, operation, object_type, pending_operation)
+
+ def _test_object_operation_pending_another_object_operation(
+ self, object_type, operation, pending_type, pending_operation):
+ # Create the object_type (creates db row in pending state).
+ self._call_operation_object(pending_operation,
+ pending_type)
+
+ # Get pending row and mark as processing so that
+ # this row will not be processed by journal thread.
+ row = db.get_all_db_rows_by_state(self.db_session, odl_const.PENDING)
+ db.update_db_row_state(self.db_session, row[0], odl_const.PROCESSING)
+
+ # Create the object_type database row and process.
+ # Verify that object request is not processed because the
+ # dependent object operation has not been marked as 'completed'.
+ self._test_thread_processing(operation,
+ object_type,
+ expected_calls=0)
+
+ # Verify that all rows are still in the database.
+ rows = db.get_all_db_rows_by_state(self.db_session,
+ odl_const.PROCESSING)
+ self.assertEqual(1, len(rows))
+ rows = db.get_all_db_rows_by_state(self.db_session, odl_const.PENDING)
+ self.assertEqual(1, len(rows))
+
+ def _test_parent_delete_pending_child_delete(self, parent, child):
+ self._test_object_operation_pending_another_object_operation(
+ parent, odl_const.ODL_DELETE, child, odl_const.ODL_DELETE)
+
+ def _test_cleanup_processing_rows(self, last_retried, expected_state):
+ # Create a dummy network (creates db row in pending state).
+ self._call_operation_object(odl_const.ODL_CREATE,
+ odl_const.ODL_NETWORK)
+
+ # Get pending row and mark as processing and update
+ # the last_retried time
+ row = db.get_all_db_rows_by_state(self.db_session,
+ odl_const.PENDING)[0]
+ row.last_retried = last_retried
+ db.update_db_row_state(self.db_session, row, odl_const.PROCESSING)
+
+ # Test if the cleanup marks this in the desired state
+ # based on the last_retried timestamp
+ cleanup.JournalCleanup().cleanup_processing_rows(self.db_session)
+
+ # Verify that the Db row is in the desired state
+ rows = db.get_all_db_rows_by_state(self.db_session, expected_state)
+ self.assertEqual(1, len(rows))
+
+ def test_driver(self):
+ for operation in [odl_const.ODL_CREATE, odl_const.ODL_UPDATE,
+ odl_const.ODL_DELETE]:
+ for object_type in [odl_const.ODL_NETWORK, odl_const.ODL_SUBNET,
+ odl_const.ODL_PORT]:
+ self._test_operation_object(operation, object_type)
+
+ def test_port_precommit_no_tenant(self):
+ context = self._get_mock_operation_context(odl_const.ODL_PORT)
+ context.current['tenant_id'] = ''
+
+ method = getattr(self.mech, 'create_port_precommit')
+ method(context)
+
+ # Verify that the Db row has a tenant
+ rows = db.get_all_db_rows_by_state(self.db_session, odl_const.PENDING)
+ self.assertEqual(1, len(rows))
+ _network = OpenDaylightMechanismDriverTestCase.\
+ _get_mock_network_operation_context().current
+ self.assertEqual(_network['tenant_id'], rows[0]['data']['tenant_id'])
+
+ def test_network(self):
+ self._test_object_type(odl_const.ODL_NETWORK)
+
+ def test_network_update_pending_network_create(self):
+ self._test_object_operation_pending_object_operation(
+ odl_const.ODL_NETWORK, odl_const.ODL_UPDATE, odl_const.ODL_CREATE)
+
+ def test_network_delete_pending_network_create(self):
+ self._test_object_operation_pending_object_operation(
+ odl_const.ODL_NETWORK, odl_const.ODL_DELETE, odl_const.ODL_CREATE)
+
+ def test_network_delete_pending_network_update(self):
+ self._test_object_operation_pending_object_operation(
+ odl_const.ODL_NETWORK, odl_const.ODL_DELETE, odl_const.ODL_UPDATE)
+
+ def test_network_delete_pending_subnet_delete(self):
+ self._test_parent_delete_pending_child_delete(
+ odl_const.ODL_NETWORK, odl_const.ODL_SUBNET)
+
+ def test_network_delete_pending_port_delete(self):
+ self._test_parent_delete_pending_child_delete(
+ odl_const.ODL_NETWORK, odl_const.ODL_PORT)
+
+ def test_subnet(self):
+ self._test_object_type(odl_const.ODL_SUBNET)
+
+ def test_subnet_update_pending_subnet_create(self):
+ self._test_object_operation_pending_object_operation(
+ odl_const.ODL_SUBNET, odl_const.ODL_UPDATE, odl_const.ODL_CREATE)
+
+ def test_subnet_delete_pending_subnet_create(self):
+ self._test_object_operation_pending_object_operation(
+ odl_const.ODL_SUBNET, odl_const.ODL_DELETE, odl_const.ODL_CREATE)
+
+ def test_subnet_delete_pending_subnet_update(self):
+ self._test_object_operation_pending_object_operation(
+ odl_const.ODL_SUBNET, odl_const.ODL_DELETE, odl_const.ODL_UPDATE)
+
+ def test_subnet_pending_network(self):
+ self._test_object_type_pending_network(odl_const.ODL_SUBNET)
+
+ def test_subnet_processing_network(self):
+ self._test_object_type_processing_network(odl_const.ODL_SUBNET)
+
+ def test_subnet_delete_pending_port_delete(self):
+ self._test_parent_delete_pending_child_delete(
+ odl_const.ODL_SUBNET, odl_const.ODL_PORT)
+
+ def test_port(self):
+ self._test_object_type(odl_const.ODL_PORT)
+
+ def test_port_update_pending_port_create(self):
+ self._test_object_operation_pending_object_operation(
+ odl_const.ODL_PORT, odl_const.ODL_UPDATE, odl_const.ODL_CREATE)
+
+ def test_port_delete_pending_port_create(self):
+ self._test_object_operation_pending_object_operation(
+ odl_const.ODL_PORT, odl_const.ODL_DELETE, odl_const.ODL_CREATE)
+
+ def test_port_delete_pending_port_update(self):
+ self._test_object_operation_pending_object_operation(
+ odl_const.ODL_PORT, odl_const.ODL_DELETE, odl_const.ODL_UPDATE)
+
+ def test_port_pending_network(self):
+ self._test_object_type_pending_network(odl_const.ODL_PORT)
+
+ def test_port_processing_network(self):
+ self._test_object_type_processing_network(odl_const.ODL_PORT)
+
+ def test_cleanup_processing_rows_time_not_expired(self):
+ self._test_cleanup_processing_rows(datetime.datetime.utcnow(),
+ odl_const.PROCESSING)
+
+ def test_cleanup_processing_rows_time_expired(self):
+ old_time = datetime.datetime.utcnow() - datetime.timedelta(hours=24)
+ self._test_cleanup_processing_rows(old_time, odl_const.PENDING)
+
+ def test_thread_call(self):
+ """Verify that the sync thread method is called."""
+
+ # Create any object that would spin up the sync thread via the
+ # decorator call_thread_on_end() used by all the event handlers.
+ self._call_operation_object(odl_const.ODL_CREATE,
+ odl_const.ODL_NETWORK)
+
+ # Verify that the thread call was made.
+ self.assertTrue(self.mock_sync_thread.called)
+
+ def test_sg(self):
+ self._test_object_type(odl_const.ODL_SG)
+
+ def test_sg_rule(self):
+ self._test_object_type(odl_const.ODL_SG_RULE)
+
+ def _decrease_row_created_time(self, row):
+ row.created_at -= datetime.timedelta(hours=1)
+ self.db_session.merge(row)
+ self.db_session.flush()
+
+ def test_sync_multiple_updates(self):
+ # add 2 updates
+ for i in range(2):
+ self._call_operation_object(odl_const.ODL_UPDATE,
+ odl_const.ODL_NETWORK)
+
+ # get the last update row
+ last_row = db.get_all_db_rows(self.db_session)[-1]
+
+ # change the last update created time
+ self._decrease_row_created_time(last_row)
+
+ # create 1 more operation to trigger the sync thread
+ # verify that there are no calls to ODL controller, because the
+ # first row was not valid (exit_after_run = true)
+ self._test_thread_processing(odl_const.ODL_UPDATE,
+ odl_const.ODL_NETWORK, expected_calls=0)
+
+ # validate that all the rows are in 'pending' state
+ # first row should be set back to 'pending' because it was not valid
+ rows = db.get_all_db_rows_by_state(self.db_session, 'pending')
+ self.assertEqual(3, len(rows))