# Software Name: MOON
# Version: 5.4
# SPDX-FileCopyrightText: Copyright (c) 2018-2020 Orange and its contributors
# SPDX-License-Identifier: Apache-2.0
# This software is distributed under the 'Apache License 2.0',
# the text of which is available at 'http://www.apache.org/licenses/LICENSE-2.0.txt'
# or see the "LICENSE" file for more details.
from uuid import uuid4
import logging
from moon_utilities.security_functions import enforce
from moon_manager.api.db.managers import Managers
from moon_utilities import exceptions
# Module-level logger for the PDP DB API (explicit hierarchy name, not __name__).
logger = logging.getLogger(name="moon.db.api.pdp")
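class PDPManager(Managers):
    """Manage PDP (Policy Decision Point) records through the configured DB driver.

    Every public method is wrapped by ``enforce`` (moon_utilities.security_functions)
    with the permission labels shown on its decorator; ``moon_user_id`` is the
    identity those checks run against and is otherwise only forwarded to the
    PolicyManager lookups.
    """

    def __init__(self, connector=None):
        # NOTE(review): ``connector`` has no usable default -- passing ``None``
        # raises AttributeError on the next line, so callers must always supply
        # an object exposing ``.driver``.
        self.driver = connector.driver
        # Register this instance on the shared Managers registry; the same
        # registry is used below to reach Managers.PolicyManager.
        Managers.PDPManager = self

    @staticmethod
    def _check_pdp_content(value):
        """Validate the caller-supplied PDP payload.

        Raises:
            PdpContentError: if ``value`` is empty, its ``name`` is missing,
                not a string or blank, or if ``security_pipeline`` and
                ``vim_project_id`` are not supplied together (a non-empty
                pipeline requires a non-blank project ID and vice versa).
        """
        if not value or not isinstance(value.get('name'), str) or not value['name'].strip():
            raise exceptions.PdpContentError
        # A missing or None pipeline counts as "no pipeline" instead of a TypeError.
        has_pipeline = bool(value.get('security_pipeline'))
        vim_project_id = value.get('vim_project_id')
        has_project = vim_project_id is not None and bool(vim_project_id.strip())
        # Exactly the two rejections of the original: pipeline without project,
        # or project without pipeline.
        if has_pipeline != has_project:
            raise exceptions.PdpContentError

    @staticmethod
    def _check_pipeline_policies(moon_user_id, value):
        """Ensure every policy ID listed in ``value['security_pipeline']`` is known.

        Raises:
            PolicyUnknown: for a blank ID, or one the PolicyManager does not
                return for this user.
        """
        for policy_id in value.get('security_pipeline') or []:
            if not policy_id or not policy_id.strip():
                raise exceptions.PolicyUnknown
            if not Managers.PolicyManager.get_policies(moon_user_id=moon_user_id,
                                                       policy_id=policy_id):
                raise exceptions.PolicyUnknown

    @enforce(("read", "write"), "pdp")
    def update_pdp(self, moon_user_id, pdp_id, value):
        """Update an existing PDP after validating its content and pipeline.

        Args:
            moon_user_id: identity of the caller.
            pdp_id: ID of the PDP to update; must already exist.
            value: PDP payload with ``name`` and, together or not at all,
                ``security_pipeline`` and ``vim_project_id``.

        Returns:
            Whatever ``driver.update_pdp`` returns.

        Raises:
            PdpContentError, PdpUnknown, PdpExisting, PdpInUse, PolicyUnknown.
        """
        self._check_pdp_content(value)
        self.__pdp_validated_pipeline_name_id(pdp_id, value, "update")
        self._check_pipeline_policies(moon_user_id, value)
        return self.driver.update_pdp(pdp_id=pdp_id, value=value)

    @enforce(("read", "write"), "pdp")
    def delete_pdp(self, moon_user_id, pdp_id):
        """Delete a PDP.

        Raises:
            PdpUnknown: if ``pdp_id`` is not returned by the driver.

        Returns:
            Whatever ``driver.delete_pdp`` returns.
        """
        if pdp_id not in self.driver.get_pdp(pdp_id=pdp_id):
            raise exceptions.PdpUnknown
        return self.driver.delete_pdp(pdp_id=pdp_id)

    @enforce(("read", "write"), "pdp")
    def add_pdp(self, moon_user_id, pdp_id=None, value=None):
        """Create a PDP after validating its content and pipeline.

        Args:
            moon_user_id: identity of the caller.
            pdp_id: optional ID for the new PDP; must not already exist.
                When ``None`` the driver is called with ``pdp_id=None``.
            value: PDP payload, see :meth:`update_pdp`.

        Returns:
            Whatever ``driver.add_pdp`` returns.

        Raises:
            PdpContentError, PdpExisting, PdpInUse, PolicyUnknown.
        """
        self._check_pdp_content(value)
        self.__pdp_validated_pipeline_name_id(pdp_id, value, "add")
        self._check_pipeline_policies(moon_user_id, value)
        return self.driver.add_pdp(pdp_id=pdp_id, value=value)

    @enforce("read", "pdp")
    def get_pdp(self, moon_user_id, pdp_id=None):
        """Return the PDP(s) from the driver; ``pdp_id=None`` is passed through as-is."""
        return self.driver.get_pdp(pdp_id=pdp_id)

    # Detaching a policy mutates the PDP, so it is gated like delete_pdp and
    # update_pdp; the original only required the "read" permission here,
    # which let a read-only caller alter a PDP's pipeline.
    @enforce(("read", "write"), "pdp")
    def delete_policy_from_pdp(self, moon_user_id, pdp_id, policy_id):
        """Remove ``policy_id`` from the pipeline of ``pdp_id``.

        Raises:
            PdpUnknown: if the driver does not know ``pdp_id``.
            PolicyUnknown: if the driver does not know ``policy_id``.

        Returns:
            Whatever ``driver.delete_policy_from_pdp`` returns.
        """
        if pdp_id not in self.driver.get_pdp(pdp_id=pdp_id):
            raise exceptions.PdpUnknown
        if policy_id not in self.driver.get_policies(policy_id=policy_id):
            raise exceptions.PolicyUnknown
        return self.driver.delete_policy_from_pdp(pdp_id=pdp_id, policy_id=policy_id)

    def __pdp_validated_pipeline_name_id(self, pdp_id, value, method_type=None):
        """Check ``pdp_id`` and ``value`` against every stored PDP.

        With ``method_type == 'update'`` the PDP must already exist; for any
        other value (the "add" case) it must not. In both cases no *other*
        PDP may carry the requested ``name``, and no *other* PDP may already
        hold a policy listed in ``value['security_pipeline']``.

        Raises:
            PdpUnknown: update of a non-existent PDP.
            PdpExisting: add of an existing ID, or a name clash.
            PdpInUse: a requested policy is already attached to another PDP.
        """
        all_pdps = self.driver.get_pdp()
        if method_type == 'update':
            if pdp_id not in all_pdps:
                raise exceptions.PdpUnknown
        elif pdp_id in all_pdps:
            raise exceptions.PdpExisting
        # A payload without a pipeline is valid (name-only PDP); indexing the
        # key unconditionally would raise KeyError here.
        requested = value.get('security_pipeline') or []
        for key in all_pdps:
            if key == pdp_id:
                continue
            other = all_pdps[key]
            if other['name'] == value['name']:
                raise exceptions.PdpExisting
            if any(policy_id in other['security_pipeline'] for policy_id in requested):
                raise exceptions.PdpInUse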