From 7e83d0876ddb84a45e130eeba28bc40ef53c074b Mon Sep 17 00:00:00 2001 From: Yaron Yogev Date: Thu, 27 Jul 2017 09:02:54 +0300 Subject: Calipso initial release for OPNFV Change-Id: I7210c244b0c10fa80bfa8c77cb86c9d6ddf8bc88 Signed-off-by: Yaron Yogev --- app/discover/events/__init__.py | 10 + app/discover/events/event_base.py | 36 +++ app/discover/events/event_delete_base.py | 60 +++++ app/discover/events/event_instance_add.py | 45 ++++ app/discover/events/event_instance_delete.py | 18 ++ app/discover/events/event_instance_update.py | 55 ++++ app/discover/events/event_interface_add.py | 139 ++++++++++ app/discover/events/event_interface_delete.py | 40 +++ app/discover/events/event_metadata_parser.py | 75 ++++++ app/discover/events/event_network_add.py | 50 ++++ app/discover/events/event_network_delete.py | 17 ++ app/discover/events/event_network_update.py | 44 +++ app/discover/events/event_port_add.py | 309 +++++++++++++++++++++ app/discover/events/event_port_delete.py | 80 ++++++ app/discover/events/event_port_update.py | 38 +++ app/discover/events/event_router_add.py | 123 +++++++++ app/discover/events/event_router_delete.py | 37 +++ app/discover/events/event_router_update.py | 82 ++++++ app/discover/events/event_subnet_add.py | 154 +++++++++++ app/discover/events/event_subnet_delete.py | 57 ++++ app/discover/events/event_subnet_update.py | 102 +++++++ app/discover/events/listeners/__init__.py | 10 + app/discover/events/listeners/default_listener.py | 314 ++++++++++++++++++++++ app/discover/events/listeners/listener_base.py | 18 ++ 24 files changed, 1913 insertions(+) create mode 100644 app/discover/events/__init__.py create mode 100644 app/discover/events/event_base.py create mode 100644 app/discover/events/event_delete_base.py create mode 100644 app/discover/events/event_instance_add.py create mode 100644 app/discover/events/event_instance_delete.py create mode 100644 app/discover/events/event_instance_update.py create mode 100644 app/discover/events/event_interface_add.py create mode 100644 app/discover/events/event_interface_delete.py create mode 100644 app/discover/events/event_metadata_parser.py create mode 100644 app/discover/events/event_network_add.py create mode 100644 app/discover/events/event_network_delete.py create mode 100644 app/discover/events/event_network_update.py create mode 100644 app/discover/events/event_port_add.py create mode 100644 app/discover/events/event_port_delete.py create mode 100644 app/discover/events/event_port_update.py create mode 100644 app/discover/events/event_router_add.py create mode 100644 app/discover/events/event_router_delete.py create mode 100644 app/discover/events/event_router_update.py create mode 100644 app/discover/events/event_subnet_add.py create mode 100644 app/discover/events/event_subnet_delete.py create mode 100644 app/discover/events/event_subnet_update.py create mode 100644 app/discover/events/listeners/__init__.py create mode 100755 app/discover/events/listeners/default_listener.py create mode 100644 app/discover/events/listeners/listener_base.py (limited to 'app/discover/events') diff --git a/app/discover/events/__init__.py b/app/discover/events/__init__.py new file mode 100644 index 0000000..1e85a2a --- /dev/null +++ b/app/discover/events/__init__.py @@ -0,0 +1,10 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### + diff --git a/app/discover/events/event_base.py b/app/discover/events/event_base.py new file mode 100644 index 0000000..6b3b290 --- /dev/null +++ b/app/discover/events/event_base.py @@ -0,0 +1,36 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from abc import abstractmethod, ABC + +from discover.fetcher import Fetcher +from utils.inventory_mgr import InventoryMgr + + +class EventResult: + def __init__(self, + result: bool, retry: bool = False, message: str = None, + related_object: str = None, + display_context: str = None): + self.result = result + self.retry = retry + self.message = message + self.related_object = related_object + self.display_context = display_context + + +class EventBase(Fetcher, ABC): + + def __init__(self): + super().__init__() + self.inv = InventoryMgr() + + @abstractmethod + def handle(self, env, values) -> EventResult: + pass diff --git a/app/discover/events/event_delete_base.py b/app/discover/events/event_delete_base.py new file mode 100644 index 0000000..1cf94c3 --- /dev/null +++ b/app/discover/events/event_delete_base.py @@ -0,0 +1,60 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import re + +from bson.objectid import ObjectId + +from discover.clique_finder import CliqueFinder +from discover.events.event_base import EventBase, EventResult + + +class EventDeleteBase(EventBase): + + def delete_handler(self, env, object_id, object_type) -> EventResult: + item = self.inv.get_by_id(env, object_id) + if not item: + self.log.info('{0} document is not found, aborting {0} delete'.format(object_type)) + return EventResult(result=False, retry=False) + + db_id = ObjectId(item['_id']) + id_path = item['id_path'] + '/' + + # remove related clique + clique_finder = CliqueFinder() + self.inv.delete('cliques', {'focal_point': db_id}) + + # keep related links to do rebuild of cliques using them + matched_links_source = clique_finder.find_links_by_source(db_id) + matched_links_target = clique_finder.find_links_by_target(db_id) + + links_using_object = [] + links_using_object.extend([l['_id'] for l in matched_links_source]) + links_using_object.extend([l['_id'] for l in matched_links_target]) + + # find cliques using these links + if links_using_object: + matched_cliques = clique_finder.find_cliques_by_link(links_using_object) + # find cliques using these links and rebuild them + for clique in matched_cliques: + clique_finder.rebuild_clique(clique) + + # remove all related links + self.inv.delete('links', {'source': db_id}) + self.inv.delete('links', {'target': db_id}) + + # remove object itself + self.inv.delete('inventory', {'_id': db_id}) + + # remove children + regexp = re.compile('^' + id_path) + self.inv.delete('inventory', {'id_path': {'$regex': regexp}}) + return EventResult(result=True, + related_object=object_id, + display_context=object_id) diff --git a/app/discover/events/event_instance_add.py b/app/discover/events/event_instance_add.py new file mode 100644 index 0000000..4dd2b20 --- /dev/null +++ b/app/discover/events/event_instance_add.py @@ -0,0 +1,45 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from discover.events.event_base import EventBase, EventResult +from discover.scanner import Scanner + + +class EventInstanceAdd(EventBase): + + def handle(self, env, values): + # find the host, to serve as parent + instance_id = values['payload']['instance_id'] + host_id = values['payload']['host'] + instances_root_id = host_id + '-instances' + instances_root = self.inv.get_by_id(env, instances_root_id) + if not instances_root: + self.log.info('instances root not found, aborting instance add') + return EventResult(result=False, retry=True) + + # scan instance + scanner = Scanner() + scanner.set_env(env) + scanner.scan("ScanInstancesRoot", instances_root, + limit_to_child_id=instance_id, + limit_to_child_type='instance') + scanner.scan_from_queue() + + # scan host + host = self.inv.get_by_id(env, host_id) + scanner.scan('ScanHost', host, + limit_to_child_type=['vconnectors_folder', + 'vedges_folder']) + scanner.scan_from_queue() + scanner.scan_links() + scanner.scan_cliques() + + return EventResult(result=True, + related_object=instance_id, + display_context=instance_id) diff --git a/app/discover/events/event_instance_delete.py b/app/discover/events/event_instance_delete.py new file mode 100644 index 0000000..714d0c7 --- /dev/null +++ b/app/discover/events/event_instance_delete.py @@ -0,0 +1,18 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from discover.events.event_delete_base import EventDeleteBase + + +class EventInstanceDelete(EventDeleteBase): + + def handle(self, env, values): + # find the corresponding object + instance_id = values['payload']['instance_id'] + return self.delete_handler(env, instance_id, "instance") diff --git a/app/discover/events/event_instance_update.py b/app/discover/events/event_instance_update.py new file mode 100644 index 0000000..6231c30 --- /dev/null +++ b/app/discover/events/event_instance_update.py @@ -0,0 +1,55 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import re + +from discover.events.event_base import EventBase, EventResult +from discover.events.event_instance_add import EventInstanceAdd +from discover.events.event_instance_delete import EventInstanceDelete + + +class EventInstanceUpdate(EventBase): + + def handle(self, env, values): + # find the host, to serve as parent + payload = values['payload'] + instance_id = payload['instance_id'] + state = payload['state'] + old_state = payload['old_state'] + + if state == 'building': + return EventResult(result=False, retry=False) + + if state == 'active' and old_state == 'building': + return EventInstanceAdd().handle(env, values) + + if state == 'deleted' and old_state == 'active': + return EventInstanceDelete().handle(env, values) + + name = payload['display_name'] + instance = self.inv.get_by_id(env, instance_id) + if not instance: + self.log.info('instance document not found, aborting instance update') + return EventResult(result=False, retry=True) + + instance['name'] = name + instance['object_name'] = name + name_path = instance['name_path'] + instance['name_path'] = name_path[:name_path.rindex('/') + 1] + name + + # TBD: fix name_path for descendants + if name_path != instance['name_path']: + self.inv.values_replace({ + "environment": env, + "name_path": {"$regex": r"^" + re.escape(name_path + '/')}}, + {"name_path": {"from": name_path, "to": instance['name_path']}}) + self.inv.set(instance) + return EventResult(result=True, + related_object=instance_id, + display_context=instance_id) diff --git a/app/discover/events/event_interface_add.py b/app/discover/events/event_interface_add.py new file mode 100644 index 0000000..a06ad14 --- /dev/null +++ b/app/discover/events/event_interface_add.py @@ -0,0 +1,139 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import time + +from functools import partial + +from discover.events.event_base import EventBase, EventResult +from discover.events.event_port_add import EventPortAdd +from discover.events.event_subnet_add import EventSubnetAdd +from discover.fetchers.api.api_access import ApiAccess +from discover.fetchers.api.api_fetch_regions import ApiFetchRegions +from discover.fetchers.cli.cli_fetch_host_vservice import CliFetchHostVservice +from discover.find_links_for_vservice_vnics import FindLinksForVserviceVnics +from discover.scanner import Scanner +from utils.util import decode_router_id, encode_router_id + + +class EventInterfaceAdd(EventBase): + + def __init__(self): + super().__init__() + self.delay = 2 + + def add_gateway_port(self, env, project, network_name, router_doc, host_id): + fetcher = CliFetchHostVservice() + fetcher.set_env(env) + router_id = router_doc['id'] + router = fetcher.get_vservice(host_id, router_id) + device_id = decode_router_id(router_id) + router_doc['gw_port_id'] = router['gw_port_id'] + + # add gateway port documents. + port_doc = EventSubnetAdd().add_port_document(env, router_doc['gw_port_id'], project_name=project) + + mac_address = port_doc['mac_address'] if port_doc else None + + # add vnic document + host = self.inv.get_by_id(env, host_id) + + add_vnic_document = partial(EventPortAdd().add_vnic_document, + env=env, + host=host, + object_id=device_id, + object_type='router', + network_name=network_name, + router_name=router_doc['name'], + mac_address=mac_address) + + ret = add_vnic_document() + if not ret: + time.sleep(self.delay) + self.log.info("Wait %s second, and then fetch vnic document again." % self.delay) + add_vnic_document() + + def update_router(self, env, project, network_id, network_name, router_doc, host_id): + if router_doc: + if 'network' in router_doc: + if network_id not in router_doc['network']: + router_doc['network'].append(network_id) + else: + router_doc['network'] = [network_id] + + # if gw_port_id is None, add gateway port first. + if not router_doc.get('gw_port_id'): + self.add_gateway_port(env, project, network_name, router_doc, host_id) + else: + # check the gateway port document, add it if document does not exist. + port = self.inv.get_by_id(env, router_doc['gw_port_id']) + if not port: + self.add_gateway_port(env, project, network_name, router_doc, host_id) + self.inv.set(router_doc) + else: + self.log.info("router document not found, aborting interface adding") + + def handle(self, env, values): + interface = values['payload']['router_interface'] + project = values['_context_project_name'] + host_id = values["publisher_id"].replace("network.", "", 1) + port_id = interface['port_id'] + subnet_id = interface['subnet_id'] + router_id = encode_router_id(host_id, interface['id']) + + network_document = self.inv.get_by_field(env, "network", "subnet_ids", subnet_id, get_single=True) + if not network_document: + self.log.info("network document not found, aborting interface adding") + return EventResult(result=False, retry=True) + network_name = network_document['name'] + network_id = network_document['id'] + + # add router-interface port document. + if len(ApiAccess.regions) == 0: + fetcher = ApiFetchRegions() + fetcher.set_env(env) + fetcher.get(None) + port_doc = EventSubnetAdd().add_port_document(env, port_id, network_name=network_name) + + mac_address = port_doc['mac_address'] if port_doc else None + + # add vnic document + host = self.inv.get_by_id(env, host_id) + router_doc = self.inv.get_by_id(env, router_id) + + add_vnic_document = partial(EventPortAdd().add_vnic_document, + env=env, + host=host, + object_id=interface['id'], + object_type='router', + network_name=network_name, + router_name=router_doc['name'], + mac_address=mac_address) + + ret = add_vnic_document() + if ret is False: + # try it again to fetch vnic document, vnic will be created a little bit late before CLI fetch. + time.sleep(self.delay) + self.log.info("Wait {} seconds, and then fetch vnic document again.".format(self.delay)) + add_vnic_document() + + # update the router document: gw_port_id, network. + self.update_router(env, project, network_id, network_name, router_doc, host_id) + + # update vservice-vnic, vnic-network, + FindLinksForVserviceVnics().add_links(search={"parent_id": router_id}) + scanner = Scanner() + scanner.set_env(env) + + scanner.scan_cliques() + self.log.info("Finished router-interface added.") + + return EventResult(result=True, + related_object=interface['id'], + display_context=network_id) diff --git a/app/discover/events/event_interface_delete.py b/app/discover/events/event_interface_delete.py new file mode 100644 index 0000000..b1df978 --- /dev/null +++ b/app/discover/events/event_interface_delete.py @@ -0,0 +1,40 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from discover.events.event_base import EventResult +from discover.events.event_delete_base import EventDeleteBase +from discover.events.event_port_delete import EventPortDelete +from utils.util import encode_router_id + + +class EventInterfaceDelete(EventDeleteBase): + + def handle(self, env, values): + interface = values['payload']['router_interface'] + port_id = interface['port_id'] + host_id = values["publisher_id"].replace("network.", "", 1) + router_id = encode_router_id(host_id, interface['id']) + + # update router document + port_doc = self.inv.get_by_id(env, port_id) + if not port_doc: + self.log.info("Interface deleting handler: port document not found.") + return EventResult(result=False, retry=False) + network_id = port_doc['network_id'] + + router_doc = self.inv.get_by_id(env, router_id) + if router_doc and network_id in router_doc.get('network', []): + router_doc['network'].remove(network_id) + self.inv.set(router_doc) + + # delete port document + result = EventPortDelete().delete_port(env, port_id) + result.related_object = interface['id'] + result.display_context = network_id + return result diff --git a/app/discover/events/event_metadata_parser.py b/app/discover/events/event_metadata_parser.py new file mode 100644 index 0000000..5d09376 --- /dev/null +++ b/app/discover/events/event_metadata_parser.py @@ -0,0 +1,75 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from typing import List, Tuple + +from utils.metadata_parser import MetadataParser + + +class EventMetadataParser(MetadataParser): + + HANDLERS_PACKAGE = 'handlers_package' + QUEUES = 'queues' + EVENT_HANDLERS = 'event_handlers' + + REQUIRED_EXPORTS = [HANDLERS_PACKAGE, EVENT_HANDLERS] + + def __init__(self): + super().__init__() + self.handlers_package = None + self.queues = [] + self.event_handlers = [] + + def get_required_fields(self) -> list: + return self.REQUIRED_EXPORTS + + def validate_metadata(self, metadata: dict) -> bool: + super().validate_metadata(metadata) + + package = metadata.get(self.HANDLERS_PACKAGE) + if not package or not isinstance(package, str): + self.add_error("Handlers package '{}' is invalid".format(package)) + + event_handlers = metadata.get(self.EVENT_HANDLERS) + if not event_handlers or not isinstance(event_handlers, dict): + self.add_error("Event handlers attribute is invalid or empty" + "(should be a non-empty dict)") + + return len(self.errors) == 0 + + def _finalize_parsing(self, metadata): + handlers_package = metadata[self.HANDLERS_PACKAGE] + queues = metadata.get(self.QUEUES, None) + event_handlers = metadata[self.EVENT_HANDLERS] + + # Convert variables to EventHandler-friendly format + self.handlers_package = handlers_package + + try: + if queues and isinstance(queues, list): + self.queues = [{"queue": q["queue"], + "exchange": q["exchange"]} + for q in queues] + except KeyError: + self.add_error("Queues variable has invalid format") + return + + self.event_handlers = event_handlers + + def parse_metadata_file(self, file_path: str) -> dict: + metadata = super().parse_metadata_file(file_path) + self._finalize_parsing(metadata) + super().check_errors() + return metadata + + +def parse_metadata_file(file_path: str): + parser = EventMetadataParser() + parser.parse_metadata_file(file_path) + return parser.handlers_package, parser.queues, parser.event_handlers diff --git a/app/discover/events/event_network_add.py b/app/discover/events/event_network_add.py new file mode 100644 index 0000000..41fafd4 --- /dev/null +++ b/app/discover/events/event_network_add.py @@ -0,0 +1,50 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from discover.events.event_base import EventBase, EventResult + + +class EventNetworkAdd(EventBase): + + def handle(self, env, notification): + network = notification['payload']['network'] + network_id = network['id'] + network_document = self.inv.get_by_id(env, network_id) + if network_document: + self.log.info('network already existed, aborting network add') + return EventResult(result=False, retry=False) + + # build network document for adding network + project_name = notification['_context_project_name'] + project_id = notification['_context_project_id'] + parent_id = project_id + '-networks' + network_name = network['name'] + + network['environment'] = env + network['type'] = 'network' + network['id_path'] = "/%s/%s-projects/%s/%s/%s" \ + % (env, env, project_id, parent_id, network_id) + network['cidrs'] = [] + network['subnet_ids'] = [] + network['last_scanned'] = notification['timestamp'] + network['name_path'] = "/%s/Projects/%s/Networks/%s" \ + % (env, project_name, network_name) + network['network'] = network_id + network['object_name'] = network_name + network['parent_id'] = parent_id + network['parent_text'] = "Networks" + network['parent_type'] = "networks_folder" + network['project'] = project_name + network["show_in_tree"] = True + network['subnets'] = {} + + self.inv.set(network) + return EventResult(result=True, + related_object=network_id, + display_context=network_id) diff --git a/app/discover/events/event_network_delete.py b/app/discover/events/event_network_delete.py new file mode 100644 index 0000000..b3277da --- /dev/null +++ b/app/discover/events/event_network_delete.py @@ -0,0 +1,17 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from discover.events.event_delete_base import EventDeleteBase + + +class EventNetworkDelete(EventDeleteBase): + + def handle(self, env, notification): + network_id = notification['payload']['network_id'] + return self.delete_handler(env, network_id, "network") diff --git a/app/discover/events/event_network_update.py b/app/discover/events/event_network_update.py new file mode 100644 index 0000000..3e1432e --- /dev/null +++ b/app/discover/events/event_network_update.py @@ -0,0 +1,44 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import re + +from discover.events.event_base import EventBase, EventResult + + +class EventNetworkUpdate(EventBase): + + def handle(self, env, notification): + network = notification['payload']['network'] + network_id = network['id'] + + network_document = self.inv.get_by_id(env, network_id) + if not network_document: + self.log.info('Network document not found, aborting network update') + return EventResult(result=False, retry=True) + + # update network document + name = network['name'] + if name != network_document['name']: + network_document['name'] = name + network_document['object_name'] = name + + name_path = network_document['name_path'] + network_document['name_path'] = name_path[:name_path.rindex('/') + 1] + name + + # TBD: fix name_path for descendants + self.inv.values_replace({"environment": env, + "name_path": {"$regex": r"^" + re.escape(name_path + '/')}}, + {"name_path": {"from": name_path, "to": network_document['name_path']}}) + + network_document['admin_state_up'] = network['admin_state_up'] + self.inv.set(network_document) + return EventResult(result=True, + related_object=network_id, + display_context=network_id) diff --git a/app/discover/events/event_port_add.py b/app/discover/events/event_port_add.py new file mode 100644 index 0000000..63a5e80 --- /dev/null +++ b/app/discover/events/event_port_add.py @@ -0,0 +1,309 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import datetime + +from discover.events.event_base import EventBase, EventResult +from discover.fetchers.api.api_fetch_host_instances import ApiFetchHostInstances +from discover.fetchers.cli.cli_fetch_instance_vnics import CliFetchInstanceVnics +from discover.fetchers.cli.cli_fetch_instance_vnics_vpp import CliFetchInstanceVnicsVpp +from discover.fetchers.cli.cli_fetch_vservice_vnics import CliFetchVserviceVnics +from discover.find_links_for_instance_vnics import FindLinksForInstanceVnics +from discover.find_links_for_vedges import FindLinksForVedges +from discover.scanner import Scanner + + +class EventPortAdd(EventBase): + + def get_name_by_id(self, object_id): + item = self.inv.get_by_id(self.env, object_id) + if item: + return item['name'] + return None + + def add_port_document(self, env, project_name, project_id, network_name, network_id, port): + # add other data for port document + port['type'] = 'port' + port['environment'] = env + + port['parent_id'] = port['network_id'] + '-ports' + port['parent_text'] = 'Ports' + port['parent_type'] = 'ports_folder' + + port['name'] = port['mac_address'] + port['object'] = port['name'] + port['project'] = project_name + + port['id_path'] = "{}/{}-projects/{}/{}-networks/{}/{}-ports/{}" \ + .format(env, env, + project_id, project_id, + network_id, network_id, port['id']) + port['name_path'] = "/{}/Projects/{}/Networks/{}/Ports/{}" \ + .format(env, project_name, network_name, port['id']) + + port['show_in_tree'] = True + port['last_scanned'] = datetime.datetime.utcnow() + self.inv.set(port) + self.log.info("add port document for port: {}".format(port['id'])) + + def add_ports_folder(self, env, project_id, network_id, network_name): + port_folder = { + "id": network_id + "-ports", + "create_object": True, + "name": "Ports", + "text": "Ports", + "type": "ports_folder", + "parent_id": network_id, + "parent_type": "network", + 'environment': env, + 'id_path': "{}/{}-projects/{}/{}-networks/{}/{}-ports/" + .format(env, env, project_id, project_id, + network_id, network_id), + 'name_path': "/{}/Projects/{}/Networks/{}/Ports" + .format(env, project_id, network_name), + "show_in_tree": True, + "last_scanned": datetime.datetime.utcnow(), + "object_name": "Ports", + } + self.inv.set(port_folder) + self.log.info("add ports_folder document for network: {}.".format(network_id)) + + def add_network_services_folder(self, env, project_id, network_id, network_name): + network_services_folder = { + "create_object": True, + "environment": env, + "id": network_id + "-network_services", + "id_path": "{}/{}-projects/{}/{}-networks/{}/{}-network_services/" + .format(env, env, project_id, project_id, + network_id, network_id), + "last_scanned": datetime.datetime.utcnow(), + "name": "Network vServices", + "name_path": "/{}/Projects/{}/Networks/{}/Network vServices" + .format(env, project_id, network_name), + "object_name": "Network vServices", + "parent_id": network_id, + "parent_type": "network", + "show_in_tree": True, + "text": "Network vServices", + "type": "network_services_folder" + } + self.inv.set(network_services_folder) + self.log.info("add network services folder for network:{}".format(network_id)) + + def add_dhcp_document(self, env, host, network_id, network_name): + dhcp_document = { + "environment": env, + "host": host['id'], + "id": "qdhcp-" + network_id, + "id_path": "{}/{}-vservices/{}-vservices-dhcps/qdhcp-{}" + .format(host['id_path'], host['id'], + host['id'], network_id), + "last_scanned": datetime.datetime.utcnow(), + "local_service_id": "qdhcp-" + network_id, + "name": "dhcp-" + network_name, + "name_path": host['name_path'] + "/Vservices/DHCP servers/dhcp-" + network_name, + "network": [network_id], + "object_name": "dhcp-" + network_name, + "parent_id": host['id'] + "-vservices-dhcps", + "parent_text": "DHCP servers", + "parent_type": "vservice_dhcps_folder", + "service_type": "dhcp", + "show_in_tree": True, + "type": "vservice" + } + self.inv.set(dhcp_document) + self.log.info("add DHCP document for network: {}.".format(network_id)) + + # This method has dynamic usages, take caution when changing its signature + def add_vnics_folder(self, + env, host, + object_id, network_name='', + object_type="dhcp", router_name=''): + # when vservice is DHCP, id = network_id, + # when vservice is router, id = router_id + type_map = {"dhcp": ('DHCP servers', 'dhcp-' + network_name), + "router": ('Gateways', router_name)} + + vnics_folder = { + "environment": env, + "id": "q{}-{}-vnics".format(object_type, object_id), + "id_path": "{}/{}-vservices/{}-vservices-{}s/q{}-{}/q{}-{}-vnics" + .format(host['id_path'], host['id'], host['id'], + object_type, object_type, object_id, + object_type, object_id), + "last_scanned": datetime.datetime.utcnow(), + "name": "q{}-{}-vnics".format(object_type, object_id), + "name_path": "{}/Vservices/{}/{}/vNICs" + .format(host['name_path'], + type_map[object_type][0], + type_map[object_type][1]), + "object_name": "vNICs", + "parent_id": "q{}-{}".format(object_type, object_id), + "parent_type": "vservice", + "show_in_tree": True, + "text": "vNICs", + "type": "vnics_folder" + } + self.inv.set(vnics_folder) + self.log.info("add vnics_folder document for q{}-{}-vnics" + .format(object_type, object_id)) + + # This method has dynamic usages, take caution when changing its signature + def add_vnic_document(self, + env, host, + object_id, network_name='', + object_type='dhcp', router_name='', + mac_address=None): + # when vservice is DHCP, id = network_id, + # when vservice is router, id = router_id + type_map = {"dhcp": ('DHCP servers', 'dhcp-' + network_name), + "router": ('Gateways', router_name)} + + fetcher = CliFetchVserviceVnics() + fetcher.set_env(env) + namespace = 'q{}-{}'.format(object_type, object_id) + vnic_documents = fetcher.handle_service(host['id'], namespace, enable_cache=False) + if not vnic_documents: + self.log.info("Vnic document not found in namespace.") + return False + + if mac_address is not None: + for doc in vnic_documents: + if doc['mac_address'] == mac_address: + # add a specific vnic document. + doc["environment"] = env + doc["id_path"] = "{}/{}-vservices/{}-vservices-{}s/{}/{}-vnics/{}"\ + .format(host['id_path'], host['id'], + host['id'], object_type, namespace, + namespace, doc["id"]) + doc["name_path"] = "{}/Vservices/{}/{}/vNICs/{}" \ + .format(host['name_path'], + type_map[object_type][0], + type_map[object_type][1], + doc["id"]) + self.inv.set(doc) + self.log.info("add vnic document with mac_address: {}." + .format(mac_address)) + return True + + self.log.info("Can not find vnic document by mac_address: {}" + .format(mac_address)) + return False + else: + for doc in vnic_documents: + # add all vnic documents. + doc["environment"] = env + doc["id_path"] = "{}/{}-vservices/{}-vservices-{}s/{}/{}-vnics/{}" \ + .format(host['id_path'], host['id'], + host['id'], object_type, + namespace, namespace, doc["id"]) + doc["name_path"] = "{}/Vservices/{}/{}/vNICs/{}" \ + .format(host['name_path'], + type_map[object_type][0], + type_map[object_type][1], + doc["id"]) + self.inv.set(doc) + self.log.info("add vnic document with mac_address: {}." + .format(doc["mac_address"])) + return True + + def handle_dhcp_device(self, env, notification, network_id, network_name, mac_address=None): + # add dhcp vservice document. + host_id = notification["publisher_id"].replace("network.", "", 1) + host = self.inv.get_by_id(env, host_id) + + self.add_dhcp_document(env, host, network_id, network_name) + + # add vnics folder. + self.add_vnics_folder(env, host, network_id, network_name) + + # add vnic document. + self.add_vnic_document(env, host, network_id, network_name, mac_address=mac_address) + + def handle(self, env, notification): + project = notification['_context_project_name'] + project_id = notification['_context_project_id'] + payload = notification['payload'] + port = payload['port'] + network_id = port['network_id'] + network_name = self.get_name_by_id(network_id) + mac_address = port['mac_address'] + + # check ports folder document. + ports_folder = self.inv.get_by_id(env, network_id + '-ports') + if not ports_folder: + self.log.info("ports folder not found, add ports folder first.") + self.add_ports_folder(env, project_id, network_id, network_name) + self.add_port_document(env, project, project_id, network_name, network_id, port) + + # update the port related documents. + if 'compute' in port['device_owner']: + # update the instance related document. + host_id = port['binding:host_id'] + instance_id = port['device_id'] + old_instance_doc = self.inv.get_by_id(env, instance_id) + instances_root_id = host_id + '-instances' + instances_root = self.inv.get_by_id(env, instances_root_id) + if not instances_root: + self.log.info('instance document not found, aborting port adding') + return EventResult(result=False, retry=True) + + # update instance + instance_fetcher = ApiFetchHostInstances() + instance_fetcher.set_env(env) + instance_docs = instance_fetcher.get(host_id + '-') + instance = next(filter(lambda i: i['id'] == instance_id, instance_docs), None) + + if instance: + old_instance_doc['network_info'] = instance['network_info'] + old_instance_doc['network'] = instance['network'] + if old_instance_doc.get('mac_address') is None: + old_instance_doc['mac_address'] = mac_address + + self.inv.set(old_instance_doc) + self.log.info("update instance document") + + # add vnic document. + if port['binding:vif_type'] == 'vpp': + vnic_fetcher = CliFetchInstanceVnicsVpp() + else: + # set ovs as default type. + vnic_fetcher = CliFetchInstanceVnics() + + vnic_fetcher.set_env(env) + vnic_docs = vnic_fetcher.get(instance_id + '-') + vnic = next(filter(lambda vnic: vnic['mac_address'] == mac_address, vnic_docs), None) + + if vnic: + vnic['environment'] = env + vnic['type'] = 'vnic' + vnic['name_path'] = old_instance_doc['name_path'] + '/vNICs/' + vnic['name'] + vnic['id_path'] = '{}/{}/{}'.format(old_instance_doc['id_path'], + old_instance_doc['id'], + vnic['name']) + self.inv.set(vnic) + self.log.info("add instance-vnic document, mac_address: {}" + .format(mac_address)) + + self.log.info("scanning for links") + fetchers_implementing_add_links = [FindLinksForInstanceVnics(), FindLinksForVedges()] + for fetcher in fetchers_implementing_add_links: + fetcher.add_links() + scanner = Scanner() + scanner.set_env(env) + scanner.scan_cliques() + + port_document = self.inv.get_by_id(env, port['id']) + if not port_document: + self.log.error("Port {} failed to add".format(port['id'])) + return EventResult(result=False, retry=True) + + return EventResult(result=True, + related_object=port['id'], + display_context=network_id) diff --git a/app/discover/events/event_port_delete.py b/app/discover/events/event_port_delete.py new file mode 100644 index 0000000..1e55870 --- /dev/null +++ b/app/discover/events/event_port_delete.py @@ -0,0 +1,80 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from discover.events.event_base import EventResult +from discover.events.event_delete_base import EventDeleteBase +from discover.fetchers.api.api_fetch_host_instances import ApiFetchHostInstances + + +class EventPortDelete(EventDeleteBase): + + def delete_port(self, env, port_id): + port_doc = self.inv.get_by_id(env, port_id) + if not port_doc: + self.log.info("Port document not found, aborting port deleting.") + return EventResult(result=False, retry=False) + + # if port is binding to a instance, instance document needs to be updated. + if 'compute' in port_doc['device_owner']: + self.log.info("update instance document to which port is binding.") + self.update_instance(env, port_doc) + + # delete port document + self.inv.delete('inventory', {'id': port_id}) + + # delete vnic and related document + vnic_doc = self.inv.get_by_field(env, 'vnic', 'mac_address', port_doc['mac_address'], get_single=True) + if not vnic_doc: + self.log.info("Vnic document not found, aborting vnic deleting.") + return EventResult(result=False, retry=False) + + result = self.delete_handler(env, vnic_doc['id'], 'vnic') + result.related_object = port_id + result.display_context = port_doc.get('network_id') + self.log.info('Finished port deleting') + return result + + def update_instance(self, env, port_doc): + # update instance document if port + network_id = port_doc['network_id'] + instance_doc = self.inv.get_by_field(env, 'instance', 'network_info.id', port_doc['id'], get_single=True) + if instance_doc: + port_num = 0 + + for port in instance_doc['network_info']: + if port['network']['id'] == network_id: + port_num += 1 + if port['id'] == port_doc['id']: + instance_doc['network_info'].remove(port) + self.log.info("update network information of instance document.") + + if port_num == 1: + # remove network information only when last port in network will be deleted. + instance_doc['network'].remove(network_id) + + # update instance mac address. + if port_doc['mac_address'] == instance_doc['mac_address']: + instance_fetcher = ApiFetchHostInstances() + instance_fetcher.set_env(env) + host_id = port_doc['binding:host_id'] + instance_id = port_doc['device_id'] + instance_docs = instance_fetcher.get(host_id + '-') + instance = next(filter(lambda i: i['id'] == instance_id, instance_docs), None) + if instance: + if 'mac_address' not in instance: + instance_doc['mac_address'] = None + self.log.info("update mac_address:%s of instance document." % instance_doc['mac_address']) + + self.inv.set(instance_doc) + else: + self.log.info("No instance document binding to network:%s." % network_id) + + def handle(self, env, notification): + port_id = notification['payload']['port_id'] + return self.delete_port(env, port_id) diff --git a/app/discover/events/event_port_update.py b/app/discover/events/event_port_update.py new file mode 100644 index 0000000..298b565 --- /dev/null +++ b/app/discover/events/event_port_update.py @@ -0,0 +1,38 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from discover.events.event_base import EventBase, EventResult + + +class EventPortUpdate(EventBase): + + def handle(self, env, notification): + # check port document. + port = notification['payload']['port'] + port_id = port['id'] + port_document = self.inv.get_by_id(env, port_id) + if not port_document: + self.log.info('port document does not exist, aborting port update') + return EventResult(result=False, retry=True) + + # build port document + port_document['name'] = port['name'] + port_document['admin_state_up'] = port['admin_state_up'] + if port_document['admin_state_up']: + port_document['status'] = 'ACTIVE' + else: + port_document['status'] = 'DOWN' + + port_document['binding:vnic_type'] = port['binding:vnic_type'] + + # update port document. + self.inv.set(port_document) + return EventResult(result=True, + related_object=port_id, + display_context=port_document.get('network_id')) diff --git a/app/discover/events/event_router_add.py b/app/discover/events/event_router_add.py new file mode 100644 index 0000000..20e07e5 --- /dev/null +++ b/app/discover/events/event_router_add.py @@ -0,0 +1,123 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import datetime + +from functools import partial + +from discover.events.event_base import EventBase, EventResult +from discover.events.event_port_add import EventPortAdd +from discover.events.event_subnet_add import EventSubnetAdd +from discover.fetchers.cli.cli_fetch_host_vservice import CliFetchHostVservice +from discover.find_links_for_vservice_vnics import FindLinksForVserviceVnics +from discover.scanner import Scanner +from utils.util import decode_router_id, encode_router_id + + +class EventRouterAdd(EventBase): + + def add_router_document(self, env, network_id, router_doc, host): + router_doc["environment"] = env + router_doc["id_path"] = "{}/{}-vservices/{}-vservices-routers/{}"\ + .format(host['id_path'], host['id'], + host['id'], router_doc['id']) + router_doc['last_scanned'] = datetime.datetime.utcnow() + router_doc['name_path'] = "{}/Vservices/Gateways/{}"\ + .format(host['name_path'], + router_doc['name']) + router_doc['network'] = [] + if network_id: + router_doc['network'] = [network_id] + + router_doc['object_name'] = router_doc['name'] + router_doc['parent_id'] = host['id'] + "-vservices-routers" + router_doc['show_in_tree'] = True + router_doc['type'] = "vservice" + + self.inv.set(router_doc) + + def add_children_documents(self, env, project_id, network_id, host, router_doc): + + network_document = self.inv.get_by_id(env, network_id) + network_name = network_document['name'] + router_id = decode_router_id(router_doc['id']) + + # add port for binding to vservice:router + subnet_handler = EventSubnetAdd() + ports_folder = self.inv.get_by_id(env, network_id + '-ports') + if not ports_folder: + self.log.info("Ports_folder not found.") + subnet_handler.add_ports_folder(env, project_id, network_id, network_name) + add_port_return = subnet_handler.add_port_document(env, + router_doc['gw_port_id'], + network_name=network_name) + + # add vnics folder and vnic document + port_handler = EventPortAdd() + add_vnic_folder = partial(port_handler.add_vnics_folder, + env=env, + host=host, + object_id=router_id, + object_type='router', + network_name=network_name, + router_name=router_doc['name']) + add_vnic_document = partial(port_handler.add_vnic_document, + env=env, + host=host, + object_id=router_id, + object_type='router', + network_name=network_name, + router_name=router_doc['name']) + + add_vnic_folder() + if add_port_return: + add_vnic_return = add_vnic_document() + if not add_vnic_return: + self.log.info("Try to add vnic document again.") + add_vnic_document() + else: + # in some cases, port has been created, + # but port doc cannot be fetched by OpenStack API + self.log.info("Try to add port document again.") + # TODO: #AskCheng - this never returns anything! + add_port_return = add_vnic_folder() + # TODO: #AskCheng - this will never evaluate to True! + if add_port_return is False: + self.log.info("Try to add vnic document again.") + add_vnic_document() + + def handle(self, env, values): + router = values['payload']['router'] + host_id = values["publisher_id"].replace("network.", "", 1) + project_id = values['_context_project_id'] + router_id = encode_router_id(host_id, router['id']) + host = self.inv.get_by_id(env, host_id) + + fetcher = CliFetchHostVservice() + fetcher.set_env(env) + router_doc = fetcher.get_vservice(host_id, router_id) + gateway_info = router['external_gateway_info'] + + if gateway_info: + network_id = gateway_info['network_id'] + self.add_router_document(env, network_id, router_doc, host) + self.add_children_documents(env, project_id, network_id, host, router_doc) + else: + self.add_router_document(env, None, router_doc, host) + + # scan links and cliques + FindLinksForVserviceVnics().add_links(search={"parent_id": router_id}) + scanner = Scanner() + scanner.set_env(env) + scanner.scan_cliques() + self.log.info("Finished router added.") + + return EventResult(result=True, + related_object=router_id, + display_context=router_id) diff --git a/app/discover/events/event_router_delete.py b/app/discover/events/event_router_delete.py new file mode 100644 index 0000000..65072d6 --- /dev/null +++ b/app/discover/events/event_router_delete.py @@ -0,0 +1,37 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from discover.events.event_base import EventResult +from discover.events.event_delete_base import EventDeleteBase +from utils.util import encode_router_id + + +class EventRouterDelete(EventDeleteBase): + + def handle(self, env, values): + payload = values['payload'] + + if 'publisher_id' not in values: + self.log.error("Publisher_id is not in event values. Aborting router delete") + return EventResult(result=False, retry=False) + + host_id = values['publisher_id'].replace('network.', '', 1) + if 'router_id' in payload: + router_id = payload['router_id'] + elif 'id' in payload: + router_id = payload['id'] + else: + router_id = payload.get('router', {}).get('id') + + if not router_id: + self.log.error("Router id is not in payload. Aborting router delete") + return EventResult(result=False, retry=False) + + router_full_id = encode_router_id(host_id, router_id) + return self.delete_handler(env, router_full_id, "vservice") diff --git a/app/discover/events/event_router_update.py b/app/discover/events/event_router_update.py new file mode 100644 index 0000000..8dd53f0 --- /dev/null +++ b/app/discover/events/event_router_update.py @@ -0,0 +1,82 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from discover.events.event_base import EventBase, EventResult +from discover.events.event_port_delete import EventPortDelete +from discover.events.event_router_add import EventRouterAdd +from discover.fetchers.cli.cli_fetch_host_vservice import CliFetchHostVservice +from discover.find_links_for_vservice_vnics import FindLinksForVserviceVnics +from discover.scanner import Scanner +from utils.util import encode_router_id + + +class EventRouterUpdate(EventBase): + + def handle(self, env, values): + payload = values['payload'] + router = payload['router'] + + project_id = values['_context_project_id'] + host_id = values["publisher_id"].replace("network.", "", 1) + router_id = payload['id'] if 'id' in payload else router['id'] + + router_full_id = encode_router_id(host_id, router_id) + router_doc = self.inv.get_by_id(env, router_full_id) + if not router_doc: + self.log.info("Router document not found, aborting router updating") + return EventResult(result=False, retry=True) + + router_doc['admin_state_up'] = router['admin_state_up'] + router_doc['name'] = router['name'] + gateway_info = router.get('external_gateway_info') + if gateway_info is None: + # when delete gateway, need to delete the port relate document. + port_doc = {} + if router_doc.get('gw_port_id'): + port_doc = self.inv.get_by_id(env, router_doc['gw_port_id']) + EventPortDelete().delete_port(env, router_doc['gw_port_id']) + + if router_doc.get('network'): + if port_doc: + router_doc['network'].remove(port_doc['network_id']) + router_doc['gw_port_id'] = None + + # remove related links + self.inv.delete('links', {'source_id': router_full_id}) + else: + if 'network' in router_doc: + if gateway_info['network_id'] not in router_doc['network']: + router_doc['network'].append(gateway_info['network_id']) + else: + router_doc['network'] = [gateway_info['network_id']] + # update static route + router_doc['routes'] = router['routes'] + + # add gw_port_id info and port document. + fetcher = CliFetchHostVservice() + fetcher.set_env(env) + router_vservice = fetcher.get_vservice(host_id, router_full_id) + if router_vservice.get('gw_port_id'): + router_doc['gw_port_id'] = router_vservice['gw_port_id'] + + host = self.inv.get_by_id(env, host_id) + EventRouterAdd().add_children_documents(env, project_id, gateway_info['network_id'], host, router_doc) + + # rescan the vnic links. + FindLinksForVserviceVnics().add_links(search={'parent_id': router_full_id + '-vnics'}) + self.inv.set(router_doc) + + # update the cliques. + scanner = Scanner() + scanner.set_env(env) + scanner.scan_cliques() + self.log.info("Finished router update.") + return EventResult(result=True, + related_object=router_full_id, + display_context=router_full_id) diff --git a/app/discover/events/event_subnet_add.py b/app/discover/events/event_subnet_add.py new file mode 100644 index 0000000..b519b1c --- /dev/null +++ b/app/discover/events/event_subnet_add.py @@ -0,0 +1,154 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import datetime + +from discover.events.event_base import EventBase, EventResult +from discover.events.event_port_add import EventPortAdd +from discover.fetchers.api.api_access import ApiAccess +from discover.fetchers.api.api_fetch_port import ApiFetchPort +from discover.fetchers.api.api_fetch_regions import ApiFetchRegions +from discover.fetchers.db.db_fetch_port import DbFetchPort +from discover.find_links_for_pnics import FindLinksForPnics +from discover.find_links_for_vservice_vnics import FindLinksForVserviceVnics +from discover.scanner import Scanner + + +class EventSubnetAdd(EventBase): + + def add_port_document(self, env, port_id, network_name=None, project_name=''): + # when add router-interface port, network_name need to be given to enhance efficiency. + # when add gateway port, project_name need to be specified, cause this type of port + # document does not has project attribute. In this case, network_name should not be provided. + + fetcher = ApiFetchPort() + fetcher.set_env(env) + ports = fetcher.get(port_id) + + if ports: + port = ports[0] + project_id = port['tenant_id'] + network_id = port['network_id'] + + if not network_name: + network = self.inv.get_by_id(env, network_id) + network_name = network['name'] + + port['type'] = "port" + port['environment'] = env + port_id = port['id'] + port['id_path'] = "%s/%s-projects/%s/%s-networks/%s/%s-ports/%s" % \ + (env, env, project_id, project_id, network_id, network_id, port_id) + port['last_scanned'] = datetime.datetime.utcnow() + if 'project' in port: + project_name = port['project'] + port['name_path'] = "/%s/Projects/%s/Networks/%s/Ports/%s" % \ + (env, project_name, network_name, port_id) + self.inv.set(port) + self.log.info("add port document for port:%s" % port_id) + return port + return False + + def add_ports_folder(self, env, project_id, network_id, network_name): + port_folder = { + "id": network_id + "-ports", + "create_object": True, + "name": "Ports", + "text": "Ports", + "type": "ports_folder", + "parent_id": network_id, + "parent_type": "network", + 'environment': env, + 'id_path': "%s/%s-projects/%s/%s-networks/%s/%s-ports/" % (env, env, project_id, project_id, + network_id, network_id), + 'name_path': "/%s/Projects/%s/Networks/%s/Ports" % (env, project_id, network_name), + "show_in_tree": True, + "last_scanned": datetime.datetime.utcnow(), + "object_name": "Ports", + } + + self.inv.set(port_folder) + + def add_children_documents(self, env, project_id, network_id, network_name, host_id): + # generate port folder data. + self.add_ports_folder(env, project_id, network_id, network_name) + + # get ports ID. + port_id = DbFetchPort().get_id(network_id) + + # add specific ports documents. + self.add_port_document(env, port_id, network_name=network_name) + + port_handler = EventPortAdd() + + # add network_services_folder document. + port_handler.add_network_services_folder(env, project_id, network_id, network_name) + + # add dhcp vservice document. + host = self.inv.get_by_id(env, host_id) + + port_handler.add_dhcp_document(env, host, network_id, network_name) + + # add vnics folder. + port_handler.add_vnics_folder(env, host, network_id, network_name) + + # add vnic docuemnt. + port_handler.add_vnic_document(env, host, network_id, network_name) + + def handle(self, env, notification): + # check for network document. + subnet = notification['payload']['subnet'] + project_id = subnet['tenant_id'] + network_id = subnet['network_id'] + if 'id' not in subnet: + self.log.info('Subnet payload doesn\'t have id, aborting subnet add') + return EventResult(result=False, retry=False) + + network_document = self.inv.get_by_id(env, network_id) + if not network_document: + self.log.info('network document does not exist, aborting subnet add') + return EventResult(result=False, retry=True) + network_name = network_document['name'] + + # build subnet document for adding network + if subnet['cidr'] not in network_document['cidrs']: + network_document['cidrs'].append(subnet['cidr']) + if not network_document.get('subnets'): + network_document['subnets'] = {} + + network_document['subnets'][subnet['name']] = subnet + if subnet['id'] not in network_document['subnet_ids']: + network_document['subnet_ids'].append(subnet['id']) + self.inv.set(network_document) + + # Check DHCP enable, if true, scan network. + if subnet['enable_dhcp'] is True: + # update network + # TODO: #AskCheng - why is this necessary? + if len(ApiAccess.regions) == 0: + fetcher = ApiFetchRegions() + fetcher.set_env(env) + fetcher.get(None) + + self.log.info("add new subnet.") + host_id = notification["publisher_id"].replace("network.", "", 1) + self.add_children_documents(env, project_id, network_id, network_name, host_id) + + # scan links and cliques + self.log.info("scanning for links") + FindLinksForPnics().add_links() + FindLinksForVserviceVnics().add_links(search={"parent_id": "qdhcp-%s-vnics" % network_id}) + + scanner = Scanner() + scanner.set_env(env) + scanner.scan_cliques() + self.log.info("Finished subnet added.") + return EventResult(result=True, + related_object=subnet['id'], + display_context=network_id) diff --git a/app/discover/events/event_subnet_delete.py b/app/discover/events/event_subnet_delete.py new file mode 100644 index 0000000..900e701 --- /dev/null +++ b/app/discover/events/event_subnet_delete.py @@ -0,0 +1,57 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from discover.events.event_base import EventResult +from discover.events.event_delete_base import EventDeleteBase + + +class EventSubnetDelete(EventDeleteBase): + + def delete_children_documents(self, env, vservice_id): + vnic_parent_id = vservice_id + '-vnics' + vnic = self.inv.get_by_field(env, 'vnic', 'parent_id', vnic_parent_id, get_single=True) + if not vnic: + self.log.info("Vnic document not found.") + return EventResult(result=False, retry=False) + + # delete port and vnic together by mac address. + self.inv.delete('inventory', {"mac_address": vnic.get("mac_address")}) + return self.delete_handler(env, vservice_id, 'vservice') + + def handle(self, env, notification): + subnet_id = notification['payload']['subnet_id'] + network_document = self.inv.get_by_field(env, "network", "subnet_ids", subnet_id, get_single=True) + if not network_document: + self.log.info("network document not found, aborting subnet deleting") + return EventResult(result=False, retry=False) + + # remove subnet_id from subnet_ids array + network_document["subnet_ids"].remove(subnet_id) + + # find the subnet in network_document by subnet_id + subnet = next( + filter(lambda s: s['id'] == subnet_id, + network_document['subnets'].values()), + None) + + # remove cidr from cidrs and delete subnet document. + if subnet: + network_document['cidrs'].remove(subnet['cidr']) + del network_document['subnets'][subnet['name']] + + self.inv.set(network_document) + + # when network does not have any subnet, delete vservice DHCP, port and vnic documents. + if not network_document["subnet_ids"]: + vservice_dhcp_id = 'qdhcp-{}'.format(network_document['id']) + self.delete_children_documents(env, vservice_dhcp_id) + + return EventResult(result=True, + related_object=subnet['id'], + display_context=network_document.get('id')) diff --git a/app/discover/events/event_subnet_update.py b/app/discover/events/event_subnet_update.py new file mode 100644 index 0000000..9d3c48b --- /dev/null +++ b/app/discover/events/event_subnet_update.py @@ -0,0 +1,102 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from discover.events.event_base import EventBase, EventResult +from discover.events.event_port_add import EventPortAdd +from discover.events.event_port_delete import EventPortDelete +from discover.events.event_subnet_add import EventSubnetAdd +from discover.fetchers.api.api_access import ApiAccess +from discover.fetchers.api.api_fetch_regions import ApiFetchRegions +from discover.fetchers.db.db_fetch_port import DbFetchPort +from discover.find_links_for_vservice_vnics import FindLinksForVserviceVnics +from discover.scanner import Scanner + + +class EventSubnetUpdate(EventBase): + + def handle(self, env, notification): + # check for network document. + subnet = notification['payload']['subnet'] + project = notification['_context_project_name'] + host_id = notification['publisher_id'].replace('network.', '', 1) + subnet_id = subnet['id'] + network_id = subnet['network_id'] + network_document = self.inv.get_by_id(env, network_id) + if not network_document: + self.log.info('network document does not exist, aborting subnet update') + return EventResult(result=False, retry=True) + + # update network document. + subnets = network_document['subnets'] + key = next(filter(lambda k: subnets[k]['id'] == subnet_id, subnets), + None) + + if key: + if subnet['enable_dhcp'] and subnets[key]['enable_dhcp'] is False: + # scan DHCP namespace to add related document. + # add dhcp vservice document. + host = self.inv.get_by_id(env, host_id) + port_handler = EventPortAdd() + port_handler.add_dhcp_document(env, host, network_id, + network_document['name']) + + # make sure that self.regions is not empty. + if len(ApiAccess.regions) == 0: + fetcher = ApiFetchRegions() + fetcher.set_env(env) + fetcher.get(None) + + self.log.info("add port binding to DHCP server.") + port_id = DbFetchPort(). \ + get_id_by_field(network_id, + """device_owner LIKE "%dhcp" """) + port = EventSubnetAdd(). \ + add_port_document(env, port_id, + network_name=network_document['name'], + project_name=project) + if port: + port_handler. \ + add_vnic_document(env, host, network_id, + network_name=network_document['name'], + mac_address=port['mac_address']) + # add link for vservice - vnic + FindLinksForVserviceVnics().add_links(search={"id": "qdhcp-%s" % network_id}) + scanner = Scanner() + scanner.set_env(env) + scanner.scan_cliques() + FindLinksForVserviceVnics(). \ + add_links(search={"id": "qdhcp-%s" % network_id}) + scanner = Scanner() + scanner.set_env(env) + scanner.scan_cliques() + + if subnet['enable_dhcp'] is False and subnets[key]['enable_dhcp']: + # delete existed related DHCP documents. + self.inv.delete("inventory", + {'id': "qdhcp-%s" % subnet['network_id']}) + self.log.info("delete DHCP document: qdhcp-%s" % + subnet['network_id']) + + port = self.inv.find_items({'network_id': subnet['network_id'], + 'device_owner': 'network:dhcp'}, + get_single=True) + if 'id' in port: + EventPortDelete().delete_port(env, port['id']) + self.log.info("delete port binding to DHCP server.") + + if subnet['name'] == subnets[key]['name']: + subnets[key] = subnet + else: + # TODO: #AskCheng shouldn't we remove the old one? + subnets[subnet['name']] = subnet + + self.inv.set(network_document) + return EventResult(result=True, + related_object=subnet['id'], + display_context=network_id) diff --git a/app/discover/events/listeners/__init__.py b/app/discover/events/listeners/__init__.py new file mode 100644 index 0000000..1e85a2a --- /dev/null +++ b/app/discover/events/listeners/__init__.py @@ -0,0 +1,10 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### + diff --git a/app/discover/events/listeners/default_listener.py b/app/discover/events/listeners/default_listener.py new file mode 100755 index 0000000..a135673 --- /dev/null +++ b/app/discover/events/listeners/default_listener.py @@ -0,0 +1,314 @@ +#!/usr/bin/env python3 +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### + +import argparse +import datetime +import json +import os +import time +from collections import defaultdict +from typing import List + +from kombu import Connection, Queue, Exchange +from kombu.mixins import ConsumerMixin + +from discover.configuration import Configuration +from discover.event_handler import EventHandler +from discover.events.event_base import EventResult +from discover.events.event_metadata_parser import parse_metadata_file +from discover.events.listeners.listener_base import ListenerBase +from messages.message import Message +from monitoring.setup.monitoring_setup_manager import MonitoringSetupManager +from utils.constants import OperationalStatus, EnvironmentFeatures +from utils.inventory_mgr import InventoryMgr +from utils.logging.full_logger import FullLogger +from utils.mongo_access import MongoAccess +from utils.string_utils import stringify_datetime +from utils.util import SignalHandler, setup_args + + +class DefaultListener(ListenerBase, ConsumerMixin): + + SOURCE_SYSTEM = "OpenStack" + + COMMON_METADATA_FILE = "events.json" + + DEFAULTS = { + "env": "Mirantis-Liberty", + "mongo_config": "", + "metadata_file": "", + "inventory": "inventory", + "loglevel": "INFO", + "environments_collection": "environments_config", + "retry_limit": 10, + "consume_all": False + } + + def __init__(self, connection: Connection, + event_handler: EventHandler, + event_queues: List, + env_name: str = DEFAULTS["env"], + inventory_collection: str = DEFAULTS["inventory"], + retry_limit: int = DEFAULTS["retry_limit"], + consume_all: bool = DEFAULTS["consume_all"]): + super().__init__() + + self.connection = connection + self.retry_limit = retry_limit + self.env_name = env_name + self.consume_all = consume_all + self.handler = event_handler + self.event_queues = event_queues + self.failing_messages = defaultdict(int) + + self.inv = InventoryMgr() + self.inv.set_collections(inventory_collection) + if self.inv.is_feature_supported(self.env_name, EnvironmentFeatures.MONITORING): + self.inv.monitoring_setup_manager = \ + MonitoringSetupManager(self.env_name) + self.inv.monitoring_setup_manager.server_setup() + + def get_consumers(self, consumer, channel): + return [consumer(queues=self.event_queues, + accept=['json'], + callbacks=[self.process_task])] + + # Determines if message should be processed by a handler + # and extracts message body if yes. + @staticmethod + def _extract_event_data(body): + if "event_type" in body: + return True, body + elif "event_type" in body.get("oslo.message", ""): + return True, json.loads(body["oslo.message"]) + else: + return False, None + + def process_task(self, body, message): + received_timestamp = stringify_datetime(datetime.datetime.now()) + processable, event_data = self._extract_event_data(body) + # If env listener can't process the message + # or it's not intended for env listener to handle, + # leave the message in the queue unless "consume_all" flag is set + if processable and event_data["event_type"] in self.handler.handlers: + with open("/tmp/listener.log", "a") as f: + f.write("{}\n".format(event_data)) + event_result = self.handle_event(event_data["event_type"], + event_data) + finished_timestamp = stringify_datetime(datetime.datetime.now()) + self.save_message(message_body=event_data, + result=event_result, + started=received_timestamp, + finished=finished_timestamp) + + # Check whether the event was fully handled + # and, if not, whether it should be retried later + if event_result.result: + message.ack() + elif event_result.retry: + if 'message_id' not in event_data: + message.reject() + else: + # Track message retry count + message_id = event_data['message_id'] + self.failing_messages[message_id] += 1 + + # Retry handling the message + if self.failing_messages[message_id] <= self.retry_limit: + self.inv.log.info("Retrying handling message " + + "with id '{}'".format(message_id)) + message.requeue() + # Discard the message if it's not accepted + # after specified number of trials + else: + self.inv.log.warn("Discarding message with id '{}' ". + format(message_id) + + "as it's exceeded the retry limit") + message.reject() + del self.failing_messages[message_id] + else: + message.reject() + elif self.consume_all: + message.reject() + + # This method passes the event to its handler. + # Returns a (result, retry) tuple: + # 'Result' flag is True if handler has finished successfully, + # False otherwise + # 'Retry' flag specifies if the error is recoverable or not + # 'Retry' flag is checked only is 'result' is False + def handle_event(self, event_type: str, notification: dict) -> EventResult: + print("Got notification.\nEvent_type: {}\nNotification:\n{}". + format(event_type, notification)) + try: + result = self.handler.handle(event_name=event_type, + notification=notification) + return result if result else EventResult(result=False, retry=False) + except Exception as e: + self.inv.log.exception(e) + return EventResult(result=False, retry=False) + + def save_message(self, message_body: dict, result: EventResult, + started: str, finished: str): + try: + message = Message( + msg_id=message_body.get('message_id'), + env=self.env_name, + source=self.SOURCE_SYSTEM, + object_id=result.related_object, + display_context=result.display_context, + level=message_body.get('priority'), + msg=message_body, + ts=message_body.get('timestamp'), + received_ts=started, + finished_ts=finished + ) + self.inv.collections['messages'].insert_one(message.get()) + return True + except Exception as e: + self.inv.log.error("Failed to save message") + self.inv.log.exception(e) + return False + + @staticmethod + def listen(args: dict = None): + + args = setup_args(args, DefaultListener.DEFAULTS, get_args) + if 'process_vars' not in args: + args['process_vars'] = {} + + env_name = args["env"] + inventory_collection = args["inventory"] + + MongoAccess.set_config_file(args["mongo_config"]) + conf = Configuration(args["environments_collection"]) + conf.use_env(env_name) + + event_handler = EventHandler(env_name, inventory_collection) + event_queues = [] + + env_config = conf.get_env_config() + common_metadata_file = os.path.join(env_config.get('app_path', '/etc/calipso'), + 'config', + DefaultListener.COMMON_METADATA_FILE) + + # import common metadata + import_metadata(event_handler, event_queues, common_metadata_file) + + # import custom metadata if supplied + if args["metadata_file"]: + import_metadata(event_handler, event_queues, args["metadata_file"]) + + inv = InventoryMgr() + inv.set_collections(inventory_collection) + logger = FullLogger() + logger.set_loglevel(args["loglevel"]) + + amqp_config = conf.get("AMQP") + connect_url = 'amqp://{user}:{pwd}@{host}:{port}//' \ + .format(user=amqp_config["user"], + pwd=amqp_config["password"], + host=amqp_config["host"], + port=amqp_config["port"]) + + with Connection(connect_url) as conn: + try: + print(conn) + conn.connect() + args['process_vars']['operational'] = OperationalStatus.RUNNING + terminator = SignalHandler() + worker = \ + DefaultListener(connection=conn, + event_handler=event_handler, + event_queues=event_queues, + retry_limit=args["retry_limit"], + consume_all=args["consume_all"], + inventory_collection=inventory_collection, + env_name=env_name) + worker.run() + if terminator.terminated: + args.get('process_vars', {})['operational'] = \ + OperationalStatus.STOPPED + except KeyboardInterrupt: + print('Stopped') + args['process_vars']['operational'] = OperationalStatus.STOPPED + except Exception as e: + logger.log.exception(e) + args['process_vars']['operational'] = OperationalStatus.ERROR + finally: + # This should enable safe saving of shared variables + time.sleep(0.1) + + +def get_args(): + # Read listener config from command line args + parser = argparse.ArgumentParser() + parser.add_argument("-m", "--mongo_config", nargs="?", type=str, + default=DefaultListener.DEFAULTS["mongo_config"], + help="Name of config file with MongoDB access details") + parser.add_argument("--metadata_file", nargs="?", type=str, + default=DefaultListener.DEFAULTS["metadata_file"], + help="Name of custom configuration metadata file") + def_env_collection = DefaultListener.DEFAULTS["environments_collection"] + parser.add_argument("-c", "--environments_collection", nargs="?", type=str, + default=def_env_collection, + help="Name of collection where selected environment " + + "is taken from \n(default: {})" + .format(def_env_collection)) + parser.add_argument("-e", "--env", nargs="?", type=str, + default=DefaultListener.DEFAULTS["env"], + help="Name of target listener environment \n" + + "(default: {})" + .format(DefaultListener.DEFAULTS["env"])) + parser.add_argument("-y", "--inventory", nargs="?", type=str, + default=DefaultListener.DEFAULTS["inventory"], + help="Name of inventory collection \n"" +" + "(default: '{}')" + .format(DefaultListener.DEFAULTS["inventory"])) + parser.add_argument("-l", "--loglevel", nargs="?", type=str, + default=DefaultListener.DEFAULTS["loglevel"], + help="Logging level \n(default: '{}')" + .format(DefaultListener.DEFAULTS["loglevel"])) + parser.add_argument("-r", "--retry_limit", nargs="?", type=int, + default=DefaultListener.DEFAULTS["retry_limit"], + help="Maximum number of times the OpenStack message " + "should be requeued before being discarded \n" + + "(default: {})" + .format(DefaultListener.DEFAULTS["retry_limit"])) + parser.add_argument("--consume_all", action="store_true", + help="If this flag is set, " + + "environment listener will try to consume" + "all messages from OpenStack event queue " + "and reject incompatible messages." + "Otherwise they'll just be ignored.", + default=DefaultListener.DEFAULTS["consume_all"]) + args = parser.parse_args() + return args + + +# Imports metadata from file, +# updates event handler with new handlers +# and event queues with new queues +def import_metadata(event_handler: EventHandler, + event_queues: List[Queue], + metadata_file_path: str) -> None: + handlers_package, queues, event_handlers = \ + parse_metadata_file(metadata_file_path) + event_handler.discover_handlers(handlers_package, event_handlers) + event_queues.extend([ + Queue(q['queue'], + Exchange(q['exchange'], 'topic', durable=False), + durable=False, routing_key='#') for q in queues + ]) + + +if __name__ == '__main__': + DefaultListener.listen() diff --git a/app/discover/events/listeners/listener_base.py b/app/discover/events/listeners/listener_base.py new file mode 100644 index 0000000..7052dc9 --- /dev/null +++ b/app/discover/events/listeners/listener_base.py @@ -0,0 +1,18 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +from abc import ABC, abstractmethod + + +class ListenerBase(ABC): + + @staticmethod + @abstractmethod + def listen(self): + pass -- cgit 1.2.3-korg