diff options
Diffstat (limited to 'Lib/importlib')
45 files changed, 1487 insertions, 4455 deletions
diff --git a/Lib/importlib/__init__.py b/Lib/importlib/__init__.py index 2baaf937329..22c90f24a7e 100644 --- a/Lib/importlib/__init__.py +++ b/Lib/importlib/__init__.py @@ -1,108 +1,74 @@ -"""A pure Python implementation of import. - -References on import: - - * Language reference - http://docs.python.org/ref/import.html - * __import__ function - http://docs.python.org/lib/built-in-funcs.html - * Packages - http://www.python.org/doc/essays/packages.html - * PEP 235: Import on Case-Insensitive Platforms - http://www.python.org/dev/peps/pep-0235 - * PEP 275: Import Modules from Zip Archives - http://www.python.org/dev/peps/pep-0273 - * PEP 302: New Import Hooks - http://www.python.org/dev/peps/pep-0302/ - * PEP 328: Imports: Multi-line and Absolute/Relative - http://www.python.org/dev/peps/pep-0328 - -""" -__all__ = ['__import__', 'import_module'] - -from . import _bootstrap - -import os -import re -import tokenize +"""A pure Python implementation of import.""" +__all__ = ['__import__', 'import_module', 'invalidate_caches'] # Bootstrap help ##################################################### -def _case_ok(directory, check): - """Check if the directory contains something matching 'check'. +# Until bootstrapping is complete, DO NOT import any modules that attempt +# to import importlib._bootstrap (directly or indirectly). Since this +# partially initialised package would be present in sys.modules, those +# modules would get an uninitialised copy of the source version, instead +# of a fully initialised version (either the frozen one or the one +# initialised below if the frozen one is not available). +import _imp # Just the builtin component, NOT the full Python module +import sys - No check is done if the file/directory exists or not. +try: + import _frozen_importlib as _bootstrap +except ImportError: + from . import _bootstrap + _bootstrap._setup(sys, _imp) +else: + # importlib._bootstrap is the built-in import, ensure we don't create + # a second copy of the module. + _bootstrap.__name__ = 'importlib._bootstrap' + _bootstrap.__package__ = 'importlib' + _bootstrap.__file__ = __file__.replace('__init__.py', '_bootstrap.py') + sys.modules['importlib._bootstrap'] = _bootstrap + +# To simplify imports in test code +_w_long = _bootstrap._w_long +_r_long = _bootstrap._r_long + +# Fully bootstrapped at this point, import whatever you like, circular +# dependencies and startup overhead minimisation permitting :) - """ - if 'PYTHONCASEOK' in os.environ: - return True - elif check in os.listdir(directory if directory else os.getcwd()): - return True - return False +# Public API ######################################################### +from ._bootstrap import __import__ -def _w_long(x): - """Convert a 32-bit integer to little-endian. - XXX Temporary until marshal's long functions are exposed. +def invalidate_caches(): + """Call the invalidate_caches() method on all meta path finders stored in + sys.meta_path (where implemented).""" + for finder in sys.meta_path: + if hasattr(finder, 'invalidate_caches'): + finder.invalidate_caches() - """ - x = int(x) - int_bytes = [] - int_bytes.append(x & 0xFF) - int_bytes.append((x >> 8) & 0xFF) - int_bytes.append((x >> 16) & 0xFF) - int_bytes.append((x >> 24) & 0xFF) - return bytearray(int_bytes) +def find_loader(name, path=None): + """Find the loader for the specified module. -def _r_long(int_bytes): - """Convert 4 bytes in little-endian to an integer. + First, sys.modules is checked to see if the module was already imported. If + so, then sys.modules[name].__loader__ is returned. If that happens to be + set to None, then ValueError is raised. If the module is not in + sys.modules, then sys.meta_path is searched for a suitable loader with the + value of 'path' given to the finders. None is returned if no loader could + be found. - XXX Temporary until marshal's long function are exposed. + Dotted names do not have their parent packages implicitly imported. You will + most likely need to explicitly import all parent packages in the proper + order for a submodule to get the correct loader. """ - x = int_bytes[0] - x |= int_bytes[1] << 8 - x |= int_bytes[2] << 16 - x |= int_bytes[3] << 24 - return x - - -# Required built-in modules. -try: - import posix as _os -except ImportError: try: - import nt as _os - except ImportError: - try: - import os2 as _os - except ImportError: - raise ImportError('posix, nt, or os2 module required for importlib') -_bootstrap._os = _os -import imp, sys, marshal, errno, _io -_bootstrap.imp = imp -_bootstrap.sys = sys -_bootstrap.marshal = marshal -_bootstrap.errno = errno -_bootstrap._io = _io -import _warnings -_bootstrap._warnings = _warnings - - -from os import sep -# For os.path.join replacement; pull from Include/osdefs.h:SEP . -_bootstrap.path_sep = sep - -_bootstrap._case_ok = _case_ok -marshal._w_long = _w_long -marshal._r_long = _r_long - - -# Public API ######################################################### - -from ._bootstrap import __import__ + loader = sys.modules[name].__loader__ + if loader is None: + raise ValueError('{}.__loader__ is None'.format(name)) + else: + return loader + except KeyError: + pass + return _bootstrap._find_module(name, path) def import_module(name, package=None): diff --git a/Lib/importlib/_bootstrap.py b/Lib/importlib/_bootstrap.py index aa4032c0ff5..15fe1564f32 100644 --- a/Lib/importlib/_bootstrap.py +++ b/Lib/importlib/_bootstrap.py @@ -6,33 +6,93 @@ such it requires the injection of specific modules and attributes in order to work. One should use importlib as the public-facing version of this module. """ - -# Injected modules are '_warnings', 'imp', 'sys', 'marshal', 'errno', '_io', -# and '_os' (a.k.a. 'posix', 'nt' or 'os2'). -# Injected attribute is path_sep. # +# IMPORTANT: Whenever making changes to this module, be sure to run +# a top-level make in order to get the frozen version of the module +# update. Not doing so, will result in the Makefile to fail for +# all others who don't have a ./python around to freeze the module +# in the early stages of compilation. +# + +# See importlib._setup() for what is injected into the global namespace. + # When editing this code be aware that code executed at import time CANNOT # reference any injected objects! This includes not only global code but also # anything specified at the class level. +# XXX Make sure all public names have no single leading underscore and all +# others do. + # Bootstrap-related code ###################################################### -# XXX Could also expose Modules/getpath.c:joinpath() -def _path_join(*args): - """Replacement for os.path.join.""" - return path_sep.join(x[:-len(path_sep)] if x.endswith(path_sep) else x - for x in args if x) +_CASE_INSENSITIVE_PLATFORMS = 'win', 'cygwin', 'darwin' -def _path_exists(path): - """Replacement for os.path.exists.""" - try: - _os.stat(path) - except OSError: - return False +def _make_relax_case(): + if sys.platform.startswith(_CASE_INSENSITIVE_PLATFORMS): + def _relax_case(): + """True if filenames must be checked case-insensitively.""" + return b'PYTHONCASEOK' in _os.environ else: - return True + def _relax_case(): + """True if filenames must be checked case-insensitively.""" + return False + return _relax_case + + +# TODO: Expose from marshal +def _w_long(x): + """Convert a 32-bit integer to little-endian. + + XXX Temporary until marshal's long functions are exposed. + + """ + x = int(x) + int_bytes = [] + int_bytes.append(x & 0xFF) + int_bytes.append((x >> 8) & 0xFF) + int_bytes.append((x >> 16) & 0xFF) + int_bytes.append((x >> 24) & 0xFF) + return bytearray(int_bytes) + + +# TODO: Expose from marshal +def _r_long(int_bytes): + """Convert 4 bytes in little-endian to an integer. + + XXX Temporary until marshal's long function are exposed. + + """ + x = int_bytes[0] + x |= int_bytes[1] << 8 + x |= int_bytes[2] << 16 + x |= int_bytes[3] << 24 + return x + + +def _path_join(*path_parts): + """Replacement for os.path.join().""" + new_parts = [] + for part in path_parts: + if not part: + continue + new_parts.append(part) + if part[-1] not in path_separators: + new_parts.append(path_sep) + return ''.join(new_parts[:-1]) # Drop superfluous path separator. + + +def _path_split(path): + """Replacement for os.path.split().""" + for x in reversed(path): + if x in path_separators: + sep = x + break + else: + sep = path_sep + front, _, tail = path.rpartition(sep) + return front, tail def _path_is_mode_type(path, mode): @@ -58,61 +118,401 @@ def _path_isdir(path): return _path_is_mode_type(path, 0o040000) -def _path_without_ext(path, ext_type): - """Replacement for os.path.splitext()[0].""" - for suffix in _suffix_list(ext_type): - if path.endswith(suffix): - return path[:-len(suffix)] - else: - raise ValueError("path is not of the specified type") +def _write_atomic(path, data, mode=0o666): + """Best-effort function to write data to a path atomically. + Be prepared to handle a FileExistsError if concurrent writing of the + temporary file is attempted.""" + # id() is used to generate a pseudo-random filename. + path_tmp = '{}.{}'.format(path, id(path)) + fd = _os.open(path_tmp, + _os.O_EXCL | _os.O_CREAT | _os.O_WRONLY, mode & 0o666) + try: + # We first write data to a temporary file, and then use os.replace() to + # perform an atomic rename. + with _io.FileIO(fd, 'wb') as file: + file.write(data) + _os.replace(path_tmp, path) + except OSError: + try: + _os.unlink(path_tmp) + except OSError: + pass + raise -def _path_absolute(path): - """Replacement for os.path.abspath.""" - if not path: - path = _os.getcwd() +def _wrap(new, old): + """Simple substitute for functools.update_wrapper.""" + for replace in ['__module__', '__name__', '__qualname__', '__doc__']: + if hasattr(old, replace): + setattr(new, replace, getattr(old, replace)) + new.__dict__.update(old.__dict__) + + +_code_type = type(_wrap.__code__) + + +def new_module(name): + """Create a new module. + + The module is not entered into sys.modules. + + """ + return type(_io)(name) + + +# Module-level locking ######################################################## + +# A dict mapping module names to weakrefs of _ModuleLock instances +_module_locks = {} +# A dict mapping thread ids to _ModuleLock instances +_blocking_on = {} + + +class _DeadlockError(RuntimeError): + pass + + +class _ModuleLock: + """A recursive lock implementation which is able to detect deadlocks + (e.g. thread 1 trying to take locks A then B, and thread 2 trying to + take locks B then A). + """ + + def __init__(self, name): + self.lock = _thread.allocate_lock() + self.wakeup = _thread.allocate_lock() + self.name = name + self.owner = None + self.count = 0 + self.waiters = 0 + + def has_deadlock(self): + # Deadlock avoidance for concurrent circular imports. + me = _thread.get_ident() + tid = self.owner + while True: + lock = _blocking_on.get(tid) + if lock is None: + return False + tid = lock.owner + if tid == me: + return True + + def acquire(self): + """ + Acquire the module lock. If a potential deadlock is detected, + a _DeadlockError is raised. + Otherwise, the lock is always acquired and True is returned. + """ + tid = _thread.get_ident() + _blocking_on[tid] = self + try: + while True: + with self.lock: + if self.count == 0 or self.owner == tid: + self.owner = tid + self.count += 1 + return True + if self.has_deadlock(): + raise _DeadlockError("deadlock detected by %r" % self) + if self.wakeup.acquire(False): + self.waiters += 1 + # Wait for a release() call + self.wakeup.acquire() + self.wakeup.release() + finally: + del _blocking_on[tid] + + def release(self): + tid = _thread.get_ident() + with self.lock: + if self.owner != tid: + raise RuntimeError("cannot release un-acquired lock") + assert self.count > 0 + self.count -= 1 + if self.count == 0: + self.owner = None + if self.waiters: + self.waiters -= 1 + self.wakeup.release() + + def __repr__(self): + return "_ModuleLock(%r) at %d" % (self.name, id(self)) + + +class _DummyModuleLock: + """A simple _ModuleLock equivalent for Python builds without + multi-threading support.""" + + def __init__(self, name): + self.name = name + self.count = 0 + + def acquire(self): + self.count += 1 + return True + + def release(self): + if self.count == 0: + raise RuntimeError("cannot release un-acquired lock") + self.count -= 1 + + def __repr__(self): + return "_DummyModuleLock(%r) at %d" % (self.name, id(self)) + + +# The following two functions are for consumption by Python/import.c. + +def _get_module_lock(name): + """Get or create the module lock for a given module name. + + Should only be called with the import lock taken.""" + lock = None try: - return _os._getfullpathname(path) - except AttributeError: - if path.startswith('/'): - return path + lock = _module_locks[name]() + except KeyError: + pass + if lock is None: + if _thread is None: + lock = _DummyModuleLock(name) else: - return _path_join(_os.getcwd(), path) + lock = _ModuleLock(name) + def cb(_): + del _module_locks[name] + _module_locks[name] = _weakref.ref(lock, cb) + return lock + +def _lock_unlock_module(name): + """Release the global import lock, and acquires then release the + module lock for a given module name. + This is used to ensure a module is completely initialized, in the + event it is being imported by another thread. + + Should only be called with the import lock taken.""" + lock = _get_module_lock(name) + _imp.release_lock() + try: + lock.acquire() + except _DeadlockError: + # Concurrent circular import, we'll accept a partially initialized + # module object. + pass + else: + lock.release() +# Frame stripping magic ############################################### -def _wrap(new, old): - """Simple substitute for functools.wraps.""" - for replace in ['__module__', '__name__', '__doc__']: - setattr(new, replace, getattr(old, replace)) - new.__dict__.update(old.__dict__) +def _call_with_frames_removed(f, *args, **kwds): + """remove_importlib_frames in import.c will always remove sequences + of importlib frames that end with a call to this function + Use it instead of a normal call in places where including the importlib + frames introduces unwanted noise into the traceback (e.g. when executing + module code) + """ + return f(*args, **kwds) + + +# Finder/loader utility code ############################################### + +"""Magic word to reject .pyc files generated by other Python versions. +It should change for each incompatible change to the bytecode. + +The value of CR and LF is incorporated so if you ever read or write +a .pyc file in text mode the magic number will be wrong; also, the +Apple MPW compiler swaps their values, botching string constants. + +The magic numbers must be spaced apart at least 2 values, as the +-U interpeter flag will cause MAGIC+1 being used. They have been +odd numbers for some time now. + +There were a variety of old schemes for setting the magic number. +The current working scheme is to increment the previous value by +10. + +Starting with the adoption of PEP 3147 in Python 3.2, every bump in magic +number also includes a new "magic tag", i.e. a human readable string used +to represent the magic number in __pycache__ directories. When you change +the magic number, you must also set a new unique magic tag. Generally this +can be named after the Python major version of the magic number bump, but +it can really be anything, as long as it's different than anything else +that's come before. The tags are included in the following table, starting +with Python 3.2a0. + +Known values: + Python 1.5: 20121 + Python 1.5.1: 20121 + Python 1.5.2: 20121 + Python 1.6: 50428 + Python 2.0: 50823 + Python 2.0.1: 50823 + Python 2.1: 60202 + Python 2.1.1: 60202 + Python 2.1.2: 60202 + Python 2.2: 60717 + Python 2.3a0: 62011 + Python 2.3a0: 62021 + Python 2.3a0: 62011 (!) + Python 2.4a0: 62041 + Python 2.4a3: 62051 + Python 2.4b1: 62061 + Python 2.5a0: 62071 + Python 2.5a0: 62081 (ast-branch) + Python 2.5a0: 62091 (with) + Python 2.5a0: 62092 (changed WITH_CLEANUP opcode) + Python 2.5b3: 62101 (fix wrong code: for x, in ...) + Python 2.5b3: 62111 (fix wrong code: x += yield) + Python 2.5c1: 62121 (fix wrong lnotab with for loops and + storing constants that should have been removed) + Python 2.5c2: 62131 (fix wrong code: for x, in ... in listcomp/genexp) + Python 2.6a0: 62151 (peephole optimizations and STORE_MAP opcode) + Python 2.6a1: 62161 (WITH_CLEANUP optimization) + Python 3000: 3000 + 3010 (removed UNARY_CONVERT) + 3020 (added BUILD_SET) + 3030 (added keyword-only parameters) + 3040 (added signature annotations) + 3050 (print becomes a function) + 3060 (PEP 3115 metaclass syntax) + 3061 (string literals become unicode) + 3071 (PEP 3109 raise changes) + 3081 (PEP 3137 make __file__ and __name__ unicode) + 3091 (kill str8 interning) + 3101 (merge from 2.6a0, see 62151) + 3103 (__file__ points to source file) + Python 3.0a4: 3111 (WITH_CLEANUP optimization). + Python 3.0a5: 3131 (lexical exception stacking, including POP_EXCEPT) + Python 3.1a0: 3141 (optimize list, set and dict comprehensions: + change LIST_APPEND and SET_ADD, add MAP_ADD) + Python 3.1a0: 3151 (optimize conditional branches: + introduce POP_JUMP_IF_FALSE and POP_JUMP_IF_TRUE) + Python 3.2a0: 3160 (add SETUP_WITH) + tag: cpython-32 + Python 3.2a1: 3170 (add DUP_TOP_TWO, remove DUP_TOPX and ROT_FOUR) + tag: cpython-32 + Python 3.2a2 3180 (add DELETE_DEREF) + Python 3.3a0 3190 __class__ super closure changed + Python 3.3a0 3200 (__qualname__ added) + 3210 (added size modulo 2**32 to the pyc header) + Python 3.3a1 3220 (changed PEP 380 implementation) + Python 3.3a4 3230 (revert changes to implicit __class__ closure) + +MAGIC must change whenever the bytecode emitted by the compiler may no +longer be understood by older implementations of the eval loop (usually +due to the addition of new opcodes). + +""" +_RAW_MAGIC_NUMBER = 3230 | ord('\r') << 16 | ord('\n') << 24 +_MAGIC_BYTES = bytes(_RAW_MAGIC_NUMBER >> n & 0xff for n in range(0, 25, 8)) + +_PYCACHE = '__pycache__' + +SOURCE_SUFFIXES = ['.py'] # _setup() adds .pyw as needed. + +DEBUG_BYTECODE_SUFFIXES = ['.pyc'] +OPTIMIZED_BYTECODE_SUFFIXES = ['.pyo'] + +def cache_from_source(path, debug_override=None): + """Given the path to a .py file, return the path to its .pyc/.pyo file. + + The .py file does not need to exist; this simply returns the path to the + .pyc/.pyo file calculated as if the .py file were imported. The extension + will be .pyc unless sys.flags.optimize is non-zero, then it will be .pyo. + + If debug_override is not None, then it must be a boolean and is used in + place of sys.flags.optimize. + + If sys.implementation.cache_tag is None then NotImplementedError is raised. + + """ + debug = not sys.flags.optimize if debug_override is None else debug_override + if debug: + suffixes = DEBUG_BYTECODE_SUFFIXES + else: + suffixes = OPTIMIZED_BYTECODE_SUFFIXES + head, tail = _path_split(path) + base_filename, sep, _ = tail.partition('.') + tag = sys.implementation.cache_tag + if tag is None: + raise NotImplementedError('sys.implementation.cache_tag is None') + filename = ''.join([base_filename, sep, tag, suffixes[0]]) + return _path_join(head, _PYCACHE, filename) + + +def source_from_cache(path): + """Given the path to a .pyc./.pyo file, return the path to its .py file. + + The .pyc/.pyo file does not need to exist; this simply returns the path to + the .py file calculated to correspond to the .pyc/.pyo file. If path does + not conform to PEP 3147 format, ValueError will be raised. If + sys.implementation.cache_tag is None then NotImplementedError is raised. + + """ + if sys.implementation.cache_tag is None: + raise NotImplementedError('sys.implementation.cache_tag is None') + head, pycache_filename = _path_split(path) + head, pycache = _path_split(head) + if pycache != _PYCACHE: + raise ValueError('{} not bottom-level directory in ' + '{!r}'.format(_PYCACHE, path)) + if pycache_filename.count('.') != 2: + raise ValueError('expected only 2 dots in ' + '{!r}'.format(pycache_filename)) + base_filename = pycache_filename.partition('.')[0] + return _path_join(head, base_filename + SOURCE_SUFFIXES[0]) + + +def _get_sourcefile(bytecode_path): + """Convert a bytecode file path to a source path (if possible). + + This function exists purely for backwards-compatibility for + PyImport_ExecCodeModuleWithFilenames() in the C API. + + """ + if len(bytecode_path) == 0: + return None + rest, _, extension = bytecode_path.rparition('.') + if not rest or extension.lower()[-3:-1] != '.py': + return bytecode_path + + try: + source_path = source_from_cache(bytecode_path) + except (NotImplementedError, ValueError): + source_path = bytcode_path[-1:] -code_type = type(_wrap.__code__) + return source_path if _path_isfile(source_stats) else bytecode_path + + +def _verbose_message(message, *args): + """Print the message to stderr if -v/PYTHONVERBOSE is turned on.""" + if sys.flags.verbose: + if not message.startswith(('#', 'import ')): + message = '# ' + message + print(message.format(*args), file=sys.stderr) -# Finder/loader utility code ################################################## def set_package(fxn): """Set __package__ on the returned module.""" - def wrapper(*args, **kwargs): + def set_package_wrapper(*args, **kwargs): module = fxn(*args, **kwargs) - if not hasattr(module, '__package__') or module.__package__ is None: + if getattr(module, '__package__', None) is None: module.__package__ = module.__name__ if not hasattr(module, '__path__'): module.__package__ = module.__package__.rpartition('.')[0] return module - _wrap(wrapper, fxn) - return wrapper + _wrap(set_package_wrapper, fxn) + return set_package_wrapper def set_loader(fxn): """Set __loader__ on the returned module.""" - def wrapper(self, *args, **kwargs): + def set_loader_wrapper(self, *args, **kwargs): module = fxn(self, *args, **kwargs) if not hasattr(module, '__loader__'): module.__loader__ = self return module - _wrap(wrapper, fxn) - return wrapper + _wrap(set_loader_wrapper, fxn) + return set_loader_wrapper def module_for_loader(fxn): @@ -120,31 +520,54 @@ def module_for_loader(fxn): The decorated function is passed the module to use instead of the module name. The module passed in to the function is either from sys.modules if - it already exists or is a new module which has __name__ set and is inserted - into sys.modules. If an exception is raised and the decorator created the - module it is subsequently removed from sys.modules. + it already exists or is a new module. If the module is new, then __name__ + is set the first argument to the method, __loader__ is set to self, and + __package__ is set accordingly (if self.is_package() is defined) will be set + before it is passed to the decorated function (if self.is_package() does + not work for the module it will be set post-load). + + If an exception is raised and the decorator created the module it is + subsequently removed from sys.modules. The decorator assumes that the decorated function takes the module name as the second argument. """ - def decorated(self, fullname, *args, **kwargs): + def module_for_loader_wrapper(self, fullname, *args, **kwargs): module = sys.modules.get(fullname) - is_reload = bool(module) + is_reload = module is not None if not is_reload: # This must be done before open() is called as the 'io' module # implicitly imports 'locale' and would otherwise trigger an # infinite loop. - module = imp.new_module(fullname) + module = new_module(fullname) + # This must be done before putting the module in sys.modules + # (otherwise an optimization shortcut in import.c becomes wrong) + module.__initializing__ = True sys.modules[fullname] = module + module.__loader__ = self + try: + is_package = self.is_package(fullname) + except (ImportError, AttributeError): + pass + else: + if is_package: + module.__package__ = fullname + else: + module.__package__ = fullname.rpartition('.')[0] + else: + module.__initializing__ = True try: + # If __package__ was not set above, __import__() will do it later. return fxn(self, module, *args, **kwargs) except: if not is_reload: del sys.modules[fullname] raise - _wrap(decorated, fxn) - return decorated + finally: + module.__initializing__ = False + _wrap(module_for_loader_wrapper, fxn) + return module_for_loader_wrapper def _check_name(method): @@ -155,38 +578,51 @@ def _check_name(method): compared against. If the comparison fails then ImportError is raised. """ - def inner(self, name, *args, **kwargs): - if self._name != name: - raise ImportError("loader cannot handle %s" % name) + def _check_name_wrapper(self, name=None, *args, **kwargs): + if name is None: + name = self.name + elif self.name != name: + raise ImportError("loader cannot handle %s" % name, name=name) return method(self, name, *args, **kwargs) - _wrap(inner, method) - return inner + _wrap(_check_name_wrapper, method) + return _check_name_wrapper def _requires_builtin(fxn): """Decorator to verify the named module is built-in.""" - def wrapper(self, fullname): + def _requires_builtin_wrapper(self, fullname): if fullname not in sys.builtin_module_names: - raise ImportError("{0} is not a built-in module".format(fullname)) + raise ImportError("{} is not a built-in module".format(fullname), + name=fullname) return fxn(self, fullname) - _wrap(wrapper, fxn) - return wrapper + _wrap(_requires_builtin_wrapper, fxn) + return _requires_builtin_wrapper def _requires_frozen(fxn): """Decorator to verify the named module is frozen.""" - def wrapper(self, fullname): - if not imp.is_frozen(fullname): - raise ImportError("{0} is not a frozen module".format(fullname)) + def _requires_frozen_wrapper(self, fullname): + if not _imp.is_frozen(fullname): + raise ImportError("{} is not a frozen module".format(fullname), + name=fullname) return fxn(self, fullname) - _wrap(wrapper, fxn) - return wrapper + _wrap(_requires_frozen_wrapper, fxn) + return _requires_frozen_wrapper + + +def _find_module_shim(self, fullname): + """Try to find a loader for the specified module by delegating to + self.find_loader().""" + # Call find_loader(). If it returns a string (indicating this + # is a namespace package portion), generate a warning and + # return None. + loader, portions = self.find_loader(fullname) + if loader is None and len(portions): + msg = "Not importing directory {}: missing __init__" + _warnings.warn(msg.format(portions[0]), ImportWarning) + return loader -def _suffix_list(suffix_type): - """Return a list of file suffixes based on the imp file type.""" - return [suffix[0] for suffix in imp.get_suffixes() - if suffix[2] == suffix_type] # Loaders ##################################################################### @@ -201,6 +637,10 @@ class BuiltinImporter: """ @classmethod + def module_repr(cls, module): + return "<module '{}' (built-in)>".format(module.__name__) + + @classmethod def find_module(cls, fullname, path=None): """Find the built-in module. @@ -209,7 +649,7 @@ class BuiltinImporter: """ if path is not None: return None - return cls if imp.is_builtin(fullname) else None + return cls if _imp.is_builtin(fullname) else None @classmethod @set_package @@ -219,7 +659,7 @@ class BuiltinImporter: """Load a built-in module.""" is_reload = fullname in sys.modules try: - return imp.init_builtin(fullname) + return _call_with_frames_removed(_imp.init_builtin, fullname) except: if not is_reload and fullname in sys.modules: del sys.modules[fullname] @@ -240,7 +680,7 @@ class BuiltinImporter: @classmethod @_requires_builtin def is_package(cls, fullname): - """Return None as built-in module are never packages.""" + """Return False as built-in modules are never packages.""" return False @@ -254,9 +694,13 @@ class FrozenImporter: """ @classmethod + def module_repr(cls, m): + return "<module '{}' (frozen)>".format(m.__name__) + + @classmethod def find_module(cls, fullname, path=None): """Find a frozen module.""" - return cls if imp.is_frozen(fullname) else None + return cls if _imp.is_frozen(fullname) else None @classmethod @set_package @@ -266,7 +710,10 @@ class FrozenImporter: """Load a frozen module.""" is_reload = fullname in sys.modules try: - return imp.init_frozen(fullname) + m = _call_with_frames_removed(_imp.init_frozen, fullname) + # Let our own module_repr() method produce a suitable repr. + del m.__file__ + return m except: if not is_reload and fullname in sys.modules: del sys.modules[fullname] @@ -276,7 +723,7 @@ class FrozenImporter: @_requires_frozen def get_code(cls, fullname): """Return the code object for the frozen module.""" - return imp.get_frozen_object(fullname) + return _imp.get_frozen_object(fullname) @classmethod @_requires_frozen @@ -287,40 +734,117 @@ class FrozenImporter: @classmethod @_requires_frozen def is_package(cls, fullname): - """Return if the frozen module is a package.""" - return imp.is_frozen_package(fullname) + """Return True if the frozen module is a package.""" + return _imp.is_frozen_package(fullname) + + +class WindowsRegistryFinder: + + """Meta path finder for modules declared in the Windows registry. + """ + + REGISTRY_KEY = ( + "Software\\Python\\PythonCore\\{sys_version}" + "\\Modules\\{fullname}") + REGISTRY_KEY_DEBUG = ( + "Software\\Python\\PythonCore\\{sys_version}" + "\\Modules\\{fullname}\\Debug") + DEBUG_BUILD = False # Changed in _setup() + + @classmethod + def _open_registry(cls, key): + try: + return _winreg.OpenKey(_winreg.HKEY_CURRENT_USER, key) + except WindowsError: + return _winreg.OpenKey(_winreg.HKEY_LOCAL_MACHINE, key) + + @classmethod + def _search_registry(cls, fullname): + if cls.DEBUG_BUILD: + registry_key = cls.REGISTRY_KEY_DEBUG + else: + registry_key = cls.REGISTRY_KEY + key = registry_key.format(fullname=fullname, + sys_version=sys.version[:3]) + try: + with cls._open_registry(key) as hkey: + filepath = _winreg.QueryValue(hkey, "") + except WindowsError: + return None + return filepath + + @classmethod + def find_module(cls, fullname, path=None): + """Find module named in the registry.""" + filepath = cls._search_registry(fullname) + if filepath is None: + return None + try: + _os.stat(filepath) + except OSError: + return None + for loader, suffixes, _ in _get_supported_file_loaders(): + if filepath.endswith(tuple(suffixes)): + return loader(fullname, filepath) class _LoaderBasics: """Base class of common code needed by both SourceLoader and - _SourcelessFileLoader.""" + SourcelessFileLoader.""" def is_package(self, fullname): """Concrete implementation of InspectLoader.is_package by checking if the path returned by get_filename has a filename of '__init__.py'.""" - filename = self.get_filename(fullname).rpartition(path_sep)[2] - return filename.rsplit('.', 1)[0] == '__init__' + filename = _path_split(self.get_filename(fullname))[1] + filename_base = filename.rsplit('.', 1)[0] + tail_name = fullname.rpartition('.')[2] + return filename_base == '__init__' and tail_name != '__init__' - def _bytes_from_bytecode(self, fullname, data, source_mtime): + def _bytes_from_bytecode(self, fullname, data, bytecode_path, source_stats): """Return the marshalled bytes from bytecode, verifying the magic - number and timestamp along the way. + number, timestamp and source size along the way. - If source_mtime is None then skip the timestamp check. + If source_stats is None then skip the timestamp check. """ magic = data[:4] raw_timestamp = data[4:8] - if len(magic) != 4 or magic != imp.get_magic(): - raise ImportError("bad magic number in {}".format(fullname)) + raw_size = data[8:12] + if magic != _MAGIC_BYTES: + msg = 'bad magic number in {!r}: {!r}'.format(fullname, magic) + raise ImportError(msg, name=fullname, path=bytecode_path) elif len(raw_timestamp) != 4: - raise EOFError("bad timestamp in {}".format(fullname)) - elif source_mtime is not None: - if marshal._r_long(raw_timestamp) != source_mtime: - raise ImportError("bytecode is stale for {}".format(fullname)) + message = 'bad timestamp in {}'.format(fullname) + _verbose_message(message) + raise EOFError(message) + elif len(raw_size) != 4: + message = 'bad size in {}'.format(fullname) + _verbose_message(message) + raise EOFError(message) + if source_stats is not None: + try: + source_mtime = int(source_stats['mtime']) + except KeyError: + pass + else: + if _r_long(raw_timestamp) != source_mtime: + message = 'bytecode is stale for {}'.format(fullname) + _verbose_message(message) + raise ImportError(message, name=fullname, + path=bytecode_path) + try: + source_size = source_stats['size'] & 0xFFFFFFFF + except KeyError: + pass + else: + if _r_long(raw_size) != source_size: + raise ImportError( + "bytecode is stale for {}".format(fullname), + name=fullname, path=bytecode_path) # Can't return the code object as errors from marshal loading need to # propagate even when source is available. - return data[8:] + return data[12:] @module_for_loader def _load_module(self, module, *, sourceless=False): @@ -330,16 +854,19 @@ class _LoaderBasics: code_object = self.get_code(name) module.__file__ = self.get_filename(name) if not sourceless: - module.__cached__ = imp.cache_from_source(module.__file__) + try: + module.__cached__ = cache_from_source(module.__file__) + except NotImplementedError: + module.__cached__ = module.__file__ else: module.__cached__ = module.__file__ module.__package__ = name if self.is_package(name): - module.__path__ = [module.__file__.rsplit(path_sep, 1)[0]] + module.__path__ = [_path_split(module.__file__)[0]] else: module.__package__ = module.__package__.rpartition('.')[0] module.__loader__ = self - exec(code_object, module.__dict__) + _call_with_frames_removed(exec, code_object, module.__dict__) return module @@ -348,11 +875,30 @@ class SourceLoader(_LoaderBasics): def path_mtime(self, path): """Optional method that returns the modification time (an int) for the specified path, where path is a str. + """ + raise NotImplementedError + + def path_stats(self, path): + """Optional method returning a metadata dict for the specified path + to by the path (str). + Possible keys: + - 'mtime' (mandatory) is the numeric timestamp of last source + code modification; + - 'size' (optional) is the size in bytes of the source code. Implementing this method allows the loader to read bytecode files. + """ + return {'mtime': self.path_mtime(path)} + def _cache_bytecode(self, source_path, cache_path, data): + """Optional method which writes data (bytes) to a file path (a str). + + Implementing this method allows for the writing of bytecode files. + + The source path is needed in order to correctly transfer permissions """ - raise NotImplementedError + # For backwards compatibility, we delegate to set_data() + return self.set_data(cache_path, data) def set_data(self, path, data): """Optional method which writes data (bytes) to a file path (a str). @@ -369,28 +915,42 @@ class SourceLoader(_LoaderBasics): path = self.get_filename(fullname) try: source_bytes = self.get_data(path) - except IOError: - raise ImportError("source not available through get_data()") - encoding = tokenize.detect_encoding(_io.BytesIO(source_bytes).readline) + except IOError as exc: + raise ImportError("source not available through get_data()", + name=fullname) from exc + readsource = _io.BytesIO(source_bytes).readline + try: + encoding = tokenize.detect_encoding(readsource) + except SyntaxError as exc: + raise ImportError("Failed to detect encoding", + name=fullname) from exc newline_decoder = _io.IncrementalNewlineDecoder(None, True) - return newline_decoder.decode(source_bytes.decode(encoding[0])) + try: + return newline_decoder.decode(source_bytes.decode(encoding[0])) + except UnicodeDecodeError as exc: + raise ImportError("Failed to decode source file", + name=fullname) from exc def get_code(self, fullname): """Concrete implementation of InspectLoader.get_code. - Reading of bytecode requires path_mtime to be implemented. To write + Reading of bytecode requires path_stats to be implemented. To write bytecode, set_data must also be implemented. """ source_path = self.get_filename(fullname) - bytecode_path = imp.cache_from_source(source_path) source_mtime = None - if bytecode_path is not None: + try: + bytecode_path = cache_from_source(source_path) + except NotImplementedError: + bytecode_path = None + else: try: - source_mtime = self.path_mtime(source_path) + st = self.path_stats(source_path) except NotImplementedError: pass else: + source_mtime = int(st['mtime']) try: data = self.get_data(bytecode_path) except IOError: @@ -398,29 +958,37 @@ class SourceLoader(_LoaderBasics): else: try: bytes_data = self._bytes_from_bytecode(fullname, data, - source_mtime) + bytecode_path, + st) except (ImportError, EOFError): pass else: + _verbose_message('{} matches {}', bytecode_path, + source_path) found = marshal.loads(bytes_data) - if isinstance(found, code_type): + if isinstance(found, _code_type): + _imp._fix_co_filename(found, source_path) + _verbose_message('code object from {}', + bytecode_path) return found else: msg = "Non-code object in {}" - raise ImportError(msg.format(bytecode_path)) + raise ImportError(msg.format(bytecode_path), + name=fullname, path=bytecode_path) source_bytes = self.get_data(source_path) - code_object = compile(source_bytes, source_path, 'exec', - dont_inherit=True) + code_object = _call_with_frames_removed(compile, + source_bytes, source_path, 'exec', + dont_inherit=True) + _verbose_message('code object from {}', source_path) if (not sys.dont_write_bytecode and bytecode_path is not None and - source_mtime is not None): - # If e.g. Jython ever implements imp.cache_from_source to have - # their own cached file format, this block of code will most likely - # raise an exception. - data = bytearray(imp.get_magic()) - data.extend(marshal._w_long(source_mtime)) + source_mtime is not None): + data = bytearray(_MAGIC_BYTES) + data.extend(_w_long(source_mtime)) + data.extend(_w_long(len(source_bytes))) data.extend(marshal.dumps(code_object)) try: - self.set_data(bytecode_path, data) + self._cache_bytecode(source_path, bytecode_path, data) + _verbose_message('wrote {!r}', bytecode_path) except NotImplementedError: pass return code_object @@ -436,7 +1004,7 @@ class SourceLoader(_LoaderBasics): return self._load_module(fullname) -class _FileLoader: +class FileLoader: """Base file loader class which implements the loader protocol methods that require file system usage.""" @@ -444,13 +1012,20 @@ class _FileLoader: def __init__(self, fullname, path): """Cache the module name and the path to the file found by the finder.""" - self._name = fullname - self._path = path + self.name = fullname + self.path = path + + @_check_name + def load_module(self, fullname): + """Load a module from a file.""" + # Issue #14857: Avoid the zero-argument form so the implementation + # of that form can be updated without breaking the frozen module + return super(FileLoader, self).load_module(fullname) @_check_name def get_filename(self, fullname): """Return the path to the source file as found by the finder.""" - return self._path + return self.path def get_data(self, path): """Return the data from path as raw bytes.""" @@ -458,52 +1033,56 @@ class _FileLoader: return file.read() -class _SourceFileLoader(_FileLoader, SourceLoader): +class SourceFileLoader(FileLoader, SourceLoader): """Concrete implementation of SourceLoader using the file system.""" - def path_mtime(self, path): - """Return the modification time for the path.""" - return int(_os.stat(path).st_mtime) + def path_stats(self, path): + """Return the metadata for the path.""" + st = _os.stat(path) + return {'mtime': st.st_mtime, 'size': st.st_size} - def set_data(self, path, data): + def _cache_bytecode(self, source_path, bytecode_path, data): + # Adapt between the two APIs + try: + mode = _os.stat(source_path).st_mode + except OSError: + mode = 0o666 + # We always ensure write access so we can update cached files + # later even when the source files are read-only on Windows (#6074) + mode |= 0o200 + return self.set_data(bytecode_path, data, _mode=mode) + + def set_data(self, path, data, *, _mode=0o666): """Write bytes data to a file.""" - parent, _, filename = path.rpartition(path_sep) + parent, filename = _path_split(path) path_parts = [] # Figure out what directories are missing. while parent and not _path_isdir(parent): - parent, _, part = parent.rpartition(path_sep) + parent, part = _path_split(parent) path_parts.append(part) # Create needed directories. for part in reversed(path_parts): parent = _path_join(parent, part) try: _os.mkdir(parent) - except OSError as exc: + except FileExistsError: # Probably another Python process already created the dir. - if exc.errno == errno.EEXIST: - continue - else: - raise - except IOError as exc: - # If can't get proper access, then just forget about writing - # the data. - if exc.errno == errno.EACCES: - return - else: - raise - try: - with _io.FileIO(path, 'wb') as file: - file.write(data) - except IOError as exc: - # Don't worry if you can't write bytecode. - if exc.errno == errno.EACCES: + continue + except OSError as exc: + # Could be a permission error, read-only filesystem: just forget + # about writing the data. + _verbose_message('could not create {!r}: {!r}', parent, exc) return - else: - raise + try: + _write_atomic(path, data, _mode) + _verbose_message('created {!r}', path) + except OSError as exc: + # Same as above: just don't write the bytecode. + _verbose_message('could not create {!r}: {!r}', path, exc) -class _SourcelessFileLoader(_FileLoader, _LoaderBasics): +class SourcelessFileLoader(FileLoader, _LoaderBasics): """Loader which handles sourceless file imports.""" @@ -513,19 +1092,25 @@ class _SourcelessFileLoader(_FileLoader, _LoaderBasics): def get_code(self, fullname): path = self.get_filename(fullname) data = self.get_data(path) - bytes_data = self._bytes_from_bytecode(fullname, data, None) + bytes_data = self._bytes_from_bytecode(fullname, data, path, None) found = marshal.loads(bytes_data) - if isinstance(found, code_type): + if isinstance(found, _code_type): + _verbose_message('code object from {!r}', path) return found else: - raise ImportError("Non-code object in {}".format(path)) + raise ImportError("Non-code object in {}".format(path), + name=fullname, path=path) def get_source(self, fullname): """Return None as there is no source code.""" return None -class _ExtensionFileLoader: +# Filled in by _setup(). +EXTENSION_SUFFIXES = [] + + +class ExtensionFileLoader: """Loader for extension modules. @@ -534,14 +1119,8 @@ class _ExtensionFileLoader: """ def __init__(self, name, path): - """Initialize the loader. - - If is_pkg is True then an exception is raised as extension modules - cannot be the __init__ module for an extension module. - - """ - self._name = name - self._path = path + self.name = name + self.path = path @_check_name @set_package @@ -550,297 +1129,528 @@ class _ExtensionFileLoader: """Load an extension module.""" is_reload = fullname in sys.modules try: - return imp.load_dynamic(fullname, self._path) + module = _call_with_frames_removed(_imp.load_dynamic, + fullname, self.path) + _verbose_message('extension module loaded from {!r}', self.path) + if self.is_package(fullname) and not hasattr(module, '__path__'): + module.__path__ = [_path_split(self.path)[0]] + return module except: if not is_reload and fullname in sys.modules: del sys.modules[fullname] raise - @_check_name def is_package(self, fullname): - """Return False as an extension module can never be a package.""" - return False + """Return True if the extension module is a package.""" + file_name = _path_split(self.path)[1] + return any(file_name == '__init__' + suffix + for suffix in EXTENSION_SUFFIXES) - @_check_name def get_code(self, fullname): """Return None as an extension module cannot create a code object.""" return None - @_check_name def get_source(self, fullname): """Return None as extension modules have no source code.""" return None +class _NamespacePath: + """Represents a namespace package's path. It uses the module name + to find its parent module, and from there it looks up the parent's + __path__. When this changes, the module's own path is recomputed, + using path_finder. For top-level modules, the parent module's path + is sys.path.""" + + def __init__(self, name, path, path_finder): + self._name = name + self._path = path + self._last_parent_path = tuple(self._get_parent_path()) + self._path_finder = path_finder + + def _find_parent_path_names(self): + """Returns a tuple of (parent-module-name, parent-path-attr-name)""" + parent, dot, me = self._name.rpartition('.') + if dot == '': + # This is a top-level module. sys.path contains the parent path. + return 'sys', 'path' + # Not a top-level module. parent-module.__path__ contains the + # parent path. + return parent, '__path__' + + def _get_parent_path(self): + parent_module_name, path_attr_name = self._find_parent_path_names() + return getattr(sys.modules[parent_module_name], path_attr_name) + + def _recalculate(self): + # If the parent's path has changed, recalculate _path + parent_path = tuple(self._get_parent_path()) # Make a copy + if parent_path != self._last_parent_path: + loader, new_path = self._path_finder(self._name, parent_path) + # Note that no changes are made if a loader is returned, but we + # do remember the new parent path + if loader is None: + self._path = new_path + self._last_parent_path = parent_path # Save the copy + return self._path + + def __iter__(self): + return iter(self._recalculate()) + + def __len__(self): + return len(self._recalculate()) + + def __repr__(self): + return "_NamespacePath({!r})".format(self._path) + + def __contains__(self, item): + return item in self._recalculate() + + def append(self, item): + self._path.append(item) + + +class NamespaceLoader: + def __init__(self, name, path, path_finder): + self._path = _NamespacePath(name, path, path_finder) + + @classmethod + def module_repr(cls, module): + return "<module '{}' (namespace)>".format(module.__name__) + + @module_for_loader + def load_module(self, module): + """Load a namespace module.""" + _verbose_message('namespace module loaded with path {!r}', self._path) + module.__path__ = self._path + return module + + # Finders ##################################################################### class PathFinder: - """Meta path finder for sys.(path|path_hooks|path_importer_cache).""" + """Meta path finder for sys.path and package __path__ attributes.""" @classmethod - def _path_hooks(cls, path, hooks=None): + def invalidate_caches(cls): + """Call the invalidate_caches() method on all path entry finders + stored in sys.path_importer_caches (where implemented).""" + for finder in sys.path_importer_cache.values(): + if hasattr(finder, 'invalidate_caches'): + finder.invalidate_caches() + + @classmethod + def _path_hooks(cls, path): """Search sequence of hooks for a finder for 'path'. If 'hooks' is false then use sys.path_hooks. """ - if not hooks: - hooks = sys.path_hooks - for hook in hooks: + if not sys.path_hooks: + _warnings.warn('sys.path_hooks is empty', ImportWarning) + for hook in sys.path_hooks: try: return hook(path) except ImportError: continue else: - raise ImportError("no path hook found for {0}".format(path)) + return None @classmethod - def _path_importer_cache(cls, path, default=None): - """Get the finder for the path from sys.path_importer_cache. - - If the path is not in the cache, find the appropriate finder and cache - it. If None is cached, get the default finder and cache that - (if applicable). + def _path_importer_cache(cls, path): + """Get the finder for the path entry from sys.path_importer_cache. - Because of NullImporter, some finder should be returned. The only - explicit fail case is if None is cached but the path cannot be used for - the default hook, for which ImportError is raised. + If the path entry is not in the cache, find the appropriate finder + and cache it. If no finder is available, store None. """ + if path == '': + path = '.' try: finder = sys.path_importer_cache[path] except KeyError: finder = cls._path_hooks(path) sys.path_importer_cache[path] = finder - else: - if finder is None and default: - # Raises ImportError on failure. - finder = default(path) - sys.path_importer_cache[path] = finder return finder @classmethod + def _get_loader(cls, fullname, path): + """Find the loader or namespace_path for this module/package name.""" + # If this ends up being a namespace package, namespace_path is + # the list of paths that will become its __path__ + namespace_path = [] + for entry in path: + if not isinstance(entry, (str, bytes)): + continue + finder = cls._path_importer_cache(entry) + if finder is not None: + if hasattr(finder, 'find_loader'): + loader, portions = finder.find_loader(fullname) + else: + loader = finder.find_module(fullname) + portions = [] + if loader is not None: + # We found a loader: return it immediately. + return loader, namespace_path + # This is possibly part of a namespace package. + # Remember these path entries (if any) for when we + # create a namespace package, and continue iterating + # on path. + namespace_path.extend(portions) + else: + return None, namespace_path + + @classmethod def find_module(cls, fullname, path=None): """Find the module on sys.path or 'path' based on sys.path_hooks and sys.path_importer_cache.""" - if not path: + if path is None: path = sys.path - for entry in path: - try: - finder = cls._path_importer_cache(entry) - except ImportError: - continue - if finder: - loader = finder.find_module(fullname) - if loader: - return loader + loader, namespace_path = cls._get_loader(fullname, path) + if loader is not None: + return loader else: - return None + if namespace_path: + # We found at least one namespace path. Return a + # loader which can create the namespace package. + return NamespaceLoader(fullname, namespace_path, cls._get_loader) + else: + return None -class _FileFinder: +class FileFinder: """File-based finder. - Constructor takes a list of objects detailing what file extensions their - loader supports along with whether it can be used for a package. + Interactions with the file system are cached for performance, being + refreshed when the directory the finder is handling has been modified. """ def __init__(self, path, *details): - """Initialize with finder details.""" - packages = [] - modules = [] - for detail in details: - modules.extend((suffix, detail.loader) for suffix in detail.suffixes) - if detail.supports_packages: - packages.extend((suffix, detail.loader) - for suffix in detail.suffixes) - self.packages = packages - self.modules = modules - self.path = path - - def find_module(self, fullname): - """Try to find a loader for the specified module.""" + """Initialize with the path to search on and a variable number of + 3-tuples containing the loader, file suffixes the loader recognizes, + and a boolean of whether the loader handles packages.""" + loaders = [] + for loader, suffixes in details: + loaders.extend((suffix, loader) for suffix in suffixes) + self._loaders = loaders + # Base (directory) path + self.path = path or '.' + self._path_mtime = -1 + self._path_cache = set() + self._relaxed_path_cache = set() + + def invalidate_caches(self): + """Invalidate the directory mtime.""" + self._path_mtime = -1 + + find_module = _find_module_shim + + def find_loader(self, fullname): + """Try to find a loader for the specified module, or the namespace + package portions. Returns (loader, list-of-portions).""" + is_namespace = False tail_module = fullname.rpartition('.')[2] - base_path = _path_join(self.path, tail_module) - if _path_isdir(base_path) and _case_ok(self.path, tail_module): - for suffix, loader in self.packages: - init_filename = '__init__' + suffix - full_path = _path_join(base_path, init_filename) - if (_path_isfile(full_path) and - _case_ok(base_path, init_filename)): - return loader(fullname, full_path) - else: - msg = "Not importing directory {}: missing __init__" - _warnings.warn(msg.format(base_path), ImportWarning) - for suffix, loader in self.modules: - mod_filename = tail_module + suffix - full_path = _path_join(self.path, mod_filename) - if _path_isfile(full_path) and _case_ok(self.path, mod_filename): - return loader(fullname, full_path) - return None - -class _SourceFinderDetails: - - loader = _SourceFileLoader - supports_packages = True - - def __init__(self): - self.suffixes = _suffix_list(imp.PY_SOURCE) - -class _SourcelessFinderDetails: - - loader = _SourcelessFileLoader - supports_packages = True + try: + mtime = _os.stat(self.path).st_mtime + except OSError: + mtime = -1 + if mtime != self._path_mtime: + self._fill_cache() + self._path_mtime = mtime + # tail_module keeps the original casing, for __file__ and friends + if _relax_case(): + cache = self._relaxed_path_cache + cache_module = tail_module.lower() + else: + cache = self._path_cache + cache_module = tail_module + # Check if the module is the name of a directory (and thus a package). + if cache_module in cache: + base_path = _path_join(self.path, tail_module) + if _path_isdir(base_path): + for suffix, loader in self._loaders: + init_filename = '__init__' + suffix + full_path = _path_join(base_path, init_filename) + if _path_isfile(full_path): + return (loader(fullname, full_path), [base_path]) + else: + # A namespace package, return the path if we don't also + # find a module in the next section. + is_namespace = True + # Check for a file w/ a proper suffix exists. + for suffix, loader in self._loaders: + if cache_module + suffix in cache: + full_path = _path_join(self.path, tail_module + suffix) + if _path_isfile(full_path): + return (loader(fullname, full_path), []) + if is_namespace: + return (None, [base_path]) + return (None, []) + + def _fill_cache(self): + """Fill the cache of potential modules and packages for this directory.""" + path = self.path + try: + contents = _os.listdir(path) + except (FileNotFoundError, PermissionError, NotADirectoryError): + # Directory has either been removed, turned into a file, or made + # unreadable. + contents = [] + # We store two cached versions, to handle runtime changes of the + # PYTHONCASEOK environment variable. + if not sys.platform.startswith('win'): + self._path_cache = set(contents) + else: + # Windows users can import modules with case-insensitive file + # suffixes (for legacy reasons). Make the suffix lowercase here + # so it's done once instead of for every import. This is safe as + # the specified suffixes to check against are always specified in a + # case-sensitive manner. + lower_suffix_contents = set() + for item in contents: + name, dot, suffix = item.partition('.') + if dot: + new_name = '{}.{}'.format(name, suffix.lower()) + else: + new_name = name + lower_suffix_contents.add(new_name) + self._path_cache = lower_suffix_contents + if sys.platform.startswith(_CASE_INSENSITIVE_PLATFORMS): + self._relaxed_path_cache = set(fn.lower() for fn in contents) - def __init__(self): - self.suffixes = _suffix_list(imp.PY_COMPILED) + @classmethod + def path_hook(cls, *loader_details): + """A class method which returns a closure to use on sys.path_hook + which will return an instance using the specified loaders and the path + called on the closure. + If the path called on the closure is not a directory, ImportError is + raised. -class _ExtensionFinderDetails: + """ + def path_hook_for_FileFinder(path): + """Path hook for importlib.machinery.FileFinder.""" + if not _path_isdir(path): + raise ImportError("only directories are supported", path=path) + return cls(path, *loader_details) - loader = _ExtensionFileLoader - supports_packages = False + return path_hook_for_FileFinder - def __init__(self): - self.suffixes = _suffix_list(imp.C_EXTENSION) + def __repr__(self): + return "FileFinder(%r)" % (self.path,) # Import itself ############################################################### -def _file_path_hook(path): - """If the path is a directory, return a file-based finder.""" - if _path_isdir(path): - return _FileFinder(path, _ExtensionFinderDetails(), - _SourceFinderDetails(), - _SourcelessFinderDetails()) - else: - raise ImportError("only directories are supported") +class _ImportLockContext: + """Context manager for the import lock.""" -_DEFAULT_PATH_HOOK = _file_path_hook + def __enter__(self): + """Acquire the import lock.""" + _imp.acquire_lock() -class _DefaultPathFinder(PathFinder): + def __exit__(self, exc_type, exc_value, exc_traceback): + """Release the import lock regardless of any raised exceptions.""" + _imp.release_lock() - """Subclass of PathFinder that implements implicit semantics for - __import__.""" - @classmethod - def _path_hooks(cls, path): - """Search sys.path_hooks as well as implicit path hooks.""" - try: - return super()._path_hooks(path) - except ImportError: - implicit_hooks = [_DEFAULT_PATH_HOOK, imp.NullImporter] - return super()._path_hooks(path, implicit_hooks) +def _resolve_name(name, package, level): + """Resolve a relative module name to an absolute one.""" + bits = package.rsplit('.', level - 1) + if len(bits) < level: + raise ValueError('attempted relative import beyond top-level package') + base = bits[0] + return '{}.{}'.format(base, name) if name else base - @classmethod - def _path_importer_cache(cls, path): - """Use the default path hook when None is stored in - sys.path_importer_cache.""" - return super()._path_importer_cache(path, _DEFAULT_PATH_HOOK) +def _find_module(name, path): + """Find a module's loader.""" + if not sys.meta_path: + _warnings.warn('sys.meta_path is empty', ImportWarning) + for finder in sys.meta_path: + with _ImportLockContext(): + loader = finder.find_module(name, path) + if loader is not None: + # The parent import may have already imported this module. + if name not in sys.modules: + return loader + else: + return sys.modules[name].__loader__ + else: + return None -class _ImportLockContext: - """Context manager for the import lock.""" +def _sanity_check(name, package, level): + """Verify arguments are "sane".""" + if not isinstance(name, str): + raise TypeError("module name must be str, not {}".format(type(name))) + if level < 0: + raise ValueError('level must be >= 0') + if package: + if not isinstance(package, str): + raise TypeError("__package__ not set to a string") + elif package not in sys.modules: + msg = ("Parent module {!r} not loaded, cannot perform relative " + "import") + raise SystemError(msg.format(package)) + if not name and level == 0: + raise ValueError("Empty module name") - def __enter__(self): - """Acquire the import lock.""" - imp.acquire_lock() - def __exit__(self, exc_type, exc_value, exc_traceback): - """Release the import lock regardless of any raised exceptions.""" - imp.release_lock() +_ERR_MSG = 'No module named {!r}' +def _find_and_load_unlocked(name, import_): + path = None + parent = name.rpartition('.')[0] + if parent: + if parent not in sys.modules: + _call_with_frames_removed(import_, parent) + # Crazy side-effects! + if name in sys.modules: + return sys.modules[name] + # Backwards-compatibility; be nicer to skip the dict lookup. + parent_module = sys.modules[parent] + try: + path = parent_module.__path__ + except AttributeError: + msg = (_ERR_MSG + '; {} is not a package').format(name, parent) + raise ImportError(msg, name=name) + loader = _find_module(name, path) + if loader is None: + exc = ImportError(_ERR_MSG.format(name), name=name) + # TODO(brett): switch to a proper ModuleNotFound exception in Python + # 3.4. + exc._not_found = True + raise exc + elif name not in sys.modules: + # The parent import may have already imported this module. + loader.load_module(name) + _verbose_message('import {!r} # {!r}', name, loader) + # Backwards-compatibility; be nicer to skip the dict lookup. + module = sys.modules[name] + if parent: + # Set the module as an attribute on its parent. + parent_module = sys.modules[parent] + setattr(parent_module, name.rpartition('.')[2], module) + # Set __package__ if the loader did not. + if getattr(module, '__package__', None) is None: + try: + module.__package__ = module.__name__ + if not hasattr(module, '__path__'): + module.__package__ = module.__package__.rpartition('.')[0] + except AttributeError: + pass + # Set loader if need be. + if not hasattr(module, '__loader__'): + try: + module.__loader__ = loader + except AttributeError: + pass + return module -_IMPLICIT_META_PATH = [BuiltinImporter, FrozenImporter, _DefaultPathFinder] -_ERR_MSG = 'No module named {}' +def _find_and_load(name, import_): + """Find and load the module, and release the import lock.""" + try: + lock = _get_module_lock(name) + finally: + _imp.release_lock() + lock.acquire() + try: + return _find_and_load_unlocked(name, import_) + finally: + lock.release() + def _gcd_import(name, package=None, level=0): """Import and return the module based on its name, the package the call is being made from, and the level adjustment. This function represents the greatest common denominator of functionality - between import_module and __import__. This includes settting __package__ if + between import_module and __import__. This includes setting __package__ if the loader did not. """ - if package: - if not hasattr(package, 'rindex'): - raise ValueError("__package__ not set to a string") - elif package not in sys.modules: - msg = ("Parent module {0!r} not loaded, cannot perform relative " - "import") - raise SystemError(msg.format(package)) - if not name and level == 0: - raise ValueError("Empty module name") + _sanity_check(name, package, level) if level > 0: - dot = len(package) - for x in range(level, 1, -1): - try: - dot = package.rindex('.', 0, dot) - except ValueError: - raise ValueError("attempted relative import beyond " - "top-level package") - if name: - name = "{0}.{1}".format(package[:dot], name) - else: - name = package[:dot] - with _ImportLockContext(): - try: - module = sys.modules[name] - if module is None: - message = ("import of {} halted; " - "None in sys.modules".format(name)) - raise ImportError(message) - return module - except KeyError: - pass - parent = name.rpartition('.')[0] - path = None - if parent: - if parent not in sys.modules: - _gcd_import(parent) - # Backwards-compatibility; be nicer to skip the dict lookup. - parent_module = sys.modules[parent] - try: - path = parent_module.__path__ - except AttributeError: - msg = (_ERR_MSG + '; {} is not a package').format(name, parent) - raise ImportError(msg) - meta_path = sys.meta_path + _IMPLICIT_META_PATH - for finder in meta_path: - loader = finder.find_module(name, path) - if loader is not None: - # The parent import may have already imported this module. - if name not in sys.modules: - loader.load_module(name) - break - else: - raise ImportError(_ERR_MSG.format(name)) - # Backwards-compatibility; be nicer to skip the dict lookup. - module = sys.modules[name] - if parent: - # Set the module as an attribute on its parent. - setattr(parent_module, name.rpartition('.')[2], module) - # Set __package__ if the loader did not. - if not hasattr(module, '__package__') or module.__package__ is None: - # Watch out for what comes out of sys.modules to not be a module, - # e.g. an int. - try: - module.__package__ = module.__name__ - if not hasattr(module, '__path__'): - module.__package__ = module.__package__.rpartition('.')[0] - except AttributeError: - pass - return module + name = _resolve_name(name, package, level) + _imp.acquire_lock() + if name not in sys.modules: + return _find_and_load(name, _gcd_import) + module = sys.modules[name] + if module is None: + _imp.release_lock() + message = ("import of {} halted; " + "None in sys.modules".format(name)) + raise ImportError(message, name=name) + _lock_unlock_module(name) + return module + +def _handle_fromlist(module, fromlist, import_): + """Figure out what __import__ should return. + + The import_ parameter is a callable which takes the name of module to + import. It is required to decouple the function from assuming importlib's + import implementation is desired. + + """ + # The hell that is fromlist ... + # If a package was imported, try to import stuff from fromlist. + if hasattr(module, '__path__'): + if '*' in fromlist: + fromlist = list(fromlist) + fromlist.remove('*') + if hasattr(module, '__all__'): + fromlist.extend(module.__all__) + for x in fromlist: + if not hasattr(module, x): + from_name = '{}.{}'.format(module.__name__, x) + try: + _call_with_frames_removed(import_, from_name) + except ImportError as exc: + # Backwards-compatibility dictates we ignore failed + # imports triggered by fromlist for modules that don't + # exist. + # TODO(brett): In Python 3.4, have import raise + # ModuleNotFound and catch that. + if getattr(exc, '_not_found', False): + if exc.name == from_name: + continue + raise + return module + + +def _calc___package__(globals): + """Calculate what __package__ should be. + __package__ is not guaranteed to be defined or could be set to None + to represent that its proper value is unknown. -def __import__(name, globals={}, locals={}, fromlist=[], level=0): + """ + package = globals.get('__package__') + if package is None: + package = globals['__name__'] + if '__path__' not in globals: + package = package.rpartition('.')[0] + return package + + +def _get_supported_file_loaders(): + """Returns a list of file-based module loaders. + + Each item is a tuple (loader, suffixes, allow_packages). + """ + extensions = ExtensionFileLoader, _imp.extension_suffixes() + source = SourceFileLoader, SOURCE_SUFFIXES + bytecode = SourcelessFileLoader, BYTECODE_SUFFIXES + return [extensions, source, bytecode] + + +def __import__(name, globals=None, locals=None, fromlist=(), level=0): """Import a module. The 'globals' argument is used to infer where the import is occuring from @@ -851,40 +1661,117 @@ def __import__(name, globals={}, locals={}, fromlist=[], level=0): import (e.g. ``from ..pkg import mod`` would have a 'level' of 2). """ - if not hasattr(name, 'rpartition'): - raise TypeError("module name must be str, not {}".format(type(name))) if level == 0: module = _gcd_import(name) else: - # __package__ is not guaranteed to be defined or could be set to None - # to represent that it's proper value is unknown - package = globals.get('__package__') - if package is None: - package = globals['__name__'] - if '__path__' not in globals: - package = package.rpartition('.')[0] + globals_ = globals if globals is not None else {} + package = _calc___package__(globals_) module = _gcd_import(name, package, level) - # The hell that is fromlist ... if not fromlist: # Return up to the first dot in 'name'. This is complicated by the fact # that 'name' may be relative. if level == 0: - return sys.modules[name.partition('.')[0]] + return _gcd_import(name.partition('.')[0]) elif not name: return module else: + # Figure out where to slice the module's name up to the first dot + # in 'name'. cut_off = len(name) - len(name.partition('.')[0]) - return sys.modules[module.__name__[:-cut_off]] + # Slice end needs to be positive to alleviate need to special-case + # when ``'.' not in name``. + return sys.modules[module.__name__[:len(module.__name__)-cut_off]] else: - # If a package was imported, try to import stuff from fromlist. - if hasattr(module, '__path__'): - if '*' in fromlist and hasattr(module, '__all__'): - fromlist = list(fromlist) - fromlist.remove('*') - fromlist.extend(module.__all__) - for x in (y for y in fromlist if not hasattr(module,y)): - try: - _gcd_import('{0}.{1}'.format(module.__name__, x)) - except ImportError: - pass - return module + return _handle_fromlist(module, fromlist, _gcd_import) + + + +def _setup(sys_module, _imp_module): + """Setup importlib by importing needed built-in modules and injecting them + into the global namespace. + + As sys is needed for sys.modules access and _imp is needed to load built-in + modules, those two modules must be explicitly passed in. + + """ + global _imp, sys, BYTECODE_SUFFIXES + _imp = _imp_module + sys = sys_module + + if sys.flags.optimize: + BYTECODE_SUFFIXES = OPTIMIZED_BYTECODE_SUFFIXES + else: + BYTECODE_SUFFIXES = DEBUG_BYTECODE_SUFFIXES + + module_type = type(sys) + for name, module in sys.modules.items(): + if isinstance(module, module_type): + if not hasattr(module, '__loader__'): + if name in sys.builtin_module_names: + module.__loader__ = BuiltinImporter + elif _imp.is_frozen(name): + module.__loader__ = FrozenImporter + + self_module = sys.modules[__name__] + for builtin_name in ('_io', '_warnings', 'builtins', 'marshal'): + if builtin_name not in sys.modules: + builtin_module = BuiltinImporter.load_module(builtin_name) + else: + builtin_module = sys.modules[builtin_name] + setattr(self_module, builtin_name, builtin_module) + + os_details = ('posix', ['/']), ('nt', ['\\', '/']), ('os2', ['\\', '/']) + for builtin_os, path_separators in os_details: + # Assumption made in _path_join() + assert all(len(sep) == 1 for sep in path_separators) + path_sep = path_separators[0] + if builtin_os in sys.modules: + os_module = sys.modules[builtin_os] + break + else: + try: + os_module = BuiltinImporter.load_module(builtin_os) + # TODO: rip out os2 code after 3.3 is released as per PEP 11 + if builtin_os == 'os2' and 'EMX GCC' in sys.version: + path_sep = path_separators[1] + break + except ImportError: + continue + else: + raise ImportError('importlib requires posix or nt') + + try: + thread_module = BuiltinImporter.load_module('_thread') + except ImportError: + # Python was built without threads + thread_module = None + weakref_module = BuiltinImporter.load_module('_weakref') + + if builtin_os == 'nt': + winreg_module = BuiltinImporter.load_module('winreg') + setattr(self_module, '_winreg', winreg_module) + + setattr(self_module, '_os', os_module) + setattr(self_module, '_thread', thread_module) + setattr(self_module, '_weakref', weakref_module) + setattr(self_module, 'path_sep', path_sep) + setattr(self_module, 'path_separators', set(path_separators)) + # Constants + setattr(self_module, '_relax_case', _make_relax_case()) + EXTENSION_SUFFIXES.extend(_imp.extension_suffixes()) + if builtin_os == 'nt': + SOURCE_SUFFIXES.append('.pyw') + if '_d.pyd' in EXTENSION_SUFFIXES: + WindowsRegistryFinder.DEBUG_BUILD = True + + +def _install(sys_module, _imp_module): + """Install importlib as the implementation of import.""" + _setup(sys_module, _imp_module) + supported_loaders = _get_supported_file_loaders() + sys.path_hooks.extend([FileFinder.path_hook(*supported_loaders)]) + sys.meta_path.append(BuiltinImporter) + sys.meta_path.append(FrozenImporter) + if _os.__name__ == 'nt': + sys.meta_path.append(WindowsRegistryFinder) + sys.meta_path.append(PathFinder) diff --git a/Lib/importlib/abc.py b/Lib/importlib/abc.py index fa343f85a47..387567a450f 100644 --- a/Lib/importlib/abc.py +++ b/Lib/importlib/abc.py @@ -1,44 +1,109 @@ """Abstract base classes related to import.""" from . import _bootstrap from . import machinery -from . import util +try: + import _frozen_importlib +except ImportError as exc: + if exc.name != '_frozen_importlib': + raise + _frozen_importlib = None import abc import imp -import io import marshal -import os.path import sys import tokenize -import types import warnings -class Loader(metaclass=abc.ABCMeta): +def _register(abstract_cls, *classes): + for cls in classes: + abstract_cls.register(cls) + if _frozen_importlib is not None: + frozen_cls = getattr(_frozen_importlib, cls.__name__) + abstract_cls.register(frozen_cls) - """Abstract base class for import loaders.""" + +class Finder(metaclass=abc.ABCMeta): + + """Legacy abstract base class for import finders. + + It may be subclassed for compatibility with legacy third party + reimplementations of the import system. Otherwise, finder + implementations should derive from the more specific MetaPathFinder + or PathEntryFinder ABCs. + """ @abc.abstractmethod - def load_module(self, fullname): - """Abstract method which when implemented should load a module. - The fullname is a str.""" + def find_module(self, fullname, path=None): + """An abstract method that should find a module. + The fullname is a str and the optional path is a str or None. + Returns a Loader object. + """ raise NotImplementedError -class Finder(metaclass=abc.ABCMeta): +class MetaPathFinder(Finder): - """Abstract base class for import finders.""" + """Abstract base class for import finders on sys.meta_path.""" @abc.abstractmethod - def find_module(self, fullname, path=None): - """Abstract method which when implemented should find a module. - The fullname is a str and the optional path is a str or None. + def find_module(self, fullname, path): + """Abstract method which, when implemented, should find a module. + The fullname is a str and the path is a str or None. Returns a Loader object. """ raise NotImplementedError -Finder.register(machinery.BuiltinImporter) -Finder.register(machinery.FrozenImporter) -Finder.register(machinery.PathFinder) + def invalidate_caches(self): + """An optional method for clearing the finder's cache, if any. + This method is used by importlib.invalidate_caches(). + """ + return NotImplemented + +_register(MetaPathFinder, machinery.BuiltinImporter, machinery.FrozenImporter, + machinery.PathFinder, machinery.WindowsRegistryFinder) + + +class PathEntryFinder(Finder): + + """Abstract base class for path entry finders used by PathFinder.""" + + @abc.abstractmethod + def find_loader(self, fullname): + """Abstract method which, when implemented, returns a module loader. + The fullname is a str. Returns a 2-tuple of (Loader, portion) where + portion is a sequence of file system locations contributing to part of + a namespace package. The sequence may be empty and the loader may be + None. + """ + raise NotImplementedError + + find_module = _bootstrap._find_module_shim + + def invalidate_caches(self): + """An optional method for clearing the finder's cache, if any. + This method is used by PathFinder.invalidate_caches(). + """ + return NotImplemented + +_register(PathEntryFinder, machinery.FileFinder) + + +class Loader(metaclass=abc.ABCMeta): + + """Abstract base class for import loaders.""" + + @abc.abstractmethod + def load_module(self, fullname): + """Abstract method which when implemented should load a module. + The fullname is a str.""" + raise NotImplementedError + + @abc.abstractmethod + def module_repr(self, module): + """Abstract method which when implemented calculates and returns the + given module's repr.""" + raise NotImplementedError class ResourceLoader(Loader): @@ -84,8 +149,8 @@ class InspectLoader(Loader): module. The fullname is a str. Returns a str.""" raise NotImplementedError -InspectLoader.register(machinery.BuiltinImporter) -InspectLoader.register(machinery.FrozenImporter) +_register(InspectLoader, machinery.BuiltinImporter, machinery.FrozenImporter, + machinery.ExtensionFileLoader) class ExecutionLoader(InspectLoader): @@ -104,6 +169,15 @@ class ExecutionLoader(InspectLoader): raise NotImplementedError +class FileLoader(_bootstrap.FileLoader, ResourceLoader, ExecutionLoader): + + """Abstract base class partially implementing the ResourceLoader and + ExecutionLoader ABCs.""" + +_register(FileLoader, machinery.SourceFileLoader, + machinery.SourcelessFileLoader) + + class SourceLoader(_bootstrap.SourceLoader, ResourceLoader, ExecutionLoader): """Abstract base class for loading source code (and optionally any @@ -123,7 +197,20 @@ class SourceLoader(_bootstrap.SourceLoader, ResourceLoader, ExecutionLoader): def path_mtime(self, path): """Return the (int) modification time for the path (str).""" - raise NotImplementedError + if self.path_stats.__func__ is SourceLoader.path_stats: + raise NotImplementedError + return int(self.path_stats(path)['mtime']) + + def path_stats(self, path): + """Return a metadata dict for the source pointed to by the path (str). + Possible keys: + - 'mtime' (mandatory) is the numeric timestamp of last source + code modification; + - 'size' (optional) is the size in bytes of the source code. + """ + if self.path_mtime.__func__ is SourceLoader.path_mtime: + raise NotImplementedError + return {'mtime': self.path_mtime(path)} def set_data(self, path, data): """Write the bytes to the path (if possible). @@ -137,6 +224,7 @@ class SourceLoader(_bootstrap.SourceLoader, ResourceLoader, ExecutionLoader): """ raise NotImplementedError +_register(SourceLoader, machinery.SourceFileLoader) class PyLoader(SourceLoader): @@ -195,10 +283,10 @@ class PyLoader(SourceLoader): "use SourceLoader instead. " "See the importlib documentation on how to be " "compatible with Python 3.1 onwards.", - PendingDeprecationWarning) + DeprecationWarning) path = self.source_path(fullname) if path is None: - raise ImportError + raise ImportError(name=fullname) else: return path @@ -226,7 +314,7 @@ class PyPycLoader(PyLoader): if path is not None: return path raise ImportError("no source or bytecode path available for " - "{0!r}".format(fullname)) + "{0!r}".format(fullname), name=fullname) def get_code(self, fullname): """Get a code object from source or bytecode.""" @@ -234,7 +322,7 @@ class PyPycLoader(PyLoader): "removal in Python 3.4; use SourceLoader instead. " "If Python 3.1 compatibility is required, see the " "latest documentation for PyLoader.", - PendingDeprecationWarning) + DeprecationWarning) source_timestamp = self.source_mtime(fullname) # Try to use bytecode if it is available. bytecode_path = self.bytecode_path(fullname) @@ -243,20 +331,30 @@ class PyPycLoader(PyLoader): try: magic = data[:4] if len(magic) < 4: - raise ImportError("bad magic number in {}".format(fullname)) + raise ImportError( + "bad magic number in {}".format(fullname), + name=fullname, path=bytecode_path) raw_timestamp = data[4:8] if len(raw_timestamp) < 4: raise EOFError("bad timestamp in {}".format(fullname)) - pyc_timestamp = marshal._r_long(raw_timestamp) - bytecode = data[8:] + pyc_timestamp = _bootstrap._r_long(raw_timestamp) + raw_source_size = data[8:12] + if len(raw_source_size) != 4: + raise EOFError("bad file size in {}".format(fullname)) + # Source size is unused as the ABC does not provide a way to + # get the size of the source ahead of reading it. + bytecode = data[12:] # Verify that the magic number is valid. if imp.get_magic() != magic: - raise ImportError("bad magic number in {}".format(fullname)) + raise ImportError( + "bad magic number in {}".format(fullname), + name=fullname, path=bytecode_path) # Verify that the bytecode is not stale (only matters when # there is source to fall back on. if source_timestamp: if pyc_timestamp < source_timestamp: - raise ImportError("bytecode is stale") + raise ImportError("bytecode is stale", name=fullname, + path=bytecode_path) except (ImportError, EOFError): # If source is available give it a shot. if source_timestamp is not None: @@ -268,18 +366,20 @@ class PyPycLoader(PyLoader): return marshal.loads(bytecode) elif source_timestamp is None: raise ImportError("no source or bytecode available to create code " - "object for {0!r}".format(fullname)) + "object for {0!r}".format(fullname), + name=fullname) # Use the source. source_path = self.source_path(fullname) if source_path is None: message = "a source path must exist to load {0}".format(fullname) - raise ImportError(message) + raise ImportError(message, name=fullname) source = self.get_data(source_path) code_object = compile(source, source_path, 'exec', dont_inherit=True) # Generate bytecode and write it out. if not sys.dont_write_bytecode: data = bytearray(imp.get_magic()) - data.extend(marshal._w_long(source_timestamp)) + data.extend(_bootstrap._w_long(source_timestamp)) + data.extend(_bootstrap._w_long(len(source) & 0xFFFFFFFF)) data.extend(marshal.dumps(code_object)) self.write_bytecode(fullname, data) return code_object diff --git a/Lib/importlib/machinery.py b/Lib/importlib/machinery.py index 519774440f1..ff826e4fae7 100644 --- a/Lib/importlib/machinery.py +++ b/Lib/importlib/machinery.py @@ -1,5 +1,20 @@ """The machinery of importlib: finders, loaders, hooks, etc.""" +import _imp + +from ._bootstrap import (SOURCE_SUFFIXES, DEBUG_BYTECODE_SUFFIXES, + OPTIMIZED_BYTECODE_SUFFIXES, BYTECODE_SUFFIXES, + EXTENSION_SUFFIXES) from ._bootstrap import BuiltinImporter from ._bootstrap import FrozenImporter +from ._bootstrap import WindowsRegistryFinder from ._bootstrap import PathFinder +from ._bootstrap import FileFinder +from ._bootstrap import SourceFileLoader +from ._bootstrap import SourcelessFileLoader +from ._bootstrap import ExtensionFileLoader + + +def all_suffixes(): + """Returns a list of all recognized module suffixes for this process""" + return SOURCE_SUFFIXES + BYTECODE_SUFFIXES + EXTENSION_SUFFIXES diff --git a/Lib/importlib/test/__init__.py b/Lib/importlib/test/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 --- a/Lib/importlib/test/__init__.py +++ /dev/null diff --git a/Lib/importlib/test/__main__.py b/Lib/importlib/test/__main__.py deleted file mode 100644 index decc53d8c5a..00000000000 --- a/Lib/importlib/test/__main__.py +++ /dev/null @@ -1,29 +0,0 @@ -"""Run importlib's test suite. - -Specifying the ``--builtin`` flag will run tests, where applicable, with -builtins.__import__ instead of importlib.__import__. - -""" -import importlib -from importlib.test.import_ import util -import os.path -from test.support import run_unittest -import sys -import unittest - - -def test_main(): - if '__pycache__' in __file__: - parts = __file__.split(os.path.sep) - start_dir = sep.join(parts[:-2]) - else: - start_dir = os.path.dirname(__file__) - top_dir = os.path.dirname(os.path.dirname(start_dir)) - test_loader = unittest.TestLoader() - if '--builtin' in sys.argv: - util.using___import__ = True - run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir)) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/abc.py b/Lib/importlib/test/abc.py deleted file mode 100644 index 2c17ac329bc..00000000000 --- a/Lib/importlib/test/abc.py +++ /dev/null @@ -1,99 +0,0 @@ -import abc -import unittest - - -class FinderTests(unittest.TestCase, metaclass=abc.ABCMeta): - - """Basic tests for a finder to pass.""" - - @abc.abstractmethod - def test_module(self): - # Test importing a top-level module. - pass - - @abc.abstractmethod - def test_package(self): - # Test importing a package. - pass - - @abc.abstractmethod - def test_module_in_package(self): - # Test importing a module contained within a package. - # A value for 'path' should be used if for a meta_path finder. - pass - - @abc.abstractmethod - def test_package_in_package(self): - # Test importing a subpackage. - # A value for 'path' should be used if for a meta_path finder. - pass - - @abc.abstractmethod - def test_package_over_module(self): - # Test that packages are chosen over modules. - pass - - @abc.abstractmethod - def test_failure(self): - # Test trying to find a module that cannot be handled. - pass - - -class LoaderTests(unittest.TestCase, metaclass=abc.ABCMeta): - - @abc.abstractmethod - def test_module(self): - """A module should load without issue. - - After the loader returns the module should be in sys.modules. - - Attributes to verify: - - * __file__ - * __loader__ - * __name__ - * No __path__ - - """ - pass - - @abc.abstractmethod - def test_package(self): - """Loading a package should work. - - After the loader returns the module should be in sys.modules. - - Attributes to verify: - - * __name__ - * __file__ - * __package__ - * __path__ - * __loader__ - - """ - pass - - @abc.abstractmethod - def test_lacking_parent(self): - """A loader should not be dependent on it's parent package being - imported.""" - pass - - @abc.abstractmethod - def test_module_reuse(self): - """If a module is already in sys.modules, it should be reused.""" - pass - - @abc.abstractmethod - def test_state_after_failure(self): - """If a module is already in sys.modules and a reload fails - (e.g. a SyntaxError), the module should be in the state it was before - the reload began.""" - pass - - @abc.abstractmethod - def test_unloadable(self): - """Test ImportError is raised when the loader is asked to load a module - it can't.""" - pass diff --git a/Lib/importlib/test/benchmark.py b/Lib/importlib/test/benchmark.py deleted file mode 100644 index b5de6c6b010..00000000000 --- a/Lib/importlib/test/benchmark.py +++ /dev/null @@ -1,172 +0,0 @@ -"""Benchmark some basic import use-cases. - -The assumption is made that this benchmark is run in a fresh interpreter and -thus has no external changes made to import-related attributes in sys. - -""" -from . import util -from .source import util as source_util -import decimal -import imp -import importlib -import os -import py_compile -import sys -import timeit - - -def bench(name, cleanup=lambda: None, *, seconds=1, repeat=3): - """Bench the given statement as many times as necessary until total - executions take one second.""" - stmt = "__import__({!r})".format(name) - timer = timeit.Timer(stmt) - for x in range(repeat): - total_time = 0 - count = 0 - while total_time < seconds: - try: - total_time += timer.timeit(1) - finally: - cleanup() - count += 1 - else: - # One execution too far - if total_time > seconds: - count -= 1 - yield count // seconds - -def from_cache(seconds, repeat): - """sys.modules""" - name = '<benchmark import>' - module = imp.new_module(name) - module.__file__ = '<test>' - module.__package__ = '' - with util.uncache(name): - sys.modules[name] = module - for result in bench(name, repeat=repeat, seconds=seconds): - yield result - - -def builtin_mod(seconds, repeat): - """Built-in module""" - name = 'errno' - if name in sys.modules: - del sys.modules[name] - # Relying on built-in importer being implicit. - for result in bench(name, lambda: sys.modules.pop(name), repeat=repeat, - seconds=seconds): - yield result - - -def source_wo_bytecode(seconds, repeat): - """Source w/o bytecode: simple""" - sys.dont_write_bytecode = True - try: - name = '__importlib_test_benchmark__' - # Clears out sys.modules and puts an entry at the front of sys.path. - with source_util.create_modules(name) as mapping: - assert not os.path.exists(imp.cache_from_source(mapping[name])) - for result in bench(name, lambda: sys.modules.pop(name), repeat=repeat, - seconds=seconds): - yield result - finally: - sys.dont_write_bytecode = False - - -def decimal_wo_bytecode(seconds, repeat): - """Source w/o bytecode: decimal""" - name = 'decimal' - decimal_bytecode = imp.cache_from_source(decimal.__file__) - if os.path.exists(decimal_bytecode): - os.unlink(decimal_bytecode) - sys.dont_write_bytecode = True - try: - for result in bench(name, lambda: sys.modules.pop(name), repeat=repeat, - seconds=seconds): - yield result - finally: - sys.dont_write_bytecode = False - - -def source_writing_bytecode(seconds, repeat): - """Source writing bytecode: simple""" - assert not sys.dont_write_bytecode - name = '__importlib_test_benchmark__' - with source_util.create_modules(name) as mapping: - def cleanup(): - sys.modules.pop(name) - os.unlink(imp.cache_from_source(mapping[name])) - for result in bench(name, cleanup, repeat=repeat, seconds=seconds): - assert not os.path.exists(imp.cache_from_source(mapping[name])) - yield result - - -def decimal_writing_bytecode(seconds, repeat): - """Source writing bytecode: decimal""" - assert not sys.dont_write_bytecode - name = 'decimal' - def cleanup(): - sys.modules.pop(name) - os.unlink(imp.cache_from_source(decimal.__file__)) - for result in bench(name, cleanup, repeat=repeat, seconds=seconds): - yield result - - -def source_using_bytecode(seconds, repeat): - """Bytecode w/ source: simple""" - name = '__importlib_test_benchmark__' - with source_util.create_modules(name) as mapping: - py_compile.compile(mapping[name]) - assert os.path.exists(imp.cache_from_source(mapping[name])) - for result in bench(name, lambda: sys.modules.pop(name), repeat=repeat, - seconds=seconds): - yield result - - -def decimal_using_bytecode(seconds, repeat): - """Bytecode w/ source: decimal""" - name = 'decimal' - py_compile.compile(decimal.__file__) - for result in bench(name, lambda: sys.modules.pop(name), repeat=repeat, - seconds=seconds): - yield result - - -def main(import_): - __builtins__.__import__ = import_ - benchmarks = (from_cache, builtin_mod, - source_using_bytecode, source_wo_bytecode, - source_writing_bytecode, - decimal_using_bytecode, decimal_writing_bytecode, - decimal_wo_bytecode,) - seconds = 1 - seconds_plural = 's' if seconds > 1 else '' - repeat = 3 - header = "Measuring imports/second over {} second{}, best out of {}\n" - print(header.format(seconds, seconds_plural, repeat)) - for benchmark in benchmarks: - print(benchmark.__doc__, "[", end=' ') - sys.stdout.flush() - results = [] - for result in benchmark(seconds=seconds, repeat=repeat): - results.append(result) - print(result, end=' ') - sys.stdout.flush() - assert not sys.dont_write_bytecode - print("]", "best is", format(max(results), ',d')) - - -if __name__ == '__main__': - import optparse - - parser = optparse.OptionParser() - parser.add_option('-b', '--builtin', dest='builtin', action='store_true', - default=False, help="use the built-in __import__") - options, args = parser.parse_args() - if args: - raise RuntimeError("unrecognized args: {}".format(args)) - import_ = __import__ - if not options.builtin: - import_ = importlib.__import__ - - main(import_) diff --git a/Lib/importlib/test/builtin/__init__.py b/Lib/importlib/test/builtin/__init__.py deleted file mode 100644 index 31a3b5f7d46..00000000000 --- a/Lib/importlib/test/builtin/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -import importlib.test -import os - - -def test_suite(): - directory = os.path.dirname(__file__) - return importlib.test.test_suite('importlib.test.builtin', directory) - - -if __name__ == '__main__': - from test.support import run_unittest - run_unittest(test_suite()) diff --git a/Lib/importlib/test/builtin/test_finder.py b/Lib/importlib/test/builtin/test_finder.py deleted file mode 100644 index 40f690e4aff..00000000000 --- a/Lib/importlib/test/builtin/test_finder.py +++ /dev/null @@ -1,55 +0,0 @@ -from importlib import machinery -from .. import abc -from .. import util -from . import util as builtin_util - -import sys -import unittest - -class FinderTests(abc.FinderTests): - - """Test find_module() for built-in modules.""" - - def test_module(self): - # Common case. - with util.uncache(builtin_util.NAME): - found = machinery.BuiltinImporter.find_module(builtin_util.NAME) - self.assertTrue(found) - - def test_package(self): - # Built-in modules cannot be a package. - pass - - def test_module_in_package(self): - # Built-in modules cannobt be in a package. - pass - - def test_package_in_package(self): - # Built-in modules cannot be a package. - pass - - def test_package_over_module(self): - # Built-in modules cannot be a package. - pass - - def test_failure(self): - assert 'importlib' not in sys.builtin_module_names - loader = machinery.BuiltinImporter.find_module('importlib') - self.assertTrue(loader is None) - - def test_ignore_path(self): - # The value for 'path' should always trigger a failed import. - with util.uncache(builtin_util.NAME): - loader = machinery.BuiltinImporter.find_module(builtin_util.NAME, - ['pkg']) - self.assertTrue(loader is None) - - - -def test_main(): - from test.support import run_unittest - run_unittest(FinderTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/builtin/test_loader.py b/Lib/importlib/test/builtin/test_loader.py deleted file mode 100644 index 1a8539b1e80..00000000000 --- a/Lib/importlib/test/builtin/test_loader.py +++ /dev/null @@ -1,102 +0,0 @@ -import importlib -from importlib import machinery -from .. import abc -from .. import util -from . import util as builtin_util - -import sys -import types -import unittest - - -class LoaderTests(abc.LoaderTests): - - """Test load_module() for built-in modules.""" - - verification = {'__name__': 'errno', '__package__': '', - '__loader__': machinery.BuiltinImporter} - - def verify(self, module): - """Verify that the module matches against what it should have.""" - self.assertTrue(isinstance(module, types.ModuleType)) - for attr, value in self.verification.items(): - self.assertEqual(getattr(module, attr), value) - self.assertTrue(module.__name__ in sys.modules) - - load_module = staticmethod(lambda name: - machinery.BuiltinImporter.load_module(name)) - - def test_module(self): - # Common case. - with util.uncache(builtin_util.NAME): - module = self.load_module(builtin_util.NAME) - self.verify(module) - - def test_package(self): - # Built-in modules cannot be a package. - pass - - def test_lacking_parent(self): - # Built-in modules cannot be a package. - pass - - def test_state_after_failure(self): - # Not way to force an imoprt failure. - pass - - def test_module_reuse(self): - # Test that the same module is used in a reload. - with util.uncache(builtin_util.NAME): - module1 = self.load_module(builtin_util.NAME) - module2 = self.load_module(builtin_util.NAME) - self.assertTrue(module1 is module2) - - def test_unloadable(self): - name = 'dssdsdfff' - assert name not in sys.builtin_module_names - with self.assertRaises(ImportError): - self.load_module(name) - - def test_already_imported(self): - # Using the name of a module already imported but not a built-in should - # still fail. - assert hasattr(importlib, '__file__') # Not a built-in. - with self.assertRaises(ImportError): - self.load_module('importlib') - - -class InspectLoaderTests(unittest.TestCase): - - """Tests for InspectLoader methods for BuiltinImporter.""" - - def test_get_code(self): - # There is no code object. - result = machinery.BuiltinImporter.get_code(builtin_util.NAME) - self.assertTrue(result is None) - - def test_get_source(self): - # There is no source. - result = machinery.BuiltinImporter.get_source(builtin_util.NAME) - self.assertTrue(result is None) - - def test_is_package(self): - # Cannot be a package. - result = machinery.BuiltinImporter.is_package(builtin_util.NAME) - self.assertTrue(not result) - - def test_not_builtin(self): - # Modules not built-in should raise ImportError. - for meth_name in ('get_code', 'get_source', 'is_package'): - method = getattr(machinery.BuiltinImporter, meth_name) - with self.assertRaises(ImportError): - method(builtin_util.BAD_NAME) - - - -def test_main(): - from test.support import run_unittest - run_unittest(LoaderTests, InspectLoaderTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/builtin/util.py b/Lib/importlib/test/builtin/util.py deleted file mode 100644 index 5704699ee23..00000000000 --- a/Lib/importlib/test/builtin/util.py +++ /dev/null @@ -1,7 +0,0 @@ -import sys - -assert 'errno' in sys.builtin_module_names -NAME = 'errno' - -assert 'importlib' not in sys.builtin_module_names -BAD_NAME = 'importlib' diff --git a/Lib/importlib/test/extension/__init__.py b/Lib/importlib/test/extension/__init__.py deleted file mode 100644 index 2ec584072d0..00000000000 --- a/Lib/importlib/test/extension/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -import importlib.test -import os.path -import unittest - - -def test_suite(): - directory = os.path.dirname(__file__) - return importlib.test.test_suite('importlib.test.extension', directory) - - -if __name__ == '__main__': - from test.support import run_unittest - run_unittest(test_suite()) diff --git a/Lib/importlib/test/extension/test_case_sensitivity.py b/Lib/importlib/test/extension/test_case_sensitivity.py deleted file mode 100644 index e062fb6597d..00000000000 --- a/Lib/importlib/test/extension/test_case_sensitivity.py +++ /dev/null @@ -1,42 +0,0 @@ -import sys -from test import support -import unittest -from importlib import _bootstrap -from .. import util -from . import util as ext_util - - -@util.case_insensitive_tests -class ExtensionModuleCaseSensitivityTest(unittest.TestCase): - - def find_module(self): - good_name = ext_util.NAME - bad_name = good_name.upper() - assert good_name != bad_name - finder = _bootstrap._FileFinder(ext_util.PATH, - _bootstrap._ExtensionFinderDetails()) - return finder.find_module(bad_name) - - def test_case_sensitive(self): - with support.EnvironmentVarGuard() as env: - env.unset('PYTHONCASEOK') - loader = self.find_module() - self.assertIsNone(loader) - - def test_case_insensitivity(self): - with support.EnvironmentVarGuard() as env: - env.set('PYTHONCASEOK', '1') - loader = self.find_module() - self.assertTrue(hasattr(loader, 'load_module')) - - - - -def test_main(): - if ext_util.FILENAME is None: - return - support.run_unittest(ExtensionModuleCaseSensitivityTest) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/extension/test_finder.py b/Lib/importlib/test/extension/test_finder.py deleted file mode 100644 index ea97483317c..00000000000 --- a/Lib/importlib/test/extension/test_finder.py +++ /dev/null @@ -1,47 +0,0 @@ -from importlib import _bootstrap -from .. import abc -from . import util - -import unittest - -class FinderTests(abc.FinderTests): - - """Test the finder for extension modules.""" - - def find_module(self, fullname): - importer = _bootstrap._FileFinder(util.PATH, - _bootstrap._ExtensionFinderDetails()) - return importer.find_module(fullname) - - def test_module(self): - self.assertTrue(self.find_module(util.NAME)) - - def test_package(self): - # Extension modules cannot be an __init__ for a package. - pass - - def test_module_in_package(self): - # No extension module in a package available for testing. - pass - - def test_package_in_package(self): - # Extension modules cannot be an __init__ for a package. - pass - - def test_package_over_module(self): - # Extension modules cannot be an __init__ for a package. - pass - - def test_failure(self): - self.assertTrue(self.find_module('asdfjkl;') is None) - - # XXX Raise an exception if someone tries to use the 'path' argument? - - -def test_main(): - from test.support import run_unittest - run_unittest(FinderTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/extension/test_loader.py b/Lib/importlib/test/extension/test_loader.py deleted file mode 100644 index 4a783db8a5a..00000000000 --- a/Lib/importlib/test/extension/test_loader.py +++ /dev/null @@ -1,59 +0,0 @@ -from importlib import _bootstrap -from . import util as ext_util -from .. import abc -from .. import util - -import sys -import unittest - - -class LoaderTests(abc.LoaderTests): - - """Test load_module() for extension modules.""" - - def load_module(self, fullname): - loader = _bootstrap._ExtensionFileLoader(ext_util.NAME, - ext_util.FILEPATH) - return loader.load_module(fullname) - - def test_module(self): - with util.uncache(ext_util.NAME): - module = self.load_module(ext_util.NAME) - for attr, value in [('__name__', ext_util.NAME), - ('__file__', ext_util.FILEPATH), - ('__package__', '')]: - self.assertEqual(getattr(module, attr), value) - self.assertTrue(ext_util.NAME in sys.modules) - self.assertTrue(isinstance(module.__loader__, - _bootstrap._ExtensionFileLoader)) - - def test_package(self): - # Extensions are not found in packages. - pass - - def test_lacking_parent(self): - # Extensions are not found in packages. - pass - - def test_module_reuse(self): - with util.uncache(ext_util.NAME): - module1 = self.load_module(ext_util.NAME) - module2 = self.load_module(ext_util.NAME) - self.assertTrue(module1 is module2) - - def test_state_after_failure(self): - # No easy way to trigger a failure after a successful import. - pass - - def test_unloadable(self): - with self.assertRaises(ImportError): - self.load_module('asdfjkl;') - - -def test_main(): - from test.support import run_unittest - run_unittest(LoaderTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/extension/test_path_hook.py b/Lib/importlib/test/extension/test_path_hook.py deleted file mode 100644 index 4610420d293..00000000000 --- a/Lib/importlib/test/extension/test_path_hook.py +++ /dev/null @@ -1,31 +0,0 @@ -from importlib import _bootstrap -from . import util - -import collections -import imp -import sys -import unittest - - -class PathHookTests(unittest.TestCase): - - """Test the path hook for extension modules.""" - # XXX Should it only succeed for pre-existing directories? - # XXX Should it only work for directories containing an extension module? - - def hook(self, entry): - return _bootstrap._file_path_hook(entry) - - def test_success(self): - # Path hook should handle a directory where a known extension module - # exists. - self.assertTrue(hasattr(self.hook(util.PATH), 'find_module')) - - -def test_main(): - from test.support import run_unittest - run_unittest(PathHookTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/extension/util.py b/Lib/importlib/test/extension/util.py deleted file mode 100644 index d1491697483..00000000000 --- a/Lib/importlib/test/extension/util.py +++ /dev/null @@ -1,21 +0,0 @@ -import imp -import os -import sys - -PATH = None -EXT = None -FILENAME = None -NAME = '_testcapi' -_file_exts = [x[0] for x in imp.get_suffixes() if x[2] == imp.C_EXTENSION] -try: - for PATH in sys.path: - for EXT in _file_exts: - FILENAME = NAME + EXT - FILEPATH = os.path.join(PATH, FILENAME) - if os.path.exists(os.path.join(PATH, FILENAME)): - raise StopIteration - else: - PATH = EXT = FILENAME = FILEPATH = None -except StopIteration: - pass -del _file_exts diff --git a/Lib/importlib/test/frozen/__init__.py b/Lib/importlib/test/frozen/__init__.py deleted file mode 100644 index 2945eeb0bc1..00000000000 --- a/Lib/importlib/test/frozen/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -import importlib.test -import os.path -import unittest - - -def test_suite(): - directory = os.path.dirname(__file__) - return importlib.test.test_suite('importlib.test.frozen', directory) - - -if __name__ == '__main__': - from test.support import run_unittest - run_unittest(test_suite()) diff --git a/Lib/importlib/test/frozen/test_finder.py b/Lib/importlib/test/frozen/test_finder.py deleted file mode 100644 index db88379d125..00000000000 --- a/Lib/importlib/test/frozen/test_finder.py +++ /dev/null @@ -1,47 +0,0 @@ -from ... import machinery -from .. import abc - -import unittest - - -class FinderTests(abc.FinderTests): - - """Test finding frozen modules.""" - - def find(self, name, path=None): - finder = machinery.FrozenImporter - return finder.find_module(name, path) - - def test_module(self): - name = '__hello__' - loader = self.find(name) - self.assertTrue(hasattr(loader, 'load_module')) - - def test_package(self): - loader = self.find('__phello__') - self.assertTrue(hasattr(loader, 'load_module')) - - def test_module_in_package(self): - loader = self.find('__phello__.spam', ['__phello__']) - self.assertTrue(hasattr(loader, 'load_module')) - - def test_package_in_package(self): - # No frozen package within another package to test with. - pass - - def test_package_over_module(self): - # No easy way to test. - pass - - def test_failure(self): - loader = self.find('<not real>') - self.assertTrue(loader is None) - - -def test_main(): - from test.support import run_unittest - run_unittest(FinderTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/frozen/test_loader.py b/Lib/importlib/test/frozen/test_loader.py deleted file mode 100644 index b685ef57084..00000000000 --- a/Lib/importlib/test/frozen/test_loader.py +++ /dev/null @@ -1,105 +0,0 @@ -from importlib import machinery -import imp -import unittest -from .. import abc -from .. import util -from test.support import captured_stdout - -class LoaderTests(abc.LoaderTests): - - def test_module(self): - with util.uncache('__hello__'), captured_stdout() as stdout: - module = machinery.FrozenImporter.load_module('__hello__') - check = {'__name__': '__hello__', '__file__': '<frozen>', - '__package__': '', '__loader__': machinery.FrozenImporter} - for attr, value in check.items(): - self.assertEqual(getattr(module, attr), value) - self.assertEqual(stdout.getvalue(), 'Hello world!\n') - - def test_package(self): - with util.uncache('__phello__'), captured_stdout() as stdout: - module = machinery.FrozenImporter.load_module('__phello__') - check = {'__name__': '__phello__', '__file__': '<frozen>', - '__package__': '__phello__', '__path__': ['__phello__'], - '__loader__': machinery.FrozenImporter} - for attr, value in check.items(): - attr_value = getattr(module, attr) - self.assertEqual(attr_value, value, - "for __phello__.%s, %r != %r" % - (attr, attr_value, value)) - self.assertEqual(stdout.getvalue(), 'Hello world!\n') - - def test_lacking_parent(self): - with util.uncache('__phello__', '__phello__.spam'), \ - captured_stdout() as stdout: - module = machinery.FrozenImporter.load_module('__phello__.spam') - check = {'__name__': '__phello__.spam', '__file__': '<frozen>', - '__package__': '__phello__', - '__loader__': machinery.FrozenImporter} - for attr, value in check.items(): - attr_value = getattr(module, attr) - self.assertEqual(attr_value, value, - "for __phello__.spam.%s, %r != %r" % - (attr, attr_value, value)) - self.assertEqual(stdout.getvalue(), 'Hello world!\n') - - def test_module_reuse(self): - with util.uncache('__hello__'), captured_stdout() as stdout: - module1 = machinery.FrozenImporter.load_module('__hello__') - module2 = machinery.FrozenImporter.load_module('__hello__') - self.assertTrue(module1 is module2) - self.assertEqual(stdout.getvalue(), - 'Hello world!\nHello world!\n') - - def test_state_after_failure(self): - # No way to trigger an error in a frozen module. - pass - - def test_unloadable(self): - assert machinery.FrozenImporter.find_module('_not_real') is None - with self.assertRaises(ImportError): - machinery.FrozenImporter.load_module('_not_real') - - -class InspectLoaderTests(unittest.TestCase): - - """Tests for the InspectLoader methods for FrozenImporter.""" - - def test_get_code(self): - # Make sure that the code object is good. - name = '__hello__' - with captured_stdout() as stdout: - code = machinery.FrozenImporter.get_code(name) - mod = imp.new_module(name) - exec(code, mod.__dict__) - self.assertTrue(hasattr(mod, 'initialized')) - self.assertEqual(stdout.getvalue(), 'Hello world!\n') - - def test_get_source(self): - # Should always return None. - result = machinery.FrozenImporter.get_source('__hello__') - self.assertTrue(result is None) - - def test_is_package(self): - # Should be able to tell what is a package. - test_for = (('__hello__', False), ('__phello__', True), - ('__phello__.spam', False)) - for name, is_package in test_for: - result = machinery.FrozenImporter.is_package(name) - self.assertTrue(bool(result) == is_package) - - def test_failure(self): - # Raise ImportError for modules that are not frozen. - for meth_name in ('get_code', 'get_source', 'is_package'): - method = getattr(machinery.FrozenImporter, meth_name) - with self.assertRaises(ImportError): - method('importlib') - - -def test_main(): - from test.support import run_unittest - run_unittest(LoaderTests, InspectLoaderTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/import_/__init__.py b/Lib/importlib/test/import_/__init__.py deleted file mode 100644 index fdf7661dc0b..00000000000 --- a/Lib/importlib/test/import_/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -import importlib.test -import os.path -import unittest - - -def test_suite(): - directory = os.path.dirname(__file__) - return importlib.test.test_suite('importlib.test.import_', directory) - - -if __name__ == '__main__': - from test.support import run_unittest - run_unittest(test_suite()) diff --git a/Lib/importlib/test/import_/test___package__.py b/Lib/importlib/test/import_/test___package__.py deleted file mode 100644 index 5056ae59cca..00000000000 --- a/Lib/importlib/test/import_/test___package__.py +++ /dev/null @@ -1,119 +0,0 @@ -"""PEP 366 ("Main module explicit relative imports") specifies the -semantics for the __package__ attribute on modules. This attribute is -used, when available, to detect which package a module belongs to (instead -of using the typical __path__/__name__ test). - -""" -import unittest -from .. import util -from . import util as import_util - - -class Using__package__(unittest.TestCase): - - """Use of __package__ supercedes the use of __name__/__path__ to calculate - what package a module belongs to. The basic algorithm is [__package__]:: - - def resolve_name(name, package, level): - level -= 1 - base = package.rsplit('.', level)[0] - return '{0}.{1}'.format(base, name) - - But since there is no guarantee that __package__ has been set (or not been - set to None [None]), there has to be a way to calculate the attribute's value - [__name__]:: - - def calc_package(caller_name, has___path__): - if has__path__: - return caller_name - else: - return caller_name.rsplit('.', 1)[0] - - Then the normal algorithm for relative name imports can proceed as if - __package__ had been set. - - """ - - def test_using___package__(self): - # [__package__] - with util.mock_modules('pkg.__init__', 'pkg.fake') as importer: - with util.import_state(meta_path=[importer]): - import_util.import_('pkg.fake') - module = import_util.import_('', - globals={'__package__': 'pkg.fake'}, - fromlist=['attr'], level=2) - self.assertEqual(module.__name__, 'pkg') - - def test_using___name__(self, package_as_None=False): - # [__name__] - globals_ = {'__name__': 'pkg.fake', '__path__': []} - if package_as_None: - globals_['__package__'] = None - with util.mock_modules('pkg.__init__', 'pkg.fake') as importer: - with util.import_state(meta_path=[importer]): - import_util.import_('pkg.fake') - module = import_util.import_('', globals= globals_, - fromlist=['attr'], level=2) - self.assertEqual(module.__name__, 'pkg') - - def test_None_as___package__(self): - # [None] - self.test_using___name__(package_as_None=True) - - def test_bad__package__(self): - globals = {'__package__': '<not real>'} - with self.assertRaises(SystemError): - import_util.import_('', globals, {}, ['relimport'], 1) - - def test_bunk__package__(self): - globals = {'__package__': 42} - with self.assertRaises(ValueError): - import_util.import_('', globals, {}, ['relimport'], 1) - - -@import_util.importlib_only -class Setting__package__(unittest.TestCase): - - """Because __package__ is a new feature, it is not always set by a loader. - Import will set it as needed to help with the transition to relying on - __package__. - - For a top-level module, __package__ is set to None [top-level]. For a - package __name__ is used for __package__ [package]. For submodules the - value is __name__.rsplit('.', 1)[0] [submodule]. - - """ - - # [top-level] - def test_top_level(self): - with util.mock_modules('top_level') as mock: - with util.import_state(meta_path=[mock]): - del mock['top_level'].__package__ - module = import_util.import_('top_level') - self.assertEqual(module.__package__, '') - - # [package] - def test_package(self): - with util.mock_modules('pkg.__init__') as mock: - with util.import_state(meta_path=[mock]): - del mock['pkg'].__package__ - module = import_util.import_('pkg') - self.assertEqual(module.__package__, 'pkg') - - # [submodule] - def test_submodule(self): - with util.mock_modules('pkg.__init__', 'pkg.mod') as mock: - with util.import_state(meta_path=[mock]): - del mock['pkg.mod'].__package__ - pkg = import_util.import_('pkg.mod') - module = getattr(pkg, 'mod') - self.assertEqual(module.__package__, 'pkg') - - -def test_main(): - from test.support import run_unittest - run_unittest(Using__package__, Setting__package__) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/import_/test_api.py b/Lib/importlib/test/import_/test_api.py deleted file mode 100644 index 9075d427597..00000000000 --- a/Lib/importlib/test/import_/test_api.py +++ /dev/null @@ -1,22 +0,0 @@ -from . import util -import unittest - - -class APITest(unittest.TestCase): - - """Test API-specific details for __import__ (e.g. raising the right - exception when passing in an int for the module name).""" - - def test_name_requires_rparition(self): - # Raise TypeError if a non-string is passed in for the module name. - with self.assertRaises(TypeError): - util.import_(42) - - -def test_main(): - from test.support import run_unittest - run_unittest(APITest) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/import_/test_caching.py b/Lib/importlib/test/import_/test_caching.py deleted file mode 100644 index 48dc64311af..00000000000 --- a/Lib/importlib/test/import_/test_caching.py +++ /dev/null @@ -1,86 +0,0 @@ -"""Test that sys.modules is used properly by import.""" -from .. import util -from . import util as import_util -import sys -from types import MethodType -import unittest - - -class UseCache(unittest.TestCase): - - """When it comes to sys.modules, import prefers it over anything else. - - Once a name has been resolved, sys.modules is checked to see if it contains - the module desired. If so, then it is returned [use cache]. If it is not - found, then the proper steps are taken to perform the import, but - sys.modules is still used to return the imported module (e.g., not what a - loader returns) [from cache on return]. This also applies to imports of - things contained within a package and thus get assigned as an attribute - [from cache to attribute] or pulled in thanks to a fromlist import - [from cache for fromlist]. But if sys.modules contains None then - ImportError is raised [None in cache]. - - """ - def test_using_cache(self): - # [use cache] - module_to_use = "some module found!" - with util.uncache(module_to_use): - sys.modules['some_module'] = module_to_use - module = import_util.import_('some_module') - self.assertEqual(id(module_to_use), id(module)) - - def test_None_in_cache(self): - #[None in cache] - name = 'using_None' - with util.uncache(name): - sys.modules[name] = None - with self.assertRaises(ImportError): - import_util.import_(name) - - def create_mock(self, *names, return_=None): - mock = util.mock_modules(*names) - original_load = mock.load_module - def load_module(self, fullname): - original_load(fullname) - return return_ - mock.load_module = MethodType(load_module, mock) - return mock - - # __import__ inconsistent between loaders and built-in import when it comes - # to when to use the module in sys.modules and when not to. - @import_util.importlib_only - def test_using_cache_after_loader(self): - # [from cache on return] - with self.create_mock('module') as mock: - with util.import_state(meta_path=[mock]): - module = import_util.import_('module') - self.assertEqual(id(module), id(sys.modules['module'])) - - # See test_using_cache_after_loader() for reasoning. - @import_util.importlib_only - def test_using_cache_for_assigning_to_attribute(self): - # [from cache to attribute] - with self.create_mock('pkg.__init__', 'pkg.module') as importer: - with util.import_state(meta_path=[importer]): - module = import_util.import_('pkg.module') - self.assertTrue(hasattr(module, 'module')) - self.assertTrue(id(module.module), id(sys.modules['pkg.module'])) - - # See test_using_cache_after_loader() for reasoning. - @import_util.importlib_only - def test_using_cache_for_fromlist(self): - # [from cache for fromlist] - with self.create_mock('pkg.__init__', 'pkg.module') as importer: - with util.import_state(meta_path=[importer]): - module = import_util.import_('pkg', fromlist=['module']) - self.assertTrue(hasattr(module, 'module')) - self.assertEqual(id(module.module), - id(sys.modules['pkg.module'])) - - -def test_main(): - from test.support import run_unittest - run_unittest(UseCache) - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/import_/test_fromlist.py b/Lib/importlib/test/import_/test_fromlist.py deleted file mode 100644 index 7ecde037aee..00000000000 --- a/Lib/importlib/test/import_/test_fromlist.py +++ /dev/null @@ -1,123 +0,0 @@ -"""Test that the semantics relating to the 'fromlist' argument are correct.""" -from .. import util -from . import util as import_util -import unittest - -class ReturnValue(unittest.TestCase): - - """The use of fromlist influences what import returns. - - If direct ``import ...`` statement is used, the root module or package is - returned [import return]. But if fromlist is set, then the specified module - is actually returned (whether it is a relative import or not) - [from return]. - - """ - - def test_return_from_import(self): - # [import return] - with util.mock_modules('pkg.__init__', 'pkg.module') as importer: - with util.import_state(meta_path=[importer]): - module = import_util.import_('pkg.module') - self.assertEqual(module.__name__, 'pkg') - - def test_return_from_from_import(self): - # [from return] - with util.mock_modules('pkg.__init__', 'pkg.module')as importer: - with util.import_state(meta_path=[importer]): - module = import_util.import_('pkg.module', fromlist=['attr']) - self.assertEqual(module.__name__, 'pkg.module') - - -class HandlingFromlist(unittest.TestCase): - - """Using fromlist triggers different actions based on what is being asked - of it. - - If fromlist specifies an object on a module, nothing special happens - [object case]. This is even true if the object does not exist [bad object]. - - If a package is being imported, then what is listed in fromlist may be - treated as a module to be imported [module]. But once again, even if - something in fromlist does not exist as a module, no error is raised - [no module]. And this extends to what is contained in __all__ when '*' is - imported [using *]. And '*' does not need to be the only name in the - fromlist [using * with others]. - - """ - - def test_object(self): - # [object case] - with util.mock_modules('module') as importer: - with util.import_state(meta_path=[importer]): - module = import_util.import_('module', fromlist=['attr']) - self.assertEqual(module.__name__, 'module') - - def test_unexistent_object(self): - # [bad object] - with util.mock_modules('module') as importer: - with util.import_state(meta_path=[importer]): - module = import_util.import_('module', fromlist=['non_existent']) - self.assertEqual(module.__name__, 'module') - self.assertTrue(not hasattr(module, 'non_existent')) - - def test_module_from_package(self): - # [module] - with util.mock_modules('pkg.__init__', 'pkg.module') as importer: - with util.import_state(meta_path=[importer]): - module = import_util.import_('pkg', fromlist=['module']) - self.assertEqual(module.__name__, 'pkg') - self.assertTrue(hasattr(module, 'module')) - self.assertEqual(module.module.__name__, 'pkg.module') - - def test_no_module_from_package(self): - # [no module] - with util.mock_modules('pkg.__init__') as importer: - with util.import_state(meta_path=[importer]): - module = import_util.import_('pkg', fromlist='non_existent') - self.assertEqual(module.__name__, 'pkg') - self.assertTrue(not hasattr(module, 'non_existent')) - - def test_empty_string(self): - with util.mock_modules('pkg.__init__', 'pkg.mod') as importer: - with util.import_state(meta_path=[importer]): - module = import_util.import_('pkg.mod', fromlist=['']) - self.assertEqual(module.__name__, 'pkg.mod') - - def basic_star_test(self, fromlist=['*']): - # [using *] - with util.mock_modules('pkg.__init__', 'pkg.module') as mock: - with util.import_state(meta_path=[mock]): - mock['pkg'].__all__ = ['module'] - module = import_util.import_('pkg', fromlist=fromlist) - self.assertEqual(module.__name__, 'pkg') - self.assertTrue(hasattr(module, 'module')) - self.assertEqual(module.module.__name__, 'pkg.module') - - def test_using_star(self): - # [using *] - self.basic_star_test() - - def test_fromlist_as_tuple(self): - self.basic_star_test(('*',)) - - def test_star_with_others(self): - # [using * with others] - context = util.mock_modules('pkg.__init__', 'pkg.module1', 'pkg.module2') - with context as mock: - with util.import_state(meta_path=[mock]): - mock['pkg'].__all__ = ['module1'] - module = import_util.import_('pkg', fromlist=['module2', '*']) - self.assertEqual(module.__name__, 'pkg') - self.assertTrue(hasattr(module, 'module1')) - self.assertTrue(hasattr(module, 'module2')) - self.assertEqual(module.module1.__name__, 'pkg.module1') - self.assertEqual(module.module2.__name__, 'pkg.module2') - - -def test_main(): - from test.support import run_unittest - run_unittest(ReturnValue, HandlingFromlist) - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/import_/test_meta_path.py b/Lib/importlib/test/import_/test_meta_path.py deleted file mode 100644 index 3b130c9a13c..00000000000 --- a/Lib/importlib/test/import_/test_meta_path.py +++ /dev/null @@ -1,97 +0,0 @@ -from .. import util -from . import util as import_util -from types import MethodType -import unittest - - -class CallingOrder(unittest.TestCase): - - """Calls to the importers on sys.meta_path happen in order that they are - specified in the sequence, starting with the first importer - [first called], and then continuing on down until one is found that doesn't - return None [continuing].""" - - - def test_first_called(self): - # [first called] - mod = 'top_level' - first = util.mock_modules(mod) - second = util.mock_modules(mod) - with util.mock_modules(mod) as first, util.mock_modules(mod) as second: - first.modules[mod] = 42 - second.modules[mod] = -13 - with util.import_state(meta_path=[first, second]): - self.assertEqual(import_util.import_(mod), 42) - - def test_continuing(self): - # [continuing] - mod_name = 'for_real' - with util.mock_modules('nonexistent') as first, \ - util.mock_modules(mod_name) as second: - first.find_module = lambda self, fullname, path=None: None - second.modules[mod_name] = 42 - with util.import_state(meta_path=[first, second]): - self.assertEqual(import_util.import_(mod_name), 42) - - -class CallSignature(unittest.TestCase): - - """If there is no __path__ entry on the parent module, then 'path' is None - [no path]. Otherwise, the value for __path__ is passed in for the 'path' - argument [path set].""" - - def log(self, fxn): - log = [] - def wrapper(self, *args, **kwargs): - log.append([args, kwargs]) - return fxn(*args, **kwargs) - return log, wrapper - - - def test_no_path(self): - # [no path] - mod_name = 'top_level' - assert '.' not in mod_name - with util.mock_modules(mod_name) as importer: - log, wrapped_call = self.log(importer.find_module) - importer.find_module = MethodType(wrapped_call, importer) - with util.import_state(meta_path=[importer]): - import_util.import_(mod_name) - assert len(log) == 1 - args = log[0][0] - kwargs = log[0][1] - # Assuming all arguments are positional. - self.assertEqual(len(args), 2) - self.assertEqual(len(kwargs), 0) - self.assertEqual(args[0], mod_name) - self.assertTrue(args[1] is None) - - def test_with_path(self): - # [path set] - pkg_name = 'pkg' - mod_name = pkg_name + '.module' - path = [42] - assert '.' in mod_name - with util.mock_modules(pkg_name+'.__init__', mod_name) as importer: - importer.modules[pkg_name].__path__ = path - log, wrapped_call = self.log(importer.find_module) - importer.find_module = MethodType(wrapped_call, importer) - with util.import_state(meta_path=[importer]): - import_util.import_(mod_name) - assert len(log) == 2 - args = log[1][0] - kwargs = log[1][1] - # Assuming all arguments are positional. - self.assertTrue(not kwargs) - self.assertEqual(args[0], mod_name) - self.assertTrue(args[1] is path) - - - -def test_main(): - from test.support import run_unittest - run_unittest(CallingOrder, CallSignature) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/import_/test_packages.py b/Lib/importlib/test/import_/test_packages.py deleted file mode 100644 index faadc32172b..00000000000 --- a/Lib/importlib/test/import_/test_packages.py +++ /dev/null @@ -1,37 +0,0 @@ -from .. import util -from . import util as import_util -import sys -import unittest -import importlib - - -class ParentModuleTests(unittest.TestCase): - - """Importing a submodule should import the parent modules.""" - - def test_import_parent(self): - with util.mock_modules('pkg.__init__', 'pkg.module') as mock: - with util.import_state(meta_path=[mock]): - module = import_util.import_('pkg.module') - self.assertTrue('pkg' in sys.modules) - - def test_bad_parent(self): - with util.mock_modules('pkg.module') as mock: - with util.import_state(meta_path=[mock]): - with self.assertRaises(ImportError): - import_util.import_('pkg.module') - - def test_module_not_package(self): - # Try to import a submodule from a non-package should raise ImportError. - assert not hasattr(sys, '__path__') - with self.assertRaises(ImportError): - import_util.import_('sys.no_submodules_here') - - -def test_main(): - from test.support import run_unittest - run_unittest(ParentModuleTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/import_/test_path.py b/Lib/importlib/test/import_/test_path.py deleted file mode 100644 index 2faa23174b3..00000000000 --- a/Lib/importlib/test/import_/test_path.py +++ /dev/null @@ -1,131 +0,0 @@ -from importlib import _bootstrap -from importlib import machinery -from .. import util -from . import util as import_util -import imp -import os -import sys -import tempfile -from test import support -from types import MethodType -import unittest - - -class FinderTests(unittest.TestCase): - - """Tests for PathFinder.""" - - def test_failure(self): - # Test None returned upon not finding a suitable finder. - module = '<test module>' - with util.import_state(): - self.assertTrue(machinery.PathFinder.find_module(module) is None) - - def test_sys_path(self): - # Test that sys.path is used when 'path' is None. - # Implicitly tests that sys.path_importer_cache is used. - module = '<test module>' - path = '<test path>' - importer = util.mock_modules(module) - with util.import_state(path_importer_cache={path: importer}, - path=[path]): - loader = machinery.PathFinder.find_module(module) - self.assertTrue(loader is importer) - - def test_path(self): - # Test that 'path' is used when set. - # Implicitly tests that sys.path_importer_cache is used. - module = '<test module>' - path = '<test path>' - importer = util.mock_modules(module) - with util.import_state(path_importer_cache={path: importer}): - loader = machinery.PathFinder.find_module(module, [path]) - self.assertTrue(loader is importer) - - def test_path_hooks(self): - # Test that sys.path_hooks is used. - # Test that sys.path_importer_cache is set. - module = '<test module>' - path = '<test path>' - importer = util.mock_modules(module) - hook = import_util.mock_path_hook(path, importer=importer) - with util.import_state(path_hooks=[hook]): - loader = machinery.PathFinder.find_module(module, [path]) - self.assertTrue(loader is importer) - self.assertTrue(path in sys.path_importer_cache) - self.assertTrue(sys.path_importer_cache[path] is importer) - - def test_path_importer_cache_has_None(self): - # Test that if sys.path_importer_cache has None that None is returned. - clear_cache = {path: None for path in sys.path} - with util.import_state(path_importer_cache=clear_cache): - for name in ('asynchat', 'sys', '<test module>'): - self.assertTrue(machinery.PathFinder.find_module(name) is None) - - def test_path_importer_cache_has_None_continues(self): - # Test that having None in sys.path_importer_cache causes the search to - # continue. - path = '<test path>' - module = '<test module>' - importer = util.mock_modules(module) - with util.import_state(path=['1', '2'], - path_importer_cache={'1': None, '2': importer}): - loader = machinery.PathFinder.find_module(module) - self.assertTrue(loader is importer) - - - -class DefaultPathFinderTests(unittest.TestCase): - - """Test importlib._bootstrap._DefaultPathFinder.""" - - def test_implicit_hooks(self): - # Test that the implicit path hooks are used. - bad_path = '<path>' - module = '<module>' - assert not os.path.exists(bad_path) - existing_path = tempfile.mkdtemp() - try: - with util.import_state(): - nothing = _bootstrap._DefaultPathFinder.find_module(module, - path=[existing_path]) - self.assertTrue(nothing is None) - self.assertTrue(existing_path in sys.path_importer_cache) - result = isinstance(sys.path_importer_cache[existing_path], - imp.NullImporter) - self.assertFalse(result) - nothing = _bootstrap._DefaultPathFinder.find_module(module, - path=[bad_path]) - self.assertTrue(nothing is None) - self.assertTrue(bad_path in sys.path_importer_cache) - self.assertTrue(isinstance(sys.path_importer_cache[bad_path], - imp.NullImporter)) - finally: - os.rmdir(existing_path) - - - def test_path_importer_cache_has_None(self): - # Test that the default hook is used when sys.path_importer_cache - # contains None for a path. - module = '<test module>' - importer = util.mock_modules(module) - path = '<test path>' - # XXX Not blackbox. - original_hook = _bootstrap._DEFAULT_PATH_HOOK - mock_hook = import_util.mock_path_hook(path, importer=importer) - _bootstrap._DEFAULT_PATH_HOOK = mock_hook - try: - with util.import_state(path_importer_cache={path: None}): - loader = _bootstrap._DefaultPathFinder.find_module(module, - path=[path]) - self.assertTrue(loader is importer) - finally: - _bootstrap._DEFAULT_PATH_HOOK = original_hook - - -def test_main(): - from test.support import run_unittest - run_unittest(FinderTests, DefaultPathFinderTests) - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/import_/test_relative_imports.py b/Lib/importlib/test/import_/test_relative_imports.py deleted file mode 100644 index a0f6b2d827d..00000000000 --- a/Lib/importlib/test/import_/test_relative_imports.py +++ /dev/null @@ -1,203 +0,0 @@ -"""Test relative imports (PEP 328).""" -from .. import util -from . import util as import_util -import sys -import unittest - -class RelativeImports(unittest.TestCase): - - """PEP 328 introduced relative imports. This allows for imports to occur - from within a package without having to specify the actual package name. - - A simple example is to import another module within the same package - [module from module]:: - - # From pkg.mod1 with pkg.mod2 being a module. - from . import mod2 - - This also works for getting an attribute from a module that is specified - in a relative fashion [attr from module]:: - - # From pkg.mod1. - from .mod2 import attr - - But this is in no way restricted to working between modules; it works - from [package to module],:: - - # From pkg, importing pkg.module which is a module. - from . import module - - [module to package],:: - - # Pull attr from pkg, called from pkg.module which is a module. - from . import attr - - and [package to package]:: - - # From pkg.subpkg1 (both pkg.subpkg[1,2] are packages). - from .. import subpkg2 - - The number of dots used is in no way restricted [deep import]:: - - # Import pkg.attr from pkg.pkg1.pkg2.pkg3.pkg4.pkg5. - from ...... import attr - - To prevent someone from accessing code that is outside of a package, one - cannot reach the location containing the root package itself:: - - # From pkg.__init__ [too high from package] - from .. import top_level - - # From pkg.module [too high from module] - from .. import top_level - - Relative imports are the only type of import that allow for an empty - module name for an import [empty name]. - - """ - - def relative_import_test(self, create, globals_, callback): - """Abstract out boilerplace for setting up for an import test.""" - uncache_names = [] - for name in create: - if not name.endswith('.__init__'): - uncache_names.append(name) - else: - uncache_names.append(name[:-len('.__init__')]) - with util.mock_modules(*create) as importer: - with util.import_state(meta_path=[importer]): - for global_ in globals_: - with util.uncache(*uncache_names): - callback(global_) - - - def test_module_from_module(self): - # [module from module] - create = 'pkg.__init__', 'pkg.mod2' - globals_ = {'__package__': 'pkg'}, {'__name__': 'pkg.mod1'} - def callback(global_): - import_util.import_('pkg') # For __import__(). - module = import_util.import_('', global_, fromlist=['mod2'], level=1) - self.assertEqual(module.__name__, 'pkg') - self.assertTrue(hasattr(module, 'mod2')) - self.assertEqual(module.mod2.attr, 'pkg.mod2') - self.relative_import_test(create, globals_, callback) - - def test_attr_from_module(self): - # [attr from module] - create = 'pkg.__init__', 'pkg.mod2' - globals_ = {'__package__': 'pkg'}, {'__name__': 'pkg.mod1'} - def callback(global_): - import_util.import_('pkg') # For __import__(). - module = import_util.import_('mod2', global_, fromlist=['attr'], - level=1) - self.assertEqual(module.__name__, 'pkg.mod2') - self.assertEqual(module.attr, 'pkg.mod2') - self.relative_import_test(create, globals_, callback) - - def test_package_to_module(self): - # [package to module] - create = 'pkg.__init__', 'pkg.module' - globals_ = ({'__package__': 'pkg'}, - {'__name__': 'pkg', '__path__': ['blah']}) - def callback(global_): - import_util.import_('pkg') # For __import__(). - module = import_util.import_('', global_, fromlist=['module'], - level=1) - self.assertEqual(module.__name__, 'pkg') - self.assertTrue(hasattr(module, 'module')) - self.assertEqual(module.module.attr, 'pkg.module') - self.relative_import_test(create, globals_, callback) - - def test_module_to_package(self): - # [module to package] - create = 'pkg.__init__', 'pkg.module' - globals_ = {'__package__': 'pkg'}, {'__name__': 'pkg.module'} - def callback(global_): - import_util.import_('pkg') # For __import__(). - module = import_util.import_('', global_, fromlist=['attr'], level=1) - self.assertEqual(module.__name__, 'pkg') - self.relative_import_test(create, globals_, callback) - - def test_package_to_package(self): - # [package to package] - create = ('pkg.__init__', 'pkg.subpkg1.__init__', - 'pkg.subpkg2.__init__') - globals_ = ({'__package__': 'pkg.subpkg1'}, - {'__name__': 'pkg.subpkg1', '__path__': ['blah']}) - def callback(global_): - module = import_util.import_('', global_, fromlist=['subpkg2'], - level=2) - self.assertEqual(module.__name__, 'pkg') - self.assertTrue(hasattr(module, 'subpkg2')) - self.assertEqual(module.subpkg2.attr, 'pkg.subpkg2.__init__') - - def test_deep_import(self): - # [deep import] - create = ['pkg.__init__'] - for count in range(1,6): - create.append('{0}.pkg{1}.__init__'.format( - create[-1][:-len('.__init__')], count)) - globals_ = ({'__package__': 'pkg.pkg1.pkg2.pkg3.pkg4.pkg5'}, - {'__name__': 'pkg.pkg1.pkg2.pkg3.pkg4.pkg5', - '__path__': ['blah']}) - def callback(global_): - import_util.import_(globals_[0]['__package__']) - module = import_util.import_('', global_, fromlist=['attr'], level=6) - self.assertEqual(module.__name__, 'pkg') - self.relative_import_test(create, globals_, callback) - - def test_too_high_from_package(self): - # [too high from package] - create = ['top_level', 'pkg.__init__'] - globals_ = ({'__package__': 'pkg'}, - {'__name__': 'pkg', '__path__': ['blah']}) - def callback(global_): - import_util.import_('pkg') - with self.assertRaises(ValueError): - import_util.import_('', global_, fromlist=['top_level'], - level=2) - self.relative_import_test(create, globals_, callback) - - def test_too_high_from_module(self): - # [too high from module] - create = ['top_level', 'pkg.__init__', 'pkg.module'] - globals_ = {'__package__': 'pkg'}, {'__name__': 'pkg.module'} - def callback(global_): - import_util.import_('pkg') - with self.assertRaises(ValueError): - import_util.import_('', global_, fromlist=['top_level'], - level=2) - self.relative_import_test(create, globals_, callback) - - def test_empty_name_w_level_0(self): - # [empty name] - with self.assertRaises(ValueError): - import_util.import_('') - - def test_import_from_different_package(self): - # Test importing from a different package than the caller. - # in pkg.subpkg1.mod - # from ..subpkg2 import mod - create = ['__runpy_pkg__.__init__', - '__runpy_pkg__.__runpy_pkg__.__init__', - '__runpy_pkg__.uncle.__init__', - '__runpy_pkg__.uncle.cousin.__init__', - '__runpy_pkg__.uncle.cousin.nephew'] - globals_ = {'__package__': '__runpy_pkg__.__runpy_pkg__'} - def callback(global_): - import_util.import_('__runpy_pkg__.__runpy_pkg__') - module = import_util.import_('uncle.cousin', globals_, {}, - fromlist=['nephew'], - level=2) - self.assertEqual(module.__name__, '__runpy_pkg__.uncle.cousin') - self.relative_import_test(create, globals_, callback) - - - -def test_main(): - from test.support import run_unittest - run_unittest(RelativeImports) - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/import_/util.py b/Lib/importlib/test/import_/util.py deleted file mode 100644 index 649c5ed27c3..00000000000 --- a/Lib/importlib/test/import_/util.py +++ /dev/null @@ -1,29 +0,0 @@ -import functools -import importlib -import importlib._bootstrap -import unittest - - -using___import__ = False - - -def import_(*args, **kwargs): - """Delegate to allow for injecting different implementations of import.""" - if using___import__: - return __import__(*args, **kwargs) - else: - return importlib.__import__(*args, **kwargs) - - -def importlib_only(fxn): - """Decorator to skip a test if using __builtins__.__import__.""" - return unittest.skipIf(using___import__, "importlib-specific test")(fxn) - - -def mock_path_hook(*entries, importer): - """A mock sys.path_hooks entry.""" - def hook(entry): - if entry not in entries: - raise ImportError - return importer - return hook diff --git a/Lib/importlib/test/regrtest.py b/Lib/importlib/test/regrtest.py deleted file mode 100644 index b103ae7d0e9..00000000000 --- a/Lib/importlib/test/regrtest.py +++ /dev/null @@ -1,35 +0,0 @@ -"""Run Python's standard test suite using importlib.__import__. - -Tests known to fail because of assumptions that importlib (properly) -invalidates are automatically skipped if the entire test suite is run. -Otherwise all command-line options valid for test.regrtest are also valid for -this script. - -XXX FAILING - * test_import - - test_incorrect_code_name - file name differing between __file__ and co_filename (r68360 on trunk) - - test_import_by_filename - exception for trying to import by file name does not match - -""" -import importlib -import sys -from test import regrtest - -if __name__ == '__main__': - __builtins__.__import__ = importlib.__import__ - - exclude = ['--exclude', - 'test_frozen', # Does not expect __loader__ attribute - 'test_pkg', # Does not expect __loader__ attribute - 'test_pydoc', # Does not expect __loader__ attribute - ] - - # Switching on --exclude implies running all test but the ones listed, so - # only use it when one is not running an explicit test - if len(sys.argv) == 1: - # No programmatic way to specify tests to exclude - sys.argv.extend(exclude) - - regrtest.main(quiet=True, verbose2=True) diff --git a/Lib/importlib/test/source/__init__.py b/Lib/importlib/test/source/__init__.py deleted file mode 100644 index 8d7c49dc9c5..00000000000 --- a/Lib/importlib/test/source/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -import importlib.test -import os.path -import unittest - - -def test_suite(): - directory = os.path.dirname(__file__) - return importlib.test.test_suite('importlib.test.source', directory) - - -if __name__ == '__main__': - from test.support import run_unittest - run_unittest(test_suite()) diff --git a/Lib/importlib/test/source/test_abc_loader.py b/Lib/importlib/test/source/test_abc_loader.py deleted file mode 100644 index 32459074a07..00000000000 --- a/Lib/importlib/test/source/test_abc_loader.py +++ /dev/null @@ -1,876 +0,0 @@ -import importlib -from importlib import abc - -from .. import abc as testing_abc -from .. import util -from . import util as source_util - -import imp -import inspect -import io -import marshal -import os -import sys -import types -import unittest -import warnings - - -class SourceOnlyLoaderMock(abc.SourceLoader): - - # Globals that should be defined for all modules. - source = (b"_ = '::'.join([__name__, __file__, __cached__, __package__, " - b"repr(__loader__)])") - - def __init__(self, path): - self.path = path - - def get_data(self, path): - assert self.path == path - return self.source - - def get_filename(self, fullname): - return self.path - - -class SourceLoaderMock(SourceOnlyLoaderMock): - - source_mtime = 1 - - def __init__(self, path, magic=imp.get_magic()): - super().__init__(path) - self.bytecode_path = imp.cache_from_source(self.path) - data = bytearray(magic) - data.extend(marshal._w_long(self.source_mtime)) - code_object = compile(self.source, self.path, 'exec', - dont_inherit=True) - data.extend(marshal.dumps(code_object)) - self.bytecode = bytes(data) - self.written = {} - - def get_data(self, path): - if path == self.path: - return super().get_data(path) - elif path == self.bytecode_path: - return self.bytecode - else: - raise IOError - - def path_mtime(self, path): - assert path == self.path - return self.source_mtime - - def set_data(self, path, data): - self.written[path] = bytes(data) - return path == self.bytecode_path - - -class PyLoaderMock(abc.PyLoader): - - # Globals that should be defined for all modules. - source = (b"_ = '::'.join([__name__, __file__, __package__, " - b"repr(__loader__)])") - - def __init__(self, data): - """Take a dict of 'module_name: path' pairings. - - Paths should have no file extension, allowing packages to be denoted by - ending in '__init__'. - - """ - self.module_paths = data - self.path_to_module = {val:key for key,val in data.items()} - - def get_data(self, path): - if path not in self.path_to_module: - raise IOError - return self.source - - def is_package(self, name): - filename = os.path.basename(self.get_filename(name)) - return os.path.splitext(filename)[0] == '__init__' - - def source_path(self, name): - try: - return self.module_paths[name] - except KeyError: - raise ImportError - - def get_filename(self, name): - """Silence deprecation warning.""" - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - path = super().get_filename(name) - assert len(w) == 1 - assert issubclass(w[0].category, PendingDeprecationWarning) - return path - - -class PyLoaderCompatMock(PyLoaderMock): - - """Mock that matches what is suggested to have a loader that is compatible - from Python 3.1 onwards.""" - - def get_filename(self, fullname): - try: - return self.module_paths[fullname] - except KeyError: - raise ImportError - - def source_path(self, fullname): - try: - return self.get_filename(fullname) - except ImportError: - return None - - -class PyPycLoaderMock(abc.PyPycLoader, PyLoaderMock): - - default_mtime = 1 - - def __init__(self, source, bc={}): - """Initialize mock. - - 'bc' is a dict keyed on a module's name. The value is dict with - possible keys of 'path', 'mtime', 'magic', and 'bc'. Except for 'path', - each of those keys control if any part of created bytecode is to - deviate from default values. - - """ - super().__init__(source) - self.module_bytecode = {} - self.path_to_bytecode = {} - self.bytecode_to_path = {} - for name, data in bc.items(): - self.path_to_bytecode[data['path']] = name - self.bytecode_to_path[name] = data['path'] - magic = data.get('magic', imp.get_magic()) - mtime = importlib._w_long(data.get('mtime', self.default_mtime)) - if 'bc' in data: - bc = data['bc'] - else: - bc = self.compile_bc(name) - self.module_bytecode[name] = magic + mtime + bc - - def compile_bc(self, name): - source_path = self.module_paths.get(name, '<test>') or '<test>' - code = compile(self.source, source_path, 'exec') - return marshal.dumps(code) - - def source_mtime(self, name): - if name in self.module_paths: - return self.default_mtime - elif name in self.module_bytecode: - return None - else: - raise ImportError - - def bytecode_path(self, name): - try: - return self.bytecode_to_path[name] - except KeyError: - if name in self.module_paths: - return None - else: - raise ImportError - - def write_bytecode(self, name, bytecode): - self.module_bytecode[name] = bytecode - return True - - def get_data(self, path): - if path in self.path_to_module: - return super().get_data(path) - elif path in self.path_to_bytecode: - name = self.path_to_bytecode[path] - return self.module_bytecode[name] - else: - raise IOError - - def is_package(self, name): - try: - return super().is_package(name) - except TypeError: - return '__init__' in self.bytecode_to_path[name] - - def get_code(self, name): - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter("always") - code_object = super().get_code(name) - assert len(w) == 1 - assert issubclass(w[0].category, PendingDeprecationWarning) - return code_object - -class PyLoaderTests(testing_abc.LoaderTests): - - """Tests for importlib.abc.PyLoader.""" - - mocker = PyLoaderMock - - def eq_attrs(self, ob, **kwargs): - for attr, val in kwargs.items(): - found = getattr(ob, attr) - self.assertEqual(found, val, - "{} attribute: {} != {}".format(attr, found, val)) - - def test_module(self): - name = '<module>' - path = os.path.join('', 'path', 'to', 'module') - mock = self.mocker({name: path}) - with util.uncache(name): - module = mock.load_module(name) - self.assertTrue(name in sys.modules) - self.eq_attrs(module, __name__=name, __file__=path, __package__='', - __loader__=mock) - self.assertTrue(not hasattr(module, '__path__')) - return mock, name - - def test_package(self): - name = '<pkg>' - path = os.path.join('path', 'to', name, '__init__') - mock = self.mocker({name: path}) - with util.uncache(name): - module = mock.load_module(name) - self.assertTrue(name in sys.modules) - self.eq_attrs(module, __name__=name, __file__=path, - __path__=[os.path.dirname(path)], __package__=name, - __loader__=mock) - return mock, name - - def test_lacking_parent(self): - name = 'pkg.mod' - path = os.path.join('path', 'to', 'pkg', 'mod') - mock = self.mocker({name: path}) - with util.uncache(name): - module = mock.load_module(name) - self.assertIn(name, sys.modules) - self.eq_attrs(module, __name__=name, __file__=path, __package__='pkg', - __loader__=mock) - self.assertFalse(hasattr(module, '__path__')) - return mock, name - - def test_module_reuse(self): - name = 'mod' - path = os.path.join('path', 'to', 'mod') - module = imp.new_module(name) - mock = self.mocker({name: path}) - with util.uncache(name): - sys.modules[name] = module - loaded_module = mock.load_module(name) - self.assertTrue(loaded_module is module) - self.assertTrue(sys.modules[name] is module) - return mock, name - - def test_state_after_failure(self): - name = "mod" - module = imp.new_module(name) - module.blah = None - mock = self.mocker({name: os.path.join('path', 'to', 'mod')}) - mock.source = b"1/0" - with util.uncache(name): - sys.modules[name] = module - with self.assertRaises(ZeroDivisionError): - mock.load_module(name) - self.assertTrue(sys.modules[name] is module) - self.assertTrue(hasattr(module, 'blah')) - return mock - - def test_unloadable(self): - name = "mod" - mock = self.mocker({name: os.path.join('path', 'to', 'mod')}) - mock.source = b"1/0" - with util.uncache(name): - with self.assertRaises(ZeroDivisionError): - mock.load_module(name) - self.assertTrue(name not in sys.modules) - return mock - - -class PyLoaderCompatTests(PyLoaderTests): - - """Test that the suggested code to make a loader that is compatible from - Python 3.1 forward works.""" - - mocker = PyLoaderCompatMock - - -class PyLoaderInterfaceTests(unittest.TestCase): - - """Tests for importlib.abc.PyLoader to make sure that when source_path() - doesn't return a path everything works as expected.""" - - def test_no_source_path(self): - # No source path should lead to ImportError. - name = 'mod' - mock = PyLoaderMock({}) - with util.uncache(name), self.assertRaises(ImportError): - mock.load_module(name) - - def test_source_path_is_None(self): - name = 'mod' - mock = PyLoaderMock({name: None}) - with util.uncache(name), self.assertRaises(ImportError): - mock.load_module(name) - - def test_get_filename_with_source_path(self): - # get_filename() should return what source_path() returns. - name = 'mod' - path = os.path.join('path', 'to', 'source') - mock = PyLoaderMock({name: path}) - with util.uncache(name): - self.assertEqual(mock.get_filename(name), path) - - def test_get_filename_no_source_path(self): - # get_filename() should raise ImportError if source_path returns None. - name = 'mod' - mock = PyLoaderMock({name: None}) - with util.uncache(name), self.assertRaises(ImportError): - mock.get_filename(name) - - -class PyPycLoaderTests(PyLoaderTests): - - """Tests for importlib.abc.PyPycLoader.""" - - mocker = PyPycLoaderMock - - @source_util.writes_bytecode_files - def verify_bytecode(self, mock, name): - assert name in mock.module_paths - self.assertIn(name, mock.module_bytecode) - magic = mock.module_bytecode[name][:4] - self.assertEqual(magic, imp.get_magic()) - mtime = importlib._r_long(mock.module_bytecode[name][4:8]) - self.assertEqual(mtime, 1) - bc = mock.module_bytecode[name][8:] - self.assertEqual(bc, mock.compile_bc(name)) - - def test_module(self): - mock, name = super().test_module() - self.verify_bytecode(mock, name) - - def test_package(self): - mock, name = super().test_package() - self.verify_bytecode(mock, name) - - def test_lacking_parent(self): - mock, name = super().test_lacking_parent() - self.verify_bytecode(mock, name) - - def test_module_reuse(self): - mock, name = super().test_module_reuse() - self.verify_bytecode(mock, name) - - def test_state_after_failure(self): - super().test_state_after_failure() - - def test_unloadable(self): - super().test_unloadable() - - -class PyPycLoaderInterfaceTests(unittest.TestCase): - - """Test for the interface of importlib.abc.PyPycLoader.""" - - def get_filename_check(self, src_path, bc_path, expect): - name = 'mod' - mock = PyPycLoaderMock({name: src_path}, {name: {'path': bc_path}}) - with util.uncache(name): - assert mock.source_path(name) == src_path - assert mock.bytecode_path(name) == bc_path - self.assertEqual(mock.get_filename(name), expect) - - def test_filename_with_source_bc(self): - # When source and bytecode paths present, return the source path. - self.get_filename_check('source_path', 'bc_path', 'source_path') - - def test_filename_with_source_no_bc(self): - # With source but no bc, return source path. - self.get_filename_check('source_path', None, 'source_path') - - def test_filename_with_no_source_bc(self): - # With not source but bc, return the bc path. - self.get_filename_check(None, 'bc_path', 'bc_path') - - def test_filename_with_no_source_or_bc(self): - # With no source or bc, raise ImportError. - name = 'mod' - mock = PyPycLoaderMock({name: None}, {name: {'path': None}}) - with util.uncache(name), self.assertRaises(ImportError): - mock.get_filename(name) - - -class SkipWritingBytecodeTests(unittest.TestCase): - - """Test that bytecode is properly handled based on - sys.dont_write_bytecode.""" - - @source_util.writes_bytecode_files - def run_test(self, dont_write_bytecode): - name = 'mod' - mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')}) - sys.dont_write_bytecode = dont_write_bytecode - with util.uncache(name): - mock.load_module(name) - self.assertTrue((name in mock.module_bytecode) is not - dont_write_bytecode) - - def test_no_bytecode_written(self): - self.run_test(True) - - def test_bytecode_written(self): - self.run_test(False) - - -class RegeneratedBytecodeTests(unittest.TestCase): - - """Test that bytecode is regenerated as expected.""" - - @source_util.writes_bytecode_files - def test_different_magic(self): - # A different magic number should lead to new bytecode. - name = 'mod' - bad_magic = b'\x00\x00\x00\x00' - assert bad_magic != imp.get_magic() - mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')}, - {name: {'path': os.path.join('path', 'to', - 'mod.bytecode'), - 'magic': bad_magic}}) - with util.uncache(name): - mock.load_module(name) - self.assertTrue(name in mock.module_bytecode) - magic = mock.module_bytecode[name][:4] - self.assertEqual(magic, imp.get_magic()) - - @source_util.writes_bytecode_files - def test_old_mtime(self): - # Bytecode with an older mtime should be regenerated. - name = 'mod' - old_mtime = PyPycLoaderMock.default_mtime - 1 - mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')}, - {name: {'path': 'path/to/mod.bytecode', 'mtime': old_mtime}}) - with util.uncache(name): - mock.load_module(name) - self.assertTrue(name in mock.module_bytecode) - mtime = importlib._r_long(mock.module_bytecode[name][4:8]) - self.assertEqual(mtime, PyPycLoaderMock.default_mtime) - - -class BadBytecodeFailureTests(unittest.TestCase): - - """Test import failures when there is no source and parts of the bytecode - is bad.""" - - def test_bad_magic(self): - # A bad magic number should lead to an ImportError. - name = 'mod' - bad_magic = b'\x00\x00\x00\x00' - bc = {name: - {'path': os.path.join('path', 'to', 'mod'), - 'magic': bad_magic}} - mock = PyPycLoaderMock({name: None}, bc) - with util.uncache(name), self.assertRaises(ImportError): - mock.load_module(name) - - def test_no_bytecode(self): - # Missing code object bytecode should lead to an EOFError. - name = 'mod' - bc = {name: {'path': os.path.join('path', 'to', 'mod'), 'bc': b''}} - mock = PyPycLoaderMock({name: None}, bc) - with util.uncache(name), self.assertRaises(EOFError): - mock.load_module(name) - - def test_bad_bytecode(self): - # Malformed code object bytecode should lead to a ValueError. - name = 'mod' - bc = {name: {'path': os.path.join('path', 'to', 'mod'), 'bc': b'1234'}} - mock = PyPycLoaderMock({name: None}, bc) - with util.uncache(name), self.assertRaises(ValueError): - mock.load_module(name) - - -def raise_ImportError(*args, **kwargs): - raise ImportError - -class MissingPathsTests(unittest.TestCase): - - """Test what happens when a source or bytecode path does not exist (either - from *_path returning None or raising ImportError).""" - - def test_source_path_None(self): - # Bytecode should be used when source_path returns None, along with - # __file__ being set to the bytecode path. - name = 'mod' - bytecode_path = 'path/to/mod' - mock = PyPycLoaderMock({name: None}, {name: {'path': bytecode_path}}) - with util.uncache(name): - module = mock.load_module(name) - self.assertEqual(module.__file__, bytecode_path) - - # Testing for bytecode_path returning None handled by all tests where no - # bytecode initially exists. - - def test_all_paths_None(self): - # If all *_path methods return None, raise ImportError. - name = 'mod' - mock = PyPycLoaderMock({name: None}) - with util.uncache(name), self.assertRaises(ImportError): - mock.load_module(name) - - def test_source_path_ImportError(self): - # An ImportError from source_path should trigger an ImportError. - name = 'mod' - mock = PyPycLoaderMock({}, {name: {'path': os.path.join('path', 'to', - 'mod')}}) - with util.uncache(name), self.assertRaises(ImportError): - mock.load_module(name) - - def test_bytecode_path_ImportError(self): - # An ImportError from bytecode_path should trigger an ImportError. - name = 'mod' - mock = PyPycLoaderMock({name: os.path.join('path', 'to', 'mod')}) - bad_meth = types.MethodType(raise_ImportError, mock) - mock.bytecode_path = bad_meth - with util.uncache(name), self.assertRaises(ImportError): - mock.load_module(name) - - -class SourceLoaderTestHarness(unittest.TestCase): - - def setUp(self, *, is_package=True, **kwargs): - self.package = 'pkg' - if is_package: - self.path = os.path.join(self.package, '__init__.py') - self.name = self.package - else: - module_name = 'mod' - self.path = os.path.join(self.package, '.'.join(['mod', 'py'])) - self.name = '.'.join([self.package, module_name]) - self.cached = imp.cache_from_source(self.path) - self.loader = self.loader_mock(self.path, **kwargs) - - def verify_module(self, module): - self.assertEqual(module.__name__, self.name) - self.assertEqual(module.__file__, self.path) - self.assertEqual(module.__cached__, self.cached) - self.assertEqual(module.__package__, self.package) - self.assertEqual(module.__loader__, self.loader) - values = module._.split('::') - self.assertEqual(values[0], self.name) - self.assertEqual(values[1], self.path) - self.assertEqual(values[2], self.cached) - self.assertEqual(values[3], self.package) - self.assertEqual(values[4], repr(self.loader)) - - def verify_code(self, code_object): - module = imp.new_module(self.name) - module.__file__ = self.path - module.__cached__ = self.cached - module.__package__ = self.package - module.__loader__ = self.loader - module.__path__ = [] - exec(code_object, module.__dict__) - self.verify_module(module) - - -class SourceOnlyLoaderTests(SourceLoaderTestHarness): - - """Test importlib.abc.SourceLoader for source-only loading. - - Reload testing is subsumed by the tests for - importlib.util.module_for_loader. - - """ - - loader_mock = SourceOnlyLoaderMock - - def test_get_source(self): - # Verify the source code is returned as a string. - # If an IOError is raised by get_data then raise ImportError. - expected_source = self.loader.source.decode('utf-8') - self.assertEqual(self.loader.get_source(self.name), expected_source) - def raise_IOError(path): - raise IOError - self.loader.get_data = raise_IOError - with self.assertRaises(ImportError): - self.loader.get_source(self.name) - - def test_is_package(self): - # Properly detect when loading a package. - self.setUp(is_package=True) - self.assertTrue(self.loader.is_package(self.name)) - self.setUp(is_package=False) - self.assertFalse(self.loader.is_package(self.name)) - - def test_get_code(self): - # Verify the code object is created. - code_object = self.loader.get_code(self.name) - self.verify_code(code_object) - - def test_load_module(self): - # Loading a module should set __name__, __loader__, __package__, - # __path__ (for packages), __file__, and __cached__. - # The module should also be put into sys.modules. - with util.uncache(self.name): - module = self.loader.load_module(self.name) - self.verify_module(module) - self.assertEqual(module.__path__, [os.path.dirname(self.path)]) - self.assertTrue(self.name in sys.modules) - - def test_package_settings(self): - # __package__ needs to be set, while __path__ is set on if the module - # is a package. - # Testing the values for a package are covered by test_load_module. - self.setUp(is_package=False) - with util.uncache(self.name): - module = self.loader.load_module(self.name) - self.verify_module(module) - self.assertTrue(not hasattr(module, '__path__')) - - def test_get_source_encoding(self): - # Source is considered encoded in UTF-8 by default unless otherwise - # specified by an encoding line. - source = "_ = 'ü'" - self.loader.source = source.encode('utf-8') - returned_source = self.loader.get_source(self.name) - self.assertEqual(returned_source, source) - source = "# coding: latin-1\n_ = ü" - self.loader.source = source.encode('latin-1') - returned_source = self.loader.get_source(self.name) - self.assertEqual(returned_source, source) - - -@unittest.skipIf(sys.dont_write_bytecode, "sys.dont_write_bytecode is true") -class SourceLoaderBytecodeTests(SourceLoaderTestHarness): - - """Test importlib.abc.SourceLoader's use of bytecode. - - Source-only testing handled by SourceOnlyLoaderTests. - - """ - - loader_mock = SourceLoaderMock - - def verify_code(self, code_object, *, bytecode_written=False): - super().verify_code(code_object) - if bytecode_written: - self.assertIn(self.cached, self.loader.written) - data = bytearray(imp.get_magic()) - data.extend(marshal._w_long(self.loader.source_mtime)) - data.extend(marshal.dumps(code_object)) - self.assertEqual(self.loader.written[self.cached], bytes(data)) - - def test_code_with_everything(self): - # When everything should work. - code_object = self.loader.get_code(self.name) - self.verify_code(code_object) - - def test_no_bytecode(self): - # If no bytecode exists then move on to the source. - self.loader.bytecode_path = "<does not exist>" - # Sanity check - with self.assertRaises(IOError): - bytecode_path = imp.cache_from_source(self.path) - self.loader.get_data(bytecode_path) - code_object = self.loader.get_code(self.name) - self.verify_code(code_object, bytecode_written=True) - - def test_code_bad_timestamp(self): - # Bytecode is only used when the timestamp matches the source EXACTLY. - for source_mtime in (0, 2): - assert source_mtime != self.loader.source_mtime - original = self.loader.source_mtime - self.loader.source_mtime = source_mtime - # If bytecode is used then EOFError would be raised by marshal. - self.loader.bytecode = self.loader.bytecode[8:] - code_object = self.loader.get_code(self.name) - self.verify_code(code_object, bytecode_written=True) - self.loader.source_mtime = original - - def test_code_bad_magic(self): - # Skip over bytecode with a bad magic number. - self.setUp(magic=b'0000') - # If bytecode is used then EOFError would be raised by marshal. - self.loader.bytecode = self.loader.bytecode[8:] - code_object = self.loader.get_code(self.name) - self.verify_code(code_object, bytecode_written=True) - - def test_dont_write_bytecode(self): - # Bytecode is not written if sys.dont_write_bytecode is true. - # Can assume it is false already thanks to the skipIf class decorator. - try: - sys.dont_write_bytecode = True - self.loader.bytecode_path = "<does not exist>" - code_object = self.loader.get_code(self.name) - self.assertNotIn(self.cached, self.loader.written) - finally: - sys.dont_write_bytecode = False - - def test_no_set_data(self): - # If set_data is not defined, one can still read bytecode. - self.setUp(magic=b'0000') - original_set_data = self.loader.__class__.set_data - try: - del self.loader.__class__.set_data - code_object = self.loader.get_code(self.name) - self.verify_code(code_object) - finally: - self.loader.__class__.set_data = original_set_data - - def test_set_data_raises_exceptions(self): - # Raising NotImplementedError or IOError is okay for set_data. - def raise_exception(exc): - def closure(*args, **kwargs): - raise exc - return closure - - self.setUp(magic=b'0000') - self.loader.set_data = raise_exception(NotImplementedError) - code_object = self.loader.get_code(self.name) - self.verify_code(code_object) - - -class SourceLoaderGetSourceTests(unittest.TestCase): - - """Tests for importlib.abc.SourceLoader.get_source().""" - - def test_default_encoding(self): - # Should have no problems with UTF-8 text. - name = 'mod' - mock = SourceOnlyLoaderMock('mod.file') - source = 'x = "ü"' - mock.source = source.encode('utf-8') - returned_source = mock.get_source(name) - self.assertEqual(returned_source, source) - - def test_decoded_source(self): - # Decoding should work. - name = 'mod' - mock = SourceOnlyLoaderMock("mod.file") - source = "# coding: Latin-1\nx='ü'" - assert source.encode('latin-1') != source.encode('utf-8') - mock.source = source.encode('latin-1') - returned_source = mock.get_source(name) - self.assertEqual(returned_source, source) - - def test_universal_newlines(self): - # PEP 302 says universal newlines should be used. - name = 'mod' - mock = SourceOnlyLoaderMock('mod.file') - source = "x = 42\r\ny = -13\r\n" - mock.source = source.encode('utf-8') - expect = io.IncrementalNewlineDecoder(None, True).decode(source) - self.assertEqual(mock.get_source(name), expect) - -class AbstractMethodImplTests(unittest.TestCase): - - """Test the concrete abstractmethod implementations.""" - - class Loader(abc.Loader): - def load_module(self, fullname): - super().load_module(fullname) - - class Finder(abc.Finder): - def find_module(self, _): - super().find_module(_) - - class ResourceLoader(Loader, abc.ResourceLoader): - def get_data(self, _): - super().get_data(_) - - class InspectLoader(Loader, abc.InspectLoader): - def is_package(self, _): - super().is_package(_) - - def get_code(self, _): - super().get_code(_) - - def get_source(self, _): - super().get_source(_) - - class ExecutionLoader(InspectLoader, abc.ExecutionLoader): - def get_filename(self, _): - super().get_filename(_) - - class SourceLoader(ResourceLoader, ExecutionLoader, abc.SourceLoader): - pass - - class PyLoader(ResourceLoader, InspectLoader, abc.PyLoader): - def source_path(self, _): - super().source_path(_) - - class PyPycLoader(PyLoader, abc.PyPycLoader): - def bytecode_path(self, _): - super().bytecode_path(_) - - def source_mtime(self, _): - super().source_mtime(_) - - def write_bytecode(self, _, _2): - super().write_bytecode(_, _2) - - def raises_NotImplementedError(self, ins, *args): - for method_name in args: - method = getattr(ins, method_name) - arg_count = len(inspect.getfullargspec(method)[0]) - 1 - args = [''] * arg_count - try: - method(*args) - except NotImplementedError: - pass - else: - msg = "{}.{} did not raise NotImplementedError" - self.fail(msg.format(ins.__class__.__name__, method_name)) - - def test_Loader(self): - self.raises_NotImplementedError(self.Loader(), 'load_module') - - # XXX misplaced; should be somewhere else - def test_Finder(self): - self.raises_NotImplementedError(self.Finder(), 'find_module') - - def test_ResourceLoader(self): - self.raises_NotImplementedError(self.ResourceLoader(), 'load_module', - 'get_data') - - def test_InspectLoader(self): - self.raises_NotImplementedError(self.InspectLoader(), 'load_module', - 'is_package', 'get_code', 'get_source') - - def test_ExecutionLoader(self): - self.raises_NotImplementedError(self.ExecutionLoader(), 'load_module', - 'is_package', 'get_code', 'get_source', - 'get_filename') - - def test_SourceLoader(self): - ins = self.SourceLoader() - # Required abstractmethods. - self.raises_NotImplementedError(ins, 'get_filename', 'get_data') - # Optional abstractmethods. - self.raises_NotImplementedError(ins,'path_mtime', 'set_data') - - def test_PyLoader(self): - self.raises_NotImplementedError(self.PyLoader(), 'source_path', - 'get_data', 'is_package') - - def test_PyPycLoader(self): - self.raises_NotImplementedError(self.PyPycLoader(), 'source_path', - 'source_mtime', 'bytecode_path', - 'write_bytecode') - - -def test_main(): - from test.support import run_unittest - run_unittest(PyLoaderTests, PyLoaderCompatTests, - PyLoaderInterfaceTests, - PyPycLoaderTests, PyPycLoaderInterfaceTests, - SkipWritingBytecodeTests, RegeneratedBytecodeTests, - BadBytecodeFailureTests, MissingPathsTests, - SourceOnlyLoaderTests, - SourceLoaderBytecodeTests, - SourceLoaderGetSourceTests, - AbstractMethodImplTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/source/test_case_sensitivity.py b/Lib/importlib/test/source/test_case_sensitivity.py deleted file mode 100644 index 73777de4ba4..00000000000 --- a/Lib/importlib/test/source/test_case_sensitivity.py +++ /dev/null @@ -1,60 +0,0 @@ -"""Test case-sensitivity (PEP 235).""" -from importlib import _bootstrap -from .. import util -from . import util as source_util -import os -import sys -from test import support as test_support -import unittest - - -@util.case_insensitive_tests -class CaseSensitivityTest(unittest.TestCase): - - """PEP 235 dictates that on case-preserving, case-insensitive file systems - that imports are case-sensitive unless the PYTHONCASEOK environment - variable is set.""" - - name = 'MoDuLe' - assert name != name.lower() - - def find(self, path): - finder = _bootstrap._FileFinder(path, - _bootstrap._SourceFinderDetails(), - _bootstrap._SourcelessFinderDetails()) - return finder.find_module(self.name) - - def sensitivity_test(self): - """Look for a module with matching and non-matching sensitivity.""" - sensitive_pkg = 'sensitive.{0}'.format(self.name) - insensitive_pkg = 'insensitive.{0}'.format(self.name.lower()) - context = source_util.create_modules(insensitive_pkg, sensitive_pkg) - with context as mapping: - sensitive_path = os.path.join(mapping['.root'], 'sensitive') - insensitive_path = os.path.join(mapping['.root'], 'insensitive') - return self.find(sensitive_path), self.find(insensitive_path) - - def test_sensitive(self): - with test_support.EnvironmentVarGuard() as env: - env.unset('PYTHONCASEOK') - sensitive, insensitive = self.sensitivity_test() - self.assertTrue(hasattr(sensitive, 'load_module')) - self.assertIn(self.name, sensitive.get_filename(self.name)) - self.assertIsNone(insensitive) - - def test_insensitive(self): - with test_support.EnvironmentVarGuard() as env: - env.set('PYTHONCASEOK', '1') - sensitive, insensitive = self.sensitivity_test() - self.assertTrue(hasattr(sensitive, 'load_module')) - self.assertIn(self.name, sensitive.get_filename(self.name)) - self.assertTrue(hasattr(insensitive, 'load_module')) - self.assertIn(self.name, insensitive.get_filename(self.name)) - - -def test_main(): - test_support.run_unittest(CaseSensitivityTest) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/source/test_file_loader.py b/Lib/importlib/test/source/test_file_loader.py deleted file mode 100644 index c7a7d8fbcae..00000000000 --- a/Lib/importlib/test/source/test_file_loader.py +++ /dev/null @@ -1,419 +0,0 @@ -import importlib -from importlib import _bootstrap -from .. import abc -from .. import util -from . import util as source_util - -import errno -import imp -import marshal -import os -import py_compile -import shutil -import stat -import sys -import unittest - -from test.support import make_legacy_pyc - - -class SimpleTest(unittest.TestCase): - - """Should have no issue importing a source module [basic]. And if there is - a syntax error, it should raise a SyntaxError [syntax error]. - - """ - - # [basic] - def test_module(self): - with source_util.create_modules('_temp') as mapping: - loader = _bootstrap._SourceFileLoader('_temp', mapping['_temp']) - module = loader.load_module('_temp') - self.assertTrue('_temp' in sys.modules) - check = {'__name__': '_temp', '__file__': mapping['_temp'], - '__package__': ''} - for attr, value in check.items(): - self.assertEqual(getattr(module, attr), value) - - def test_package(self): - with source_util.create_modules('_pkg.__init__') as mapping: - loader = _bootstrap._SourceFileLoader('_pkg', - mapping['_pkg.__init__']) - module = loader.load_module('_pkg') - self.assertTrue('_pkg' in sys.modules) - check = {'__name__': '_pkg', '__file__': mapping['_pkg.__init__'], - '__path__': [os.path.dirname(mapping['_pkg.__init__'])], - '__package__': '_pkg'} - for attr, value in check.items(): - self.assertEqual(getattr(module, attr), value) - - - def test_lacking_parent(self): - with source_util.create_modules('_pkg.__init__', '_pkg.mod')as mapping: - loader = _bootstrap._SourceFileLoader('_pkg.mod', - mapping['_pkg.mod']) - module = loader.load_module('_pkg.mod') - self.assertTrue('_pkg.mod' in sys.modules) - check = {'__name__': '_pkg.mod', '__file__': mapping['_pkg.mod'], - '__package__': '_pkg'} - for attr, value in check.items(): - self.assertEqual(getattr(module, attr), value) - - def fake_mtime(self, fxn): - """Fake mtime to always be higher than expected.""" - return lambda name: fxn(name) + 1 - - def test_module_reuse(self): - with source_util.create_modules('_temp') as mapping: - loader = _bootstrap._SourceFileLoader('_temp', mapping['_temp']) - module = loader.load_module('_temp') - module_id = id(module) - module_dict_id = id(module.__dict__) - with open(mapping['_temp'], 'w') as file: - file.write("testing_var = 42\n") - # For filesystems where the mtime is only to a second granularity, - # everything that has happened above can be too fast; - # force an mtime on the source that is guaranteed to be different - # than the original mtime. - loader.path_mtime = self.fake_mtime(loader.path_mtime) - module = loader.load_module('_temp') - self.assertTrue('testing_var' in module.__dict__, - "'testing_var' not in " - "{0}".format(list(module.__dict__.keys()))) - self.assertEqual(module, sys.modules['_temp']) - self.assertEqual(id(module), module_id) - self.assertEqual(id(module.__dict__), module_dict_id) - - def test_state_after_failure(self): - # A failed reload should leave the original module intact. - attributes = ('__file__', '__path__', '__package__') - value = '<test>' - name = '_temp' - with source_util.create_modules(name) as mapping: - orig_module = imp.new_module(name) - for attr in attributes: - setattr(orig_module, attr, value) - with open(mapping[name], 'w') as file: - file.write('+++ bad syntax +++') - loader = _bootstrap._SourceFileLoader('_temp', mapping['_temp']) - with self.assertRaises(SyntaxError): - loader.load_module(name) - for attr in attributes: - self.assertEqual(getattr(orig_module, attr), value) - - # [syntax error] - def test_bad_syntax(self): - with source_util.create_modules('_temp') as mapping: - with open(mapping['_temp'], 'w') as file: - file.write('=') - loader = _bootstrap._SourceFileLoader('_temp', mapping['_temp']) - with self.assertRaises(SyntaxError): - loader.load_module('_temp') - self.assertTrue('_temp' not in sys.modules) - - def test_file_from_empty_string_dir(self): - # Loading a module found from an empty string entry on sys.path should - # not only work, but keep all attributes relative. - file_path = '_temp.py' - with open(file_path, 'w') as file: - file.write("# test file for importlib") - try: - with util.uncache('_temp'): - loader = _bootstrap._SourceFileLoader('_temp', file_path) - mod = loader.load_module('_temp') - self.assertEqual(file_path, mod.__file__) - self.assertEqual(imp.cache_from_source(file_path), - mod.__cached__) - finally: - os.unlink(file_path) - pycache = os.path.dirname(imp.cache_from_source(file_path)) - shutil.rmtree(pycache) - - def test_timestamp_overflow(self): - # When a modification timestamp is larger than 2**32, it should be - # truncated rather than raise an OverflowError. - with source_util.create_modules('_temp') as mapping: - source = mapping['_temp'] - compiled = imp.cache_from_source(source) - with open(source, 'w') as f: - f.write("x = 5") - try: - os.utime(source, (2 ** 33 - 5, 2 ** 33 - 5)) - except OverflowError: - self.skipTest("cannot set modification time to large integer") - except OSError as e: - if e.errno != getattr(errno, 'EOVERFLOW', None): - raise - self.skipTest("cannot set modification time to large integer ({})".format(e)) - loader = _bootstrap._SourceFileLoader('_temp', mapping['_temp']) - mod = loader.load_module('_temp') - # Sanity checks. - self.assertEqual(mod.__cached__, compiled) - self.assertEqual(mod.x, 5) - # The pyc file was created. - os.stat(compiled) - - -class BadBytecodeTest(unittest.TestCase): - - def import_(self, file, module_name): - loader = self.loader(module_name, file) - module = loader.load_module(module_name) - self.assertTrue(module_name in sys.modules) - - def manipulate_bytecode(self, name, mapping, manipulator, *, - del_source=False): - """Manipulate the bytecode of a module by passing it into a callable - that returns what to use as the new bytecode.""" - try: - del sys.modules['_temp'] - except KeyError: - pass - py_compile.compile(mapping[name]) - if not del_source: - bytecode_path = imp.cache_from_source(mapping[name]) - else: - os.unlink(mapping[name]) - bytecode_path = make_legacy_pyc(mapping[name]) - if manipulator: - with open(bytecode_path, 'rb') as file: - bc = file.read() - new_bc = manipulator(bc) - with open(bytecode_path, 'wb') as file: - if new_bc is not None: - file.write(new_bc) - return bytecode_path - - def _test_empty_file(self, test, *, del_source=False): - with source_util.create_modules('_temp') as mapping: - bc_path = self.manipulate_bytecode('_temp', mapping, - lambda bc: b'', - del_source=del_source) - test('_temp', mapping, bc_path) - - @source_util.writes_bytecode_files - def _test_partial_magic(self, test, *, del_source=False): - # When their are less than 4 bytes to a .pyc, regenerate it if - # possible, else raise ImportError. - with source_util.create_modules('_temp') as mapping: - bc_path = self.manipulate_bytecode('_temp', mapping, - lambda bc: bc[:3], - del_source=del_source) - test('_temp', mapping, bc_path) - - def _test_magic_only(self, test, *, del_source=False): - with source_util.create_modules('_temp') as mapping: - bc_path = self.manipulate_bytecode('_temp', mapping, - lambda bc: bc[:4], - del_source=del_source) - test('_temp', mapping, bc_path) - - def _test_partial_timestamp(self, test, *, del_source=False): - with source_util.create_modules('_temp') as mapping: - bc_path = self.manipulate_bytecode('_temp', mapping, - lambda bc: bc[:7], - del_source=del_source) - test('_temp', mapping, bc_path) - - def _test_no_marshal(self, *, del_source=False): - with source_util.create_modules('_temp') as mapping: - bc_path = self.manipulate_bytecode('_temp', mapping, - lambda bc: bc[:8], - del_source=del_source) - file_path = mapping['_temp'] if not del_source else bc_path - with self.assertRaises(EOFError): - self.import_(file_path, '_temp') - - def _test_non_code_marshal(self, *, del_source=False): - with source_util.create_modules('_temp') as mapping: - bytecode_path = self.manipulate_bytecode('_temp', mapping, - lambda bc: bc[:8] + marshal.dumps(b'abcd'), - del_source=del_source) - file_path = mapping['_temp'] if not del_source else bytecode_path - with self.assertRaises(ImportError): - self.import_(file_path, '_temp') - - def _test_bad_marshal(self, *, del_source=False): - with source_util.create_modules('_temp') as mapping: - bytecode_path = self.manipulate_bytecode('_temp', mapping, - lambda bc: bc[:8] + b'<test>', - del_source=del_source) - file_path = mapping['_temp'] if not del_source else bytecode_path - with self.assertRaises(EOFError): - self.import_(file_path, '_temp') - - def _test_bad_magic(self, test, *, del_source=False): - with source_util.create_modules('_temp') as mapping: - bc_path = self.manipulate_bytecode('_temp', mapping, - lambda bc: b'\x00\x00\x00\x00' + bc[4:]) - test('_temp', mapping, bc_path) - - -class SourceLoaderBadBytecodeTest(BadBytecodeTest): - - loader = _bootstrap._SourceFileLoader - - @source_util.writes_bytecode_files - def test_empty_file(self): - # When a .pyc is empty, regenerate it if possible, else raise - # ImportError. - def test(name, mapping, bytecode_path): - self.import_(mapping[name], name) - with open(bytecode_path, 'rb') as file: - self.assertGreater(len(file.read()), 8) - - self._test_empty_file(test) - - def test_partial_magic(self): - def test(name, mapping, bytecode_path): - self.import_(mapping[name], name) - with open(bytecode_path, 'rb') as file: - self.assertGreater(len(file.read()), 8) - - self._test_partial_magic(test) - - @source_util.writes_bytecode_files - def test_magic_only(self): - # When there is only the magic number, regenerate the .pyc if possible, - # else raise EOFError. - def test(name, mapping, bytecode_path): - self.import_(mapping[name], name) - with open(bytecode_path, 'rb') as file: - self.assertGreater(len(file.read()), 8) - - self._test_magic_only(test) - - @source_util.writes_bytecode_files - def test_bad_magic(self): - # When the magic number is different, the bytecode should be - # regenerated. - def test(name, mapping, bytecode_path): - self.import_(mapping[name], name) - with open(bytecode_path, 'rb') as bytecode_file: - self.assertEqual(bytecode_file.read(4), imp.get_magic()) - - self._test_bad_magic(test) - - @source_util.writes_bytecode_files - def test_partial_timestamp(self): - # When the timestamp is partial, regenerate the .pyc, else - # raise EOFError. - def test(name, mapping, bc_path): - self.import_(mapping[name], name) - with open(bc_path, 'rb') as file: - self.assertGreater(len(file.read()), 8) - - self._test_partial_timestamp(test) - - @source_util.writes_bytecode_files - def test_no_marshal(self): - # When there is only the magic number and timestamp, raise EOFError. - self._test_no_marshal() - - @source_util.writes_bytecode_files - def test_non_code_marshal(self): - self._test_non_code_marshal() - # XXX ImportError when sourceless - - # [bad marshal] - @source_util.writes_bytecode_files - def test_bad_marshal(self): - # Bad marshal data should raise a ValueError. - self._test_bad_marshal() - - # [bad timestamp] - @source_util.writes_bytecode_files - def test_old_timestamp(self): - # When the timestamp is older than the source, bytecode should be - # regenerated. - zeros = b'\x00\x00\x00\x00' - with source_util.create_modules('_temp') as mapping: - py_compile.compile(mapping['_temp']) - bytecode_path = imp.cache_from_source(mapping['_temp']) - with open(bytecode_path, 'r+b') as bytecode_file: - bytecode_file.seek(4) - bytecode_file.write(zeros) - self.import_(mapping['_temp'], '_temp') - source_mtime = os.path.getmtime(mapping['_temp']) - source_timestamp = importlib._w_long(source_mtime) - with open(bytecode_path, 'rb') as bytecode_file: - bytecode_file.seek(4) - self.assertEqual(bytecode_file.read(4), source_timestamp) - - # [bytecode read-only] - @source_util.writes_bytecode_files - def test_read_only_bytecode(self): - # When bytecode is read-only but should be rewritten, fail silently. - with source_util.create_modules('_temp') as mapping: - # Create bytecode that will need to be re-created. - py_compile.compile(mapping['_temp']) - bytecode_path = imp.cache_from_source(mapping['_temp']) - with open(bytecode_path, 'r+b') as bytecode_file: - bytecode_file.seek(0) - bytecode_file.write(b'\x00\x00\x00\x00') - # Make the bytecode read-only. - os.chmod(bytecode_path, - stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) - try: - # Should not raise IOError! - self.import_(mapping['_temp'], '_temp') - finally: - # Make writable for eventual clean-up. - os.chmod(bytecode_path, stat.S_IWUSR) - - -class SourcelessLoaderBadBytecodeTest(BadBytecodeTest): - - loader = _bootstrap._SourcelessFileLoader - - def test_empty_file(self): - def test(name, mapping, bytecode_path): - with self.assertRaises(ImportError): - self.import_(bytecode_path, name) - - self._test_empty_file(test, del_source=True) - - def test_partial_magic(self): - def test(name, mapping, bytecode_path): - with self.assertRaises(ImportError): - self.import_(bytecode_path, name) - self._test_partial_magic(test, del_source=True) - - def test_magic_only(self): - def test(name, mapping, bytecode_path): - with self.assertRaises(EOFError): - self.import_(bytecode_path, name) - - self._test_magic_only(test, del_source=True) - - def test_bad_magic(self): - def test(name, mapping, bytecode_path): - with self.assertRaises(ImportError): - self.import_(bytecode_path, name) - - self._test_bad_magic(test, del_source=True) - - def test_partial_timestamp(self): - def test(name, mapping, bytecode_path): - with self.assertRaises(EOFError): - self.import_(bytecode_path, name) - - self._test_partial_timestamp(test, del_source=True) - - def test_no_marshal(self): - self._test_no_marshal(del_source=True) - - def test_non_code_marshal(self): - self._test_non_code_marshal(del_source=True) - - -def test_main(): - from test.support import run_unittest - run_unittest(SimpleTest, - SourceLoaderBadBytecodeTest, - SourcelessLoaderBadBytecodeTest - ) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/source/test_finder.py b/Lib/importlib/test/source/test_finder.py deleted file mode 100644 index 7b9088da0ce..00000000000 --- a/Lib/importlib/test/source/test_finder.py +++ /dev/null @@ -1,153 +0,0 @@ -from importlib import _bootstrap -from .. import abc -from . import util as source_util -from test.support import make_legacy_pyc -import os -import errno -import py_compile -import unittest -import warnings - - -class FinderTests(abc.FinderTests): - - """For a top-level module, it should just be found directly in the - directory being searched. This is true for a directory with source - [top-level source], bytecode [top-level bc], or both [top-level both]. - There is also the possibility that it is a package [top-level package], in - which case there will be a directory with the module name and an - __init__.py file. If there is a directory without an __init__.py an - ImportWarning is returned [empty dir]. - - For sub-modules and sub-packages, the same happens as above but only use - the tail end of the name [sub module] [sub package] [sub empty]. - - When there is a conflict between a package and module having the same name - in the same directory, the package wins out [package over module]. This is - so that imports of modules within the package can occur rather than trigger - an import error. - - When there is a package and module with the same name, always pick the - package over the module [package over module]. This is so that imports from - the package have the possibility of succeeding. - - """ - - def import_(self, root, module): - finder = _bootstrap._FileFinder(root, - _bootstrap._SourceFinderDetails(), - _bootstrap._SourcelessFinderDetails()) - return finder.find_module(module) - - def run_test(self, test, create=None, *, compile_=None, unlink=None): - """Test the finding of 'test' with the creation of modules listed in - 'create'. - - Any names listed in 'compile_' are byte-compiled. Modules - listed in 'unlink' have their source files deleted. - - """ - if create is None: - create = {test} - with source_util.create_modules(*create) as mapping: - if compile_: - for name in compile_: - py_compile.compile(mapping[name]) - if unlink: - for name in unlink: - os.unlink(mapping[name]) - try: - make_legacy_pyc(mapping[name]) - except OSError as error: - # Some tests do not set compile_=True so the source - # module will not get compiled and there will be no - # PEP 3147 pyc file to rename. - if error.errno != errno.ENOENT: - raise - loader = self.import_(mapping['.root'], test) - self.assertTrue(hasattr(loader, 'load_module')) - return loader - - def test_module(self): - # [top-level source] - self.run_test('top_level') - # [top-level bc] - self.run_test('top_level', compile_={'top_level'}, - unlink={'top_level'}) - # [top-level both] - self.run_test('top_level', compile_={'top_level'}) - - # [top-level package] - def test_package(self): - # Source. - self.run_test('pkg', {'pkg.__init__'}) - # Bytecode. - self.run_test('pkg', {'pkg.__init__'}, compile_={'pkg.__init__'}, - unlink={'pkg.__init__'}) - # Both. - self.run_test('pkg', {'pkg.__init__'}, compile_={'pkg.__init__'}) - - # [sub module] - def test_module_in_package(self): - with source_util.create_modules('pkg.__init__', 'pkg.sub') as mapping: - pkg_dir = os.path.dirname(mapping['pkg.__init__']) - loader = self.import_(pkg_dir, 'pkg.sub') - self.assertTrue(hasattr(loader, 'load_module')) - - # [sub package] - def test_package_in_package(self): - context = source_util.create_modules('pkg.__init__', 'pkg.sub.__init__') - with context as mapping: - pkg_dir = os.path.dirname(mapping['pkg.__init__']) - loader = self.import_(pkg_dir, 'pkg.sub') - self.assertTrue(hasattr(loader, 'load_module')) - - # [sub empty] - def test_empty_sub_directory(self): - context = source_util.create_modules('pkg.__init__', 'pkg.sub.__init__') - with warnings.catch_warnings(): - warnings.simplefilter("error", ImportWarning) - with context as mapping: - os.unlink(mapping['pkg.sub.__init__']) - pkg_dir = os.path.dirname(mapping['pkg.__init__']) - with self.assertRaises(ImportWarning): - self.import_(pkg_dir, 'pkg.sub') - - # [package over modules] - def test_package_over_module(self): - name = '_temp' - loader = self.run_test(name, {'{0}.__init__'.format(name), name}) - self.assertTrue('__init__' in loader.get_filename(name)) - - - def test_failure(self): - with source_util.create_modules('blah') as mapping: - nothing = self.import_(mapping['.root'], 'sdfsadsadf') - self.assertTrue(nothing is None) - - # [empty dir] - def test_empty_dir(self): - with warnings.catch_warnings(): - warnings.simplefilter("error", ImportWarning) - with self.assertRaises(ImportWarning): - self.run_test('pkg', {'pkg.__init__'}, unlink={'pkg.__init__'}) - - def test_empty_string_for_dir(self): - # The empty string from sys.path means to search in the cwd. - finder = _bootstrap._FileFinder('', _bootstrap._SourceFinderDetails()) - with open('mod.py', 'w') as file: - file.write("# test file for importlib") - try: - loader = finder.find_module('mod') - self.assertTrue(hasattr(loader, 'load_module')) - finally: - os.unlink('mod.py') - - -def test_main(): - from test.support import run_unittest - run_unittest(FinderTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/source/test_path_hook.py b/Lib/importlib/test/source/test_path_hook.py deleted file mode 100644 index 374f7b6ad3e..00000000000 --- a/Lib/importlib/test/source/test_path_hook.py +++ /dev/null @@ -1,26 +0,0 @@ -from importlib import _bootstrap -from . import util as source_util -import unittest - - -class PathHookTest(unittest.TestCase): - - """Test the path hook for source.""" - - def test_success(self): - with source_util.create_modules('dummy') as mapping: - self.assertTrue(hasattr(_bootstrap._file_path_hook(mapping['.root']), - 'find_module')) - - def test_empty_string(self): - # The empty string represents the cwd. - self.assertTrue(hasattr(_bootstrap._file_path_hook(''), 'find_module')) - - -def test_main(): - from test.support import run_unittest - run_unittest(PathHookTest) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/source/test_source_encoding.py b/Lib/importlib/test/source/test_source_encoding.py deleted file mode 100644 index 794a3df2463..00000000000 --- a/Lib/importlib/test/source/test_source_encoding.py +++ /dev/null @@ -1,123 +0,0 @@ -from importlib import _bootstrap -from . import util as source_util - -import codecs -import re -import sys -# Because sys.path gets essentially blanked, need to have unicodedata already -# imported for the parser to use. -import unicodedata -import unittest - - -CODING_RE = re.compile(r'coding[:=]\s*([-\w.]+)') - - -class EncodingTest(unittest.TestCase): - - """PEP 3120 makes UTF-8 the default encoding for source code - [default encoding]. - - PEP 263 specifies how that can change on a per-file basis. Either the first - or second line can contain the encoding line [encoding first line] - encoding second line]. If the file has the BOM marker it is considered UTF-8 - implicitly [BOM]. If any encoding is specified it must be UTF-8, else it is - an error [BOM and utf-8][BOM conflict]. - - """ - - variable = '\u00fc' - character = '\u00c9' - source_line = "{0} = '{1}'\n".format(variable, character) - module_name = '_temp' - - def run_test(self, source): - with source_util.create_modules(self.module_name) as mapping: - with open(mapping[self.module_name], 'wb') as file: - file.write(source) - loader = _bootstrap._SourceFileLoader(self.module_name, - mapping[self.module_name]) - return loader.load_module(self.module_name) - - def create_source(self, encoding): - encoding_line = "# coding={0}".format(encoding) - assert CODING_RE.search(encoding_line) - source_lines = [encoding_line.encode('utf-8')] - source_lines.append(self.source_line.encode(encoding)) - return b'\n'.join(source_lines) - - def test_non_obvious_encoding(self): - # Make sure that an encoding that has never been a standard one for - # Python works. - encoding_line = "# coding=koi8-r" - assert CODING_RE.search(encoding_line) - source = "{0}\na=42\n".format(encoding_line).encode("koi8-r") - self.run_test(source) - - # [default encoding] - def test_default_encoding(self): - self.run_test(self.source_line.encode('utf-8')) - - # [encoding first line] - def test_encoding_on_first_line(self): - encoding = 'Latin-1' - source = self.create_source(encoding) - self.run_test(source) - - # [encoding second line] - def test_encoding_on_second_line(self): - source = b"#/usr/bin/python\n" + self.create_source('Latin-1') - self.run_test(source) - - # [BOM] - def test_bom(self): - self.run_test(codecs.BOM_UTF8 + self.source_line.encode('utf-8')) - - # [BOM and utf-8] - def test_bom_and_utf_8(self): - source = codecs.BOM_UTF8 + self.create_source('utf-8') - self.run_test(source) - - # [BOM conflict] - def test_bom_conflict(self): - source = codecs.BOM_UTF8 + self.create_source('latin-1') - with self.assertRaises(SyntaxError): - self.run_test(source) - - -class LineEndingTest(unittest.TestCase): - - r"""Source written with the three types of line endings (\n, \r\n, \r) - need to be readable [cr][crlf][lf].""" - - def run_test(self, line_ending): - module_name = '_temp' - source_lines = [b"a = 42", b"b = -13", b''] - source = line_ending.join(source_lines) - with source_util.create_modules(module_name) as mapping: - with open(mapping[module_name], 'wb') as file: - file.write(source) - loader = _bootstrap._SourceFileLoader(module_name, - mapping[module_name]) - return loader.load_module(module_name) - - # [cr] - def test_cr(self): - self.run_test(b'\r') - - # [crlf] - def test_crlf(self): - self.run_test(b'\r\n') - - # [lf] - def test_lf(self): - self.run_test(b'\n') - - -def test_main(): - from test.support import run_unittest - run_unittest(EncodingTest, LineEndingTest) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/source/util.py b/Lib/importlib/test/source/util.py deleted file mode 100644 index ae65663a670..00000000000 --- a/Lib/importlib/test/source/util.py +++ /dev/null @@ -1,97 +0,0 @@ -from .. import util -import contextlib -import errno -import functools -import imp -import os -import os.path -import sys -import tempfile -from test import support - - -def writes_bytecode_files(fxn): - """Decorator to protect sys.dont_write_bytecode from mutation and to skip - tests that require it to be set to False.""" - if sys.dont_write_bytecode: - return lambda *args, **kwargs: None - @functools.wraps(fxn) - def wrapper(*args, **kwargs): - original = sys.dont_write_bytecode - sys.dont_write_bytecode = False - try: - to_return = fxn(*args, **kwargs) - finally: - sys.dont_write_bytecode = original - return to_return - return wrapper - - -def ensure_bytecode_path(bytecode_path): - """Ensure that the __pycache__ directory for PEP 3147 pyc file exists. - - :param bytecode_path: File system path to PEP 3147 pyc file. - """ - try: - os.mkdir(os.path.dirname(bytecode_path)) - except OSError as error: - if error.errno != errno.EEXIST: - raise - - -@contextlib.contextmanager -def create_modules(*names): - """Temporarily create each named module with an attribute (named 'attr') - that contains the name passed into the context manager that caused the - creation of the module. - - All files are created in a temporary directory returned by - tempfile.mkdtemp(). This directory is inserted at the beginning of - sys.path. When the context manager exits all created files (source and - bytecode) are explicitly deleted. - - No magic is performed when creating packages! This means that if you create - a module within a package you must also create the package's __init__ as - well. - - """ - source = 'attr = {0!r}' - created_paths = [] - mapping = {} - state_manager = None - uncache_manager = None - try: - temp_dir = tempfile.mkdtemp() - mapping['.root'] = temp_dir - import_names = set() - for name in names: - if not name.endswith('__init__'): - import_name = name - else: - import_name = name[:-len('.__init__')] - import_names.add(import_name) - if import_name in sys.modules: - del sys.modules[import_name] - name_parts = name.split('.') - file_path = temp_dir - for directory in name_parts[:-1]: - file_path = os.path.join(file_path, directory) - if not os.path.exists(file_path): - os.mkdir(file_path) - created_paths.append(file_path) - file_path = os.path.join(file_path, name_parts[-1] + '.py') - with open(file_path, 'w') as file: - file.write(source.format(name)) - created_paths.append(file_path) - mapping[name] = file_path - uncache_manager = util.uncache(*import_names) - uncache_manager.__enter__() - state_manager = util.import_state(path=[temp_dir]) - state_manager.__enter__() - yield mapping - finally: - if state_manager is not None: - state_manager.__exit__(None, None, None) - if uncache_manager is not None: - uncache_manager.__exit__(None, None, None) - support.rmtree(temp_dir) diff --git a/Lib/importlib/test/test_abc.py b/Lib/importlib/test/test_abc.py deleted file mode 100644 index 0ecbe390ad6..00000000000 --- a/Lib/importlib/test/test_abc.py +++ /dev/null @@ -1,89 +0,0 @@ -from importlib import abc -from importlib import machinery -import inspect -import unittest - - -class InheritanceTests: - - """Test that the specified class is a subclass/superclass of the expected - classes.""" - - subclasses = [] - superclasses = [] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - assert self.subclasses or self.superclasses, self.__class__ - self.__test = getattr(abc, self.__class__.__name__) - - def test_subclasses(self): - # Test that the expected subclasses inherit. - for subclass in self.subclasses: - self.assertTrue(issubclass(subclass, self.__test), - "{0} is not a subclass of {1}".format(subclass, self.__test)) - - def test_superclasses(self): - # Test that the class inherits from the expected superclasses. - for superclass in self.superclasses: - self.assertTrue(issubclass(self.__test, superclass), - "{0} is not a superclass of {1}".format(superclass, self.__test)) - - -class Finder(InheritanceTests, unittest.TestCase): - - subclasses = [machinery.BuiltinImporter, machinery.FrozenImporter, - machinery.PathFinder] - - -class Loader(InheritanceTests, unittest.TestCase): - - subclasses = [abc.PyLoader] - - -class ResourceLoader(InheritanceTests, unittest.TestCase): - - superclasses = [abc.Loader] - - -class InspectLoader(InheritanceTests, unittest.TestCase): - - superclasses = [abc.Loader] - subclasses = [abc.PyLoader, machinery.BuiltinImporter, - machinery.FrozenImporter] - - -class ExecutionLoader(InheritanceTests, unittest.TestCase): - - superclasses = [abc.InspectLoader] - subclasses = [abc.PyLoader] - - -class SourceLoader(InheritanceTests, unittest.TestCase): - - superclasses = [abc.ResourceLoader, abc.ExecutionLoader] - - -class PyLoader(InheritanceTests, unittest.TestCase): - - superclasses = [abc.Loader, abc.ResourceLoader, abc.ExecutionLoader] - - -class PyPycLoader(InheritanceTests, unittest.TestCase): - - superclasses = [abc.PyLoader] - - -def test_main(): - from test.support import run_unittest - classes = [] - for class_ in globals().values(): - if (inspect.isclass(class_) and - issubclass(class_, unittest.TestCase) and - issubclass(class_, InheritanceTests)): - classes.append(class_) - run_unittest(*classes) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/test_api.py b/Lib/importlib/test/test_api.py deleted file mode 100644 index a151626de7d..00000000000 --- a/Lib/importlib/test/test_api.py +++ /dev/null @@ -1,93 +0,0 @@ -from . import util -import imp -import importlib -import sys -import unittest - - -class ImportModuleTests(unittest.TestCase): - - """Test importlib.import_module.""" - - def test_module_import(self): - # Test importing a top-level module. - with util.mock_modules('top_level') as mock: - with util.import_state(meta_path=[mock]): - module = importlib.import_module('top_level') - self.assertEqual(module.__name__, 'top_level') - - def test_absolute_package_import(self): - # Test importing a module from a package with an absolute name. - pkg_name = 'pkg' - pkg_long_name = '{0}.__init__'.format(pkg_name) - name = '{0}.mod'.format(pkg_name) - with util.mock_modules(pkg_long_name, name) as mock: - with util.import_state(meta_path=[mock]): - module = importlib.import_module(name) - self.assertEqual(module.__name__, name) - - def test_shallow_relative_package_import(self): - # Test importing a module from a package through a relative import. - pkg_name = 'pkg' - pkg_long_name = '{0}.__init__'.format(pkg_name) - module_name = 'mod' - absolute_name = '{0}.{1}'.format(pkg_name, module_name) - relative_name = '.{0}'.format(module_name) - with util.mock_modules(pkg_long_name, absolute_name) as mock: - with util.import_state(meta_path=[mock]): - importlib.import_module(pkg_name) - module = importlib.import_module(relative_name, pkg_name) - self.assertEqual(module.__name__, absolute_name) - - def test_deep_relative_package_import(self): - modules = ['a.__init__', 'a.b.__init__', 'a.c'] - with util.mock_modules(*modules) as mock: - with util.import_state(meta_path=[mock]): - importlib.import_module('a') - importlib.import_module('a.b') - module = importlib.import_module('..c', 'a.b') - self.assertEqual(module.__name__, 'a.c') - - def test_absolute_import_with_package(self): - # Test importing a module from a package with an absolute name with - # the 'package' argument given. - pkg_name = 'pkg' - pkg_long_name = '{0}.__init__'.format(pkg_name) - name = '{0}.mod'.format(pkg_name) - with util.mock_modules(pkg_long_name, name) as mock: - with util.import_state(meta_path=[mock]): - importlib.import_module(pkg_name) - module = importlib.import_module(name, pkg_name) - self.assertEqual(module.__name__, name) - - def test_relative_import_wo_package(self): - # Relative imports cannot happen without the 'package' argument being - # set. - with self.assertRaises(TypeError): - importlib.import_module('.support') - - - def test_loaded_once(self): - # Issue #13591: Modules should only be loaded once when - # initializing the parent package attempts to import the - # module currently being imported. - b_load_count = 0 - def load_a(): - importlib.import_module('a.b') - def load_b(): - nonlocal b_load_count - b_load_count += 1 - code = {'a': load_a, 'a.b': load_b} - modules = ['a.__init__', 'a.b'] - with util.mock_modules(*modules, module_code=code) as mock: - with util.import_state(meta_path=[mock]): - importlib.import_module('a.b') - self.assertEqual(b_load_count, 1) - -def test_main(): - from test.support import run_unittest - run_unittest(ImportModuleTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/test_util.py b/Lib/importlib/test/test_util.py deleted file mode 100644 index 602447f09e4..00000000000 --- a/Lib/importlib/test/test_util.py +++ /dev/null @@ -1,118 +0,0 @@ -from importlib import util -from . import util as test_util -import imp -import sys -import types -import unittest - - -class ModuleForLoaderTests(unittest.TestCase): - - """Tests for importlib.util.module_for_loader.""" - - def return_module(self, name): - fxn = util.module_for_loader(lambda self, module: module) - return fxn(self, name) - - def raise_exception(self, name): - def to_wrap(self, module): - raise ImportError - fxn = util.module_for_loader(to_wrap) - try: - fxn(self, name) - except ImportError: - pass - - def test_new_module(self): - # Test that when no module exists in sys.modules a new module is - # created. - module_name = 'a.b.c' - with test_util.uncache(module_name): - module = self.return_module(module_name) - self.assertTrue(module_name in sys.modules) - self.assertTrue(isinstance(module, types.ModuleType)) - self.assertEqual(module.__name__, module_name) - - def test_reload(self): - # Test that a module is reused if already in sys.modules. - name = 'a.b.c' - module = imp.new_module('a.b.c') - with test_util.uncache(name): - sys.modules[name] = module - returned_module = self.return_module(name) - self.assertIs(returned_module, sys.modules[name]) - - def test_new_module_failure(self): - # Test that a module is removed from sys.modules if added but an - # exception is raised. - name = 'a.b.c' - with test_util.uncache(name): - self.raise_exception(name) - self.assertTrue(name not in sys.modules) - - def test_reload_failure(self): - # Test that a failure on reload leaves the module in-place. - name = 'a.b.c' - module = imp.new_module(name) - with test_util.uncache(name): - sys.modules[name] = module - self.raise_exception(name) - self.assertIs(module, sys.modules[name]) - - -class SetPackageTests(unittest.TestCase): - - - """Tests for importlib.util.set_package.""" - - def verify(self, module, expect): - """Verify the module has the expected value for __package__ after - passing through set_package.""" - fxn = lambda: module - wrapped = util.set_package(fxn) - wrapped() - self.assertTrue(hasattr(module, '__package__')) - self.assertEqual(expect, module.__package__) - - def test_top_level(self): - # __package__ should be set to the empty string if a top-level module. - # Implicitly tests when package is set to None. - module = imp.new_module('module') - module.__package__ = None - self.verify(module, '') - - def test_package(self): - # Test setting __package__ for a package. - module = imp.new_module('pkg') - module.__path__ = ['<path>'] - module.__package__ = None - self.verify(module, 'pkg') - - def test_submodule(self): - # Test __package__ for a module in a package. - module = imp.new_module('pkg.mod') - module.__package__ = None - self.verify(module, 'pkg') - - def test_setting_if_missing(self): - # __package__ should be set if it is missing. - module = imp.new_module('mod') - if hasattr(module, '__package__'): - delattr(module, '__package__') - self.verify(module, '') - - def test_leaving_alone(self): - # If __package__ is set and not None then leave it alone. - for value in (True, False): - module = imp.new_module('mod') - module.__package__ = value - self.verify(module, value) - - -def test_main(): - from test import support - support.run_unittest(ModuleForLoaderTests, SetPackageTests) - - -if __name__ == '__main__': - test_main() diff --git a/Lib/importlib/test/util.py b/Lib/importlib/test/util.py deleted file mode 100644 index 93b7cd2861b..00000000000 --- a/Lib/importlib/test/util.py +++ /dev/null @@ -1,136 +0,0 @@ -from contextlib import contextmanager -import imp -import os.path -from test import support -import unittest -import sys - - -CASE_INSENSITIVE_FS = True -# Windows is the only OS that is *always* case-insensitive -# (OS X *can* be case-sensitive). -if sys.platform not in ('win32', 'cygwin'): - changed_name = __file__.upper() - if changed_name == __file__: - changed_name = __file__.lower() - if not os.path.exists(changed_name): - CASE_INSENSITIVE_FS = False - - -def case_insensitive_tests(test): - """Class decorator that nullifies tests requiring a case-insensitive - file system.""" - return unittest.skipIf(not CASE_INSENSITIVE_FS, - "requires a case-insensitive filesystem")(test) - - -@contextmanager -def uncache(*names): - """Uncache a module from sys.modules. - - A basic sanity check is performed to prevent uncaching modules that either - cannot/shouldn't be uncached. - - """ - for name in names: - if name in ('sys', 'marshal', 'imp'): - raise ValueError( - "cannot uncache {0} as it will break _importlib".format(name)) - try: - del sys.modules[name] - except KeyError: - pass - try: - yield - finally: - for name in names: - try: - del sys.modules[name] - except KeyError: - pass - -@contextmanager -def import_state(**kwargs): - """Context manager to manage the various importers and stored state in the - sys module. - - The 'modules' attribute is not supported as the interpreter state stores a - pointer to the dict that the interpreter uses internally; - reassigning to sys.modules does not have the desired effect. - - """ - originals = {} - try: - for attr, default in (('meta_path', []), ('path', []), - ('path_hooks', []), - ('path_importer_cache', {})): - originals[attr] = getattr(sys, attr) - if attr in kwargs: - new_value = kwargs[attr] - del kwargs[attr] - else: - new_value = default - setattr(sys, attr, new_value) - if len(kwargs): - raise ValueError( - 'unrecognized arguments: {0}'.format(kwargs.keys())) - yield - finally: - for attr, value in originals.items(): - setattr(sys, attr, value) - - -class mock_modules: - - """A mock importer/loader.""" - - def __init__(self, *names, module_code={}): - self.modules = {} - self.module_code = {} - for name in names: - if not name.endswith('.__init__'): - import_name = name - else: - import_name = name[:-len('.__init__')] - if '.' not in name: - package = None - elif import_name == name: - package = name.rsplit('.', 1)[0] - else: - package = import_name - module = imp.new_module(import_name) - module.__loader__ = self - module.__file__ = '<mock __file__>' - module.__package__ = package - module.attr = name - if import_name != name: - module.__path__ = ['<mock __path__>'] - self.modules[import_name] = module - if import_name in module_code: - self.module_code[import_name] = module_code[import_name] - - def __getitem__(self, name): - return self.modules[name] - - def find_module(self, fullname, path=None): - if fullname not in self.modules: - return None - else: - return self - - def load_module(self, fullname): - if fullname not in self.modules: - raise ImportError - else: - sys.modules[fullname] = self.modules[fullname] - if fullname in self.module_code: - self.module_code[fullname]() - return self.modules[fullname] - - def __enter__(self): - self._uncache = uncache(*self.modules.keys()) - self._uncache.__enter__() - return self - - def __exit__(self, *exc_info): - self._uncache.__exit__(None, None, None) diff --git a/Lib/importlib/util.py b/Lib/importlib/util.py index 7b44fa1344c..13164371025 100644 --- a/Lib/importlib/util.py +++ b/Lib/importlib/util.py @@ -3,3 +3,19 @@ from ._bootstrap import module_for_loader from ._bootstrap import set_loader from ._bootstrap import set_package +from ._bootstrap import _resolve_name + + +def resolve_name(name, package): + """Resolve a relative module name to an absolute one.""" + if not name.startswith('.'): + return name + elif not package: + raise ValueError('{!r} is not a relative name ' + '(no leading dot)'.format(name)) + level = 0 + for character in name: + if character != '.': + break + level += 1 + return _resolve_name(name[level:], package, level) |