summaryrefslogtreecommitdiffstats
path: root/tosca2heat/tosca-parser-0.3.0/toscaparser/prereq/csar.py
diff options
context:
space:
mode:
Diffstat (limited to 'tosca2heat/tosca-parser-0.3.0/toscaparser/prereq/csar.py')
-rw-r--r--tosca2heat/tosca-parser-0.3.0/toscaparser/prereq/csar.py286
1 files changed, 0 insertions, 286 deletions
diff --git a/tosca2heat/tosca-parser-0.3.0/toscaparser/prereq/csar.py b/tosca2heat/tosca-parser-0.3.0/toscaparser/prereq/csar.py
deleted file mode 100644
index 1cba5c4..0000000
--- a/tosca2heat/tosca-parser-0.3.0/toscaparser/prereq/csar.py
+++ /dev/null
@@ -1,286 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os.path
-import requests
-import shutil
-import six
-import tempfile
-import yaml
-import zipfile
-
-from toscaparser.common.exception import ExceptionCollector
-from toscaparser.common.exception import URLException
-from toscaparser.common.exception import ValidationError
-from toscaparser.imports import ImportsLoader
-from toscaparser.utils.gettextutils import _
-from toscaparser.utils.urlutils import UrlUtils
-
-try: # Python 2.x
- from BytesIO import BytesIO
-except ImportError: # Python 3.x
- from io import BytesIO
-
-
-class CSAR(object):
-
- def __init__(self, csar_file, a_file=True):
- self.path = csar_file
- self.a_file = a_file
- self.is_validated = False
- self.error_caught = False
- self.csar = None
- self.temp_dir = None
-
- def validate(self):
- """Validate the provided CSAR file."""
-
- self.is_validated = True
-
- # validate that the file or URL exists
- missing_err_msg = (_('"%s" does not exist.') % self.path)
- if self.a_file:
- if not os.path.isfile(self.path):
- ExceptionCollector.appendException(
- ValidationError(message=missing_err_msg))
- return False
- else:
- self.csar = self.path
- else: # a URL
- if not UrlUtils.validate_url(self.path):
- ExceptionCollector.appendException(
- ValidationError(message=missing_err_msg))
- return False
- else:
- response = requests.get(self.path)
- self.csar = BytesIO(response.content)
-
- # validate that it is a valid zip file
- if not zipfile.is_zipfile(self.csar):
- err_msg = (_('"%s" is not a valid zip file.') % self.path)
- ExceptionCollector.appendException(
- ValidationError(message=err_msg))
- return False
-
- # validate that it contains the metadata file in the correct location
- self.zfile = zipfile.ZipFile(self.csar, 'r')
- filelist = self.zfile.namelist()
- if 'TOSCA-Metadata/TOSCA.meta' not in filelist:
- err_msg = (_('"%s" is not a valid CSAR as it does not contain the '
- 'required file "TOSCA.meta" in the folder '
- '"TOSCA-Metadata".') % self.path)
- ExceptionCollector.appendException(
- ValidationError(message=err_msg))
- return False
-
- # validate that 'Entry-Definitions' property exists in TOSCA.meta
- data = self.zfile.read('TOSCA-Metadata/TOSCA.meta')
- invalid_yaml_err_msg = (_('The file "TOSCA-Metadata/TOSCA.meta" in '
- 'the CSAR "%s" does not contain valid YAML '
- 'content.') % self.path)
- try:
- meta = yaml.load(data)
- if type(meta) is dict:
- self.metadata = meta
- else:
- ExceptionCollector.appendException(
- ValidationError(message=invalid_yaml_err_msg))
- return False
- except yaml.YAMLError:
- ExceptionCollector.appendException(
- ValidationError(message=invalid_yaml_err_msg))
- return False
-
- if 'Entry-Definitions' not in self.metadata:
- err_msg = (_('The CSAR "%s" is missing the required metadata '
- '"Entry-Definitions" in '
- '"TOSCA-Metadata/TOSCA.meta".')
- % self.path)
- ExceptionCollector.appendException(
- ValidationError(message=err_msg))
- return False
-
- # validate that 'Entry-Definitions' metadata value points to an
- # existing file in the CSAR
- entry = self.metadata.get('Entry-Definitions')
- if entry and entry not in filelist:
- err_msg = (_('The "Entry-Definitions" file defined in the '
- 'CSAR "%s" does not exist.') % self.path)
- ExceptionCollector.appendException(
- ValidationError(message=err_msg))
- return False
-
- # validate that external references in the main template actually
- # exist and are accessible
- self._validate_external_references()
- return not self.error_caught
-
- def get_metadata(self):
- """Return the metadata dictionary."""
-
- # validate the csar if not already validated
- if not self.is_validated:
- self.validate()
-
- # return a copy to avoid changes overwrite the original
- return dict(self.metadata) if self.metadata else None
-
- def _get_metadata(self, key):
- if not self.is_validated:
- self.validate()
- return self.metadata.get(key)
-
- def get_author(self):
- return self._get_metadata('Created-By')
-
- def get_version(self):
- return self._get_metadata('CSAR-Version')
-
- def get_main_template(self):
- entry_def = self._get_metadata('Entry-Definitions')
- if entry_def in self.zfile.namelist():
- return entry_def
-
- def get_main_template_yaml(self):
- main_template = self.get_main_template()
- if main_template:
- data = self.zfile.read(main_template)
- invalid_tosca_yaml_err_msg = (
- _('The file "%(template)s" in the CSAR "%(csar)s" does not '
- 'contain valid TOSCA YAML content.') %
- {'template': main_template, 'csar': self.path})
- try:
- tosca_yaml = yaml.load(data)
- if type(tosca_yaml) is not dict:
- ExceptionCollector.appendException(
- ValidationError(message=invalid_tosca_yaml_err_msg))
- return tosca_yaml
- except Exception:
- ExceptionCollector.appendException(
- ValidationError(message=invalid_tosca_yaml_err_msg))
-
- def get_description(self):
- desc = self._get_metadata('Description')
- if desc is not None:
- return desc
-
- self.metadata['Description'] = \
- self.get_main_template_yaml().get('description')
- return self.metadata['Description']
-
- def decompress(self):
- if not self.is_validated:
- self.validate()
- self.temp_dir = tempfile.NamedTemporaryFile().name
- with zipfile.ZipFile(self.csar, "r") as zf:
- zf.extractall(self.temp_dir)
-
- def _validate_external_references(self):
- """Extracts files referenced in the main template
-
- These references are currently supported:
- * imports
- * interface implementations
- * artifacts
- """
- try:
- self.decompress()
- main_tpl_file = self.get_main_template()
- if not main_tpl_file:
- return
- main_tpl = self.get_main_template_yaml()
-
- if 'imports' in main_tpl:
- ImportsLoader(main_tpl['imports'],
- os.path.join(self.temp_dir, main_tpl_file))
-
- if 'topology_template' in main_tpl:
- topology_template = main_tpl['topology_template']
-
- if 'node_templates' in topology_template:
- node_templates = topology_template['node_templates']
-
- for node_template_key in node_templates:
- node_template = node_templates[node_template_key]
- if 'artifacts' in node_template:
- artifacts = node_template['artifacts']
- for artifact_key in artifacts:
- artifact = artifacts[artifact_key]
- if isinstance(artifact, six.string_types):
- self._validate_external_reference(
- main_tpl_file,
- artifact)
- elif isinstance(artifact, dict):
- if 'file' in artifact:
- self._validate_external_reference(
- main_tpl_file,
- artifact['file'])
- else:
- ExceptionCollector.appendException(
- ValueError(_('Unexpected artifact '
- 'definition for "%s".')
- % artifact_key))
- self.error_caught = True
- if 'interfaces' in node_template:
- interfaces = node_template['interfaces']
- for interface_key in interfaces:
- interface = interfaces[interface_key]
- for opertation_key in interface:
- operation = interface[opertation_key]
- if isinstance(operation, six.string_types):
- self._validate_external_reference(
- main_tpl_file,
- operation,
- False)
- elif isinstance(operation, dict):
- if 'implementation' in operation:
- self._validate_external_reference(
- main_tpl_file,
- operation['implementation'])
- finally:
- if self.temp_dir:
- shutil.rmtree(self.temp_dir)
-
- def _validate_external_reference(self, tpl_file, resource_file,
- raise_exc=True):
- """Verify that the external resource exists
-
- If resource_file is a URL verify that the URL is valid.
- If resource_file is a relative path verify that the path is valid
- considering base folder (self.temp_dir) and tpl_file.
- Note that in a CSAR resource_file cannot be an absolute path.
- """
- if UrlUtils.validate_url(resource_file):
- msg = (_('The resource at "%s" cannot be accessed.') %
- resource_file)
- try:
- if UrlUtils.url_accessible(resource_file):
- return
- else:
- ExceptionCollector.appendException(
- URLException(what=msg))
- self.error_caught = True
- except Exception:
- ExceptionCollector.appendException(
- URLException(what=msg))
- self.error_caught = True
-
- if os.path.isfile(os.path.join(self.temp_dir,
- os.path.dirname(tpl_file),
- resource_file)):
- return
-
- if raise_exc:
- ExceptionCollector.appendException(
- ValueError(_('The resource "%s" does not exist.')
- % resource_file))
- self.error_caught = True