From 2f9d5cf583b4b5dd635f386cb9ac07435bd44cc8 Mon Sep 17 00:00:00 2001 From: Ilia Abashin Date: Fri, 25 Aug 2017 14:24:59 +0300 Subject: Heavily refactored all event based scans Restricted real mongo interaction. Still work to do, but it's a good start. Fixed a bug with old subnets not being properly deleted. Change-Id: I5f260e09f0e11a477a47cb031d397a454465123f Signed-off-by: Ilia Abashin --- app/test/event_based_scan/test_subnet_update.py | 41 ++++++++++++++++--------- 1 file changed, 26 insertions(+), 15 deletions(-) (limited to 'app/test/event_based_scan/test_subnet_update.py') diff --git a/app/test/event_based_scan/test_subnet_update.py b/app/test/event_based_scan/test_subnet_update.py index eddfe84..2749db9 100644 --- a/app/test/event_based_scan/test_subnet_update.py +++ b/app/test/event_based_scan/test_subnet_update.py @@ -11,35 +11,46 @@ from discover.events.event_subnet_update import EventSubnetUpdate from discover.fetchers.api.api_access import ApiAccess from test.event_based_scan.test_data.event_payload_subnet_add import \ EVENT_PAYLOAD_REGION -from test.event_based_scan.test_data.event_payload_subnet_update import EVENT_PAYLOAD_SUBNET_UPDATE, NETWORK_DOC +from test.event_based_scan.test_data.event_payload_subnet_update import \ + EVENT_PAYLOAD_SUBNET_UPDATE, NETWORK_DOC, HOST_DOC from test.event_based_scan.test_event import TestEvent class TestSubnetUpdate(TestEvent): + def get_by_id(self, env, object_id): + if object_id == self.network_id: + return NETWORK_DOC + elif object_id == self.host_id: + return HOST_DOC + else: + return None + def test_handle_subnet_add(self): self.values = EVENT_PAYLOAD_SUBNET_UPDATE self.payload = self.values['payload'] self.subnet = self.payload['subnet'] self.subnet_id = self.subnet['id'] self.network_id = self.subnet['network_id'] - self.item_ids.append(self.network_id) - - #add network document for subnet. - self.set_item(NETWORK_DOC) + self.host_id = self.values["publisher_id"].replace("network.", "", 1) + old_subnet_name = list(NETWORK_DOC['subnets'].keys())[0] + new_subnet_name = self.subnet['name'] - # check network document - network_document = self.inv.get_by_id(self.env, self.network_id) - self.assertIsNotNone(network_document) + self.inv.get_by_id.side_effect = self.get_by_id - # check region data. if not ApiAccess.regions: ApiAccess.regions = EVENT_PAYLOAD_REGION - handler = EventSubnetUpdate() - handler.handle(self.env, self.values) + res = EventSubnetUpdate().handle(self.env, self.values) + + self.assertTrue(res.result) + updated_network = [call[0][0] for call in self.inv.set.call_args_list + if call[0][0]['type'] == 'network'] + self.assertTrue(updated_network) + self.assertFalse(updated_network[0]['subnets'].get(old_subnet_name)) + self.assertTrue(updated_network[0]['subnets'].get(new_subnet_name)) + + if ApiAccess.regions == EVENT_PAYLOAD_REGION: + ApiAccess.regions = None - # check network document - network_document = self.inv.get_by_id(self.env, self.network_id) - self.assertIn(self.subnet['name'], network_document['subnets']) - self.assertEqual(self.subnet['gateway_ip'], network_document['subnets'][self.subnet['name']]['gateway_ip']) + # TODO: write tests for "enable_dhcp" change handling -- cgit 1.2.3-korg