From 03feb49199097769cbc8900801855a912769fc9a Mon Sep 17 00:00:00 2001 From: Edward MacGillivray Date: Tue, 5 Sep 2017 13:20:20 -0700 Subject: Added line parser to INI parser Line parser handles comments, keys and values and makes exceptions. Change-Id: I5cd3612ffd8cb08b14051bd0ef4b757c310f77bd Signed-off-by: Edward MacGillivray --- .../vnf_generic/vnf/test_iniparser.py | 52 +++- .../network_services/vnf_generic/vnf/iniparser.py | 299 +++++++++++++-------- 2 files changed, 226 insertions(+), 125 deletions(-) diff --git a/tests/unit/network_services/vnf_generic/vnf/test_iniparser.py b/tests/unit/network_services/vnf_generic/vnf/test_iniparser.py index 15d6adea1..1ad8df9c6 100644 --- a/tests/unit/network_services/vnf_generic/vnf/test_iniparser.py +++ b/tests/unit/network_services/vnf_generic/vnf/test_iniparser.py @@ -28,6 +28,7 @@ stl_patch.start() if stl_patch: from yardstick.network_services.vnf_generic.vnf.iniparser import ParseError + from yardstick.network_services.vnf_generic.vnf.iniparser import LineParser from yardstick.network_services.vnf_generic.vnf.iniparser import BaseParser from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser @@ -38,16 +39,18 @@ key1=value1 list1: value2 value3 value4 -key2="double quote value" key3='single quote value' ; comment here key4= -[section2] +[section2] ; comment with #2 other symbol # here is a comment line list2: value5 -key with no value +key with no value # mixed comment ; symbols ; another comment line key5= + +[section1] ; reopen a section! +key2="double quote value" """ PARSE_TEXT_2 = """\ @@ -86,6 +89,17 @@ class TestParseError(unittest.TestCase): self.assertEqual(str(error), "at line 2, a: 'c'") +class TestLineParser(unittest.TestCase): + + def test___repr__(self): + line_parser = LineParser('', 101) + self.assertIsNotNone(repr(line_parser)) + + def test_error_invalid_assignment(self): + line_parser = LineParser('', 101) + self.assertIsNotNone(line_parser.error_invalid_assignment()) + + class TestBaseParser(unittest.TestCase): @staticmethod @@ -96,23 +110,26 @@ class TestBaseParser(unittest.TestCase): return internal_open - @mock.patch('yardstick.network_services.vnf_generic.vnf.iniparser.open') - def test_parse_none(self, mock_open): - mock_open.side_effect = self.make_open('') - + def test_parse(self): parser = BaseParser() + parser.parse() - parser.parse([]) + def test_parse_empty_string(self): + parser = BaseParser() + self.assertIsNone(parser.parse('')) def test_not_implemented_methods(self): parser = BaseParser() with self.assertRaises(NotImplementedError): - parser.assignment('key', 'value') + parser.assignment('key', 'value', LineParser('', 100)) with self.assertRaises(NotImplementedError): parser.new_section('section') + with self.assertRaises(NotImplementedError): + parser.comment('comment') + class TestConfigParser(unittest.TestCase): @@ -128,18 +145,25 @@ class TestConfigParser(unittest.TestCase): def test_parse(self, mock_open): mock_open.side_effect = self.make_open(PARSE_TEXT_1) - config_parser = ConfigParser('my_file', []) + existing_data = [['section0', [['key0', 'value0']]]] + config_parser = ConfigParser('my_file', existing_data) config_parser.parse() expected = [ + [ + 'section0', + [ + ['key0', 'value0'], + ], + ], [ 'section1', [ ['key1', 'value1'], ['list1', 'value2\nvalue3\nvalue4'], - ['key2', 'double quote value'], ['key3', 'single quote value'], ['key4', ''], + ['key2', 'double quote value'], ], ], [ @@ -153,12 +177,16 @@ class TestConfigParser(unittest.TestCase): ] self.assertEqual(config_parser.sections, expected) + self.assertIsNotNone(config_parser.find_section('section1')) + self.assertIsNone(config_parser.find_section('section3')) + self.assertEqual(config_parser.find_section_index('section1'), 1) + self.assertEqual(config_parser.find_section_index('section3'), -1) @mock.patch('yardstick.network_services.vnf_generic.vnf.iniparser.open') def test_parse_2(self, mock_open): mock_open.side_effect = self.make_open(PARSE_TEXT_2) - config_parser = ConfigParser('my_file', []) + config_parser = ConfigParser('my_file') config_parser.parse() expected = [ diff --git a/yardstick/network_services/vnf_generic/vnf/iniparser.py b/yardstick/network_services/vnf_generic/vnf/iniparser.py index 70e24de5b..98256e08a 100644 --- a/yardstick/network_services/vnf_generic/vnf/iniparser.py +++ b/yardstick/network_services/vnf_generic/vnf/iniparser.py @@ -14,163 +14,236 @@ class ParseError(Exception): - def __init__(self, message, lineno, line): + + def __init__(self, message, line_no, line): self.msg = message self.line = line - self.lineno = lineno + self.line_no = line_no def __str__(self): - return 'at line %d, %s: %r' % (self.lineno, self.msg, self.line) + return 'at line %d, %s: %r' % (self.line_no, self.msg, self.line) + + +class SectionParseError(ParseError): + + pass + + +class LineParser(object): + + PARSE_EXC = ParseError + + @staticmethod + def strip_key_value(key, value): + key = key.strip() + value = value.strip() + if value and value[0] == value[-1] and value.startswith(('"', "'")): + value = value[1:-1] + return key, [value] + + def __init__(self, line, line_no): + super(LineParser, self).__init__() + self.line = line + self.line_no = line_no + self.continuation = line != line.lstrip() + semi_active, _, semi_comment = line.partition(';') + pound_active, _, pound_comment = line.partition('#') + if not semi_comment and not pound_comment: + self.active = line.strip() + self.comment = '' + elif len(semi_comment) > len(pound_comment): + self.active = semi_active.strip() + self.comment = semi_comment.strip() + else: + self.active = pound_active.strip() + self.comment = pound_comment.strip() + self._section_name = None + + def __repr__(self): + template = "line %d: active '%s' comment '%s'\n%s" + return template % (self.line_no, self.active, self.comment, self.line) + + @property + def section_name(self): + if self._section_name is None: + if not self.active.startswith('['): + raise self.error_no_section_start_bracket() + if not self.active.endswith(']'): + raise self.error_no_section_end_bracket() + self._section_name = '' + if self.active: + self._section_name = self.active[1:-1] + if not self._section_name: + raise self.error_no_section_name() + return self._section_name + + def is_active_line(self): + return bool(self.active) + + def is_continuation(self): + return self.continuation + + def split_key_value(self): + for sep in ['=', ':']: + words = self.active.split(sep, 1) + try: + return self.strip_key_value(*words) + except TypeError: + pass + + return self.active.rstrip(), '@' + + def error_invalid_assignment(self): + return self.PARSE_EXC("No ':' or '=' found in assignment", self.line_no, self.line) + + def error_empty_key(self): + return self.PARSE_EXC('Key cannot be empty', self.line_no, self.line) + + def error_unexpected_continuation(self): + return self.PARSE_EXC('Unexpected continuation line', self.line_no, self.line) + + def error_no_section_start_bracket(self): + return SectionParseError('Invalid section (must start with [)', self.line_no, self.line) + + def error_no_section_end_bracket(self): + return self.PARSE_EXC('Invalid section (must end with ])', self.line_no, self.line) + + def error_no_section_name(self): + return self.PARSE_EXC('Empty section name', self.line_no, self.line) class BaseParser(object): - lineno = 0 - parse_exc = ParseError - def _assignment(self, key, value): - self.assignment(key, value) - return None, [] + def parse(self, data=None): + if data is not None: + return self._parse(data.splitlines()) - def _get_section(self, line): - if not line.endswith(']'): - return self.error_no_section_end_bracket(line) - if len(line) <= 2: - return self.error_no_section_name(line) + def _next_key_value(self, line_parser, key, value): + self.comment(line_parser) - return line[1:-1] + if not line_parser.is_active_line(): + # Blank line, ends multi-line values + if key: + key, value = self.assignment(key, value, line_parser) + return key, value - def _split_key_value(self, line): - colon = line.find(':') - equal = line.find('=') - if colon < 0 and equal < 0: - return line.strip(), '@' + if line_parser.is_continuation(): + # Continuation of previous assignment + if key is None: + raise line_parser.error_unexpected_continuation() - if colon < 0 or (equal >= 0 and equal < colon): - key, value = line[:equal], line[equal + 1:] + value.append(line_parser.active.lstrip()) + return key, value + + if key: + # Flush previous assignment, if any + key, value = self.assignment(key, value, line_parser) + + try: + # Section start + self.new_section(line_parser) + except SectionParseError: + pass else: - key, value = line[:colon], line[colon + 1:] + return key, value - value = value.strip() - if value and value[0] == value[-1] and value.startswith(("\"", "'")): - value = value[1:-1] - return key.strip(), [value] + key, value = line_parser.split_key_value() + if not key: + raise line_parser.error_empty_key() + return key, value - def parse(self, lineiter): + def _parse(self, line_iter): key = None value = [] - for line in lineiter: - self.lineno += 1 - - line = line.rstrip() - lines = line.split(';') - line = lines[0] - if not line: - # Blank line, ends multi-line values - if key: - key, value = self._assignment(key, value) - continue - elif line.startswith((' ', '\t')): - # Continuation of previous assignment - if key is None: - self.error_unexpected_continuation(line) - else: - value.append(line.lstrip()) - continue - - if key: - # Flush previous assignment, if any - key, value = self._assignment(key, value) - - if line.startswith('['): - # Section start - section = self._get_section(line) - if section: - self.new_section(section) - elif line.startswith(('#', ';')): - self.comment(line[1:].lstrip()) - else: - key, value = self._split_key_value(line) - if not key: - return self.error_empty_key(line) + parse_iter = (LineParser(line, line_no) for line_no, line in enumerate(line_iter)) + for line_parser in parse_iter: + key, value = self._next_key_value(line_parser, key, value) if key: # Flush previous assignment, if any - self._assignment(key, value) + self.assignment(key, value, LineParser('EOF', -1)) - def assignment(self, key, value): + def _assignment(self, key, value, line_parser): """Called when a full assignment is parsed.""" raise NotImplementedError() - def new_section(self, section): + def assignment(self, key, value, line_parser): + self._assignment(key, value, line_parser) + return None, [] + + def new_section(self, line_parser): """Called when a new section is started.""" raise NotImplementedError() - def comment(self, comment): + def comment(self, line_parser): """Called when a comment is parsed.""" - pass - - def error_invalid_assignment(self, line): - raise self.parse_exc("No ':' or '=' found in assignment", - self.lineno, line) - - def error_empty_key(self, line): - raise self.parse_exc('Key cannot be empty', self.lineno, line) - - def error_unexpected_continuation(self, line): - raise self.parse_exc('Unexpected continuation line', - self.lineno, line) - - def error_no_section_end_bracket(self, line): - raise self.parse_exc('Invalid section (must end with ])', - self.lineno, line) - - def error_no_section_name(self, line): - raise self.parse_exc('Empty section name', self.lineno, line) + raise NotImplementedError() class ConfigParser(BaseParser): """Parses a single config file, populating 'sections' to look like: - {'DEFAULT': {'key': [value, ...], ...}, - ...} + [ + [ + 'section1', + [ + ['key1', 'value1\nvalue2'], + ['key2', 'value3\nvalue4'], + ], + ], + [ + 'section2', + [ + ['key3', 'value5\nvalue6'], + ], + ], + ] """ - def __init__(self, filename, sections): + def __init__(self, filename, sections=None): super(ConfigParser, self).__init__() self.filename = filename - self.sections = sections + if sections is not None: + self.sections = sections + else: + self.sections = [] + self.section_name = None self.section = None - def parse(self): - with open(self.filename) as f: - return super(ConfigParser, self).parse(f) + def parse(self, data=None): + if not data: + data = self.filename + with open(data) as f: + return self._parse(f) - def find_section(self, sections, section): - return next((i for i, sect in enumerate(sections) if sect == section), -1) + def __iter__(self): + return iter(self.sections) - def new_section(self, section): - self.section = section - index = self.find_section(self.sections, section) - if index == -1: - self.sections.append([section, []]) + def find_section_index(self, section_name): + return next((i for i, (name, value) in enumerate(self) if name == section_name), -1) - def assignment(self, key, value): - if not self.section: - raise self.error_no_section() + def find_section(self, section_name): + return next((value for name, value in self.sections if name == section_name), None) - value = '\n'.join(value) - - def append(sections, section): - entry = [key, value] - index = self.find_section(sections, section) - sections[index][1].append(entry) + def new_section(self, line_parser): + section_name = line_parser.section_name + index = self.find_section_index(section_name) + self.section_name = section_name + if index == -1: + self.section = [section_name, []] + self.sections.append(self.section) + else: + self.section = self.sections[index] - append(self.sections, self.section) + def _assignment(self, key, value, line_parser): + if not self.section_name: + raise line_parser.error_no_section_name() - def parse_exc(self, msg, lineno, line=None): - return ParseError(msg, lineno, line) + value = '\n'.join(value) + entry = [key, value] + self.section[1].append(entry) - def error_no_section(self): - return self.parse_exc('Section must be started before assignment', - self.lineno) + def comment(self, line_parser): + """Called when a comment is parsed.""" + pass -- cgit 1.2.3-korg