path: root/compass-tasks/db/api/
diff options
authorHarry Huang <>2017-11-01 15:05:30 +0800
committerHarry Huang <>2017-11-03 14:59:22 +0800
commit8120b2583c459c85116d69211038e0110d71e5b7 (patch)
tree886448deb230ba530fe8814cd5d224e0f2a6299d /compass-tasks/db/api/
parent3656ab7b5e3f2f26f7c98f9dcc97b3c461fa2a76 (diff)
Add compass-tasks
Registered tasks and MQ modules for Compass Change-Id: Id1569a61fe53357d53448478d5ba42cb1f386bc6 Signed-off-by: Harry Huang <>
Diffstat (limited to 'compass-tasks/db/api/')
1 files changed, 264 insertions, 0 deletions
diff --git a/compass-tasks/db/api/ b/compass-tasks/db/api/
new file mode 100644
index 0000000..49769d7
--- /dev/null
+++ b/compass-tasks/db/api/
@@ -0,0 +1,264 @@
+# Copyright 2014 Huawei Technologies Co. Ltd
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Provider interface to manipulate database."""
+import functools
+import logging
+import netaddr
+from contextlib import contextmanager
+from sqlalchemy import create_engine
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.exc import OperationalError
+from sqlalchemy.orm import scoped_session
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.pool import NullPool
+from sqlalchemy.pool import QueuePool
+from sqlalchemy.pool import SingletonThreadPool
+from sqlalchemy.pool import StaticPool
+from threading import local
+from compass.db import exception
+from compass.db import models
+from compass.utils import logsetting
+from compass.utils import setting_wrapper as setting
+ENGINE = None
+SESSION = sessionmaker(autocommit=False, autoflush=False)
+ 'instant': NullPool,
+ 'static': StaticPool,
+ 'queued': QueuePool,
+ 'thread_single': SingletonThreadPool
+def init(database_url=None):
+ """Initialize database.
+ Adjust sqlalchemy logging if necessary.
+ :param database_url: string, database url.
+ """
+ global ENGINE
+ if not database_url:
+ database_url = setting.SQLALCHEMY_DATABASE_URI
+'init database %s', database_url)
+ root_logger = logging.getLogger()
+ fine_debug = root_logger.isEnabledFor(logsetting.LOGLEVEL_MAPPING['fine'])
+ if fine_debug:
+ logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
+ finest_debug = root_logger.isEnabledFor(
+ logsetting.LOGLEVEL_MAPPING['finest']
+ )
+ if finest_debug:
+ logging.getLogger('sqlalchemy.dialects').setLevel(logging.INFO)
+ logging.getLogger('sqlalchemy.pool').setLevel(logging.INFO)
+ logging.getLogger('sqlalchemy.orm').setLevel(logging.INFO)
+ ENGINE = create_engine(
+ database_url, convert_unicode=True,
+ poolclass=poolclass
+ )
+ SESSION.configure(bind=ENGINE)
+ SCOPED_SESSION = scoped_session(SESSION)
+ models.BASE.query = SCOPED_SESSION.query_property()
+def in_session():
+ """check if in database session scope."""
+ bool(hasattr(SESSION_HOLDER, 'session'))
+def session(exception_when_in_session=True):
+ """database session scope.
+ To operate database, it should be called in database session.
+ If not exception_when_in_session, the with session statement support
+ nested session and only the out most session commit/rollback the
+ transaction.
+ """
+ if not ENGINE:
+ init()
+ nested_session = False
+ if hasattr(SESSION_HOLDER, 'session'):
+ if exception_when_in_session:
+ logging.error('we are already in session')
+ raise exception.DatabaseException('session already exist')
+ else:
+ new_session = SESSION_HOLDER.session
+ nested_session = True
+ logging.log(
+ logsetting.getLevelByName('fine'),
+ 'reuse session %s', nested_session
+ )
+ else:
+ new_session = SCOPED_SESSION()
+ setattr(SESSION_HOLDER, 'session', new_session)
+ logging.log(
+ logsetting.getLevelByName('fine'),
+ 'enter session %s', new_session
+ )
+ try:
+ yield new_session
+ if not nested_session:
+ new_session.commit()
+ except Exception as error:
+ if not nested_session:
+ new_session.rollback()
+ logging.error('failed to commit session')
+ logging.exception(error)
+ if isinstance(error, IntegrityError):
+ for item in error.statement.split():
+ if item.islower():
+ object = item
+ break
+ raise exception.DuplicatedRecord(
+ '%s in %s' % (error.orig, object)
+ )
+ elif isinstance(error, OperationalError):
+ raise exception.DatabaseException(
+ 'operation error in database'
+ )
+ elif isinstance(error, exception.DatabaseException):
+ raise error
+ else:
+ raise exception.DatabaseException(str(error))
+ finally:
+ if not nested_session:
+ new_session.close()
+ delattr(SESSION_HOLDER, 'session')
+ logging.log(
+ logsetting.getLevelByName('fine'),
+ 'exit session %s', new_session
+ )
+def current_session():
+ """Get the current session scope when it is called.
+ :return: database session.
+ :raises: DatabaseException when it is not in session.
+ """
+ try:
+ return SESSION_HOLDER.session
+ except Exception as error:
+ logging.error('It is not in the session scope')
+ logging.exception(error)
+ if isinstance(error, exception.DatabaseException):
+ raise error
+ else:
+ raise exception.DatabaseException(str(error))
+def run_in_session(exception_when_in_session=True):
+ """Decorator to make sure the decorated function run in session.
+ When not exception_when_in_session, the run_in_session can be
+ decorated several times.
+ """
+ def decorator(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ try:
+ my_session = kwargs.get('session')
+ if my_session is not None:
+ return func(*args, **kwargs)
+ else:
+ with session(
+ exception_when_in_session=exception_when_in_session
+ ) as my_session:
+ kwargs['session'] = my_session
+ return func(*args, **kwargs)
+ except Exception as error:
+ logging.error(
+ 'got exception with func %s args %s kwargs %s',
+ func, args, kwargs
+ )
+ logging.exception(error)
+ raise error
+ return wrapper
+ return decorator
+def _setup_user_table(user_session):
+ """Initialize user table with default user."""
+'setup user table')
+ from compass.db.api import user
+ user.add_user(
+ session=user_session,
+ email=setting.COMPASS_ADMIN_EMAIL,
+ password=setting.COMPASS_ADMIN_PASSWORD,
+ is_admin=True
+ )
+def _setup_permission_table(permission_session):
+ """Initialize permission table."""
+'setup permission table.')
+ from compass.db.api import permission
+ permission.add_permissions_internal(
+ session=permission_session
+ )
+def _setup_switch_table(switch_session):
+ """Initialize switch table."""
+ # TODO(xicheng): deprecate setup default switch.
+'setup switch table')
+ from compass.db.api import switch
+ switch.add_switch(
+ True, setting.DEFAULT_SWITCH_IP,
+ session=switch_session,
+ machine_filters=['allow ports all']
+ )
+def _update_others(other_session):
+ """Update other tables."""
+'update other tables')
+ from compass.db.api import utils
+ from compass.db import models
+ utils.update_db_objects(
+ other_session, models.Cluster
+ )
+ utils.update_db_objects(
+ other_session, models.Host
+ )
+ utils.update_db_objects(
+ other_session, models.ClusterHost
+ )
+def create_db(session=None):
+ """Create database."""
+ models.BASE.metadata.create_all(bind=ENGINE)
+ _setup_permission_table(session)
+ _setup_user_table(session)
+ _setup_switch_table(session)
+ _update_others(session)
+def drop_db():
+ """Drop database."""
+ models.BASE.metadata.drop_all(bind=ENGINE)