aboutsummaryrefslogtreecommitdiffstats
path: root/app/discover/events
diff options
context:
space:
mode:
Diffstat (limited to 'app/discover/events')
-rw-r--r--app/discover/events/__init__.py10
-rw-r--r--app/discover/events/event_base.py36
-rw-r--r--app/discover/events/event_delete_base.py60
-rw-r--r--app/discover/events/event_instance_add.py45
-rw-r--r--app/discover/events/event_instance_delete.py18
-rw-r--r--app/discover/events/event_instance_update.py55
-rw-r--r--app/discover/events/event_interface_add.py139
-rw-r--r--app/discover/events/event_interface_delete.py40
-rw-r--r--app/discover/events/event_metadata_parser.py75
-rw-r--r--app/discover/events/event_network_add.py50
-rw-r--r--app/discover/events/event_network_delete.py17
-rw-r--r--app/discover/events/event_network_update.py44
-rw-r--r--app/discover/events/event_port_add.py309
-rw-r--r--app/discover/events/event_port_delete.py80
-rw-r--r--app/discover/events/event_port_update.py38
-rw-r--r--app/discover/events/event_router_add.py123
-rw-r--r--app/discover/events/event_router_delete.py37
-rw-r--r--app/discover/events/event_router_update.py82
-rw-r--r--app/discover/events/event_subnet_add.py154
-rw-r--r--app/discover/events/event_subnet_delete.py57
-rw-r--r--app/discover/events/event_subnet_update.py102
-rw-r--r--app/discover/events/listeners/__init__.py10
-rwxr-xr-xapp/discover/events/listeners/default_listener.py314
-rw-r--r--app/discover/events/listeners/listener_base.py18
24 files changed, 1913 insertions, 0 deletions
diff --git a/app/discover/events/__init__.py b/app/discover/events/__init__.py
new file mode 100644
index 0000000..1e85a2a
--- /dev/null
+++ b/app/discover/events/__init__.py
@@ -0,0 +1,10 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+
diff --git a/app/discover/events/event_base.py b/app/discover/events/event_base.py
new file mode 100644
index 0000000..6b3b290
--- /dev/null
+++ b/app/discover/events/event_base.py
@@ -0,0 +1,36 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from abc import abstractmethod, ABC
+
+from discover.fetcher import Fetcher
+from utils.inventory_mgr import InventoryMgr
+
+
+class EventResult:
+ def __init__(self,
+ result: bool, retry: bool = False, message: str = None,
+ related_object: str = None,
+ display_context: str = None):
+ self.result = result
+ self.retry = retry
+ self.message = message
+ self.related_object = related_object
+ self.display_context = display_context
+
+
+class EventBase(Fetcher, ABC):
+
+ def __init__(self):
+ super().__init__()
+ self.inv = InventoryMgr()
+
+ @abstractmethod
+ def handle(self, env, values) -> EventResult:
+ pass
diff --git a/app/discover/events/event_delete_base.py b/app/discover/events/event_delete_base.py
new file mode 100644
index 0000000..1cf94c3
--- /dev/null
+++ b/app/discover/events/event_delete_base.py
@@ -0,0 +1,60 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+import re
+
+from bson.objectid import ObjectId
+
+from discover.clique_finder import CliqueFinder
+from discover.events.event_base import EventBase, EventResult
+
+
+class EventDeleteBase(EventBase):
+
+ def delete_handler(self, env, object_id, object_type) -> EventResult:
+ item = self.inv.get_by_id(env, object_id)
+ if not item:
+ self.log.info('{0} document is not found, aborting {0} delete'.format(object_type))
+ return EventResult(result=False, retry=False)
+
+ db_id = ObjectId(item['_id'])
+ id_path = item['id_path'] + '/'
+
+ # remove related clique
+ clique_finder = CliqueFinder()
+ self.inv.delete('cliques', {'focal_point': db_id})
+
+ # keep related links to do rebuild of cliques using them
+ matched_links_source = clique_finder.find_links_by_source(db_id)
+ matched_links_target = clique_finder.find_links_by_target(db_id)
+
+ links_using_object = []
+ links_using_object.extend([l['_id'] for l in matched_links_source])
+ links_using_object.extend([l['_id'] for l in matched_links_target])
+
+ # find cliques using these links
+ if links_using_object:
+ matched_cliques = clique_finder.find_cliques_by_link(links_using_object)
+ # find cliques using these links and rebuild them
+ for clique in matched_cliques:
+ clique_finder.rebuild_clique(clique)
+
+ # remove all related links
+ self.inv.delete('links', {'source': db_id})
+ self.inv.delete('links', {'target': db_id})
+
+ # remove object itself
+ self.inv.delete('inventory', {'_id': db_id})
+
+ # remove children
+ regexp = re.compile('^' + id_path)
+ self.inv.delete('inventory', {'id_path': {'$regex': regexp}})
+ return EventResult(result=True,
+ related_object=object_id,
+ display_context=object_id)
diff --git a/app/discover/events/event_instance_add.py b/app/discover/events/event_instance_add.py
new file mode 100644
index 0000000..4dd2b20
--- /dev/null
+++ b/app/discover/events/event_instance_add.py
@@ -0,0 +1,45 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from discover.events.event_base import EventBase, EventResult
+from discover.scanner import Scanner
+
+
+class EventInstanceAdd(EventBase):
+
+ def handle(self, env, values):
+ # find the host, to serve as parent
+ instance_id = values['payload']['instance_id']
+ host_id = values['payload']['host']
+ instances_root_id = host_id + '-instances'
+ instances_root = self.inv.get_by_id(env, instances_root_id)
+ if not instances_root:
+ self.log.info('instances root not found, aborting instance add')
+ return EventResult(result=False, retry=True)
+
+ # scan instance
+ scanner = Scanner()
+ scanner.set_env(env)
+ scanner.scan("ScanInstancesRoot", instances_root,
+ limit_to_child_id=instance_id,
+ limit_to_child_type='instance')
+ scanner.scan_from_queue()
+
+ # scan host
+ host = self.inv.get_by_id(env, host_id)
+ scanner.scan('ScanHost', host,
+ limit_to_child_type=['vconnectors_folder',
+ 'vedges_folder'])
+ scanner.scan_from_queue()
+ scanner.scan_links()
+ scanner.scan_cliques()
+
+ return EventResult(result=True,
+ related_object=instance_id,
+ display_context=instance_id)
diff --git a/app/discover/events/event_instance_delete.py b/app/discover/events/event_instance_delete.py
new file mode 100644
index 0000000..714d0c7
--- /dev/null
+++ b/app/discover/events/event_instance_delete.py
@@ -0,0 +1,18 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from discover.events.event_delete_base import EventDeleteBase
+
+
+class EventInstanceDelete(EventDeleteBase):
+
+ def handle(self, env, values):
+ # find the corresponding object
+ instance_id = values['payload']['instance_id']
+ return self.delete_handler(env, instance_id, "instance")
diff --git a/app/discover/events/event_instance_update.py b/app/discover/events/event_instance_update.py
new file mode 100644
index 0000000..6231c30
--- /dev/null
+++ b/app/discover/events/event_instance_update.py
@@ -0,0 +1,55 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+import re
+
+from discover.events.event_base import EventBase, EventResult
+from discover.events.event_instance_add import EventInstanceAdd
+from discover.events.event_instance_delete import EventInstanceDelete
+
+
+class EventInstanceUpdate(EventBase):
+
+ def handle(self, env, values):
+ # find the host, to serve as parent
+ payload = values['payload']
+ instance_id = payload['instance_id']
+ state = payload['state']
+ old_state = payload['old_state']
+
+ if state == 'building':
+ return EventResult(result=False, retry=False)
+
+ if state == 'active' and old_state == 'building':
+ return EventInstanceAdd().handle(env, values)
+
+ if state == 'deleted' and old_state == 'active':
+ return EventInstanceDelete().handle(env, values)
+
+ name = payload['display_name']
+ instance = self.inv.get_by_id(env, instance_id)
+ if not instance:
+ self.log.info('instance document not found, aborting instance update')
+ return EventResult(result=False, retry=True)
+
+ instance['name'] = name
+ instance['object_name'] = name
+ name_path = instance['name_path']
+ instance['name_path'] = name_path[:name_path.rindex('/') + 1] + name
+
+ # TBD: fix name_path for descendants
+ if name_path != instance['name_path']:
+ self.inv.values_replace({
+ "environment": env,
+ "name_path": {"$regex": r"^" + re.escape(name_path + '/')}},
+ {"name_path": {"from": name_path, "to": instance['name_path']}})
+ self.inv.set(instance)
+ return EventResult(result=True,
+ related_object=instance_id,
+ display_context=instance_id)
diff --git a/app/discover/events/event_interface_add.py b/app/discover/events/event_interface_add.py
new file mode 100644
index 0000000..a06ad14
--- /dev/null
+++ b/app/discover/events/event_interface_add.py
@@ -0,0 +1,139 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+import time
+
+from functools import partial
+
+from discover.events.event_base import EventBase, EventResult
+from discover.events.event_port_add import EventPortAdd
+from discover.events.event_subnet_add import EventSubnetAdd
+from discover.fetchers.api.api_access import ApiAccess
+from discover.fetchers.api.api_fetch_regions import ApiFetchRegions
+from discover.fetchers.cli.cli_fetch_host_vservice import CliFetchHostVservice
+from discover.find_links_for_vservice_vnics import FindLinksForVserviceVnics
+from discover.scanner import Scanner
+from utils.util import decode_router_id, encode_router_id
+
+
+class EventInterfaceAdd(EventBase):
+
+ def __init__(self):
+ super().__init__()
+ self.delay = 2
+
+ def add_gateway_port(self, env, project, network_name, router_doc, host_id):
+ fetcher = CliFetchHostVservice()
+ fetcher.set_env(env)
+ router_id = router_doc['id']
+ router = fetcher.get_vservice(host_id, router_id)
+ device_id = decode_router_id(router_id)
+ router_doc['gw_port_id'] = router['gw_port_id']
+
+ # add gateway port documents.
+ port_doc = EventSubnetAdd().add_port_document(env, router_doc['gw_port_id'], project_name=project)
+
+ mac_address = port_doc['mac_address'] if port_doc else None
+
+ # add vnic document
+ host = self.inv.get_by_id(env, host_id)
+
+ add_vnic_document = partial(EventPortAdd().add_vnic_document,
+ env=env,
+ host=host,
+ object_id=device_id,
+ object_type='router',
+ network_name=network_name,
+ router_name=router_doc['name'],
+ mac_address=mac_address)
+
+ ret = add_vnic_document()
+ if not ret:
+ time.sleep(self.delay)
+ self.log.info("Wait %s second, and then fetch vnic document again." % self.delay)
+ add_vnic_document()
+
+ def update_router(self, env, project, network_id, network_name, router_doc, host_id):
+ if router_doc:
+ if 'network' in router_doc:
+ if network_id not in router_doc['network']:
+ router_doc['network'].append(network_id)
+ else:
+ router_doc['network'] = [network_id]
+
+ # if gw_port_id is None, add gateway port first.
+ if not router_doc.get('gw_port_id'):
+ self.add_gateway_port(env, project, network_name, router_doc, host_id)
+ else:
+ # check the gateway port document, add it if document does not exist.
+ port = self.inv.get_by_id(env, router_doc['gw_port_id'])
+ if not port:
+ self.add_gateway_port(env, project, network_name, router_doc, host_id)
+ self.inv.set(router_doc)
+ else:
+ self.log.info("router document not found, aborting interface adding")
+
+ def handle(self, env, values):
+ interface = values['payload']['router_interface']
+ project = values['_context_project_name']
+ host_id = values["publisher_id"].replace("network.", "", 1)
+ port_id = interface['port_id']
+ subnet_id = interface['subnet_id']
+ router_id = encode_router_id(host_id, interface['id'])
+
+ network_document = self.inv.get_by_field(env, "network", "subnet_ids", subnet_id, get_single=True)
+ if not network_document:
+ self.log.info("network document not found, aborting interface adding")
+ return EventResult(result=False, retry=True)
+ network_name = network_document['name']
+ network_id = network_document['id']
+
+ # add router-interface port document.
+ if len(ApiAccess.regions) == 0:
+ fetcher = ApiFetchRegions()
+ fetcher.set_env(env)
+ fetcher.get(None)
+ port_doc = EventSubnetAdd().add_port_document(env, port_id, network_name=network_name)
+
+ mac_address = port_doc['mac_address'] if port_doc else None
+
+ # add vnic document
+ host = self.inv.get_by_id(env, host_id)
+ router_doc = self.inv.get_by_id(env, router_id)
+
+ add_vnic_document = partial(EventPortAdd().add_vnic_document,
+ env=env,
+ host=host,
+ object_id=interface['id'],
+ object_type='router',
+ network_name=network_name,
+ router_name=router_doc['name'],
+ mac_address=mac_address)
+
+ ret = add_vnic_document()
+ if ret is False:
+ # try it again to fetch vnic document, vnic will be created a little bit late before CLI fetch.
+ time.sleep(self.delay)
+ self.log.info("Wait {} seconds, and then fetch vnic document again.".format(self.delay))
+ add_vnic_document()
+
+ # update the router document: gw_port_id, network.
+ self.update_router(env, project, network_id, network_name, router_doc, host_id)
+
+ # update vservice-vnic, vnic-network,
+ FindLinksForVserviceVnics().add_links(search={"parent_id": router_id})
+ scanner = Scanner()
+ scanner.set_env(env)
+
+ scanner.scan_cliques()
+ self.log.info("Finished router-interface added.")
+
+ return EventResult(result=True,
+ related_object=interface['id'],
+ display_context=network_id)
diff --git a/app/discover/events/event_interface_delete.py b/app/discover/events/event_interface_delete.py
new file mode 100644
index 0000000..b1df978
--- /dev/null
+++ b/app/discover/events/event_interface_delete.py
@@ -0,0 +1,40 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from discover.events.event_base import EventResult
+from discover.events.event_delete_base import EventDeleteBase
+from discover.events.event_port_delete import EventPortDelete
+from utils.util import encode_router_id
+
+
+class EventInterfaceDelete(EventDeleteBase):
+
+ def handle(self, env, values):
+ interface = values['payload']['router_interface']
+ port_id = interface['port_id']
+ host_id = values["publisher_id"].replace("network.", "", 1)
+ router_id = encode_router_id(host_id, interface['id'])
+
+ # update router document
+ port_doc = self.inv.get_by_id(env, port_id)
+ if not port_doc:
+ self.log.info("Interface deleting handler: port document not found.")
+ return EventResult(result=False, retry=False)
+ network_id = port_doc['network_id']
+
+ router_doc = self.inv.get_by_id(env, router_id)
+ if router_doc and network_id in router_doc.get('network', []):
+ router_doc['network'].remove(network_id)
+ self.inv.set(router_doc)
+
+ # delete port document
+ result = EventPortDelete().delete_port(env, port_id)
+ result.related_object = interface['id']
+ result.display_context = network_id
+ return result
diff --git a/app/discover/events/event_metadata_parser.py b/app/discover/events/event_metadata_parser.py
new file mode 100644
index 0000000..5d09376
--- /dev/null
+++ b/app/discover/events/event_metadata_parser.py
@@ -0,0 +1,75 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from typing import List, Tuple
+
+from utils.metadata_parser import MetadataParser
+
+
+class EventMetadataParser(MetadataParser):
+
+ HANDLERS_PACKAGE = 'handlers_package'
+ QUEUES = 'queues'
+ EVENT_HANDLERS = 'event_handlers'
+
+ REQUIRED_EXPORTS = [HANDLERS_PACKAGE, EVENT_HANDLERS]
+
+ def __init__(self):
+ super().__init__()
+ self.handlers_package = None
+ self.queues = []
+ self.event_handlers = []
+
+ def get_required_fields(self) -> list:
+ return self.REQUIRED_EXPORTS
+
+ def validate_metadata(self, metadata: dict) -> bool:
+ super().validate_metadata(metadata)
+
+ package = metadata.get(self.HANDLERS_PACKAGE)
+ if not package or not isinstance(package, str):
+ self.add_error("Handlers package '{}' is invalid".format(package))
+
+ event_handlers = metadata.get(self.EVENT_HANDLERS)
+ if not event_handlers or not isinstance(event_handlers, dict):
+ self.add_error("Event handlers attribute is invalid or empty"
+ "(should be a non-empty dict)")
+
+ return len(self.errors) == 0
+
+ def _finalize_parsing(self, metadata):
+ handlers_package = metadata[self.HANDLERS_PACKAGE]
+ queues = metadata.get(self.QUEUES, None)
+ event_handlers = metadata[self.EVENT_HANDLERS]
+
+ # Convert variables to EventHandler-friendly format
+ self.handlers_package = handlers_package
+
+ try:
+ if queues and isinstance(queues, list):
+ self.queues = [{"queue": q["queue"],
+ "exchange": q["exchange"]}
+ for q in queues]
+ except KeyError:
+ self.add_error("Queues variable has invalid format")
+ return
+
+ self.event_handlers = event_handlers
+
+ def parse_metadata_file(self, file_path: str) -> dict:
+ metadata = super().parse_metadata_file(file_path)
+ self._finalize_parsing(metadata)
+ super().check_errors()
+ return metadata
+
+
+def parse_metadata_file(file_path: str):
+ parser = EventMetadataParser()
+ parser.parse_metadata_file(file_path)
+ return parser.handlers_package, parser.queues, parser.event_handlers
diff --git a/app/discover/events/event_network_add.py b/app/discover/events/event_network_add.py
new file mode 100644
index 0000000..41fafd4
--- /dev/null
+++ b/app/discover/events/event_network_add.py
@@ -0,0 +1,50 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from discover.events.event_base import EventBase, EventResult
+
+
+class EventNetworkAdd(EventBase):
+
+ def handle(self, env, notification):
+ network = notification['payload']['network']
+ network_id = network['id']
+ network_document = self.inv.get_by_id(env, network_id)
+ if network_document:
+ self.log.info('network already existed, aborting network add')
+ return EventResult(result=False, retry=False)
+
+ # build network document for adding network
+ project_name = notification['_context_project_name']
+ project_id = notification['_context_project_id']
+ parent_id = project_id + '-networks'
+ network_name = network['name']
+
+ network['environment'] = env
+ network['type'] = 'network'
+ network['id_path'] = "/%s/%s-projects/%s/%s/%s" \
+ % (env, env, project_id, parent_id, network_id)
+ network['cidrs'] = []
+ network['subnet_ids'] = []
+ network['last_scanned'] = notification['timestamp']
+ network['name_path'] = "/%s/Projects/%s/Networks/%s" \
+ % (env, project_name, network_name)
+ network['network'] = network_id
+ network['object_name'] = network_name
+ network['parent_id'] = parent_id
+ network['parent_text'] = "Networks"
+ network['parent_type'] = "networks_folder"
+ network['project'] = project_name
+ network["show_in_tree"] = True
+ network['subnets'] = {}
+
+ self.inv.set(network)
+ return EventResult(result=True,
+ related_object=network_id,
+ display_context=network_id)
diff --git a/app/discover/events/event_network_delete.py b/app/discover/events/event_network_delete.py
new file mode 100644
index 0000000..b3277da
--- /dev/null
+++ b/app/discover/events/event_network_delete.py
@@ -0,0 +1,17 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from discover.events.event_delete_base import EventDeleteBase
+
+
+class EventNetworkDelete(EventDeleteBase):
+
+ def handle(self, env, notification):
+ network_id = notification['payload']['network_id']
+ return self.delete_handler(env, network_id, "network")
diff --git a/app/discover/events/event_network_update.py b/app/discover/events/event_network_update.py
new file mode 100644
index 0000000..3e1432e
--- /dev/null
+++ b/app/discover/events/event_network_update.py
@@ -0,0 +1,44 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+import re
+
+from discover.events.event_base import EventBase, EventResult
+
+
+class EventNetworkUpdate(EventBase):
+
+ def handle(self, env, notification):
+ network = notification['payload']['network']
+ network_id = network['id']
+
+ network_document = self.inv.get_by_id(env, network_id)
+ if not network_document:
+ self.log.info('Network document not found, aborting network update')
+ return EventResult(result=False, retry=True)
+
+ # update network document
+ name = network['name']
+ if name != network_document['name']:
+ network_document['name'] = name
+ network_document['object_name'] = name
+
+ name_path = network_document['name_path']
+ network_document['name_path'] = name_path[:name_path.rindex('/') + 1] + name
+
+ # TBD: fix name_path for descendants
+ self.inv.values_replace({"environment": env,
+ "name_path": {"$regex": r"^" + re.escape(name_path + '/')}},
+ {"name_path": {"from": name_path, "to": network_document['name_path']}})
+
+ network_document['admin_state_up'] = network['admin_state_up']
+ self.inv.set(network_document)
+ return EventResult(result=True,
+ related_object=network_id,
+ display_context=network_id)
diff --git a/app/discover/events/event_port_add.py b/app/discover/events/event_port_add.py
new file mode 100644
index 0000000..63a5e80
--- /dev/null
+++ b/app/discover/events/event_port_add.py
@@ -0,0 +1,309 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+import datetime
+
+from discover.events.event_base import EventBase, EventResult
+from discover.fetchers.api.api_fetch_host_instances import ApiFetchHostInstances
+from discover.fetchers.cli.cli_fetch_instance_vnics import CliFetchInstanceVnics
+from discover.fetchers.cli.cli_fetch_instance_vnics_vpp import CliFetchInstanceVnicsVpp
+from discover.fetchers.cli.cli_fetch_vservice_vnics import CliFetchVserviceVnics
+from discover.find_links_for_instance_vnics import FindLinksForInstanceVnics
+from discover.find_links_for_vedges import FindLinksForVedges
+from discover.scanner import Scanner
+
+
+class EventPortAdd(EventBase):
+
+ def get_name_by_id(self, object_id):
+ item = self.inv.get_by_id(self.env, object_id)
+ if item:
+ return item['name']
+ return None
+
+ def add_port_document(self, env, project_name, project_id, network_name, network_id, port):
+ # add other data for port document
+ port['type'] = 'port'
+ port['environment'] = env
+
+ port['parent_id'] = port['network_id'] + '-ports'
+ port['parent_text'] = 'Ports'
+ port['parent_type'] = 'ports_folder'
+
+ port['name'] = port['mac_address']
+ port['object'] = port['name']
+ port['project'] = project_name
+
+ port['id_path'] = "{}/{}-projects/{}/{}-networks/{}/{}-ports/{}" \
+ .format(env, env,
+ project_id, project_id,
+ network_id, network_id, port['id'])
+ port['name_path'] = "/{}/Projects/{}/Networks/{}/Ports/{}" \
+ .format(env, project_name, network_name, port['id'])
+
+ port['show_in_tree'] = True
+ port['last_scanned'] = datetime.datetime.utcnow()
+ self.inv.set(port)
+ self.log.info("add port document for port: {}".format(port['id']))
+
+ def add_ports_folder(self, env, project_id, network_id, network_name):
+ port_folder = {
+ "id": network_id + "-ports",
+ "create_object": True,
+ "name": "Ports",
+ "text": "Ports",
+ "type": "ports_folder",
+ "parent_id": network_id,
+ "parent_type": "network",
+ 'environment': env,
+ 'id_path': "{}/{}-projects/{}/{}-networks/{}/{}-ports/"
+ .format(env, env, project_id, project_id,
+ network_id, network_id),
+ 'name_path': "/{}/Projects/{}/Networks/{}/Ports"
+ .format(env, project_id, network_name),
+ "show_in_tree": True,
+ "last_scanned": datetime.datetime.utcnow(),
+ "object_name": "Ports",
+ }
+ self.inv.set(port_folder)
+ self.log.info("add ports_folder document for network: {}.".format(network_id))
+
+ def add_network_services_folder(self, env, project_id, network_id, network_name):
+ network_services_folder = {
+ "create_object": True,
+ "environment": env,
+ "id": network_id + "-network_services",
+ "id_path": "{}/{}-projects/{}/{}-networks/{}/{}-network_services/"
+ .format(env, env, project_id, project_id,
+ network_id, network_id),
+ "last_scanned": datetime.datetime.utcnow(),
+ "name": "Network vServices",
+ "name_path": "/{}/Projects/{}/Networks/{}/Network vServices"
+ .format(env, project_id, network_name),
+ "object_name": "Network vServices",
+ "parent_id": network_id,
+ "parent_type": "network",
+ "show_in_tree": True,
+ "text": "Network vServices",
+ "type": "network_services_folder"
+ }
+ self.inv.set(network_services_folder)
+ self.log.info("add network services folder for network:{}".format(network_id))
+
+ def add_dhcp_document(self, env, host, network_id, network_name):
+ dhcp_document = {
+ "environment": env,
+ "host": host['id'],
+ "id": "qdhcp-" + network_id,
+ "id_path": "{}/{}-vservices/{}-vservices-dhcps/qdhcp-{}"
+ .format(host['id_path'], host['id'],
+ host['id'], network_id),
+ "last_scanned": datetime.datetime.utcnow(),
+ "local_service_id": "qdhcp-" + network_id,
+ "name": "dhcp-" + network_name,
+ "name_path": host['name_path'] + "/Vservices/DHCP servers/dhcp-" + network_name,
+ "network": [network_id],
+ "object_name": "dhcp-" + network_name,
+ "parent_id": host['id'] + "-vservices-dhcps",
+ "parent_text": "DHCP servers",
+ "parent_type": "vservice_dhcps_folder",
+ "service_type": "dhcp",
+ "show_in_tree": True,
+ "type": "vservice"
+ }
+ self.inv.set(dhcp_document)
+ self.log.info("add DHCP document for network: {}.".format(network_id))
+
+ # This method has dynamic usages, take caution when changing its signature
+ def add_vnics_folder(self,
+ env, host,
+ object_id, network_name='',
+ object_type="dhcp", router_name=''):
+ # when vservice is DHCP, id = network_id,
+ # when vservice is router, id = router_id
+ type_map = {"dhcp": ('DHCP servers', 'dhcp-' + network_name),
+ "router": ('Gateways', router_name)}
+
+ vnics_folder = {
+ "environment": env,
+ "id": "q{}-{}-vnics".format(object_type, object_id),
+ "id_path": "{}/{}-vservices/{}-vservices-{}s/q{}-{}/q{}-{}-vnics"
+ .format(host['id_path'], host['id'], host['id'],
+ object_type, object_type, object_id,
+ object_type, object_id),
+ "last_scanned": datetime.datetime.utcnow(),
+ "name": "q{}-{}-vnics".format(object_type, object_id),
+ "name_path": "{}/Vservices/{}/{}/vNICs"
+ .format(host['name_path'],
+ type_map[object_type][0],
+ type_map[object_type][1]),
+ "object_name": "vNICs",
+ "parent_id": "q{}-{}".format(object_type, object_id),
+ "parent_type": "vservice",
+ "show_in_tree": True,
+ "text": "vNICs",
+ "type": "vnics_folder"
+ }
+ self.inv.set(vnics_folder)
+ self.log.info("add vnics_folder document for q{}-{}-vnics"
+ .format(object_type, object_id))
+
+ # This method has dynamic usages, take caution when changing its signature
+ def add_vnic_document(self,
+ env, host,
+ object_id, network_name='',
+ object_type='dhcp', router_name='',
+ mac_address=None):
+ # when vservice is DHCP, id = network_id,
+ # when vservice is router, id = router_id
+ type_map = {"dhcp": ('DHCP servers', 'dhcp-' + network_name),
+ "router": ('Gateways', router_name)}
+
+ fetcher = CliFetchVserviceVnics()
+ fetcher.set_env(env)
+ namespace = 'q{}-{}'.format(object_type, object_id)
+ vnic_documents = fetcher.handle_service(host['id'], namespace, enable_cache=False)
+ if not vnic_documents:
+ self.log.info("Vnic document not found in namespace.")
+ return False
+
+ if mac_address is not None:
+ for doc in vnic_documents:
+ if doc['mac_address'] == mac_address:
+ # add a specific vnic document.
+ doc["environment"] = env
+ doc["id_path"] = "{}/{}-vservices/{}-vservices-{}s/{}/{}-vnics/{}"\
+ .format(host['id_path'], host['id'],
+ host['id'], object_type, namespace,
+ namespace, doc["id"])
+ doc["name_path"] = "{}/Vservices/{}/{}/vNICs/{}" \
+ .format(host['name_path'],
+ type_map[object_type][0],
+ type_map[object_type][1],
+ doc["id"])
+ self.inv.set(doc)
+ self.log.info("add vnic document with mac_address: {}."
+ .format(mac_address))
+ return True
+
+ self.log.info("Can not find vnic document by mac_address: {}"
+ .format(mac_address))
+ return False
+ else:
+ for doc in vnic_documents:
+ # add all vnic documents.
+ doc["environment"] = env
+ doc["id_path"] = "{}/{}-vservices/{}-vservices-{}s/{}/{}-vnics/{}" \
+ .format(host['id_path'], host['id'],
+ host['id'], object_type,
+ namespace, namespace, doc["id"])
+ doc["name_path"] = "{}/Vservices/{}/{}/vNICs/{}" \
+ .format(host['name_path'],
+ type_map[object_type][0],
+ type_map[object_type][1],
+ doc["id"])
+ self.inv.set(doc)
+ self.log.info("add vnic document with mac_address: {}."
+ .format(doc["mac_address"]))
+ return True
+
+ def handle_dhcp_device(self, env, notification, network_id, network_name, mac_address=None):
+ # add dhcp vservice document.
+ host_id = notification["publisher_id"].replace("network.", "", 1)
+ host = self.inv.get_by_id(env, host_id)
+
+ self.add_dhcp_document(env, host, network_id, network_name)
+
+ # add vnics folder.
+ self.add_vnics_folder(env, host, network_id, network_name)
+
+ # add vnic document.
+ self.add_vnic_document(env, host, network_id, network_name, mac_address=mac_address)
+
+ def handle(self, env, notification):
+ project = notification['_context_project_name']
+ project_id = notification['_context_project_id']
+ payload = notification['payload']
+ port = payload['port']
+ network_id = port['network_id']
+ network_name = self.get_name_by_id(network_id)
+ mac_address = port['mac_address']
+
+ # check ports folder document.
+ ports_folder = self.inv.get_by_id(env, network_id + '-ports')
+ if not ports_folder:
+ self.log.info("ports folder not found, add ports folder first.")
+ self.add_ports_folder(env, project_id, network_id, network_name)
+ self.add_port_document(env, project, project_id, network_name, network_id, port)
+
+ # update the port related documents.
+ if 'compute' in port['device_owner']:
+ # update the instance related document.
+ host_id = port['binding:host_id']
+ instance_id = port['device_id']
+ old_instance_doc = self.inv.get_by_id(env, instance_id)
+ instances_root_id = host_id + '-instances'
+ instances_root = self.inv.get_by_id(env, instances_root_id)
+ if not instances_root:
+ self.log.info('instance document not found, aborting port adding')
+ return EventResult(result=False, retry=True)
+
+ # update instance
+ instance_fetcher = ApiFetchHostInstances()
+ instance_fetcher.set_env(env)
+ instance_docs = instance_fetcher.get(host_id + '-')
+ instance = next(filter(lambda i: i['id'] == instance_id, instance_docs), None)
+
+ if instance:
+ old_instance_doc['network_info'] = instance['network_info']
+ old_instance_doc['network'] = instance['network']
+ if old_instance_doc.get('mac_address') is None:
+ old_instance_doc['mac_address'] = mac_address
+
+ self.inv.set(old_instance_doc)
+ self.log.info("update instance document")
+
+ # add vnic document.
+ if port['binding:vif_type'] == 'vpp':
+ vnic_fetcher = CliFetchInstanceVnicsVpp()
+ else:
+ # set ovs as default type.
+ vnic_fetcher = CliFetchInstanceVnics()
+
+ vnic_fetcher.set_env(env)
+ vnic_docs = vnic_fetcher.get(instance_id + '-')
+ vnic = next(filter(lambda vnic: vnic['mac_address'] == mac_address, vnic_docs), None)
+
+ if vnic:
+ vnic['environment'] = env
+ vnic['type'] = 'vnic'
+ vnic['name_path'] = old_instance_doc['name_path'] + '/vNICs/' + vnic['name']
+ vnic['id_path'] = '{}/{}/{}'.format(old_instance_doc['id_path'],
+ old_instance_doc['id'],
+ vnic['name'])
+ self.inv.set(vnic)
+ self.log.info("add instance-vnic document, mac_address: {}"
+ .format(mac_address))
+
+ self.log.info("scanning for links")
+ fetchers_implementing_add_links = [FindLinksForInstanceVnics(), FindLinksForVedges()]
+ for fetcher in fetchers_implementing_add_links:
+ fetcher.add_links()
+ scanner = Scanner()
+ scanner.set_env(env)
+ scanner.scan_cliques()
+
+ port_document = self.inv.get_by_id(env, port['id'])
+ if not port_document:
+ self.log.error("Port {} failed to add".format(port['id']))
+ return EventResult(result=False, retry=True)
+
+ return EventResult(result=True,
+ related_object=port['id'],
+ display_context=network_id)
diff --git a/app/discover/events/event_port_delete.py b/app/discover/events/event_port_delete.py
new file mode 100644
index 0000000..1e55870
--- /dev/null
+++ b/app/discover/events/event_port_delete.py
@@ -0,0 +1,80 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from discover.events.event_base import EventResult
+from discover.events.event_delete_base import EventDeleteBase
+from discover.fetchers.api.api_fetch_host_instances import ApiFetchHostInstances
+
+
+class EventPortDelete(EventDeleteBase):
+
+ def delete_port(self, env, port_id):
+ port_doc = self.inv.get_by_id(env, port_id)
+ if not port_doc:
+ self.log.info("Port document not found, aborting port deleting.")
+ return EventResult(result=False, retry=False)
+
+ # if port is binding to a instance, instance document needs to be updated.
+ if 'compute' in port_doc['device_owner']:
+ self.log.info("update instance document to which port is binding.")
+ self.update_instance(env, port_doc)
+
+ # delete port document
+ self.inv.delete('inventory', {'id': port_id})
+
+ # delete vnic and related document
+ vnic_doc = self.inv.get_by_field(env, 'vnic', 'mac_address', port_doc['mac_address'], get_single=True)
+ if not vnic_doc:
+ self.log.info("Vnic document not found, aborting vnic deleting.")
+ return EventResult(result=False, retry=False)
+
+ result = self.delete_handler(env, vnic_doc['id'], 'vnic')
+ result.related_object = port_id
+ result.display_context = port_doc.get('network_id')
+ self.log.info('Finished port deleting')
+ return result
+
+ def update_instance(self, env, port_doc):
+ # update instance document if port
+ network_id = port_doc['network_id']
+ instance_doc = self.inv.get_by_field(env, 'instance', 'network_info.id', port_doc['id'], get_single=True)
+ if instance_doc:
+ port_num = 0
+
+ for port in instance_doc['network_info']:
+ if port['network']['id'] == network_id:
+ port_num += 1
+ if port['id'] == port_doc['id']:
+ instance_doc['network_info'].remove(port)
+ self.log.info("update network information of instance document.")
+
+ if port_num == 1:
+ # remove network information only when last port in network will be deleted.
+ instance_doc['network'].remove(network_id)
+
+ # update instance mac address.
+ if port_doc['mac_address'] == instance_doc['mac_address']:
+ instance_fetcher = ApiFetchHostInstances()
+ instance_fetcher.set_env(env)
+ host_id = port_doc['binding:host_id']
+ instance_id = port_doc['device_id']
+ instance_docs = instance_fetcher.get(host_id + '-')
+ instance = next(filter(lambda i: i['id'] == instance_id, instance_docs), None)
+ if instance:
+ if 'mac_address' not in instance:
+ instance_doc['mac_address'] = None
+ self.log.info("update mac_address:%s of instance document." % instance_doc['mac_address'])
+
+ self.inv.set(instance_doc)
+ else:
+ self.log.info("No instance document binding to network:%s." % network_id)
+
+ def handle(self, env, notification):
+ port_id = notification['payload']['port_id']
+ return self.delete_port(env, port_id)
diff --git a/app/discover/events/event_port_update.py b/app/discover/events/event_port_update.py
new file mode 100644
index 0000000..298b565
--- /dev/null
+++ b/app/discover/events/event_port_update.py
@@ -0,0 +1,38 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from discover.events.event_base import EventBase, EventResult
+
+
+class EventPortUpdate(EventBase):
+
+ def handle(self, env, notification):
+ # check port document.
+ port = notification['payload']['port']
+ port_id = port['id']
+ port_document = self.inv.get_by_id(env, port_id)
+ if not port_document:
+ self.log.info('port document does not exist, aborting port update')
+ return EventResult(result=False, retry=True)
+
+ # build port document
+ port_document['name'] = port['name']
+ port_document['admin_state_up'] = port['admin_state_up']
+ if port_document['admin_state_up']:
+ port_document['status'] = 'ACTIVE'
+ else:
+ port_document['status'] = 'DOWN'
+
+ port_document['binding:vnic_type'] = port['binding:vnic_type']
+
+ # update port document.
+ self.inv.set(port_document)
+ return EventResult(result=True,
+ related_object=port_id,
+ display_context=port_document.get('network_id'))
diff --git a/app/discover/events/event_router_add.py b/app/discover/events/event_router_add.py
new file mode 100644
index 0000000..20e07e5
--- /dev/null
+++ b/app/discover/events/event_router_add.py
@@ -0,0 +1,123 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+import datetime
+
+from functools import partial
+
+from discover.events.event_base import EventBase, EventResult
+from discover.events.event_port_add import EventPortAdd
+from discover.events.event_subnet_add import EventSubnetAdd
+from discover.fetchers.cli.cli_fetch_host_vservice import CliFetchHostVservice
+from discover.find_links_for_vservice_vnics import FindLinksForVserviceVnics
+from discover.scanner import Scanner
+from utils.util import decode_router_id, encode_router_id
+
+
+class EventRouterAdd(EventBase):
+
+ def add_router_document(self, env, network_id, router_doc, host):
+ router_doc["environment"] = env
+ router_doc["id_path"] = "{}/{}-vservices/{}-vservices-routers/{}"\
+ .format(host['id_path'], host['id'],
+ host['id'], router_doc['id'])
+ router_doc['last_scanned'] = datetime.datetime.utcnow()
+ router_doc['name_path'] = "{}/Vservices/Gateways/{}"\
+ .format(host['name_path'],
+ router_doc['name'])
+ router_doc['network'] = []
+ if network_id:
+ router_doc['network'] = [network_id]
+
+ router_doc['object_name'] = router_doc['name']
+ router_doc['parent_id'] = host['id'] + "-vservices-routers"
+ router_doc['show_in_tree'] = True
+ router_doc['type'] = "vservice"
+
+ self.inv.set(router_doc)
+
+ def add_children_documents(self, env, project_id, network_id, host, router_doc):
+
+ network_document = self.inv.get_by_id(env, network_id)
+ network_name = network_document['name']
+ router_id = decode_router_id(router_doc['id'])
+
+ # add port for binding to vservice:router
+ subnet_handler = EventSubnetAdd()
+ ports_folder = self.inv.get_by_id(env, network_id + '-ports')
+ if not ports_folder:
+ self.log.info("Ports_folder not found.")
+ subnet_handler.add_ports_folder(env, project_id, network_id, network_name)
+ add_port_return = subnet_handler.add_port_document(env,
+ router_doc['gw_port_id'],
+ network_name=network_name)
+
+ # add vnics folder and vnic document
+ port_handler = EventPortAdd()
+ add_vnic_folder = partial(port_handler.add_vnics_folder,
+ env=env,
+ host=host,
+ object_id=router_id,
+ object_type='router',
+ network_name=network_name,
+ router_name=router_doc['name'])
+ add_vnic_document = partial(port_handler.add_vnic_document,
+ env=env,
+ host=host,
+ object_id=router_id,
+ object_type='router',
+ network_name=network_name,
+ router_name=router_doc['name'])
+
+ add_vnic_folder()
+ if add_port_return:
+ add_vnic_return = add_vnic_document()
+ if not add_vnic_return:
+ self.log.info("Try to add vnic document again.")
+ add_vnic_document()
+ else:
+ # in some cases, port has been created,
+ # but port doc cannot be fetched by OpenStack API
+ self.log.info("Try to add port document again.")
+ # TODO: #AskCheng - this never returns anything!
+ add_port_return = add_vnic_folder()
+ # TODO: #AskCheng - this will never evaluate to True!
+ if add_port_return is False:
+ self.log.info("Try to add vnic document again.")
+ add_vnic_document()
+
+ def handle(self, env, values):
+ router = values['payload']['router']
+ host_id = values["publisher_id"].replace("network.", "", 1)
+ project_id = values['_context_project_id']
+ router_id = encode_router_id(host_id, router['id'])
+ host = self.inv.get_by_id(env, host_id)
+
+ fetcher = CliFetchHostVservice()
+ fetcher.set_env(env)
+ router_doc = fetcher.get_vservice(host_id, router_id)
+ gateway_info = router['external_gateway_info']
+
+ if gateway_info:
+ network_id = gateway_info['network_id']
+ self.add_router_document(env, network_id, router_doc, host)
+ self.add_children_documents(env, project_id, network_id, host, router_doc)
+ else:
+ self.add_router_document(env, None, router_doc, host)
+
+ # scan links and cliques
+ FindLinksForVserviceVnics().add_links(search={"parent_id": router_id})
+ scanner = Scanner()
+ scanner.set_env(env)
+ scanner.scan_cliques()
+ self.log.info("Finished router added.")
+
+ return EventResult(result=True,
+ related_object=router_id,
+ display_context=router_id)
diff --git a/app/discover/events/event_router_delete.py b/app/discover/events/event_router_delete.py
new file mode 100644
index 0000000..65072d6
--- /dev/null
+++ b/app/discover/events/event_router_delete.py
@@ -0,0 +1,37 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from discover.events.event_base import EventResult
+from discover.events.event_delete_base import EventDeleteBase
+from utils.util import encode_router_id
+
+
+class EventRouterDelete(EventDeleteBase):
+
+ def handle(self, env, values):
+ payload = values['payload']
+
+ if 'publisher_id' not in values:
+ self.log.error("Publisher_id is not in event values. Aborting router delete")
+ return EventResult(result=False, retry=False)
+
+ host_id = values['publisher_id'].replace('network.', '', 1)
+ if 'router_id' in payload:
+ router_id = payload['router_id']
+ elif 'id' in payload:
+ router_id = payload['id']
+ else:
+ router_id = payload.get('router', {}).get('id')
+
+ if not router_id:
+ self.log.error("Router id is not in payload. Aborting router delete")
+ return EventResult(result=False, retry=False)
+
+ router_full_id = encode_router_id(host_id, router_id)
+ return self.delete_handler(env, router_full_id, "vservice")
diff --git a/app/discover/events/event_router_update.py b/app/discover/events/event_router_update.py
new file mode 100644
index 0000000..8dd53f0
--- /dev/null
+++ b/app/discover/events/event_router_update.py
@@ -0,0 +1,82 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from discover.events.event_base import EventBase, EventResult
+from discover.events.event_port_delete import EventPortDelete
+from discover.events.event_router_add import EventRouterAdd
+from discover.fetchers.cli.cli_fetch_host_vservice import CliFetchHostVservice
+from discover.find_links_for_vservice_vnics import FindLinksForVserviceVnics
+from discover.scanner import Scanner
+from utils.util import encode_router_id
+
+
+class EventRouterUpdate(EventBase):
+
+ def handle(self, env, values):
+ payload = values['payload']
+ router = payload['router']
+
+ project_id = values['_context_project_id']
+ host_id = values["publisher_id"].replace("network.", "", 1)
+ router_id = payload['id'] if 'id' in payload else router['id']
+
+ router_full_id = encode_router_id(host_id, router_id)
+ router_doc = self.inv.get_by_id(env, router_full_id)
+ if not router_doc:
+ self.log.info("Router document not found, aborting router updating")
+ return EventResult(result=False, retry=True)
+
+ router_doc['admin_state_up'] = router['admin_state_up']
+ router_doc['name'] = router['name']
+ gateway_info = router.get('external_gateway_info')
+ if gateway_info is None:
+ # when delete gateway, need to delete the port relate document.
+ port_doc = {}
+ if router_doc.get('gw_port_id'):
+ port_doc = self.inv.get_by_id(env, router_doc['gw_port_id'])
+ EventPortDelete().delete_port(env, router_doc['gw_port_id'])
+
+ if router_doc.get('network'):
+ if port_doc:
+ router_doc['network'].remove(port_doc['network_id'])
+ router_doc['gw_port_id'] = None
+
+ # remove related links
+ self.inv.delete('links', {'source_id': router_full_id})
+ else:
+ if 'network' in router_doc:
+ if gateway_info['network_id'] not in router_doc['network']:
+ router_doc['network'].append(gateway_info['network_id'])
+ else:
+ router_doc['network'] = [gateway_info['network_id']]
+ # update static route
+ router_doc['routes'] = router['routes']
+
+ # add gw_port_id info and port document.
+ fetcher = CliFetchHostVservice()
+ fetcher.set_env(env)
+ router_vservice = fetcher.get_vservice(host_id, router_full_id)
+ if router_vservice.get('gw_port_id'):
+ router_doc['gw_port_id'] = router_vservice['gw_port_id']
+
+ host = self.inv.get_by_id(env, host_id)
+ EventRouterAdd().add_children_documents(env, project_id, gateway_info['network_id'], host, router_doc)
+
+ # rescan the vnic links.
+ FindLinksForVserviceVnics().add_links(search={'parent_id': router_full_id + '-vnics'})
+ self.inv.set(router_doc)
+
+ # update the cliques.
+ scanner = Scanner()
+ scanner.set_env(env)
+ scanner.scan_cliques()
+ self.log.info("Finished router update.")
+ return EventResult(result=True,
+ related_object=router_full_id,
+ display_context=router_full_id)
diff --git a/app/discover/events/event_subnet_add.py b/app/discover/events/event_subnet_add.py
new file mode 100644
index 0000000..b519b1c
--- /dev/null
+++ b/app/discover/events/event_subnet_add.py
@@ -0,0 +1,154 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+import datetime
+
+from discover.events.event_base import EventBase, EventResult
+from discover.events.event_port_add import EventPortAdd
+from discover.fetchers.api.api_access import ApiAccess
+from discover.fetchers.api.api_fetch_port import ApiFetchPort
+from discover.fetchers.api.api_fetch_regions import ApiFetchRegions
+from discover.fetchers.db.db_fetch_port import DbFetchPort
+from discover.find_links_for_pnics import FindLinksForPnics
+from discover.find_links_for_vservice_vnics import FindLinksForVserviceVnics
+from discover.scanner import Scanner
+
+
+class EventSubnetAdd(EventBase):
+
+ def add_port_document(self, env, port_id, network_name=None, project_name=''):
+ # when add router-interface port, network_name need to be given to enhance efficiency.
+ # when add gateway port, project_name need to be specified, cause this type of port
+ # document does not has project attribute. In this case, network_name should not be provided.
+
+ fetcher = ApiFetchPort()
+ fetcher.set_env(env)
+ ports = fetcher.get(port_id)
+
+ if ports:
+ port = ports[0]
+ project_id = port['tenant_id']
+ network_id = port['network_id']
+
+ if not network_name:
+ network = self.inv.get_by_id(env, network_id)
+ network_name = network['name']
+
+ port['type'] = "port"
+ port['environment'] = env
+ port_id = port['id']
+ port['id_path'] = "%s/%s-projects/%s/%s-networks/%s/%s-ports/%s" % \
+ (env, env, project_id, project_id, network_id, network_id, port_id)
+ port['last_scanned'] = datetime.datetime.utcnow()
+ if 'project' in port:
+ project_name = port['project']
+ port['name_path'] = "/%s/Projects/%s/Networks/%s/Ports/%s" % \
+ (env, project_name, network_name, port_id)
+ self.inv.set(port)
+ self.log.info("add port document for port:%s" % port_id)
+ return port
+ return False
+
+ def add_ports_folder(self, env, project_id, network_id, network_name):
+ port_folder = {
+ "id": network_id + "-ports",
+ "create_object": True,
+ "name": "Ports",
+ "text": "Ports",
+ "type": "ports_folder",
+ "parent_id": network_id,
+ "parent_type": "network",
+ 'environment': env,
+ 'id_path': "%s/%s-projects/%s/%s-networks/%s/%s-ports/" % (env, env, project_id, project_id,
+ network_id, network_id),
+ 'name_path': "/%s/Projects/%s/Networks/%s/Ports" % (env, project_id, network_name),
+ "show_in_tree": True,
+ "last_scanned": datetime.datetime.utcnow(),
+ "object_name": "Ports",
+ }
+
+ self.inv.set(port_folder)
+
+ def add_children_documents(self, env, project_id, network_id, network_name, host_id):
+ # generate port folder data.
+ self.add_ports_folder(env, project_id, network_id, network_name)
+
+ # get ports ID.
+ port_id = DbFetchPort().get_id(network_id)
+
+ # add specific ports documents.
+ self.add_port_document(env, port_id, network_name=network_name)
+
+ port_handler = EventPortAdd()
+
+ # add network_services_folder document.
+ port_handler.add_network_services_folder(env, project_id, network_id, network_name)
+
+ # add dhcp vservice document.
+ host = self.inv.get_by_id(env, host_id)
+
+ port_handler.add_dhcp_document(env, host, network_id, network_name)
+
+ # add vnics folder.
+ port_handler.add_vnics_folder(env, host, network_id, network_name)
+
+ # add vnic docuemnt.
+ port_handler.add_vnic_document(env, host, network_id, network_name)
+
+ def handle(self, env, notification):
+ # check for network document.
+ subnet = notification['payload']['subnet']
+ project_id = subnet['tenant_id']
+ network_id = subnet['network_id']
+ if 'id' not in subnet:
+ self.log.info('Subnet payload doesn\'t have id, aborting subnet add')
+ return EventResult(result=False, retry=False)
+
+ network_document = self.inv.get_by_id(env, network_id)
+ if not network_document:
+ self.log.info('network document does not exist, aborting subnet add')
+ return EventResult(result=False, retry=True)
+ network_name = network_document['name']
+
+ # build subnet document for adding network
+ if subnet['cidr'] not in network_document['cidrs']:
+ network_document['cidrs'].append(subnet['cidr'])
+ if not network_document.get('subnets'):
+ network_document['subnets'] = {}
+
+ network_document['subnets'][subnet['name']] = subnet
+ if subnet['id'] not in network_document['subnet_ids']:
+ network_document['subnet_ids'].append(subnet['id'])
+ self.inv.set(network_document)
+
+ # Check DHCP enable, if true, scan network.
+ if subnet['enable_dhcp'] is True:
+ # update network
+ # TODO: #AskCheng - why is this necessary?
+ if len(ApiAccess.regions) == 0:
+ fetcher = ApiFetchRegions()
+ fetcher.set_env(env)
+ fetcher.get(None)
+
+ self.log.info("add new subnet.")
+ host_id = notification["publisher_id"].replace("network.", "", 1)
+ self.add_children_documents(env, project_id, network_id, network_name, host_id)
+
+ # scan links and cliques
+ self.log.info("scanning for links")
+ FindLinksForPnics().add_links()
+ FindLinksForVserviceVnics().add_links(search={"parent_id": "qdhcp-%s-vnics" % network_id})
+
+ scanner = Scanner()
+ scanner.set_env(env)
+ scanner.scan_cliques()
+ self.log.info("Finished subnet added.")
+ return EventResult(result=True,
+ related_object=subnet['id'],
+ display_context=network_id)
diff --git a/app/discover/events/event_subnet_delete.py b/app/discover/events/event_subnet_delete.py
new file mode 100644
index 0000000..900e701
--- /dev/null
+++ b/app/discover/events/event_subnet_delete.py
@@ -0,0 +1,57 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from discover.events.event_base import EventResult
+from discover.events.event_delete_base import EventDeleteBase
+
+
+class EventSubnetDelete(EventDeleteBase):
+
+ def delete_children_documents(self, env, vservice_id):
+ vnic_parent_id = vservice_id + '-vnics'
+ vnic = self.inv.get_by_field(env, 'vnic', 'parent_id', vnic_parent_id, get_single=True)
+ if not vnic:
+ self.log.info("Vnic document not found.")
+ return EventResult(result=False, retry=False)
+
+ # delete port and vnic together by mac address.
+ self.inv.delete('inventory', {"mac_address": vnic.get("mac_address")})
+ return self.delete_handler(env, vservice_id, 'vservice')
+
+ def handle(self, env, notification):
+ subnet_id = notification['payload']['subnet_id']
+ network_document = self.inv.get_by_field(env, "network", "subnet_ids", subnet_id, get_single=True)
+ if not network_document:
+ self.log.info("network document not found, aborting subnet deleting")
+ return EventResult(result=False, retry=False)
+
+ # remove subnet_id from subnet_ids array
+ network_document["subnet_ids"].remove(subnet_id)
+
+ # find the subnet in network_document by subnet_id
+ subnet = next(
+ filter(lambda s: s['id'] == subnet_id,
+ network_document['subnets'].values()),
+ None)
+
+ # remove cidr from cidrs and delete subnet document.
+ if subnet:
+ network_document['cidrs'].remove(subnet['cidr'])
+ del network_document['subnets'][subnet['name']]
+
+ self.inv.set(network_document)
+
+ # when network does not have any subnet, delete vservice DHCP, port and vnic documents.
+ if not network_document["subnet_ids"]:
+ vservice_dhcp_id = 'qdhcp-{}'.format(network_document['id'])
+ self.delete_children_documents(env, vservice_dhcp_id)
+
+ return EventResult(result=True,
+ related_object=subnet['id'],
+ display_context=network_document.get('id'))
diff --git a/app/discover/events/event_subnet_update.py b/app/discover/events/event_subnet_update.py
new file mode 100644
index 0000000..9d3c48b
--- /dev/null
+++ b/app/discover/events/event_subnet_update.py
@@ -0,0 +1,102 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from discover.events.event_base import EventBase, EventResult
+from discover.events.event_port_add import EventPortAdd
+from discover.events.event_port_delete import EventPortDelete
+from discover.events.event_subnet_add import EventSubnetAdd
+from discover.fetchers.api.api_access import ApiAccess
+from discover.fetchers.api.api_fetch_regions import ApiFetchRegions
+from discover.fetchers.db.db_fetch_port import DbFetchPort
+from discover.find_links_for_vservice_vnics import FindLinksForVserviceVnics
+from discover.scanner import Scanner
+
+
+class EventSubnetUpdate(EventBase):
+
+ def handle(self, env, notification):
+ # check for network document.
+ subnet = notification['payload']['subnet']
+ project = notification['_context_project_name']
+ host_id = notification['publisher_id'].replace('network.', '', 1)
+ subnet_id = subnet['id']
+ network_id = subnet['network_id']
+ network_document = self.inv.get_by_id(env, network_id)
+ if not network_document:
+ self.log.info('network document does not exist, aborting subnet update')
+ return EventResult(result=False, retry=True)
+
+ # update network document.
+ subnets = network_document['subnets']
+ key = next(filter(lambda k: subnets[k]['id'] == subnet_id, subnets),
+ None)
+
+ if key:
+ if subnet['enable_dhcp'] and subnets[key]['enable_dhcp'] is False:
+ # scan DHCP namespace to add related document.
+ # add dhcp vservice document.
+ host = self.inv.get_by_id(env, host_id)
+ port_handler = EventPortAdd()
+ port_handler.add_dhcp_document(env, host, network_id,
+ network_document['name'])
+
+ # make sure that self.regions is not empty.
+ if len(ApiAccess.regions) == 0:
+ fetcher = ApiFetchRegions()
+ fetcher.set_env(env)
+ fetcher.get(None)
+
+ self.log.info("add port binding to DHCP server.")
+ port_id = DbFetchPort(). \
+ get_id_by_field(network_id,
+ """device_owner LIKE "%dhcp" """)
+ port = EventSubnetAdd(). \
+ add_port_document(env, port_id,
+ network_name=network_document['name'],
+ project_name=project)
+ if port:
+ port_handler. \
+ add_vnic_document(env, host, network_id,
+ network_name=network_document['name'],
+ mac_address=port['mac_address'])
+ # add link for vservice - vnic
+ FindLinksForVserviceVnics().add_links(search={"id": "qdhcp-%s" % network_id})
+ scanner = Scanner()
+ scanner.set_env(env)
+ scanner.scan_cliques()
+ FindLinksForVserviceVnics(). \
+ add_links(search={"id": "qdhcp-%s" % network_id})
+ scanner = Scanner()
+ scanner.set_env(env)
+ scanner.scan_cliques()
+
+ if subnet['enable_dhcp'] is False and subnets[key]['enable_dhcp']:
+ # delete existed related DHCP documents.
+ self.inv.delete("inventory",
+ {'id': "qdhcp-%s" % subnet['network_id']})
+ self.log.info("delete DHCP document: qdhcp-%s" %
+ subnet['network_id'])
+
+ port = self.inv.find_items({'network_id': subnet['network_id'],
+ 'device_owner': 'network:dhcp'},
+ get_single=True)
+ if 'id' in port:
+ EventPortDelete().delete_port(env, port['id'])
+ self.log.info("delete port binding to DHCP server.")
+
+ if subnet['name'] == subnets[key]['name']:
+ subnets[key] = subnet
+ else:
+ # TODO: #AskCheng shouldn't we remove the old one?
+ subnets[subnet['name']] = subnet
+
+ self.inv.set(network_document)
+ return EventResult(result=True,
+ related_object=subnet['id'],
+ display_context=network_id)
diff --git a/app/discover/events/listeners/__init__.py b/app/discover/events/listeners/__init__.py
new file mode 100644
index 0000000..1e85a2a
--- /dev/null
+++ b/app/discover/events/listeners/__init__.py
@@ -0,0 +1,10 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+
diff --git a/app/discover/events/listeners/default_listener.py b/app/discover/events/listeners/default_listener.py
new file mode 100755
index 0000000..a135673
--- /dev/null
+++ b/app/discover/events/listeners/default_listener.py
@@ -0,0 +1,314 @@
+#!/usr/bin/env python3
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+
+import argparse
+import datetime
+import json
+import os
+import time
+from collections import defaultdict
+from typing import List
+
+from kombu import Connection, Queue, Exchange
+from kombu.mixins import ConsumerMixin
+
+from discover.configuration import Configuration
+from discover.event_handler import EventHandler
+from discover.events.event_base import EventResult
+from discover.events.event_metadata_parser import parse_metadata_file
+from discover.events.listeners.listener_base import ListenerBase
+from messages.message import Message
+from monitoring.setup.monitoring_setup_manager import MonitoringSetupManager
+from utils.constants import OperationalStatus, EnvironmentFeatures
+from utils.inventory_mgr import InventoryMgr
+from utils.logging.full_logger import FullLogger
+from utils.mongo_access import MongoAccess
+from utils.string_utils import stringify_datetime
+from utils.util import SignalHandler, setup_args
+
+
+class DefaultListener(ListenerBase, ConsumerMixin):
+
+ SOURCE_SYSTEM = "OpenStack"
+
+ COMMON_METADATA_FILE = "events.json"
+
+ DEFAULTS = {
+ "env": "Mirantis-Liberty",
+ "mongo_config": "",
+ "metadata_file": "",
+ "inventory": "inventory",
+ "loglevel": "INFO",
+ "environments_collection": "environments_config",
+ "retry_limit": 10,
+ "consume_all": False
+ }
+
+ def __init__(self, connection: Connection,
+ event_handler: EventHandler,
+ event_queues: List,
+ env_name: str = DEFAULTS["env"],
+ inventory_collection: str = DEFAULTS["inventory"],
+ retry_limit: int = DEFAULTS["retry_limit"],
+ consume_all: bool = DEFAULTS["consume_all"]):
+ super().__init__()
+
+ self.connection = connection
+ self.retry_limit = retry_limit
+ self.env_name = env_name
+ self.consume_all = consume_all
+ self.handler = event_handler
+ self.event_queues = event_queues
+ self.failing_messages = defaultdict(int)
+
+ self.inv = InventoryMgr()
+ self.inv.set_collections(inventory_collection)
+ if self.inv.is_feature_supported(self.env_name, EnvironmentFeatures.MONITORING):
+ self.inv.monitoring_setup_manager = \
+ MonitoringSetupManager(self.env_name)
+ self.inv.monitoring_setup_manager.server_setup()
+
+ def get_consumers(self, consumer, channel):
+ return [consumer(queues=self.event_queues,
+ accept=['json'],
+ callbacks=[self.process_task])]
+
+ # Determines if message should be processed by a handler
+ # and extracts message body if yes.
+ @staticmethod
+ def _extract_event_data(body):
+ if "event_type" in body:
+ return True, body
+ elif "event_type" in body.get("oslo.message", ""):
+ return True, json.loads(body["oslo.message"])
+ else:
+ return False, None
+
+ def process_task(self, body, message):
+ received_timestamp = stringify_datetime(datetime.datetime.now())
+ processable, event_data = self._extract_event_data(body)
+ # If env listener can't process the message
+ # or it's not intended for env listener to handle,
+ # leave the message in the queue unless "consume_all" flag is set
+ if processable and event_data["event_type"] in self.handler.handlers:
+ with open("/tmp/listener.log", "a") as f:
+ f.write("{}\n".format(event_data))
+ event_result = self.handle_event(event_data["event_type"],
+ event_data)
+ finished_timestamp = stringify_datetime(datetime.datetime.now())
+ self.save_message(message_body=event_data,
+ result=event_result,
+ started=received_timestamp,
+ finished=finished_timestamp)
+
+ # Check whether the event was fully handled
+ # and, if not, whether it should be retried later
+ if event_result.result:
+ message.ack()
+ elif event_result.retry:
+ if 'message_id' not in event_data:
+ message.reject()
+ else:
+ # Track message retry count
+ message_id = event_data['message_id']
+ self.failing_messages[message_id] += 1
+
+ # Retry handling the message
+ if self.failing_messages[message_id] <= self.retry_limit:
+ self.inv.log.info("Retrying handling message " +
+ "with id '{}'".format(message_id))
+ message.requeue()
+ # Discard the message if it's not accepted
+ # after specified number of trials
+ else:
+ self.inv.log.warn("Discarding message with id '{}' ".
+ format(message_id) +
+ "as it's exceeded the retry limit")
+ message.reject()
+ del self.failing_messages[message_id]
+ else:
+ message.reject()
+ elif self.consume_all:
+ message.reject()
+
+ # This method passes the event to its handler.
+ # Returns a (result, retry) tuple:
+ # 'Result' flag is True if handler has finished successfully,
+ # False otherwise
+ # 'Retry' flag specifies if the error is recoverable or not
+ # 'Retry' flag is checked only is 'result' is False
+ def handle_event(self, event_type: str, notification: dict) -> EventResult:
+ print("Got notification.\nEvent_type: {}\nNotification:\n{}".
+ format(event_type, notification))
+ try:
+ result = self.handler.handle(event_name=event_type,
+ notification=notification)
+ return result if result else EventResult(result=False, retry=False)
+ except Exception as e:
+ self.inv.log.exception(e)
+ return EventResult(result=False, retry=False)
+
+ def save_message(self, message_body: dict, result: EventResult,
+ started: str, finished: str):
+ try:
+ message = Message(
+ msg_id=message_body.get('message_id'),
+ env=self.env_name,
+ source=self.SOURCE_SYSTEM,
+ object_id=result.related_object,
+ display_context=result.display_context,
+ level=message_body.get('priority'),
+ msg=message_body,
+ ts=message_body.get('timestamp'),
+ received_ts=started,
+ finished_ts=finished
+ )
+ self.inv.collections['messages'].insert_one(message.get())
+ return True
+ except Exception as e:
+ self.inv.log.error("Failed to save message")
+ self.inv.log.exception(e)
+ return False
+
+ @staticmethod
+ def listen(args: dict = None):
+
+ args = setup_args(args, DefaultListener.DEFAULTS, get_args)
+ if 'process_vars' not in args:
+ args['process_vars'] = {}
+
+ env_name = args["env"]
+ inventory_collection = args["inventory"]
+
+ MongoAccess.set_config_file(args["mongo_config"])
+ conf = Configuration(args["environments_collection"])
+ conf.use_env(env_name)
+
+ event_handler = EventHandler(env_name, inventory_collection)
+ event_queues = []
+
+ env_config = conf.get_env_config()
+ common_metadata_file = os.path.join(env_config.get('app_path', '/etc/calipso'),
+ 'config',
+ DefaultListener.COMMON_METADATA_FILE)
+
+ # import common metadata
+ import_metadata(event_handler, event_queues, common_metadata_file)
+
+ # import custom metadata if supplied
+ if args["metadata_file"]:
+ import_metadata(event_handler, event_queues, args["metadata_file"])
+
+ inv = InventoryMgr()
+ inv.set_collections(inventory_collection)
+ logger = FullLogger()
+ logger.set_loglevel(args["loglevel"])
+
+ amqp_config = conf.get("AMQP")
+ connect_url = 'amqp://{user}:{pwd}@{host}:{port}//' \
+ .format(user=amqp_config["user"],
+ pwd=amqp_config["password"],
+ host=amqp_config["host"],
+ port=amqp_config["port"])
+
+ with Connection(connect_url) as conn:
+ try:
+ print(conn)
+ conn.connect()
+ args['process_vars']['operational'] = OperationalStatus.RUNNING
+ terminator = SignalHandler()
+ worker = \
+ DefaultListener(connection=conn,
+ event_handler=event_handler,
+ event_queues=event_queues,
+ retry_limit=args["retry_limit"],
+ consume_all=args["consume_all"],
+ inventory_collection=inventory_collection,
+ env_name=env_name)
+ worker.run()
+ if terminator.terminated:
+ args.get('process_vars', {})['operational'] = \
+ OperationalStatus.STOPPED
+ except KeyboardInterrupt:
+ print('Stopped')
+ args['process_vars']['operational'] = OperationalStatus.STOPPED
+ except Exception as e:
+ logger.log.exception(e)
+ args['process_vars']['operational'] = OperationalStatus.ERROR
+ finally:
+ # This should enable safe saving of shared variables
+ time.sleep(0.1)
+
+
+def get_args():
+ # Read listener config from command line args
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-m", "--mongo_config", nargs="?", type=str,
+ default=DefaultListener.DEFAULTS["mongo_config"],
+ help="Name of config file with MongoDB access details")
+ parser.add_argument("--metadata_file", nargs="?", type=str,
+ default=DefaultListener.DEFAULTS["metadata_file"],
+ help="Name of custom configuration metadata file")
+ def_env_collection = DefaultListener.DEFAULTS["environments_collection"]
+ parser.add_argument("-c", "--environments_collection", nargs="?", type=str,
+ default=def_env_collection,
+ help="Name of collection where selected environment " +
+ "is taken from \n(default: {})"
+ .format(def_env_collection))
+ parser.add_argument("-e", "--env", nargs="?", type=str,
+ default=DefaultListener.DEFAULTS["env"],
+ help="Name of target listener environment \n" +
+ "(default: {})"
+ .format(DefaultListener.DEFAULTS["env"]))
+ parser.add_argument("-y", "--inventory", nargs="?", type=str,
+ default=DefaultListener.DEFAULTS["inventory"],
+ help="Name of inventory collection \n"" +"
+ "(default: '{}')"
+ .format(DefaultListener.DEFAULTS["inventory"]))
+ parser.add_argument("-l", "--loglevel", nargs="?", type=str,
+ default=DefaultListener.DEFAULTS["loglevel"],
+ help="Logging level \n(default: '{}')"
+ .format(DefaultListener.DEFAULTS["loglevel"]))
+ parser.add_argument("-r", "--retry_limit", nargs="?", type=int,
+ default=DefaultListener.DEFAULTS["retry_limit"],
+ help="Maximum number of times the OpenStack message "
+ "should be requeued before being discarded \n" +
+ "(default: {})"
+ .format(DefaultListener.DEFAULTS["retry_limit"]))
+ parser.add_argument("--consume_all", action="store_true",
+ help="If this flag is set, " +
+ "environment listener will try to consume"
+ "all messages from OpenStack event queue "
+ "and reject incompatible messages."
+ "Otherwise they'll just be ignored.",
+ default=DefaultListener.DEFAULTS["consume_all"])
+ args = parser.parse_args()
+ return args
+
+
+# Imports metadata from file,
+# updates event handler with new handlers
+# and event queues with new queues
+def import_metadata(event_handler: EventHandler,
+ event_queues: List[Queue],
+ metadata_file_path: str) -> None:
+ handlers_package, queues, event_handlers = \
+ parse_metadata_file(metadata_file_path)
+ event_handler.discover_handlers(handlers_package, event_handlers)
+ event_queues.extend([
+ Queue(q['queue'],
+ Exchange(q['exchange'], 'topic', durable=False),
+ durable=False, routing_key='#') for q in queues
+ ])
+
+
+if __name__ == '__main__':
+ DefaultListener.listen()
diff --git a/app/discover/events/listeners/listener_base.py b/app/discover/events/listeners/listener_base.py
new file mode 100644
index 0000000..7052dc9
--- /dev/null
+++ b/app/discover/events/listeners/listener_base.py
@@ -0,0 +1,18 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+from abc import ABC, abstractmethod
+
+
+class ListenerBase(ABC):
+
+ @staticmethod
+ @abstractmethod
+ def listen(self):
+ pass