about | summary | refs | log | tree | commit | diff | stats | homepage
path: root/Lib/tarfile.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/tarfile.py')
-rw-r--r-- Lib/tarfile.py 163
1 files changed, 129 insertions, 34 deletions
diff --git a/Lib/tarfile.py b/Lib/tarfile.py
index 212b71f6509..068aa13ed70 100644
--- a/Lib/tarfile.py
+++ b/Lib/tarfile.py
@@ -67,7 +67,7 @@ __all__ = ["TarFile", "TarInfo", "is_tarfile", "TarError", "ReadError",
"DEFAULT_FORMAT", "open","fully_trusted_filter", "data_filter",
"tar_filter", "FilterError", "AbsoluteLinkError",
"OutsideDestinationError", "SpecialFileError", "AbsolutePathError",
- "LinkOutsideDestinationError"]
+ "LinkOutsideDestinationError", "LinkFallbackError"]
#---------------------------------------------------------
@@ -766,10 +766,22 @@ class LinkOutsideDestinationError(FilterError):
super().__init__(f'{tarinfo.name!r} would link to {path!r}, '
+ 'which is outside the destination')
+class LinkFallbackError(FilterError):
+ """Raised when a link would be extracted as a copy of another member,
+ but that member was rejected by the extraction filter.
+ """
+ def __init__(self, tarinfo, path):
+ # tarinfo is the link member being extracted; path is the name of
+ # the member it would have been copied from (shown in the message).
+ self.tarinfo = tarinfo
+ self._path = path
+ super().__init__(f'link {tarinfo.name!r} would be extracted as a '
+ + f'copy of {path!r}, which was rejected')
+
+# Errors caused by filters -- both "fatal" and "non-fatal" -- that
+# we consider to be issues with the argument, rather than a bug in the
+# filter function
+_FILTER_ERRORS = (FilterError, OSError, ExtractError)
+
def _get_filtered_attrs(member, dest_path, for_data=True):
new_attrs = {}
name = member.name
- dest_path = os.path.realpath(dest_path)
+ dest_path = os.path.realpath(dest_path, strict=os.path.ALLOW_MISSING)
# Strip leading / (tar's directory separator) from filenames.
# Include os.sep (target OS directory separator) as well.
if name.startswith(('/', os.sep)):
@@ -779,7 +791,8 @@ def _get_filtered_attrs(member, dest_path, for_data=True):
# For example, 'C:/foo' on Windows.
raise AbsolutePathError(member)
# Ensure we stay in the destination
- target_path = os.path.realpath(os.path.join(dest_path, name))
+ target_path = os.path.realpath(os.path.join(dest_path, name),
+ strict=os.path.ALLOW_MISSING)
if os.path.commonpath([target_path, dest_path]) != dest_path:
raise OutsideDestinationError(member, target_path)
# Limit permissions (no high bits, and go-w)
@@ -817,6 +830,9 @@ def _get_filtered_attrs(member, dest_path, for_data=True):
if member.islnk() or member.issym():
if os.path.isabs(member.linkname):
raise AbsoluteLinkError(member)
+ normalized = os.path.normpath(member.linkname)
+ if normalized != member.linkname:
+ new_attrs['linkname'] = normalized
if member.issym():
target_path = os.path.join(dest_path,
os.path.dirname(name),
@@ -824,7 +840,8 @@ def _get_filtered_attrs(member, dest_path, for_data=True):
else:
target_path = os.path.join(dest_path,
member.linkname)
- target_path = os.path.realpath(target_path)
+ target_path = os.path.realpath(target_path,
+ strict=os.path.ALLOW_MISSING)
if os.path.commonpath([target_path, dest_path]) != dest_path:
raise LinkOutsideDestinationError(member, target_path)
return new_attrs
@@ -2386,30 +2403,58 @@ class TarFile(object):
members = self
for member in members:
- tarinfo = self._get_extract_tarinfo(member, filter_function, path)
+ tarinfo, unfiltered = self._get_extract_tarinfo(
+ member, filter_function, path)
if tarinfo is None:
continue
if tarinfo.isdir():
# For directories, delay setting attributes until later,
# since permissions can interfere with extraction and
# extracting contents can reset mtime.
- directories.append(tarinfo)
+ directories.append(unfiltered)
self._extract_one(tarinfo, path, set_attrs=not tarinfo.isdir(),
- numeric_owner=numeric_owner)
+ numeric_owner=numeric_owner,
+ filter_function=filter_function)
# Reverse sort directories.
directories.sort(key=lambda a: a.name, reverse=True)
+
# Set correct owner, mtime and filemode on directories.
- for tarinfo in directories:
- dirpath = os.path.join(path, tarinfo.name)
+ for unfiltered in directories:
try:
+ # Need to re-apply any filter, to take the *current* filesystem
+ # state into account.
+ try:
+ tarinfo = filter_function(unfiltered, path)
+ except _FILTER_ERRORS as exc:
+ self._log_no_directory_fixup(unfiltered, repr(exc))
+ continue
+ if tarinfo is None:
+ self._log_no_directory_fixup(unfiltered,
+ 'excluded by filter')
+ continue
+ dirpath = os.path.join(path, tarinfo.name)
+ try:
+ lstat = os.lstat(dirpath)
+ except FileNotFoundError:
+ self._log_no_directory_fixup(tarinfo, 'missing')
+ continue
+ if not stat.S_ISDIR(lstat.st_mode):
+ # This is no longer a directory; presumably a later
+ # member overwrote the entry.
+ self._log_no_directory_fixup(tarinfo, 'not a directory')
+ continue
self.chown(tarinfo, dirpath, numeric_owner=numeric_owner)
self.utime(tarinfo, dirpath)
self.chmod(tarinfo, dirpath)
except ExtractError as e:
self._handle_nonfatal_error(e)
+ def _log_no_directory_fixup(self, member, reason):
+ """Report, via self._dbg(2, ...), that directory *member* is not
+ being fixed up; *reason* is included in parentheses in the message.
+ """
+ self._dbg(2, "tarfile: Not fixing up directory %r (%s)" %
+ (member.name, reason))
+
def extract(self, member, path="", set_attrs=True, *, numeric_owner=False,
filter=None):
"""Extract a member from the archive to the current working directory,
@@ -2425,41 +2470,56 @@ class TarFile(object):
String names of common filters are accepted.
"""
filter_function = self._get_filter_function(filter)
- tarinfo = self._get_extract_tarinfo(member, filter_function, path)
+ tarinfo, unfiltered = self._get_extract_tarinfo(
+ member, filter_function, path)
if tarinfo is not None:
self._extract_one(tarinfo, path, set_attrs, numeric_owner)
def _get_extract_tarinfo(self, member, filter_function, path):
- """Get filtered TarInfo (or None) from member, which might be a str"""
+ """Get (filtered, unfiltered) TarInfos from *member*
+
+ *member* might be a string.
+
+ Return (None, None) if the member is excluded: the filter returned
+ None, or it raised an error that the error handlers did not re-raise.
+ """
+
if isinstance(member, str):
- tarinfo = self.getmember(member)
+ unfiltered = self.getmember(member)
else:
- tarinfo = member
+ unfiltered = member
- unfiltered = tarinfo
+ # Pre-set so that 'filtered' is defined when the filter raises and
+ # the error handler returns instead of re-raising.
+ filtered = None
try:
- tarinfo = filter_function(tarinfo, path)
+ filtered = filter_function(unfiltered, path)
except (OSError, UnicodeEncodeError, FilterError) as e:
self._handle_fatal_error(e)
except ExtractError as e:
self._handle_nonfatal_error(e)
- if tarinfo is None:
+ if filtered is None:
self._dbg(2, "tarfile: Excluded %r" % unfiltered.name)
- return None
+ return None, None
+
# Prepare the link target for makelink().
- if tarinfo.islnk():
- tarinfo = copy.copy(tarinfo)
- tarinfo._link_target = os.path.join(path, tarinfo.linkname)
- return tarinfo
+ # Work on a copy so that _link_target is not set on the object the
+ # filter returned.
+ if filtered.islnk():
+ filtered = copy.copy(filtered)
+ filtered._link_target = os.path.join(path, filtered.linkname)
+ return filtered, unfiltered
+
+ def _extract_one(self, tarinfo, path, set_attrs, numeric_owner,
+ filter_function=None):
+ """Extract from filtered tarinfo to disk.
- def _extract_one(self, tarinfo, path, set_attrs, numeric_owner):
- """Extract from filtered tarinfo to disk"""
+ filter_function is only used when extracting a *different*
+ member (e.g. as fallback to creating a symlink)
+ """
self._check("r")
try:
self._extract_member(tarinfo, os.path.join(path, tarinfo.name),
set_attrs=set_attrs,
- numeric_owner=numeric_owner)
+ numeric_owner=numeric_owner,
+ filter_function=filter_function,
+ extraction_root=path)
except (OSError, UnicodeEncodeError) as e:
self._handle_fatal_error(e)
except ExtractError as e:
@@ -2517,9 +2577,13 @@ class TarFile(object):
return None
def _extract_member(self, tarinfo, targetpath, set_attrs=True,
- numeric_owner=False):
- """Extract the TarInfo object tarinfo to a physical
+ numeric_owner=False, *, filter_function=None,
+ extraction_root=None):
+ """Extract the filtered TarInfo object tarinfo to a physical
file called targetpath.
+
+ filter_function is only used when extracting a *different*
+ member (e.g. as fallback to creating a symlink)
"""
# Fetch the TarInfo object for the given name
# and build the destination pathname, replacing
@@ -2548,7 +2612,10 @@ class TarFile(object):
elif tarinfo.ischr() or tarinfo.isblk():
self.makedev(tarinfo, targetpath)
elif tarinfo.islnk() or tarinfo.issym():
- self.makelink(tarinfo, targetpath)
+ self.makelink_with_filter(
+ tarinfo, targetpath,
+ filter_function=filter_function,
+ extraction_root=extraction_root)
elif tarinfo.type not in SUPPORTED_TYPES:
self.makeunknown(tarinfo, targetpath)
else:
@@ -2631,10 +2698,18 @@ class TarFile(object):
os.makedev(tarinfo.devmajor, tarinfo.devminor))
def makelink(self, tarinfo, targetpath):
+ # Thin wrapper: delegates to makelink_with_filter() with neither a
+ # filter function nor an extraction root.
+ return self.makelink_with_filter(tarinfo, targetpath, None, None)
+
+ def makelink_with_filter(self, tarinfo, targetpath,
+ filter_function, extraction_root):
"""Make a (symbolic) link called targetpath. If it cannot be created
(platform limitation), we try to make a copy of the referenced file
instead of a link.
+
+ filter_function is only used when extracting a *different*
+ member (e.g. as fallback to creating a link).
"""
+ keyerror_to_extracterror = False
try:
# For systems that support symbolic and hard links.
if tarinfo.issym():
@@ -2642,18 +2717,38 @@ class TarFile(object):
# Avoid FileExistsError on following os.symlink.
os.unlink(targetpath)
os.symlink(tarinfo.linkname, targetpath)
+ return
else:
if os.path.exists(tarinfo._link_target):
os.link(tarinfo._link_target, targetpath)
- else:
- self._extract_member(self._find_link_target(tarinfo),
- targetpath)
+ return
except symlink_exception:
+ keyerror_to_extracterror = True
+
+ try:
+ unfiltered = self._find_link_target(tarinfo)
+ except KeyError:
+ if keyerror_to_extracterror:
+ raise ExtractError(
+ "unable to resolve link inside archive") from None
+ else:
+ raise
+
+ if filter_function is None:
+ filtered = unfiltered
+ else:
+ if extraction_root is None:
+ raise ExtractError(
+ "makelink_with_filter: if filter_function is not None, "
+ + "extraction_root must also not be None")
try:
- self._extract_member(self._find_link_target(tarinfo),
- targetpath)
- except KeyError:
- raise ExtractError("unable to resolve link inside archive") from None
+ filtered = filter_function(unfiltered, extraction_root)
+ except _FILTER_ERRORS as cause:
+ raise LinkFallbackError(tarinfo, unfiltered.name) from cause
+ if filtered is not None:
+ self._extract_member(filtered, targetpath,
+ filter_function=filter_function,
+ extraction_root=extraction_root)
def chown(self, tarinfo, targetpath, numeric_owner):
"""Set owner of targetpath according to tarinfo. If numeric_owner