diff options
Diffstat (limited to 'Lib/pathlib.py')
-rw-r--r-- | Lib/pathlib.py | 446 |
1 files changed, 359 insertions, 87 deletions
diff --git a/Lib/pathlib.py b/Lib/pathlib.py index bd5f61b0b7c..e6be9061013 100644 --- a/Lib/pathlib.py +++ b/Lib/pathlib.py @@ -5,6 +5,7 @@ paths with operations that have semantics appropriate for different operating systems. """ +import contextlib import fnmatch import functools import io @@ -15,10 +16,19 @@ import re import sys import warnings from _collections_abc import Sequence -from errno import ENOENT, ENOTDIR, EBADF, ELOOP +from errno import ENOENT, ENOTDIR, EBADF, ELOOP, EINVAL from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO from urllib.parse import quote_from_bytes as urlquote_from_bytes +try: + import pwd +except ImportError: + pwd = None +try: + import grp +except ImportError: + grp = None + __all__ = [ "UnsupportedOperation", @@ -30,6 +40,9 @@ __all__ = [ # Internals # +# Maximum number of symlinks to follow in _PathBase.resolve() +_MAX_SYMLINKS = 40 + # Reference for Windows paths can be found at # https://learn.microsoft.com/en-gb/windows/win32/fileio/naming-a-file . _WIN_RESERVED_NAMES = frozenset( @@ -292,6 +305,11 @@ class PurePath: # The `_hash` slot stores the hash of the case-normalized string # path. It's set when `__hash__()` is called for the first time. '_hash', + + # The '_resolving' slot stores a boolean indicating whether the path + # is being processed by `_PathBase.resolve()`. This prevents duplicate + # work from occurring when `resolve()` calls `stat()` or `readlink()`. + '_resolving', ) pathmod = os.path @@ -331,6 +349,7 @@ class PurePath: f"not {type(path).__name__!r}") paths.append(path) self._raw_paths = paths + self._resolving = False def with_segments(self, *pathsegments): """Construct a new path object from any number of path-like objects. @@ -416,7 +435,7 @@ class PurePath: return "{}({!r})".format(self.__class__.__name__, self.as_posix()) def as_uri(self): - """Return the path as a 'file' URI.""" + """Return the path as a URI.""" if not self.is_absolute(): raise ValueError("relative path can't be expressed as a file URI") @@ -691,7 +710,9 @@ class PurePath: tail = self._tail if not tail: return self - return self._from_parsed_parts(drv, root, tail[:-1]) + path = self._from_parsed_parts(drv, root, tail[:-1]) + path._resolving = self._resolving + return path @property def parents(self): @@ -776,23 +797,35 @@ class PureWindowsPath(PurePath): # Filesystem-accessing classes -class Path(PurePath): - """PurePath subclass that can make system calls. +class _PathBase(PurePath): + """Base class for concrete path objects. - Path represents a filesystem path but unlike PurePath, also offers - methods to do system calls on path objects. Depending on your system, - instantiating a Path will return either a PosixPath or a WindowsPath - object. You can also instantiate a PosixPath or WindowsPath directly, - but cannot instantiate a WindowsPath on a POSIX system or vice versa. + This class provides dummy implementations for many methods that derived + classes can override selectively; the default implementations raise + UnsupportedOperation. The most basic methods, such as stat() and open(), + directly raise UnsupportedOperation; these basic methods are called by + other methods such as is_dir() and read_text(). + + The Path class derives this class to implement local filesystem paths. + Users may derive their own classes to implement virtual filesystem paths, + such as paths in archive files or on remote storage systems. """ __slots__ = () + __bytes__ = None + __fspath__ = None # virtual paths have no local file system representation + + def _unsupported(self, method_name): + msg = f"{type(self).__name__}.{method_name}() is unsupported" + if isinstance(self, Path): + msg += " on this system" + raise UnsupportedOperation(msg) def stat(self, *, follow_symlinks=True): """ Return the result of the stat() system call on this path, like os.stat() does. """ - return os.stat(self, follow_symlinks=follow_symlinks) + self._unsupported("stat") def lstat(self): """ @@ -859,7 +892,21 @@ class Path(PurePath): """ Check if this path is a mount point """ - return os.path.ismount(self) + # Need to exist and be a dir + if not self.exists() or not self.is_dir(): + return False + + try: + parent_dev = self.parent.stat().st_dev + except OSError: + return False + + dev = self.stat().st_dev + if dev != parent_dev: + return True + ino = self.stat().st_ino + parent_ino = self.parent.stat().st_ino + return ino == parent_ino def is_symlink(self): """ @@ -880,7 +927,10 @@ class Path(PurePath): """ Whether this path is a junction. """ - return os.path.isjunction(self) + # Junctions are a Windows-only feature, not present in POSIX nor the + # majority of virtual filesystems. There is no cross-platform idiom + # to check for junctions (using stat().st_mode). + return False def is_block_device(self): """ @@ -964,9 +1014,7 @@ class Path(PurePath): Open the file pointed by this path and return a file object, as the built-in open() function does. """ - if "b" not in mode: - encoding = io.text_encoding(encoding) - return io.open(self, mode, buffering, encoding, errors, newline) + self._unsupported("open") def read_bytes(self): """ @@ -1009,13 +1057,12 @@ class Path(PurePath): The children are yielded in arbitrary order, and the special entries '.' and '..' are not included. """ - return (self._make_child_relpath(name) for name in os.listdir(self)) + self._unsupported("iterdir") def _scandir(self): - # bpo-24132: a future version of pathlib will support subclassing of - # pathlib.Path to customize how the filesystem is accessed. This - # includes scandir(), which is used to implement glob(). - return os.scandir(self) + # Emulate os.scandir(), which returns an object that can be used as a + # context manager. This method is called by walk() and glob(). + return contextlib.nullcontext(self.iterdir()) def _make_child_relpath(self, name): sep = self.pathmod.sep @@ -1144,13 +1191,13 @@ class Path(PurePath): # blow up for a minor reason when (say) a thousand readable # directories are still left to visit. That logic is copied here. try: - scandir_it = path._scandir() + scandir_obj = path._scandir() except OSError as error: if on_error is not None: on_error(error) continue - with scandir_it: + with scandir_obj as scandir_it: dirnames = [] filenames = [] for entry in scandir_it: @@ -1172,17 +1219,13 @@ class Path(PurePath): paths += [path._make_child_relpath(d) for d in reversed(dirnames)] - def __init__(self, *args, **kwargs): - if kwargs: - msg = ("support for supplying keyword arguments to pathlib.PurePath " - "is deprecated and scheduled for removal in Python {remove}") - warnings._deprecated("pathlib.PurePath(**kwargs)", msg, remove=(3, 14)) - super().__init__(*args) + def absolute(self): + """Return an absolute version of this path + No normalization or symlink resolution is performed. - def __new__(cls, *args, **kwargs): - if cls is Path: - cls = WindowsPath if os.name == 'nt' else PosixPath - return object.__new__(cls) + Use resolve() to resolve symlinks and remove '..' segments. + """ + self._unsupported("absolute") @classmethod def cwd(cls): @@ -1193,18 +1236,264 @@ class Path(PurePath): # os.path.abspath('.') == os.getcwd(). return cls().absolute() + def expanduser(self): + """ Return a new path with expanded ~ and ~user constructs + (as returned by os.path.expanduser) + """ + self._unsupported("expanduser") + @classmethod def home(cls): - """Return a new path pointing to the user's home directory (as - returned by os.path.expanduser('~')). + """Return a new path pointing to expanduser('~'). """ return cls("~").expanduser() + def readlink(self): + """ + Return the path to which the symbolic link points. + """ + self._unsupported("readlink") + readlink._supported = False + + def _split_stack(self): + """ + Split the path into a 2-tuple (anchor, parts), where *anchor* is the + uppermost parent of the path (equivalent to path.parents[-1]), and + *parts* is a reversed list of parts following the anchor. + """ + return self._from_parsed_parts(self.drive, self.root, []), self._tail[::-1] + + def resolve(self, strict=False): + """ + Make the path absolute, resolving all symlinks on the way and also + normalizing it. + """ + if self._resolving: + return self + try: + path = self.absolute() + except UnsupportedOperation: + path = self + + # If the user has *not* overridden the `readlink()` method, then symlinks are unsupported + # and (in non-strict mode) we can improve performance by not calling `stat()`. + querying = strict or getattr(self.readlink, '_supported', True) + link_count = 0 + stat_cache = {} + target_cache = {} + path, parts = path._split_stack() + while parts: + part = parts.pop() + if part == '..': + if not path._tail: + if path.root: + # Delete '..' segment immediately following root + continue + elif path._tail[-1] != '..': + # Delete '..' segment and its predecessor + path = path.parent + continue + # Join the current part onto the path. + path_parent = path + path = path._make_child_relpath(part) + if querying and part != '..': + path._resolving = True + try: + st = stat_cache.get(path) + if st is None: + st = stat_cache[path] = path.stat(follow_symlinks=False) + if S_ISLNK(st.st_mode): + # Like Linux and macOS, raise OSError(errno.ELOOP) if too many symlinks are + # encountered during resolution. + link_count += 1 + if link_count >= _MAX_SYMLINKS: + raise OSError(ELOOP, "Too many symbolic links in path", str(path)) + target = target_cache.get(path) + if target is None: + target = target_cache[path] = path.readlink() + target, target_parts = target._split_stack() + # If the symlink target is absolute (like '/etc/hosts'), set the current + # path to its uppermost parent (like '/'). If not, the symlink target is + # relative to the symlink parent, which we recorded earlier. + path = target if target.root else path_parent + # Add the symlink target's reversed tail parts (like ['hosts', 'etc']) to + # the stack of unresolved path parts. + parts.extend(target_parts) + elif parts and not S_ISDIR(st.st_mode): + raise NotADirectoryError(ENOTDIR, "Not a directory", str(path)) + except OSError: + if strict: + raise + else: + querying = False + path._resolving = False + return path + + def symlink_to(self, target, target_is_directory=False): + """ + Make this path a symlink pointing to the target path. + Note the order of arguments (link, target) is the reverse of os.symlink. + """ + self._unsupported("symlink_to") + + def hardlink_to(self, target): + """ + Make this path a hard link pointing to the same file as *target*. + + Note the order of arguments (self, target) is the reverse of os.link's. + """ + self._unsupported("hardlink_to") + + def touch(self, mode=0o666, exist_ok=True): + """ + Create this file with the given access mode, if it doesn't exist. + """ + self._unsupported("touch") + + def mkdir(self, mode=0o777, parents=False, exist_ok=False): + """ + Create a new directory at this given path. + """ + self._unsupported("mkdir") + + def rename(self, target): + """ + Rename this path to the target path. + + The target path may be absolute or relative. Relative paths are + interpreted relative to the current working directory, *not* the + directory of the Path object. + + Returns the new Path instance pointing to the target path. + """ + self._unsupported("rename") + + def replace(self, target): + """ + Rename this path to the target path, overwriting if that path exists. + + The target path may be absolute or relative. Relative paths are + interpreted relative to the current working directory, *not* the + directory of the Path object. + + Returns the new Path instance pointing to the target path. + """ + self._unsupported("replace") + + def chmod(self, mode, *, follow_symlinks=True): + """ + Change the permissions of the path, like os.chmod(). + """ + self._unsupported("chmod") + + def lchmod(self, mode): + """ + Like chmod(), except if the path points to a symlink, the symlink's + permissions are changed, rather than its target's. + """ + self.chmod(mode, follow_symlinks=False) + + def unlink(self, missing_ok=False): + """ + Remove this file or link. + If the path is a directory, use rmdir() instead. + """ + self._unsupported("unlink") + + def rmdir(self): + """ + Remove this directory. The directory must be empty. + """ + self._unsupported("rmdir") + + def owner(self): + """ + Return the login name of the file owner. + """ + self._unsupported("owner") + + def group(self): + """ + Return the group name of the file gid. + """ + self._unsupported("group") + + def as_uri(self): + """Return the path as a URI.""" + self._unsupported("as_uri") + + +class Path(_PathBase): + """PurePath subclass that can make system calls. + + Path represents a filesystem path but unlike PurePath, also offers + methods to do system calls on path objects. Depending on your system, + instantiating a Path will return either a PosixPath or a WindowsPath + object. You can also instantiate a PosixPath or WindowsPath directly, + but cannot instantiate a WindowsPath on a POSIX system or vice versa. + """ + __slots__ = () + __bytes__ = PurePath.__bytes__ + __fspath__ = PurePath.__fspath__ + as_uri = PurePath.as_uri + + def __init__(self, *args, **kwargs): + if kwargs: + msg = ("support for supplying keyword arguments to pathlib.PurePath " + "is deprecated and scheduled for removal in Python {remove}") + warnings._deprecated("pathlib.PurePath(**kwargs)", msg, remove=(3, 14)) + super().__init__(*args) + + def __new__(cls, *args, **kwargs): + if cls is Path: + cls = WindowsPath if os.name == 'nt' else PosixPath + return object.__new__(cls) + + def stat(self, *, follow_symlinks=True): + """ + Return the result of the stat() system call on this path, like + os.stat() does. + """ + return os.stat(self, follow_symlinks=follow_symlinks) + + def is_mount(self): + """ + Check if this path is a mount point + """ + return os.path.ismount(self) + + def is_junction(self): + """ + Whether this path is a junction. + """ + return os.path.isjunction(self) + + def open(self, mode='r', buffering=-1, encoding=None, + errors=None, newline=None): + """ + Open the file pointed by this path and return a file object, as + the built-in open() function does. + """ + if "b" not in mode: + encoding = io.text_encoding(encoding) + return io.open(self, mode, buffering, encoding, errors, newline) + + def iterdir(self): + """Yield path objects of the directory contents. + + The children are yielded in arbitrary order, and the + special entries '.' and '..' are not included. + """ + return (self._make_child_relpath(name) for name in os.listdir(self)) + + def _scandir(self): + return os.scandir(self) + def absolute(self): - """Return an absolute version of this path by prepending the current - working directory. No normalization or symlink resolution is performed. + """Return an absolute version of this path + No normalization or symlink resolution is performed. - Use resolve() to get the canonical path to a file. + Use resolve() to resolve symlinks and remove '..' segments. """ if self.is_absolute(): return self @@ -1232,34 +1521,26 @@ class Path(PurePath): return self.with_segments(os.path.realpath(self, strict=strict)) - def owner(self): - """ - Return the login name of the file owner. - """ - try: - import pwd + if pwd: + def owner(self): + """ + Return the login name of the file owner. + """ return pwd.getpwuid(self.stat().st_uid).pw_name - except ImportError: - raise UnsupportedOperation("Path.owner() is unsupported on this system") - - def group(self): - """ - Return the group name of the file gid. - """ - try: - import grp + if grp: + def group(self): + """ + Return the group name of the file gid. + """ return grp.getgrgid(self.stat().st_gid).gr_name - except ImportError: - raise UnsupportedOperation("Path.group() is unsupported on this system") - def readlink(self): - """ - Return the path to which the symbolic link points. - """ - if not hasattr(os, "readlink"): - raise UnsupportedOperation("os.readlink() not available on this system") - return self.with_segments(os.readlink(self)) + if hasattr(os, "readlink"): + def readlink(self): + """ + Return the path to which the symbolic link points. + """ + return self.with_segments(os.readlink(self)) def touch(self, mode=0o666, exist_ok=True): """ @@ -1306,13 +1587,6 @@ class Path(PurePath): """ os.chmod(self, mode, follow_symlinks=follow_symlinks) - def lchmod(self, mode): - """ - Like chmod(), except if the path points to a symlink, the symlink's - permissions are changed, rather than its target's. - """ - self.chmod(mode, follow_symlinks=False) - def unlink(self, missing_ok=False): """ Remove this file or link. @@ -1356,24 +1630,22 @@ class Path(PurePath): os.replace(self, target) return self.with_segments(target) - def symlink_to(self, target, target_is_directory=False): - """ - Make this path a symlink pointing to the target path. - Note the order of arguments (link, target) is the reverse of os.symlink. - """ - if not hasattr(os, "symlink"): - raise UnsupportedOperation("os.symlink() not available on this system") - os.symlink(target, self, target_is_directory) - - def hardlink_to(self, target): - """ - Make this path a hard link pointing to the same file as *target*. - - Note the order of arguments (self, target) is the reverse of os.link's. - """ - if not hasattr(os, "link"): - raise UnsupportedOperation("os.link() not available on this system") - os.link(target, self) + if hasattr(os, "symlink"): + def symlink_to(self, target, target_is_directory=False): + """ + Make this path a symlink pointing to the target path. + Note the order of arguments (link, target) is the reverse of os.symlink. + """ + os.symlink(target, self, target_is_directory) + + if hasattr(os, "link"): + def hardlink_to(self, target): + """ + Make this path a hard link pointing to the same file as *target*. + + Note the order of arguments (self, target) is the reverse of os.link's. + """ + os.link(target, self) def expanduser(self): """ Return a new path with expanded ~ and ~user constructs |