aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoss Brattain <ross.b.brattain@intel.com>2017-09-12 18:22:30 +0000
committerGerrit Code Review <gerrit@opnfv.org>2017-09-12 18:22:30 +0000
commit2430ee2bbede7c26f50883ed3bf8d2ab92f9c66b (patch)
treebe033c3859f267cd42956c27dbbc0a73ad453290
parentaff74a1ac608b430dc10d62e3ea543c3061586ce (diff)
parent03feb49199097769cbc8900801855a912769fc9a (diff)
Merge "Added line parser to INI parser"
-rw-r--r--tests/unit/network_services/vnf_generic/vnf/test_iniparser.py52
-rw-r--r--yardstick/network_services/vnf_generic/vnf/iniparser.py299
2 files changed, 226 insertions, 125 deletions
diff --git a/tests/unit/network_services/vnf_generic/vnf/test_iniparser.py b/tests/unit/network_services/vnf_generic/vnf/test_iniparser.py
index 15d6adea1..1ad8df9c6 100644
--- a/tests/unit/network_services/vnf_generic/vnf/test_iniparser.py
+++ b/tests/unit/network_services/vnf_generic/vnf/test_iniparser.py
@@ -28,6 +28,7 @@ stl_patch.start()
if stl_patch:
from yardstick.network_services.vnf_generic.vnf.iniparser import ParseError
+ from yardstick.network_services.vnf_generic.vnf.iniparser import LineParser
from yardstick.network_services.vnf_generic.vnf.iniparser import BaseParser
from yardstick.network_services.vnf_generic.vnf.iniparser import ConfigParser
@@ -38,16 +39,18 @@ key1=value1
list1: value2
value3
value4
-key2="double quote value"
key3='single quote value' ; comment here
key4=
-[section2]
+[section2] ; comment with #2 other symbol
# here is a comment line
list2: value5
-key with no value
+key with no value # mixed comment ; symbols
; another comment line
key5=
+
+[section1] ; reopen a section!
+key2="double quote value"
"""
PARSE_TEXT_2 = """\
@@ -86,6 +89,17 @@ class TestParseError(unittest.TestCase):
self.assertEqual(str(error), "at line 2, a: 'c'")
+class TestLineParser(unittest.TestCase):
+
+ def test___repr__(self):
+ line_parser = LineParser('', 101)
+ self.assertIsNotNone(repr(line_parser))
+
+ def test_error_invalid_assignment(self):
+ line_parser = LineParser('', 101)
+ self.assertIsNotNone(line_parser.error_invalid_assignment())
+
+
class TestBaseParser(unittest.TestCase):
@staticmethod
@@ -96,23 +110,26 @@ class TestBaseParser(unittest.TestCase):
return internal_open
- @mock.patch('yardstick.network_services.vnf_generic.vnf.iniparser.open')
- def test_parse_none(self, mock_open):
- mock_open.side_effect = self.make_open('')
-
+ def test_parse(self):
parser = BaseParser()
+ parser.parse()
- parser.parse([])
+ def test_parse_empty_string(self):
+ parser = BaseParser()
+ self.assertIsNone(parser.parse(''))
def test_not_implemented_methods(self):
parser = BaseParser()
with self.assertRaises(NotImplementedError):
- parser.assignment('key', 'value')
+ parser.assignment('key', 'value', LineParser('', 100))
with self.assertRaises(NotImplementedError):
parser.new_section('section')
+ with self.assertRaises(NotImplementedError):
+ parser.comment('comment')
+
class TestConfigParser(unittest.TestCase):
@@ -128,18 +145,25 @@ class TestConfigParser(unittest.TestCase):
def test_parse(self, mock_open):
mock_open.side_effect = self.make_open(PARSE_TEXT_1)
- config_parser = ConfigParser('my_file', [])
+ existing_data = [['section0', [['key0', 'value0']]]]
+ config_parser = ConfigParser('my_file', existing_data)
config_parser.parse()
expected = [
[
+ 'section0',
+ [
+ ['key0', 'value0'],
+ ],
+ ],
+ [
'section1',
[
['key1', 'value1'],
['list1', 'value2\nvalue3\nvalue4'],
- ['key2', 'double quote value'],
['key3', 'single quote value'],
['key4', ''],
+ ['key2', 'double quote value'],
],
],
[
@@ -153,12 +177,16 @@ class TestConfigParser(unittest.TestCase):
]
self.assertEqual(config_parser.sections, expected)
+ self.assertIsNotNone(config_parser.find_section('section1'))
+ self.assertIsNone(config_parser.find_section('section3'))
+ self.assertEqual(config_parser.find_section_index('section1'), 1)
+ self.assertEqual(config_parser.find_section_index('section3'), -1)
@mock.patch('yardstick.network_services.vnf_generic.vnf.iniparser.open')
def test_parse_2(self, mock_open):
mock_open.side_effect = self.make_open(PARSE_TEXT_2)
- config_parser = ConfigParser('my_file', [])
+ config_parser = ConfigParser('my_file')
config_parser.parse()
expected = [
diff --git a/yardstick/network_services/vnf_generic/vnf/iniparser.py b/yardstick/network_services/vnf_generic/vnf/iniparser.py
index 70e24de5b..98256e08a 100644
--- a/yardstick/network_services/vnf_generic/vnf/iniparser.py
+++ b/yardstick/network_services/vnf_generic/vnf/iniparser.py
@@ -14,163 +14,236 @@
class ParseError(Exception):
- def __init__(self, message, lineno, line):
+
+ def __init__(self, message, line_no, line):
self.msg = message
self.line = line
- self.lineno = lineno
+ self.line_no = line_no
def __str__(self):
- return 'at line %d, %s: %r' % (self.lineno, self.msg, self.line)
+ return 'at line %d, %s: %r' % (self.line_no, self.msg, self.line)
+
+
+class SectionParseError(ParseError):
+
+ pass
+
+
+class LineParser(object):
+
+ PARSE_EXC = ParseError
+
+ @staticmethod
+ def strip_key_value(key, value):
+ key = key.strip()
+ value = value.strip()
+ if value and value[0] == value[-1] and value.startswith(('"', "'")):
+ value = value[1:-1]
+ return key, [value]
+
+ def __init__(self, line, line_no):
+ super(LineParser, self).__init__()
+ self.line = line
+ self.line_no = line_no
+ self.continuation = line != line.lstrip()
+ semi_active, _, semi_comment = line.partition(';')
+ pound_active, _, pound_comment = line.partition('#')
+ if not semi_comment and not pound_comment:
+ self.active = line.strip()
+ self.comment = ''
+ elif len(semi_comment) > len(pound_comment):
+ self.active = semi_active.strip()
+ self.comment = semi_comment.strip()
+ else:
+ self.active = pound_active.strip()
+ self.comment = pound_comment.strip()
+ self._section_name = None
+
+ def __repr__(self):
+ template = "line %d: active '%s' comment '%s'\n%s"
+ return template % (self.line_no, self.active, self.comment, self.line)
+
+ @property
+ def section_name(self):
+ if self._section_name is None:
+ if not self.active.startswith('['):
+ raise self.error_no_section_start_bracket()
+ if not self.active.endswith(']'):
+ raise self.error_no_section_end_bracket()
+ self._section_name = ''
+ if self.active:
+ self._section_name = self.active[1:-1]
+ if not self._section_name:
+ raise self.error_no_section_name()
+ return self._section_name
+
+ def is_active_line(self):
+ return bool(self.active)
+
+ def is_continuation(self):
+ return self.continuation
+
+ def split_key_value(self):
+ for sep in ['=', ':']:
+ words = self.active.split(sep, 1)
+ try:
+ return self.strip_key_value(*words)
+ except TypeError:
+ pass
+
+ return self.active.rstrip(), '@'
+
+ def error_invalid_assignment(self):
+ return self.PARSE_EXC("No ':' or '=' found in assignment", self.line_no, self.line)
+
+ def error_empty_key(self):
+ return self.PARSE_EXC('Key cannot be empty', self.line_no, self.line)
+
+ def error_unexpected_continuation(self):
+ return self.PARSE_EXC('Unexpected continuation line', self.line_no, self.line)
+
+ def error_no_section_start_bracket(self):
+ return SectionParseError('Invalid section (must start with [)', self.line_no, self.line)
+
+ def error_no_section_end_bracket(self):
+ return self.PARSE_EXC('Invalid section (must end with ])', self.line_no, self.line)
+
+ def error_no_section_name(self):
+ return self.PARSE_EXC('Empty section name', self.line_no, self.line)
class BaseParser(object):
- lineno = 0
- parse_exc = ParseError
- def _assignment(self, key, value):
- self.assignment(key, value)
- return None, []
+ def parse(self, data=None):
+ if data is not None:
+ return self._parse(data.splitlines())
- def _get_section(self, line):
- if not line.endswith(']'):
- return self.error_no_section_end_bracket(line)
- if len(line) <= 2:
- return self.error_no_section_name(line)
+ def _next_key_value(self, line_parser, key, value):
+ self.comment(line_parser)
- return line[1:-1]
+ if not line_parser.is_active_line():
+ # Blank line, ends multi-line values
+ if key:
+ key, value = self.assignment(key, value, line_parser)
+ return key, value
- def _split_key_value(self, line):
- colon = line.find(':')
- equal = line.find('=')
- if colon < 0 and equal < 0:
- return line.strip(), '@'
+ if line_parser.is_continuation():
+ # Continuation of previous assignment
+ if key is None:
+ raise line_parser.error_unexpected_continuation()
- if colon < 0 or (equal >= 0 and equal < colon):
- key, value = line[:equal], line[equal + 1:]
+ value.append(line_parser.active.lstrip())
+ return key, value
+
+ if key:
+ # Flush previous assignment, if any
+ key, value = self.assignment(key, value, line_parser)
+
+ try:
+ # Section start
+ self.new_section(line_parser)
+ except SectionParseError:
+ pass
else:
- key, value = line[:colon], line[colon + 1:]
+ return key, value
- value = value.strip()
- if value and value[0] == value[-1] and value.startswith(("\"", "'")):
- value = value[1:-1]
- return key.strip(), [value]
+ key, value = line_parser.split_key_value()
+ if not key:
+ raise line_parser.error_empty_key()
+ return key, value
- def parse(self, lineiter):
+ def _parse(self, line_iter):
key = None
value = []
- for line in lineiter:
- self.lineno += 1
-
- line = line.rstrip()
- lines = line.split(';')
- line = lines[0]
- if not line:
- # Blank line, ends multi-line values
- if key:
- key, value = self._assignment(key, value)
- continue
- elif line.startswith((' ', '\t')):
- # Continuation of previous assignment
- if key is None:
- self.error_unexpected_continuation(line)
- else:
- value.append(line.lstrip())
- continue
-
- if key:
- # Flush previous assignment, if any
- key, value = self._assignment(key, value)
-
- if line.startswith('['):
- # Section start
- section = self._get_section(line)
- if section:
- self.new_section(section)
- elif line.startswith(('#', ';')):
- self.comment(line[1:].lstrip())
- else:
- key, value = self._split_key_value(line)
- if not key:
- return self.error_empty_key(line)
+ parse_iter = (LineParser(line, line_no) for line_no, line in enumerate(line_iter))
+ for line_parser in parse_iter:
+ key, value = self._next_key_value(line_parser, key, value)
if key:
# Flush previous assignment, if any
- self._assignment(key, value)
+ self.assignment(key, value, LineParser('EOF', -1))
- def assignment(self, key, value):
+ def _assignment(self, key, value, line_parser):
"""Called when a full assignment is parsed."""
raise NotImplementedError()
- def new_section(self, section):
+ def assignment(self, key, value, line_parser):
+ self._assignment(key, value, line_parser)
+ return None, []
+
+ def new_section(self, line_parser):
"""Called when a new section is started."""
raise NotImplementedError()
- def comment(self, comment):
+ def comment(self, line_parser):
"""Called when a comment is parsed."""
- pass
-
- def error_invalid_assignment(self, line):
- raise self.parse_exc("No ':' or '=' found in assignment",
- self.lineno, line)
-
- def error_empty_key(self, line):
- raise self.parse_exc('Key cannot be empty', self.lineno, line)
-
- def error_unexpected_continuation(self, line):
- raise self.parse_exc('Unexpected continuation line',
- self.lineno, line)
-
- def error_no_section_end_bracket(self, line):
- raise self.parse_exc('Invalid section (must end with ])',
- self.lineno, line)
-
- def error_no_section_name(self, line):
- raise self.parse_exc('Empty section name', self.lineno, line)
+ raise NotImplementedError()
class ConfigParser(BaseParser):
"""Parses a single config file, populating 'sections' to look like:
- {'DEFAULT': {'key': [value, ...], ...},
- ...}
+ [
+ [
+ 'section1',
+ [
+ ['key1', 'value1\nvalue2'],
+ ['key2', 'value3\nvalue4'],
+ ],
+ ],
+ [
+ 'section2',
+ [
+ ['key3', 'value5\nvalue6'],
+ ],
+ ],
+ ]
"""
- def __init__(self, filename, sections):
+ def __init__(self, filename, sections=None):
super(ConfigParser, self).__init__()
self.filename = filename
- self.sections = sections
+ if sections is not None:
+ self.sections = sections
+ else:
+ self.sections = []
+ self.section_name = None
self.section = None
- def parse(self):
- with open(self.filename) as f:
- return super(ConfigParser, self).parse(f)
+ def parse(self, data=None):
+ if not data:
+ data = self.filename
+ with open(data) as f:
+ return self._parse(f)
- def find_section(self, sections, section):
- return next((i for i, sect in enumerate(sections) if sect == section), -1)
+ def __iter__(self):
+ return iter(self.sections)
- def new_section(self, section):
- self.section = section
- index = self.find_section(self.sections, section)
- if index == -1:
- self.sections.append([section, []])
+ def find_section_index(self, section_name):
+ return next((i for i, (name, value) in enumerate(self) if name == section_name), -1)
- def assignment(self, key, value):
- if not self.section:
- raise self.error_no_section()
+ def find_section(self, section_name):
+ return next((value for name, value in self.sections if name == section_name), None)
- value = '\n'.join(value)
-
- def append(sections, section):
- entry = [key, value]
- index = self.find_section(sections, section)
- sections[index][1].append(entry)
+ def new_section(self, line_parser):
+ section_name = line_parser.section_name
+ index = self.find_section_index(section_name)
+ self.section_name = section_name
+ if index == -1:
+ self.section = [section_name, []]
+ self.sections.append(self.section)
+ else:
+ self.section = self.sections[index]
- append(self.sections, self.section)
+ def _assignment(self, key, value, line_parser):
+ if not self.section_name:
+ raise line_parser.error_no_section_name()
- def parse_exc(self, msg, lineno, line=None):
- return ParseError(msg, lineno, line)
+ value = '\n'.join(value)
+ entry = [key, value]
+ self.section[1].append(entry)
- def error_no_section(self):
- return self.parse_exc('Section must be started before assignment',
- self.lineno)
+ def comment(self, line_parser):
+ """Called when a comment is parsed."""
+ pass