summaryrefslogtreecommitdiffstats
path: root/networking-odl/networking_odl/journal
diff options
context:
space:
mode:
Diffstat (limited to 'networking-odl/networking_odl/journal')
-rw-r--r--networking-odl/networking_odl/journal/cleanup.py46
-rw-r--r--networking-odl/networking_odl/journal/dependency_validations.py22
-rw-r--r--networking-odl/networking_odl/journal/full_sync.py114
-rw-r--r--networking-odl/networking_odl/journal/journal.py67
-rw-r--r--networking-odl/networking_odl/journal/maintenance.py73
5 files changed, 18 insertions, 304 deletions
diff --git a/networking-odl/networking_odl/journal/cleanup.py b/networking-odl/networking_odl/journal/cleanup.py
deleted file mode 100644
index 994fb82..0000000
--- a/networking-odl/networking_odl/journal/cleanup.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#
-# Copyright (C) 2016 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-from datetime import timedelta
-
-from oslo_config import cfg
-from oslo_log import log as logging
-
-from networking_odl._i18n import _LI
-from networking_odl.common import constants as odl_const
-from networking_odl.db import db
-
-LOG = logging.getLogger(__name__)
-
-
-class JournalCleanup(object):
- """Journal maintenance operation for deleting completed rows."""
- def __init__(self):
- self._rows_retention = cfg.CONF.ml2_odl.completed_rows_retention
- self._processing_timeout = cfg.CONF.ml2_odl.processing_timeout
-
- def delete_completed_rows(self, session):
- if self._rows_retention is not -1:
- LOG.debug("Deleting completed rows")
- db.delete_rows_by_state_and_time(
- session, odl_const.COMPLETED,
- timedelta(seconds=self._rows_retention))
-
- def cleanup_processing_rows(self, session):
- row_count = db.reset_processing_rows(session, self._processing_timeout)
- if row_count:
- LOG.info(_LI("Reset %(num)s orphaned rows back to pending"),
- {"num": row_count})
diff --git a/networking-odl/networking_odl/journal/dependency_validations.py b/networking-odl/networking_odl/journal/dependency_validations.py
index a6f5f96..07c657c 100644
--- a/networking-odl/networking_odl/journal/dependency_validations.py
+++ b/networking-odl/networking_odl/journal/dependency_validations.py
@@ -235,7 +235,7 @@ def validate_security_group_rule_operation(session, row):
"""
return True
-_VALIDATION_MAP = {
+VALIDATION_MAP = {
odl_const.ODL_NETWORK: validate_network_operation,
odl_const.ODL_SUBNET: validate_subnet_operation,
odl_const.ODL_PORT: validate_port_operation,
@@ -245,23 +245,3 @@ _VALIDATION_MAP = {
odl_const.ODL_SG: validate_security_group_operation,
odl_const.ODL_SG_RULE: validate_security_group_rule_operation,
}
-
-
-def validate(session, row):
- """Validate resource dependency in journaled operations.
-
- :param session: db session
- :param row: entry in journal entry to be validated
- """
- return _VALIDATION_MAP[row.object_type](session, row)
-
-
-def register_validator(object_type, validator):
- """Register validator function for given resource.
-
- :param object_type: neutron resource type
- :param validator: function to be registered which validates resource
- dependencies
- """
- assert object_type not in _VALIDATION_MAP
- _VALIDATION_MAP[object_type] = validator
diff --git a/networking-odl/networking_odl/journal/full_sync.py b/networking-odl/networking_odl/journal/full_sync.py
deleted file mode 100644
index dad7215..0000000
--- a/networking-odl/networking_odl/journal/full_sync.py
+++ /dev/null
@@ -1,114 +0,0 @@
-#
-# Copyright (C) 2016 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-import requests
-
-from neutron import context as neutron_context
-from neutron import manager
-from neutron.plugins.common import constants
-from neutron_lib import constants as l3_constants
-
-from networking_odl.common import client
-from networking_odl.common import constants as odl_const
-from networking_odl.db import db
-
-# Define which pending operation types should be deleted
-_CANARY_NETWORK_ID = "bd8db3a8-2b30-4083-a8b3-b3fd46401142"
-_CANARY_TENANT_ID = "bd8db3a8-2b30-4083-a8b3-b3fd46401142"
-_CANARY_NETWORK_DATA = {'id': _CANARY_NETWORK_ID,
- 'tenant_id': _CANARY_TENANT_ID,
- 'name': 'Sync Canary Network',
- 'admin_state_up': False}
-_OPS_TO_DELETE_ON_SYNC = (odl_const.ODL_CREATE, odl_const.ODL_UPDATE)
-_L2_RESOURCES_TO_SYNC = [(odl_const.ODL_SG, odl_const.ODL_SGS),
- (odl_const.ODL_SG_RULE, odl_const.ODL_SG_RULES),
- (odl_const.ODL_NETWORK, odl_const.ODL_NETWORKS),
- (odl_const.ODL_SUBNET, odl_const.ODL_SUBNETS),
- (odl_const.ODL_PORT, odl_const.ODL_PORTS)]
-_L3_RESOURCES_TO_SYNC = [(odl_const.ODL_ROUTER, odl_const.ODL_ROUTERS),
- (odl_const.ODL_FLOATINGIP, odl_const.ODL_FLOATINGIPS)]
-_CLIENT = client.OpenDaylightRestClient.create_client()
-
-
-def full_sync(session):
- if not _full_sync_needed(session):
- return
-
- db.delete_pending_rows(session, _OPS_TO_DELETE_ON_SYNC)
-
- dbcontext = neutron_context.get_admin_context()
- plugin = manager.NeutronManager.get_plugin()
- for resource_type, collection_name in _L2_RESOURCES_TO_SYNC:
- _sync_resources(session, plugin, dbcontext, resource_type,
- collection_name)
-
- l3plugin = manager.NeutronManager.get_service_plugins().get(
- constants.L3_ROUTER_NAT)
- for resource_type, collection_name in _L3_RESOURCES_TO_SYNC:
- _sync_resources(session, l3plugin, dbcontext, resource_type,
- collection_name)
- _sync_router_ports(session, plugin, dbcontext)
-
- db.create_pending_row(session, odl_const.ODL_NETWORK, _CANARY_NETWORK_ID,
- odl_const.ODL_CREATE, _CANARY_NETWORK_DATA)
-
-
-def _full_sync_needed(session):
- return (_canary_network_missing_on_odl() and
- _canary_network_not_in_journal(session))
-
-
-def _canary_network_missing_on_odl():
- # Try to reach the ODL server, sometimes it might be up & responding to
- # HTTP calls but inoperative..
- response = _CLIENT.get(odl_const.ODL_NETWORKS)
- response.raise_for_status()
-
- response = _CLIENT.get(odl_const.ODL_NETWORKS + "/" + _CANARY_NETWORK_ID)
- if response.status_code == requests.codes.not_found:
- return True
-
- # In case there was an error raise it up because we don't know how to deal
- # with it..
- response.raise_for_status()
- return False
-
-
-def _canary_network_not_in_journal(session):
- return not db.check_for_pending_or_processing_ops(session,
- _CANARY_NETWORK_ID,
- odl_const.ODL_CREATE)
-
-
-def _sync_resources(session, plugin, dbcontext, object_type, collection_name):
- obj_getter = getattr(plugin, 'get_%s' % collection_name)
- resources = obj_getter(dbcontext)
-
- for resource in resources:
- db.create_pending_row(session, object_type, resource['id'],
- odl_const.ODL_CREATE, resource)
-
-
-def _sync_router_ports(session, plugin, dbcontext):
- filters = {'device_owner': [l3_constants.DEVICE_OWNER_ROUTER_INTF]}
- router_ports = plugin.get_ports(dbcontext, filters=filters)
- for port in router_ports:
- resource = {'subnet_id': port['fixed_ips'][0]['subnet_id'],
- 'port_id': port['id'],
- 'id': port['device_id'],
- 'tenant_id': port['tenant_id']}
- db.create_pending_row(session, odl_const.ODL_ROUTER_INTF, port['id'],
- odl_const.ODL_ADD, resource)
diff --git a/networking-odl/networking_odl/journal/journal.py b/networking-odl/networking_odl/journal/journal.py
index ca0d2c2..26295b3 100644
--- a/networking-odl/networking_odl/journal/journal.py
+++ b/networking-odl/networking_odl/journal/journal.py
@@ -21,9 +21,7 @@ from requests import exceptions
from oslo_config import cfg
from oslo_log import log as logging
-from neutron import context as neutron_context
from neutron.db import api as neutron_db_api
-from neutron import manager
from networking_odl.common import client
from networking_odl.common import constants as odl_const
@@ -44,51 +42,6 @@ def call_thread_on_end(func):
return new_func
-def _enrich_port(db_session, context, object_type, operation, data):
- """Enrich the port with additional information needed by ODL"""
- if context:
- plugin = context._plugin
- dbcontext = context._plugin_context
- else:
- dbcontext = neutron_context.get_admin_context()
- plugin = manager.NeutronManager.get_plugin()
-
- groups = [plugin.get_security_group(dbcontext, sg)
- for sg in data['security_groups']]
- new_data = copy.deepcopy(data)
- new_data['security_groups'] = groups
-
- # NOTE(yamahata): work around for port creation for router
- # tenant_id=''(empty string) is passed when port is created
- # by l3 plugin internally for router.
- # On the other hand, ODL doesn't accept empty string for tenant_id.
- # In that case, deduce tenant_id from network_id for now.
- # Right fix: modify Neutron so that don't allow empty string
- # for tenant_id even for port for internal use.
- # TODO(yamahata): eliminate this work around when neutron side
- # is fixed
- # assert port['tenant_id'] != ''
- if ('tenant_id' not in new_data or new_data['tenant_id'] == ''):
- if context:
- tenant_id = context._network_context._network['tenant_id']
- else:
- network = plugin.get_network(dbcontext, new_data['network_id'])
- tenant_id = network['tenant_id']
- new_data['tenant_id'] = tenant_id
-
- return new_data
-
-
-def record(db_session, object_type, object_uuid, operation, data,
- context=None):
- if (object_type == odl_const.ODL_PORT and
- operation in (odl_const.ODL_CREATE, odl_const.ODL_UPDATE)):
- data = _enrich_port(db_session, context, object_type, operation, data)
-
- db.create_pending_row(db_session, object_type, object_uuid, operation,
- data)
-
-
class OpendaylightJournalThread(object):
"""Thread worker for the Opendaylight Journal Database."""
def __init__(self):
@@ -123,28 +76,40 @@ class OpendaylightJournalThread(object):
self._timer.start()
def _json_data(self, row):
- data = copy.deepcopy(row.data)
- filters.filter_for_odl(row.object_type, row.operation, data)
+ filter_cls = filters.FILTER_MAP[row.object_type]
url_object = row.object_type.replace('_', '-')
if row.operation == odl_const.ODL_CREATE:
method = 'post'
+ attr_filter = filter_cls.filter_create_attributes
+ data = copy.deepcopy(row.data)
urlpath = url_object + 's'
+ attr_filter(data)
to_send = {row.object_type: data}
elif row.operation == odl_const.ODL_UPDATE:
method = 'put'
+ attr_filter = filter_cls.filter_update_attributes
+ data = copy.deepcopy(row.data)
urlpath = url_object + 's/' + row.object_uuid
+ attr_filter(data)
to_send = {row.object_type: data}
elif row.operation == odl_const.ODL_DELETE:
method = 'delete'
+ data = None
urlpath = url_object + 's/' + row.object_uuid
to_send = None
elif row.operation == odl_const.ODL_ADD:
method = 'put'
+ attr_filter = filter_cls.filter_add_attributes
+ data = copy.deepcopy(row.data)
+ attr_filter(data)
urlpath = 'routers/' + data['id'] + '/add_router_interface'
to_send = data
elif row.operation == odl_const.ODL_REMOVE:
method = 'put'
+ attr_filter = filter_cls.filter_remove_attributes
+ data = copy.deepcopy(row.data)
+ attr_filter(data)
urlpath = 'routers/' + data['id'] + '/remove_router_interface'
to_send = data
@@ -177,7 +142,9 @@ class OpendaylightJournalThread(object):
break
# Validate the operation
- valid = dependency_validations.validate(session, row)
+ validate_func = (dependency_validations.
+ VALIDATION_MAP[row.object_type])
+ valid = validate_func(session, row)
if not valid:
LOG.info(_LI("%(operation)s %(type)s %(uuid)s is not a "
"valid operation yet, skipping for now"),
diff --git a/networking-odl/networking_odl/journal/maintenance.py b/networking-odl/networking_odl/journal/maintenance.py
deleted file mode 100644
index 7fb82a0..0000000
--- a/networking-odl/networking_odl/journal/maintenance.py
+++ /dev/null
@@ -1,73 +0,0 @@
-#
-# Copyright (C) 2016 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-#
-
-from neutron.db import api as neutron_db_api
-from oslo_config import cfg
-from oslo_log import log as logging
-from oslo_service import loopingcall
-
-from networking_odl._i18n import _LI, _LE
-from networking_odl.db import db
-
-
-LOG = logging.getLogger(__name__)
-
-
-class MaintenanceThread(object):
- def __init__(self):
- self.timer = loopingcall.FixedIntervalLoopingCall(self.execute_ops)
- self.maintenance_interval = cfg.CONF.ml2_odl.maintenance_interval
- self.maintenance_ops = []
-
- def start(self):
- self.timer.start(self.maintenance_interval, stop_on_exception=False)
-
- def _execute_op(self, operation, session):
- op_details = operation.__name__
- if operation.__doc__:
- op_details += " (%s)" % operation.func_doc
-
- try:
- LOG.info(_LI("Starting maintenance operation %s."), op_details)
- db.update_maintenance_operation(session, operation=operation)
- operation(session=session)
- LOG.info(_LI("Finished maintenance operation %s."), op_details)
- except Exception:
- LOG.exception(_LE("Failed during maintenance operation %s."),
- op_details)
-
- def execute_ops(self):
- LOG.info(_LI("Starting journal maintenance run."))
- session = neutron_db_api.get_session()
- if not db.lock_maintenance(session):
- LOG.info(_LI("Maintenance already running, aborting."))
- return
-
- try:
- for operation in self.maintenance_ops:
- self._execute_op(operation, session)
- finally:
- db.update_maintenance_operation(session, operation=None)
- db.unlock_maintenance(session)
- LOG.info(_LI("Finished journal maintenance run."))
-
- def register_operation(self, f):
- """Register a function to be run by the maintenance thread.
-
- :param f: Function to call when the thread runs. The function will
- receive a DB session to use for DB operations.
- """
- self.maintenance_ops.append(f)