aboutsummaryrefslogtreecommitdiffstatshomepage
diff options
context:
space:
mode:
-rw-r--r--Lib/pathlib.py446
-rw-r--r--Lib/test/test_pathlib.py400
-rw-r--r--Misc/NEWS.d/next/Library/2023-07-03-20-23-56.gh-issue-89812.cFkDOE.rst2
3 files changed, 687 insertions, 161 deletions
diff --git a/Lib/pathlib.py b/Lib/pathlib.py
index bd5f61b0b7c..e6be9061013 100644
--- a/Lib/pathlib.py
+++ b/Lib/pathlib.py
@@ -5,6 +5,7 @@ paths with operations that have semantics appropriate for different
operating systems.
"""
+import contextlib
import fnmatch
import functools
import io
@@ -15,10 +16,19 @@ import re
import sys
import warnings
from _collections_abc import Sequence
-from errno import ENOENT, ENOTDIR, EBADF, ELOOP
+from errno import ENOENT, ENOTDIR, EBADF, ELOOP, EINVAL
from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO
from urllib.parse import quote_from_bytes as urlquote_from_bytes
+try:
+ import pwd
+except ImportError:
+ pwd = None
+try:
+ import grp
+except ImportError:
+ grp = None
+
__all__ = [
"UnsupportedOperation",
@@ -30,6 +40,9 @@ __all__ = [
# Internals
#
+# Maximum number of symlinks to follow in _PathBase.resolve()
+_MAX_SYMLINKS = 40
+
# Reference for Windows paths can be found at
# https://learn.microsoft.com/en-gb/windows/win32/fileio/naming-a-file .
_WIN_RESERVED_NAMES = frozenset(
@@ -292,6 +305,11 @@ class PurePath:
# The `_hash` slot stores the hash of the case-normalized string
# path. It's set when `__hash__()` is called for the first time.
'_hash',
+
+ # The '_resolving' slot stores a boolean indicating whether the path
+ # is being processed by `_PathBase.resolve()`. This prevents duplicate
+ # work from occurring when `resolve()` calls `stat()` or `readlink()`.
+ '_resolving',
)
pathmod = os.path
@@ -331,6 +349,7 @@ class PurePath:
f"not {type(path).__name__!r}")
paths.append(path)
self._raw_paths = paths
+ self._resolving = False
def with_segments(self, *pathsegments):
"""Construct a new path object from any number of path-like objects.
@@ -416,7 +435,7 @@ class PurePath:
return "{}({!r})".format(self.__class__.__name__, self.as_posix())
def as_uri(self):
- """Return the path as a 'file' URI."""
+ """Return the path as a URI."""
if not self.is_absolute():
raise ValueError("relative path can't be expressed as a file URI")
@@ -691,7 +710,9 @@ class PurePath:
tail = self._tail
if not tail:
return self
- return self._from_parsed_parts(drv, root, tail[:-1])
+ path = self._from_parsed_parts(drv, root, tail[:-1])
+ path._resolving = self._resolving
+ return path
@property
def parents(self):
@@ -776,23 +797,35 @@ class PureWindowsPath(PurePath):
# Filesystem-accessing classes
-class Path(PurePath):
- """PurePath subclass that can make system calls.
+class _PathBase(PurePath):
+ """Base class for concrete path objects.
- Path represents a filesystem path but unlike PurePath, also offers
- methods to do system calls on path objects. Depending on your system,
- instantiating a Path will return either a PosixPath or a WindowsPath
- object. You can also instantiate a PosixPath or WindowsPath directly,
- but cannot instantiate a WindowsPath on a POSIX system or vice versa.
+ This class provides dummy implementations for many methods that derived
+ classes can override selectively; the default implementations raise
+ UnsupportedOperation. The most basic methods, such as stat() and open(),
+ directly raise UnsupportedOperation; these basic methods are called by
+ other methods such as is_dir() and read_text().
+
+ The Path class derives this class to implement local filesystem paths.
+ Users may derive their own classes to implement virtual filesystem paths,
+ such as paths in archive files or on remote storage systems.
"""
__slots__ = ()
+ __bytes__ = None
+ __fspath__ = None # virtual paths have no local file system representation
+
+ def _unsupported(self, method_name):
+ msg = f"{type(self).__name__}.{method_name}() is unsupported"
+ if isinstance(self, Path):
+ msg += " on this system"
+ raise UnsupportedOperation(msg)
def stat(self, *, follow_symlinks=True):
"""
Return the result of the stat() system call on this path, like
os.stat() does.
"""
- return os.stat(self, follow_symlinks=follow_symlinks)
+ self._unsupported("stat")
def lstat(self):
"""
@@ -859,7 +892,21 @@ class Path(PurePath):
"""
Check if this path is a mount point
"""
- return os.path.ismount(self)
+ # Need to exist and be a dir
+ if not self.exists() or not self.is_dir():
+ return False
+
+ try:
+ parent_dev = self.parent.stat().st_dev
+ except OSError:
+ return False
+
+ dev = self.stat().st_dev
+ if dev != parent_dev:
+ return True
+ ino = self.stat().st_ino
+ parent_ino = self.parent.stat().st_ino
+ return ino == parent_ino
def is_symlink(self):
"""
@@ -880,7 +927,10 @@ class Path(PurePath):
"""
Whether this path is a junction.
"""
- return os.path.isjunction(self)
+ # Junctions are a Windows-only feature, not present in POSIX nor the
+ # majority of virtual filesystems. There is no cross-platform idiom
+ # to check for junctions (using stat().st_mode).
+ return False
def is_block_device(self):
"""
@@ -964,9 +1014,7 @@ class Path(PurePath):
Open the file pointed by this path and return a file object, as
the built-in open() function does.
"""
- if "b" not in mode:
- encoding = io.text_encoding(encoding)
- return io.open(self, mode, buffering, encoding, errors, newline)
+ self._unsupported("open")
def read_bytes(self):
"""
@@ -1009,13 +1057,12 @@ class Path(PurePath):
The children are yielded in arbitrary order, and the
special entries '.' and '..' are not included.
"""
- return (self._make_child_relpath(name) for name in os.listdir(self))
+ self._unsupported("iterdir")
def _scandir(self):
- # bpo-24132: a future version of pathlib will support subclassing of
- # pathlib.Path to customize how the filesystem is accessed. This
- # includes scandir(), which is used to implement glob().
- return os.scandir(self)
+ # Emulate os.scandir(), which returns an object that can be used as a
+ # context manager. This method is called by walk() and glob().
+ return contextlib.nullcontext(self.iterdir())
def _make_child_relpath(self, name):
sep = self.pathmod.sep
@@ -1144,13 +1191,13 @@ class Path(PurePath):
# blow up for a minor reason when (say) a thousand readable
# directories are still left to visit. That logic is copied here.
try:
- scandir_it = path._scandir()
+ scandir_obj = path._scandir()
except OSError as error:
if on_error is not None:
on_error(error)
continue
- with scandir_it:
+ with scandir_obj as scandir_it:
dirnames = []
filenames = []
for entry in scandir_it:
@@ -1172,17 +1219,13 @@ class Path(PurePath):
paths += [path._make_child_relpath(d) for d in reversed(dirnames)]
- def __init__(self, *args, **kwargs):
- if kwargs:
- msg = ("support for supplying keyword arguments to pathlib.PurePath "
- "is deprecated and scheduled for removal in Python {remove}")
- warnings._deprecated("pathlib.PurePath(**kwargs)", msg, remove=(3, 14))
- super().__init__(*args)
+ def absolute(self):
+ """Return an absolute version of this path
+ No normalization or symlink resolution is performed.
- def __new__(cls, *args, **kwargs):
- if cls is Path:
- cls = WindowsPath if os.name == 'nt' else PosixPath
- return object.__new__(cls)
+ Use resolve() to resolve symlinks and remove '..' segments.
+ """
+ self._unsupported("absolute")
@classmethod
def cwd(cls):
@@ -1193,18 +1236,264 @@ class Path(PurePath):
# os.path.abspath('.') == os.getcwd().
return cls().absolute()
+ def expanduser(self):
+ """ Return a new path with expanded ~ and ~user constructs
+ (as returned by os.path.expanduser)
+ """
+ self._unsupported("expanduser")
+
@classmethod
def home(cls):
- """Return a new path pointing to the user's home directory (as
- returned by os.path.expanduser('~')).
+ """Return a new path pointing to expanduser('~').
"""
return cls("~").expanduser()
+ def readlink(self):
+ """
+ Return the path to which the symbolic link points.
+ """
+ self._unsupported("readlink")
+ readlink._supported = False
+
+ def _split_stack(self):
+ """
+ Split the path into a 2-tuple (anchor, parts), where *anchor* is the
+ uppermost parent of the path (equivalent to path.parents[-1]), and
+ *parts* is a reversed list of parts following the anchor.
+ """
+ return self._from_parsed_parts(self.drive, self.root, []), self._tail[::-1]
+
+ def resolve(self, strict=False):
+ """
+ Make the path absolute, resolving all symlinks on the way and also
+ normalizing it.
+ """
+ if self._resolving:
+ return self
+ try:
+ path = self.absolute()
+ except UnsupportedOperation:
+ path = self
+
+ # If the user has *not* overridden the `readlink()` method, then symlinks are unsupported
+ # and (in non-strict mode) we can improve performance by not calling `stat()`.
+ querying = strict or getattr(self.readlink, '_supported', True)
+ link_count = 0
+ stat_cache = {}
+ target_cache = {}
+ path, parts = path._split_stack()
+ while parts:
+ part = parts.pop()
+ if part == '..':
+ if not path._tail:
+ if path.root:
+ # Delete '..' segment immediately following root
+ continue
+ elif path._tail[-1] != '..':
+ # Delete '..' segment and its predecessor
+ path = path.parent
+ continue
+ # Join the current part onto the path.
+ path_parent = path
+ path = path._make_child_relpath(part)
+ if querying and part != '..':
+ path._resolving = True
+ try:
+ st = stat_cache.get(path)
+ if st is None:
+ st = stat_cache[path] = path.stat(follow_symlinks=False)
+ if S_ISLNK(st.st_mode):
+ # Like Linux and macOS, raise OSError(errno.ELOOP) if too many symlinks are
+ # encountered during resolution.
+ link_count += 1
+ if link_count >= _MAX_SYMLINKS:
+ raise OSError(ELOOP, "Too many symbolic links in path", str(path))
+ target = target_cache.get(path)
+ if target is None:
+ target = target_cache[path] = path.readlink()
+ target, target_parts = target._split_stack()
+ # If the symlink target is absolute (like '/etc/hosts'), set the current
+ # path to its uppermost parent (like '/'). If not, the symlink target is
+ # relative to the symlink parent, which we recorded earlier.
+ path = target if target.root else path_parent
+ # Add the symlink target's reversed tail parts (like ['hosts', 'etc']) to
+ # the stack of unresolved path parts.
+ parts.extend(target_parts)
+ elif parts and not S_ISDIR(st.st_mode):
+ raise NotADirectoryError(ENOTDIR, "Not a directory", str(path))
+ except OSError:
+ if strict:
+ raise
+ else:
+ querying = False
+ path._resolving = False
+ return path
+
+ def symlink_to(self, target, target_is_directory=False):
+ """
+ Make this path a symlink pointing to the target path.
+ Note the order of arguments (link, target) is the reverse of os.symlink.
+ """
+ self._unsupported("symlink_to")
+
+ def hardlink_to(self, target):
+ """
+ Make this path a hard link pointing to the same file as *target*.
+
+ Note the order of arguments (self, target) is the reverse of os.link's.
+ """
+ self._unsupported("hardlink_to")
+
+ def touch(self, mode=0o666, exist_ok=True):
+ """
+ Create this file with the given access mode, if it doesn't exist.
+ """
+ self._unsupported("touch")
+
+ def mkdir(self, mode=0o777, parents=False, exist_ok=False):
+ """
+ Create a new directory at this given path.
+ """
+ self._unsupported("mkdir")
+
+ def rename(self, target):
+ """
+ Rename this path to the target path.
+
+ The target path may be absolute or relative. Relative paths are
+ interpreted relative to the current working directory, *not* the
+ directory of the Path object.
+
+ Returns the new Path instance pointing to the target path.
+ """
+ self._unsupported("rename")
+
+ def replace(self, target):
+ """
+ Rename this path to the target path, overwriting if that path exists.
+
+ The target path may be absolute or relative. Relative paths are
+ interpreted relative to the current working directory, *not* the
+ directory of the Path object.
+
+ Returns the new Path instance pointing to the target path.
+ """
+ self._unsupported("replace")
+
+ def chmod(self, mode, *, follow_symlinks=True):
+ """
+ Change the permissions of the path, like os.chmod().
+ """
+ self._unsupported("chmod")
+
+ def lchmod(self, mode):
+ """
+ Like chmod(), except if the path points to a symlink, the symlink's
+ permissions are changed, rather than its target's.
+ """
+ self.chmod(mode, follow_symlinks=False)
+
+ def unlink(self, missing_ok=False):
+ """
+ Remove this file or link.
+ If the path is a directory, use rmdir() instead.
+ """
+ self._unsupported("unlink")
+
+ def rmdir(self):
+ """
+ Remove this directory. The directory must be empty.
+ """
+ self._unsupported("rmdir")
+
+ def owner(self):
+ """
+ Return the login name of the file owner.
+ """
+ self._unsupported("owner")
+
+ def group(self):
+ """
+ Return the group name of the file gid.
+ """
+ self._unsupported("group")
+
+ def as_uri(self):
+ """Return the path as a URI."""
+ self._unsupported("as_uri")
+
+
+class Path(_PathBase):
+ """PurePath subclass that can make system calls.
+
+ Path represents a filesystem path but unlike PurePath, also offers
+ methods to do system calls on path objects. Depending on your system,
+ instantiating a Path will return either a PosixPath or a WindowsPath
+ object. You can also instantiate a PosixPath or WindowsPath directly,
+ but cannot instantiate a WindowsPath on a POSIX system or vice versa.
+ """
+ __slots__ = ()
+ __bytes__ = PurePath.__bytes__
+ __fspath__ = PurePath.__fspath__
+ as_uri = PurePath.as_uri
+
+ def __init__(self, *args, **kwargs):
+ if kwargs:
+ msg = ("support for supplying keyword arguments to pathlib.PurePath "
+ "is deprecated and scheduled for removal in Python {remove}")
+ warnings._deprecated("pathlib.PurePath(**kwargs)", msg, remove=(3, 14))
+ super().__init__(*args)
+
+ def __new__(cls, *args, **kwargs):
+ if cls is Path:
+ cls = WindowsPath if os.name == 'nt' else PosixPath
+ return object.__new__(cls)
+
+ def stat(self, *, follow_symlinks=True):
+ """
+ Return the result of the stat() system call on this path, like
+ os.stat() does.
+ """
+ return os.stat(self, follow_symlinks=follow_symlinks)
+
+ def is_mount(self):
+ """
+ Check if this path is a mount point
+ """
+ return os.path.ismount(self)
+
+ def is_junction(self):
+ """
+ Whether this path is a junction.
+ """
+ return os.path.isjunction(self)
+
+ def open(self, mode='r', buffering=-1, encoding=None,
+ errors=None, newline=None):
+ """
+ Open the file pointed by this path and return a file object, as
+ the built-in open() function does.
+ """
+ if "b" not in mode:
+ encoding = io.text_encoding(encoding)
+ return io.open(self, mode, buffering, encoding, errors, newline)
+
+ def iterdir(self):
+ """Yield path objects of the directory contents.
+
+ The children are yielded in arbitrary order, and the
+ special entries '.' and '..' are not included.
+ """
+ return (self._make_child_relpath(name) for name in os.listdir(self))
+
+ def _scandir(self):
+ return os.scandir(self)
+
def absolute(self):
- """Return an absolute version of this path by prepending the current
- working directory. No normalization or symlink resolution is performed.
+ """Return an absolute version of this path
+ No normalization or symlink resolution is performed.
- Use resolve() to get the canonical path to a file.
+ Use resolve() to resolve symlinks and remove '..' segments.
"""
if self.is_absolute():
return self
@@ -1232,34 +1521,26 @@ class Path(PurePath):
return self.with_segments(os.path.realpath(self, strict=strict))
- def owner(self):
- """
- Return the login name of the file owner.
- """
- try:
- import pwd
+ if pwd:
+ def owner(self):
+ """
+ Return the login name of the file owner.
+ """
return pwd.getpwuid(self.stat().st_uid).pw_name
- except ImportError:
- raise UnsupportedOperation("Path.owner() is unsupported on this system")
-
- def group(self):
- """
- Return the group name of the file gid.
- """
- try:
- import grp
+ if grp:
+ def group(self):
+ """
+ Return the group name of the file gid.
+ """
return grp.getgrgid(self.stat().st_gid).gr_name
- except ImportError:
- raise UnsupportedOperation("Path.group() is unsupported on this system")
- def readlink(self):
- """
- Return the path to which the symbolic link points.
- """
- if not hasattr(os, "readlink"):
- raise UnsupportedOperation("os.readlink() not available on this system")
- return self.with_segments(os.readlink(self))
+ if hasattr(os, "readlink"):
+ def readlink(self):
+ """
+ Return the path to which the symbolic link points.
+ """
+ return self.with_segments(os.readlink(self))
def touch(self, mode=0o666, exist_ok=True):
"""
@@ -1306,13 +1587,6 @@ class Path(PurePath):
"""
os.chmod(self, mode, follow_symlinks=follow_symlinks)
- def lchmod(self, mode):
- """
- Like chmod(), except if the path points to a symlink, the symlink's
- permissions are changed, rather than its target's.
- """
- self.chmod(mode, follow_symlinks=False)
-
def unlink(self, missing_ok=False):
"""
Remove this file or link.
@@ -1356,24 +1630,22 @@ class Path(PurePath):
os.replace(self, target)
return self.with_segments(target)
- def symlink_to(self, target, target_is_directory=False):
- """
- Make this path a symlink pointing to the target path.
- Note the order of arguments (link, target) is the reverse of os.symlink.
- """
- if not hasattr(os, "symlink"):
- raise UnsupportedOperation("os.symlink() not available on this system")
- os.symlink(target, self, target_is_directory)
-
- def hardlink_to(self, target):
- """
- Make this path a hard link pointing to the same file as *target*.
-
- Note the order of arguments (self, target) is the reverse of os.link's.
- """
- if not hasattr(os, "link"):
- raise UnsupportedOperation("os.link() not available on this system")
- os.link(target, self)
+ if hasattr(os, "symlink"):
+ def symlink_to(self, target, target_is_directory=False):
+ """
+ Make this path a symlink pointing to the target path.
+ Note the order of arguments (link, target) is the reverse of os.symlink.
+ """
+ os.symlink(target, self, target_is_directory)
+
+ if hasattr(os, "link"):
+ def hardlink_to(self, target):
+ """
+ Make this path a hard link pointing to the same file as *target*.
+
+ Note the order of arguments (self, target) is the reverse of os.link's.
+ """
+ os.link(target, self)
def expanduser(self):
""" Return a new path with expanded ~ and ~user constructs
diff --git a/Lib/test/test_pathlib.py b/Lib/test/test_pathlib.py
index 484a5e6c3bd..319148e9065 100644
--- a/Lib/test/test_pathlib.py
+++ b/Lib/test/test_pathlib.py
@@ -1582,14 +1582,172 @@ class WindowsPathAsPureTest(PureWindowsPathTest):
#
-# Tests for the concrete classes.
+# Tests for the virtual classes.
#
-class PathTest(unittest.TestCase):
- """Tests for the FS-accessing functionalities of the Path classes."""
+class PathBaseTest(PurePathTest):
+ cls = pathlib._PathBase
- cls = pathlib.Path
- can_symlink = os_helper.can_symlink()
+ def test_unsupported_operation(self):
+ P = self.cls
+ p = self.cls()
+ e = pathlib.UnsupportedOperation
+ self.assertRaises(e, p.stat)
+ self.assertRaises(e, p.lstat)
+ self.assertRaises(e, p.exists)
+ self.assertRaises(e, p.samefile, 'foo')
+ self.assertRaises(e, p.is_dir)
+ self.assertRaises(e, p.is_file)
+ self.assertRaises(e, p.is_mount)
+ self.assertRaises(e, p.is_symlink)
+ self.assertRaises(e, p.is_block_device)
+ self.assertRaises(e, p.is_char_device)
+ self.assertRaises(e, p.is_fifo)
+ self.assertRaises(e, p.is_socket)
+ self.assertRaises(e, p.open)
+ self.assertRaises(e, p.read_bytes)
+ self.assertRaises(e, p.read_text)
+ self.assertRaises(e, p.write_bytes, b'foo')
+ self.assertRaises(e, p.write_text, 'foo')
+ self.assertRaises(e, p.iterdir)
+ self.assertRaises(e, p.glob, '*')
+ self.assertRaises(e, p.rglob, '*')
+ self.assertRaises(e, lambda: list(p.walk()))
+ self.assertRaises(e, p.absolute)
+ self.assertRaises(e, P.cwd)
+ self.assertRaises(e, p.expanduser)
+ self.assertRaises(e, p.home)
+ self.assertRaises(e, p.readlink)
+ self.assertRaises(e, p.symlink_to, 'foo')
+ self.assertRaises(e, p.hardlink_to, 'foo')
+ self.assertRaises(e, p.mkdir)
+ self.assertRaises(e, p.touch)
+ self.assertRaises(e, p.rename, 'foo')
+ self.assertRaises(e, p.replace, 'foo')
+ self.assertRaises(e, p.chmod, 0o755)
+ self.assertRaises(e, p.lchmod, 0o755)
+ self.assertRaises(e, p.unlink)
+ self.assertRaises(e, p.rmdir)
+ self.assertRaises(e, p.owner)
+ self.assertRaises(e, p.group)
+ self.assertRaises(e, p.as_uri)
+
+ def test_as_uri_common(self):
+ e = pathlib.UnsupportedOperation
+ self.assertRaises(e, self.cls().as_uri)
+
+ def test_fspath_common(self):
+ self.assertRaises(TypeError, os.fspath, self.cls())
+
+ def test_as_bytes_common(self):
+ self.assertRaises(TypeError, bytes, self.cls())
+
+ def test_matches_path_api(self):
+ our_names = {name for name in dir(self.cls) if name[0] != '_'}
+ path_names = {name for name in dir(pathlib.Path) if name[0] != '_'}
+ self.assertEqual(our_names, path_names)
+ for attr_name in our_names:
+ our_attr = getattr(self.cls, attr_name)
+ path_attr = getattr(pathlib.Path, attr_name)
+ self.assertEqual(our_attr.__doc__, path_attr.__doc__)
+
+
+class DummyPathIO(io.BytesIO):
+ """
+ Used by DummyPath to implement `open('w')`
+ """
+
+ def __init__(self, files, path):
+ super().__init__()
+ self.files = files
+ self.path = path
+
+ def close(self):
+ self.files[self.path] = self.getvalue()
+ super().close()
+
+
+class DummyPath(pathlib._PathBase):
+ """
+ Simple implementation of PathBase that keeps files and directories in
+ memory.
+ """
+ _files = {}
+ _directories = {}
+ _symlinks = {}
+
+ def stat(self, *, follow_symlinks=True):
+ if follow_symlinks:
+ path = str(self.resolve())
+ else:
+ path = str(self.parent.resolve() / self.name)
+ if path in self._files:
+ st_mode = stat.S_IFREG
+ elif path in self._directories:
+ st_mode = stat.S_IFDIR
+ elif path in self._symlinks:
+ st_mode = stat.S_IFLNK
+ else:
+ raise FileNotFoundError(errno.ENOENT, "Not found", str(self))
+ return os.stat_result((st_mode, hash(str(self)), 0, 0, 0, 0, 0, 0, 0, 0))
+
+ def open(self, mode='r', buffering=-1, encoding=None,
+ errors=None, newline=None):
+ if buffering != -1:
+ raise NotImplementedError
+ path_obj = self.resolve()
+ path = str(path_obj)
+ name = path_obj.name
+ parent = str(path_obj.parent)
+ if path in self._directories:
+ raise IsADirectoryError(errno.EISDIR, "Is a directory", path)
+
+ text = 'b' not in mode
+ mode = ''.join(c for c in mode if c not in 'btU')
+ if mode == 'r':
+ if path not in self._files:
+ raise FileNotFoundError(errno.ENOENT, "File not found", path)
+ stream = io.BytesIO(self._files[path])
+ elif mode == 'w':
+ if parent not in self._directories:
+ raise FileNotFoundError(errno.ENOENT, "File not found", parent)
+ stream = DummyPathIO(self._files, path)
+ self._files[path] = b''
+ self._directories[parent].add(name)
+ else:
+ raise NotImplementedError
+ if text:
+ stream = io.TextIOWrapper(stream, encoding=encoding, errors=errors, newline=newline)
+ return stream
+
+ def iterdir(self):
+ path = str(self.resolve())
+ if path in self._files:
+ raise NotADirectoryError(errno.ENOTDIR, "Not a directory", path)
+ elif path in self._directories:
+ return (self / name for name in self._directories[path])
+ else:
+ raise FileNotFoundError(errno.ENOENT, "File not found", path)
+
+ def mkdir(self, mode=0o777, parents=False, exist_ok=False):
+ try:
+ self._directories[str(self.parent)].add(self.name)
+ self._directories[str(self)] = set()
+ except KeyError:
+ if not parents or self.parent == self:
+ raise FileNotFoundError(errno.ENOENT, "File not found", str(self.parent)) from None
+ self.parent.mkdir(parents=True, exist_ok=True)
+ self.mkdir(mode, parents=False, exist_ok=exist_ok)
+ except FileExistsError:
+ if not exist_ok:
+ raise
+
+
+class DummyPathTest(unittest.TestCase):
+ """Tests for PathBase methods that use stat(), open() and iterdir()."""
+
+ cls = DummyPath
+ can_symlink = False
# (BASE)
# |
@@ -1612,37 +1770,38 @@ class PathTest(unittest.TestCase):
#
def setUp(self):
- def cleanup():
- os.chmod(join('dirE'), 0o777)
- os_helper.rmtree(BASE)
- self.addCleanup(cleanup)
- os.mkdir(BASE)
- os.mkdir(join('dirA'))
- os.mkdir(join('dirB'))
- os.mkdir(join('dirC'))
- os.mkdir(join('dirC', 'dirD'))
- os.mkdir(join('dirE'))
- with open(join('fileA'), 'wb') as f:
- f.write(b"this is file A\n")
- with open(join('dirB', 'fileB'), 'wb') as f:
- f.write(b"this is file B\n")
- with open(join('dirC', 'fileC'), 'wb') as f:
- f.write(b"this is file C\n")
- with open(join('dirC', 'novel.txt'), 'wb') as f:
- f.write(b"this is a novel\n")
- with open(join('dirC', 'dirD', 'fileD'), 'wb') as f:
- f.write(b"this is file D\n")
- os.chmod(join('dirE'), 0)
- if self.can_symlink:
- # Relative symlinks.
- os.symlink('fileA', join('linkA'))
- os.symlink('non-existing', join('brokenLink'))
- os.symlink('dirB', join('linkB'), target_is_directory=True)
- os.symlink(os.path.join('..', 'dirB'), join('dirA', 'linkC'), target_is_directory=True)
- # This one goes upwards, creating a loop.
- os.symlink(os.path.join('..', 'dirB'), join('dirB', 'linkD'), target_is_directory=True)
- # Broken symlink (pointing to itself).
- os.symlink('brokenLinkLoop', join('brokenLinkLoop'))
+ # note: this must be kept in sync with `PathTest.setUp()`
+ cls = self.cls
+ cls._files.clear()
+ cls._directories.clear()
+ cls._symlinks.clear()
+ join = cls.pathmod.join
+ cls._files.update({
+ join(BASE, 'fileA'): b'this is file A\n',
+ join(BASE, 'dirB', 'fileB'): b'this is file B\n',
+ join(BASE, 'dirC', 'fileC'): b'this is file C\n',
+ join(BASE, 'dirC', 'dirD', 'fileD'): b'this is file D\n',
+ join(BASE, 'dirC', 'novel.txt'): b'this is a novel\n',
+ })
+ cls._directories.update({
+ BASE: {'dirA', 'dirB', 'dirC', 'dirE', 'fileA'},
+ join(BASE, 'dirA'): set(),
+ join(BASE, 'dirB'): {'fileB'},
+ join(BASE, 'dirC'): {'dirD', 'fileC', 'novel.txt'},
+ join(BASE, 'dirC', 'dirD'): {'fileD'},
+ join(BASE, 'dirE'): {},
+ })
+ dirname = BASE
+ while True:
+ dirname, basename = cls.pathmod.split(dirname)
+ if not basename:
+ break
+ cls._directories[dirname] = {basename}
+
+ def tempdir(self):
+ path = self.cls(BASE).with_name('tmp-dirD')
+ path.mkdir()
+ return path
def assertFileNotFound(self, func, *args, **kwargs):
with self.assertRaises(FileNotFoundError) as cm:
@@ -1991,9 +2150,11 @@ class PathTest(unittest.TestCase):
def test_glob_many_open_files(self):
depth = 30
P = self.cls
- base = P(BASE) / 'deep'
- p = P(base, *(['d']*depth))
- p.mkdir(parents=True)
+ p = base = P(BASE) / 'deep'
+ p.mkdir()
+ for _ in range(depth):
+ p /= 'd'
+ p.mkdir()
pattern = '/'.join(['*'] * depth)
iters = [base.glob(pattern) for j in range(100)]
for it in iters:
@@ -2080,6 +2241,7 @@ class PathTest(unittest.TestCase):
self.assertEqual((P / 'brokenLink').readlink(),
self.cls('non-existing'))
self.assertEqual((P / 'linkB').readlink(), self.cls('dirB'))
+ self.assertEqual((P / 'linkB' / 'linkD').readlink(), self.cls('../dirB'))
with self.assertRaises(OSError):
(P / 'fileA').readlink()
@@ -2128,7 +2290,7 @@ class PathTest(unittest.TestCase):
self._check_resolve_relative(p, P(BASE, 'dirB', 'fileB', 'foo', 'in',
'spam'), False)
p = P(BASE, 'dirA', 'linkC', '..', 'foo', 'in', 'spam')
- if os.name == 'nt':
+ if os.name == 'nt' and isinstance(p, pathlib.Path):
# In Windows, if linkY points to dirB, 'dirA\linkY\..'
# resolves to 'dirA' without resolving linkY first.
self._check_resolve_relative(p, P(BASE, 'dirA', 'foo', 'in',
@@ -2138,9 +2300,7 @@ class PathTest(unittest.TestCase):
# resolves to 'dirB/..' first before resolving to parent of dirB.
self._check_resolve_relative(p, P(BASE, 'foo', 'in', 'spam'), False)
# Now create absolute symlinks.
- d = os_helper._longpath(tempfile.mkdtemp(suffix='-dirD',
- dir=os.getcwd()))
- self.addCleanup(os_helper.rmtree, d)
+ d = self.tempdir()
P(BASE, 'dirA', 'linkX').symlink_to(d)
P(BASE, str(d), 'linkY').symlink_to(join('dirB'))
p = P(BASE, 'dirA', 'linkX', 'linkY', 'fileB')
@@ -2150,7 +2310,7 @@ class PathTest(unittest.TestCase):
self._check_resolve_relative(p, P(BASE, 'dirB', 'foo', 'in', 'spam'),
False)
p = P(BASE, 'dirA', 'linkX', 'linkY', '..', 'foo', 'in', 'spam')
- if os.name == 'nt':
+ if os.name == 'nt' and isinstance(p, pathlib.Path):
# In Windows, if linkY points to dirB, 'dirA\linkY\..'
# resolves to 'dirA' without resolving linkY first.
self._check_resolve_relative(p, P(d, 'foo', 'in', 'spam'), False)
@@ -2174,6 +2334,38 @@ class PathTest(unittest.TestCase):
# Non-strict
self.assertEqual(r.resolve(strict=False), p / '3' / '4')
+ def _check_symlink_loop(self, *args):
+ path = self.cls(*args)
+ with self.assertRaises(OSError) as cm:
+ path.resolve(strict=True)
+ self.assertEqual(cm.exception.errno, errno.ELOOP)
+
+ def test_resolve_loop(self):
+ if not self.can_symlink:
+ self.skipTest("symlinks required")
+ if os.name == 'nt' and issubclass(self.cls, pathlib.Path):
+ self.skipTest("symlink loops work differently with concrete Windows paths")
+ # Loops with relative symlinks.
+ self.cls(BASE, 'linkX').symlink_to('linkX/inside')
+ self._check_symlink_loop(BASE, 'linkX')
+ self.cls(BASE, 'linkY').symlink_to('linkY')
+ self._check_symlink_loop(BASE, 'linkY')
+ self.cls(BASE, 'linkZ').symlink_to('linkZ/../linkZ')
+ self._check_symlink_loop(BASE, 'linkZ')
+ # Non-strict
+ p = self.cls(BASE, 'linkZ', 'foo')
+ self.assertEqual(p.resolve(strict=False), p)
+ # Loops with absolute symlinks.
+ self.cls(BASE, 'linkU').symlink_to(join('linkU/inside'))
+ self._check_symlink_loop(BASE, 'linkU')
+ self.cls(BASE, 'linkV').symlink_to(join('linkV'))
+ self._check_symlink_loop(BASE, 'linkV')
+ self.cls(BASE, 'linkW').symlink_to(join('linkW/../linkW'))
+ self._check_symlink_loop(BASE, 'linkW')
+ # Non-strict
+ q = self.cls(BASE, 'linkW', 'foo')
+ self.assertEqual(q.resolve(strict=False), q)
+
def test_stat(self):
statA = self.cls(BASE).joinpath('fileA').stat()
statB = self.cls(BASE).joinpath('dirB', 'fileB').stat()
@@ -2382,6 +2574,10 @@ class PathTest(unittest.TestCase):
self.assertEqualNormCase(str(p), BASE)
# Resolve relative paths.
+ try:
+ self.cls().absolute()
+ except pathlib.UnsupportedOperation:
+ return
old_path = os.getcwd()
os.chdir(BASE)
try:
@@ -2409,6 +2605,92 @@ class PathTest(unittest.TestCase):
def test_complex_symlinks_relative_dot_dot(self):
self._check_complex_symlinks(os.path.join('dirA', '..'))
+
+class DummyPathWithSymlinks(DummyPath):
+ def readlink(self):
+ path = str(self.parent.resolve() / self.name)
+ if path in self._symlinks:
+ return self.with_segments(self._symlinks[path])
+ elif path in self._files or path in self._directories:
+ raise OSError(errno.EINVAL, "Not a symlink", path)
+ else:
+ raise FileNotFoundError(errno.ENOENT, "File not found", path)
+
+ def symlink_to(self, target, target_is_directory=False):
+ self._directories[str(self.parent)].add(self.name)
+ self._symlinks[str(self)] = str(target)
+
+
+class DummyPathWithSymlinksTest(DummyPathTest):
+ cls = DummyPathWithSymlinks
+ can_symlink = True
+
+ def setUp(self):
+ super().setUp()
+ cls = self.cls
+ join = cls.pathmod.join
+ cls._symlinks.update({
+ join(BASE, 'linkA'): 'fileA',
+ join(BASE, 'linkB'): 'dirB',
+ join(BASE, 'dirA', 'linkC'): join('..', 'dirB'),
+ join(BASE, 'dirB', 'linkD'): join('..', 'dirB'),
+ join(BASE, 'brokenLink'): 'non-existing',
+ join(BASE, 'brokenLinkLoop'): 'brokenLinkLoop',
+ })
+ cls._directories[BASE].update({'linkA', 'linkB', 'brokenLink', 'brokenLinkLoop'})
+ cls._directories[join(BASE, 'dirA')].add('linkC')
+ cls._directories[join(BASE, 'dirB')].add('linkD')
+
+
+#
+# Tests for the concrete classes.
+#
+
+class PathTest(DummyPathTest):
+ """Tests for the FS-accessing functionalities of the Path classes."""
+ cls = pathlib.Path
+ can_symlink = os_helper.can_symlink()
+
+ def setUp(self):
+ # note: this must be kept in sync with `DummyPathTest.setUp()`
+ def cleanup():
+ os.chmod(join('dirE'), 0o777)
+ os_helper.rmtree(BASE)
+ self.addCleanup(cleanup)
+ os.mkdir(BASE)
+ os.mkdir(join('dirA'))
+ os.mkdir(join('dirB'))
+ os.mkdir(join('dirC'))
+ os.mkdir(join('dirC', 'dirD'))
+ os.mkdir(join('dirE'))
+ with open(join('fileA'), 'wb') as f:
+ f.write(b"this is file A\n")
+ with open(join('dirB', 'fileB'), 'wb') as f:
+ f.write(b"this is file B\n")
+ with open(join('dirC', 'fileC'), 'wb') as f:
+ f.write(b"this is file C\n")
+ with open(join('dirC', 'novel.txt'), 'wb') as f:
+ f.write(b"this is a novel\n")
+ with open(join('dirC', 'dirD', 'fileD'), 'wb') as f:
+ f.write(b"this is file D\n")
+ os.chmod(join('dirE'), 0)
+ if self.can_symlink:
+ # Relative symlinks.
+ os.symlink('fileA', join('linkA'))
+ os.symlink('non-existing', join('brokenLink'))
+ os.symlink('dirB', join('linkB'), target_is_directory=True)
+ os.symlink(os.path.join('..', 'dirB'), join('dirA', 'linkC'), target_is_directory=True)
+ # This one goes upwards, creating a loop.
+ os.symlink(os.path.join('..', 'dirB'), join('dirB', 'linkD'), target_is_directory=True)
+ # Broken symlink (pointing to itself).
+ os.symlink('brokenLinkLoop', join('brokenLinkLoop'))
+
+ def tempdir(self):
+ d = os_helper._longpath(tempfile.mkdtemp(suffix='-dirD',
+ dir=os.getcwd()))
+ self.addCleanup(os_helper.rmtree, d)
+ return d
+
def test_concrete_class(self):
if self.cls is pathlib.Path:
expected = pathlib.WindowsPath if os.name == 'nt' else pathlib.PosixPath
@@ -3178,12 +3460,6 @@ class PosixPathTest(PathTest):
self.assertEqual(str(P('//a').absolute()), '//a')
self.assertEqual(str(P('//a/b').absolute()), '//a/b')
- def _check_symlink_loop(self, *args):
- path = self.cls(*args)
- with self.assertRaises(OSError) as cm:
- path.resolve(strict=True)
- self.assertEqual(cm.exception.errno, errno.ELOOP)
-
@unittest.skipIf(
is_emscripten or is_wasi,
"umask is not implemented on Emscripten/WASI."
@@ -3230,30 +3506,6 @@ class PosixPathTest(PathTest):
st = os.stat(join('masked_new_file'))
self.assertEqual(stat.S_IMODE(st.st_mode), 0o750)
- def test_resolve_loop(self):
- if not self.can_symlink:
- self.skipTest("symlinks required")
- # Loops with relative symlinks.
- os.symlink('linkX/inside', join('linkX'))
- self._check_symlink_loop(BASE, 'linkX')
- os.symlink('linkY', join('linkY'))
- self._check_symlink_loop(BASE, 'linkY')
- os.symlink('linkZ/../linkZ', join('linkZ'))
- self._check_symlink_loop(BASE, 'linkZ')
- # Non-strict
- p = self.cls(BASE, 'linkZ', 'foo')
- self.assertEqual(p.resolve(strict=False), p)
- # Loops with absolute symlinks.
- os.symlink(join('linkU/inside'), join('linkU'))
- self._check_symlink_loop(BASE, 'linkU')
- os.symlink(join('linkV'), join('linkV'))
- self._check_symlink_loop(BASE, 'linkV')
- os.symlink(join('linkW/../linkW'), join('linkW'))
- self._check_symlink_loop(BASE, 'linkW')
- # Non-strict
- q = self.cls(BASE, 'linkW', 'foo')
- self.assertEqual(q.resolve(strict=False), q)
-
def test_glob(self):
P = self.cls
p = P(BASE)
diff --git a/Misc/NEWS.d/next/Library/2023-07-03-20-23-56.gh-issue-89812.cFkDOE.rst b/Misc/NEWS.d/next/Library/2023-07-03-20-23-56.gh-issue-89812.cFkDOE.rst
new file mode 100644
index 00000000000..a4221fc4ca9
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2023-07-03-20-23-56.gh-issue-89812.cFkDOE.rst
@@ -0,0 +1,2 @@
+Add private ``pathlib._PathBase`` class, which provides experimental support
+for virtual filesystems, and may be made public in a future version of Python.