diff options
Diffstat (limited to 'networking-odl/networking_odl/journal')
-rw-r--r-- | networking-odl/networking_odl/journal/__init__.py | 0 | ||||
-rw-r--r-- | networking-odl/networking_odl/journal/cleanup.py | 46 | ||||
-rw-r--r-- | networking-odl/networking_odl/journal/dependency_validations.py | 267 | ||||
-rw-r--r-- | networking-odl/networking_odl/journal/full_sync.py | 114 | ||||
-rw-r--r-- | networking-odl/networking_odl/journal/journal.py | 220 | ||||
-rw-r--r-- | networking-odl/networking_odl/journal/maintenance.py | 73 |
6 files changed, 720 insertions, 0 deletions
diff --git a/networking-odl/networking_odl/journal/__init__.py b/networking-odl/networking_odl/journal/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/networking-odl/networking_odl/journal/__init__.py diff --git a/networking-odl/networking_odl/journal/cleanup.py b/networking-odl/networking_odl/journal/cleanup.py new file mode 100644 index 0000000..994fb82 --- /dev/null +++ b/networking-odl/networking_odl/journal/cleanup.py @@ -0,0 +1,46 @@ +# +# Copyright (C) 2016 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from datetime import timedelta + +from oslo_config import cfg +from oslo_log import log as logging + +from networking_odl._i18n import _LI +from networking_odl.common import constants as odl_const +from networking_odl.db import db + +LOG = logging.getLogger(__name__) + + +class JournalCleanup(object): + """Journal maintenance operation for deleting completed rows.""" + def __init__(self): + self._rows_retention = cfg.CONF.ml2_odl.completed_rows_retention + self._processing_timeout = cfg.CONF.ml2_odl.processing_timeout + + def delete_completed_rows(self, session): + if self._rows_retention is not -1: + LOG.debug("Deleting completed rows") + db.delete_rows_by_state_and_time( + session, odl_const.COMPLETED, + timedelta(seconds=self._rows_retention)) + + def cleanup_processing_rows(self, session): + row_count = db.reset_processing_rows(session, self._processing_timeout) + if row_count: + LOG.info(_LI("Reset %(num)s orphaned rows back to pending"), + {"num": row_count}) diff --git a/networking-odl/networking_odl/journal/dependency_validations.py b/networking-odl/networking_odl/journal/dependency_validations.py new file mode 100644 index 0000000..a6f5f96 --- /dev/null +++ b/networking-odl/networking_odl/journal/dependency_validations.py @@ -0,0 +1,267 @@ +# Copyright (c) 2015 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from networking_odl.common import constants as odl_const +from networking_odl.db import db + + +def _is_valid_update_operation(session, row): + # Check if there are older updates in the queue + if db.check_for_older_ops(session, row): + return False + + # Check for a pending or processing create operation on this uuid + if db.check_for_pending_or_processing_ops( + session, row.object_uuid, odl_const.ODL_CREATE): + return False + return True + + +def validate_network_operation(session, row): + """Validate the network operation based on dependencies. + + Validate network operation depending on whether it's dependencies + are still in 'pending' or 'processing' state. e.g. + """ + if row.operation == odl_const.ODL_DELETE: + # Check for any pending or processing create or update + # ops on this uuid itself + if db.check_for_pending_or_processing_ops( + session, row.object_uuid, [odl_const.ODL_UPDATE, + odl_const.ODL_CREATE]): + return False + # Check for dependent operations + if db.check_for_pending_delete_ops_with_parent( + session, odl_const.ODL_SUBNET, row.object_uuid): + return False + if db.check_for_pending_delete_ops_with_parent( + session, odl_const.ODL_PORT, row.object_uuid): + return False + if db.check_for_pending_delete_ops_with_parent( + session, odl_const.ODL_ROUTER, row.object_uuid): + return False + elif (row.operation == odl_const.ODL_UPDATE and + not _is_valid_update_operation(session, row)): + return False + return True + + +def validate_subnet_operation(session, row): + """Validate the subnet operation based on dependencies. + + Validate subnet operation depending on whether it's dependencies + are still in 'pending' or 'processing' state. e.g. + """ + if row.operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE): + network_id = row.data['network_id'] + # Check for pending or processing network operations + if db.check_for_pending_or_processing_ops(session, network_id): + return False + if (row.operation == odl_const.ODL_UPDATE and + not _is_valid_update_operation(session, row)): + return False + elif row.operation == odl_const.ODL_DELETE: + # Check for any pending or processing create or update + # ops on this uuid itself + if db.check_for_pending_or_processing_ops( + session, row.object_uuid, [odl_const.ODL_UPDATE, + odl_const.ODL_CREATE]): + return False + # Check for dependent operations + if db.check_for_pending_delete_ops_with_parent( + session, odl_const.ODL_PORT, row.object_uuid): + return False + + return True + + +def validate_port_operation(session, row): + """Validate port operation based on dependencies. + + Validate port operation depending on whether it's dependencies + are still in 'pending' or 'processing' state. e.g. + """ + if row.operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE): + network_id = row.data['network_id'] + # Check for pending or processing network operations + ops = db.check_for_pending_or_processing_ops(session, network_id) + # Check for pending subnet operations. + for fixed_ip in row.data['fixed_ips']: + ip_ops = db.check_for_pending_or_processing_ops( + session, fixed_ip['subnet_id']) + ops = ops or ip_ops + + if ops: + return False + if (row.operation == odl_const.ODL_UPDATE and + not _is_valid_update_operation(session, row)): + return False + elif row.operation == odl_const.ODL_DELETE: + # Check for any pending or processing create or update + # ops on this uuid itself + if db.check_for_pending_or_processing_ops( + session, row.object_uuid, [odl_const.ODL_UPDATE, + odl_const.ODL_CREATE]): + return False + + return True + + +def validate_router_operation(session, row): + """Validate router operation based on dependencies. + + Validate router operation depending on whether it's dependencies + are still in 'pending' or 'processing' state. + """ + if row.operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE): + if row.data['gw_port_id'] is not None: + if db.check_for_pending_or_processing_ops(session, + row.data['gw_port_id']): + return False + if (row.operation == odl_const.ODL_UPDATE and + not _is_valid_update_operation(session, row)): + return False + elif row.operation == odl_const.ODL_DELETE: + # Check for any pending or processing create or update + # operations on this uuid. + if db.check_for_pending_or_processing_ops(session, row.object_uuid, + [odl_const.ODL_UPDATE, + odl_const.ODL_CREATE]): + return False + + # Check that dependent port delete operation has completed. + if db.check_for_pending_delete_ops_with_parent( + session, odl_const.ODL_PORT, row.object_uuid): + return False + + # Check that dependent floatingip delete operation has completed. + if db.check_for_pending_delete_ops_with_parent( + session, odl_const.ODL_FLOATINGIP, row.object_uuid): + return False + + # Check that dependent router interface remove operation has completed. + if db.check_for_pending_remove_ops_with_parent( + session, row.object_uuid): + return False + + return True + + +def validate_floatingip_operation(session, row): + """Validate floatingip operation based on dependencies. + + Validate floating IP operation depending on whether it's dependencies + are still in 'pending' or 'processing' state. + """ + if row.operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE): + network_id = row.data.get('floating_network_id') + if network_id is not None: + if not db.check_for_pending_or_processing_ops(session, network_id): + port_id = row.data.get('port_id') + if port_id is not None: + if db.check_for_pending_or_processing_ops(session, + port_id): + return False + else: + return False + + router_id = row.data.get('router_id') + if router_id is not None: + if db.check_for_pending_or_processing_ops(session, router_id): + return False + if (row.operation == odl_const.ODL_UPDATE and + not _is_valid_update_operation(session, row)): + return False + elif row.operation == odl_const.ODL_DELETE: + # Check for any pending or processing create or update + # ops on this uuid itself + if db.check_for_pending_or_processing_ops(session, row.object_uuid, + [odl_const.ODL_UPDATE, + odl_const.ODL_CREATE]): + return False + + return True + + +def validate_router_interface_operation(session, row): + """Validate router_interface operation based on dependencies. + + Validate router_interface operation depending on whether it's dependencies + are still in 'pending' or 'processing' state. + """ + if row.operation == odl_const.ODL_ADD: + # Verify that router event has been completed. + if db.check_for_pending_or_processing_ops(session, row.data['id']): + return False + + # TODO(rcurran): Check for port_id? + if db.check_for_pending_or_processing_ops(session, + row.data['subnet_id']): + return False + elif row.operation == odl_const.ODL_REMOVE: + if db.check_for_pending_or_processing_add(session, row.data['id'], + row.data['subnet_id']): + return False + + return True + + +def validate_security_group_operation(session, row): + """Validate security_group operation based on dependencies. + + Validate security_group operation depending on whether it's dependencies + are still in 'pending' or 'processing' state. e.g. + """ + return True + + +def validate_security_group_rule_operation(session, row): + """Validate security_group_rule operation based on dependencies. + + Validate security_group_rule operation depending on whether it's + dependencies are still in 'pending' or 'processing' state. e.g. + """ + return True + +_VALIDATION_MAP = { + odl_const.ODL_NETWORK: validate_network_operation, + odl_const.ODL_SUBNET: validate_subnet_operation, + odl_const.ODL_PORT: validate_port_operation, + odl_const.ODL_ROUTER: validate_router_operation, + odl_const.ODL_ROUTER_INTF: validate_router_interface_operation, + odl_const.ODL_FLOATINGIP: validate_floatingip_operation, + odl_const.ODL_SG: validate_security_group_operation, + odl_const.ODL_SG_RULE: validate_security_group_rule_operation, +} + + +def validate(session, row): + """Validate resource dependency in journaled operations. + + :param session: db session + :param row: entry in journal entry to be validated + """ + return _VALIDATION_MAP[row.object_type](session, row) + + +def register_validator(object_type, validator): + """Register validator function for given resource. + + :param object_type: neutron resource type + :param validator: function to be registered which validates resource + dependencies + """ + assert object_type not in _VALIDATION_MAP + _VALIDATION_MAP[object_type] = validator diff --git a/networking-odl/networking_odl/journal/full_sync.py b/networking-odl/networking_odl/journal/full_sync.py new file mode 100644 index 0000000..dad7215 --- /dev/null +++ b/networking-odl/networking_odl/journal/full_sync.py @@ -0,0 +1,114 @@ +# +# Copyright (C) 2016 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import requests + +from neutron import context as neutron_context +from neutron import manager +from neutron.plugins.common import constants +from neutron_lib import constants as l3_constants + +from networking_odl.common import client +from networking_odl.common import constants as odl_const +from networking_odl.db import db + +# Define which pending operation types should be deleted +_CANARY_NETWORK_ID = "bd8db3a8-2b30-4083-a8b3-b3fd46401142" +_CANARY_TENANT_ID = "bd8db3a8-2b30-4083-a8b3-b3fd46401142" +_CANARY_NETWORK_DATA = {'id': _CANARY_NETWORK_ID, + 'tenant_id': _CANARY_TENANT_ID, + 'name': 'Sync Canary Network', + 'admin_state_up': False} +_OPS_TO_DELETE_ON_SYNC = (odl_const.ODL_CREATE, odl_const.ODL_UPDATE) +_L2_RESOURCES_TO_SYNC = [(odl_const.ODL_SG, odl_const.ODL_SGS), + (odl_const.ODL_SG_RULE, odl_const.ODL_SG_RULES), + (odl_const.ODL_NETWORK, odl_const.ODL_NETWORKS), + (odl_const.ODL_SUBNET, odl_const.ODL_SUBNETS), + (odl_const.ODL_PORT, odl_const.ODL_PORTS)] +_L3_RESOURCES_TO_SYNC = [(odl_const.ODL_ROUTER, odl_const.ODL_ROUTERS), + (odl_const.ODL_FLOATINGIP, odl_const.ODL_FLOATINGIPS)] +_CLIENT = client.OpenDaylightRestClient.create_client() + + +def full_sync(session): + if not _full_sync_needed(session): + return + + db.delete_pending_rows(session, _OPS_TO_DELETE_ON_SYNC) + + dbcontext = neutron_context.get_admin_context() + plugin = manager.NeutronManager.get_plugin() + for resource_type, collection_name in _L2_RESOURCES_TO_SYNC: + _sync_resources(session, plugin, dbcontext, resource_type, + collection_name) + + l3plugin = manager.NeutronManager.get_service_plugins().get( + constants.L3_ROUTER_NAT) + for resource_type, collection_name in _L3_RESOURCES_TO_SYNC: + _sync_resources(session, l3plugin, dbcontext, resource_type, + collection_name) + _sync_router_ports(session, plugin, dbcontext) + + db.create_pending_row(session, odl_const.ODL_NETWORK, _CANARY_NETWORK_ID, + odl_const.ODL_CREATE, _CANARY_NETWORK_DATA) + + +def _full_sync_needed(session): + return (_canary_network_missing_on_odl() and + _canary_network_not_in_journal(session)) + + +def _canary_network_missing_on_odl(): + # Try to reach the ODL server, sometimes it might be up & responding to + # HTTP calls but inoperative.. + response = _CLIENT.get(odl_const.ODL_NETWORKS) + response.raise_for_status() + + response = _CLIENT.get(odl_const.ODL_NETWORKS + "/" + _CANARY_NETWORK_ID) + if response.status_code == requests.codes.not_found: + return True + + # In case there was an error raise it up because we don't know how to deal + # with it.. + response.raise_for_status() + return False + + +def _canary_network_not_in_journal(session): + return not db.check_for_pending_or_processing_ops(session, + _CANARY_NETWORK_ID, + odl_const.ODL_CREATE) + + +def _sync_resources(session, plugin, dbcontext, object_type, collection_name): + obj_getter = getattr(plugin, 'get_%s' % collection_name) + resources = obj_getter(dbcontext) + + for resource in resources: + db.create_pending_row(session, object_type, resource['id'], + odl_const.ODL_CREATE, resource) + + +def _sync_router_ports(session, plugin, dbcontext): + filters = {'device_owner': [l3_constants.DEVICE_OWNER_ROUTER_INTF]} + router_ports = plugin.get_ports(dbcontext, filters=filters) + for port in router_ports: + resource = {'subnet_id': port['fixed_ips'][0]['subnet_id'], + 'port_id': port['id'], + 'id': port['device_id'], + 'tenant_id': port['tenant_id']} + db.create_pending_row(session, odl_const.ODL_ROUTER_INTF, port['id'], + odl_const.ODL_ADD, resource) diff --git a/networking-odl/networking_odl/journal/journal.py b/networking-odl/networking_odl/journal/journal.py new file mode 100644 index 0000000..ca0d2c2 --- /dev/null +++ b/networking-odl/networking_odl/journal/journal.py @@ -0,0 +1,220 @@ +# Copyright (c) 2015 OpenStack Foundation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import threading + +from requests import exceptions + +from oslo_config import cfg +from oslo_log import log as logging + +from neutron import context as neutron_context +from neutron.db import api as neutron_db_api +from neutron import manager + +from networking_odl.common import client +from networking_odl.common import constants as odl_const +from networking_odl.common import filters +from networking_odl._i18n import _LI, _LE +from networking_odl.db import db +from networking_odl.journal import dependency_validations + + +LOG = logging.getLogger(__name__) + + +def call_thread_on_end(func): + def new_func(obj, *args, **kwargs): + return_value = func(obj, *args, **kwargs) + obj.journal.set_sync_event() + return return_value + return new_func + + +def _enrich_port(db_session, context, object_type, operation, data): + """Enrich the port with additional information needed by ODL""" + if context: + plugin = context._plugin + dbcontext = context._plugin_context + else: + dbcontext = neutron_context.get_admin_context() + plugin = manager.NeutronManager.get_plugin() + + groups = [plugin.get_security_group(dbcontext, sg) + for sg in data['security_groups']] + new_data = copy.deepcopy(data) + new_data['security_groups'] = groups + + # NOTE(yamahata): work around for port creation for router + # tenant_id=''(empty string) is passed when port is created + # by l3 plugin internally for router. + # On the other hand, ODL doesn't accept empty string for tenant_id. + # In that case, deduce tenant_id from network_id for now. + # Right fix: modify Neutron so that don't allow empty string + # for tenant_id even for port for internal use. + # TODO(yamahata): eliminate this work around when neutron side + # is fixed + # assert port['tenant_id'] != '' + if ('tenant_id' not in new_data or new_data['tenant_id'] == ''): + if context: + tenant_id = context._network_context._network['tenant_id'] + else: + network = plugin.get_network(dbcontext, new_data['network_id']) + tenant_id = network['tenant_id'] + new_data['tenant_id'] = tenant_id + + return new_data + + +def record(db_session, object_type, object_uuid, operation, data, + context=None): + if (object_type == odl_const.ODL_PORT and + operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE)): + data = _enrich_port(db_session, context, object_type, operation, data) + + db.create_pending_row(db_session, object_type, object_uuid, operation, + data) + + +class OpendaylightJournalThread(object): + """Thread worker for the Opendaylight Journal Database.""" + def __init__(self): + self.client = client.OpenDaylightRestClient.create_client() + self._odl_sync_timeout = cfg.CONF.ml2_odl.sync_timeout + self._row_retry_count = cfg.CONF.ml2_odl.retry_count + self.event = threading.Event() + self.lock = threading.Lock() + self._odl_sync_thread = self.start_odl_sync_thread() + self._start_sync_timer() + + def start_odl_sync_thread(self): + # Start the sync thread + LOG.debug("Starting a new sync thread") + odl_sync_thread = threading.Thread( + name='sync', + target=self.run_sync_thread) + odl_sync_thread.start() + return odl_sync_thread + + def set_sync_event(self): + # Prevent race when starting the timer + with self.lock: + LOG.debug("Resetting thread timer") + self._timer.cancel() + self._start_sync_timer() + self.event.set() + + def _start_sync_timer(self): + self._timer = threading.Timer(self._odl_sync_timeout, + self.set_sync_event) + self._timer.start() + + def _json_data(self, row): + data = copy.deepcopy(row.data) + filters.filter_for_odl(row.object_type, row.operation, data) + url_object = row.object_type.replace('_', '-') + + if row.operation == odl_const.ODL_CREATE: + method = 'post' + urlpath = url_object + 's' + to_send = {row.object_type: data} + elif row.operation == odl_const.ODL_UPDATE: + method = 'put' + urlpath = url_object + 's/' + row.object_uuid + to_send = {row.object_type: data} + elif row.operation == odl_const.ODL_DELETE: + method = 'delete' + urlpath = url_object + 's/' + row.object_uuid + to_send = None + elif row.operation == odl_const.ODL_ADD: + method = 'put' + urlpath = 'routers/' + data['id'] + '/add_router_interface' + to_send = data + elif row.operation == odl_const.ODL_REMOVE: + method = 'put' + urlpath = 'routers/' + data['id'] + '/remove_router_interface' + to_send = data + + return method, urlpath, to_send + + def run_sync_thread(self, exit_after_run=False): + while True: + try: + self.event.wait() + self.event.clear() + + session = neutron_db_api.get_session() + self._sync_pending_rows(session, exit_after_run) + + LOG.debug("Clearing sync thread event") + if exit_after_run: + # Permanently waiting thread model breaks unit tests + # Adding this arg to exit here only for unit tests + break + except Exception: + # Catch exceptions to protect the thread while running + LOG.exception(_LE("Error on run_sync_thread")) + + def _sync_pending_rows(self, session, exit_after_run): + while True: + LOG.debug("Thread walking database") + row = db.get_oldest_pending_db_row_with_lock(session) + if not row: + LOG.debug("No rows to sync") + break + + # Validate the operation + valid = dependency_validations.validate(session, row) + if not valid: + LOG.info(_LI("%(operation)s %(type)s %(uuid)s is not a " + "valid operation yet, skipping for now"), + {'operation': row.operation, + 'type': row.object_type, + 'uuid': row.object_uuid}) + + # Set row back to pending. + db.update_db_row_state(session, row, odl_const.PENDING) + if exit_after_run: + break + continue + + LOG.info(_LI("Syncing %(operation)s %(type)s %(uuid)s"), + {'operation': row.operation, 'type': row.object_type, + 'uuid': row.object_uuid}) + + # Add code to sync this to ODL + method, urlpath, to_send = self._json_data(row) + + try: + self.client.sendjson(method, urlpath, to_send) + db.update_db_row_state(session, row, odl_const.COMPLETED) + except exceptions.ConnectionError as e: + # Don't raise the retry count, just log an error + LOG.error(_LE("Cannot connect to the Opendaylight Controller")) + # Set row back to pending + db.update_db_row_state(session, row, odl_const.PENDING) + # Break our of the loop and retry with the next + # timer interval + break + except Exception as e: + LOG.error(_LE("Error syncing %(type)s %(operation)s," + " id %(uuid)s Error: %(error)s"), + {'type': row.object_type, + 'uuid': row.object_uuid, + 'operation': row.operation, + 'error': e.message}) + db.update_pending_db_row_retry(session, row, + self._row_retry_count) diff --git a/networking-odl/networking_odl/journal/maintenance.py b/networking-odl/networking_odl/journal/maintenance.py new file mode 100644 index 0000000..7fb82a0 --- /dev/null +++ b/networking-odl/networking_odl/journal/maintenance.py @@ -0,0 +1,73 @@ +# +# Copyright (C) 2016 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +from neutron.db import api as neutron_db_api +from oslo_config import cfg +from oslo_log import log as logging +from oslo_service import loopingcall + +from networking_odl._i18n import _LI, _LE +from networking_odl.db import db + + +LOG = logging.getLogger(__name__) + + +class MaintenanceThread(object): + def __init__(self): + self.timer = loopingcall.FixedIntervalLoopingCall(self.execute_ops) + self.maintenance_interval = cfg.CONF.ml2_odl.maintenance_interval + self.maintenance_ops = [] + + def start(self): + self.timer.start(self.maintenance_interval, stop_on_exception=False) + + def _execute_op(self, operation, session): + op_details = operation.__name__ + if operation.__doc__: + op_details += " (%s)" % operation.func_doc + + try: + LOG.info(_LI("Starting maintenance operation %s."), op_details) + db.update_maintenance_operation(session, operation=operation) + operation(session=session) + LOG.info(_LI("Finished maintenance operation %s."), op_details) + except Exception: + LOG.exception(_LE("Failed during maintenance operation %s."), + op_details) + + def execute_ops(self): + LOG.info(_LI("Starting journal maintenance run.")) + session = neutron_db_api.get_session() + if not db.lock_maintenance(session): + LOG.info(_LI("Maintenance already running, aborting.")) + return + + try: + for operation in self.maintenance_ops: + self._execute_op(operation, session) + finally: + db.update_maintenance_operation(session, operation=None) + db.unlock_maintenance(session) + LOG.info(_LI("Finished journal maintenance run.")) + + def register_operation(self, f): + """Register a function to be run by the maintenance thread. + + :param f: Function to call when the thread runs. The function will + receive a DB session to use for DB operations. + """ + self.maintenance_ops.append(f) |