Diffstat (limited to 'compass-tasks/db/api/cluster.py')
-rw-r--r--  compass-tasks/db/api/cluster.py  2444
1 file changed, 2444 insertions, 0 deletions
diff --git a/compass-tasks/db/api/cluster.py b/compass-tasks/db/api/cluster.py
new file mode 100644
index 0000000..7a7022c
--- /dev/null
+++ b/compass-tasks/db/api/cluster.py
@@ -0,0 +1,2444 @@
+# Copyright 2014 Huawei Technologies Co. Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Cluster database operations."""
+import copy
+import functools
+import logging
+import re
+
+from compass.db.api import adapter_holder as adapter_api
+from compass.db.api import database
+from compass.db.api import metadata_holder as metadata_api
+from compass.db.api import permission
+from compass.db.api import user as user_api
+from compass.db.api import utils
+from compass.db import exception
+from compass.db import models
+from compass.utils import util
+
+
+SUPPORTED_FIELDS = [
+ 'name', 'os_name', 'owner',
+ 'adapter_name', 'flavor_name'
+]
+SUPPORTED_CLUSTERHOST_FIELDS = []
+RESP_FIELDS = [
+ 'id', 'name', 'os_name', 'os_id', 'adapter_id', 'flavor_id',
+ 'reinstall_distributed_system', 'flavor',
+ 'distributed_system_installed',
+ 'owner', 'adapter_name', 'flavor_name',
+ 'created_at', 'updated_at'
+]
+RESP_CLUSTERHOST_FIELDS = [
+ 'id', 'host_id', 'clusterhost_id', 'machine_id',
+ 'name', 'hostname', 'roles', 'os_installer',
+ 'cluster_id', 'clustername', 'location', 'tag',
+ 'networks', 'mac', 'switch_ip', 'port', 'switches',
+ 'os_installed', 'distributed_system_installed',
+ 'os_name', 'os_id', 'ip',
+ 'reinstall_os', 'reinstall_distributed_system',
+    'owner',
+ 'created_at', 'updated_at',
+ 'patched_roles'
+]
+RESP_CONFIG_FIELDS = [
+ 'os_config',
+ 'package_config',
+ 'config_step',
+ 'config_validated',
+ 'created_at',
+ 'updated_at'
+]
+RESP_DEPLOYED_CONFIG_FIELDS = [
+ 'deployed_os_config',
+ 'deployed_package_config',
+ 'created_at',
+ 'updated_at'
+]
+RESP_METADATA_FIELDS = [
+ 'os_config', 'package_config'
+]
+RESP_CLUSTERHOST_CONFIG_FIELDS = [
+ 'package_config',
+ 'os_config',
+ 'config_step',
+ 'config_validated',
+ 'networks',
+ 'created_at',
+ 'updated_at'
+]
+RESP_CLUSTERHOST_DEPLOYED_CONFIG_FIELDS = [
+ 'deployed_os_config',
+ 'deployed_package_config',
+ 'created_at',
+ 'updated_at'
+]
+RESP_STATE_FIELDS = [
+ 'id', 'state', 'percentage', 'message', 'severity',
+ 'status', 'ready',
+ 'created_at', 'updated_at'
+]
+RESP_CLUSTERHOST_STATE_FIELDS = [
+ 'id', 'state', 'percentage', 'message', 'severity',
+ 'ready', 'created_at', 'updated_at'
+]
+RESP_REVIEW_FIELDS = [
+ 'cluster', 'hosts'
+]
+RESP_DEPLOY_FIELDS = [
+ 'status', 'cluster', 'hosts'
+]
+IGNORE_FIELDS = ['id', 'created_at', 'updated_at']
+ADDED_FIELDS = ['name', 'adapter_id', 'os_id']
+OPTIONAL_ADDED_FIELDS = ['flavor_id']
+UPDATED_FIELDS = ['name', 'reinstall_distributed_system']
+ADDED_HOST_FIELDS = ['machine_id']
+UPDATED_HOST_FIELDS = ['name', 'reinstall_os']
+UPDATED_CLUSTERHOST_FIELDS = ['roles', 'patched_roles']
+PATCHED_CLUSTERHOST_FIELDS = ['patched_roles']
+UPDATED_CONFIG_FIELDS = [
+ 'put_os_config', 'put_package_config', 'config_step'
+]
+UPDATED_DEPLOYED_CONFIG_FIELDS = [
+ 'deployed_os_config', 'deployed_package_config'
+]
+PATCHED_CONFIG_FIELDS = [
+ 'patched_os_config', 'patched_package_config', 'config_step'
+]
+UPDATED_CLUSTERHOST_CONFIG_FIELDS = [
+ 'put_os_config',
+ 'put_package_config'
+]
+PATCHED_CLUSTERHOST_CONFIG_FIELDS = [
+ 'patched_os_config',
+ 'patched_package_config'
+]
+UPDATED_CLUSTERHOST_DEPLOYED_CONFIG_FIELDS = [
+ 'deployed_os_config',
+ 'deployed_package_config'
+]
+UPDATED_CLUSTERHOST_STATE_FIELDS = [
+ 'state', 'percentage', 'message', 'severity'
+]
+UPDATED_CLUSTERHOST_STATE_INTERNAL_FIELDS = [
+ 'ready'
+]
+UPDATED_CLUSTER_STATE_FIELDS = ['state']
+IGNORE_UPDATED_CLUSTER_STATE_FIELDS = ['percentage', 'message', 'severity']
+UPDATED_CLUSTER_STATE_INTERNAL_FIELDS = ['ready']
+RESP_CLUSTERHOST_LOG_FIELDS = [
+ 'clusterhost_id', 'id', 'host_id', 'cluster_id',
+ 'filename', 'position', 'partial_line',
+ 'percentage',
+ 'message', 'severity', 'line_matcher_name'
+]
+ADDED_CLUSTERHOST_LOG_FIELDS = [
+ 'filename'
+]
+UPDATED_CLUSTERHOST_LOG_FIELDS = [
+ 'position', 'partial_line', 'percentage',
+ 'message', 'severity', 'line_matcher_name'
+]
+
+
+@utils.supported_filters(optional_support_keys=SUPPORTED_FIELDS)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTERS
+)
+@utils.wrap_to_dict(RESP_FIELDS)
+def list_clusters(user=None, session=None, **filters):
+ """List clusters."""
+ clusters = utils.list_db_objects(
+ session, models.Cluster, **filters
+ )
+ logging.info('user is %s', user.email)
+    if not user.is_admin and clusters:
+        clusters = [c for c in clusters if c.owner == user.email]
+ return clusters
+
+
+def _get_cluster(cluster_id, session=None, **kwargs):
+ """Get cluster by id."""
+ if isinstance(cluster_id, (int, long)):
+ return utils.get_db_object(
+ session, models.Cluster, id=cluster_id, **kwargs
+ )
+ raise exception.InvalidParameter(
+ 'cluster id %s type is not int compatible' % cluster_id
+ )
+
+
+def get_cluster_internal(cluster_id, session=None, **kwargs):
+ """Helper function to get cluster.
+
+ Should be only used by other files under db/api.
+ """
+ return _get_cluster(cluster_id, session=session, **kwargs)
+
+
+def _get_cluster_host(
+ cluster_id, host_id, session=None, **kwargs
+):
+ """Get clusterhost by cluster id and host id."""
+ cluster = _get_cluster(cluster_id, session=session, **kwargs)
+ from compass.db.api import host as host_api
+ host = host_api.get_host_internal(host_id, session=session, **kwargs)
+ return utils.get_db_object(
+ session, models.ClusterHost,
+ cluster_id=cluster.id,
+ host_id=host.id,
+ **kwargs
+ )
+
+
+def _get_clusterhost(clusterhost_id, session=None, **kwargs):
+ """Get clusterhost by clusterhost id."""
+ if isinstance(clusterhost_id, (int, long)):
+ return utils.get_db_object(
+ session, models.ClusterHost,
+ clusterhost_id=clusterhost_id,
+ **kwargs
+ )
+ raise exception.InvalidParameter(
+ 'clusterhost id %s type is not int compatible' % clusterhost_id
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTERS
+)
+@utils.wrap_to_dict(RESP_FIELDS)
+def get_cluster(
+ cluster_id, exception_when_missing=True,
+ user=None, session=None, **kwargs
+):
+ """Get cluster info."""
+ return _get_cluster(
+ cluster_id,
+ session=session,
+ exception_when_missing=exception_when_missing
+ )
+
+
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTERS)
+def is_cluster_os_ready(
+ cluster_id, exception_when_missing=True,
+ user=None, session=None, **kwargs
+):
+    """Return True if the os is ready on every host in the cluster."""
+    cluster = utils.get_db_object(
+        session, models.Cluster, exception_when_missing, id=cluster_id
+    )
+    all_states = [
+        clusterhost.host.state.ready for clusterhost in cluster.clusterhosts
+    ]
+    logging.info('is_cluster_os_ready: all_states %s', all_states)
+    return all(all_states)
+
+
+def check_cluster_validated(cluster):
+ """Check cluster is validated."""
+ if not cluster.config_validated:
+ raise exception.Forbidden(
+ 'cluster %s is not validated' % cluster.name
+ )
+
+
+def check_clusterhost_validated(clusterhost):
+ """Check clusterhost is validated."""
+ if not clusterhost.config_validated:
+ raise exception.Forbidden(
+ 'clusterhost %s is not validated' % clusterhost.name
+ )
+
+
+def check_cluster_editable(
+ cluster, user=None,
+ check_in_installing=False
+):
+ """Check if cluster is editable.
+
+    Set check_in_installing to True when the operation must be
+    rejected while the cluster is installing, for example when
+    updating the reinstall_distributed_system attribute.
+    If the user is neither an admin nor the owner of the cluster,
+    the check fails so that the user cannot update the cluster
+    attributes.
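+
+    Illustrative call (caller names are assumed):
+        check_cluster_editable(cluster, user=current_user,
+                               check_in_installing=True)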
+ """
+ if check_in_installing:
+ if cluster.state.state == 'INSTALLING':
+ raise exception.Forbidden(
+ 'cluster %s is not editable '
+ 'when state is installing' % cluster.name
+ )
+# elif (
+# cluster.flavor_name and
+# not cluster.reinstall_distributed_system
+# ):
+# raise exception.Forbidden(
+# 'cluster %s is not editable '
+# 'when not to be reinstalled' % cluster.name
+# )
+ if user and not user.is_admin and cluster.creator_id != user.id:
+ raise exception.Forbidden(
+ 'cluster %s is not editable '
+ 'when user is not admin or cluster owner' % cluster.name
+ )
+
+
+def is_cluster_editable(
+ cluster, user=None,
+ check_in_installing=False
+):
+ """Get if cluster is editble."""
+ try:
+ check_cluster_editable(
+ cluster, user=user,
+ check_in_installing=check_in_installing
+ )
+ return True
+ except exception.Forbidden:
+ return False
+
+
+@utils.supported_filters(
+ ADDED_FIELDS,
+ optional_support_keys=OPTIONAL_ADDED_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@utils.input_validates(name=utils.check_name)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_ADD_CLUSTER
+)
+@utils.wrap_to_dict(RESP_FIELDS)
+def add_cluster(
+ exception_when_existing=True,
+ name=None, adapter_id=None, flavor_id=None,
+ user=None, session=None, **kwargs
+):
+ """Create a cluster."""
+ adapter = adapter_api.get_adapter(
+ adapter_id, user=user, session=session
+ )
+    # if flavor_id is not None, also set the flavor field.
+    # In future maybe we can move the use of flavor from
+    # models.py to db/api and explicitly get the flavor when
+    # needed instead of setting flavor into the cluster record.
+ flavor = {}
+ if flavor_id:
+ flavor = adapter_api.get_flavor(
+ flavor_id,
+ user=user, session=session
+ )
+ if flavor['adapter_id'] != adapter['id']:
+ raise exception.InvalidParameter(
+ 'flavor %s is not of adapter %s' % (
+ flavor_id, adapter_id
+ )
+ )
+
+ cluster = utils.add_db_object(
+ session, models.Cluster, exception_when_existing,
+ name, user.id, adapter_id=adapter_id,
+ flavor_id=flavor_id, flavor=flavor, **kwargs
+ )
+ return cluster
+
+
+@utils.supported_filters(
+ optional_support_keys=UPDATED_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@utils.input_validates(name=utils.check_name)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_ADD_CLUSTER
+)
+@utils.wrap_to_dict(RESP_FIELDS)
+def update_cluster(cluster_id, user=None, session=None, **kwargs):
+ """Update a cluster."""
+ cluster = _get_cluster(
+ cluster_id, session=session
+ )
+ check_cluster_editable(
+ cluster, user=user,
+ check_in_installing=(
+ kwargs.get('reinstall_distributed_system', False)
+ )
+ )
+ return utils.update_db_object(session, cluster, **kwargs)
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_DEL_CLUSTER
+)
+@utils.wrap_to_dict(
+ RESP_FIELDS + ['status', 'cluster', 'hosts'],
+ cluster=RESP_FIELDS,
+ hosts=RESP_CLUSTERHOST_FIELDS
+)
+def del_cluster(
+ cluster_id, force=False, from_database_only=False,
+ delete_underlying_host=False, user=None, session=None, **kwargs
+):
+ """Delete a cluster.
+
+    If force, the cluster will be deleted anyway. It is used by the cli
+    to force clean a cluster in any case.
+    If from_database_only, the cluster record will only be removed from
+    the database. Otherwise, a del task is sent to celery to do a clean
+    deletion.
+    If delete_underlying_host, all hosts under this cluster will also be
+    deleted.
+    The backend calls del_cluster again with from_database_only set once
+    it has finished the deletion work on the os/package installers.
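+
+    Illustrative call (id and user are assumed values):
+        del_cluster(1, force=True, delete_underlying_host=True,
+                    user=admin_user)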
+ """
+ cluster = _get_cluster(
+ cluster_id, session=session
+ )
+ logging.debug(
+ 'delete cluster %s with force=%s '
+ 'from_database_only=%s delete_underlying_host=%s',
+ cluster.id, force, from_database_only, delete_underlying_host
+ )
+    # Force the cluster state and the state of any clusterhost in the
+    # cluster to ERROR when we want to delete the cluster anyway, even
+    # if the cluster is installing or already installed. It lets the api
+    # know the deletion is in progress while the backend is doing the
+    # real deletion.
+    # In future we may introduce a new state like INDELETE to indicate
+    # that the deletion is processing.
+    # We still need to discuss whether the api may delete a cluster that
+    # is already installed.
+ for clusterhost in cluster.clusterhosts:
+ if clusterhost.state.state != 'UNINITIALIZED' and force:
+ clusterhost.state.state = 'ERROR'
+ if delete_underlying_host:
+ host = clusterhost.host
+ if host.state.state != 'UNINITIALIZED' and force:
+ host.state.state = 'ERROR'
+ if cluster.state.state != 'UNINITIALIZED' and force:
+ cluster.state.state = 'ERROR'
+
+ check_cluster_editable(
+ cluster, user=user,
+ check_in_installing=True
+ )
+
+ # delete underlying host if delete_underlying_host is set.
+ if delete_underlying_host:
+ for clusterhost in cluster.clusterhosts:
+            # delete the underlying host only if the user has permission.
+ from compass.db.api import host as host_api
+ host = clusterhost.host
+ if host_api.is_host_editable(
+ host, user=user, check_in_installing=True
+ ):
+ # Delete host record directly in database when there is no need
+ # to do the deletion in backend or from_database_only is set.
+ if host.state.state == 'UNINITIALIZED' or from_database_only:
+ utils.del_db_object(
+ session, host
+ )
+
+ # Delete cluster record directly in database when there
+ # is no need to do the deletion in backend or from_database_only is set.
+ if cluster.state.state == 'UNINITIALIZED' or from_database_only:
+ return utils.del_db_object(
+ session, cluster
+ )
+ else:
+ from compass.tasks import client as celery_client
+ logging.info('send del cluster %s task to celery', cluster_id)
+ celery_client.celery.send_task(
+ 'compass.tasks.delete_cluster',
+ (
+ user.email, cluster.id,
+ [
+ clusterhost.host_id
+ for clusterhost in cluster.clusterhosts
+ ],
+ delete_underlying_host
+ ),
+ queue=user.email,
+ exchange=user.email,
+ routing_key=user.email
+ )
+ return {
+ 'status': 'delete action is sent',
+ 'cluster': cluster,
+ 'hosts': cluster.clusterhosts
+ }
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTER_CONFIG
+)
+@utils.wrap_to_dict(RESP_CONFIG_FIELDS)
+def get_cluster_config(cluster_id, user=None, session=None, **kwargs):
+ """Get cluster config."""
+ return _get_cluster(cluster_id, session=session)
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTER_CONFIG
+)
+@utils.wrap_to_dict(RESP_DEPLOYED_CONFIG_FIELDS)
+def get_cluster_deployed_config(cluster_id, user=None, session=None, **kwargs):
+ """Get cluster deployed config."""
+ return _get_cluster(cluster_id, session=session)
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_METADATAS
+)
+@utils.wrap_to_dict(RESP_METADATA_FIELDS)
+def get_cluster_metadata(cluster_id, user=None, session=None, **kwargs):
+ """Get cluster metadata.
+
+    If the cluster has no flavor, it is an os-only cluster.
+    Package metadata is ignored for os-only clusters.
+ """
+ cluster = _get_cluster(cluster_id, session=session)
+ metadatas = {}
+ os_name = cluster.os_name
+ if os_name:
+ metadatas.update(
+ metadata_api.get_os_metadata(
+ os_name, session=session
+ )
+ )
+ flavor_id = cluster.flavor_id
+ if flavor_id:
+ metadatas.update(
+ metadata_api.get_flavor_metadata(
+ flavor_id,
+ user=user, session=session
+ )
+ )
+
+ return metadatas
+
+
+def _cluster_os_config_validates(
+ config, cluster, session=None, user=None, **kwargs
+):
+ """Check cluster os config validation."""
+ metadata_api.validate_os_config(
+ config, cluster.os_id
+ )
+
+
+def _cluster_package_config_validates(
+ config, cluster, session=None, user=None, **kwargs
+):
+ """Check cluster package config validation."""
+ metadata_api.validate_flavor_config(
+ config, cluster.flavor_id
+ )
+
+
+@utils.input_validates_with_args(
+ put_os_config=_cluster_os_config_validates,
+ put_package_config=_cluster_package_config_validates
+)
+@utils.output_validates_with_args(
+ os_config=_cluster_os_config_validates,
+ package_config=_cluster_package_config_validates
+)
+@utils.wrap_to_dict(RESP_CONFIG_FIELDS)
+def _update_cluster_config(cluster, session=None, user=None, **kwargs):
+ """Update a cluster config."""
+ check_cluster_editable(cluster, user=user)
+ return utils.update_db_object(
+ session, cluster, **kwargs
+ )
+
+
+# replace os_config with deployed_os_config and
+# package_config with deployed_package_config.
+@utils.replace_filters(
+ os_config='deployed_os_config',
+ package_config='deployed_package_config'
+)
+@utils.supported_filters(
+ optional_support_keys=UPDATED_DEPLOYED_CONFIG_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_ADD_CLUSTER_CONFIG
+)
+@utils.wrap_to_dict(RESP_DEPLOYED_CONFIG_FIELDS)
+def update_cluster_deployed_config(
+ cluster_id, user=None, session=None, **kwargs
+):
+ """Update cluster deployed config."""
+ cluster = _get_cluster(cluster_id, session=session)
+ check_cluster_editable(cluster, user=user)
+ check_cluster_validated(cluster)
+ return utils.update_db_object(
+ session, cluster, **kwargs
+ )
+
+
+# replace os_config with put_os_config and
+# package_config with put_package_config in kwargs.
+# It tells the db these fields will be updated, not patched.
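+# For example, a caller's {'os_config': {...}} reaches
+# update_cluster_config below as {'put_os_config': {...}}.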
+@utils.replace_filters(
+ os_config='put_os_config',
+ package_config='put_package_config'
+)
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CONFIG_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_ADD_CLUSTER_CONFIG
+)
+def update_cluster_config(cluster_id, user=None, session=None, **kwargs):
+ """Update cluster config."""
+ cluster = _get_cluster(cluster_id, session=session)
+ return _update_cluster_config(
+ cluster, session=session, user=user, **kwargs
+ )
+
+
+# replace os_config with patched_os_config and
+# package_config with patched_package_config in kwargs.
+# It tells the db these fields will be patched, not updated.
+@utils.replace_filters(
+ os_config='patched_os_config',
+ package_config='patched_package_config'
+)
+@utils.supported_filters(
+ optional_support_keys=PATCHED_CONFIG_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_ADD_CLUSTER_CONFIG
+)
+def patch_cluster_config(cluster_id, user=None, session=None, **kwargs):
+ """patch cluster config."""
+ cluster = _get_cluster(cluster_id, session=session)
+ return _update_cluster_config(
+ cluster, session=session, user=user, **kwargs
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_DEL_CLUSTER_CONFIG
+)
+@utils.wrap_to_dict(RESP_CONFIG_FIELDS)
+def del_cluster_config(cluster_id, user=None, session=None):
+ """Delete a cluster config."""
+ cluster = _get_cluster(
+ cluster_id, session=session
+ )
+ check_cluster_editable(cluster, user=user)
+ return utils.update_db_object(
+ session, cluster, os_config={},
+ package_config={}, config_validated=False
+ )
+
+
+def _roles_validates(roles, cluster, session=None, user=None):
+ """Check roles is validated to a cluster's roles."""
+ if roles:
+ if not cluster.flavor_name:
+ raise exception.InvalidParameter(
+                'no flavor in cluster %s' % cluster.name
+ )
+ cluster_roles = [role['name'] for role in cluster.flavor['roles']]
+ for role in roles:
+ if role not in cluster_roles:
+ raise exception.InvalidParameter(
+ 'role %s is not in cluster roles %s' % (
+ role, cluster_roles
+ )
+ )
+
+
+def _cluster_host_roles_validates(
+ value, cluster, host, session=None, user=None, **kwargs
+):
+ """Check clusterhost roles is validated by cluster and host."""
+ _roles_validates(value, cluster, session=session, user=user)
+
+
+def _clusterhost_roles_validates(
+ value, clusterhost, session=None, user=None, **kwargs
+):
+ """Check clusterhost roles is validated by clusterhost."""
+ _roles_validates(
+ value, clusterhost.cluster, session=session, user=user
+ )
+
+
+@utils.supported_filters(
+ optional_support_keys=UPDATED_HOST_FIELDS,
+ ignore_support_keys=UPDATED_CLUSTERHOST_FIELDS
+)
+@utils.input_validates(name=utils.check_name)
+def _add_host_if_not_exist(
+ machine_id, cluster, session=None, user=None, **kwargs
+):
+ """Add underlying host if it does not exist."""
+ from compass.db.api import host as host_api
+ host = host_api.get_host_internal(
+ machine_id, session=session, exception_when_missing=False
+ )
+ if host:
+ if kwargs:
+            # ignore updating the underlying host if it is not editable.
+            if host_api.is_host_editable(
+                host, user=cluster.creator,
+                check_in_installing=kwargs.get('reinstall_os', False),
+            ):
+                utils.update_db_object(
+                    session, host,
+                    **kwargs
+                )
+            else:
+                logging.debug(
+                    'ignore updating host %s '
+                    'since it is not editable', host.name
+                )
+ else:
+ logging.debug('nothing to update for host %s', host.name)
+    else:
+        adapter = adapter_api.get_adapter(
+            cluster.adapter_name, user=user, session=session
+        )
+ host = utils.add_db_object(
+ session, models.Host, False, machine_id,
+ os_name=cluster.os_name,
+ os_installer=adapter['os_installer'],
+ creator=cluster.creator,
+ **kwargs
+ )
+ return host
+
+
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTERHOST_FIELDS,
+ ignore_support_keys=UPDATED_HOST_FIELDS
+)
+@utils.input_validates_with_args(
+ roles=_cluster_host_roles_validates
+)
+def _add_clusterhost_only(
+ cluster, host,
+ exception_when_existing=False,
+ session=None, user=None,
+ **kwargs
+):
+ """Get clusterhost only."""
+ if not cluster.state.state == "UNINITIALIZED":
+ cluster.state.ready = False
+ cluster.state.state = "UNINITIALIZED"
+ cluster.state.percentage = 0.0
+ utils.update_db_object(session, cluster.state, state="UNINITIALIZED")
+
+ return utils.add_db_object(
+ session, models.ClusterHost, exception_when_existing,
+ cluster.id, host.id, **kwargs
+ )
+
+
+@utils.supported_filters(
+ ADDED_HOST_FIELDS,
+ optional_support_keys=UPDATED_HOST_FIELDS + UPDATED_CLUSTERHOST_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+def _add_clusterhost(
+ cluster,
+ exception_when_existing=False,
+ session=None, user=None, machine_id=None, **kwargs
+):
+ """Add clusterhost and add underlying host if it does not exist."""
+ host = _add_host_if_not_exist(
+ machine_id, cluster, session=session,
+ user=user, **kwargs
+ )
+
+ return _add_clusterhost_only(
+ cluster, host, exception_when_existing=exception_when_existing,
+ session=session, user=user, **kwargs
+ )
+
+
+def _add_clusterhosts(cluster, machines, session=None, user=None):
+ """Add machines to cluster.
+
+ Args:
+        machines: list of dicts with clusterhost attrs to update.
+
+ Examples:
+ [{'machine_id': 1, 'name': 'host1'}]
+ """
+ check_cluster_editable(
+ cluster, user=user,
+ check_in_installing=True
+ )
+ if cluster.state.state == 'SUCCESSFUL':
+        cluster.state.state = 'UPDATE_PREPARING'
+ for machine_dict in machines:
+ _add_clusterhost(
+ cluster, session=session, user=user, **machine_dict
+ )
+
+
+def _remove_clusterhosts(cluster, hosts, session=None, user=None):
+ """Remove hosts from cluster.
+
+ Args:
+ hosts: list of host id.
+ """
+ check_cluster_editable(
+ cluster, user=user,
+ check_in_installing=True
+ )
+ utils.del_db_objects(
+ session, models.ClusterHost,
+ cluster_id=cluster.id, host_id=hosts
+ )
+
+
+def _set_clusterhosts(cluster, machines, session=None, user=None):
+ """set machines to cluster.
+
+ Args:
+ machines: list of dict which contains clusterost attr to update.
+
+ Examples:
+ [{'machine_id': 1, 'name': 'host1'}]
+ """
+ check_cluster_editable(
+ cluster, user=user,
+ check_in_installing=True
+ )
+ utils.del_db_objects(
+ session, models.ClusterHost,
+ cluster_id=cluster.id
+ )
+ if cluster.state.state == 'SUCCESSFUL':
+ cluster.state.state = 'UPDATE_PREPARING'
+ for machine_dict in machines:
+ _add_clusterhost(
+ cluster, True, session=session, user=user, **machine_dict
+ )
+
+
+@utils.supported_filters(optional_support_keys=SUPPORTED_CLUSTERHOST_FIELDS)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTERHOSTS
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_FIELDS)
+def list_cluster_hosts(cluster_id, user=None, session=None, **filters):
+ """List clusterhosts of a cluster."""
+ cluster = _get_cluster(cluster_id, session=session)
+ return utils.list_db_objects(
+ session, models.ClusterHost, cluster_id=cluster.id,
+ **filters
+ )
+
+
+@utils.supported_filters(optional_support_keys=SUPPORTED_CLUSTERHOST_FIELDS)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTERHOSTS
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_FIELDS)
+def list_clusterhosts(user=None, session=None, **filters):
+ """List all clusterhosts."""
+ return utils.list_db_objects(
+ session, models.ClusterHost, **filters
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTERHOSTS
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_FIELDS)
+def get_cluster_host(
+ cluster_id, host_id, exception_when_missing=True,
+ user=None, session=None, **kwargs
+):
+ """Get clusterhost info by cluster id and host id."""
+ return _get_cluster_host(
+ cluster_id, host_id, session=session,
+ exception_when_missing=exception_when_missing,
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTERHOSTS
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_FIELDS)
+def get_clusterhost(
+ clusterhost_id, exception_when_missing=True,
+ user=None, session=None, **kwargs
+):
+ """Get clusterhost info by clusterhost id."""
+ return _get_clusterhost(
+ clusterhost_id, session=session,
+ exception_when_missing=exception_when_missing,
+ user=user
+ )
+
+
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTER_HOSTS
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_FIELDS)
+def add_cluster_host(
+ cluster_id, exception_when_existing=True,
+ user=None, session=None, **kwargs
+):
+ """Add a host to a cluster."""
+ cluster = _get_cluster(cluster_id, session=session)
+ check_cluster_editable(
+ cluster, user=user,
+ check_in_installing=True
+ )
+ if cluster.state.state == 'SUCCESSFUL':
+ cluster.state.state = 'UPDATE_PREPARING'
+ return _add_clusterhost(
+ cluster, exception_when_existing,
+ session=session, user=user, **kwargs
+ )
+
+
+@utils.supported_filters(
+ optional_support_keys=UPDATED_HOST_FIELDS,
+ ignore_support_keys=(
+ UPDATED_CLUSTERHOST_FIELDS +
+ PATCHED_CLUSTERHOST_FIELDS
+ )
+)
+def _update_host_if_necessary(
+ clusterhost, session=None, user=None, **kwargs
+):
+ """Update underlying host if there is something to update."""
+ host = clusterhost.host
+ if kwargs:
+        # ignore updating the underlying host if it is not editable.
+ from compass.db.api import host as host_api
+ if host_api.is_host_editable(
+ host, user=clusterhost.cluster.creator,
+ check_in_installing=kwargs.get('reinstall_os', False),
+ ):
+ utils.update_db_object(
+ session, host,
+ **kwargs
+ )
+ else:
+            logging.debug(
+                'ignore updating host %s since it is not editable',
+                host.name
+            )
+ else:
+ logging.debug(
+ 'nothing to update for host %s', host.name
+ )
+ return host
+
+
+@utils.supported_filters(
+ optional_support_keys=(
+ UPDATED_CLUSTERHOST_FIELDS +
+ PATCHED_CLUSTERHOST_FIELDS
+ ),
+ ignore_support_keys=UPDATED_HOST_FIELDS
+)
+@utils.input_validates_with_args(
+ roles=_clusterhost_roles_validates,
+ patched_roles=_clusterhost_roles_validates
+)
+def _update_clusterhost_only(
+ clusterhost, session=None, user=None, **kwargs
+):
+ """Update clusterhost only."""
+ check_cluster_editable(clusterhost.cluster, user=user)
+ return utils.update_db_object(
+ session, clusterhost, **kwargs
+ )
+
+
+@utils.wrap_to_dict(RESP_CLUSTERHOST_FIELDS)
+def _update_clusterhost(clusterhost, session=None, user=None, **kwargs):
+ """Update clusterhost and underlying host if necessary."""
+ _update_host_if_necessary(
+ clusterhost, session=session, user=user, **kwargs
+ )
+ return _update_clusterhost_only(
+ clusterhost, session=session, user=user, **kwargs
+ )
+
+
+@utils.supported_filters(
+ optional_support_keys=(UPDATED_HOST_FIELDS + UPDATED_CLUSTERHOST_FIELDS),
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTER_HOSTS
+)
+def update_cluster_host(
+ cluster_id, host_id, user=None,
+ session=None, **kwargs
+):
+ """Update clusterhost by cluster id and host id."""
+ logging.info('updating kwargs: %s', kwargs)
+ clusterhost = _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+ return _update_clusterhost(
+ clusterhost, session=session, user=user, **kwargs
+ )
+
+
+@utils.supported_filters(
+ optional_support_keys=(UPDATED_HOST_FIELDS + UPDATED_CLUSTERHOST_FIELDS),
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTER_HOSTS
+)
+def update_clusterhost(
+ clusterhost_id, user=None,
+ session=None, **kwargs
+):
+ """Update clusterhost by clusterhost id."""
+ clusterhost = _get_clusterhost(
+ clusterhost_id, session=session
+ )
+ return _update_clusterhost(
+ clusterhost, session=session, user=user, **kwargs
+ )
+
+
+# replace roles with patched_roles in kwargs.
+# It tells the db the roles field will be patched.
+@utils.replace_filters(
+ roles='patched_roles'
+)
+@utils.supported_filters(
+ optional_support_keys=PATCHED_CLUSTERHOST_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTER_HOSTS
+)
+def patch_cluster_host(
+ cluster_id, host_id, user=None,
+ session=None, **kwargs
+):
+ """Patch clusterhost by cluster id and host id."""
+ logging.info("kwargs are %s", kwargs)
+ clusterhost = _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+ updated_clusterhost = _update_clusterhost(
+ clusterhost, session=session, user=user, **kwargs
+ )
+ return updated_clusterhost
+
+
+# replace roles with patched_roles in kwargs.
+# It tells the db the roles field will be patched.
+@utils.replace_filters(
+ roles='patched_roles'
+)
+@utils.supported_filters(
+ optional_support_keys=PATCHED_CLUSTERHOST_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTER_HOSTS
+)
+def patch_clusterhost(
+ clusterhost_id, user=None, session=None,
+ **kwargs
+):
+ """Patch clusterhost by clusterhost id."""
+ clusterhost = _get_clusterhost(
+ clusterhost_id, session=session
+ )
+ return _update_clusterhost(
+ clusterhost, session=session, user=user, **kwargs
+ )
+
+
+@user_api.check_user_permission(
+ permission.PERMISSION_DEL_CLUSTER_HOST
+)
+@utils.wrap_to_dict(
+ RESP_CLUSTERHOST_FIELDS + ['status', 'host'],
+ host=RESP_CLUSTERHOST_FIELDS
+)
+def _del_cluster_host(
+ clusterhost,
+ force=False, from_database_only=False,
+ delete_underlying_host=False, user=None,
+ session=None, **kwargs
+):
+ """delete clusterhost.
+
+ If force, the cluster host will be deleted anyway.
+ If from_database_only, the cluster host recored will only be
+ deleted from database. Otherwise a celery task sent to do
+ clean deletion.
+ If delete_underlying_host, the underlying host will also be deleted.
+ The backend will call _del_cluster_host again when the clusterhost is
+ deleted from os installer/package installer with from_database_only
+ set.
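+
+    Illustrative call via the public wrapper (ids are assumed):
+        del_cluster_host(1, 2, force=True, user=admin_user)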
+ """
+    # Force the clusterhost state to ERROR when we want to delete the
+    # clusterhost anyway, even if it is installing or already installed.
+    # It lets the api know the deletion is in progress while the backend
+    # is doing the real deletion. In future we may introduce a new state
+    # like INDELETE to indicate that the deletion is processing.
+    # We still need to discuss whether the api may delete a clusterhost
+    # that is already installed.
+ if clusterhost.state.state != 'UNINITIALIZED' and force:
+ clusterhost.state.state = 'ERROR'
+ if not force:
+ check_cluster_editable(
+ clusterhost.cluster, user=user,
+ check_in_installing=True
+ )
+ # delete underlying host if delete_underlying_host is set.
+ if delete_underlying_host:
+ host = clusterhost.host
+ if host.state.state != 'UNINITIALIZED' and force:
+ host.state.state = 'ERROR'
+        # only delete the host when the user has permission to delete it.
+        from compass.db.api import host as host_api
+ if host_api.is_host_editable(
+ host, user=user,
+ check_in_installing=True
+ ):
+ # if there is no need to do the deletion by backend or
+ # from_database_only is set, we only delete the record
+ # in database.
+ if host.state.state == 'UNINITIALIZED' or from_database_only:
+ utils.del_db_object(
+ session, host
+ )
+
+ # if there is no need to do the deletion by backend or
+ # from_database_only is set, we only delete the record in database.
+ if clusterhost.state.state == 'UNINITIALIZED' or from_database_only:
+ return utils.del_db_object(
+ session, clusterhost
+ )
+ else:
+ logging.info(
+ 'send del cluster %s host %s task to celery',
+ clusterhost.cluster_id, clusterhost.host_id
+ )
+ from compass.tasks import client as celery_client
+ celery_client.celery.send_task(
+ 'compass.tasks.delete_cluster_host',
+ (
+ user.email, clusterhost.cluster_id, clusterhost.host_id,
+ delete_underlying_host
+ ),
+ queue=user.email,
+ exchange=user.email,
+ routing_key=user.email
+ )
+ return {
+ 'status': 'delete action sent',
+ 'host': clusterhost,
+ }
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+def del_cluster_host(
+ cluster_id, host_id,
+ force=False, from_database_only=False,
+ delete_underlying_host=False, user=None,
+ session=None, **kwargs
+):
+ """Delete clusterhost by cluster id and host id."""
+ clusterhost = _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+ return _del_cluster_host(
+ clusterhost, force=force, from_database_only=from_database_only,
+ delete_underlying_host=delete_underlying_host, user=user,
+ session=session, **kwargs
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+def del_clusterhost(
+ clusterhost_id,
+ force=False, from_database_only=False,
+ delete_underlying_host=False, user=None,
+ session=None, **kwargs
+):
+ """Delete clusterhost by clusterhost id."""
+ clusterhost = _get_clusterhost(
+ clusterhost_id, session=session
+ )
+ return _del_cluster_host(
+ clusterhost, force=force, from_database_only=from_database_only,
+ delete_underlying_host=delete_underlying_host, user=user,
+ session=session, **kwargs
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTERHOST_CONFIG
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_CONFIG_FIELDS)
+def get_cluster_host_config(
+ cluster_id, host_id, user=None,
+ session=None, **kwargs
+):
+ """Get clusterhost config by cluster id and host id."""
+ return _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTERHOST_CONFIG
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_DEPLOYED_CONFIG_FIELDS)
+def get_cluster_host_deployed_config(
+ cluster_id, host_id, user=None, session=None, **kwargs
+):
+ """Get clusterhost deployed config by cluster id and host id."""
+ return _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTERHOST_CONFIG
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_CONFIG_FIELDS)
+def get_clusterhost_config(clusterhost_id, user=None, session=None, **kwargs):
+ """Get clusterhost config by clusterhost id."""
+ return _get_clusterhost(
+ clusterhost_id, session=session
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_LIST_CLUSTERHOST_CONFIG
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_DEPLOYED_CONFIG_FIELDS)
+def get_clusterhost_deployed_config(
+ clusterhost_id, user=None,
+ session=None, **kwargs
+):
+ """Get clusterhost deployed config by clusterhost id."""
+ return _get_clusterhost(
+ clusterhost_id, session=session
+ )
+
+
+def _clusterhost_os_config_validates(
+ config, clusterhost, session=None, user=None, **kwargs
+):
+ """Validate clusterhost's underlying host os config."""
+ from compass.db.api import host as host_api
+ host = clusterhost.host
+ host_api.check_host_editable(host, user=user)
+ metadata_api.validate_os_config(
+ config, host.os_id
+ )
+
+
+def _clusterhost_package_config_validates(
+ config, clusterhost, session=None, user=None, **kwargs
+):
+ """Validate clusterhost's cluster package config."""
+ cluster = clusterhost.cluster
+ check_cluster_editable(cluster, user=user)
+ metadata_api.validate_flavor_config(
+ config, cluster.flavor_id
+ )
+
+
+def _filter_clusterhost_host_editable(
+ config, clusterhost, session=None, user=None, **kwargs
+):
+ """Filter fields if the underlying host is not editable."""
+ from compass.db.api import host as host_api
+ host = clusterhost.host
+ return host_api.is_host_editable(host, user=user)
+
+
+@utils.input_filters(
+ put_os_config=_filter_clusterhost_host_editable,
+ patched_os_config=_filter_clusterhost_host_editable
+)
+@utils.input_validates_with_args(
+ put_os_config=_clusterhost_os_config_validates,
+ put_package_config=_clusterhost_package_config_validates
+)
+@utils.output_validates_with_args(
+ os_config=_clusterhost_os_config_validates,
+ package_config=_clusterhost_package_config_validates
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_CONFIG_FIELDS)
+def _update_clusterhost_config(clusterhost, session=None, user=None, **kwargs):
+ """Update clusterhost config."""
+ return utils.update_db_object(
+ session, clusterhost, **kwargs
+ )
+
+
+def _clusterhost_host_validated(
+ config, clusterhost, session=None, user=None, **kwargs
+):
+ """Check clusterhost's underlying host is validated."""
+ from compass.db.api import host as host_api
+ host = clusterhost.host
+ host_api.check_host_editable(host, user=user)
+ host_api.check_host_validated(host)
+
+
+def _clusterhost_cluster_validated(
+ config, clusterhost, session=None, user=None, **kwargs
+):
+ """Check clusterhost's cluster is validated."""
+ cluster = clusterhost.cluster
+ check_cluster_editable(cluster, user=user)
+ check_clusterhost_validated(clusterhost)
+
+
+@utils.input_filters(
+ deployed_os_config=_filter_clusterhost_host_editable,
+)
+@utils.input_validates_with_args(
+ deployed_os_config=_clusterhost_host_validated,
+ deployed_package_config=_clusterhost_cluster_validated
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_DEPLOYED_CONFIG_FIELDS)
+def _update_clusterhost_deployed_config(
+ clusterhost, session=None, user=None, **kwargs
+):
+ """Update clusterhost deployed config."""
+ return utils.update_db_object(
+ session, clusterhost, **kwargs
+ )
+
+
+# replace os_config with put_os_config and
+# package_config with put_package_config in kwargs.
+# It tells the db these fields will be updated, not patched.
+@utils.replace_filters(
+ os_config='put_os_config',
+ package_config='put_package_config'
+)
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTERHOST_CONFIG_FIELDS,
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_ADD_CLUSTERHOST_CONFIG
+)
+def update_cluster_host_config(
+ cluster_id, host_id, user=None, session=None, **kwargs
+):
+ """Update clusterhost config by cluster id and host id."""
+ clusterhost = _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+ return _update_clusterhost_config(
+ clusterhost, user=user, session=session, **kwargs
+ )
+
+
+# replace os_config with deployed_os_config and
+# package_config with deployed_package_config in kwargs.
+@utils.replace_filters(
+ os_config='deployed_os_config',
+ package_config='deployed_package_config'
+)
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTERHOST_DEPLOYED_CONFIG_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_ADD_CLUSTERHOST_CONFIG
+)
+def update_cluster_host_deployed_config(
+ cluster_id, host_id, user=None, session=None, **kwargs
+):
+ """Update clusterhost deployed config by cluster id and host id."""
+ clusterhost = _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+ return _update_clusterhost_deployed_config(
+ clusterhost, session=session, user=user, **kwargs
+ )
+
+
+# replace os_config with put_os_config and
+# package_config with put_package_config in kwargs.
+# It tells the db these fields will be updated, not patched.
+@utils.replace_filters(
+ os_config='put_os_config',
+ package_config='put_package_config'
+)
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTERHOST_CONFIG_FIELDS,
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_ADD_CLUSTERHOST_CONFIG
+)
+def update_clusterhost_config(
+ clusterhost_id, user=None, session=None, **kwargs
+):
+ """Update clusterhost config by clusterhost id."""
+ clusterhost = _get_clusterhost(
+ clusterhost_id, session=session
+ )
+ return _update_clusterhost_config(
+ clusterhost, session=session, user=user, **kwargs
+ )
+
+
+# replace os_config with deployed_os_config and
+# package_config with deployed_package_config in kwargs.
+@utils.replace_filters(
+ os_config='deployed_os_config',
+ package_config='deployed_package_config'
+)
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTERHOST_DEPLOYED_CONFIG_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_ADD_CLUSTERHOST_CONFIG
+)
+def update_clusterhost_deployed_config(
+ clusterhost_id, user=None, session=None, **kwargs
+):
+ """Update clusterhost deployed config by clusterhost id."""
+ clusterhost = _get_clusterhost(
+ clusterhost_id, session=session
+ )
+ return _update_clusterhost_deployed_config(
+ clusterhost, session=session, user=user, **kwargs
+ )
+
+
+# replace os_config with patched_os_config and
+# package_config with patched_package_config in kwargs.
+# It tells the db these fields will be patched, not updated.
+@utils.replace_filters(
+ os_config='patched_os_config',
+ package_config='patched_package_config'
+)
+@utils.supported_filters(
+ optional_support_keys=PATCHED_CLUSTERHOST_CONFIG_FIELDS,
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_ADD_CLUSTERHOST_CONFIG
+)
+def patch_cluster_host_config(
+ cluster_id, host_id, user=None, session=None, **kwargs
+):
+ """patch clusterhost config by cluster id and host id."""
+ clusterhost = _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+ return _update_clusterhost_config(
+ clusterhost, session=session, user=user, **kwargs
+ )
+
+
+# replace os_config with patched_os_config and
+# package_config with patched_package_config in kwargs.
+# It tells the db these fields will be patched, not updated.
+@utils.replace_filters(
+ os_config='patched_os_config',
+ package_config='patched_package_config'
+)
+@utils.supported_filters(
+ optional_support_keys=PATCHED_CLUSTERHOST_CONFIG_FIELDS,
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_ADD_CLUSTERHOST_CONFIG
+)
+def patch_clusterhost_config(
+ clusterhost_id, user=None, session=None, **kwargs
+):
+ """patch clusterhost config by clusterhost id."""
+ clusterhost = _get_clusterhost(
+ clusterhost_id, session=session
+ )
+ return _update_clusterhost_config(
+ clusterhost, session=session, user=user, **kwargs
+ )
+
+
+def _clusterhost_host_editable(
+ config, clusterhost, session=None, user=None, **kwargs
+):
+ """Check clusterhost underlying host is editable."""
+ from compass.db.api import host as host_api
+ host_api.check_host_editable(clusterhost.host, user=user)
+
+
+def _clusterhost_cluster_editable(
+ config, clusterhost, session=None, user=None, **kwargs
+):
+ """Check clusterhost's cluster is editable."""
+ check_cluster_editable(clusterhost.cluster, user=user)
+
+
+@utils.supported_filters(
+ optional_support_keys=['os_config', 'package_config']
+)
+@utils.input_filters(
+ os_config=_filter_clusterhost_host_editable,
+)
+@utils.output_validates_with_args(
+ package_config=_clusterhost_cluster_editable
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_CONFIG_FIELDS)
+def _delete_clusterhost_config(
+ clusterhost, session=None, user=None, **kwargs
+):
+ """delete clusterhost config."""
+ return utils.update_db_object(
+ session, clusterhost, config_validated=False,
+ **kwargs
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_DEL_CLUSTERHOST_CONFIG
+)
+def delete_cluster_host_config(
+ cluster_id, host_id, user=None, session=None
+):
+ """Delete a clusterhost config by cluster id and host id."""
+ clusterhost = _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+ return _delete_clusterhost_config(
+ clusterhost, session=session, user=user,
+ os_config={}, package_config={}
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_DEL_CLUSTERHOST_CONFIG
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_CONFIG_FIELDS)
+def delete_clusterhost_config(clusterhost_id, user=None, session=None):
+ """Delet a clusterhost config by clusterhost id."""
+ clusterhost = _get_clusterhost(
+ clusterhost_id, session=session
+ )
+ return _delete_clusterhost_config(
+ clusterhost, session=session, user=user,
+ os_config={}, package_config={}
+ )
+
+
+@utils.supported_filters(
+ optional_support_keys=['add_hosts', 'remove_hosts', 'set_hosts']
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTER_HOSTS
+)
+@utils.wrap_to_dict(
+ ['hosts'],
+ hosts=RESP_CLUSTERHOST_FIELDS
+)
+def update_cluster_hosts(
+ cluster_id, add_hosts={}, set_hosts=None,
+ remove_hosts={}, user=None, session=None
+):
+ """Update cluster hosts."""
+ cluster = _get_cluster(cluster_id, session=session)
+ if remove_hosts:
+ _remove_clusterhosts(
+ cluster, session=session, user=user, **remove_hosts
+ )
+ if add_hosts:
+ _add_clusterhosts(
+ cluster, session=session, user=user, **add_hosts
+ )
+ if set_hosts is not None:
+ _set_clusterhosts(
+ cluster, session=session, user=user, **set_hosts
+ )
+
+ return {
+ 'hosts': list_cluster_hosts(cluster_id, session=session)
+ }
+
+
+def validate_clusterhost(clusterhost, session=None):
+ """validate clusterhost."""
+ roles = clusterhost.roles
+ if not roles:
+ if clusterhost.cluster.flavor_name:
+ raise exception.InvalidParameter(
+ 'empty roles for clusterhost %s' % clusterhost.name
+ )
+
+
+def validate_cluster(cluster, session=None):
+ """Validate cluster."""
+ if not cluster.clusterhosts:
+ raise exception.InvalidParameter(
+ 'cluster %s does not have any hosts' % cluster.name
+ )
+ if cluster.flavor_name:
+ cluster_roles = cluster.flavor['roles']
+ else:
+ cluster_roles = []
+ necessary_roles = set([
+ role['name'] for role in cluster_roles if not role.get('optional')
+ ])
+ clusterhost_roles = set([])
+ interface_subnets = {}
+ for clusterhost in cluster.clusterhosts:
+ roles = clusterhost.roles
+ for role in roles:
+ clusterhost_roles.add(role['name'])
+ host = clusterhost.host
+ for host_network in host.host_networks:
+ interface_subnets.setdefault(
+ host_network.interface, set([])
+ ).add(host_network.subnet.subnet)
+ missing_roles = necessary_roles - clusterhost_roles
+ if missing_roles:
+ raise exception.InvalidParameter(
+            'cluster %s has roles %s not assigned to any host' % (
+ cluster.name, list(missing_roles)
+ )
+ )
+ for interface, subnets in interface_subnets.items():
+ if len(subnets) > 1:
+ raise exception.InvalidParameter(
+                'cluster %s has multiple subnets %s on interface %s' % (
+ cluster.name, list(subnets), interface
+ )
+ )
+
+
+@utils.supported_filters(optional_support_keys=['review'])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_REVIEW_CLUSTER
+)
+@utils.wrap_to_dict(
+ RESP_REVIEW_FIELDS,
+ cluster=RESP_CONFIG_FIELDS,
+ hosts=RESP_CLUSTERHOST_CONFIG_FIELDS
+)
+def review_cluster(cluster_id, review={}, user=None, session=None, **kwargs):
+ """review cluster.
+
+ Args:
+ cluster_id: the cluster id.
+ review: dict contains hosts to be reviewed. either contains key
+ hosts or clusterhosts. where hosts is a list of host id,
+ clusterhosts is a list of clusterhost id.
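+
+    Illustrative call (ids and user are assumed values):
+        review_cluster(1, review={'hosts': [1, 2]}, user=admin_user)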
+ """
+ from compass.db.api import host as host_api
+ cluster = _get_cluster(cluster_id, session=session)
+ check_cluster_editable(cluster, user=user)
+ host_ids = review.get('hosts', [])
+ clusterhost_ids = review.get('clusterhosts', [])
+ clusterhosts = []
+ # Get clusterhosts need to be reviewed.
+ for clusterhost in cluster.clusterhosts:
+ if (
+ clusterhost.clusterhost_id in clusterhost_ids or
+ clusterhost.host_id in host_ids
+ ):
+ clusterhosts.append(clusterhost)
+
+ os_config = copy.deepcopy(cluster.os_config)
+ os_config = metadata_api.autofill_os_config(
+ os_config, cluster.os_id, cluster=cluster
+ )
+ metadata_api.validate_os_config(
+ os_config, cluster.os_id, True
+ )
+ for clusterhost in clusterhosts:
+ host = clusterhost.host
+ # ignore underlying host os config validation
+ # since the host is not editable
+ if not host_api.is_host_editable(
+ host, user=user, check_in_installing=False
+ ):
+ logging.info(
+ 'ignore update host %s config '
+ 'since it is not editable' % host.name
+ )
+ continue
+ host_os_config = copy.deepcopy(host.os_config)
+ host_os_config = metadata_api.autofill_os_config(
+ host_os_config, host.os_id,
+ host=host
+ )
+ deployed_os_config = util.merge_dict(
+ os_config, host_os_config
+ )
+ metadata_api.validate_os_config(
+ deployed_os_config, host.os_id, True
+ )
+ host_api.validate_host(host)
+ utils.update_db_object(
+ session, host, os_config=host_os_config, config_validated=True
+ )
+
+ package_config = copy.deepcopy(cluster.package_config)
+ if cluster.flavor_name:
+ package_config = metadata_api.autofill_flavor_config(
+ package_config, cluster.flavor_id,
+ cluster=cluster
+ )
+ metadata_api.validate_flavor_config(
+ package_config, cluster.flavor_id, True
+ )
+ for clusterhost in clusterhosts:
+ clusterhost_package_config = copy.deepcopy(
+ clusterhost.package_config
+ )
+ clusterhost_package_config = (
+ metadata_api.autofill_flavor_config(
+ clusterhost_package_config,
+ cluster.flavor_id,
+ clusterhost=clusterhost
+ )
+ )
+ deployed_package_config = util.merge_dict(
+ package_config, clusterhost_package_config
+ )
+ metadata_api.validate_flavor_config(
+ deployed_package_config,
+ cluster.flavor_id, True
+ )
+ validate_clusterhost(clusterhost, session=session)
+ utils.update_db_object(
+ session, clusterhost,
+ package_config=clusterhost_package_config,
+ config_validated=True
+ )
+
+ validate_cluster(cluster, session=session)
+ utils.update_db_object(
+ session, cluster, os_config=os_config, package_config=package_config,
+ config_validated=True
+ )
+ return {
+ 'cluster': cluster,
+ 'hosts': clusterhosts
+ }
+
+
+@utils.supported_filters(optional_support_keys=['deploy'])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_DEPLOY_CLUSTER
+)
+@utils.wrap_to_dict(
+ RESP_DEPLOY_FIELDS,
+ cluster=RESP_CONFIG_FIELDS,
+ hosts=RESP_CLUSTERHOST_FIELDS
+)
+def deploy_cluster(
+ cluster_id, deploy={}, user=None, session=None, **kwargs
+):
+ """deploy cluster.
+
+ Args:
+ cluster_id: cluster id.
+ deploy: dict contains key either hosts or clusterhosts.
+ deploy['hosts'] is a list of host id,
+ deploy['clusterhosts'] is a list of clusterhost id.
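+
+    Illustrative call (ids and user are assumed values):
+        deploy_cluster(1, deploy={'clusterhosts': [3]}, user=admin_user)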
+ """
+ from compass.db.api import host as host_api
+ from compass.tasks import client as celery_client
+ cluster = _get_cluster(cluster_id, session=session)
+ host_ids = deploy.get('hosts', [])
+ clusterhost_ids = deploy.get('clusterhosts', [])
+ clusterhosts = []
+ # get clusterhost to deploy.
+ for clusterhost in cluster.clusterhosts:
+ if (
+ clusterhost.clusterhost_id in clusterhost_ids or
+ clusterhost.host_id in host_ids
+ ):
+ clusterhosts.append(clusterhost)
+ check_cluster_editable(cluster, user=user)
+ check_cluster_validated(cluster)
+ utils.update_db_object(session, cluster.state, state='INITIALIZED')
+ for clusterhost in clusterhosts:
+ host = clusterhost.host
+ # ignore checking if underlying host is validated if
+ # the host is not editable.
+ if host_api.is_host_editable(host, user=user):
+ host_api.check_host_validated(host)
+ utils.update_db_object(session, host.state, state='INITIALIZED')
+ if cluster.flavor_name:
+ check_clusterhost_validated(clusterhost)
+ utils.update_db_object(
+ session, clusterhost.state, state='INITIALIZED'
+ )
+
+ celery_client.celery.send_task(
+ 'compass.tasks.deploy_cluster',
+ (
+ user.email, cluster_id,
+ [clusterhost.host_id for clusterhost in clusterhosts]
+ ),
+ queue=user.email,
+ exchange=user.email,
+ routing_key=user.email
+ )
+ return {
+ 'status': 'deploy action sent',
+ 'cluster': cluster,
+ 'hosts': clusterhosts
+ }
+
+
+@utils.supported_filters(optional_support_keys=['redeploy'])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_DEPLOY_CLUSTER
+)
+@utils.wrap_to_dict(
+ RESP_DEPLOY_FIELDS,
+ cluster=RESP_CONFIG_FIELDS,
+ hosts=RESP_CLUSTERHOST_FIELDS
+)
+def redeploy_cluster(
+ cluster_id, deploy={}, user=None, session=None, **kwargs
+):
+ """redeploy cluster.
+
+ Args:
+ cluster_id: cluster id.
+ """
+ from compass.db.api import host as host_api
+ from compass.tasks import client as celery_client
+ cluster = _get_cluster(cluster_id, session=session)
+
+ check_cluster_editable(cluster, user=user)
+ check_cluster_validated(cluster)
+ utils.update_db_object(
+ session, cluster.state,
+ state='INITIALIZED',
+ percentage=0,
+ ready=False
+ )
+ for clusterhost in cluster.clusterhosts:
+ host = clusterhost.host
+ # ignore checking if underlying host is validated if
+ # the host is not editable.
+ host_api.check_host_validated(host)
+ utils.update_db_object(
+ session, host.state,
+ state='INITIALIZED',
+ percentage=0,
+ ready=False
+ )
+ if cluster.flavor_name:
+ check_clusterhost_validated(clusterhost)
+ utils.update_db_object(
+ session,
+ clusterhost.state,
+ state='INITIALIZED',
+ percentage=0,
+ ready=False
+ )
+
+ celery_client.celery.send_task(
+ 'compass.tasks.redeploy_cluster',
+ (
+ user.email, cluster_id
+ ),
+ queue=user.email,
+ exchange=user.email,
+ routing_key=user.email
+ )
+ return {
+ 'status': 'redeploy action sent',
+ 'cluster': cluster
+ }
+
+
+@utils.supported_filters(optional_support_keys=['apply_patch'])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_DEPLOY_CLUSTER
+)
+@utils.wrap_to_dict(
+ RESP_DEPLOY_FIELDS,
+ cluster=RESP_CONFIG_FIELDS,
+ hosts=RESP_CLUSTERHOST_FIELDS
+)
+def patch_cluster(cluster_id, user=None, session=None, **kwargs):
+    """Send a patch task for the cluster to celery."""
+    from compass.tasks import client as celery_client
+
+ cluster = _get_cluster(cluster_id, session=session)
+ celery_client.celery.send_task(
+ 'compass.tasks.patch_cluster',
+ (
+ user.email, cluster_id,
+ ),
+ queue=user.email,
+ exchange=user.email,
+ routing_key=user.email
+ )
+ return {
+ 'status': 'patch action sent',
+ 'cluster': cluster
+ }
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_GET_CLUSTER_STATE
+)
+@utils.wrap_to_dict(RESP_STATE_FIELDS)
+def get_cluster_state(cluster_id, user=None, session=None, **kwargs):
+ """Get cluster state info."""
+ return _get_cluster(cluster_id, session=session).state_dict()
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_GET_CLUSTERHOST_STATE
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_STATE_FIELDS)
+def get_cluster_host_state(
+ cluster_id, host_id, user=None, session=None, **kwargs
+):
+ """Get clusterhost state merged with underlying host state."""
+ return _get_cluster_host(
+ cluster_id, host_id, session=session
+ ).state_dict()
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_GET_CLUSTERHOST_STATE
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_STATE_FIELDS)
+def get_cluster_host_self_state(
+ cluster_id, host_id, user=None, session=None, **kwargs
+):
+ """Get clusterhost itself state."""
+ return _get_cluster_host(
+ cluster_id, host_id, session=session
+ ).state
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_GET_CLUSTERHOST_STATE
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_STATE_FIELDS)
+def get_clusterhost_state(
+ clusterhost_id, user=None, session=None, **kwargs
+):
+ """Get clusterhost state merged with underlying host state."""
+ return _get_clusterhost(
+ clusterhost_id, session=session
+ ).state_dict()
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_GET_CLUSTERHOST_STATE
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_STATE_FIELDS)
+def get_clusterhost_self_state(
+ clusterhost_id, user=None, session=None, **kwargs
+):
+ """Get clusterhost itself state."""
+ return _get_clusterhost(
+ clusterhost_id, session=session
+ ).state
+
+
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTERHOST_STATE_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTERHOST_STATE
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_STATE_FIELDS)
+def update_cluster_host_state(
+ cluster_id, host_id, user=None, session=None, **kwargs
+):
+ """Update a clusterhost itself state."""
+ clusterhost = _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+ # Modify(harry): progress_update.py no longer updates the cluster
+ # state, so mirror the update onto the cluster state here.
+ cluster = _get_cluster(clusterhost.cluster_id, session=session)
+ utils.update_db_object(session, clusterhost.state, **kwargs)
+ utils.update_db_object(session, cluster.state, **kwargs)
+ return clusterhost.state_dict()
+
+
+def _update_clusterhost_state(
+ clusterhost, from_database_only=False,
+ session=None, user=None, **kwargs
+):
+ """Update clusterhost state.
+
+ If from_database_only, the state will only be updated in database.
+ Otherwise a task sent to celery and os installer/package installer
+ will also update its state if needed.
+ """
+ ready_triggered = (
+ bool(kwargs.get('ready')) and not clusterhost.state.ready
+ )
+ cluster_ready = False
+ host = clusterhost.host
+ cluster = clusterhost.cluster
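+ # host_ready / cluster_ready tell the triggered task whether the
+ # underlying host and the whole cluster still need to be marked
+ # ready, not whether they currently are.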
+ host_ready = not host.state.ready
+ if ready_triggered:
+ cluster_ready = True
+ for clusterhost_in_cluster in cluster.clusterhosts:
+ if (
+ clusterhost_in_cluster.clusterhost_id
+ == clusterhost.clusterhost_id
+ ):
+ continue
+ if not clusterhost_in_cluster.state.ready:
+ cluster_ready = False
+
+ logging.info(
+ 'clusterhost %s ready: %s',
+ clusterhost.name, ready_triggered
+ )
+ logging.info('cluster ready: %s', cluster_ready)
+ logging.info('host ready: %s', host_ready)
+ if not ready_triggered or from_database_only:
+ logging.info('%s state is set to %s', clusterhost.name, kwargs)
+ utils.update_db_object(session, clusterhost.state, **kwargs)
+ if not clusterhost.state.ready:
+ logging.info('%s state ready is set to False', cluster.name)
+ utils.update_db_object(session, cluster.state, ready=False)
+ status = '%s state is updated' % clusterhost.name
+ else:
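+ # no caller identity: fall back to the cluster creator's email,
+ # since the celery routing below is keyed by user email.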
+ if not user:
+ user_id = cluster.creator_id
+ user_dict = user_api.get_user(user_id, session=session)
+ user_email = user_dict['email']
+ else:
+ user_email = user.email
+ from compass.tasks import client as celery_client
+ celery_client.celery.send_task(
+ 'compass.tasks.package_installed',
+ (
+ clusterhost.cluster_id, clusterhost.host_id,
+ cluster_ready, host_ready
+ ),
+ queue=user_email,
+ exchange=user_email,
+ routing_key=user_email
+ )
+ status = '%s: cluster ready %s host ready %s' % (
+ clusterhost.name, cluster_ready, host_ready
+ )
+ logging.info('action status: %s', status)
+ return {
+ 'status': status,
+ 'clusterhost': clusterhost.state_dict()
+ }
+
+
+@util.deprecated
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTERHOST_STATE_INTERNAL_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTERHOST_STATE
+)
+@utils.wrap_to_dict(['status', 'clusterhost'])
+def update_cluster_host_state_internal(
+ cluster_id, host_id, from_database_only=False,
+ user=None, session=None, **kwargs
+):
+ """Update a clusterhost state by installation process."""
+ # TODO(xicheng): it should be merged into update_cluster_host_state
+ clusterhost = _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+ return _update_clusterhost_state(
+ clusterhost, from_database_only=from_database_only,
+ session=session, user=user, **kwargs
+ )
+
+
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTERHOST_STATE_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTERHOST_STATE
+)
+@utils.wrap_to_dict(RESP_CLUSTERHOST_STATE_FIELDS)
+def update_clusterhost_state(
+ clusterhost_id, user=None, session=None, **kwargs
+):
+ """Update a clusterhost itself state."""
+ clusterhost = _get_clusterhost(
+ clusterhost_id, session=session
+ )
+ # Modify(harry): progress_update.py no longer updates the cluster
+ # state, so mirror the update onto the cluster state here.
+ cluster = _get_cluster(clusterhost.cluster_id, session=session)
+ utils.update_db_object(session, clusterhost.state, **kwargs)
+ utils.update_db_object(session, cluster.state, **kwargs)
+ return clusterhost.state_dict()
+
+
+@util.deprecated
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTERHOST_STATE_INTERNAL_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTERHOST_STATE
+)
+@utils.wrap_to_dict(['status', 'clusterhost'])
+def update_clusterhost_state_internal(
+ clusterhost_id, from_database_only=False,
+ user=None, session=None, **kwargs
+):
+ """Update a clusterhost state by installation process."""
+ # TODO(xicheng): it should be merged into update_clusterhost_state
+ clusterhost = _get_clusterhost(clusterhost_id, session=session)
+ return _update_clusterhost_state(
+ clusterhost, from_database_only=from_database_only,
+ session=session, user=user, **kwargs
+ )
+
+
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTER_STATE_FIELDS,
+ ignore_support_keys=(IGNORE_FIELDS + IGNORE_UPDATED_CLUSTER_STATE_FIELDS)
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTER_STATE
+)
+@utils.wrap_to_dict(RESP_STATE_FIELDS)
+def update_cluster_state(
+ cluster_id, user=None, session=None, **kwargs
+):
+ """Update a cluster state."""
+ cluster = _get_cluster(
+ cluster_id, session=session
+ )
+ utils.update_db_object(session, cluster.state, **kwargs)
+ return cluster.state_dict()
+
+
+@util.deprecated
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTER_STATE_INTERNAL_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@user_api.check_user_permission(
+ permission.PERMISSION_UPDATE_CLUSTER_STATE
+)
+@utils.wrap_to_dict(['status', 'cluster'])
+def update_cluster_state_internal(
+ cluster_id, from_database_only=False,
+ user=None, session=None, **kwargs
+):
+ """Update a cluster state by installation process.
+
+ If from_database_only, the state will only be updated in database.
+ Otherwise a task sent to do state update in os installer and
+ package installer.
+ """
+ # TODO(xicheng): it should be merged into update_cluster_state
+ cluster = _get_cluster(cluster_id, session=session)
+ ready_triggered = (
+ bool(kwargs.get('ready')) and not cluster.state.ready
+ )
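+ # map each host id to whether its clusterhost still needs to be
+ # marked ready by the triggered task.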
+ clusterhost_ready = {}
+ if ready_triggered:
+ for clusterhost in cluster.clusterhosts:
+ clusterhost_ready[clusterhost.host_id] = (
+ not clusterhost.state.ready
+ )
+
+ logging.info('cluster %s ready: %s', cluster_id, ready_triggered)
+ logging.info('clusterhost ready: %s', clusterhost_ready)
+
+ if not ready_triggered or from_database_only:
+ logging.info('%s state is set to %s', cluster.name, kwargs)
+ utils.update_db_object(session, cluster.state, **kwargs)
+ if not cluster.state.ready:
+ for clusterhost in cluster.clusterhosts:
+ logging.info('%s state ready is set to False', clusterhost.name)
+ utils.update_db_object(
+ session, clusterhost.state, ready=False
+ )
+ status = '%s state is updated' % cluster.name
+ else:
+ if not user:
+ user_id = cluster.creator_id
+ user_dict = user_api.get_user(user_id, session=session)
+ user_email = user_dict['email']
+ else:
+ user_email = user.email
+ from compass.tasks import client as celery_client
+ celery_client.celery.send_task(
+ 'compass.tasks.cluster_installed',
+ (cluster_id, clusterhost_ready),
+ queue=user_email,
+ exchange=user_email,
+ routing_key=user_email
+ )
+ status = '%s installed action set clusterhost ready %s' % (
+ cluster.name, clusterhost_ready
+ )
+ logging.info('action status: %s', status)
+ return {
+ 'status': status,
+ 'cluster': cluster.state_dict()
+ }
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@utils.wrap_to_dict(RESP_CLUSTERHOST_LOG_FIELDS)
+def get_cluster_host_log_histories(
+ cluster_id, host_id, user=None, session=None, **kwargs
+):
+ """Get clusterhost log history by cluster id and host id."""
+ return _get_cluster_host(
+ cluster_id, host_id, session=session
+ ).log_histories
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@utils.wrap_to_dict(RESP_CLUSTERHOST_LOG_FIELDS)
+def get_clusterhost_log_histories(
+ clusterhost_id, user=None,
+ session=None, **kwargs
+):
+ """Get clusterhost log history by clusterhost id."""
+ return _get_clusterhost(
+ clusterhost_id, session=session
+ ).log_histories
+
+
+def _get_cluster_host_log_history(
+ cluster_id, host_id, filename, session=None, **kwargs
+):
+ """Get clusterhost log history by cluster id, host id and filename."""
+ clusterhost = _get_cluster_host(cluster_id, host_id, session=session)
+ return utils.get_db_object(
+ session, models.ClusterHostLogHistory,
+ clusterhost_id=clusterhost.clusterhost_id, filename=filename,
+ **kwargs
+ )
+
+
+def _get_clusterhost_log_history(
+ clusterhost_id, filename, session=None, **kwargs
+):
+ """Get clusterhost log history by clusterhost id and filename."""
+ clusterhost = _get_clusterhost(clusterhost_id, session=session)
+ return utils.get_db_object(
+ session, models.ClusterHostLogHistory,
+ clusterhost_id=clusterhost.clusterhost_id, filename=filename,
+ **kwargs
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@utils.wrap_to_dict(RESP_CLUSTERHOST_LOG_FIELDS)
+def get_cluster_host_log_history(
+ cluster_id, host_id, filename, user=None, session=None, **kwargs
+):
+ """Get clusterhost log history by cluster id, host id and filename."""
+ return _get_cluster_host_log_history(
+ cluster_id, host_id, filename, session=session
+ )
+
+
+@utils.supported_filters([])
+@database.run_in_session()
+@utils.wrap_to_dict(RESP_CLUSTERHOST_LOG_FIELDS)
+def get_clusterhost_log_history(
+ clusterhost_id, filename, user=None, session=None, **kwargs
+):
+ """Get host log history by clusterhost id and filename."""
+ return _get_clusterhost_log_history(
+ clusterhost_id, filename, session=session
+ )
+
+
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTERHOST_LOG_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@utils.wrap_to_dict(RESP_CLUSTERHOST_LOG_FIELDS)
+def update_cluster_host_log_history(
+ cluster_id, host_id, filename, user=None, session=None, **kwargs
+):
+ """Update a host log history by cluster id, host id and filename."""
+ cluster_host_log_history = _get_cluster_host_log_history(
+ cluster_id, host_id, filename, session=session
+ )
+ return utils.update_db_object(
+ session, cluster_host_log_history, **kwargs
+ )
+
+
+@utils.supported_filters(
+ optional_support_keys=UPDATED_CLUSTERHOST_LOG_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@utils.wrap_to_dict(RESP_CLUSTERHOST_LOG_FIELDS)
+def update_clusterhost_log_history(
+ clusterhost_id, filename, user=None, session=None, **kwargs
+):
+ """Update a host log history by clusterhost id and filename."""
+ clusterhost_log_history = _get_clusterhost_log_history(
+ clusterhost_id, filename, session=session
+ )
+ return utils.update_db_object(session, clusterhost_log_history, **kwargs)
+
+
+@utils.supported_filters(
+ ADDED_CLUSTERHOST_LOG_FIELDS,
+ optional_support_keys=UPDATED_CLUSTERHOST_LOG_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@utils.wrap_to_dict(RESP_CLUSTERHOST_LOG_FIELDS)
+def add_clusterhost_log_history(
+ clusterhost_id, exception_when_existing=False,
+ filename=None, user=None, session=None, **kwargs
+):
+ """add a host log history by clusterhost id and filename."""
+ clusterhost = _get_clusterhost(clusterhost_id, session=session)
+ return utils.add_db_object(
+ session, models.ClusterHostLogHistory,
+ exception_when_existing,
+ clusterhost.clusterhost_id, filename, **kwargs
+ )
+
+
+@utils.supported_filters(
+ ADDED_CLUSTERHOST_LOG_FIELDS,
+ optional_support_keys=UPDATED_CLUSTERHOST_LOG_FIELDS,
+ ignore_support_keys=IGNORE_FIELDS
+)
+@database.run_in_session()
+@utils.wrap_to_dict(RESP_CLUSTERHOST_LOG_FIELDS)
+def add_cluster_host_log_history(
+ cluster_id, host_id, exception_when_existing=False,
+ filename=None, user=None, session=None, **kwargs
+):
+ """add a host log history by cluster id, host id and filename."""
+ clusterhost = _get_cluster_host(
+ cluster_id, host_id, session=session
+ )
+ return utils.add_db_object(
+ session, models.ClusterHostLogHistory, exception_when_existing,
+ clusterhost.clusterhost_id, filename, **kwargs
+ )