diff options
Diffstat (limited to 'networking-odl/networking_odl/journal')
5 files changed, 18 insertions, 304 deletions
diff --git a/networking-odl/networking_odl/journal/cleanup.py b/networking-odl/networking_odl/journal/cleanup.py deleted file mode 100644 index 994fb82..0000000 --- a/networking-odl/networking_odl/journal/cleanup.py +++ /dev/null @@ -1,46 +0,0 @@ -# -# Copyright (C) 2016 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -from datetime import timedelta - -from oslo_config import cfg -from oslo_log import log as logging - -from networking_odl._i18n import _LI -from networking_odl.common import constants as odl_const -from networking_odl.db import db - -LOG = logging.getLogger(__name__) - - -class JournalCleanup(object): - """Journal maintenance operation for deleting completed rows.""" - def __init__(self): - self._rows_retention = cfg.CONF.ml2_odl.completed_rows_retention - self._processing_timeout = cfg.CONF.ml2_odl.processing_timeout - - def delete_completed_rows(self, session): - if self._rows_retention is not -1: - LOG.debug("Deleting completed rows") - db.delete_rows_by_state_and_time( - session, odl_const.COMPLETED, - timedelta(seconds=self._rows_retention)) - - def cleanup_processing_rows(self, session): - row_count = db.reset_processing_rows(session, self._processing_timeout) - if row_count: - LOG.info(_LI("Reset %(num)s orphaned rows back to pending"), - {"num": row_count}) diff --git a/networking-odl/networking_odl/journal/dependency_validations.py b/networking-odl/networking_odl/journal/dependency_validations.py index a6f5f96..07c657c 100644 --- a/networking-odl/networking_odl/journal/dependency_validations.py +++ b/networking-odl/networking_odl/journal/dependency_validations.py @@ -235,7 +235,7 @@ def validate_security_group_rule_operation(session, row): """ return True -_VALIDATION_MAP = { +VALIDATION_MAP = { odl_const.ODL_NETWORK: validate_network_operation, odl_const.ODL_SUBNET: validate_subnet_operation, odl_const.ODL_PORT: validate_port_operation, @@ -245,23 +245,3 @@ _VALIDATION_MAP = { odl_const.ODL_SG: validate_security_group_operation, odl_const.ODL_SG_RULE: validate_security_group_rule_operation, } - - -def validate(session, row): - """Validate resource dependency in journaled operations. - - :param session: db session - :param row: entry in journal entry to be validated - """ - return _VALIDATION_MAP[row.object_type](session, row) - - -def register_validator(object_type, validator): - """Register validator function for given resource. - - :param object_type: neutron resource type - :param validator: function to be registered which validates resource - dependencies - """ - assert object_type not in _VALIDATION_MAP - _VALIDATION_MAP[object_type] = validator diff --git a/networking-odl/networking_odl/journal/full_sync.py b/networking-odl/networking_odl/journal/full_sync.py deleted file mode 100644 index dad7215..0000000 --- a/networking-odl/networking_odl/journal/full_sync.py +++ /dev/null @@ -1,114 +0,0 @@ -# -# Copyright (C) 2016 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import requests - -from neutron import context as neutron_context -from neutron import manager -from neutron.plugins.common import constants -from neutron_lib import constants as l3_constants - -from networking_odl.common import client -from networking_odl.common import constants as odl_const -from networking_odl.db import db - -# Define which pending operation types should be deleted -_CANARY_NETWORK_ID = "bd8db3a8-2b30-4083-a8b3-b3fd46401142" -_CANARY_TENANT_ID = "bd8db3a8-2b30-4083-a8b3-b3fd46401142" -_CANARY_NETWORK_DATA = {'id': _CANARY_NETWORK_ID, - 'tenant_id': _CANARY_TENANT_ID, - 'name': 'Sync Canary Network', - 'admin_state_up': False} -_OPS_TO_DELETE_ON_SYNC = (odl_const.ODL_CREATE, odl_const.ODL_UPDATE) -_L2_RESOURCES_TO_SYNC = [(odl_const.ODL_SG, odl_const.ODL_SGS), - (odl_const.ODL_SG_RULE, odl_const.ODL_SG_RULES), - (odl_const.ODL_NETWORK, odl_const.ODL_NETWORKS), - (odl_const.ODL_SUBNET, odl_const.ODL_SUBNETS), - (odl_const.ODL_PORT, odl_const.ODL_PORTS)] -_L3_RESOURCES_TO_SYNC = [(odl_const.ODL_ROUTER, odl_const.ODL_ROUTERS), - (odl_const.ODL_FLOATINGIP, odl_const.ODL_FLOATINGIPS)] -_CLIENT = client.OpenDaylightRestClient.create_client() - - -def full_sync(session): - if not _full_sync_needed(session): - return - - db.delete_pending_rows(session, _OPS_TO_DELETE_ON_SYNC) - - dbcontext = neutron_context.get_admin_context() - plugin = manager.NeutronManager.get_plugin() - for resource_type, collection_name in _L2_RESOURCES_TO_SYNC: - _sync_resources(session, plugin, dbcontext, resource_type, - collection_name) - - l3plugin = manager.NeutronManager.get_service_plugins().get( - constants.L3_ROUTER_NAT) - for resource_type, collection_name in _L3_RESOURCES_TO_SYNC: - _sync_resources(session, l3plugin, dbcontext, resource_type, - collection_name) - _sync_router_ports(session, plugin, dbcontext) - - db.create_pending_row(session, odl_const.ODL_NETWORK, _CANARY_NETWORK_ID, - odl_const.ODL_CREATE, _CANARY_NETWORK_DATA) - - -def _full_sync_needed(session): - return (_canary_network_missing_on_odl() and - _canary_network_not_in_journal(session)) - - -def _canary_network_missing_on_odl(): - # Try to reach the ODL server, sometimes it might be up & responding to - # HTTP calls but inoperative.. - response = _CLIENT.get(odl_const.ODL_NETWORKS) - response.raise_for_status() - - response = _CLIENT.get(odl_const.ODL_NETWORKS + "/" + _CANARY_NETWORK_ID) - if response.status_code == requests.codes.not_found: - return True - - # In case there was an error raise it up because we don't know how to deal - # with it.. - response.raise_for_status() - return False - - -def _canary_network_not_in_journal(session): - return not db.check_for_pending_or_processing_ops(session, - _CANARY_NETWORK_ID, - odl_const.ODL_CREATE) - - -def _sync_resources(session, plugin, dbcontext, object_type, collection_name): - obj_getter = getattr(plugin, 'get_%s' % collection_name) - resources = obj_getter(dbcontext) - - for resource in resources: - db.create_pending_row(session, object_type, resource['id'], - odl_const.ODL_CREATE, resource) - - -def _sync_router_ports(session, plugin, dbcontext): - filters = {'device_owner': [l3_constants.DEVICE_OWNER_ROUTER_INTF]} - router_ports = plugin.get_ports(dbcontext, filters=filters) - for port in router_ports: - resource = {'subnet_id': port['fixed_ips'][0]['subnet_id'], - 'port_id': port['id'], - 'id': port['device_id'], - 'tenant_id': port['tenant_id']} - db.create_pending_row(session, odl_const.ODL_ROUTER_INTF, port['id'], - odl_const.ODL_ADD, resource) diff --git a/networking-odl/networking_odl/journal/journal.py b/networking-odl/networking_odl/journal/journal.py index ca0d2c2..26295b3 100644 --- a/networking-odl/networking_odl/journal/journal.py +++ b/networking-odl/networking_odl/journal/journal.py @@ -21,9 +21,7 @@ from requests import exceptions from oslo_config import cfg from oslo_log import log as logging -from neutron import context as neutron_context from neutron.db import api as neutron_db_api -from neutron import manager from networking_odl.common import client from networking_odl.common import constants as odl_const @@ -44,51 +42,6 @@ def call_thread_on_end(func): return new_func -def _enrich_port(db_session, context, object_type, operation, data): - """Enrich the port with additional information needed by ODL""" - if context: - plugin = context._plugin - dbcontext = context._plugin_context - else: - dbcontext = neutron_context.get_admin_context() - plugin = manager.NeutronManager.get_plugin() - - groups = [plugin.get_security_group(dbcontext, sg) - for sg in data['security_groups']] - new_data = copy.deepcopy(data) - new_data['security_groups'] = groups - - # NOTE(yamahata): work around for port creation for router - # tenant_id=''(empty string) is passed when port is created - # by l3 plugin internally for router. - # On the other hand, ODL doesn't accept empty string for tenant_id. - # In that case, deduce tenant_id from network_id for now. - # Right fix: modify Neutron so that don't allow empty string - # for tenant_id even for port for internal use. - # TODO(yamahata): eliminate this work around when neutron side - # is fixed - # assert port['tenant_id'] != '' - if ('tenant_id' not in new_data or new_data['tenant_id'] == ''): - if context: - tenant_id = context._network_context._network['tenant_id'] - else: - network = plugin.get_network(dbcontext, new_data['network_id']) - tenant_id = network['tenant_id'] - new_data['tenant_id'] = tenant_id - - return new_data - - -def record(db_session, object_type, object_uuid, operation, data, - context=None): - if (object_type == odl_const.ODL_PORT and - operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE)): - data = _enrich_port(db_session, context, object_type, operation, data) - - db.create_pending_row(db_session, object_type, object_uuid, operation, - data) - - class OpendaylightJournalThread(object): """Thread worker for the Opendaylight Journal Database.""" def __init__(self): @@ -123,28 +76,40 @@ class OpendaylightJournalThread(object): self._timer.start() def _json_data(self, row): - data = copy.deepcopy(row.data) - filters.filter_for_odl(row.object_type, row.operation, data) + filter_cls = filters.FILTER_MAP[row.object_type] url_object = row.object_type.replace('_', '-') if row.operation == odl_const.ODL_CREATE: method = 'post' + attr_filter = filter_cls.filter_create_attributes + data = copy.deepcopy(row.data) urlpath = url_object + 's' + attr_filter(data) to_send = {row.object_type: data} elif row.operation == odl_const.ODL_UPDATE: method = 'put' + attr_filter = filter_cls.filter_update_attributes + data = copy.deepcopy(row.data) urlpath = url_object + 's/' + row.object_uuid + attr_filter(data) to_send = {row.object_type: data} elif row.operation == odl_const.ODL_DELETE: method = 'delete' + data = None urlpath = url_object + 's/' + row.object_uuid to_send = None elif row.operation == odl_const.ODL_ADD: method = 'put' + attr_filter = filter_cls.filter_add_attributes + data = copy.deepcopy(row.data) + attr_filter(data) urlpath = 'routers/' + data['id'] + '/add_router_interface' to_send = data elif row.operation == odl_const.ODL_REMOVE: method = 'put' + attr_filter = filter_cls.filter_remove_attributes + data = copy.deepcopy(row.data) + attr_filter(data) urlpath = 'routers/' + data['id'] + '/remove_router_interface' to_send = data @@ -177,7 +142,9 @@ class OpendaylightJournalThread(object): break # Validate the operation - valid = dependency_validations.validate(session, row) + validate_func = (dependency_validations. + VALIDATION_MAP[row.object_type]) + valid = validate_func(session, row) if not valid: LOG.info(_LI("%(operation)s %(type)s %(uuid)s is not a " "valid operation yet, skipping for now"), diff --git a/networking-odl/networking_odl/journal/maintenance.py b/networking-odl/networking_odl/journal/maintenance.py deleted file mode 100644 index 7fb82a0..0000000 --- a/networking-odl/networking_odl/journal/maintenance.py +++ /dev/null @@ -1,73 +0,0 @@ -# -# Copyright (C) 2016 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -from neutron.db import api as neutron_db_api -from oslo_config import cfg -from oslo_log import log as logging -from oslo_service import loopingcall - -from networking_odl._i18n import _LI, _LE -from networking_odl.db import db - - -LOG = logging.getLogger(__name__) - - -class MaintenanceThread(object): - def __init__(self): - self.timer = loopingcall.FixedIntervalLoopingCall(self.execute_ops) - self.maintenance_interval = cfg.CONF.ml2_odl.maintenance_interval - self.maintenance_ops = [] - - def start(self): - self.timer.start(self.maintenance_interval, stop_on_exception=False) - - def _execute_op(self, operation, session): - op_details = operation.__name__ - if operation.__doc__: - op_details += " (%s)" % operation.func_doc - - try: - LOG.info(_LI("Starting maintenance operation %s."), op_details) - db.update_maintenance_operation(session, operation=operation) - operation(session=session) - LOG.info(_LI("Finished maintenance operation %s."), op_details) - except Exception: - LOG.exception(_LE("Failed during maintenance operation %s."), - op_details) - - def execute_ops(self): - LOG.info(_LI("Starting journal maintenance run.")) - session = neutron_db_api.get_session() - if not db.lock_maintenance(session): - LOG.info(_LI("Maintenance already running, aborting.")) - return - - try: - for operation in self.maintenance_ops: - self._execute_op(operation, session) - finally: - db.update_maintenance_operation(session, operation=None) - db.unlock_maintenance(session) - LOG.info(_LI("Finished journal maintenance run.")) - - def register_operation(self, f): - """Register a function to be run by the maintenance thread. - - :param f: Function to call when the thread runs. The function will - receive a DB session to use for DB operations. - """ - self.maintenance_ops.append(f) |