summaryrefslogtreecommitdiffstats
path: root/networking-odl/networking_odl/journal
diff options
context:
space:
mode:
Diffstat (limited to 'networking-odl/networking_odl/journal')
-rw-r--r--networking-odl/networking_odl/journal/__init__.py0
-rw-r--r--networking-odl/networking_odl/journal/cleanup.py46
-rw-r--r--networking-odl/networking_odl/journal/dependency_validations.py267
-rw-r--r--networking-odl/networking_odl/journal/full_sync.py114
-rw-r--r--networking-odl/networking_odl/journal/journal.py220
-rw-r--r--networking-odl/networking_odl/journal/maintenance.py73
6 files changed, 720 insertions, 0 deletions
diff --git a/networking-odl/networking_odl/journal/__init__.py b/networking-odl/networking_odl/journal/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/networking-odl/networking_odl/journal/__init__.py
diff --git a/networking-odl/networking_odl/journal/cleanup.py b/networking-odl/networking_odl/journal/cleanup.py
new file mode 100644
index 0000000..994fb82
--- /dev/null
+++ b/networking-odl/networking_odl/journal/cleanup.py
@@ -0,0 +1,46 @@
+#
+# Copyright (C) 2016 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from datetime import timedelta
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from networking_odl._i18n import _LI
+from networking_odl.common import constants as odl_const
+from networking_odl.db import db
+
+LOG = logging.getLogger(__name__)
+
+
+class JournalCleanup(object):
+ """Journal maintenance operation for deleting completed rows."""
+ def __init__(self):
+ self._rows_retention = cfg.CONF.ml2_odl.completed_rows_retention
+ self._processing_timeout = cfg.CONF.ml2_odl.processing_timeout
+
+ def delete_completed_rows(self, session):
+ if self._rows_retention is not -1:
+ LOG.debug("Deleting completed rows")
+ db.delete_rows_by_state_and_time(
+ session, odl_const.COMPLETED,
+ timedelta(seconds=self._rows_retention))
+
+ def cleanup_processing_rows(self, session):
+ row_count = db.reset_processing_rows(session, self._processing_timeout)
+ if row_count:
+ LOG.info(_LI("Reset %(num)s orphaned rows back to pending"),
+ {"num": row_count})
diff --git a/networking-odl/networking_odl/journal/dependency_validations.py b/networking-odl/networking_odl/journal/dependency_validations.py
new file mode 100644
index 0000000..a6f5f96
--- /dev/null
+++ b/networking-odl/networking_odl/journal/dependency_validations.py
@@ -0,0 +1,267 @@
+# Copyright (c) 2015 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from networking_odl.common import constants as odl_const
+from networking_odl.db import db
+
+
+def _is_valid_update_operation(session, row):
+ # Check if there are older updates in the queue
+ if db.check_for_older_ops(session, row):
+ return False
+
+ # Check for a pending or processing create operation on this uuid
+ if db.check_for_pending_or_processing_ops(
+ session, row.object_uuid, odl_const.ODL_CREATE):
+ return False
+ return True
+
+
+def validate_network_operation(session, row):
+ """Validate the network operation based on dependencies.
+
+ Validate network operation depending on whether it's dependencies
+ are still in 'pending' or 'processing' state. e.g.
+ """
+ if row.operation == odl_const.ODL_DELETE:
+ # Check for any pending or processing create or update
+ # ops on this uuid itself
+ if db.check_for_pending_or_processing_ops(
+ session, row.object_uuid, [odl_const.ODL_UPDATE,
+ odl_const.ODL_CREATE]):
+ return False
+ # Check for dependent operations
+ if db.check_for_pending_delete_ops_with_parent(
+ session, odl_const.ODL_SUBNET, row.object_uuid):
+ return False
+ if db.check_for_pending_delete_ops_with_parent(
+ session, odl_const.ODL_PORT, row.object_uuid):
+ return False
+ if db.check_for_pending_delete_ops_with_parent(
+ session, odl_const.ODL_ROUTER, row.object_uuid):
+ return False
+ elif (row.operation == odl_const.ODL_UPDATE and
+ not _is_valid_update_operation(session, row)):
+ return False
+ return True
+
+
+def validate_subnet_operation(session, row):
+ """Validate the subnet operation based on dependencies.
+
+ Validate subnet operation depending on whether it's dependencies
+ are still in 'pending' or 'processing' state. e.g.
+ """
+ if row.operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE):
+ network_id = row.data['network_id']
+ # Check for pending or processing network operations
+ if db.check_for_pending_or_processing_ops(session, network_id):
+ return False
+ if (row.operation == odl_const.ODL_UPDATE and
+ not _is_valid_update_operation(session, row)):
+ return False
+ elif row.operation == odl_const.ODL_DELETE:
+ # Check for any pending or processing create or update
+ # ops on this uuid itself
+ if db.check_for_pending_or_processing_ops(
+ session, row.object_uuid, [odl_const.ODL_UPDATE,
+ odl_const.ODL_CREATE]):
+ return False
+ # Check for dependent operations
+ if db.check_for_pending_delete_ops_with_parent(
+ session, odl_const.ODL_PORT, row.object_uuid):
+ return False
+
+ return True
+
+
+def validate_port_operation(session, row):
+ """Validate port operation based on dependencies.
+
+ Validate port operation depending on whether it's dependencies
+ are still in 'pending' or 'processing' state. e.g.
+ """
+ if row.operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE):
+ network_id = row.data['network_id']
+ # Check for pending or processing network operations
+ ops = db.check_for_pending_or_processing_ops(session, network_id)
+ # Check for pending subnet operations.
+ for fixed_ip in row.data['fixed_ips']:
+ ip_ops = db.check_for_pending_or_processing_ops(
+ session, fixed_ip['subnet_id'])
+ ops = ops or ip_ops
+
+ if ops:
+ return False
+ if (row.operation == odl_const.ODL_UPDATE and
+ not _is_valid_update_operation(session, row)):
+ return False
+ elif row.operation == odl_const.ODL_DELETE:
+ # Check for any pending or processing create or update
+ # ops on this uuid itself
+ if db.check_for_pending_or_processing_ops(
+ session, row.object_uuid, [odl_const.ODL_UPDATE,
+ odl_const.ODL_CREATE]):
+ return False
+
+ return True
+
+
+def validate_router_operation(session, row):
+ """Validate router operation based on dependencies.
+
+ Validate router operation depending on whether it's dependencies
+ are still in 'pending' or 'processing' state.
+ """
+ if row.operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE):
+ if row.data['gw_port_id'] is not None:
+ if db.check_for_pending_or_processing_ops(session,
+ row.data['gw_port_id']):
+ return False
+ if (row.operation == odl_const.ODL_UPDATE and
+ not _is_valid_update_operation(session, row)):
+ return False
+ elif row.operation == odl_const.ODL_DELETE:
+ # Check for any pending or processing create or update
+ # operations on this uuid.
+ if db.check_for_pending_or_processing_ops(session, row.object_uuid,
+ [odl_const.ODL_UPDATE,
+ odl_const.ODL_CREATE]):
+ return False
+
+ # Check that dependent port delete operation has completed.
+ if db.check_for_pending_delete_ops_with_parent(
+ session, odl_const.ODL_PORT, row.object_uuid):
+ return False
+
+ # Check that dependent floatingip delete operation has completed.
+ if db.check_for_pending_delete_ops_with_parent(
+ session, odl_const.ODL_FLOATINGIP, row.object_uuid):
+ return False
+
+ # Check that dependent router interface remove operation has completed.
+ if db.check_for_pending_remove_ops_with_parent(
+ session, row.object_uuid):
+ return False
+
+ return True
+
+
+def validate_floatingip_operation(session, row):
+ """Validate floatingip operation based on dependencies.
+
+ Validate floating IP operation depending on whether it's dependencies
+ are still in 'pending' or 'processing' state.
+ """
+ if row.operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE):
+ network_id = row.data.get('floating_network_id')
+ if network_id is not None:
+ if not db.check_for_pending_or_processing_ops(session, network_id):
+ port_id = row.data.get('port_id')
+ if port_id is not None:
+ if db.check_for_pending_or_processing_ops(session,
+ port_id):
+ return False
+ else:
+ return False
+
+ router_id = row.data.get('router_id')
+ if router_id is not None:
+ if db.check_for_pending_or_processing_ops(session, router_id):
+ return False
+ if (row.operation == odl_const.ODL_UPDATE and
+ not _is_valid_update_operation(session, row)):
+ return False
+ elif row.operation == odl_const.ODL_DELETE:
+ # Check for any pending or processing create or update
+ # ops on this uuid itself
+ if db.check_for_pending_or_processing_ops(session, row.object_uuid,
+ [odl_const.ODL_UPDATE,
+ odl_const.ODL_CREATE]):
+ return False
+
+ return True
+
+
+def validate_router_interface_operation(session, row):
+ """Validate router_interface operation based on dependencies.
+
+ Validate router_interface operation depending on whether it's dependencies
+ are still in 'pending' or 'processing' state.
+ """
+ if row.operation == odl_const.ODL_ADD:
+ # Verify that router event has been completed.
+ if db.check_for_pending_or_processing_ops(session, row.data['id']):
+ return False
+
+ # TODO(rcurran): Check for port_id?
+ if db.check_for_pending_or_processing_ops(session,
+ row.data['subnet_id']):
+ return False
+ elif row.operation == odl_const.ODL_REMOVE:
+ if db.check_for_pending_or_processing_add(session, row.data['id'],
+ row.data['subnet_id']):
+ return False
+
+ return True
+
+
+def validate_security_group_operation(session, row):
+ """Validate security_group operation based on dependencies.
+
+ Validate security_group operation depending on whether it's dependencies
+ are still in 'pending' or 'processing' state. e.g.
+ """
+ return True
+
+
+def validate_security_group_rule_operation(session, row):
+ """Validate security_group_rule operation based on dependencies.
+
+ Validate security_group_rule operation depending on whether it's
+ dependencies are still in 'pending' or 'processing' state. e.g.
+ """
+ return True
+
+_VALIDATION_MAP = {
+ odl_const.ODL_NETWORK: validate_network_operation,
+ odl_const.ODL_SUBNET: validate_subnet_operation,
+ odl_const.ODL_PORT: validate_port_operation,
+ odl_const.ODL_ROUTER: validate_router_operation,
+ odl_const.ODL_ROUTER_INTF: validate_router_interface_operation,
+ odl_const.ODL_FLOATINGIP: validate_floatingip_operation,
+ odl_const.ODL_SG: validate_security_group_operation,
+ odl_const.ODL_SG_RULE: validate_security_group_rule_operation,
+}
+
+
+def validate(session, row):
+ """Validate resource dependency in journaled operations.
+
+ :param session: db session
+ :param row: entry in journal entry to be validated
+ """
+ return _VALIDATION_MAP[row.object_type](session, row)
+
+
+def register_validator(object_type, validator):
+ """Register validator function for given resource.
+
+ :param object_type: neutron resource type
+ :param validator: function to be registered which validates resource
+ dependencies
+ """
+ assert object_type not in _VALIDATION_MAP
+ _VALIDATION_MAP[object_type] = validator
diff --git a/networking-odl/networking_odl/journal/full_sync.py b/networking-odl/networking_odl/journal/full_sync.py
new file mode 100644
index 0000000..dad7215
--- /dev/null
+++ b/networking-odl/networking_odl/journal/full_sync.py
@@ -0,0 +1,114 @@
+#
+# Copyright (C) 2016 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import requests
+
+from neutron import context as neutron_context
+from neutron import manager
+from neutron.plugins.common import constants
+from neutron_lib import constants as l3_constants
+
+from networking_odl.common import client
+from networking_odl.common import constants as odl_const
+from networking_odl.db import db
+
+# Define which pending operation types should be deleted
+_CANARY_NETWORK_ID = "bd8db3a8-2b30-4083-a8b3-b3fd46401142"
+_CANARY_TENANT_ID = "bd8db3a8-2b30-4083-a8b3-b3fd46401142"
+_CANARY_NETWORK_DATA = {'id': _CANARY_NETWORK_ID,
+ 'tenant_id': _CANARY_TENANT_ID,
+ 'name': 'Sync Canary Network',
+ 'admin_state_up': False}
+_OPS_TO_DELETE_ON_SYNC = (odl_const.ODL_CREATE, odl_const.ODL_UPDATE)
+_L2_RESOURCES_TO_SYNC = [(odl_const.ODL_SG, odl_const.ODL_SGS),
+ (odl_const.ODL_SG_RULE, odl_const.ODL_SG_RULES),
+ (odl_const.ODL_NETWORK, odl_const.ODL_NETWORKS),
+ (odl_const.ODL_SUBNET, odl_const.ODL_SUBNETS),
+ (odl_const.ODL_PORT, odl_const.ODL_PORTS)]
+_L3_RESOURCES_TO_SYNC = [(odl_const.ODL_ROUTER, odl_const.ODL_ROUTERS),
+ (odl_const.ODL_FLOATINGIP, odl_const.ODL_FLOATINGIPS)]
+_CLIENT = client.OpenDaylightRestClient.create_client()
+
+
+def full_sync(session):
+ if not _full_sync_needed(session):
+ return
+
+ db.delete_pending_rows(session, _OPS_TO_DELETE_ON_SYNC)
+
+ dbcontext = neutron_context.get_admin_context()
+ plugin = manager.NeutronManager.get_plugin()
+ for resource_type, collection_name in _L2_RESOURCES_TO_SYNC:
+ _sync_resources(session, plugin, dbcontext, resource_type,
+ collection_name)
+
+ l3plugin = manager.NeutronManager.get_service_plugins().get(
+ constants.L3_ROUTER_NAT)
+ for resource_type, collection_name in _L3_RESOURCES_TO_SYNC:
+ _sync_resources(session, l3plugin, dbcontext, resource_type,
+ collection_name)
+ _sync_router_ports(session, plugin, dbcontext)
+
+ db.create_pending_row(session, odl_const.ODL_NETWORK, _CANARY_NETWORK_ID,
+ odl_const.ODL_CREATE, _CANARY_NETWORK_DATA)
+
+
+def _full_sync_needed(session):
+ return (_canary_network_missing_on_odl() and
+ _canary_network_not_in_journal(session))
+
+
+def _canary_network_missing_on_odl():
+ # Try to reach the ODL server, sometimes it might be up & responding to
+ # HTTP calls but inoperative..
+ response = _CLIENT.get(odl_const.ODL_NETWORKS)
+ response.raise_for_status()
+
+ response = _CLIENT.get(odl_const.ODL_NETWORKS + "/" + _CANARY_NETWORK_ID)
+ if response.status_code == requests.codes.not_found:
+ return True
+
+ # In case there was an error raise it up because we don't know how to deal
+ # with it..
+ response.raise_for_status()
+ return False
+
+
+def _canary_network_not_in_journal(session):
+ return not db.check_for_pending_or_processing_ops(session,
+ _CANARY_NETWORK_ID,
+ odl_const.ODL_CREATE)
+
+
+def _sync_resources(session, plugin, dbcontext, object_type, collection_name):
+ obj_getter = getattr(plugin, 'get_%s' % collection_name)
+ resources = obj_getter(dbcontext)
+
+ for resource in resources:
+ db.create_pending_row(session, object_type, resource['id'],
+ odl_const.ODL_CREATE, resource)
+
+
+def _sync_router_ports(session, plugin, dbcontext):
+ filters = {'device_owner': [l3_constants.DEVICE_OWNER_ROUTER_INTF]}
+ router_ports = plugin.get_ports(dbcontext, filters=filters)
+ for port in router_ports:
+ resource = {'subnet_id': port['fixed_ips'][0]['subnet_id'],
+ 'port_id': port['id'],
+ 'id': port['device_id'],
+ 'tenant_id': port['tenant_id']}
+ db.create_pending_row(session, odl_const.ODL_ROUTER_INTF, port['id'],
+ odl_const.ODL_ADD, resource)
diff --git a/networking-odl/networking_odl/journal/journal.py b/networking-odl/networking_odl/journal/journal.py
new file mode 100644
index 0000000..ca0d2c2
--- /dev/null
+++ b/networking-odl/networking_odl/journal/journal.py
@@ -0,0 +1,220 @@
+# Copyright (c) 2015 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import threading
+
+from requests import exceptions
+
+from oslo_config import cfg
+from oslo_log import log as logging
+
+from neutron import context as neutron_context
+from neutron.db import api as neutron_db_api
+from neutron import manager
+
+from networking_odl.common import client
+from networking_odl.common import constants as odl_const
+from networking_odl.common import filters
+from networking_odl._i18n import _LI, _LE
+from networking_odl.db import db
+from networking_odl.journal import dependency_validations
+
+
+LOG = logging.getLogger(__name__)
+
+
+def call_thread_on_end(func):
+ def new_func(obj, *args, **kwargs):
+ return_value = func(obj, *args, **kwargs)
+ obj.journal.set_sync_event()
+ return return_value
+ return new_func
+
+
+def _enrich_port(db_session, context, object_type, operation, data):
+ """Enrich the port with additional information needed by ODL"""
+ if context:
+ plugin = context._plugin
+ dbcontext = context._plugin_context
+ else:
+ dbcontext = neutron_context.get_admin_context()
+ plugin = manager.NeutronManager.get_plugin()
+
+ groups = [plugin.get_security_group(dbcontext, sg)
+ for sg in data['security_groups']]
+ new_data = copy.deepcopy(data)
+ new_data['security_groups'] = groups
+
+ # NOTE(yamahata): work around for port creation for router
+ # tenant_id=''(empty string) is passed when port is created
+ # by l3 plugin internally for router.
+ # On the other hand, ODL doesn't accept empty string for tenant_id.
+ # In that case, deduce tenant_id from network_id for now.
+ # Right fix: modify Neutron so that don't allow empty string
+ # for tenant_id even for port for internal use.
+ # TODO(yamahata): eliminate this work around when neutron side
+ # is fixed
+ # assert port['tenant_id'] != ''
+ if ('tenant_id' not in new_data or new_data['tenant_id'] == ''):
+ if context:
+ tenant_id = context._network_context._network['tenant_id']
+ else:
+ network = plugin.get_network(dbcontext, new_data['network_id'])
+ tenant_id = network['tenant_id']
+ new_data['tenant_id'] = tenant_id
+
+ return new_data
+
+
+def record(db_session, object_type, object_uuid, operation, data,
+ context=None):
+ if (object_type == odl_const.ODL_PORT and
+ operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE)):
+ data = _enrich_port(db_session, context, object_type, operation, data)
+
+ db.create_pending_row(db_session, object_type, object_uuid, operation,
+ data)
+
+
+class OpendaylightJournalThread(object):
+ """Thread worker for the Opendaylight Journal Database."""
+ def __init__(self):
+ self.client = client.OpenDaylightRestClient.create_client()
+ self._odl_sync_timeout = cfg.CONF.ml2_odl.sync_timeout
+ self._row_retry_count = cfg.CONF.ml2_odl.retry_count
+ self.event = threading.Event()
+ self.lock = threading.Lock()
+ self._odl_sync_thread = self.start_odl_sync_thread()
+ self._start_sync_timer()
+
+ def start_odl_sync_thread(self):
+ # Start the sync thread
+ LOG.debug("Starting a new sync thread")
+ odl_sync_thread = threading.Thread(
+ name='sync',
+ target=self.run_sync_thread)
+ odl_sync_thread.start()
+ return odl_sync_thread
+
+ def set_sync_event(self):
+ # Prevent race when starting the timer
+ with self.lock:
+ LOG.debug("Resetting thread timer")
+ self._timer.cancel()
+ self._start_sync_timer()
+ self.event.set()
+
+ def _start_sync_timer(self):
+ self._timer = threading.Timer(self._odl_sync_timeout,
+ self.set_sync_event)
+ self._timer.start()
+
+ def _json_data(self, row):
+ data = copy.deepcopy(row.data)
+ filters.filter_for_odl(row.object_type, row.operation, data)
+ url_object = row.object_type.replace('_', '-')
+
+ if row.operation == odl_const.ODL_CREATE:
+ method = 'post'
+ urlpath = url_object + 's'
+ to_send = {row.object_type: data}
+ elif row.operation == odl_const.ODL_UPDATE:
+ method = 'put'
+ urlpath = url_object + 's/' + row.object_uuid
+ to_send = {row.object_type: data}
+ elif row.operation == odl_const.ODL_DELETE:
+ method = 'delete'
+ urlpath = url_object + 's/' + row.object_uuid
+ to_send = None
+ elif row.operation == odl_const.ODL_ADD:
+ method = 'put'
+ urlpath = 'routers/' + data['id'] + '/add_router_interface'
+ to_send = data
+ elif row.operation == odl_const.ODL_REMOVE:
+ method = 'put'
+ urlpath = 'routers/' + data['id'] + '/remove_router_interface'
+ to_send = data
+
+ return method, urlpath, to_send
+
+ def run_sync_thread(self, exit_after_run=False):
+ while True:
+ try:
+ self.event.wait()
+ self.event.clear()
+
+ session = neutron_db_api.get_session()
+ self._sync_pending_rows(session, exit_after_run)
+
+ LOG.debug("Clearing sync thread event")
+ if exit_after_run:
+ # Permanently waiting thread model breaks unit tests
+ # Adding this arg to exit here only for unit tests
+ break
+ except Exception:
+ # Catch exceptions to protect the thread while running
+ LOG.exception(_LE("Error on run_sync_thread"))
+
+ def _sync_pending_rows(self, session, exit_after_run):
+ while True:
+ LOG.debug("Thread walking database")
+ row = db.get_oldest_pending_db_row_with_lock(session)
+ if not row:
+ LOG.debug("No rows to sync")
+ break
+
+ # Validate the operation
+ valid = dependency_validations.validate(session, row)
+ if not valid:
+ LOG.info(_LI("%(operation)s %(type)s %(uuid)s is not a "
+ "valid operation yet, skipping for now"),
+ {'operation': row.operation,
+ 'type': row.object_type,
+ 'uuid': row.object_uuid})
+
+ # Set row back to pending.
+ db.update_db_row_state(session, row, odl_const.PENDING)
+ if exit_after_run:
+ break
+ continue
+
+ LOG.info(_LI("Syncing %(operation)s %(type)s %(uuid)s"),
+ {'operation': row.operation, 'type': row.object_type,
+ 'uuid': row.object_uuid})
+
+ # Add code to sync this to ODL
+ method, urlpath, to_send = self._json_data(row)
+
+ try:
+ self.client.sendjson(method, urlpath, to_send)
+ db.update_db_row_state(session, row, odl_const.COMPLETED)
+ except exceptions.ConnectionError as e:
+ # Don't raise the retry count, just log an error
+ LOG.error(_LE("Cannot connect to the Opendaylight Controller"))
+ # Set row back to pending
+ db.update_db_row_state(session, row, odl_const.PENDING)
+ # Break our of the loop and retry with the next
+ # timer interval
+ break
+ except Exception as e:
+ LOG.error(_LE("Error syncing %(type)s %(operation)s,"
+ " id %(uuid)s Error: %(error)s"),
+ {'type': row.object_type,
+ 'uuid': row.object_uuid,
+ 'operation': row.operation,
+ 'error': e.message})
+ db.update_pending_db_row_retry(session, row,
+ self._row_retry_count)
diff --git a/networking-odl/networking_odl/journal/maintenance.py b/networking-odl/networking_odl/journal/maintenance.py
new file mode 100644
index 0000000..7fb82a0
--- /dev/null
+++ b/networking-odl/networking_odl/journal/maintenance.py
@@ -0,0 +1,73 @@
+#
+# Copyright (C) 2016 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+from neutron.db import api as neutron_db_api
+from oslo_config import cfg
+from oslo_log import log as logging
+from oslo_service import loopingcall
+
+from networking_odl._i18n import _LI, _LE
+from networking_odl.db import db
+
+
+LOG = logging.getLogger(__name__)
+
+
+class MaintenanceThread(object):
+ def __init__(self):
+ self.timer = loopingcall.FixedIntervalLoopingCall(self.execute_ops)
+ self.maintenance_interval = cfg.CONF.ml2_odl.maintenance_interval
+ self.maintenance_ops = []
+
+ def start(self):
+ self.timer.start(self.maintenance_interval, stop_on_exception=False)
+
+ def _execute_op(self, operation, session):
+ op_details = operation.__name__
+ if operation.__doc__:
+ op_details += " (%s)" % operation.func_doc
+
+ try:
+ LOG.info(_LI("Starting maintenance operation %s."), op_details)
+ db.update_maintenance_operation(session, operation=operation)
+ operation(session=session)
+ LOG.info(_LI("Finished maintenance operation %s."), op_details)
+ except Exception:
+ LOG.exception(_LE("Failed during maintenance operation %s."),
+ op_details)
+
+ def execute_ops(self):
+ LOG.info(_LI("Starting journal maintenance run."))
+ session = neutron_db_api.get_session()
+ if not db.lock_maintenance(session):
+ LOG.info(_LI("Maintenance already running, aborting."))
+ return
+
+ try:
+ for operation in self.maintenance_ops:
+ self._execute_op(operation, session)
+ finally:
+ db.update_maintenance_operation(session, operation=None)
+ db.unlock_maintenance(session)
+ LOG.info(_LI("Finished journal maintenance run."))
+
+ def register_operation(self, f):
+ """Register a function to be run by the maintenance thread.
+
+ :param f: Function to call when the thread runs. The function will
+ receive a DB session to use for DB operations.
+ """
+ self.maintenance_ops.append(f)