Diffstat (limited to 'networking-odl/networking_odl/db/db.py')
-rw-r--r--  networking-odl/networking_odl/db/db.py  234
1 file changed, 234 insertions, 0 deletions
diff --git a/networking-odl/networking_odl/db/db.py b/networking-odl/networking_odl/db/db.py
new file mode 100644
index 0000000..31f4ce2
--- /dev/null
+++ b/networking-odl/networking_odl/db/db.py
@@ -0,0 +1,234 @@
+# Copyright (c) 2015 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import datetime
+
+from sqlalchemy import asc
+from sqlalchemy import func
+from sqlalchemy import or_
+
+from networking_odl.common import constants as odl_const
+from networking_odl.db import models
+
+from neutron.db import api as db_api
+
+from oslo_db import api as oslo_db_api
+
+
+def check_for_pending_or_processing_ops(session, object_uuid, operation=None):
+    q = session.query(models.OpendaylightJournal).filter(
+        or_(models.OpendaylightJournal.state == odl_const.PENDING,
+            models.OpendaylightJournal.state == odl_const.PROCESSING),
+        models.OpendaylightJournal.object_uuid == object_uuid)
+    if operation:
+        if isinstance(operation, (list, tuple)):
+            q = q.filter(models.OpendaylightJournal.operation.in_(operation))
+        else:
+            q = q.filter(models.OpendaylightJournal.operation == operation)
+    return session.query(q.exists()).scalar()
+
+
+def check_for_pending_delete_ops_with_parent(session, object_type, parent_id):
+    rows = session.query(models.OpendaylightJournal).filter(
+        or_(models.OpendaylightJournal.state == odl_const.PENDING,
+            models.OpendaylightJournal.state == odl_const.PROCESSING),
+        models.OpendaylightJournal.object_type == object_type,
+        models.OpendaylightJournal.operation == odl_const.ODL_DELETE
+    ).all()
+
+    for row in rows:
+        if parent_id in row.data:
+            return True
+
+    return False
+
+
+def check_for_pending_or_processing_add(session, router_id, subnet_id):
+    rows = session.query(models.OpendaylightJournal).filter(
+        or_(models.OpendaylightJournal.state == odl_const.PENDING,
+            models.OpendaylightJournal.state == odl_const.PROCESSING),
+        models.OpendaylightJournal.object_type == odl_const.ODL_ROUTER_INTF,
+        models.OpendaylightJournal.operation == odl_const.ODL_ADD
+    ).all()
+
+    for row in rows:
+        if router_id in row.data.values() and subnet_id in row.data.values():
+            return True
+
+    return False
+
+
+def check_for_pending_remove_ops_with_parent(session, parent_id):
+    rows = session.query(models.OpendaylightJournal).filter(
+        or_(models.OpendaylightJournal.state == odl_const.PENDING,
+            models.OpendaylightJournal.state == odl_const.PROCESSING),
+        models.OpendaylightJournal.object_type == odl_const.ODL_ROUTER_INTF,
+        models.OpendaylightJournal.operation == odl_const.ODL_REMOVE
+    ).all()
+
+    for row in rows:
+        if parent_id in row.data.values():
+            return True
+
+    return False
+
+
+def check_for_older_ops(session, row):
+    q = session.query(models.OpendaylightJournal).filter(
+        or_(models.OpendaylightJournal.state == odl_const.PENDING,
+            models.OpendaylightJournal.state == odl_const.PROCESSING),
+        models.OpendaylightJournal.operation == row.operation,
+        models.OpendaylightJournal.object_uuid == row.object_uuid,
+        models.OpendaylightJournal.created_at < row.created_at,
+        models.OpendaylightJournal.id != row.id)
+    return session.query(q.exists()).scalar()
+
+
+def get_all_db_rows(session):
+    return session.query(models.OpendaylightJournal).all()
+
+
+def get_all_db_rows_by_state(session, state):
+    return session.query(models.OpendaylightJournal).filter_by(
+        state=state).all()
+
+
+# Retry deadlock exception for Galera DB.
+# If two (or more) different threads call this method at the same time, they
+# might both succeed in changing the same row to processing, but at least one
+# of them will get a deadlock from Galera and will have to retry the operation.
+@db_api.retry_db_errors
+def get_oldest_pending_db_row_with_lock(session):
+    with session.begin():
+        row = session.query(models.OpendaylightJournal).filter_by(
+            state=odl_const.PENDING).order_by(
+            asc(models.OpendaylightJournal.last_retried)).with_for_update(
+        ).first()
+        if row:
+            update_db_row_state(session, row, odl_const.PROCESSING)
+
+    return row
+
+
+@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
+                           retry_on_request=True)
+def update_db_row_state(session, row, state):
+    row.state = state
+    session.merge(row)
+    session.flush()
+
+
+def update_pending_db_row_retry(session, row, retry_count):
+    if row.retry_count >= retry_count:
+        update_db_row_state(session, row, odl_const.FAILED)
+    else:
+        row.retry_count += 1
+        update_db_row_state(session, row, odl_const.PENDING)
+
+
+# This function is currently not used.
+# Deleted resources are marked as 'deleted' in the database.
+@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
+                           retry_on_request=True)
+def delete_row(session, row=None, row_id=None):
+    if row_id:
+        row = session.query(models.OpendaylightJournal).filter_by(
+            id=row_id).one()
+    if row:
+        session.delete(row)
+        session.flush()
+
+
+@oslo_db_api.wrap_db_retry(max_retries=db_api.MAX_RETRIES,
+                           retry_on_request=True)
+def create_pending_row(session, object_type, object_uuid,
+                       operation, data):
+    row = models.OpendaylightJournal(object_type=object_type,
+                                     object_uuid=object_uuid,
+                                     operation=operation, data=data,
+                                     created_at=func.now(),
+                                     state=odl_const.PENDING)
+    session.add(row)
+    # Keep the session flush for unit tests. It is a NOOP for L2/L3 events
+    # because those calls are made inside a database session transaction with
+    # subtransactions=True.
+    session.flush()
+
+
+@db_api.retry_db_errors
+def delete_pending_rows(session, operations_to_delete):
+    with session.begin():
+        session.query(models.OpendaylightJournal).filter(
+            models.OpendaylightJournal.operation.in_(operations_to_delete),
+            models.OpendaylightJournal.state == odl_const.PENDING).delete(
+            synchronize_session=False)
+        session.expire_all()
+
+
+@db_api.retry_db_errors
+def _update_maintenance_state(session, expected_state, state):
+    with session.begin():
+        row = session.query(models.OpendaylightMaintenance).filter_by(
+            state=expected_state).with_for_update().one_or_none()
+        if row is None:
+            return False
+
+        row.state = state
+        return True
+
+
+def lock_maintenance(session):
+    return _update_maintenance_state(session, odl_const.PENDING,
+                                     odl_const.PROCESSING)
+
+
+def unlock_maintenance(session):
+    return _update_maintenance_state(session, odl_const.PROCESSING,
+                                     odl_const.PENDING)
+
+
+def update_maintenance_operation(session, operation=None):
+    """Update the current maintenance operation details.
+
+    The function assumes the lock is held, so it mustn't be run outside of a
+    locked context.
+    """
+    op_text = None
+    if operation:
+        op_text = operation.__name__
+
+    with session.begin():
+        row = session.query(models.OpendaylightMaintenance).one_or_none()
+        row.processing_operation = op_text
+
+
+def delete_rows_by_state_and_time(session, state, time_delta):
+    with session.begin():
+        now = session.execute(func.now()).scalar()
+        session.query(models.OpendaylightJournal).filter(
+            models.OpendaylightJournal.state == state,
+            models.OpendaylightJournal.last_retried < now - time_delta).delete(
+            synchronize_session=False)
+        session.expire_all()
+
+
+def reset_processing_rows(session, max_timedelta):
+    with session.begin():
+        now = session.execute(func.now()).scalar()
+        max_timedelta = datetime.timedelta(seconds=max_timedelta)
+        rows = session.query(models.OpendaylightJournal).filter(
+            models.OpendaylightJournal.last_retried < now - max_timedelta,
+            models.OpendaylightJournal.state == odl_const.PROCESSING,
+        ).update({'state': odl_const.PENDING})
+
+    return rows
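
For context, these journal helpers are meant to be driven by a periodic journal-processing thread. The sketch below is illustrative only and is not part of this patch: sync_row_to_odl and max_retries are hypothetical caller-supplied inputs, and odl_const.COMPLETED is assumed to exist alongside the PENDING/PROCESSING/FAILED states used in this module.

# Illustrative sketch, not part of the patch: one possible way to consume
# journal rows with the helpers defined in networking_odl/db/db.py.
# `sync_row_to_odl` and `max_retries` are assumed placeholders.
from networking_odl.common import constants as odl_const
from networking_odl.db import db


def process_one_journal_row(session, sync_row_to_odl, max_retries=5):
    # Claim the oldest PENDING row; get_oldest_pending_db_row_with_lock()
    # moves it to PROCESSING under SELECT ... FOR UPDATE.
    row = db.get_oldest_pending_db_row_with_lock(session)
    if row is None:
        return False

    try:
        # Push the operation to the ODL controller (caller-supplied callable).
        sync_row_to_odl(row.object_type, row.operation, row.data)
    except Exception:
        # Re-queue the row as PENDING, or mark it FAILED once the retry
        # budget is exhausted.
        db.update_pending_db_row_retry(session, row, max_retries)
    else:
        db.update_db_row_state(session, row, odl_const.COMPLETED)
    return True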
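
The maintenance helpers follow a simple lock/do-work/unlock pattern around the single OpendaylightMaintenance row. A hedged sketch, assuming a made-up cleanup task (clean_completed_rows) and a ten-minute retention window chosen purely for illustration:

# Illustrative sketch, not part of the patch: running a maintenance task
# under the lock provided by lock_maintenance()/unlock_maintenance().
import datetime

from networking_odl.common import constants as odl_const
from networking_odl.db import db


def clean_completed_rows(session):
    # Hypothetical task: drop COMPLETED rows not retried for ten minutes.
    db.delete_rows_by_state_and_time(
        session, odl_const.COMPLETED, datetime.timedelta(minutes=10))


def run_maintenance(session):
    if not db.lock_maintenance(session):
        # Another worker already holds the maintenance row; skip this cycle.
        return False
    try:
        db.update_maintenance_operation(session,
                                        operation=clean_completed_rows)
        clean_completed_rows(session)
    finally:
        db.update_maintenance_operation(session, operation=None)
        db.unlock_maintenance(session)
    return True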