Diffstat (limited to 'Lib/email/header.py')
-rw-r--r--  Lib/email/header.py  662
1 files changed, 347 insertions, 315 deletions
diff --git a/Lib/email/header.py b/Lib/email/header.py
index 2cf870fd575..e33324ad38e 100644
--- a/Lib/email/header.py
+++ b/Lib/email/header.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2002-2006 Python Software Foundation
+# Copyright (C) 2002-2007 Python Software Foundation
# Author: Ben Gertzfield, Barry Warsaw
# Contact: email-sig@python.org
@@ -17,15 +17,16 @@ import email.quoprimime
import email.base64mime
from email.errors import HeaderParseError
-from email.charset import Charset
+from email import charset as _charset
+Charset = _charset.Charset
NL = '\n'
SPACE = ' '
-USPACE = u' '
+BSPACE = b' '
SPACE8 = ' ' * 8
-UEMPTYSTRING = u''
-
-MAXLINELEN = 76
+EMPTYSTRING = ''
+MAXLINELEN = 78
+FWS = ' \t'
USASCII = Charset('us-ascii')
UTF8 = Charset('utf-8')
@@ -61,60 +62,82 @@ _max_append = email.quoprimime._max_append
def decode_header(header):
"""Decode a message header value without converting charset.
- Returns a list of (decoded_string, charset) pairs containing each of the
- decoded parts of the header. Charset is None for non-encoded parts of the
- header, otherwise a lower-case string containing the name of the character
- set specified in the encoded string.
+ Returns a list of (string, charset) pairs containing each of the decoded
+ parts of the header. Charset is None for non-encoded parts of the header,
+ otherwise a lower-case string containing the name of the character set
+ specified in the encoded string.
+
+ header may be a string that may or may not contain RFC2047 encoded words,
+ or it may be a Header object.
An email.errors.HeaderParseError may be raised when certain decoding error
occurs (e.g. a base64 decoding exception).
"""
- # If no encoding, just return the header
- header = str(header)
+ # If it is a Header object, we can just return the encoded chunks.
+ if hasattr(header, '_chunks'):
+ return [(_charset._encode(string, str(charset)), str(charset))
+ for string, charset in header._chunks]
+ # If no encoding, just return the header with no charset.
if not ecre.search(header):
return [(header, None)]
- decoded = []
- dec = ''
+ # First step is to parse all the encoded parts into triplets of the form
+ # (encoded_string, encoding, charset). For unencoded strings, the last
+ # two parts will be None.
+ words = []
for line in header.splitlines():
- # This line might not have an encoding in it
- if not ecre.search(line):
- decoded.append((line, None))
- continue
parts = ecre.split(line)
while parts:
- unenc = parts.pop(0).strip()
- if unenc:
- # Should we continue a long line?
- if decoded and decoded[-1][1] is None:
- decoded[-1] = (decoded[-1][0] + SPACE + unenc, None)
- else:
- decoded.append((unenc, None))
+ unencoded = parts.pop(0).strip()
+ if unencoded:
+ words.append((unencoded, None, None))
if parts:
- charset, encoding = [s.lower() for s in parts[0:2]]
- encoded = parts[2]
- dec = None
- if encoding == 'q':
- dec = email.quoprimime.header_decode(encoded)
- elif encoding == 'b':
- paderr = len(encoded) % 4 # Postel's law: add missing padding
- if paderr:
- encoded += '==='[:4 - paderr]
- try:
- dec = email.base64mime.decode(encoded)
- except binascii.Error:
- # Turn this into a higher level exception. BAW: Right
- # now we throw the lower level exception away but
- # when/if we get exception chaining, we'll preserve it.
- raise HeaderParseError
- if dec is None:
- dec = encoded
-
- if decoded and decoded[-1][1] == charset:
- decoded[-1] = (decoded[-1][0] + dec, decoded[-1][1])
- else:
- decoded.append((dec, charset))
- del parts[0:3]
- return decoded
+ charset = parts.pop(0).lower()
+ encoding = parts.pop(0).lower()
+ encoded = parts.pop(0)
+ words.append((encoded, encoding, charset))
+ # The next step is to decode each encoded word by applying the reverse
+ # base64 or quopri transformation. decoded_words is now a list of the
+ # form (decoded_word, charset).
+ decoded_words = []
+ for encoded_string, encoding, charset in words:
+ if encoding is None:
+ # This is an unencoded word.
+ decoded_words.append((encoded_string, charset))
+ elif encoding == 'q':
+ word = email.quoprimime.header_decode(encoded_string)
+ decoded_words.append((word, charset))
+ elif encoding == 'b':
+ paderr = len(encoded_string) % 4 # Postel's law: add missing padding
+ if paderr:
+ encoded_string += '==='[:4 - paderr]
+ try:
+ word = email.base64mime.decode(encoded_string)
+ except binascii.Error:
+ raise HeaderParseError('Base64 decoding error')
+ else:
+ decoded_words.append((word, charset))
+ else:
+ raise AssertionError('Unexpected encoding: ' + encoding)
+ # Now convert all words to bytes and collapse consecutive runs of
+ # similarly encoded words.
+ collapsed = []
+ last_word = last_charset = None
+ for word, charset in decoded_words:
+ if isinstance(word, str):
+ word = bytes(word, 'raw-unicode-escape')
+ if last_word is None:
+ last_word = word
+ last_charset = charset
+ elif charset != last_charset:
+ collapsed.append((last_word, last_charset))
+ last_word = word
+ last_charset = charset
+ elif last_charset is None:
+ last_word += BSPACE + word
+ else:
+ last_word += word
+ collapsed.append((last_word, last_charset))
+ return collapsed
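
For reference, a minimal sketch of the new return convention (illustrative, not part of the patch): once any RFC 2047 encoded word is present, every part of the result is a bytes object and consecutive parts in the same charset are collapsed, while a header with no encoded words comes back as a single str.

    from email.header import decode_header

    # One encoded word plus a plain ASCII word: everything becomes bytes.
    decode_header('=?iso-8859-1?q?Caf=E9?= menu')
    # -> [(b'Caf\xe9', 'iso-8859-1'), (b'menu', None)]

    # No encoded words at all: the header is returned unchanged as a str.
    decode_header('plain ascii subject')
    # -> [('plain ascii subject', None)]
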
@@ -159,10 +182,11 @@ class Header:
charset is used both as s's initial charset and as the default for
subsequent .append() calls.
- The maximum line length can be specified explicit via maxlinelen. For
+ The maximum line length can be specified explicitly via maxlinelen. For
splitting the first line to a shorter value (to account for the field
header which isn't included in s, e.g. `Subject') pass in the name of
- the field in header_name. The default maxlinelen is 76.
+ the field in header_name. The default maxlinelen is 78 as recommended
+ by RFC 2822.
continuation_ws must be RFC 2822 compliant folding whitespace (usually
either a space or a hard tab) which will be prepended to continuation
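
An illustrative constructor call (not part of the patch), showing the two knobs the rewritten docstring describes: header_name reserves room for the field name on the first folded line, and maxlinelen now defaults to 78.

    from email.header import Header

    # 'Subject: ' (header name plus colon and space) is subtracted from the
    # first line's budget; continuation lines get the full maxlinelen.
    h = Header('A subject long enough to be folded across several lines',
               charset='utf-8', header_name='Subject')
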
@@ -172,60 +196,54 @@ class Header:
"""
if charset is None:
charset = USASCII
- if not isinstance(charset, Charset):
+ elif not isinstance(charset, Charset):
charset = Charset(charset)
self._charset = charset
self._continuation_ws = continuation_ws
- cws_expanded_len = len(continuation_ws.replace('\t', SPACE8))
- # BAW: I believe `chunks' and `maxlinelen' should be non-public.
self._chunks = []
if s is not None:
self.append(s, charset, errors)
if maxlinelen is None:
maxlinelen = MAXLINELEN
+ self._maxlinelen = maxlinelen
if header_name is None:
- # We don't know anything about the field header so the first line
- # is the same length as subsequent lines.
- self._firstlinelen = maxlinelen
+ self._headerlen = 0
else:
- # The first line should be shorter to take into account the field
- # header. Also subtract off 2 extra for the colon and space.
- self._firstlinelen = maxlinelen - len(header_name) - 2
- # Second and subsequent lines should subtract off the length in
- # columns of the continuation whitespace prefix.
- self._maxlinelen = maxlinelen - cws_expanded_len
+ # Take the separating colon and space into account.
+ self._headerlen = len(header_name) + 2
def __str__(self):
- """A synonym for self.encode()."""
- return self.encode()
-
- def __unicode__(self):
- """Helper for the built-in unicode function."""
+ """Return the string value of the header."""
+ self._normalize()
uchunks = []
lastcs = None
- for s, charset in self._chunks:
+ for string, charset in self._chunks:
# We must preserve spaces between encoded and non-encoded word
# boundaries, which means for us we need to add a space when we go
# from a charset to None/us-ascii, or from None/us-ascii to a
# charset. Only do this for the second and subsequent chunks.
nextcs = charset
+ if nextcs == _charset.UNKNOWN8BIT:
+ original_bytes = string.encode('ascii', 'surrogateescape')
+ string = original_bytes.decode('ascii', 'replace')
if uchunks:
if lastcs not in (None, 'us-ascii'):
if nextcs in (None, 'us-ascii'):
- uchunks.append(USPACE)
+ uchunks.append(SPACE)
nextcs = None
elif nextcs not in (None, 'us-ascii'):
- uchunks.append(USPACE)
+ uchunks.append(SPACE)
lastcs = nextcs
- uchunks.append(unicode(s, str(charset)))
- return UEMPTYSTRING.join(uchunks)
+ uchunks.append(string)
+ return EMPTYSTRING.join(uchunks)
# Rich comparison operators for equality only. BAW: does it make sense to
# have or explicitly disable <, <=, >, >= operators?
def __eq__(self, other):
# other may be a Header or a string. Both are fine so coerce
- # ourselves to a string, swap the args and do another comparison.
- return other == self.encode()
+ # ourselves to a unicode (of the unencoded header value), swap the
+ # args and do another comparison.
+ return other == str(self)
def __ne__(self, other):
return not self == other
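
A short sketch of the new string semantics (illustrative, not part of the patch): str() now returns the unencoded text of the header, and equality coerces through str(self) rather than through the RFC 2047 encoded form.

    from email.header import Header

    h = Header('Hello', charset='us-ascii')
    h.append('Wörld', charset='iso-8859-1')
    str(h)              # 'Hello Wörld' -- the decoded value, not '=?iso-8859-1?...'
    h == 'Hello Wörld'  # True: __eq__ compares against str(self)
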
@@ -239,140 +257,42 @@ class Header:
constructor is used.
s may be a byte string or a Unicode string. If it is a byte string
- (i.e. isinstance(s, str) is true), then charset is the encoding of
+ (i.e. isinstance(s, str) is false), then charset is the encoding of
that byte string, and a UnicodeError will be raised if the string
cannot be decoded with that charset. If s is a Unicode string, then
charset is a hint specifying the character set of the characters in
- the string. In this case, when producing an RFC 2822 compliant header
- using RFC 2047 rules, the Unicode string will be encoded using the
- following charsets in order: us-ascii, the charset hint, utf-8. The
- first character set not to provoke a UnicodeError is used.
+ the string. In either case, when producing an RFC 2822 compliant
+ header using RFC 2047 rules, the string will be encoded using the
+ output codec of the charset. If the string cannot be encoded to the
+ output codec, a UnicodeError will be raised.
- Optional `errors' is passed as the third argument to any unicode() or
- ustr.encode() call.
+ Optional `errors' is passed as the errors argument to the decode
+ call if s is a byte string.
"""
if charset is None:
charset = self._charset
elif not isinstance(charset, Charset):
charset = Charset(charset)
- # If the charset is our faux 8bit charset, leave the string unchanged
- if charset != '8bit':
- # We need to test that the string can be converted to unicode and
- # back to a byte string, given the input and output codecs of the
- # charset.
- if isinstance(s, str):
- # Possibly raise UnicodeError if the byte string can't be
- # converted to a unicode with the input codec of the charset.
- incodec = charset.input_codec or 'us-ascii'
- ustr = unicode(s, incodec, errors)
- # Now make sure that the unicode could be converted back to a
- # byte string with the output codec, which may be different
- # than the iput coded. Still, use the original byte string.
- outcodec = charset.output_codec or 'us-ascii'
- ustr.encode(outcodec, errors)
- elif isinstance(s, unicode):
- # Now we have to be sure the unicode string can be converted
- # to a byte string with a reasonable output codec. We want to
- # use the byte string in the chunk.
- for charset in USASCII, charset, UTF8:
- try:
- outcodec = charset.output_codec or 'us-ascii'
- s = s.encode(outcodec, errors)
- break
- except UnicodeError:
- pass
- else:
- assert False, 'utf-8 conversion failed'
- self._chunks.append((s, charset))
-
- def _split(self, s, charset, maxlinelen, splitchars):
- # Split up a header safely for use with encode_chunks.
- splittable = charset.to_splittable(s)
- encoded = charset.from_splittable(splittable, True)
- elen = charset.encoded_header_len(encoded)
- # If the line's encoded length first, just return it
- if elen <= maxlinelen:
- return [(encoded, charset)]
- # If we have undetermined raw 8bit characters sitting in a byte
- # string, we really don't know what the right thing to do is. We
- # can't really split it because it might be multibyte data which we
- # could break if we split it between pairs. The least harm seems to
- # be to not split the header at all, but that means they could go out
- # longer than maxlinelen.
- if charset == '8bit':
- return [(s, charset)]
- # BAW: I'm not sure what the right test here is. What we're trying to
- # do is be faithful to RFC 2822's recommendation that ($2.2.3):
- #
- # "Note: Though structured field bodies are defined in such a way that
- # folding can take place between many of the lexical tokens (and even
- # within some of the lexical tokens), folding SHOULD be limited to
- # placing the CRLF at higher-level syntactic breaks."
- #
- # For now, I can only imagine doing this when the charset is us-ascii,
- # although it's possible that other charsets may also benefit from the
- # higher-level syntactic breaks.
- elif charset == 'us-ascii':
- return self._split_ascii(s, charset, maxlinelen, splitchars)
- # BAW: should we use encoded?
- elif elen == len(s):
- # We can split on _maxlinelen boundaries because we know that the
- # encoding won't change the size of the string
- splitpnt = maxlinelen
- first = charset.from_splittable(splittable[:splitpnt], False)
- last = charset.from_splittable(splittable[splitpnt:], False)
- else:
- # Binary search for split point
- first, last = _binsplit(splittable, charset, maxlinelen)
- # first is of the proper length so just wrap it in the appropriate
- # chrome. last must be recursively split.
- fsplittable = charset.to_splittable(first)
- fencoded = charset.from_splittable(fsplittable, True)
- chunk = [(fencoded, charset)]
- return chunk + self._split(last, charset, self._maxlinelen, splitchars)
-
- def _split_ascii(self, s, charset, firstlen, splitchars):
- chunks = _split_ascii(s, firstlen, self._maxlinelen,
- self._continuation_ws, splitchars)
- return zip(chunks, [charset]*len(chunks))
-
- def _encode_chunks(self, newchunks, maxlinelen):
- # MIME-encode a header with many different charsets and/or encodings.
- #
- # Given a list of pairs (string, charset), return a MIME-encoded
- # string suitable for use in a header field. Each pair may have
- # different charsets and/or encodings, and the resulting header will
- # accurately reflect each setting.
- #
- # Each encoding can be email.utils.QP (quoted-printable, for
- # ASCII-like character sets like iso-8859-1), email.utils.BASE64
- # (Base64, for non-ASCII like character sets like KOI8-R and
- # iso-2022-jp), or None (no encoding).
- #
- # Each pair will be represented on a separate line; the resulting
- # string will be in the format:
- #
- # =?charset1?q?Mar=EDa_Gonz=E1lez_Alonso?=\n
- # =?charset2?b?SvxyZ2VuIEL2aW5n?="
- chunks = []
- for header, charset in newchunks:
- if not header:
- continue
- if charset is None or charset.header_encoding is None:
- s = header
- else:
- s = charset.header_encode(header)
- # Don't add more folding whitespace than necessary
- if chunks and chunks[-1].endswith(' '):
- extra = ''
+ if not isinstance(s, str):
+ input_charset = charset.input_codec or 'us-ascii'
+ if input_charset == _charset.UNKNOWN8BIT:
+ s = s.decode('us-ascii', 'surrogateescape')
else:
- extra = ' '
- _max_append(chunks, s, maxlinelen, extra)
- joiner = NL + self._continuation_ws
- return joiner.join(chunks)
+ s = s.decode(input_charset, errors)
+ # Ensure that the bytes we're storing can be decoded to the output
+ # character set, otherwise an early error is raised.
+ output_charset = charset.output_codec or 'us-ascii'
+ if output_charset != _charset.UNKNOWN8BIT:
+ try:
+ s.encode(output_charset, errors)
+ except UnicodeEncodeError:
+ if output_charset!='us-ascii':
+ raise
+ charset = UTF8
+ self._chunks.append((s, charset))
- def encode(self, splitchars=';, '):
- """Encode a message header into an RFC-compliant format.
+ def encode(self, splitchars=';, \t', maxlinelen=None, linesep='\n'):
+ r"""Encode a message header into an RFC-compliant format.
There are many issues involved in converting a given string for use in
an email header. Only certain character sets are readable in most
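
To illustrate the reworked append() (a sketch, not part of the patch): bytes input is decoded with the charset's input codec before being stored, str input is only checked against the output codec, and conversion to RFC 2047 form is deferred until encode() is called.

    from email.header import Header

    h = Header()
    h.append(b'Caf\xe9 au lait', charset='iso-8859-1')  # decoded via the input codec
    h.append('and tea', charset='us-ascii')             # checked against the output codec
    h.encode()  # '=?iso-8859-1?q?Caf=E9_au_lait?= and tea'
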
@@ -382,133 +302,245 @@ class Header:
75-character length limit on any given encoded header field, so
line-wrapping must be performed, even with double-byte character sets.
- This method will do its best to convert the string to the correct
- character set used in email, and encode and line wrap it safely with
- the appropriate scheme for that character set.
-
- If the given charset is not known or an error occurs during
- conversion, this function will return the header untouched.
-
- Optional splitchars is a string containing characters to split long
- ASCII lines on, in rough support of RFC 2822's `highest level
- syntactic breaks'. This doesn't affect RFC 2047 encoded lines.
+ Optional maxlinelen specifies the maximum length of each generated
+ line, exclusive of the linesep string. Individual lines may be longer
+ than maxlinelen if a folding point cannot be found. The first line
+ will be shorter by the length of the header name plus ": " if a header
+ name was specified at Header construction time. The default value for
+ maxlinelen is determined at header construction time.
+
+ Optional splitchars is a string containing characters which should be
+ given extra weight by the splitting algorithm during normal header
+ wrapping. This is in very rough support of RFC 2822's `higher level
+ syntactic breaks': split points preceded by a splitchar are preferred
+ during line splitting, with the characters preferred in the order in
+ which they appear in the string. Space and tab may be included in the
+ string to indicate whether preference should be given to one over the
+ other as a split point when other split chars do not appear in the line
+ being split. Splitchars does not affect RFC 2047 encoded lines.
+
+ Optional linesep is a string to be used to separate the lines of
+ the value. The default value is the most useful for typical
+ Python applications, but it can be set to \r\n to produce RFC-compliant
+ line separators when needed.
"""
- newchunks = []
- maxlinelen = self._firstlinelen
- lastlen = 0
- for s, charset in self._chunks:
- # The first bit of the next chunk should be just long enough to
- # fill the next line. Don't forget the space separating the
- # encoded words.
- targetlen = maxlinelen - lastlen - 1
- if targetlen < charset.encoded_header_len(''):
- # Stick it on the next line
- targetlen = maxlinelen
- newchunks += self._split(s, charset, targetlen, splitchars)
- lastchunk, lastcharset = newchunks[-1]
- lastlen = lastcharset.encoded_header_len(lastchunk)
- value = self._encode_chunks(newchunks, maxlinelen)
+ self._normalize()
+ if maxlinelen is None:
+ maxlinelen = self._maxlinelen
+ # A maxlinelen of 0 means don't wrap. For all practical purposes,
+ # choosing a huge number here accomplishes that and makes the
+ # _ValueFormatter algorithm much simpler.
+ if maxlinelen == 0:
+ maxlinelen = 1000000
+ formatter = _ValueFormatter(self._headerlen, maxlinelen,
+ self._continuation_ws, splitchars)
+ for string, charset in self._chunks:
+ lines = string.splitlines()
+ if lines:
+ formatter.feed('', lines[0], charset)
+ else:
+ formatter.feed('', '', charset)
+ for line in lines[1:]:
+ formatter.newline()
+ if charset.header_encoding is not None:
+ formatter.feed(self._continuation_ws, ' ' + line.lstrip(),
+ charset)
+ else:
+ sline = line.lstrip()
+ fws = line[:len(line)-len(sline)]
+ formatter.feed(fws, sline, charset)
+ if len(lines) > 1:
+ formatter.newline()
+ formatter.add_transition()
+ value = formatter._str(linesep)
if _embeded_header.search(value):
raise HeaderParseError("header value appears to contain "
"an embedded header: {!r}".format(value))
return value
+ def _normalize(self):
+ # Step 1: Normalize the chunks so that all runs of identical charsets
+ # get collapsed into a single unicode string.
+ chunks = []
+ last_charset = None
+ last_chunk = []
+ for string, charset in self._chunks:
+ if charset == last_charset:
+ last_chunk.append(string)
+ else:
+ if last_charset is not None:
+ chunks.append((SPACE.join(last_chunk), last_charset))
+ last_chunk = [string]
+ last_charset = charset
+ if last_chunk:
+ chunks.append((SPACE.join(last_chunk), last_charset))
+ self._chunks = chunks
+
-def _split_ascii(s, firstlen, restlen, continuation_ws, splitchars):
- lines = []
- maxlen = firstlen
- for line in s.splitlines():
- # Ignore any leading whitespace (i.e. continuation whitespace) already
- # on the line, since we'll be adding our own.
- line = line.lstrip()
- if len(line) < maxlen:
- lines.append(line)
- maxlen = restlen
- continue
- # Attempt to split the line at the highest-level syntactic break
- # possible. Note that we don't have a lot of smarts about field
+class _ValueFormatter:
+ def __init__(self, headerlen, maxlen, continuation_ws, splitchars):
+ self._maxlen = maxlen
+ self._continuation_ws = continuation_ws
+ self._continuation_ws_len = len(continuation_ws)
+ self._splitchars = splitchars
+ self._lines = []
+ self._current_line = _Accumulator(headerlen)
+
+ def _str(self, linesep):
+ self.newline()
+ return linesep.join(self._lines)
+
+ def __str__(self):
+ return self._str(NL)
+
+ def newline(self):
+ end_of_line = self._current_line.pop()
+ if end_of_line != (' ', ''):
+ self._current_line.push(*end_of_line)
+ if len(self._current_line) > 0:
+ if self._current_line.is_onlyws():
+ self._lines[-1] += str(self._current_line)
+ else:
+ self._lines.append(str(self._current_line))
+ self._current_line.reset()
+
+ def add_transition(self):
+ self._current_line.push(' ', '')
+
+ def feed(self, fws, string, charset):
+ # If the charset has no header encoding (i.e. it is an ASCII encoding)
+ # then we must split the header at the "highest level syntactic break"
+ # possible. Note that we don't have a lot of smarts about field
# syntax; we just try to break on semi-colons, then commas, then
- # whitespace.
- for ch in splitchars:
- if ch in line:
- break
- else:
- # There's nothing useful to split the line on, not even spaces, so
- # just append this line unchanged
- lines.append(line)
- maxlen = restlen
- continue
- # Now split the line on the character plus trailing whitespace
- cre = re.compile(r'%s\s*' % ch)
- if ch in ';,':
- eol = ch
+ # whitespace. Eventually, this should be pluggable.
+ if charset.header_encoding is None:
+ self._ascii_split(fws, string, self._splitchars)
+ return
+ # Otherwise, we're doing either a Base64 or a quoted-printable
+ # encoding which means we don't need to split the line on syntactic
+ # breaks. We can basically just find enough characters to fit on the
+ # current line, minus the RFC 2047 chrome. What makes this trickier
+ # though is that we have to split at octet boundaries, not character
+ # boundaries but it's only safe to split at character boundaries so at
+ # best we can only get close.
+ encoded_lines = charset.header_encode_lines(string, self._maxlengths())
+ # The first element extends the current line, but if it's None then
+ # nothing more fit on the current line so start a new line.
+ try:
+ first_line = encoded_lines.pop(0)
+ except IndexError:
+ # There are no encoded lines, so we're done.
+ return
+ if first_line is not None:
+ self._append_chunk(fws, first_line)
+ try:
+ last_line = encoded_lines.pop()
+ except IndexError:
+ # There was only one line.
+ return
+ self.newline()
+ self._current_line.push(self._continuation_ws, last_line)
+ # Everything else are full lines in themselves.
+ for line in encoded_lines:
+ self._lines.append(self._continuation_ws + line)
+
+ def _maxlengths(self):
+ # The first line's length.
+ yield self._maxlen - len(self._current_line)
+ while True:
+ yield self._maxlen - self._continuation_ws_len
+
+ def _ascii_split(self, fws, string, splitchars):
+ # The RFC 2822 header folding algorithm is simple in principle but
+ # complex in practice. Lines may be folded any place where "folding
+ # white space" appears by inserting a linesep character in front of the
+ # FWS. The complication is that not all spaces or tabs qualify as FWS,
+ # and we are also supposed to prefer to break at "higher level
+ # syntactic breaks". We can't do either of these without intimate
+ # knowledge of the structure of structured headers, which we don't have
+ # here. So the best we can do here is prefer to break at the specified
+ # splitchars, and hope that we don't choose any spaces or tabs that
+ # aren't legal FWS. (This is at least better than the old algorithm,
+ # where we would sometimes *introduce* FWS after a splitchar, or the
+ # algorithm before that, where we would turn all white space runs into
+ # single spaces or tabs.)
+ parts = re.split("(["+FWS+"]+)", fws+string)
+ if parts[0]:
+ parts[:0] = ['']
else:
- eol = ''
- joiner = eol + ' '
- joinlen = len(joiner)
- wslen = len(continuation_ws.replace('\t', SPACE8))
- this = []
- linelen = 0
- for part in cre.split(line):
- curlen = linelen + max(0, len(this)-1) * joinlen
- partlen = len(part)
- onfirstline = not lines
- # We don't want to split after the field name, if we're on the
- # first line and the field name is present in the header string.
- if ch == ' ' and onfirstline and \
- len(this) == 1 and fcre.match(this[0]):
- this.append(part)
- linelen += partlen
- elif curlen + partlen > maxlen:
- if this:
- lines.append(joiner.join(this) + eol)
- # If this part is longer than maxlen and we aren't already
- # splitting on whitespace, try to recursively split this line
- # on whitespace.
- if partlen > maxlen and ch != ' ':
- subl = _split_ascii(part, maxlen, restlen,
- continuation_ws, ' ')
- lines.extend(subl[:-1])
- this = [subl[-1]]
+ parts.pop(0)
+ for fws, part in zip(*[iter(parts)]*2):
+ self._append_chunk(fws, part)
+
+ def _append_chunk(self, fws, string):
+ self._current_line.push(fws, string)
+ if len(self._current_line) > self._maxlen:
+ # Find the best split point, working backward from the end.
+ # There might be none, on a long first line.
+ for ch in self._splitchars:
+ for i in range(self._current_line.part_count()-1, 0, -1):
+ if ch.isspace():
+ fws = self._current_line[i][0]
+ if fws and fws[0]==ch:
+ break
+ prevpart = self._current_line[i-1][1]
+ if prevpart and prevpart[-1]==ch:
+ break
else:
- this = [part]
- linelen = wslen + len(this[-1])
- maxlen = restlen
+ continue
+ break
else:
- this.append(part)
- linelen += partlen
- # Put any left over parts on a line by themselves
- if this:
- lines.append(joiner.join(this))
- return lines
+ fws, part = self._current_line.pop()
+ if self._current_line._initial_size > 0:
+ # There will be a header, so leave it on a line by itself.
+ self.newline()
+ if not fws:
+ # We don't use continuation_ws here because the whitespace
+ # after a header should always be a space.
+ fws = ' '
+ self._current_line.push(fws, part)
+ return
+ remainder = self._current_line.pop_from(i)
+ self._lines.append(str(self._current_line))
+ self._current_line.reset(remainder)
+
+
+class _Accumulator(list):
+
+ def __init__(self, initial_size=0):
+ self._initial_size = initial_size
+ super().__init__()
+
+ def push(self, fws, string):
+ self.append((fws, string))
+
+ def pop_from(self, i=0):
+ popped = self[i:]
+ self[i:] = []
+ return popped
+
+ def pop(self):
+ if self.part_count()==0:
+ return ('', '')
+ return super().pop()
+
+ def __len__(self):
+ return sum((len(fws)+len(part) for fws, part in self),
+ self._initial_size)
+ def __str__(self):
+ return EMPTYSTRING.join((EMPTYSTRING.join((fws, part))
+ for fws, part in self))
-
-def _binsplit(splittable, charset, maxlinelen):
- i = 0
- j = len(splittable)
- while i < j:
- # Invariants:
- # 1. splittable[:k] fits for all k <= i (note that we *assume*,
- # at the start, that splittable[:0] fits).
- # 2. splittable[:k] does not fit for any k > j (at the start,
- # this means we shouldn't look at any k > len(splittable)).
- # 3. We don't know about splittable[:k] for k in i+1..j.
- # 4. We want to set i to the largest k that fits, with i <= k <= j.
- #
- m = (i+j+1) >> 1 # ceiling((i+j)/2); i < m <= j
- chunk = charset.from_splittable(splittable[:m], True)
- chunklen = charset.encoded_header_len(chunk)
- if chunklen <= maxlinelen:
- # m is acceptable, so is a new lower bound.
- i = m
- else:
- # m is not acceptable, so final i must be < m.
- j = m - 1
- # i == j. Invariant #1 implies that splittable[:i] fits, and
- # invariant #2 implies that splittable[:i+1] does not fit, so i
- # is what we're looking for.
- first = charset.from_splittable(splittable[:i], False)
- last = charset.from_splittable(splittable[i:], False)
- return first, last
+ def reset(self, startval=None):
+ if startval is None:
+ startval = []
+ self[:] = startval
+ self._initial_size = 0
+
+ def is_onlyws(self):
+ return self._initial_size==0 and (not self or str(self).isspace())
+
+ def part_count(self):
+ return super().__len__()
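
Finally, an illustrative folding sketch (not part of the patch) tying encode()'s new parameters to the _ValueFormatter/_Accumulator machinery above: maxlinelen bounds each generated line, linesep joins them, and the splitchars preference means ';' and ',' are tried as fold points before plain whitespace.

    from email.header import Header

    subject = ('Re: the quarterly report, the budget review, '
               'and the offsite planning session')
    h = Header(subject, header_name='Subject')
    # Fold to 40-character lines joined by CRLF, as the on-the-wire form of an
    # RFC 2822 message would require; continuation lines start with the
    # continuation_ws given at construction time (a single space by default).
    folded = h.encode(maxlinelen=40, linesep='\r\n')
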