# Copyright 2020 Spirent Communications.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Airship implementation of Software Predeployment Validation
"""

import os
import shutil
from pathlib import Path
import logging
import json
import git
import urllib3
from tornado.web import Application
from tornado.ioloop import IOLoop
import tornado.concurrent
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.log


def check_link(link):
    """
    Check the availability of a hyperlink.

    A HEAD request is sent to ``link`` (5s connect / 7s read timeout).

    Args:
        link: URL to probe.

    Returns:
        True when the request completes with an HTTP status below 400,
        False when the request fails or the server answers with an error
        status.
    """
    timeout = urllib3.util.Timeout(connect=5.0, read=7.0)
    http = urllib3.PoolManager(timeout=timeout)
    try:
        response = http.request('HEAD', link)
    except urllib3.exceptions.HTTPError as err:
        # LocationValueError, MaxRetryError, RequestError,
        # ConnectTimeoutError and PoolError all derive from HTTPError, so
        # one handler covers every case that used to be listed separately.
        print(err.args)
        return False
    finally:
        # Release the pooled connections held by this one-shot manager.
        http.clear()
    # A 4xx/5xx answer means the resource itself is not available, even
    # though the server was reachable.
    return response.status < 400


class Airship():
    """
    Airship URLS Validation
    """
    def __init__(self, params):
        """
        Airship class constructor

        Args:
            params: mapping that provides the keys AIRSHIP_MANIFEST_URL,
                AIRSHIP_MANIFEST_BRANCH, AIRSHIP_MANIFEST_SITE_NAME and
                AIRSHIP_TREASUREMAP_VERSION.

        Raises:
            KeyError: if one of those keys is missing.
        """
        self.url = params['AIRSHIP_MANIFEST_URL']
        self.branch = params['AIRSHIP_MANIFEST_BRANCH']
        self.dl_path = '/tmp'
        self.site_name = params['AIRSHIP_MANIFEST_SITE_NAME']
        self.tmversion = params['AIRSHIP_TREASUREMAP_VERSION']
        self.manifest = None
        # Clone targets for the site manifest and for treasuremap.
        self.dirpath = Path(self.dl_path, 'airship')
        self.tmdirpath = Path(self.dl_path, 'treasuremap')
        # Links collected by find_locations(), without duplicates.
        self.locations = []
        self.validcount = 0
        self.invalidcount = 0
        # Text report written by validate().
        self.respath = os.path.join(
            self.dl_path, ('urls-' + self.site_name + '-check.txt'))

    def clone_repo(self):
        """
        Clone the site manifest repo and the treasuremap repo.
        """
        git.Repo.clone_from(self.url,
                            self.dirpath,
                            branch=self.branch)
        git.Repo.clone_from('https://github.com/airshipit/treasuremap',
                            self.tmdirpath,
                            branch=self.tmversion)

    def cleanup_manifest(self):
        """
        Remove the cloned manifest directories, if they exist.
        """
        for path in (self.dirpath, self.tmdirpath):
            # is_dir() is False for a missing path, so no exists() check
            # is needed.
            if path.is_dir():
                shutil.rmtree(path)

    def manifest_exists_locally(self):
        """
        Return True if the site manifest directory exists locally.
        """
        return self.dirpath.is_dir()

    def validate(self):
        """
        Hyperlink Validation

        Clones both repositories, collects the links from their
        versions.yaml files, probes each link with check_link() and writes
        one line per link to ``self.respath``. ``validcount`` and
        ``invalidcount`` are updated as links are checked. The cloned
        directories are removed afterwards, also when an error occurs.
        """
        self.cleanup_manifest()
        try:
            # Next, clone the repo to the provided path.
            self.clone_repo()

            if self.dirpath.is_dir():
                # Get the file(s) where links are defined.
                self.find_locations(
                    os.path.join(self.dirpath, 'type',
                                 'cntt', 'software',
                                 'config', 'versions.yaml'))
                self.find_locations(
                    os.path.join(self.tmdirpath, 'global',
                                 'software', 'config',
                                 'versions.yaml'))

                with open(self.respath, "w+") as report:
                    for location in self.locations:
                        if check_link(location):
                            verdict = 'VALID'
                            self.validcount += 1
                        else:
                            verdict = 'INVALID'
                            self.invalidcount += 1
                        # One entry per line keeps the report readable.
                        report.write("The Link: %s is %s\n"
                                     % (location, verdict))
        finally:
            # Never leave the clones behind, even when cloning or parsing
            # failed part-way through.
            self.cleanup_manifest()

    def getresults(self):
        """
        Return Valid and Invalid Counts

        Returns:
            Tuple ``(validcount, invalidcount)``.
        """
        return (self.validcount, self.invalidcount)

    def _add_location(self, link):
        """
        Record ``link`` (stripped) unless it is already known.
        """
        if link.strip() not in self.locations:
            print(link)
            self.locations.append(link.strip())

    @staticmethod
    def _registry_link(line, base, repo_path, tag_path):
        """
        Turn an ``<key>: host/namespace/image:tag`` line into a registry
        API URL.

        Args:
            line: stripped manifest line.
            base: URL prefix put in front of the registry host.
            repo_path: path segment between host and namespace.
            tag_path: path segment between image name and tag.

        Returns:
            The API URL when the value has exactly three '/'-separated
            parts, the unmodified value otherwise, or None when the line
            contains no ':' separator at all.
        """
        _, separator, value = line.partition(':')
        if not separator:
            # Nothing after a key, e.g. a bare mention in a comment line.
            return None
        link = value.replace('"', '')
        parts = link.split('/')
        if len(parts) == 3:
            image = parts[2].split(':')
            link = (base + parts[0].strip() + repo_path +
                    parts[1] + '/' + image[0] +
                    tag_path + image[-1])
        return link

    def find_locations(self, yamlfile):
        """
        Find all the hyperlinks in the manifests

        Scans ``yamlfile`` line by line and appends every new link to
        ``self.locations``:

        * ``location:`` lines contribute their value; for values that
          contain "opendev" and are directly followed by a ``reference:``
          line, ``/commit/<reference>`` is appended.
        * lines containing ``docker.`` or ``quay.`` are converted to the
          matching registry API URL.

        Args:
            yamlfile: path of the versions.yaml file to scan.
        """
        with open(yamlfile, 'r') as filep:
            lines = filep.readlines()

        for index, raw_line in enumerate(lines):
            line = raw_line.strip()
            if line.startswith('location:'):
                link = line.split(":", 1)[1]
                if "opendev" in link and index + 1 < len(lines):
                    following = lines[index + 1].strip()
                    if following.startswith('reference:'):
                        ref = following.split(":", 1)[1]
                        link = link + '/commit/' + ref.strip()
                self._add_location(link)
            if 'docker.' in line:
                link = self._registry_link(line, 'https://index.',
                                           '/v1/repositories/', '/tags/')
                if link is not None:
                    self._add_location(link)
            # quay.io/coreos/etcd:v3.4.2
            # https://quay.io/api/v1/repository/coreos/etcd/tag/v3.4.2
            if 'quay.' in line:
                link = self._registry_link(line, 'https://',
                                           '/api/v1/repository/', '/tag/')
                if link is not None:
                    self._add_location(link)


# pylint: disable=W0223
class AirshipUrlsValidator(tornado.web.RequestHandler):
    """ Validate URLS """
    def set_default_headers(self):
        """ set default headers"""
        self.set_header('Content-Type', 'application/json')

    def _reject(self, message):
        """ Answer with HTTP 400 and a JSON error body """
        self.set_status(400)
        self.write({'error': message})

    def post(self):
        """
        POST request
        usage:
            /airship with a JSON body holding the string fields
            name, installer, link and version
        :return: number of valid and invalid links; HTTP 400 with a JSON
                 error object when the request cannot be processed
        """
        # decode the body
        try:
            data = json.loads(self.request.body.decode())
        except ValueError:
            # Covers both undecodable bytes and malformed JSON.
            self._reject('request body must be valid JSON')
            return
        if not isinstance(data, dict):
            self._reject('request body must be a JSON object')
            return

        branch = 'master'
        fields = {key: data.get(key)
                  for key in ('installer', 'name', 'link', 'version')}
        missing = sorted(key for key, value in fields.items()
                         if not isinstance(value, str) or not value)
        if missing:
            self._reject('missing or empty fields: ' + ', '.join(missing))
            return
        if 'airship' not in fields['installer'].lower():
            self._reject('unsupported installer')
            return
        # The name becomes part of the report file name, so it must not
        # carry any directory component.
        if os.path.basename(fields['name']) != fields['name']:
            self._reject('name must not contain path separators')
            return

        # NOTE(review): 'link' comes from the client and is handed to
        # git clone unchanged - consider restricting the accepted URL
        # schemes/hosts.
        params = {
            'AIRSHIP_MANIFEST_URL': fields['link'],
            'AIRSHIP_MANIFEST_BRANCH': branch,
            'AIRSHIP_MANIFEST_SITE_NAME': fields['name'],
            'AIRSHIP_TREASUREMAP_VERSION': fields['version'],
        }
        airship = Airship(params)
        # NOTE(review): validate() clones repositories and probes links
        # synchronously inside this handler.
        airship.validate()
        valid, invalid = airship.getresults()
        self.write("Valid Links: " +
                   str(valid) +
                   " Invalid Links: " +
                   str(invalid))


# pylint: disable=W0223
class TripleoUrlsValidator(tornado.web.RequestHandler):
    """ Validate URLS """

    def post(self):
        """
        POST request
        """
        self.write('error: Not Implemented')


def main():
    """
    The Main Control

    Registers the /airship and /tripleo handlers, listens on the port
    given by the ``--port`` command line option (default 8989) and runs
    the IOLoop until interrupted with Ctrl+C.
    """
    app = Application([('/airship', AirshipUrlsValidator),
                       ('/tripleo', TripleoUrlsValidator)])

    # Cli Config
    tornado.options.define("port", default=8989,
                           help="running on the given port", type=int)
    tornado.options.parse_command_line()

    # Server Config
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(tornado.options.options.port)

    # Tornado's event loop handles it from here
    print("# Server Listening.... \n [Ctrl + C] to quit")

    # Logging
    log_file_filename = "/var/log/tornado.log"
    handler = logging.FileHandler(log_file_filename)
    app_log = logging.getLogger("tornado.general")
    tornado.log.enable_pretty_logging()
    app_log.addHandler(handler)

    # Run until Ctrl+C. The loop is started exactly once: starting it
    # again after the interrupt would keep the process alive.
    loop = IOLoop.instance()
    try:
        loop.start()
    except KeyboardInterrupt:
        loop.stop()


if __name__ == "__main__":
    main()