From 7bb53c64da2dcf88894bfd31503accdd81498f3d Mon Sep 17 00:00:00 2001 From: Thomas Duval Date: Wed, 3 Jun 2020 10:06:52 +0200 Subject: Update to new version 5.4 Signed-off-by: Thomas Duval Change-Id: Idcd868133d75928a1ffd74d749ce98503e0555ea --- moon_manager/moon_manager/api/db/pdp.py | 115 ++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 moon_manager/moon_manager/api/db/pdp.py (limited to 'moon_manager/moon_manager/api/db/pdp.py') diff --git a/moon_manager/moon_manager/api/db/pdp.py b/moon_manager/moon_manager/api/db/pdp.py new file mode 100644 index 00000000..a4ca08f6 --- /dev/null +++ b/moon_manager/moon_manager/api/db/pdp.py @@ -0,0 +1,115 @@ +# Software Name: MOON + +# Version: 5.4 + +# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors +# SPDX-License-Identifier: Apache-2.0 + +# This software is distributed under the 'Apache License 2.0', +# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt' +# or see the "LICENSE" file for more details. + + +from uuid import uuid4 +import logging +from moon_utilities.security_functions import enforce +from moon_manager.api.db.managers import Managers +from moon_utilities import exceptions + +logger = logging.getLogger("moon.db.api.pdp") + + +class PDPManager(Managers): + + def __init__(self, connector=None): + self.driver = connector.driver + Managers.PDPManager = self + + @enforce(("read", "write"), "pdp") + def update_pdp(self, moon_user_id, pdp_id, value): + if not value or 'name' not in value or not value['name'].strip(): + raise exceptions.PdpContentError + + exists_security_pipeline = value and 'security_pipeline' in value and \ + len(value['security_pipeline']) > 0 + exists_vim_project_id = value and 'vim_project_id' in value and \ + value['vim_project_id'] != None and \ + value['vim_project_id'].strip() + if not exists_security_pipeline and exists_vim_project_id: + raise exceptions.PdpContentError + if exists_security_pipeline and not exists_vim_project_id: + raise exceptions.PdpContentError + + self.__pdp_validated_pipeline_name_id(pdp_id, value, "update") + + if value and 'security_pipeline' in value: + for policy_id in value['security_pipeline']: + if not policy_id or not policy_id.strip() or not \ + Managers.PolicyManager.get_policies(moon_user_id=moon_user_id, policy_id=policy_id): + raise exceptions.PolicyUnknown + + return self.driver.update_pdp(pdp_id=pdp_id, value=value) + + @enforce(("read", "write"), "pdp") + def delete_pdp(self, moon_user_id, pdp_id): + if pdp_id not in self.driver.get_pdp(pdp_id=pdp_id): + raise exceptions.PdpUnknown + return self.driver.delete_pdp(pdp_id=pdp_id) + + @enforce(("read", "write"), "pdp") + def add_pdp(self, moon_user_id, pdp_id=None, value=None): + if not value or 'name' not in value or not value['name'].strip(): + raise exceptions.PdpContentError + + exists_security_pipeline = value and 'security_pipeline' in value and \ + len(value['security_pipeline']) > 0 + exists_vim_project_id = value and 'vim_project_id' in value and \ + value['vim_project_id'] is not None and \ + value['vim_project_id'].strip() + if not exists_security_pipeline and exists_vim_project_id: + raise exceptions.PdpContentError + if exists_security_pipeline and not exists_vim_project_id: + raise exceptions.PdpContentError + + self.__pdp_validated_pipeline_name_id(pdp_id, value, "add") + + if value and 'security_pipeline' in value: + for policy_id in value['security_pipeline']: + if not policy_id or not policy_id.strip() or not \ + Managers.PolicyManager.get_policies(moon_user_id=moon_user_id, policy_id=policy_id): + raise exceptions.PolicyUnknown + + return self.driver.add_pdp(pdp_id=pdp_id, value=value) + + @enforce("read", "pdp") + def get_pdp(self, moon_user_id, pdp_id=None): + return self.driver.get_pdp(pdp_id=pdp_id) + + @enforce("read", "pdp") + def delete_policy_from_pdp(self, moon_user_id, pdp_id, policy_id): + + if pdp_id not in self.driver.get_pdp(pdp_id=pdp_id): + raise exceptions.PdpUnknown + if policy_id not in self.driver.get_policies(policy_id=policy_id): + raise exceptions.PolicyUnknown + x = self.driver.delete_policy_from_pdp(pdp_id=pdp_id, policy_id=policy_id) + return x + + def __pdp_validated_pipeline_name_id(self, pdp_id, value, method_type=None): + all_pdps = self.driver.get_pdp() + if method_type == 'update': + if pdp_id not in all_pdps: + raise exceptions.PdpUnknown + else: + if pdp_id in all_pdps: + raise exceptions.PdpExisting + if not pdp_id: + pdp_id = uuid4().hex + + for key in all_pdps: + if pdp_id != key: + if all_pdps[key]['name'] == value['name']: + raise exceptions.PdpExisting + for policy_id in value['security_pipeline']: + if policy_id in all_pdps[key]['security_pipeline']: + raise exceptions.PdpInUse -- cgit 1.2.3-korg