From 03feb49199097769cbc8900801855a912769fc9a Mon Sep 17 00:00:00 2001 From: Edward MacGillivray Date: Tue, 5 Sep 2017 13:20:20 -0700 Subject: Added line parser to INI parser Line parser handles comments, keys and values and makes exceptions. Change-Id: I5cd3612ffd8cb08b14051bd0ef4b757c310f77bd Signed-off-by: Edward MacGillivray --- .../network_services/vnf_generic/vnf/iniparser.py | 299 +++++++++++++-------- 1 file changed, 186 insertions(+), 113 deletions(-) (limited to 'yardstick') diff --git a/yardstick/network_services/vnf_generic/vnf/iniparser.py b/yardstick/network_services/vnf_generic/vnf/iniparser.py index 70e24de5b..98256e08a 100644 --- a/yardstick/network_services/vnf_generic/vnf/iniparser.py +++ b/yardstick/network_services/vnf_generic/vnf/iniparser.py @@ -14,163 +14,236 @@ class ParseError(Exception): - def __init__(self, message, lineno, line): + + def __init__(self, message, line_no, line): self.msg = message self.line = line - self.lineno = lineno + self.line_no = line_no def __str__(self): - return 'at line %d, %s: %r' % (self.lineno, self.msg, self.line) + return 'at line %d, %s: %r' % (self.line_no, self.msg, self.line) + + +class SectionParseError(ParseError): + + pass + + +class LineParser(object): + + PARSE_EXC = ParseError + + @staticmethod + def strip_key_value(key, value): + key = key.strip() + value = value.strip() + if value and value[0] == value[-1] and value.startswith(('"', "'")): + value = value[1:-1] + return key, [value] + + def __init__(self, line, line_no): + super(LineParser, self).__init__() + self.line = line + self.line_no = line_no + self.continuation = line != line.lstrip() + semi_active, _, semi_comment = line.partition(';') + pound_active, _, pound_comment = line.partition('#') + if not semi_comment and not pound_comment: + self.active = line.strip() + self.comment = '' + elif len(semi_comment) > len(pound_comment): + self.active = semi_active.strip() + self.comment = semi_comment.strip() + else: + self.active = pound_active.strip() + self.comment = pound_comment.strip() + self._section_name = None + + def __repr__(self): + template = "line %d: active '%s' comment '%s'\n%s" + return template % (self.line_no, self.active, self.comment, self.line) + + @property + def section_name(self): + if self._section_name is None: + if not self.active.startswith('['): + raise self.error_no_section_start_bracket() + if not self.active.endswith(']'): + raise self.error_no_section_end_bracket() + self._section_name = '' + if self.active: + self._section_name = self.active[1:-1] + if not self._section_name: + raise self.error_no_section_name() + return self._section_name + + def is_active_line(self): + return bool(self.active) + + def is_continuation(self): + return self.continuation + + def split_key_value(self): + for sep in ['=', ':']: + words = self.active.split(sep, 1) + try: + return self.strip_key_value(*words) + except TypeError: + pass + + return self.active.rstrip(), '@' + + def error_invalid_assignment(self): + return self.PARSE_EXC("No ':' or '=' found in assignment", self.line_no, self.line) + + def error_empty_key(self): + return self.PARSE_EXC('Key cannot be empty', self.line_no, self.line) + + def error_unexpected_continuation(self): + return self.PARSE_EXC('Unexpected continuation line', self.line_no, self.line) + + def error_no_section_start_bracket(self): + return SectionParseError('Invalid section (must start with [)', self.line_no, self.line) + + def error_no_section_end_bracket(self): + return self.PARSE_EXC('Invalid section (must end with ])', self.line_no, self.line) + + def error_no_section_name(self): + return self.PARSE_EXC('Empty section name', self.line_no, self.line) class BaseParser(object): - lineno = 0 - parse_exc = ParseError - def _assignment(self, key, value): - self.assignment(key, value) - return None, [] + def parse(self, data=None): + if data is not None: + return self._parse(data.splitlines()) - def _get_section(self, line): - if not line.endswith(']'): - return self.error_no_section_end_bracket(line) - if len(line) <= 2: - return self.error_no_section_name(line) + def _next_key_value(self, line_parser, key, value): + self.comment(line_parser) - return line[1:-1] + if not line_parser.is_active_line(): + # Blank line, ends multi-line values + if key: + key, value = self.assignment(key, value, line_parser) + return key, value - def _split_key_value(self, line): - colon = line.find(':') - equal = line.find('=') - if colon < 0 and equal < 0: - return line.strip(), '@' + if line_parser.is_continuation(): + # Continuation of previous assignment + if key is None: + raise line_parser.error_unexpected_continuation() - if colon < 0 or (equal >= 0 and equal < colon): - key, value = line[:equal], line[equal + 1:] + value.append(line_parser.active.lstrip()) + return key, value + + if key: + # Flush previous assignment, if any + key, value = self.assignment(key, value, line_parser) + + try: + # Section start + self.new_section(line_parser) + except SectionParseError: + pass else: - key, value = line[:colon], line[colon + 1:] + return key, value - value = value.strip() - if value and value[0] == value[-1] and value.startswith(("\"", "'")): - value = value[1:-1] - return key.strip(), [value] + key, value = line_parser.split_key_value() + if not key: + raise line_parser.error_empty_key() + return key, value - def parse(self, lineiter): + def _parse(self, line_iter): key = None value = [] - for line in lineiter: - self.lineno += 1 - - line = line.rstrip() - lines = line.split(';') - line = lines[0] - if not line: - # Blank line, ends multi-line values - if key: - key, value = self._assignment(key, value) - continue - elif line.startswith((' ', '\t')): - # Continuation of previous assignment - if key is None: - self.error_unexpected_continuation(line) - else: - value.append(line.lstrip()) - continue - - if key: - # Flush previous assignment, if any - key, value = self._assignment(key, value) - - if line.startswith('['): - # Section start - section = self._get_section(line) - if section: - self.new_section(section) - elif line.startswith(('#', ';')): - self.comment(line[1:].lstrip()) - else: - key, value = self._split_key_value(line) - if not key: - return self.error_empty_key(line) + parse_iter = (LineParser(line, line_no) for line_no, line in enumerate(line_iter)) + for line_parser in parse_iter: + key, value = self._next_key_value(line_parser, key, value) if key: # Flush previous assignment, if any - self._assignment(key, value) + self.assignment(key, value, LineParser('EOF', -1)) - def assignment(self, key, value): + def _assignment(self, key, value, line_parser): """Called when a full assignment is parsed.""" raise NotImplementedError() - def new_section(self, section): + def assignment(self, key, value, line_parser): + self._assignment(key, value, line_parser) + return None, [] + + def new_section(self, line_parser): """Called when a new section is started.""" raise NotImplementedError() - def comment(self, comment): + def comment(self, line_parser): """Called when a comment is parsed.""" - pass - - def error_invalid_assignment(self, line): - raise self.parse_exc("No ':' or '=' found in assignment", - self.lineno, line) - - def error_empty_key(self, line): - raise self.parse_exc('Key cannot be empty', self.lineno, line) - - def error_unexpected_continuation(self, line): - raise self.parse_exc('Unexpected continuation line', - self.lineno, line) - - def error_no_section_end_bracket(self, line): - raise self.parse_exc('Invalid section (must end with ])', - self.lineno, line) - - def error_no_section_name(self, line): - raise self.parse_exc('Empty section name', self.lineno, line) + raise NotImplementedError() class ConfigParser(BaseParser): """Parses a single config file, populating 'sections' to look like: - {'DEFAULT': {'key': [value, ...], ...}, - ...} + [ + [ + 'section1', + [ + ['key1', 'value1\nvalue2'], + ['key2', 'value3\nvalue4'], + ], + ], + [ + 'section2', + [ + ['key3', 'value5\nvalue6'], + ], + ], + ] """ - def __init__(self, filename, sections): + def __init__(self, filename, sections=None): super(ConfigParser, self).__init__() self.filename = filename - self.sections = sections + if sections is not None: + self.sections = sections + else: + self.sections = [] + self.section_name = None self.section = None - def parse(self): - with open(self.filename) as f: - return super(ConfigParser, self).parse(f) + def parse(self, data=None): + if not data: + data = self.filename + with open(data) as f: + return self._parse(f) - def find_section(self, sections, section): - return next((i for i, sect in enumerate(sections) if sect == section), -1) + def __iter__(self): + return iter(self.sections) - def new_section(self, section): - self.section = section - index = self.find_section(self.sections, section) - if index == -1: - self.sections.append([section, []]) + def find_section_index(self, section_name): + return next((i for i, (name, value) in enumerate(self) if name == section_name), -1) - def assignment(self, key, value): - if not self.section: - raise self.error_no_section() + def find_section(self, section_name): + return next((value for name, value in self.sections if name == section_name), None) - value = '\n'.join(value) - - def append(sections, section): - entry = [key, value] - index = self.find_section(sections, section) - sections[index][1].append(entry) + def new_section(self, line_parser): + section_name = line_parser.section_name + index = self.find_section_index(section_name) + self.section_name = section_name + if index == -1: + self.section = [section_name, []] + self.sections.append(self.section) + else: + self.section = self.sections[index] - append(self.sections, self.section) + def _assignment(self, key, value, line_parser): + if not self.section_name: + raise line_parser.error_no_section_name() - def parse_exc(self, msg, lineno, line=None): - return ParseError(msg, lineno, line) + value = '\n'.join(value) + entry = [key, value] + self.section[1].append(entry) - def error_no_section(self): - return self.parse_exc('Section must be started before assignment', - self.lineno) + def comment(self, line_parser): + """Called when a comment is parsed.""" + pass -- cgit 1.2.3-korg