diff options
Diffstat (limited to 'Lib/tarfile.py')
-rw-r--r-- | Lib/tarfile.py | 163 |
1 files changed, 129 insertions, 34 deletions
diff --git a/Lib/tarfile.py b/Lib/tarfile.py index 212b71f6509..068aa13ed70 100644 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -67,7 +67,7 @@ __all__ = ["TarFile", "TarInfo", "is_tarfile", "TarError", "ReadError", "DEFAULT_FORMAT", "open","fully_trusted_filter", "data_filter", "tar_filter", "FilterError", "AbsoluteLinkError", "OutsideDestinationError", "SpecialFileError", "AbsolutePathError", - "LinkOutsideDestinationError"] + "LinkOutsideDestinationError", "LinkFallbackError"] #--------------------------------------------------------- @@ -766,10 +766,22 @@ class LinkOutsideDestinationError(FilterError): super().__init__(f'{tarinfo.name!r} would link to {path!r}, ' + 'which is outside the destination') +class LinkFallbackError(FilterError): + def __init__(self, tarinfo, path): + self.tarinfo = tarinfo + self._path = path + super().__init__(f'link {tarinfo.name!r} would be extracted as a ' + + f'copy of {path!r}, which was rejected') + +# Errors caused by filters -- both "fatal" and "non-fatal" -- that +# we consider to be issues with the argument, rather than a bug in the +# filter function +_FILTER_ERRORS = (FilterError, OSError, ExtractError) + def _get_filtered_attrs(member, dest_path, for_data=True): new_attrs = {} name = member.name - dest_path = os.path.realpath(dest_path) + dest_path = os.path.realpath(dest_path, strict=os.path.ALLOW_MISSING) # Strip leading / (tar's directory separator) from filenames. # Include os.sep (target OS directory separator) as well. if name.startswith(('/', os.sep)): @@ -779,7 +791,8 @@ def _get_filtered_attrs(member, dest_path, for_data=True): # For example, 'C:/foo' on Windows. raise AbsolutePathError(member) # Ensure we stay in the destination - target_path = os.path.realpath(os.path.join(dest_path, name)) + target_path = os.path.realpath(os.path.join(dest_path, name), + strict=os.path.ALLOW_MISSING) if os.path.commonpath([target_path, dest_path]) != dest_path: raise OutsideDestinationError(member, target_path) # Limit permissions (no high bits, and go-w) @@ -817,6 +830,9 @@ def _get_filtered_attrs(member, dest_path, for_data=True): if member.islnk() or member.issym(): if os.path.isabs(member.linkname): raise AbsoluteLinkError(member) + normalized = os.path.normpath(member.linkname) + if normalized != member.linkname: + new_attrs['linkname'] = normalized if member.issym(): target_path = os.path.join(dest_path, os.path.dirname(name), @@ -824,7 +840,8 @@ def _get_filtered_attrs(member, dest_path, for_data=True): else: target_path = os.path.join(dest_path, member.linkname) - target_path = os.path.realpath(target_path) + target_path = os.path.realpath(target_path, + strict=os.path.ALLOW_MISSING) if os.path.commonpath([target_path, dest_path]) != dest_path: raise LinkOutsideDestinationError(member, target_path) return new_attrs @@ -2386,30 +2403,58 @@ class TarFile(object): members = self for member in members: - tarinfo = self._get_extract_tarinfo(member, filter_function, path) + tarinfo, unfiltered = self._get_extract_tarinfo( + member, filter_function, path) if tarinfo is None: continue if tarinfo.isdir(): # For directories, delay setting attributes until later, # since permissions can interfere with extraction and # extracting contents can reset mtime. - directories.append(tarinfo) + directories.append(unfiltered) self._extract_one(tarinfo, path, set_attrs=not tarinfo.isdir(), - numeric_owner=numeric_owner) + numeric_owner=numeric_owner, + filter_function=filter_function) # Reverse sort directories. directories.sort(key=lambda a: a.name, reverse=True) + # Set correct owner, mtime and filemode on directories. - for tarinfo in directories: - dirpath = os.path.join(path, tarinfo.name) + for unfiltered in directories: try: + # Need to re-apply any filter, to take the *current* filesystem + # state into account. + try: + tarinfo = filter_function(unfiltered, path) + except _FILTER_ERRORS as exc: + self._log_no_directory_fixup(unfiltered, repr(exc)) + continue + if tarinfo is None: + self._log_no_directory_fixup(unfiltered, + 'excluded by filter') + continue + dirpath = os.path.join(path, tarinfo.name) + try: + lstat = os.lstat(dirpath) + except FileNotFoundError: + self._log_no_directory_fixup(tarinfo, 'missing') + continue + if not stat.S_ISDIR(lstat.st_mode): + # This is no longer a directory; presumably a later + # member overwrote the entry. + self._log_no_directory_fixup(tarinfo, 'not a directory') + continue self.chown(tarinfo, dirpath, numeric_owner=numeric_owner) self.utime(tarinfo, dirpath) self.chmod(tarinfo, dirpath) except ExtractError as e: self._handle_nonfatal_error(e) + def _log_no_directory_fixup(self, member, reason): + self._dbg(2, "tarfile: Not fixing up directory %r (%s)" % + (member.name, reason)) + def extract(self, member, path="", set_attrs=True, *, numeric_owner=False, filter=None): """Extract a member from the archive to the current working directory, @@ -2425,41 +2470,56 @@ class TarFile(object): String names of common filters are accepted. """ filter_function = self._get_filter_function(filter) - tarinfo = self._get_extract_tarinfo(member, filter_function, path) + tarinfo, unfiltered = self._get_extract_tarinfo( + member, filter_function, path) if tarinfo is not None: self._extract_one(tarinfo, path, set_attrs, numeric_owner) def _get_extract_tarinfo(self, member, filter_function, path): - """Get filtered TarInfo (or None) from member, which might be a str""" + """Get (filtered, unfiltered) TarInfos from *member* + + *member* might be a string. + + Return (None, None) if not found. + """ + if isinstance(member, str): - tarinfo = self.getmember(member) + unfiltered = self.getmember(member) else: - tarinfo = member + unfiltered = member - unfiltered = tarinfo + filtered = None try: - tarinfo = filter_function(tarinfo, path) + filtered = filter_function(unfiltered, path) except (OSError, UnicodeEncodeError, FilterError) as e: self._handle_fatal_error(e) except ExtractError as e: self._handle_nonfatal_error(e) - if tarinfo is None: + if filtered is None: self._dbg(2, "tarfile: Excluded %r" % unfiltered.name) - return None + return None, None + # Prepare the link target for makelink(). - if tarinfo.islnk(): - tarinfo = copy.copy(tarinfo) - tarinfo._link_target = os.path.join(path, tarinfo.linkname) - return tarinfo + if filtered.islnk(): + filtered = copy.copy(filtered) + filtered._link_target = os.path.join(path, filtered.linkname) + return filtered, unfiltered + + def _extract_one(self, tarinfo, path, set_attrs, numeric_owner, + filter_function=None): + """Extract from filtered tarinfo to disk. - def _extract_one(self, tarinfo, path, set_attrs, numeric_owner): - """Extract from filtered tarinfo to disk""" + filter_function is only used when extracting a *different* + member (e.g. as fallback to creating a symlink) + """ self._check("r") try: self._extract_member(tarinfo, os.path.join(path, tarinfo.name), set_attrs=set_attrs, - numeric_owner=numeric_owner) + numeric_owner=numeric_owner, + filter_function=filter_function, + extraction_root=path) except (OSError, UnicodeEncodeError) as e: self._handle_fatal_error(e) except ExtractError as e: @@ -2517,9 +2577,13 @@ class TarFile(object): return None def _extract_member(self, tarinfo, targetpath, set_attrs=True, - numeric_owner=False): - """Extract the TarInfo object tarinfo to a physical + numeric_owner=False, *, filter_function=None, + extraction_root=None): + """Extract the filtered TarInfo object tarinfo to a physical file called targetpath. + + filter_function is only used when extracting a *different* + member (e.g. as fallback to creating a symlink) """ # Fetch the TarInfo object for the given name # and build the destination pathname, replacing @@ -2548,7 +2612,10 @@ class TarFile(object): elif tarinfo.ischr() or tarinfo.isblk(): self.makedev(tarinfo, targetpath) elif tarinfo.islnk() or tarinfo.issym(): - self.makelink(tarinfo, targetpath) + self.makelink_with_filter( + tarinfo, targetpath, + filter_function=filter_function, + extraction_root=extraction_root) elif tarinfo.type not in SUPPORTED_TYPES: self.makeunknown(tarinfo, targetpath) else: @@ -2631,10 +2698,18 @@ class TarFile(object): os.makedev(tarinfo.devmajor, tarinfo.devminor)) def makelink(self, tarinfo, targetpath): + return self.makelink_with_filter(tarinfo, targetpath, None, None) + + def makelink_with_filter(self, tarinfo, targetpath, + filter_function, extraction_root): """Make a (symbolic) link called targetpath. If it cannot be created (platform limitation), we try to make a copy of the referenced file instead of a link. + + filter_function is only used when extracting a *different* + member (e.g. as fallback to creating a link). """ + keyerror_to_extracterror = False try: # For systems that support symbolic and hard links. if tarinfo.issym(): @@ -2642,18 +2717,38 @@ class TarFile(object): # Avoid FileExistsError on following os.symlink. os.unlink(targetpath) os.symlink(tarinfo.linkname, targetpath) + return else: if os.path.exists(tarinfo._link_target): os.link(tarinfo._link_target, targetpath) - else: - self._extract_member(self._find_link_target(tarinfo), - targetpath) + return except symlink_exception: + keyerror_to_extracterror = True + + try: + unfiltered = self._find_link_target(tarinfo) + except KeyError: + if keyerror_to_extracterror: + raise ExtractError( + "unable to resolve link inside archive") from None + else: + raise + + if filter_function is None: + filtered = unfiltered + else: + if extraction_root is None: + raise ExtractError( + "makelink_with_filter: if filter_function is not None, " + + "extraction_root must also not be None") try: - self._extract_member(self._find_link_target(tarinfo), - targetpath) - except KeyError: - raise ExtractError("unable to resolve link inside archive") from None + filtered = filter_function(unfiltered, extraction_root) + except _FILTER_ERRORS as cause: + raise LinkFallbackError(tarinfo, unfiltered.name) from cause + if filtered is not None: + self._extract_member(filtered, targetpath, + filter_function=filter_function, + extraction_root=extraction_root) def chown(self, tarinfo, targetpath, numeric_owner): """Set owner of targetpath according to tarinfo. If numeric_owner |