aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/configparser.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/configparser.py')
-rw-r--r--Lib/configparser.py330
1 files changed, 183 insertions, 147 deletions
diff --git a/Lib/configparser.py b/Lib/configparser.py
index 3040e1fbe5b..d0326c60e9b 100644
--- a/Lib/configparser.py
+++ b/Lib/configparser.py
@@ -145,12 +145,15 @@ ConfigParser -- responsible for parsing a list of
from collections.abc import MutableMapping
from collections import ChainMap as _ChainMap
+import contextlib
+from dataclasses import dataclass, field
import functools
import io
import itertools
import os
import re
import sys
+from typing import Iterable
__all__ = ("NoSectionError", "DuplicateOptionError", "DuplicateSectionError",
"NoOptionError", "InterpolationError", "InterpolationDepthError",
@@ -302,15 +305,33 @@ class InterpolationDepthError(InterpolationError):
class ParsingError(Error):
"""Raised when a configuration file does not follow legal syntax."""
- def __init__(self, source):
+ def __init__(self, source, *args):
super().__init__(f'Source contains parsing errors: {source!r}')
self.source = source
self.errors = []
self.args = (source, )
+ if args:
+ self.append(*args)
def append(self, lineno, line):
self.errors.append((lineno, line))
- self.message += '\n\t[line %2d]: %s' % (lineno, line)
+ self.message += '\n\t[line %2d]: %s' % (lineno, repr(line))
+
+ def combine(self, others):
+ for other in others:
+ for error in other.errors:
+ self.append(*error)
+ return self
+
+ @staticmethod
+ def _raise_all(exceptions: Iterable['ParsingError']):
+ """
+ Combine any number of ParsingErrors into one and raise it.
+ """
+ exceptions = iter(exceptions)
+ with contextlib.suppress(StopIteration):
+ raise next(exceptions).combine(exceptions)
+
class MissingSectionHeaderError(ParsingError):
@@ -517,6 +538,55 @@ class ExtendedInterpolation(Interpolation):
"found: %r" % (rest,))
+@dataclass
+class _ReadState:
+ elements_added : set[str] = field(default_factory=set)
+ cursect : dict[str, str] | None = None
+ sectname : str | None = None
+ optname : str | None = None
+ lineno : int = 0
+ indent_level : int = 0
+ errors : list[ParsingError] = field(default_factory=list)
+
+
+@dataclass
+class _Prefixes:
+ full : Iterable[str]
+ inline : Iterable[str]
+
+
+class _Line(str):
+
+ def __new__(cls, val, *args, **kwargs):
+ return super().__new__(cls, val)
+
+ def __init__(self, val, prefixes: _Prefixes):
+ self.prefixes = prefixes
+
+ @functools.cached_property
+ def clean(self):
+ return self._strip_full() and self._strip_inline()
+
+ @property
+ def has_comments(self):
+ return self.strip() != self.clean
+
+ def _strip_inline(self):
+ """
+ Search for the earliest prefix at the beginning of the line or following a space.
+ """
+ matcher = re.compile(
+ '|'.join(fr'(^|\s)({re.escape(prefix)})' for prefix in self.prefixes.inline)
+ # match nothing if no prefixes
+ or '(?!)'
+ )
+ match = matcher.search(self)
+ return self[:match.start() if match else None].strip()
+
+ def _strip_full(self):
+ return '' if any(map(self.strip().startswith, self.prefixes.full)) else True
+
+
class RawConfigParser(MutableMapping):
"""ConfigParser that does not do interpolation."""
@@ -583,8 +653,10 @@ class RawConfigParser(MutableMapping):
else:
self._optcre = re.compile(self._OPT_TMPL.format(delim=d),
re.VERBOSE)
- self._comment_prefixes = tuple(comment_prefixes or ())
- self._inline_comment_prefixes = tuple(inline_comment_prefixes or ())
+ self._prefixes = _Prefixes(
+ full=tuple(comment_prefixes or ()),
+ inline=tuple(inline_comment_prefixes or ()),
+ )
self._strict = strict
self._allow_no_value = allow_no_value
self._empty_lines_in_values = empty_lines_in_values
@@ -975,147 +1047,117 @@ class RawConfigParser(MutableMapping):
in an otherwise empty line or may be entered in lines holding values or
section names. Please note that comments get stripped off when reading configuration files.
"""
- elements_added = set()
- cursect = None # None, or a dictionary
- sectname = None
- optname = None
- lineno = 0
- indent_level = 0
- e = None # None, or an exception
try:
- for lineno, line in enumerate(fp, start=1):
- comment_start = sys.maxsize
- # strip inline comments
- inline_prefixes = {p: -1 for p in self._inline_comment_prefixes}
- while comment_start == sys.maxsize and inline_prefixes:
- next_prefixes = {}
- for prefix, index in inline_prefixes.items():
- index = line.find(prefix, index+1)
- if index == -1:
- continue
- next_prefixes[prefix] = index
- if index == 0 or (index > 0 and line[index-1].isspace()):
- comment_start = min(comment_start, index)
- inline_prefixes = next_prefixes
- # strip full line comments
- for prefix in self._comment_prefixes:
- if line.strip().startswith(prefix):
- comment_start = 0
- break
- if comment_start == sys.maxsize:
- comment_start = None
- value = line[:comment_start].strip()
- if not value:
- if self._empty_lines_in_values:
- # add empty line to the value, but only if there was no
- # comment on the line
- if (comment_start is None and
- cursect is not None and
- optname and
- cursect[optname] is not None):
- cursect[optname].append('') # newlines added at join
- else:
- # empty line marks end of value
- indent_level = sys.maxsize
- continue
- # continuation line?
- first_nonspace = self.NONSPACECRE.search(line)
- cur_indent_level = first_nonspace.start() if first_nonspace else 0
- if (cursect is not None and optname and
- cur_indent_level > indent_level):
- if cursect[optname] is None:
- raise MultilineContinuationError(fpname, lineno, line)
- cursect[optname].append(value)
- # a section header or option header?
- else:
- if self._allow_unnamed_section and cursect is None:
- sectname = UNNAMED_SECTION
- cursect = self._dict()
- self._sections[sectname] = cursect
- self._proxies[sectname] = SectionProxy(self, sectname)
- elements_added.add(sectname)
-
- indent_level = cur_indent_level
- # is it a section header?
- mo = self.SECTCRE.match(value)
- if mo:
- sectname = mo.group('header')
- if sectname in self._sections:
- if self._strict and sectname in elements_added:
- raise DuplicateSectionError(sectname, fpname,
- lineno)
- cursect = self._sections[sectname]
- elements_added.add(sectname)
- elif sectname == self.default_section:
- cursect = self._defaults
- else:
- cursect = self._dict()
- self._sections[sectname] = cursect
- self._proxies[sectname] = SectionProxy(self, sectname)
- elements_added.add(sectname)
- # So sections can't start with a continuation line
- optname = None
- # no section header?
- elif cursect is None:
- raise MissingSectionHeaderError(fpname, lineno, line)
- # an option line?
- else:
- indent_level = cur_indent_level
- # is it a section header?
- mo = self.SECTCRE.match(value)
- if mo:
- sectname = mo.group('header')
- if sectname in self._sections:
- if self._strict and sectname in elements_added:
- raise DuplicateSectionError(sectname, fpname,
- lineno)
- cursect = self._sections[sectname]
- elements_added.add(sectname)
- elif sectname == self.default_section:
- cursect = self._defaults
- else:
- cursect = self._dict()
- self._sections[sectname] = cursect
- self._proxies[sectname] = SectionProxy(self, sectname)
- elements_added.add(sectname)
- # So sections can't start with a continuation line
- optname = None
- # no section header in the file?
- elif cursect is None:
- raise MissingSectionHeaderError(fpname, lineno, line)
- # an option line?
- else:
- mo = self._optcre.match(value)
- if mo:
- optname, vi, optval = mo.group('option', 'vi', 'value')
- if not optname:
- e = self._handle_error(e, fpname, lineno, line)
- optname = self.optionxform(optname.rstrip())
- if (self._strict and
- (sectname, optname) in elements_added):
- raise DuplicateOptionError(sectname, optname,
- fpname, lineno)
- elements_added.add((sectname, optname))
- # This check is fine because the OPTCRE cannot
- # match if it would set optval to None
- if optval is not None:
- optval = optval.strip()
- cursect[optname] = [optval]
- else:
- # valueless option handling
- cursect[optname] = None
- else:
- # a non-fatal parsing error occurred. set up the
- # exception but keep going. the exception will be
- # raised at the end of the file and will contain a
- # list of all bogus lines
- e = self._handle_error(e, fpname, lineno, line)
+ ParsingError._raise_all(self._read_inner(fp, fpname))
finally:
self._join_multiline_values()
- # if any parsing errors occurred, raise an exception
- if e:
- raise e
+
+ def _read_inner(self, fp, fpname):
+ st = _ReadState()
+
+ Line = functools.partial(_Line, prefixes=self._prefixes)
+ for st.lineno, line in enumerate(map(Line, fp), start=1):
+ if not line.clean:
+ if self._empty_lines_in_values:
+ # add empty line to the value, but only if there was no
+ # comment on the line
+ if (not line.has_comments and
+ st.cursect is not None and
+ st.optname and
+ st.cursect[st.optname] is not None):
+ st.cursect[st.optname].append('') # newlines added at join
+ else:
+ # empty line marks end of value
+ st.indent_level = sys.maxsize
+ continue
+
+ first_nonspace = self.NONSPACECRE.search(line)
+ st.cur_indent_level = first_nonspace.start() if first_nonspace else 0
+
+ if self._handle_continuation_line(st, line, fpname):
+ continue
+
+ self._handle_rest(st, line, fpname)
+
+ return st.errors
+
+ def _handle_continuation_line(self, st, line, fpname):
+ # continuation line?
+ is_continue = (st.cursect is not None and st.optname and
+ st.cur_indent_level > st.indent_level)
+ if is_continue:
+ if st.cursect[st.optname] is None:
+ raise MultilineContinuationError(fpname, st.lineno, line)
+ st.cursect[st.optname].append(line.clean)
+ return is_continue
+
+ def _handle_rest(self, st, line, fpname):
+ # a section header or option header?
+ if self._allow_unnamed_section and st.cursect is None:
+ st.sectname = UNNAMED_SECTION
+ st.cursect = self._dict()
+ self._sections[st.sectname] = st.cursect
+ self._proxies[st.sectname] = SectionProxy(self, st.sectname)
+ st.elements_added.add(st.sectname)
+
+ st.indent_level = st.cur_indent_level
+ # is it a section header?
+ mo = self.SECTCRE.match(line.clean)
+
+ if not mo and st.cursect is None:
+ raise MissingSectionHeaderError(fpname, st.lineno, line)
+
+ self._handle_header(st, mo, fpname) if mo else self._handle_option(st, line, fpname)
+
+ def _handle_header(self, st, mo, fpname):
+ st.sectname = mo.group('header')
+ if st.sectname in self._sections:
+ if self._strict and st.sectname in st.elements_added:
+ raise DuplicateSectionError(st.sectname, fpname,
+ st.lineno)
+ st.cursect = self._sections[st.sectname]
+ st.elements_added.add(st.sectname)
+ elif st.sectname == self.default_section:
+ st.cursect = self._defaults
+ else:
+ st.cursect = self._dict()
+ self._sections[st.sectname] = st.cursect
+ self._proxies[st.sectname] = SectionProxy(self, st.sectname)
+ st.elements_added.add(st.sectname)
+ # So sections can't start with a continuation line
+ st.optname = None
+
+ def _handle_option(self, st, line, fpname):
+ # an option line?
+ st.indent_level = st.cur_indent_level
+
+ mo = self._optcre.match(line.clean)
+ if not mo:
+ # a non-fatal parsing error occurred. set up the
+ # exception but keep going. the exception will be
+ # raised at the end of the file and will contain a
+ # list of all bogus lines
+ st.errors.append(ParsingError(fpname, st.lineno, line))
+ return
+
+ st.optname, vi, optval = mo.group('option', 'vi', 'value')
+ if not st.optname:
+ st.errors.append(ParsingError(fpname, st.lineno, line))
+ st.optname = self.optionxform(st.optname.rstrip())
+ if (self._strict and
+ (st.sectname, st.optname) in st.elements_added):
+ raise DuplicateOptionError(st.sectname, st.optname,
+ fpname, st.lineno)
+ st.elements_added.add((st.sectname, st.optname))
+ # This check is fine because the OPTCRE cannot
+ # match if it would set optval to None
+ if optval is not None:
+ optval = optval.strip()
+ st.cursect[st.optname] = [optval]
+ else:
+ # valueless option handling
+ st.cursect[st.optname] = None
def _join_multiline_values(self):
defaults = self.default_section, self._defaults
@@ -1135,12 +1177,6 @@ class RawConfigParser(MutableMapping):
for key, value in defaults.items():
self._defaults[self.optionxform(key)] = value
- def _handle_error(self, exc, fpname, lineno, line):
- if not exc:
- exc = ParsingError(fpname)
- exc.append(lineno, repr(line))
- return exc
-
def _unify_values(self, section, vars):
"""Create a sequence of lookups with 'vars' taking priority over
the 'section' which takes priority over the DEFAULTSECT.