aboutsummaryrefslogtreecommitdiffstats
path: root/app/discover/fetchers/db/db_access.py
diff options
context:
space:
mode:
authorYaron Yogev <yaronyogev@gmail.com>2017-07-27 09:02:54 +0300
committerYaron Yogev <yaronyogev@gmail.com>2017-07-27 14:56:25 +0300
commit7e83d0876ddb84a45e130eeba28bc40ef53c074b (patch)
tree47d76239ae7658d87c66abd142df92709427e7dd /app/discover/fetchers/db/db_access.py
parent378ecbd8947589b9cbb39013a0c2e2aa201e03bd (diff)
Calipso initial release for OPNFV
Change-Id: I7210c244b0c10fa80bfa8c77cb86c9d6ddf8bc88 Signed-off-by: Yaron Yogev <yaronyogev@gmail.com>
Diffstat (limited to 'app/discover/fetchers/db/db_access.py')
-rw-r--r--app/discover/fetchers/db/db_access.py142
1 files changed, 142 insertions, 0 deletions
diff --git a/app/discover/fetchers/db/db_access.py b/app/discover/fetchers/db/db_access.py
new file mode 100644
index 0000000..00bd776
--- /dev/null
+++ b/app/discover/fetchers/db/db_access.py
@@ -0,0 +1,142 @@
+###############################################################################
+# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) #
+# and others #
+# #
+# All rights reserved. This program and the accompanying materials #
+# are made available under the terms of the Apache License, Version 2.0 #
+# which accompanies this distribution, and is available at #
+# http://www.apache.org/licenses/LICENSE-2.0 #
+###############################################################################
+import mysql.connector
+
+from discover.configuration import Configuration
+from discover.fetcher import Fetcher
+from utils.string_utils import jsonify
+
+
+class DbAccess(Fetcher):
+ conn = None
+ query_count_per_con = 0
+
+ # connection timeout set to 30 seconds,
+ # due to problems over long connections
+ TIMEOUT = 30
+
+ def __init__(self):
+ super().__init__()
+ self.config = Configuration()
+ self.conf = self.config.get("mysql")
+ self.connect_to_db()
+ cursor = DbAccess.conn.cursor(dictionary=True)
+ try:
+ # check if DB schema 'neutron' exists
+ cursor.execute("SELECT COUNT(*) FROM neutron.agents")
+ for row in cursor:
+ pass
+ self.neutron_db = "neutron"
+ except (AttributeError, mysql.connector.errors.ProgrammingError):
+ self.neutron_db = "ml2_neutron"
+
+ def db_connect(self, _host, _port, _user, _password, _database):
+ if DbAccess.conn:
+ return
+ try:
+ connector = mysql.connector
+ DbAccess.conn = connector.connect(host=_host, port=_port,
+ connection_timeout=self.TIMEOUT,
+ user=_user,
+ password=_password,
+ database=_database,
+ raise_on_warnings=True)
+ DbAccess.conn.ping(True) # auto-reconnect if necessary
+ except:
+ self.log.critical("failed to connect to MySQL DB")
+ return
+ DbAccess.query_count_per_con = 0
+
+ def connect_to_db(self, force=False):
+ if DbAccess.conn:
+ if not force:
+ return
+ self.log.info("DbAccess: ****** forcing reconnect, " +
+ "query count: %s ******",
+ DbAccess.query_count_per_con)
+ DbAccess.conn = None
+ self.conf = self.config.get("mysql")
+ cnf = self.conf
+ cnf['schema'] = cnf['schema'] if 'schema' in cnf else 'nova'
+ self.db_connect(cnf["host"], cnf["port"],
+ cnf["user"], cnf["password"],
+ cnf["schema"])
+
+ def get_objects_list_for_id(self, query, object_type, id):
+ self.connect_to_db(DbAccess.query_count_per_con >= 25)
+ DbAccess.query_count_per_con += 1
+ self.log.debug("query count: %s, running query:\n%s\n",
+ str(DbAccess.query_count_per_con), query)
+
+ cursor = DbAccess.conn.cursor(dictionary=True)
+ try:
+ if id:
+ cursor.execute(query, [str(id)])
+ else:
+ cursor.execute(query)
+ except (AttributeError, mysql.connector.errors.OperationalError) as e:
+ self.log.error(e)
+ self.connect_to_db(True)
+ # try again to run the query
+ cursor = DbAccess.conn.cursor(dictionary=True)
+ if id:
+ cursor.execute(query, [str(id)])
+ else:
+ cursor.execute(query)
+
+ rows = []
+ for row in cursor:
+ rows.append(row)
+ return rows
+
+ def get_objects_list(self, query, object_type):
+ return self.get_objects_list_for_id(query, object_type, None)
+
+ def get_objects(self, qry, type, id):
+ return jsonify(self.get_objects_list(qry, type))
+
+ def get(self, id):
+ # return list of available fetch types
+ ret = {
+ "description": "List of available fetch calls for this interface",
+ "types": {
+ "regions": "Regions of this environment",
+ "projects": "Projects (tenants) of this environment",
+ "availability_zones": "Availability zones",
+ "aggregates": "Host aggregates",
+ "aggregate_hosts": "Hosts in aggregate X (parameter: id)",
+ "az_hosts": "Host in availability_zone X (parameter: id)"
+ }
+ }
+ return jsonify(ret)
+
+ def exec(self, query, table, field, values):
+ try:
+ cursor = DbAccess.conn.cursor(dictionary=True)
+ cursor.execute(query, [table, field, values])
+ except (AttributeError, mysql.connector.errors.OperationalError) as e:
+ self.log.error(e)
+ self.connect_to_db(True)
+ # try again to run the query
+ cursor = DbAccess.conn.cursor(dictionary=True)
+ cursor.execute(query, [table, field, values])
+
+ rows = []
+ for row in cursor:
+ rows.append(row)
+ return rows
+
+ def set(self, table, field, values):
+ query = """INSERT INTO %s %s VALUES %s"""
+ return self.exec(query, table, field, values)
+
+ def delete(self, table, field, values):
+ query = """DELETE FROM %s WHERE %s=%s"""
+ return self.exec(query, table, field, values)