aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/Lib/pathlib/_os.py
diff options
context:
space:
mode:
authorBarney Gale <barney.gale@gmail.com>2025-02-17 19:15:25 +0000
committerGitHub <noreply@github.com>2025-02-17 19:15:25 +0000
commit7fcace99bbe1716c3e9713804e5bffaebdc8dd8f (patch)
tree0fb1a905454a744346aed55f6225cebfa18713ff /Lib/pathlib/_os.py
parentbd1642c6e569d6b328e4296e14355178d3a79778 (diff)
downloadcpython-7fcace99bbe1716c3e9713804e5bffaebdc8dd8f.tar.gz
cpython-7fcace99bbe1716c3e9713804e5bffaebdc8dd8f.zip
GH-125413: Add private metadata methods to `pathlib.Path.info` (#129897)
Add the following private methods to `pathlib.Path.info`: - `_posix_permissions()`: the POSIX file permissions (`S_IMODE(st_mode)`) - `_file_id()`: the file ID (`(st_dev, st_ino)`) - `_access_time_ns()`: the access time in nanoseconds (`st_atime_ns`) - `_mod_time_ns()`: the modify time in nanoseconds (`st_mtime_ns`) - `_bsd_flags()`: the BSD file flags (`st_flags`) - `_xattrs()`: the file extended attributes as a list of key, value pairs, or an empty list if `listxattr()` or `getxattr()` fail in an ignorable way. These methods replace `LocalCopyReader.read_metadata()`, and so we can delete the `CopyReader` and `LocalCopyReader` classes. Rather than reading metadata via `source._copy_reader.read_metadata()`, we instead call `source.info._posix_permissions()`, `_access_time_ns()`, etc. Preserving metadata is only supported for local-to-local copies at the moment. To support copying metadata between arbitrary `ReadablePath` and `WritablePath` objects, we'd need to make the new methods public and documented. Co-authored-by: Petr Viktorin <encukou@gmail.com>
Diffstat (limited to 'Lib/pathlib/_os.py')
-rw-r--r--Lib/pathlib/_os.py393
1 files changed, 196 insertions, 197 deletions
diff --git a/Lib/pathlib/_os.py b/Lib/pathlib/_os.py
index c0c81ada858..e5b894b524c 100644
--- a/Lib/pathlib/_os.py
+++ b/Lib/pathlib/_os.py
@@ -200,26 +200,6 @@ def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")
-class CopyReader:
- """
- Class that implements the "read" part of copying between path objects.
- An instance of this class is available from the ReadablePath._copy_reader
- property.
- """
- __slots__ = ('_path',)
-
- def __init__(self, path):
- self._path = path
-
- _readable_metakeys = frozenset()
-
- def _read_metadata(self, metakeys, *, follow_symlinks=True):
- """
- Returns path metadata as a dict with string keys.
- """
- raise NotImplementedError
-
-
class CopyWriter:
"""
Class that implements the "write" part of copying between path objects. An
@@ -231,48 +211,39 @@ class CopyWriter:
def __init__(self, path):
self._path = path
- _writable_metakeys = frozenset()
-
- def _write_metadata(self, metadata, *, follow_symlinks=True):
- """
- Sets path metadata from the given dict with string keys.
- """
- raise NotImplementedError
+ def _copy_metadata(self, source, follow_symlinks=True):
+ """Copy metadata from the given path to our path."""
+ pass
def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
- self._ensure_distinct_path(source)
- if preserve_metadata:
- metakeys = self._writable_metakeys & source._copy_reader._readable_metakeys
- else:
- metakeys = None
+ ensure_distinct_paths(source, self._path)
if not follow_symlinks and source.is_symlink():
- self._create_symlink(source, metakeys)
+ self._create_symlink(source, preserve_metadata)
elif source.is_dir():
- self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
+ self._create_dir(source, follow_symlinks, dirs_exist_ok, preserve_metadata)
else:
- self._create_file(source, metakeys)
+ self._create_file(source, preserve_metadata)
return self._path
- def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
+ def _create_dir(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
"""Copy the given directory to our path."""
children = list(source.iterdir())
self._path.mkdir(exist_ok=dirs_exist_ok)
for src in children:
dst = self._path.joinpath(src.name)
if not follow_symlinks and src.is_symlink():
- dst._copy_writer._create_symlink(src, metakeys)
+ dst._copy_writer._create_symlink(src, preserve_metadata)
elif src.is_dir():
- dst._copy_writer._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
+ dst._copy_writer._create_dir(src, follow_symlinks, dirs_exist_ok, preserve_metadata)
else:
- dst._copy_writer._create_file(src, metakeys)
- if metakeys:
- metadata = source._copy_reader._read_metadata(metakeys)
- if metadata:
- self._write_metadata(metadata)
+ dst._copy_writer._create_file(src, preserve_metadata)
- def _create_file(self, source, metakeys):
+ if preserve_metadata:
+ self._copy_metadata(source)
+
+ def _create_file(self, source, preserve_metadata):
"""Copy the given file to our path."""
- self._ensure_different_file(source)
+ ensure_different_files(source, self._path)
with magic_open(source, 'rb') as source_f:
try:
with magic_open(self._path, 'wb') as target_f:
@@ -283,77 +254,34 @@ class CopyWriter:
raise FileNotFoundError(
f'Directory does not exist: {self._path}') from e
raise
- if metakeys:
- metadata = source._copy_reader._read_metadata(metakeys)
- if metadata:
- self._write_metadata(metadata)
+ if preserve_metadata:
+ self._copy_metadata(source)
- def _create_symlink(self, source, metakeys):
+ def _create_symlink(self, source, preserve_metadata):
"""Copy the given symbolic link to our path."""
self._path.symlink_to(source.readlink())
- if metakeys:
- metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
- if metadata:
- self._write_metadata(metadata, follow_symlinks=False)
-
- def _ensure_different_file(self, source):
- """
- Raise OSError(EINVAL) if both paths refer to the same file.
- """
- pass
-
- def _ensure_distinct_path(self, source):
- """
- Raise OSError(EINVAL) if the other path is within this path.
- """
- # Note: there is no straightforward, foolproof algorithm to determine
- # if one directory is within another (a particularly perverse example
- # would be a single network share mounted in one location via NFS, and
- # in another location via CIFS), so we simply checks whether the
- # other path is lexically equal to, or within, this path.
- if source == self._path:
- err = OSError(EINVAL, "Source and target are the same path")
- elif source in self._path.parents:
- err = OSError(EINVAL, "Source path is a parent of target path")
- else:
- return
- err.filename = str(source)
- err.filename2 = str(self._path)
- raise err
+ if preserve_metadata:
+ self._copy_metadata(source, follow_symlinks=False)
-class LocalCopyReader(CopyReader):
- """This object implements the "read" part of copying local paths. Don't
- try to construct it yourself.
+def ensure_distinct_paths(source, target):
"""
- __slots__ = ()
-
- _readable_metakeys = {'mode', 'times_ns'}
- if hasattr(os.stat_result, 'st_flags'):
- _readable_metakeys.add('flags')
- if hasattr(os, 'listxattr'):
- _readable_metakeys.add('xattrs')
- _readable_metakeys = frozenset(_readable_metakeys)
-
- def _read_metadata(self, metakeys, *, follow_symlinks=True):
- metadata = {}
- if 'mode' in metakeys or 'times_ns' in metakeys or 'flags' in metakeys:
- st = self._path.stat(follow_symlinks=follow_symlinks)
- if 'mode' in metakeys:
- metadata['mode'] = S_IMODE(st.st_mode)
- if 'times_ns' in metakeys:
- metadata['times_ns'] = st.st_atime_ns, st.st_mtime_ns
- if 'flags' in metakeys:
- metadata['flags'] = st.st_flags
- if 'xattrs' in metakeys:
- try:
- metadata['xattrs'] = [
- (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
- for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
- except OSError as err:
- if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
- raise
- return metadata
+ Raise OSError(EINVAL) if the other path is within this path.
+ """
+ # Note: there is no straightforward, foolproof algorithm to determine
+ # if one directory is within another (a particularly perverse example
+ # would be a single network share mounted in one location via NFS, and
+ # in another location via CIFS), so we simply checks whether the
+ # other path is lexically equal to, or within, this path.
+ if source == target:
+ err = OSError(EINVAL, "Source and target are the same path")
+ elif source in target.parents:
+ err = OSError(EINVAL, "Source path is a parent of target path")
+ else:
+ return
+ err.filename = str(source)
+ err.filename2 = str(target)
+ raise err
class LocalCopyWriter(CopyWriter):
@@ -362,42 +290,42 @@ class LocalCopyWriter(CopyWriter):
"""
__slots__ = ()
- _writable_metakeys = LocalCopyReader._readable_metakeys
+ def _copy_metadata(self, source, follow_symlinks=True):
+ """Copy metadata from the given path to our path."""
+ target = self._path
+ info = source.info
- def _write_metadata(self, metadata, *, follow_symlinks=True):
- def _nop(*args, ns=None, follow_symlinks=None):
- pass
+ copy_times_ns = (
+ hasattr(info, '_access_time_ns') and
+ hasattr(info, '_mod_time_ns') and
+ (follow_symlinks or os.utime in os.supports_follow_symlinks))
+ if copy_times_ns:
+ t0 = info._access_time_ns(follow_symlinks=follow_symlinks)
+ t1 = info._mod_time_ns(follow_symlinks=follow_symlinks)
+ os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks)
- if follow_symlinks:
- # use the real function if it exists
- def lookup(name):
- return getattr(os, name, _nop)
- else:
- # use the real function only if it exists
- # *and* it supports follow_symlinks
- def lookup(name):
- fn = getattr(os, name, _nop)
- if fn in os.supports_follow_symlinks:
- return fn
- return _nop
-
- times_ns = metadata.get('times_ns')
- if times_ns is not None:
- lookup("utime")(self._path, ns=times_ns, follow_symlinks=follow_symlinks)
# We must copy extended attributes before the file is (potentially)
# chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
- xattrs = metadata.get('xattrs')
- if xattrs is not None:
+ copy_xattrs = (
+ hasattr(info, '_xattrs') and
+ hasattr(os, 'setxattr') and
+ (follow_symlinks or os.setxattr in os.supports_follow_symlinks))
+ if copy_xattrs:
+ xattrs = info._xattrs(follow_symlinks=follow_symlinks)
for attr, value in xattrs:
try:
- os.setxattr(self._path, attr, value, follow_symlinks=follow_symlinks)
+ os.setxattr(target, attr, value, follow_symlinks=follow_symlinks)
except OSError as e:
if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
raise
- mode = metadata.get('mode')
- if mode is not None:
+
+ copy_posix_permissions = (
+ hasattr(info, '_posix_permissions') and
+ (follow_symlinks or os.chmod in os.supports_follow_symlinks))
+ if copy_posix_permissions:
+ posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks)
try:
- lookup("chmod")(self._path, mode, follow_symlinks=follow_symlinks)
+ os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks)
except NotImplementedError:
# if we got a NotImplementedError, it's because
# * follow_symlinks=False,
@@ -410,66 +338,146 @@ class LocalCopyWriter(CopyWriter):
# symlink. give up, suppress the error.
# (which is what shutil always did in this circumstance.)
pass
- flags = metadata.get('flags')
- if flags is not None:
+
+ copy_bsd_flags = (
+ hasattr(info, '_bsd_flags') and
+ hasattr(os, 'chflags') and
+ (follow_symlinks or os.chflags in os.supports_follow_symlinks))
+ if copy_bsd_flags:
+ bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks)
try:
- lookup("chflags")(self._path, flags, follow_symlinks=follow_symlinks)
+ os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks)
except OSError as why:
if why.errno not in (EOPNOTSUPP, ENOTSUP):
raise
if copyfile:
# Use fast OS routine for local file copying where available.
- def _create_file(self, source, metakeys):
+ def _create_file(self, source, preserve_metadata):
"""Copy the given file to the given target."""
try:
source = os.fspath(source)
except TypeError:
- super()._create_file(source, metakeys)
+ super()._create_file(source, preserve_metadata)
else:
copyfile(source, os.fspath(self._path))
if os.name == 'nt':
# Windows: symlink target might not exist yet if we're copying several
# files, so ensure we pass is_dir to os.symlink().
- def _create_symlink(self, source, metakeys):
+ def _create_symlink(self, source, preserve_metadata):
"""Copy the given symlink to the given target."""
self._path.symlink_to(source.readlink(), source.is_dir())
- if metakeys:
- metadata = source._copy_reader._read_metadata(metakeys, follow_symlinks=False)
- if metadata:
- self._write_metadata(metadata, follow_symlinks=False)
+ if preserve_metadata:
+ self._copy_metadata(source, follow_symlinks=False)
- def _ensure_different_file(self, source):
- """
- Raise OSError(EINVAL) if both paths refer to the same file.
- """
+
+def ensure_different_files(source, target):
+ """
+ Raise OSError(EINVAL) if both paths refer to the same file.
+ """
+ try:
+ source_file_id = source.info._file_id
+ target_file_id = target.info._file_id
+ except AttributeError:
+ if source != target:
+ return
+ else:
try:
- if not self._path.samefile(source):
+ if source_file_id() != target_file_id():
return
except (OSError, ValueError):
return
- err = OSError(EINVAL, "Source and target are the same file")
- err.filename = str(source)
- err.filename2 = str(self._path)
- raise err
+ err = OSError(EINVAL, "Source and target are the same file")
+ err.filename = str(source)
+ err.filename2 = str(target)
+ raise err
class _PathInfoBase:
- __slots__ = ()
+ __slots__ = ('_path', '_stat_result', '_lstat_result')
+
+ def __init__(self, path):
+ self._path = str(path)
def __repr__(self):
path_type = "WindowsPath" if os.name == "nt" else "PosixPath"
return f"<{path_type}.info>"
+ def _stat(self, *, follow_symlinks=True, ignore_errors=False):
+ """Return the status as an os.stat_result, or None if stat() fails and
+ ignore_errors is true."""
+ if follow_symlinks:
+ try:
+ result = self._stat_result
+ except AttributeError:
+ pass
+ else:
+ if ignore_errors or result is not None:
+ return result
+ try:
+ self._stat_result = os.stat(self._path)
+ except (OSError, ValueError):
+ self._stat_result = None
+ if not ignore_errors:
+ raise
+ return self._stat_result
+ else:
+ try:
+ result = self._lstat_result
+ except AttributeError:
+ pass
+ else:
+ if ignore_errors or result is not None:
+ return result
+ try:
+ self._lstat_result = os.lstat(self._path)
+ except (OSError, ValueError):
+ self._lstat_result = None
+ if not ignore_errors:
+ raise
+ return self._lstat_result
+
+ def _posix_permissions(self, *, follow_symlinks=True):
+ """Return the POSIX file permissions."""
+ return S_IMODE(self._stat(follow_symlinks=follow_symlinks).st_mode)
+
+ def _file_id(self, *, follow_symlinks=True):
+ """Returns the identifier of the file."""
+ st = self._stat(follow_symlinks=follow_symlinks)
+ return st.st_dev, st.st_ino
+
+ def _access_time_ns(self, *, follow_symlinks=True):
+ """Return the access time in nanoseconds."""
+ return self._stat(follow_symlinks=follow_symlinks).st_atime_ns
+
+ def _mod_time_ns(self, *, follow_symlinks=True):
+ """Return the modify time in nanoseconds."""
+ return self._stat(follow_symlinks=follow_symlinks).st_mtime_ns
+
+ if hasattr(os.stat_result, 'st_flags'):
+ def _bsd_flags(self, *, follow_symlinks=True):
+ """Return the flags."""
+ return self._stat(follow_symlinks=follow_symlinks).st_flags
+
+ if hasattr(os, 'listxattr'):
+ def _xattrs(self, *, follow_symlinks=True):
+ """Return the xattrs as a list of (attr, value) pairs, or an empty
+ list if extended attributes aren't supported."""
+ try:
+ return [
+ (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
+ for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
+ except OSError as err:
+ if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
+ raise
+ return []
+
class _WindowsPathInfo(_PathInfoBase):
"""Implementation of pathlib.types.PathInfo that provides status
information for Windows paths. Don't try to construct it yourself."""
- __slots__ = ('_path', '_exists', '_is_dir', '_is_file', '_is_symlink')
-
- def __init__(self, path):
- self._path = str(path)
+ __slots__ = ('_exists', '_is_dir', '_is_file', '_is_symlink')
def exists(self, *, follow_symlinks=True):
"""Whether this path exists."""
@@ -525,44 +533,35 @@ class _WindowsPathInfo(_PathInfoBase):
class _PosixPathInfo(_PathInfoBase):
"""Implementation of pathlib.types.PathInfo that provides status
information for POSIX paths. Don't try to construct it yourself."""
- __slots__ = ('_path', '_mode')
-
- def __init__(self, path):
- self._path = str(path)
- self._mode = [None, None]
-
- def _get_mode(self, *, follow_symlinks=True):
- idx = bool(follow_symlinks)
- mode = self._mode[idx]
- if mode is None:
- try:
- st = os.stat(self._path, follow_symlinks=follow_symlinks)
- except (OSError, ValueError):
- mode = 0
- else:
- mode = st.st_mode
- if follow_symlinks or S_ISLNK(mode):
- self._mode[idx] = mode
- else:
- # Not a symlink, so stat() will give the same result
- self._mode = [mode, mode]
- return mode
+ __slots__ = ()
def exists(self, *, follow_symlinks=True):
"""Whether this path exists."""
- return self._get_mode(follow_symlinks=follow_symlinks) > 0
+ st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True)
+ if st is None:
+ return False
+ return True
def is_dir(self, *, follow_symlinks=True):
"""Whether this path is a directory."""
- return S_ISDIR(self._get_mode(follow_symlinks=follow_symlinks))
+ st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True)
+ if st is None:
+ return False
+ return S_ISDIR(st.st_mode)
def is_file(self, *, follow_symlinks=True):
"""Whether this path is a regular file."""
- return S_ISREG(self._get_mode(follow_symlinks=follow_symlinks))
+ st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True)
+ if st is None:
+ return False
+ return S_ISREG(st.st_mode)
def is_symlink(self):
"""Whether this path is a symbolic link."""
- return S_ISLNK(self._get_mode(follow_symlinks=False))
+ st = self._stat(follow_symlinks=False, ignore_errors=True)
+ if st is None:
+ return False
+ return S_ISLNK(st.st_mode)
PathInfo = _WindowsPathInfo if os.name == 'nt' else _PosixPathInfo
@@ -572,25 +571,25 @@ class DirEntryInfo(_PathInfoBase):
"""Implementation of pathlib.types.PathInfo that provides status
information by querying a wrapped os.DirEntry object. Don't try to
construct it yourself."""
- __slots__ = ('_entry', '_exists')
+ __slots__ = ('_entry',)
def __init__(self, entry):
+ super().__init__(entry.path)
self._entry = entry
+ def _stat(self, *, follow_symlinks=True, ignore_errors=False):
+ try:
+ return self._entry.stat(follow_symlinks=follow_symlinks)
+ except OSError:
+ if not ignore_errors:
+ raise
+ return None
+
def exists(self, *, follow_symlinks=True):
"""Whether this path exists."""
if not follow_symlinks:
return True
- try:
- return self._exists
- except AttributeError:
- try:
- self._entry.stat()
- except OSError:
- self._exists = False
- else:
- self._exists = True
- return self._exists
+ return self._stat(ignore_errors=True) is not None
def is_dir(self, *, follow_symlinks=True):
"""Whether this path is a directory."""