summaryrefslogtreecommitdiffstats
path: root/networking-odl/networking_odl/journal/journal.py
diff options
context:
space:
mode:
Diffstat (limited to 'networking-odl/networking_odl/journal/journal.py')
-rw-r--r--networking-odl/networking_odl/journal/journal.py220
1 files changed, 220 insertions, 0 deletions
diff --git a/networking-odl/networking_odl/journal/journal.py b/networking-odl/networking_odl/journal/journal.py
new file mode 100644
index 0000000..ca0d2c2
--- /dev/null
+++ b/networking-odl/networking_odl/journal/journal.py
@@ -0,0 +1,220 @@
+# Copyright (c) 2015 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import threading
+
+from requests import exceptions
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from neutron import context as neutron_context
+from neutron.db import api as neutron_db_api
+from neutron import manager
+
+from networking_odl.common import client
+from networking_odl.common import constants as odl_const
+from networking_odl.common import filters
+from networking_odl._i18n import _LI, _LE
+from networking_odl.db import db
+from networking_odl.journal import dependency_validations
+
+
+LOG = logging.getLogger(__name__)
+
+
+def call_thread_on_end(func):
+ def new_func(obj, *args, **kwargs):
+ return_value = func(obj, *args, **kwargs)
+ obj.journal.set_sync_event()
+ return return_value
+ return new_func
+
+
+def _enrich_port(db_session, context, object_type, operation, data):
+ """Enrich the port with additional information needed by ODL"""
+ if context:
+ plugin = context._plugin
+ dbcontext = context._plugin_context
+ else:
+ dbcontext = neutron_context.get_admin_context()
+ plugin = manager.NeutronManager.get_plugin()
+
+ groups = [plugin.get_security_group(dbcontext, sg)
+ for sg in data['security_groups']]
+ new_data = copy.deepcopy(data)
+ new_data['security_groups'] = groups
+
+ # NOTE(yamahata): work around for port creation for router
+ # tenant_id=''(empty string) is passed when port is created
+ # by l3 plugin internally for router.
+ # On the other hand, ODL doesn't accept empty string for tenant_id.
+ # In that case, deduce tenant_id from network_id for now.
+ # Right fix: modify Neutron so that don't allow empty string
+ # for tenant_id even for port for internal use.
+ # TODO(yamahata): eliminate this work around when neutron side
+ # is fixed
+ # assert port['tenant_id'] != ''
+ if ('tenant_id' not in new_data or new_data['tenant_id'] == ''):
+ if context:
+ tenant_id = context._network_context._network['tenant_id']
+ else:
+ network = plugin.get_network(dbcontext, new_data['network_id'])
+ tenant_id = network['tenant_id']
+ new_data['tenant_id'] = tenant_id
+
+ return new_data
+
+
+def record(db_session, object_type, object_uuid, operation, data,
+ context=None):
+ if (object_type == odl_const.ODL_PORT and
+ operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE)):
+ data = _enrich_port(db_session, context, object_type, operation, data)
+
+ db.create_pending_row(db_session, object_type, object_uuid, operation,
+ data)
+
+
+class OpendaylightJournalThread(object):
+ """Thread worker for the Opendaylight Journal Database."""
+ def __init__(self):
+ self.client = client.OpenDaylightRestClient.create_client()
+ self._odl_sync_timeout = cfg.CONF.ml2_odl.sync_timeout
+ self._row_retry_count = cfg.CONF.ml2_odl.retry_count
+ self.event = threading.Event()
+ self.lock = threading.Lock()
+ self._odl_sync_thread = self.start_odl_sync_thread()
+ self._start_sync_timer()
+
+ def start_odl_sync_thread(self):
+ # Start the sync thread
+ LOG.debug("Starting a new sync thread")
+ odl_sync_thread = threading.Thread(
+ name='sync',
+ target=self.run_sync_thread)
+ odl_sync_thread.start()
+ return odl_sync_thread
+
+ def set_sync_event(self):
+ # Prevent race when starting the timer
+ with self.lock:
+ LOG.debug("Resetting thread timer")
+ self._timer.cancel()
+ self._start_sync_timer()
+ self.event.set()
+
+ def _start_sync_timer(self):
+ self._timer = threading.Timer(self._odl_sync_timeout,
+ self.set_sync_event)
+ self._timer.start()
+
+ def _json_data(self, row):
+ data = copy.deepcopy(row.data)
+ filters.filter_for_odl(row.object_type, row.operation, data)
+ url_object = row.object_type.replace('_', '-')
+
+ if row.operation == odl_const.ODL_CREATE:
+ method = 'post'
+ urlpath = url_object + 's'
+ to_send = {row.object_type: data}
+ elif row.operation == odl_const.ODL_UPDATE:
+ method = 'put'
+ urlpath = url_object + 's/' + row.object_uuid
+ to_send = {row.object_type: data}
+ elif row.operation == odl_const.ODL_DELETE:
+ method = 'delete'
+ urlpath = url_object + 's/' + row.object_uuid
+ to_send = None
+ elif row.operation == odl_const.ODL_ADD:
+ method = 'put'
+ urlpath = 'routers/' + data['id'] + '/add_router_interface'
+ to_send = data
+ elif row.operation == odl_const.ODL_REMOVE:
+ method = 'put'
+ urlpath = 'routers/' + data['id'] + '/remove_router_interface'
+ to_send = data
+
+ return method, urlpath, to_send
+
+ def run_sync_thread(self, exit_after_run=False):
+ while True:
+ try:
+ self.event.wait()
+ self.event.clear()
+
+ session = neutron_db_api.get_session()
+ self._sync_pending_rows(session, exit_after_run)
+
+ LOG.debug("Clearing sync thread event")
+ if exit_after_run:
+ # Permanently waiting thread model breaks unit tests
+ # Adding this arg to exit here only for unit tests
+ break
+ except Exception:
+ # Catch exceptions to protect the thread while running
+ LOG.exception(_LE("Error on run_sync_thread"))
+
+ def _sync_pending_rows(self, session, exit_after_run):
+ while True:
+ LOG.debug("Thread walking database")
+ row = db.get_oldest_pending_db_row_with_lock(session)
+ if not row:
+ LOG.debug("No rows to sync")
+ break
+
+ # Validate the operation
+ valid = dependency_validations.validate(session, row)
+ if not valid:
+ LOG.info(_LI("%(operation)s %(type)s %(uuid)s is not a "
+ "valid operation yet, skipping for now"),
+ {'operation': row.operation,
+ 'type': row.object_type,
+ 'uuid': row.object_uuid})
+
+ # Set row back to pending.
+ db.update_db_row_state(session, row, odl_const.PENDING)
+ if exit_after_run:
+ break
+ continue
+
+ LOG.info(_LI("Syncing %(operation)s %(type)s %(uuid)s"),
+ {'operation': row.operation, 'type': row.object_type,
+ 'uuid': row.object_uuid})
+
+ # Add code to sync this to ODL
+ method, urlpath, to_send = self._json_data(row)
+
+ try:
+ self.client.sendjson(method, urlpath, to_send)
+ db.update_db_row_state(session, row, odl_const.COMPLETED)
+ except exceptions.ConnectionError as e:
+ # Don't raise the retry count, just log an error
+ LOG.error(_LE("Cannot connect to the Opendaylight Controller"))
+ # Set row back to pending
+ db.update_db_row_state(session, row, odl_const.PENDING)
+ # Break our of the loop and retry with the next
+ # timer interval
+ break
+ except Exception as e:
+ LOG.error(_LE("Error syncing %(type)s %(operation)s,"
+ " id %(uuid)s Error: %(error)s"),
+ {'type': row.object_type,
+ 'uuid': row.object_uuid,
+ 'operation': row.operation,
+ 'error': e.message})
+ db.update_pending_db_row_retry(session, row,
+ self._row_retry_count)