diff --git a/Lib/pkgutil.py b/Lib/pkgutil.py index a4c474006b..dccbec52aa 100644 --- a/Lib/pkgutil.py +++ b/Lib/pkgutil.py @@ -14,7 +14,7 @@ __all__ = [ 'get_importer', 'iter_importers', 'get_loader', 'find_loader', 'walk_packages', 'iter_modules', 'get_data', - 'ImpImporter', 'ImpLoader', 'read_code', 'extend_path', + 'read_code', 'extend_path', 'ModuleInfo', ] @@ -23,20 +23,6 @@ ModuleInfo.__doc__ = 'A namedtuple with minimal info about a module.' -def _get_spec(finder, name): - """Return the finder-specific module spec.""" - # Works with legacy finders. - try: - find_spec = finder.find_spec - except AttributeError: - loader = finder.find_module(name) - if loader is None: - return None - return importlib.util.spec_from_loader(name, loader) - else: - return find_spec(name) - - def read_code(stream): # This helper is needed in order for the PEP 302 emulation to # correctly handle compiled files @@ -184,6 +170,7 @@ def _iter_file_finder_modules(importer, prefix=''): iter_importer_modules.register( importlib.machinery.FileFinder, _iter_file_finder_modules) + try: import zipimport from zipimport import zipimporter @@ -231,6 +218,7 @@ def get_importer(path_item): The cache (or part of it) can be cleared manually if a rescan of sys.path_hooks is necessary. """ + path_item = os.fsdecode(path_item) try: importer = sys.path_importer_cache[path_item] except KeyError: @@ -282,6 +270,10 @@ def get_loader(module_or_name): If the named module is not already imported, its containing package (if any) is imported, in order to establish the package __path__. """ + warnings._deprecated("pkgutil.get_loader", + f"{warnings._DEPRECATED_MSG}; " + "use importlib.util.find_spec() instead", + remove=(3, 14)) if module_or_name in sys.modules: module_or_name = sys.modules[module_or_name] if module_or_name is None: @@ -306,6 +298,10 @@ def find_loader(fullname): importlib.util.find_spec that converts most failures to ImportError and only returns the loader rather than the full spec """ + warnings._deprecated("pkgutil.find_loader", + f"{warnings._DEPRECATED_MSG}; " + "use importlib.util.find_spec() instead", + remove=(3, 14)) if fullname.startswith('.'): msg = "Relative module name {!r} not supported".format(fullname) raise ImportError(msg) @@ -328,10 +324,10 @@ def extend_path(path, name): from pkgutil import extend_path __path__ = extend_path(__path__, __name__) - This will add to the package's __path__ all subdirectories of - directories on sys.path named after the package. This is useful - if one wants to distribute different parts of a single logical - package as multiple directories. + For each directory on sys.path that has a subdirectory that + matches the package name, add the subdirectory to the package's + __path__. This is useful if one wants to distribute different + parts of a single logical package as multiple directories. It also looks for *.pkg files beginning where * matches the name argument. This feature is similar to *.pth files (see site.py), diff --git a/Lib/test/test_import/__init__.py b/Lib/test/test_import/__init__.py index 51fafd89ce..abe8b75e7d 100644 --- a/Lib/test/test_import/__init__.py +++ b/Lib/test/test_import/__init__.py @@ -1,9 +1,16 @@ import builtins -import contextlib import errno import glob +import json import importlib.util from importlib._bootstrap_external import _get_sourcefile +from importlib.machinery import ( + AppleFrameworkLoader, + BuiltinImporter, + ExtensionFileLoader, + FrozenImporter, + SourceFileLoader, +) import marshal import os import py_compile @@ -15,27 +22,125 @@ import textwrap import threading import time +import types import unittest from unittest import mock +import _imp from test.support import os_helper from test.support import ( - STDLIB_DIR, is_jython, swap_attr, swap_item, cpython_only, is_emscripten, - is_wasi) + STDLIB_DIR, + swap_attr, + swap_item, + cpython_only, + is_apple_mobile, + is_emscripten, + is_wasi, + run_in_subinterp, + run_in_subinterp_with_config, + Py_TRACE_REFS, + requires_gil_enabled, + Py_GIL_DISABLED, + force_not_colorized_test_class, +) from test.support.import_helper import ( - forget, make_legacy_pyc, unlink, unload, DirsOnSysPath, CleanImport) + forget, make_legacy_pyc, unlink, unload, ready_to_import, + DirsOnSysPath, CleanImport, import_module) from test.support.os_helper import ( - TESTFN, rmtree, temp_umask, TESTFN_UNENCODABLE, temp_dir) + TESTFN, rmtree, temp_umask, TESTFN_UNENCODABLE) from test.support import script_helper from test.support import threading_helper +from test.support.testcase import ExtraAssertions from test.test_importlib.util import uncache from types import ModuleType +try: + import _testsinglephase +except ImportError: + _testsinglephase = None +try: + import _testmultiphase +except ImportError: + _testmultiphase = None +try: + import _interpreters +except ModuleNotFoundError: + _interpreters = None +try: + import _testinternalcapi +except ImportError: + _testinternalcapi = None skip_if_dont_write_bytecode = unittest.skipIf( sys.dont_write_bytecode, "test meaningful only when writing bytecode") + +def _require_loader(module, loader, skip): + if isinstance(module, str): + module = __import__(module) + + MODULE_KINDS = { + BuiltinImporter: 'built-in', + ExtensionFileLoader: 'extension', + AppleFrameworkLoader: 'framework extension', + FrozenImporter: 'frozen', + SourceFileLoader: 'pure Python', + } + + expected = loader + assert isinstance(expected, type), expected + expected = MODULE_KINDS[expected] + + actual = module.__spec__.loader + if not isinstance(actual, type): + actual = type(actual) + actual = MODULE_KINDS[actual] + + if actual != expected: + err = f'expected module to be {expected}, got {module.__spec__}' + if skip: + raise unittest.SkipTest(err) + raise Exception(err) + return module + +def require_builtin(module, *, skip=False): + module = _require_loader(module, BuiltinImporter, skip) + assert module.__spec__.origin == 'built-in', module.__spec__ + +def require_extension(module, *, skip=False): + # Apple extensions must be distributed as frameworks. This requires + # a specialist loader. + if is_apple_mobile: + _require_loader(module, AppleFrameworkLoader, skip) + else: + _require_loader(module, ExtensionFileLoader, skip) + +def require_frozen(module, *, skip=True): + module = _require_loader(module, FrozenImporter, skip) + assert module.__spec__.origin == 'frozen', module.__spec__ + +def require_pure_python(module, *, skip=False): + _require_loader(module, SourceFileLoader, skip) + +def create_extension_loader(modname, filename): + # Apple extensions must be distributed as frameworks. This requires + # a specialist loader. + if is_apple_mobile: + return AppleFrameworkLoader(modname, filename) + else: + return ExtensionFileLoader(modname, filename) + +def import_extension_from_file(modname, filename, *, put_in_sys_modules=True): + loader = create_extension_loader(modname, filename) + spec = importlib.util.spec_from_loader(modname, loader) + module = importlib.util.module_from_spec(spec) + loader.exec_module(module) + if put_in_sys_modules: + sys.modules[modname] = module + return module + + def remove_files(name): for f in (name + ".py", name + ".pyc", @@ -45,28 +150,222 @@ def remove_files(name): rmtree('__pycache__') -@contextlib.contextmanager -def _ready_to_import(name=None, source=""): - # sets up a temporary directory and removes it - # creates the module file - # temporarily clears the module from sys.modules (if any) - # reverts or removes the module when cleaning up - name = name or "spam" - with temp_dir() as tempdir: - path = script_helper.make_script(tempdir, name, source) - old_module = sys.modules.pop(name, None) +def no_rerun(reason): + """Skip rerunning for a particular test. + + WARNING: Use this decorator with care; skipping rerunning makes it + impossible to find reference leaks. Provide a clear reason for skipping the + test using the 'reason' parameter. + """ + def deco(func): + _has_run = False + def wrapper(self): + nonlocal _has_run + if _has_run: + self.skipTest(reason) + func(self) + _has_run = True + return wrapper + return deco + + +if _testsinglephase is not None: + def restore__testsinglephase(*, _orig=_testsinglephase): + # We started with the module imported and want to restore + # it to its nominal state. + sys.modules.pop('_testsinglephase', None) + _orig._clear_globals() + origin = _orig.__spec__.origin + _testinternalcapi.clear_extension('_testsinglephase', origin) + import _testsinglephase + + +def requires_singlephase_init(meth): + """Decorator to skip if single-phase init modules are not supported.""" + if not isinstance(meth, type): + def meth(self, _meth=meth): + try: + return _meth(self) + finally: + restore__testsinglephase() + meth = cpython_only(meth) + msg = "gh-117694: free-threaded build does not currently support single-phase init modules in sub-interpreters" + meth = requires_gil_enabled(msg)(meth) + return unittest.skipIf(_testsinglephase is None, + 'test requires _testsinglephase module')(meth) + + +def requires_subinterpreters(meth): + """Decorator to skip a test if subinterpreters are not supported.""" + return unittest.skipIf(_interpreters is None, + 'subinterpreters required')(meth) + + +class ModuleSnapshot(types.SimpleNamespace): + """A representation of a module for testing. + + Fields: + + * id - the module's object ID + * module - the actual module or an adequate substitute + * __file__ + * __spec__ + * name + * origin + * ns - a copy (dict) of the module's __dict__ (or None) + * ns_id - the object ID of the module's __dict__ + * cached - the sys.modules[mod.__spec__.name] entry (or None) + * cached_id - the object ID of the sys.modules entry (or None) + + In cases where the value is not available (e.g. due to serialization), + the value will be None. + """ + _fields = tuple('id module ns ns_id cached cached_id'.split()) + + @classmethod + def from_module(cls, mod): + name = mod.__spec__.name + cached = sys.modules.get(name) + return cls( + id=id(mod), + module=mod, + ns=types.SimpleNamespace(**mod.__dict__), + ns_id=id(mod.__dict__), + cached=cached, + cached_id=id(cached), + ) + + SCRIPT = textwrap.dedent(''' + {imports} + + name = {name!r} + + {prescript} + + mod = {name} + + {body} + + {postscript} + ''') + IMPORTS = textwrap.dedent(''' + import sys + ''').strip() + SCRIPT_BODY = textwrap.dedent(''' + # Capture the snapshot data. + cached = sys.modules.get(name) + snapshot = dict( + id=id(mod), + module=dict( + __file__=mod.__file__, + __spec__=dict( + name=mod.__spec__.name, + origin=mod.__spec__.origin, + ), + ), + ns=None, + ns_id=id(mod.__dict__), + cached=None, + cached_id=id(cached) if cached else None, + ) + ''').strip() + CLEANUP_SCRIPT = textwrap.dedent(''' + # Clean up the module. + sys.modules.pop(name, None) + ''').strip() + + @classmethod + def build_script(cls, name, *, + prescript=None, + import_first=False, + postscript=None, + postcleanup=False, + ): + if postcleanup is True: + postcleanup = cls.CLEANUP_SCRIPT + elif isinstance(postcleanup, str): + postcleanup = textwrap.dedent(postcleanup).strip() + postcleanup = cls.CLEANUP_SCRIPT + os.linesep + postcleanup + else: + postcleanup = '' + prescript = textwrap.dedent(prescript).strip() if prescript else '' + postscript = textwrap.dedent(postscript).strip() if postscript else '' + + if postcleanup: + if postscript: + postscript = postscript + os.linesep * 2 + postcleanup + else: + postscript = postcleanup + + if import_first: + prescript += textwrap.dedent(f''' + + # Now import the module. + assert name not in sys.modules + import {name}''') + + return cls.SCRIPT.format( + imports=cls.IMPORTS.strip(), + name=name, + prescript=prescript.strip(), + body=cls.SCRIPT_BODY.strip(), + postscript=postscript, + ) + + @classmethod + def parse(cls, text): + raw = json.loads(text) + mod = raw['module'] + mod['__spec__'] = types.SimpleNamespace(**mod['__spec__']) + raw['module'] = types.SimpleNamespace(**mod) + return cls(**raw) + + @classmethod + def from_subinterp(cls, name, interpid=None, *, pipe=None, **script_kwds): + if pipe is not None: + return cls._from_subinterp(name, interpid, pipe, script_kwds) + pipe = os.pipe() try: - sys.path.insert(0, tempdir) - yield name, path - sys.path.remove(tempdir) + return cls._from_subinterp(name, interpid, pipe, script_kwds) finally: - if old_module is not None: - sys.modules[name] = old_module - elif name in sys.modules: - del sys.modules[name] + r, w = pipe + os.close(r) + os.close(w) + + @classmethod + def _from_subinterp(cls, name, interpid, pipe, script_kwargs): + r, w = pipe + + # Build the script. + postscript = textwrap.dedent(f''' + # Send the result over the pipe. + import json + import os + os.write({w}, json.dumps(snapshot).encode()) + + ''') + _postscript = script_kwargs.get('postscript') + if _postscript: + _postscript = textwrap.dedent(_postscript).lstrip() + postscript += _postscript + script_kwargs['postscript'] = postscript.strip() + script = cls.build_script(name, **script_kwargs) + + # Run the script. + if interpid is None: + ret = run_in_subinterp(script) + if ret != 0: + raise AssertionError(f'{ret} != 0') + else: + _interpreters.run_string(interpid, script) + + # Parse the results. + text = os.read(r, 1000) + return cls.parse(text.decode()) -class ImportTests(unittest.TestCase): +@force_not_colorized_test_class +class ImportTests(unittest.TestCase, ExtraAssertions): def setUp(self): remove_files(TESTFN) @@ -87,8 +386,7 @@ def test_from_import_missing_attr_raises_ImportError(self): with self.assertRaises(ImportError): from importlib import something_that_should_not_exist_anywhere - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_from_import_missing_attr_has_name_and_path(self): with CleanImport('os'): import os @@ -100,15 +398,19 @@ def test_from_import_missing_attr_has_name_and_path(self): @cpython_only def test_from_import_missing_attr_has_name_and_so_path(self): - import _testcapi + _testcapi = import_module("_testcapi") with self.assertRaises(ImportError) as cm: from _testcapi import i_dont_exist self.assertEqual(cm.exception.name, '_testcapi') if hasattr(_testcapi, "__file__"): - self.assertEqual(cm.exception.path, _testcapi.__file__) + # The path on the exception is strictly the spec origin, not the + # module's __file__. For most cases, these are the same; but on + # iOS, the Framework relocation process results in the exception + # being raised from the spec location. + self.assertEqual(cm.exception.path, _testcapi.__spec__.origin) self.assertRegex( str(cm.exception), - r"cannot import name 'i_dont_exist' from '_testcapi' \(.*\.(so|pyd)\)" + r"cannot import name 'i_dont_exist' from '_testcapi' \(.*(\.(so|pyd))?\)" ) else: self.assertEqual( @@ -123,19 +425,17 @@ def test_from_import_missing_attr_has_name(self): self.assertEqual(cm.exception.name, '_warning') self.assertIsNone(cm.exception.path) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_from_import_missing_attr_path_is_canonical(self): with self.assertRaises(ImportError) as cm: from os.path import i_dont_exist self.assertIn(cm.exception.name, {'posixpath', 'ntpath'}) self.assertIsNotNone(cm.exception) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_from_import_star_invalid_type(self): import re - with _ready_to_import() as (name, path): + with ready_to_import() as (name, path): with open(path, 'w', encoding='utf-8') as f: f.write("__all__ = [b'invalid_type']") globals = {} @@ -144,7 +444,7 @@ def test_from_import_star_invalid_type(self): ): exec(f"from {name} import *", globals) self.assertNotIn(b"invalid_type", globals) - with _ready_to_import() as (name, path): + with ready_to_import() as (name, path): with open(path, 'w', encoding='utf-8') as f: f.write("globals()[b'invalid_type'] = object()") globals = {} @@ -161,18 +461,18 @@ def test_case_sensitivity(self): import RAnDoM def test_double_const(self): - # Another brief digression to test the accuracy of manifest float - # constants. - from test import double_const # don't blink -- that *was* the test + # Importing double_const checks that float constants + # serialiazed by marshal as PYC files don't lose precision + # (SF bug 422177). + from test.test_import.data import double_const + unload('test.test_import.data.double_const') + from test.test_import.data import double_const # noqa: F811 def test_import(self): def test_with_extension(ext): # The extension is normally ".py", perhaps ".pyw". source = TESTFN + ext - if is_jython: - pyc = TESTFN + "$py.class" - else: - pyc = TESTFN + ".pyc" + pyc = TESTFN + ".pyc" with open(source, "w", encoding='utf-8') as f: print("# This tests Python's ability to import a", @@ -274,7 +574,7 @@ def test_import_name_binding(self): import test as x import test.support self.assertIs(x, test, x.__name__) - self.assertTrue(hasattr(test.support, "__file__")) + self.assertHasAttr(test.support, "__file__") # import x.y.z as w binds z as w import test.support as y @@ -295,7 +595,7 @@ def test_issue31286(self): # import in a 'for' loop resulted in segmentation fault for i in range(2): - import test.support.script_helper as x + import test.support.script_helper as x # noqa: F811 def test_failing_reload(self): # A failing reload should leave the module object in sys.modules. @@ -345,7 +645,7 @@ def test_file_to_source(self): sys.path.insert(0, os.curdir) try: mod = __import__(TESTFN) - self.assertTrue(mod.__file__.endswith('.py')) + self.assertEndsWith(mod.__file__, '.py') os.remove(source) del sys.modules[TESTFN] make_legacy_pyc(source) @@ -427,8 +727,7 @@ def test_from_import_message_for_existing_module(self): with self.assertRaisesRegex(ImportError, "^cannot import name 'bogus'"): from re import bogus - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_from_import_AttributeError(self): # Issue #24492: trying to import an attribute that raises an # AttributeError should lead to an ImportError. @@ -457,7 +756,6 @@ def test_issue31492(self): with self.assertRaises(AttributeError): os.does_not_exist - @unittest.skipIf(sys.platform == 'win32', 'TODO: RUSTPYTHON; Flaky') @threading_helper.requires_working_threading() def test_concurrency(self): # bpo 38091: this is a hack to slow down the code that calls @@ -493,7 +791,7 @@ def run(): finally: del sys.path[0] - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; FileNotFoundError: [WinError 2] No such file or directory: 'built-in' @unittest.skipUnless(sys.platform == "win32", "Windows-specific") def test_dll_dependency_import(self): from _winapi import GetModuleFileName @@ -539,6 +837,453 @@ def test_dll_dependency_import(self): env=env, cwd=os.path.dirname(pyexe)) + @unittest.expectedFailure # TODO: RUSTPYTHON _imp.get_frozen_object("x", b"6\'\xd5Cu\x12"). TypeError: expected at most 1 arguments, got 2 + def test_issue105979(self): + # this used to crash + with self.assertRaises(ImportError) as cm: + _imp.get_frozen_object("x", b"6\'\xd5Cu\x12") + self.assertIn("Frozen object named 'x' is invalid", + str(cm.exception)) + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_frozen_module_from_import_error(self): + with self.assertRaises(ImportError) as cm: + from os import this_will_never_exist + self.assertIn( + f"cannot import name 'this_will_never_exist' from 'os' ({os.__file__})", + str(cm.exception), + ) + with self.assertRaises(ImportError) as cm: + from sys import this_will_never_exist + self.assertIn( + "cannot import name 'this_will_never_exist' from 'sys' (unknown location)", + str(cm.exception), + ) + + scripts = [ + """ +import os +os.__spec__.has_location = False +os.__file__ = [] +from os import this_will_never_exist +""", + """ +import os +os.__spec__.has_location = False +del os.__file__ +from os import this_will_never_exist +""", + """ +import os +os.__spec__.origin = [] +os.__file__ = [] +from os import this_will_never_exist +""" + ] + for script in scripts: + with self.subTest(script=script): + expected_error = ( + b"cannot import name 'this_will_never_exist' " + b"from 'os' (unknown location)" + ) + popen = script_helper.spawn_python("-c", script) + stdout, stderr = popen.communicate() + self.assertIn(expected_error, stdout) + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_non_module_from_import_error(self): + prefix = """ +import sys +class NotAModule: ... +nm = NotAModule() +nm.symbol = 123 +sys.modules["not_a_module"] = nm +from not_a_module import symbol +""" + scripts = [ + prefix + "from not_a_module import missing_symbol", + prefix + "nm.__spec__ = []\nfrom not_a_module import missing_symbol", + ] + for script in scripts: + with self.subTest(script=script): + expected_error = ( + b"ImportError: cannot import name 'missing_symbol' from " + b"'' (unknown location)" + ) + popen = script_helper.spawn_python("-c", script) + stdout, stderr = popen.communicate() + self.assertIn(expected_error, stdout) + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_script_shadowing_stdlib(self): + script_errors = [ + ( + "import fractions\nfractions.Fraction", + rb"AttributeError: module 'fractions' has no attribute 'Fraction'" + ), + ( + "from fractions import Fraction", + rb"ImportError: cannot import name 'Fraction' from 'fractions'" + ) + ] + for script, error in script_errors: + with self.subTest(script=script), os_helper.temp_dir() as tmp: + with open(os.path.join(tmp, "fractions.py"), "w", encoding='utf-8') as f: + f.write(script) + + expected_error = error + ( + rb" \(consider renaming '.*fractions.py' since it has the " + rb"same name as the standard library module named 'fractions' " + rb"and prevents importing that standard library module\)" + ) + + popen = script_helper.spawn_python(os.path.join(tmp, "fractions.py"), cwd=tmp) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + popen = script_helper.spawn_python('-m', 'fractions', cwd=tmp) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + popen = script_helper.spawn_python('-c', 'import fractions', cwd=tmp) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + # and there's no error at all when using -P + popen = script_helper.spawn_python('-P', 'fractions.py', cwd=tmp) + stdout, stderr = popen.communicate() + self.assertEqual(stdout, b'') + + tmp_child = os.path.join(tmp, "child") + os.mkdir(tmp_child) + + # test the logic with different cwd + popen = script_helper.spawn_python(os.path.join(tmp, "fractions.py"), cwd=tmp_child) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + popen = script_helper.spawn_python('-m', 'fractions', cwd=tmp_child) + stdout, stderr = popen.communicate() + self.assertEqual(stdout, b'') # no error + + popen = script_helper.spawn_python('-c', 'import fractions', cwd=tmp_child) + stdout, stderr = popen.communicate() + self.assertEqual(stdout, b'') # no error + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_package_shadowing_stdlib_module(self): + script_errors = [ + ( + "fractions.Fraction", + rb"AttributeError: module 'fractions' has no attribute 'Fraction'" + ), + ( + "from fractions import Fraction", + rb"ImportError: cannot import name 'Fraction' from 'fractions'" + ) + ] + for script, error in script_errors: + with self.subTest(script=script), os_helper.temp_dir() as tmp: + os.mkdir(os.path.join(tmp, "fractions")) + with open( + os.path.join(tmp, "fractions", "__init__.py"), "w", encoding='utf-8' + ) as f: + f.write("shadowing_module = True") + with open(os.path.join(tmp, "main.py"), "w", encoding='utf-8') as f: + f.write("import fractions; fractions.shadowing_module\n") + f.write(script) + + expected_error = error + ( + rb" \(consider renaming '.*[\\/]fractions[\\/]+__init__.py' since it has the " + rb"same name as the standard library module named 'fractions' " + rb"and prevents importing that standard library module\)" + ) + + popen = script_helper.spawn_python(os.path.join(tmp, "main.py"), cwd=tmp) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + popen = script_helper.spawn_python('-m', 'main', cwd=tmp) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + # and there's no shadowing at all when using -P + popen = script_helper.spawn_python('-P', 'main.py', cwd=tmp) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, b"module 'fractions' has no attribute 'shadowing_module'") + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_script_shadowing_third_party(self): + script_errors = [ + ( + "import numpy\nnumpy.array", + rb"AttributeError: module 'numpy' has no attribute 'array'" + ), + ( + "from numpy import array", + rb"ImportError: cannot import name 'array' from 'numpy'" + ) + ] + for script, error in script_errors: + with self.subTest(script=script), os_helper.temp_dir() as tmp: + with open(os.path.join(tmp, "numpy.py"), "w", encoding='utf-8') as f: + f.write(script) + + expected_error = error + ( + rb" \(consider renaming '.*numpy.py' if it has the " + rb"same name as a library you intended to import\)\s+\Z" + ) + + popen = script_helper.spawn_python(os.path.join(tmp, "numpy.py")) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + popen = script_helper.spawn_python('-m', 'numpy', cwd=tmp) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + popen = script_helper.spawn_python('-c', 'import numpy', cwd=tmp) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_script_maybe_not_shadowing_third_party(self): + with os_helper.temp_dir() as tmp: + with open(os.path.join(tmp, "numpy.py"), "w", encoding='utf-8') as f: + f.write("this_script_does_not_attempt_to_import_numpy = True") + + expected_error = ( + rb"AttributeError: module 'numpy' has no attribute 'attr'\s+\Z" + ) + popen = script_helper.spawn_python('-c', 'import numpy; numpy.attr', cwd=tmp) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + expected_error = ( + rb"ImportError: cannot import name 'attr' from 'numpy' \(.*\)\s+\Z" + ) + popen = script_helper.spawn_python('-c', 'from numpy import attr', cwd=tmp) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_script_shadowing_stdlib_edge_cases(self): + with os_helper.temp_dir() as tmp: + with open(os.path.join(tmp, "fractions.py"), "w", encoding='utf-8') as f: + f.write("shadowing_module = True") + + # Unhashable str subclass + with open(os.path.join(tmp, "main.py"), "w", encoding='utf-8') as f: + f.write(""" +import fractions +fractions.shadowing_module +class substr(str): + __hash__ = None +fractions.__name__ = substr('fractions') +try: + fractions.Fraction +except TypeError as e: + print(str(e)) +""") + popen = script_helper.spawn_python("main.py", cwd=tmp) + stdout, stderr = popen.communicate() + self.assertEqual(stdout.rstrip(), b"unhashable type: 'substr'") + + with open(os.path.join(tmp, "main.py"), "w", encoding='utf-8') as f: + f.write(""" +import fractions +fractions.shadowing_module +class substr(str): + __hash__ = None +fractions.__name__ = substr('fractions') +try: + from fractions import Fraction +except TypeError as e: + print(str(e)) +""") + + popen = script_helper.spawn_python("main.py", cwd=tmp) + stdout, stderr = popen.communicate() + self.assertEqual(stdout.rstrip(), b"unhashable type: 'substr'") + + # Various issues with sys module + with open(os.path.join(tmp, "main.py"), "w", encoding='utf-8') as f: + f.write(""" +import fractions +fractions.shadowing_module + +import sys +sys.stdlib_module_names = None +try: + fractions.Fraction +except AttributeError as e: + print(str(e)) + +del sys.stdlib_module_names +try: + fractions.Fraction +except AttributeError as e: + print(str(e)) + +sys.path = [0] +try: + fractions.Fraction +except AttributeError as e: + print(str(e)) +""") + popen = script_helper.spawn_python("main.py", cwd=tmp) + stdout, stderr = popen.communicate() + lines = stdout.splitlines() + self.assertEqual(len(lines), 3) + for line in lines: + self.assertEqual(line, b"module 'fractions' has no attribute 'Fraction'") + + with open(os.path.join(tmp, "main.py"), "w", encoding='utf-8') as f: + f.write(""" +import fractions +fractions.shadowing_module + +import sys +sys.stdlib_module_names = None +try: + from fractions import Fraction +except ImportError as e: + print(str(e)) + +del sys.stdlib_module_names +try: + from fractions import Fraction +except ImportError as e: + print(str(e)) + +sys.path = [0] +try: + from fractions import Fraction +except ImportError as e: + print(str(e)) +""") + popen = script_helper.spawn_python("main.py", cwd=tmp) + stdout, stderr = popen.communicate() + lines = stdout.splitlines() + self.assertEqual(len(lines), 3) + for line in lines: + self.assertRegex(line, rb"cannot import name 'Fraction' from 'fractions' \(.*\)") + + # Various issues with origin + with open(os.path.join(tmp, "main.py"), "w", encoding='utf-8') as f: + f.write(""" +import fractions +fractions.shadowing_module +del fractions.__spec__.origin +try: + fractions.Fraction +except AttributeError as e: + print(str(e)) + +fractions.__spec__.origin = [] +try: + fractions.Fraction +except AttributeError as e: + print(str(e)) +""") + + popen = script_helper.spawn_python("main.py", cwd=tmp) + stdout, stderr = popen.communicate() + lines = stdout.splitlines() + self.assertEqual(len(lines), 2) + for line in lines: + self.assertEqual(line, b"module 'fractions' has no attribute 'Fraction'") + + with open(os.path.join(tmp, "main.py"), "w", encoding='utf-8') as f: + f.write(""" +import fractions +fractions.shadowing_module +del fractions.__spec__.origin +try: + from fractions import Fraction +except ImportError as e: + print(str(e)) + +fractions.__spec__.origin = [] +try: + from fractions import Fraction +except ImportError as e: + print(str(e)) +""") + popen = script_helper.spawn_python("main.py", cwd=tmp) + stdout, stderr = popen.communicate() + lines = stdout.splitlines() + self.assertEqual(len(lines), 2) + for line in lines: + self.assertRegex(line, rb"cannot import name 'Fraction' from 'fractions' \(.*\)") + + @unittest.skipIf(sys.platform == 'win32', 'Cannot delete cwd on Windows') + @unittest.skipIf(sys.platform == 'sunos5', 'Cannot delete cwd on Solaris/Illumos') + @unittest.skipIf(sys.platform.startswith('aix'), 'Cannot delete cwd on AIX') + def test_script_shadowing_stdlib_cwd_failure(self): + with os_helper.temp_dir() as tmp: + subtmp = os.path.join(tmp, "subtmp") + os.mkdir(subtmp) + with open(os.path.join(subtmp, "main.py"), "w", encoding='utf-8') as f: + f.write(f""" +import sys +assert sys.path[0] == '' + +import os +import shutil +shutil.rmtree(os.getcwd()) + +os.does_not_exist +""") + # Use -c to ensure sys.path[0] is "" + popen = script_helper.spawn_python("-c", "import main", cwd=subtmp) + stdout, stderr = popen.communicate() + expected_error = rb"AttributeError: module 'os' has no attribute 'does_not_exist'" + self.assertRegex(stdout, expected_error) + + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_script_shadowing_stdlib_sys_path_modification(self): + script_errors = [ + ( + "import fractions\nfractions.Fraction", + rb"AttributeError: module 'fractions' has no attribute 'Fraction'" + ), + ( + "from fractions import Fraction", + rb"ImportError: cannot import name 'Fraction' from 'fractions'" + ) + ] + for script, error in script_errors: + with self.subTest(script=script), os_helper.temp_dir() as tmp: + with open(os.path.join(tmp, "fractions.py"), "w", encoding='utf-8') as f: + f.write("shadowing_module = True") + with open(os.path.join(tmp, "main.py"), "w", encoding='utf-8') as f: + f.write('import sys; sys.path.insert(0, "this_folder_does_not_exist")\n') + f.write(script) + expected_error = error + ( + rb" \(consider renaming '.*fractions.py' since it has the " + rb"same name as the standard library module named 'fractions' " + rb"and prevents importing that standard library module\)" + ) + + popen = script_helper.spawn_python("main.py", cwd=tmp) + stdout, stderr = popen.communicate() + self.assertRegex(stdout, expected_error) + + @unittest.skip('TODO: RUSTPYTHON; AttributeError: module "_imp" has no attribute "create_dynamic"') + def test_create_dynamic_null(self): + with self.assertRaisesRegex(ValueError, 'embedded null character'): + class Spec: + name = "a\x00b" + origin = "abc" + _imp.create_dynamic(Spec()) + + with self.assertRaisesRegex(ValueError, 'embedded null character'): + class Spec2: + name = "abc" + origin = "a\x00b" + _imp.create_dynamic(Spec2()) + @skip_if_dont_write_bytecode class FilePermissionTests(unittest.TestCase): @@ -552,7 +1297,7 @@ class FilePermissionTests(unittest.TestCase): ) def test_creation_mode(self): mask = 0o022 - with temp_umask(mask), _ready_to_import() as (name, path): + with temp_umask(mask), ready_to_import() as (name, path): cached_path = importlib.util.cache_from_source(path) module = __import__(name) if not os.path.exists(cached_path): @@ -571,7 +1316,7 @@ def test_creation_mode(self): def test_cached_mode_issue_2051(self): # permissions of .pyc should match those of .py, regardless of mask mode = 0o600 - with temp_umask(0o022), _ready_to_import() as (name, path): + with temp_umask(0o022), ready_to_import() as (name, path): cached_path = importlib.util.cache_from_source(path) os.chmod(path, mode) __import__(name) @@ -587,7 +1332,7 @@ def test_cached_mode_issue_2051(self): @os_helper.skip_unless_working_chmod def test_cached_readonly(self): mode = 0o400 - with temp_umask(0o022), _ready_to_import() as (name, path): + with temp_umask(0o022), ready_to_import() as (name, path): cached_path = importlib.util.cache_from_source(path) os.chmod(path, mode) __import__(name) @@ -602,7 +1347,7 @@ def test_cached_readonly(self): def test_pyc_always_writable(self): # Initially read-only .pyc files on Windows used to cause problems # with later updates, see issue #6074 for details - with _ready_to_import() as (name, path): + with ready_to_import() as (name, path): # Write a Python file, make it read-only and import it with open(path, 'w', encoding='utf-8') as f: f.write("x = 'original'\n") @@ -684,8 +1429,7 @@ def test_basics(self): self.assertEqual(mod.code_filename, self.file_name) self.assertEqual(mod.func_filename, self.file_name) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON; AssertionError: 'another_module.py' != .../unlikely_module_name.py def test_incorrect_code_name(self): py_compile.compile(self.file_name, dfile="another_module.py") mod = self.import_module() @@ -721,7 +1465,7 @@ def test_foreign_code(self): self.assertEqual(mod.constant.co_filename, foreign_code.co_filename) -class PathsTests(unittest.TestCase): +class PathsTests(unittest.TestCase, ExtraAssertions): SAMPLES = ('test', 'test\u00e4\u00f6\u00fc\u00df', 'test\u00e9\u00e8', 'test\u00b0\u00b3\u00b2') path = TESTFN @@ -771,11 +1515,11 @@ def test_UNC_path(self): self.fail("could not import 'test_unc_path' from %r: %r" % (unc, e)) self.assertEqual(mod.testdata, 'test_unc_path') - self.assertTrue(mod.__file__.startswith(unc), mod.__file__) + self.assertStartsWith(mod.__file__, unc) unload("test_unc_path") -class RelativeImportTests(unittest.TestCase): +class RelativeImportTests(unittest.TestCase, ExtraAssertions): def tearDown(self): unload("test.relimport") @@ -784,7 +1528,7 @@ def tearDown(self): def test_relimport_star(self): # This will import * from .test_import. from .. import relimport - self.assertTrue(hasattr(relimport, "RelativeImportTests")) + self.assertHasAttr(relimport, "RelativeImportTests") def test_issue3221(self): # Note for mergers: the 'absolute' tests from the 2.x branch @@ -843,10 +1587,37 @@ def test_import_from_unloaded_package(self): import package2.submodule1 package2.submodule1.submodule2 + def test_rebinding(self): + # The same data is also used for testing pkgutil.resolve_name() + # in test_pkgutil and mock.patch in test_unittest. + path = os.path.join(os.path.dirname(__file__), 'data') + with uncache('package3', 'package3.submodule'), DirsOnSysPath(path): + from package3 import submodule + self.assertEqual(submodule.attr, 'rebound') + import package3.submodule as submodule + self.assertEqual(submodule.attr, 'rebound') + with uncache('package3', 'package3.submodule'), DirsOnSysPath(path): + import package3.submodule as submodule + self.assertEqual(submodule.attr, 'rebound') + from package3 import submodule + self.assertEqual(submodule.attr, 'rebound') + + def test_rebinding2(self): + path = os.path.join(os.path.dirname(__file__), 'data') + with uncache('package4', 'package4.submodule'), DirsOnSysPath(path): + import package4.submodule as submodule + self.assertEqual(submodule.attr, 'submodule') + from package4 import submodule + self.assertEqual(submodule.attr, 'submodule') + with uncache('package4', 'package4.submodule'), DirsOnSysPath(path): + from package4 import submodule + self.assertEqual(submodule.attr, 'origin') + import package4.submodule as submodule + self.assertEqual(submodule.attr, 'submodule') + class OverridingImportBuiltinTests(unittest.TestCase): - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_override_builtin(self): # Test that overriding builtins.__import__ can bypass sys.modules. import os @@ -1075,7 +1846,7 @@ def test_symlinked_dir_importable(self): @cpython_only -class ImportlibBootstrapTests(unittest.TestCase): +class ImportlibBootstrapTests(unittest.TestCase, ExtraAssertions): # These tests check that importlib is bootstrapped. def test_frozen_importlib(self): @@ -1088,7 +1859,7 @@ def test_frozen_importlib_is_bootstrap(self): self.assertIs(mod, _bootstrap) self.assertEqual(mod.__name__, 'importlib._bootstrap') self.assertEqual(mod.__package__, 'importlib') - self.assertTrue(mod.__file__.endswith('_bootstrap.py'), mod.__file__) + self.assertEndsWith(mod.__file__, '_bootstrap.py') def test_frozen_importlib_external_is_bootstrap_external(self): from importlib import _bootstrap_external @@ -1096,7 +1867,7 @@ def test_frozen_importlib_external_is_bootstrap_external(self): self.assertIs(mod, _bootstrap_external) self.assertEqual(mod.__name__, 'importlib._bootstrap_external') self.assertEqual(mod.__package__, 'importlib') - self.assertTrue(mod.__file__.endswith('_bootstrap_external.py'), mod.__file__) + self.assertEndsWith(mod.__file__, '_bootstrap_external.py') def test_there_can_be_only_one(self): # Issue #15386 revealed a tricky loophole in the bootstrapping @@ -1306,8 +2077,7 @@ def exec_module(*args): else: importlib.SourceLoader.exec_module = old_exec_module - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON @unittest.skipUnless(TESTFN_UNENCODABLE, 'need TESTFN_UNENCODABLE') def test_unencodable_filename(self): # Issue #11619: The Python parser and the import machinery must not @@ -1358,8 +2128,7 @@ def test_rebinding(self): from test.test_import.data.circular_imports.subpkg import util self.assertIs(util.util, rebinding.util) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON AttributeError: module "test.test_import.data.circular_imports" has no attribute "binding" def test_binding(self): try: import test.test_import.data.circular_imports.binding @@ -1370,8 +2139,7 @@ def test_crossreference1(self): import test.test_import.data.circular_imports.use import test.test_import.data.circular_imports.source - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_crossreference2(self): with self.assertRaises(AttributeError) as cm: import test.test_import.data.circular_imports.source @@ -1391,8 +2159,16 @@ def test_circular_from_import(self): str(cm.exception), ) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON + def test_circular_import(self): + with self.assertRaisesRegex( + AttributeError, + r"partially initialized module 'test.test_import.data.circular_imports.import_cycle' " + r"from '.*' has no attribute 'some_attribute' \(most likely due to a circular import\)" + ): + import test.test_import.data.circular_imports.import_cycle + + @unittest.expectedFailure # TODO: RUSTPYTHON def test_absolute_circular_submodule(self): with self.assertRaises(AttributeError) as cm: import test.test_import.data.circular_imports.subpkg2.parent @@ -1403,6 +2179,37 @@ def test_absolute_circular_submodule(self): str(cm.exception), ) + @requires_singlephase_init + @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module") + def test_singlephase_circular(self): + """Regression test for gh-123950 + + Import a single-phase-init module that imports itself + from the PyInit_* function (before it's added to sys.modules). + Manages its own cache (which is `static`, and so incompatible + with multiple interpreters or interpreter reset). + """ + name = '_testsinglephase_circular' + helper_name = 'test.test_import.data.circular_imports.singlephase' + with uncache(name, helper_name): + filename = _testsinglephase.__file__ + # We don't put the module in sys.modules: that the *inner* + # import should do that. + mod = import_extension_from_file(name, filename, + put_in_sys_modules=False) + + self.assertEqual(mod.helper_mod_name, helper_name) + self.assertIn(name, sys.modules) + self.assertIn(helper_name, sys.modules) + + self.assertIn(name, sys.modules) + self.assertIn(helper_name, sys.modules) + self.assertNotIn(name, sys.modules) + self.assertNotIn(helper_name, sys.modules) + self.assertIs(mod.clear_static_var(), mod) + _testinternalcapi.clear_extension('_testsinglephase_circular', + mod.__spec__.origin) + def test_unwritable_module(self): self.addCleanup(unload, "test.test_import.data.unwritable") self.addCleanup(unload, "test.test_import.data.unwritable.x") @@ -1417,6 +2224,1187 @@ def test_unwritable_module(self): unwritable.x = 42 +class SubinterpImportTests(unittest.TestCase): + + RUN_KWARGS = dict( + allow_fork=False, + allow_exec=False, + allow_threads=True, + allow_daemon_threads=False, + # Isolation-related config values aren't included here. + ) + ISOLATED = dict( + use_main_obmalloc=False, + gil=2, + ) + NOT_ISOLATED = {k: not v for k, v in ISOLATED.items()} + NOT_ISOLATED['gil'] = 1 + + @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") + def pipe(self): + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + if hasattr(os, 'set_blocking'): + os.set_blocking(r, False) + return (r, w) + + def import_script(self, name, fd, filename=None, check_override=None): + override_text = '' + if check_override is not None: + override_text = f''' + import _imp + _imp._override_multi_interp_extensions_check({check_override}) + ''' + if filename: + # Apple extensions must be distributed as frameworks. This requires + # a specialist loader. + if is_apple_mobile: + loader = "AppleFrameworkLoader" + else: + loader = "ExtensionFileLoader" + + return textwrap.dedent(f''' + from importlib.util import spec_from_loader, module_from_spec + from importlib.machinery import {loader} + import os, sys + {override_text} + loader = {loader}({name!r}, {filename!r}) + spec = spec_from_loader({name!r}, loader) + try: + module = module_from_spec(spec) + loader.exec_module(module) + except ImportError as exc: + text = 'ImportError: ' + str(exc) + else: + text = 'okay' + os.write({fd}, text.encode('utf-8')) + ''') + else: + return textwrap.dedent(f''' + import os, sys + {override_text} + try: + import {name} + except ImportError as exc: + text = 'ImportError: ' + str(exc) + else: + text = 'okay' + os.write({fd}, text.encode('utf-8')) + ''') + + def run_here(self, name, filename=None, *, + check_singlephase_setting=False, + check_singlephase_override=None, + isolated=False, + ): + """ + Try importing the named module in a subinterpreter. + + The subinterpreter will be in the current process. + The module will have already been imported in the main interpreter. + Thus, for extension/builtin modules, the module definition will + have been loaded already and cached globally. + + "check_singlephase_setting" determines whether or not + the interpreter will be configured to check for modules + that are not compatible with use in multiple interpreters. + + This should always return "okay" for all modules if the + setting is False (with no override). + """ + __import__(name) + + kwargs = dict( + **self.RUN_KWARGS, + **(self.ISOLATED if isolated else self.NOT_ISOLATED), + check_multi_interp_extensions=check_singlephase_setting, + ) + + r, w = self.pipe() + script = self.import_script(name, w, filename, + check_singlephase_override) + + ret = run_in_subinterp_with_config(script, **kwargs) + self.assertEqual(ret, 0) + return os.read(r, 100) + + def check_compatible_here(self, name, filename=None, *, + strict=False, + isolated=False, + ): + # Verify that the named module may be imported in a subinterpreter. + # (See run_here() for more info.) + out = self.run_here(name, filename, + check_singlephase_setting=strict, + isolated=isolated, + ) + self.assertEqual(out, b'okay') + + def check_incompatible_here(self, name, filename=None, *, isolated=False): + # Differences from check_compatible_here(): + # * verify that import fails + # * "strict" is always True + out = self.run_here(name, filename, + check_singlephase_setting=True, + isolated=isolated, + ) + self.assertEqual( + out.decode('utf-8'), + f'ImportError: module {name} does not support loading in subinterpreters', + ) + + def check_compatible_fresh(self, name, *, strict=False, isolated=False): + # Differences from check_compatible_here(): + # * subinterpreter in a new process + # * module has never been imported before in that process + # * this tests importing the module for the first time + kwargs = dict( + **self.RUN_KWARGS, + **(self.ISOLATED if isolated else self.NOT_ISOLATED), + check_multi_interp_extensions=strict, + ) + gil = kwargs['gil'] + kwargs['gil'] = 'default' if gil == 0 else ( + 'shared' if gil == 1 else 'own' if gil == 2 else gil) + _, out, err = script_helper.assert_python_ok('-c', textwrap.dedent(f''' + import _testinternalcapi, sys + assert ( + {name!r} in sys.builtin_module_names or + {name!r} not in sys.modules + ), repr({name!r}) + config = type(sys.implementation)(**{kwargs}) + ret = _testinternalcapi.run_in_subinterp_with_config( + {self.import_script(name, "sys.stdout.fileno()")!r}, + config, + ) + assert ret == 0, ret + ''')) + self.assertEqual(err, b'') + self.assertEqual(out, b'okay') + + def check_incompatible_fresh(self, name, *, isolated=False): + # Differences from check_compatible_fresh(): + # * verify that import fails + # * "strict" is always True + kwargs = dict( + **self.RUN_KWARGS, + **(self.ISOLATED if isolated else self.NOT_ISOLATED), + check_multi_interp_extensions=True, + ) + gil = kwargs['gil'] + kwargs['gil'] = 'default' if gil == 0 else ( + 'shared' if gil == 1 else 'own' if gil == 2 else gil) + _, out, err = script_helper.assert_python_ok('-c', textwrap.dedent(f''' + import _testinternalcapi, sys + assert {name!r} not in sys.modules, {name!r} + config = type(sys.implementation)(**{kwargs}) + ret = _testinternalcapi.run_in_subinterp_with_config( + {self.import_script(name, "sys.stdout.fileno()")!r}, + config, + ) + assert ret == 0, ret + ''')) + self.assertEqual(err, b'') + self.assertEqual( + out.decode('utf-8'), + f'ImportError: module {name} does not support loading in subinterpreters', + ) + + @unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi") + def test_builtin_compat(self): + # For now we avoid using sys or builtins + # since they still don't implement multi-phase init. + module = '_imp' + require_builtin(module) + if not Py_GIL_DISABLED: + with self.subTest(f'{module}: not strict'): + self.check_compatible_here(module, strict=False) + with self.subTest(f'{module}: strict, not fresh'): + self.check_compatible_here(module, strict=True) + + @cpython_only + @unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi") + def test_frozen_compat(self): + module = '_frozen_importlib' + require_frozen(module, skip=True) + if __import__(module).__spec__.origin != 'frozen': + raise unittest.SkipTest(f'{module} is unexpectedly not frozen') + if not Py_GIL_DISABLED: + with self.subTest(f'{module}: not strict'): + self.check_compatible_here(module, strict=False) + with self.subTest(f'{module}: strict, not fresh'): + self.check_compatible_here(module, strict=True) + + @requires_singlephase_init + def test_single_init_extension_compat(self): + module = '_testsinglephase' + require_extension(module) + with self.subTest(f'{module}: not strict'): + self.check_compatible_here(module, strict=False) + with self.subTest(f'{module}: strict, not fresh'): + self.check_incompatible_here(module) + with self.subTest(f'{module}: strict, fresh'): + self.check_incompatible_fresh(module) + with self.subTest(f'{module}: isolated, fresh'): + self.check_incompatible_fresh(module, isolated=True) + + @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") + def test_multi_init_extension_compat(self): + # Module with Py_MOD_PER_INTERPRETER_GIL_SUPPORTED + module = '_testmultiphase' + require_extension(module) + + if not Py_GIL_DISABLED: + with self.subTest(f'{module}: not strict'): + self.check_compatible_here(module, strict=False) + with self.subTest(f'{module}: strict, not fresh'): + self.check_compatible_here(module, strict=True) + with self.subTest(f'{module}: strict, fresh'): + self.check_compatible_fresh(module, strict=True) + + @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") + def test_multi_init_extension_non_isolated_compat(self): + # Module with Py_MOD_MULTIPLE_INTERPRETERS_NOT_SUPPORTED + # and Py_MOD_GIL_NOT_USED + modname = '_test_non_isolated' + filename = _testmultiphase.__file__ + module = import_extension_from_file(modname, filename) + + require_extension(module) + with self.subTest(f'{modname}: isolated'): + self.check_incompatible_here(modname, filename, isolated=True) + with self.subTest(f'{modname}: not isolated'): + self.check_incompatible_here(modname, filename, isolated=False) + if not Py_GIL_DISABLED: + with self.subTest(f'{modname}: not strict'): + self.check_compatible_here(modname, filename, strict=False) + + @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") + def test_multi_init_extension_per_interpreter_gil_compat(self): + + # _test_shared_gil_only: + # Explicit Py_MOD_MULTIPLE_INTERPRETERS_SUPPORTED (default) + # and Py_MOD_GIL_NOT_USED + # _test_no_multiple_interpreter_slot: + # No Py_mod_multiple_interpreters slot + # and Py_MOD_GIL_NOT_USED + for modname in ('_test_shared_gil_only', + '_test_no_multiple_interpreter_slot'): + with self.subTest(modname=modname): + + filename = _testmultiphase.__file__ + module = import_extension_from_file(modname, filename) + + require_extension(module) + with self.subTest(f'{modname}: isolated, strict'): + self.check_incompatible_here(modname, filename, + isolated=True) + with self.subTest(f'{modname}: not isolated, strict'): + self.check_compatible_here(modname, filename, + strict=True, isolated=False) + if not Py_GIL_DISABLED: + with self.subTest(f'{modname}: not isolated, not strict'): + self.check_compatible_here( + modname, filename, strict=False, isolated=False) + + @unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi") + def test_python_compat(self): + module = 'threading' + require_pure_python(module) + if not Py_GIL_DISABLED: + with self.subTest(f'{module}: not strict'): + self.check_compatible_here(module, strict=False) + with self.subTest(f'{module}: strict, not fresh'): + self.check_compatible_here(module, strict=True) + with self.subTest(f'{module}: strict, fresh'): + self.check_compatible_fresh(module, strict=True) + + @requires_singlephase_init + def test_singlephase_check_with_setting_and_override(self): + module = '_testsinglephase' + require_extension(module) + + def check_compatible(setting, override): + out = self.run_here( + module, + check_singlephase_setting=setting, + check_singlephase_override=override, + ) + self.assertEqual(out, b'okay') + + def check_incompatible(setting, override): + out = self.run_here( + module, + check_singlephase_setting=setting, + check_singlephase_override=override, + ) + self.assertNotEqual(out, b'okay') + + with self.subTest('config: check enabled; override: enabled'): + check_incompatible(True, 1) + with self.subTest('config: check enabled; override: use config'): + check_incompatible(True, 0) + with self.subTest('config: check enabled; override: disabled'): + check_compatible(True, -1) + + with self.subTest('config: check disabled; override: enabled'): + check_incompatible(False, 1) + with self.subTest('config: check disabled; override: use config'): + check_compatible(False, 0) + with self.subTest('config: check disabled; override: disabled'): + check_compatible(False, -1) + + @unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi") + def test_isolated_config(self): + module = 'threading' + require_pure_python(module) + with self.subTest(f'{module}: strict, not fresh'): + self.check_compatible_here(module, strict=True, isolated=True) + with self.subTest(f'{module}: strict, fresh'): + self.check_compatible_fresh(module, strict=True, isolated=True) + + @requires_subinterpreters + @requires_singlephase_init + def test_disallowed_reimport(self): + # See https://github.com/python/cpython/issues/104621. + script = textwrap.dedent(''' + import _testsinglephase + print(_testsinglephase) + ''') + interpid = _interpreters.create() + self.addCleanup(lambda: _interpreters.destroy(interpid)) + + excsnap = _interpreters.run_string(interpid, script) + self.assertIsNot(excsnap, None) + + excsnap = _interpreters.run_string(interpid, script) + self.assertIsNot(excsnap, None) + + +class TestSinglePhaseSnapshot(ModuleSnapshot): + """A representation of a single-phase init module for testing. + + Fields from ModuleSnapshot: + + * id - id(mod) + * module - mod or a SimpleNamespace with __file__ & __spec__ + * ns - a shallow copy of mod.__dict__ + * ns_id - id(mod.__dict__) + * cached - sys.modules[name] (or None if not there or not snapshotable) + * cached_id - id(sys.modules[name]) (or None if not there) + + Extra fields: + + * summed - the result of calling "mod.sum(1, 2)" + * lookedup - the result of calling "mod.look_up_self()" + * lookedup_id - the object ID of self.lookedup + * state_initialized - the result of calling "mod.state_initialized()" + * init_count - (optional) the result of calling "mod.initialized_count()" + + Overridden methods from ModuleSnapshot: + + * from_module() + * parse() + + Other methods from ModuleSnapshot: + + * build_script() + * from_subinterp() + + ---- + + There are 5 modules in Modules/_testsinglephase.c: + + * _testsinglephase + * has global state + * extra loads skip the init function, copy def.m_base.m_copy + * counts calls to init function + * _testsinglephase_basic_wrapper + * _testsinglephase by another name (and separate init function symbol) + * _testsinglephase_basic_copy + * same as _testsinglephase but with own def (and init func) + * _testsinglephase_with_reinit + * has no global or module state + * mod.state_initialized returns None + * an extra load in the main interpreter calls the cached init func + * an extra load in legacy subinterpreters does a full load + * _testsinglephase_with_state + * has module state + * an extra load in the main interpreter calls the cached init func + * an extra load in legacy subinterpreters does a full load + + (See Modules/_testsinglephase.c for more info.) + + For all those modules, the snapshot after the initial load (not in + the global extensions cache) would look like the following: + + * initial load + * id: ID of nww module object + * ns: exactly what the module init put there + * ns_id: ID of new module's __dict__ + * cached_id: same as self.id + * summed: 3 (never changes) + * lookedup_id: same as self.id + * state_initialized: a timestamp between the time of the load + and the time of the snapshot + * init_count: 1 (None for _testsinglephase_with_reinit) + + For the other scenarios it varies. + + For the _testsinglephase, _testsinglephase_basic_wrapper, and + _testsinglephase_basic_copy modules, the snapshot should look + like the following: + + * reloaded + * id: no change + * ns: matches what the module init function put there, + including the IDs of all contained objects, + plus any extra attributes added before the reload + * ns_id: no change + * cached_id: no change + * lookedup_id: no change + * state_initialized: no change + * init_count: no change + * already loaded + * (same as initial load except for ns and state_initialized) + * ns: matches the initial load, incl. IDs of contained objects + * state_initialized: no change from initial load + + For _testsinglephase_with_reinit: + + * reloaded: same as initial load (old module & ns is discarded) + * already loaded: same as initial load (old module & ns is discarded) + + For _testsinglephase_with_state: + + * reloaded + * (same as initial load (old module & ns is discarded), + except init_count) + * init_count: increase by 1 + * already loaded: same as reloaded + """ + + @classmethod + def from_module(cls, mod): + self = super().from_module(mod) + self.summed = mod.sum(1, 2) + self.lookedup = mod.look_up_self() + self.lookedup_id = id(self.lookedup) + self.state_initialized = mod.state_initialized() + if hasattr(mod, 'initialized_count'): + self.init_count = mod.initialized_count() + return self + + SCRIPT_BODY = ModuleSnapshot.SCRIPT_BODY + textwrap.dedent(''' + snapshot['module'].update(dict( + int_const=mod.int_const, + str_const=mod.str_const, + _module_initialized=mod._module_initialized, + )) + snapshot.update(dict( + summed=mod.sum(1, 2), + lookedup_id=id(mod.look_up_self()), + state_initialized=mod.state_initialized(), + init_count=mod.initialized_count(), + has_spam=hasattr(mod, 'spam'), + spam=getattr(mod, 'spam', None), + )) + ''').rstrip() + + @classmethod + def parse(cls, text): + self = super().parse(text) + if not self.has_spam: + del self.spam + del self.has_spam + return self + + +@requires_singlephase_init +class SinglephaseInitTests(unittest.TestCase, ExtraAssertions): + + NAME = '_testsinglephase' + + @classmethod + def setUpClass(cls): + spec = importlib.util.find_spec(cls.NAME) + cls.LOADER = type(spec.loader) + + # Apple extensions must be distributed as frameworks. This requires + # a specialist loader, and we need to differentiate between the + # spec.origin and the original file location. + if is_apple_mobile: + assert cls.LOADER is AppleFrameworkLoader + + cls.ORIGIN = spec.origin + with open(spec.origin + ".origin", "r") as f: + cls.FILE = os.path.join( + os.path.dirname(sys.executable), + f.read().strip() + ) + else: + assert cls.LOADER is ExtensionFileLoader + + cls.ORIGIN = spec.origin + cls.FILE = spec.origin + + # Start fresh. + cls.clean_up() + + def tearDown(self): + # Clean up the module. + self.clean_up() + + @classmethod + def clean_up(cls): + name = cls.NAME + if name in sys.modules: + if hasattr(sys.modules[name], '_clear_globals'): + assert sys.modules[name].__file__ == cls.FILE, \ + f"{sys.modules[name].__file__} != {cls.FILE}" + + sys.modules[name]._clear_globals() + del sys.modules[name] + # Clear all internally cached data for the extension. + _testinternalcapi.clear_extension(name, cls.ORIGIN) + + ######################### + # helpers + + def add_module_cleanup(self, name): + def clean_up(): + # Clear all internally cached data for the extension. + _testinternalcapi.clear_extension(name, self.ORIGIN) + self.addCleanup(clean_up) + + def _load_dynamic(self, name, path): + """ + Load an extension module. + """ + # This is essentially copied from the old imp module. + from importlib._bootstrap import _load + loader = self.LOADER(name, path) + + # Issue bpo-24748: Skip the sys.modules check in _load_module_shim; + # always load new extension. + spec = importlib.util.spec_from_file_location(name, path, + loader=loader) + return _load(spec) + + def load(self, name): + try: + already_loaded = self.already_loaded + except AttributeError: + already_loaded = self.already_loaded = {} + assert name not in already_loaded + mod = self._load_dynamic(name, self.ORIGIN) + self.assertNotIn(mod, already_loaded.values()) + already_loaded[name] = mod + return types.SimpleNamespace( + name=name, + module=mod, + snapshot=TestSinglePhaseSnapshot.from_module(mod), + ) + + def re_load(self, name, mod): + assert sys.modules[name] is mod + assert mod.__dict__ == mod.__dict__ + reloaded = self._load_dynamic(name, self.ORIGIN) + return types.SimpleNamespace( + name=name, + module=reloaded, + snapshot=TestSinglePhaseSnapshot.from_module(reloaded), + ) + + # subinterpreters + + def add_subinterpreter(self): + interpid = _interpreters.create('legacy') + def ensure_destroyed(): + try: + _interpreters.destroy(interpid) + except _interpreters.InterpreterNotFoundError: + pass + self.addCleanup(ensure_destroyed) + _interpreters.exec(interpid, textwrap.dedent(''' + import sys + import _testinternalcapi + ''')) + def clean_up(): + _interpreters.exec(interpid, textwrap.dedent(f''' + name = {self.NAME!r} + if name in sys.modules: + sys.modules.pop(name)._clear_globals() + _testinternalcapi.clear_extension(name, {self.ORIGIN!r}) + ''')) + _interpreters.destroy(interpid) + self.addCleanup(clean_up) + return interpid + + def import_in_subinterp(self, interpid=None, *, + postscript=None, + postcleanup=False, + ): + name = self.NAME + + if postcleanup: + import_ = 'import _testinternalcapi' if interpid is None else '' + postcleanup = f''' + {import_} + mod._clear_globals() + _testinternalcapi.clear_extension(name, {self.ORIGIN!r}) + ''' + + try: + pipe = self._pipe + except AttributeError: + r, w = pipe = self._pipe = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + + snapshot = TestSinglePhaseSnapshot.from_subinterp( + name, + interpid, + pipe=pipe, + import_first=True, + postscript=postscript, + postcleanup=postcleanup, + ) + + return types.SimpleNamespace( + name=name, + module=None, + snapshot=snapshot, + ) + + # checks + + def check_common(self, loaded): + isolated = False + + mod = loaded.module + if not mod: + # It came from a subinterpreter. + isolated = True + mod = loaded.snapshot.module + # mod.__name__ might not match, but the spec will. + self.assertEqual(mod.__spec__.name, loaded.name) + self.assertEqual(mod.__file__, self.FILE) + self.assertEqual(mod.__spec__.origin, self.ORIGIN) + if not isolated: + self.assertIsSubclass(mod.error, Exception) + self.assertEqual(mod.int_const, 1969) + self.assertEqual(mod.str_const, 'something different') + self.assertIsInstance(mod._module_initialized, float) + self.assertGreater(mod._module_initialized, 0) + + snap = loaded.snapshot + self.assertEqual(snap.summed, 3) + if snap.state_initialized is not None: + self.assertIsInstance(snap.state_initialized, float) + self.assertGreater(snap.state_initialized, 0) + if isolated: + # The "looked up" module is interpreter-specific + # (interp->imports.modules_by_index was set for the module). + self.assertEqual(snap.lookedup_id, snap.id) + self.assertEqual(snap.cached_id, snap.id) + with self.assertRaises(AttributeError): + snap.spam + else: + self.assertIs(snap.lookedup, mod) + self.assertIs(snap.cached, mod) + + def check_direct(self, loaded): + # The module has its own PyModuleDef, with a matching name. + self.assertEqual(loaded.module.__name__, loaded.name) + self.assertIs(loaded.snapshot.lookedup, loaded.module) + + def check_indirect(self, loaded, orig): + # The module re-uses another's PyModuleDef, with a different name. + assert orig is not loaded.module + assert orig.__name__ != loaded.name + self.assertNotEqual(loaded.module.__name__, loaded.name) + self.assertIs(loaded.snapshot.lookedup, loaded.module) + + def check_basic(self, loaded, expected_init_count): + # m_size == -1 + # The module loads fresh the first time and copies m_copy after. + snap = loaded.snapshot + self.assertIsNot(snap.state_initialized, None) + self.assertIsInstance(snap.init_count, int) + self.assertGreater(snap.init_count, 0) + self.assertEqual(snap.init_count, expected_init_count) + + def check_with_reinit(self, loaded): + # m_size >= 0 + # The module loads fresh every time. + pass + + def check_fresh(self, loaded): + """ + The module had not been loaded before (at least since fully reset). + """ + snap = loaded.snapshot + # The module's init func was run. + # A copy of the module's __dict__ was stored in def->m_base.m_copy. + # The previous m_copy was deleted first. + # _PyRuntime.imports.extensions was set. + self.assertEqual(snap.init_count, 1) + # The global state was initialized. + # The module attrs were initialized from that state. + self.assertEqual(snap.module._module_initialized, + snap.state_initialized) + + def check_semi_fresh(self, loaded, base, prev): + """ + The module had been loaded before and then reset + (but the module global state wasn't). + """ + snap = loaded.snapshot + # The module's init func was run again. + # A copy of the module's __dict__ was stored in def->m_base.m_copy. + # The previous m_copy was deleted first. + # The module globals did not get reset. + self.assertNotEqual(snap.id, base.snapshot.id) + self.assertNotEqual(snap.id, prev.snapshot.id) + self.assertEqual(snap.init_count, prev.snapshot.init_count + 1) + # The global state was updated. + # The module attrs were initialized from that state. + self.assertEqual(snap.module._module_initialized, + snap.state_initialized) + self.assertNotEqual(snap.state_initialized, + base.snapshot.state_initialized) + self.assertNotEqual(snap.state_initialized, + prev.snapshot.state_initialized) + + def check_copied(self, loaded, base): + """ + The module had been loaded before and never reset. + """ + snap = loaded.snapshot + # The module's init func was not run again. + # The interpreter copied m_copy, as set by the other interpreter, + # with objects owned by the other interpreter. + # The module globals did not get reset. + self.assertNotEqual(snap.id, base.snapshot.id) + self.assertEqual(snap.init_count, base.snapshot.init_count) + # The global state was not updated since the init func did not run. + # The module attrs were not directly initialized from that state. + # The state and module attrs still match the previous loading. + self.assertEqual(snap.module._module_initialized, + snap.state_initialized) + self.assertEqual(snap.state_initialized, + base.snapshot.state_initialized) + + ######################### + # the tests + + def test_cleared_globals(self): + loaded = self.load(self.NAME) + _testsinglephase = loaded.module + init_before = _testsinglephase.state_initialized() + + _testsinglephase._clear_globals() + init_after = _testsinglephase.state_initialized() + init_count = _testsinglephase.initialized_count() + + self.assertGreater(init_before, 0) + self.assertEqual(init_after, 0) + self.assertEqual(init_count, -1) + + def test_variants(self): + # Exercise the most meaningful variants described in Python/import.c. + self.maxDiff = None + + # Check the "basic" module. + + name = self.NAME + expected_init_count = 1 + with self.subTest(name): + loaded = self.load(name) + + self.check_common(loaded) + self.check_direct(loaded) + self.check_basic(loaded, expected_init_count) + basic = loaded.module + + # Check its indirect variants. + + name = f'{self.NAME}_basic_wrapper' + self.add_module_cleanup(name) + expected_init_count += 1 + with self.subTest(name): + loaded = self.load(name) + + self.check_common(loaded) + self.check_indirect(loaded, basic) + self.check_basic(loaded, expected_init_count) + + # Currently PyState_AddModule() always replaces the cached module. + self.assertIs(basic.look_up_self(), loaded.module) + self.assertEqual(basic.initialized_count(), expected_init_count) + + # The cached module shouldn't change after this point. + basic_lookedup = loaded.module + + # Check its direct variant. + + name = f'{self.NAME}_basic_copy' + self.add_module_cleanup(name) + expected_init_count += 1 + with self.subTest(name): + loaded = self.load(name) + + self.check_common(loaded) + self.check_direct(loaded) + self.check_basic(loaded, expected_init_count) + + # This should change the cached module for _testsinglephase. + self.assertIs(basic.look_up_self(), basic_lookedup) + self.assertEqual(basic.initialized_count(), expected_init_count) + + # Check the non-basic variant that has no state. + + name = f'{self.NAME}_with_reinit' + self.add_module_cleanup(name) + with self.subTest(name): + loaded = self.load(name) + + self.check_common(loaded) + self.assertIs(loaded.snapshot.state_initialized, None) + self.check_direct(loaded) + self.check_with_reinit(loaded) + + # This should change the cached module for _testsinglephase. + self.assertIs(basic.look_up_self(), basic_lookedup) + self.assertEqual(basic.initialized_count(), expected_init_count) + + # Check the basic variant that has state. + + name = f'{self.NAME}_with_state' + self.add_module_cleanup(name) + with self.subTest(name): + loaded = self.load(name) + self.addCleanup(loaded.module._clear_module_state) + + self.check_common(loaded) + self.assertIsNot(loaded.snapshot.state_initialized, None) + self.check_direct(loaded) + self.check_with_reinit(loaded) + + # This should change the cached module for _testsinglephase. + self.assertIs(basic.look_up_self(), basic_lookedup) + self.assertEqual(basic.initialized_count(), expected_init_count) + + def test_basic_reloaded(self): + # m_copy is copied into the existing module object. + # Global state is not changed. + self.maxDiff = None + + for name in [ + self.NAME, # the "basic" module + f'{self.NAME}_basic_wrapper', # the indirect variant + f'{self.NAME}_basic_copy', # the direct variant + ]: + self.add_module_cleanup(name) + with self.subTest(name): + loaded = self.load(name) + reloaded = self.re_load(name, loaded.module) + + self.check_common(loaded) + self.check_common(reloaded) + + # Make sure the original __dict__ did not get replaced. + self.assertEqual(id(loaded.module.__dict__), + loaded.snapshot.ns_id) + self.assertEqual(loaded.snapshot.ns.__dict__, + loaded.module.__dict__) + + self.assertEqual(reloaded.module.__spec__.name, reloaded.name) + self.assertEqual(reloaded.module.__name__, + reloaded.snapshot.ns.__name__) + + self.assertIs(reloaded.module, loaded.module) + self.assertIs(reloaded.module.__dict__, loaded.module.__dict__) + # It only happens to be the same but that's good enough here. + # We really just want to verify that the re-loaded attrs + # didn't change. + self.assertIs(reloaded.snapshot.lookedup, + loaded.snapshot.lookedup) + self.assertEqual(reloaded.snapshot.state_initialized, + loaded.snapshot.state_initialized) + self.assertEqual(reloaded.snapshot.init_count, + loaded.snapshot.init_count) + + self.assertIs(reloaded.snapshot.cached, reloaded.module) + + def test_with_reinit_reloaded(self): + # The module's m_init func is run again. + self.maxDiff = None + + # Keep a reference around. + basic = self.load(self.NAME) + + for name, has_state in [ + (f'{self.NAME}_with_reinit', False), # m_size == 0 + (f'{self.NAME}_with_state', True), # m_size > 0 + ]: + self.add_module_cleanup(name) + with self.subTest(name=name, has_state=has_state): + loaded = self.load(name) + if has_state: + self.addCleanup(loaded.module._clear_module_state) + + reloaded = self.re_load(name, loaded.module) + if has_state: + self.addCleanup(reloaded.module._clear_module_state) + + self.check_common(loaded) + self.check_common(reloaded) + + # Make sure the original __dict__ did not get replaced. + self.assertEqual(id(loaded.module.__dict__), + loaded.snapshot.ns_id) + self.assertEqual(loaded.snapshot.ns.__dict__, + loaded.module.__dict__) + + self.assertEqual(reloaded.module.__spec__.name, reloaded.name) + self.assertEqual(reloaded.module.__name__, + reloaded.snapshot.ns.__name__) + + self.assertIsNot(reloaded.module, loaded.module) + self.assertNotEqual(reloaded.module.__dict__, + loaded.module.__dict__) + self.assertIs(reloaded.snapshot.lookedup, reloaded.module) + if loaded.snapshot.state_initialized is None: + self.assertIs(reloaded.snapshot.state_initialized, None) + else: + self.assertGreater(reloaded.snapshot.state_initialized, + loaded.snapshot.state_initialized) + + self.assertIs(reloaded.snapshot.cached, reloaded.module) + + @unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi") + def test_check_state_first(self): + for variant in ['', '_with_reinit', '_with_state']: + name = f'{self.NAME}{variant}_check_cache_first' + with self.subTest(name): + mod = self._load_dynamic(name, self.ORIGIN) + self.assertEqual(mod.__name__, name) + sys.modules.pop(name, None) + _testinternalcapi.clear_extension(name, self.ORIGIN) + + # Currently, for every single-phrase init module loaded + # in multiple interpreters, those interpreters share a + # PyModuleDef for that object, which can be a problem. + # Also, we test with a single-phase module that has global state, + # which is shared by all interpreters. + + @no_rerun(reason="module state is not cleared (see gh-140657)") + @requires_subinterpreters + def test_basic_multiple_interpreters_main_no_reset(self): + # without resetting; already loaded in main interpreter + + # At this point: + # * alive in 0 interpreters + # * module def may or may not be loaded already + # * module def not in _PyRuntime.imports.extensions + # * mod init func has not run yet (since reset, at least) + # * m_copy not set (hasn't been loaded yet or already cleared) + # * module's global state has not been initialized yet + # (or already cleared) + + main_loaded = self.load(self.NAME) + _testsinglephase = main_loaded.module + # Attrs set after loading are not in m_copy. + _testsinglephase.spam = 'spam, spam, spam, spam, eggs, and spam' + + self.check_common(main_loaded) + self.check_fresh(main_loaded) + + interpid1 = self.add_subinterpreter() + interpid2 = self.add_subinterpreter() + + # At this point: + # * alive in 1 interpreter (main) + # * module def in _PyRuntime.imports.extensions + # * mod init func ran for the first time (since reset, at least) + # * m_copy was copied from the main interpreter (was NULL) + # * module's global state was initialized + + # Use an interpreter that gets destroyed right away. + loaded = self.import_in_subinterp() + self.check_common(loaded) + self.check_copied(loaded, main_loaded) + + # At this point: + # * alive in 1 interpreter (main) + # * module def still in _PyRuntime.imports.extensions + # * mod init func ran again + # * m_copy is NULL (claered when the interpreter was destroyed) + # (was from main interpreter) + # * module's global state was updated, not reset + + # Use a subinterpreter that sticks around. + loaded = self.import_in_subinterp(interpid1) + self.check_common(loaded) + self.check_copied(loaded, main_loaded) + + # At this point: + # * alive in 2 interpreters (main, interp1) + # * module def still in _PyRuntime.imports.extensions + # * mod init func ran again + # * m_copy was copied from interp1 + # * module's global state was updated, not reset + + # Use a subinterpreter while the previous one is still alive. + loaded = self.import_in_subinterp(interpid2) + self.check_common(loaded) + self.check_copied(loaded, main_loaded) + + # At this point: + # * alive in 3 interpreters (main, interp1, interp2) + # * module def still in _PyRuntime.imports.extensions + # * mod init func ran again + # * m_copy was copied from interp2 (was from interp1) + # * module's global state was updated, not reset + + @no_rerun(reason="rerun not possible; module state is never cleared (see gh-102251)") + @requires_subinterpreters + def test_basic_multiple_interpreters_deleted_no_reset(self): + # without resetting; already loaded in a deleted interpreter + + if Py_TRACE_REFS: + # It's a Py_TRACE_REFS build. + # This test breaks interpreter isolation a little, + # which causes problems on Py_TRACE_REF builds. + raise unittest.SkipTest('crashes on Py_TRACE_REFS builds') + + # At this point: + # * alive in 0 interpreters + # * module def may or may not be loaded already + # * module def not in _PyRuntime.imports.extensions + # * mod init func has not run yet (since reset, at least) + # * m_copy not set (hasn't been loaded yet or already cleared) + # * module's global state has not been initialized yet + # (or already cleared) + + interpid1 = self.add_subinterpreter() + interpid2 = self.add_subinterpreter() + + # First, load in the main interpreter but then completely clear it. + loaded_main = self.load(self.NAME) + loaded_main.module._clear_globals() + _testinternalcapi.clear_extension(self.NAME, self.ORIGIN) + + # At this point: + # * alive in 0 interpreters + # * module def loaded already + # * module def was in _PyRuntime.imports.extensions, but cleared + # * mod init func ran for the first time (since reset, at least) + # * m_copy was set, but cleared (was NULL) + # * module's global state was initialized but cleared + + # Start with an interpreter that gets destroyed right away. + base = self.import_in_subinterp( + postscript=''' + # Attrs set after loading are not in m_copy. + mod.spam = 'spam, spam, mash, spam, eggs, and spam' + ''') + self.check_common(base) + self.check_fresh(base) + + # At this point: + # * alive in 0 interpreters + # * module def in _PyRuntime.imports.extensions + # * mod init func ran for the first time (since reset) + # * m_copy is still set (owned by main interpreter) + # * module's global state was initialized, not reset + + # Use a subinterpreter that sticks around. + loaded_interp1 = self.import_in_subinterp(interpid1) + self.check_common(loaded_interp1) + self.check_copied(loaded_interp1, base) + + # At this point: + # * alive in 1 interpreter (interp1) + # * module def still in _PyRuntime.imports.extensions + # * mod init func did not run again + # * m_copy was not changed + # * module's global state was not touched + + # Use a subinterpreter while the previous one is still alive. + loaded_interp2 = self.import_in_subinterp(interpid2) + self.check_common(loaded_interp2) + self.check_copied(loaded_interp2, loaded_interp1) + + # At this point: + # * alive in 2 interpreters (interp1, interp2) + # * module def still in _PyRuntime.imports.extensions + # * mod init func did not run again + # * m_copy was not changed + # * module's global state was not touched + + @requires_subinterpreters + def test_basic_multiple_interpreters_reset_each(self): + # resetting between each interpreter + + # At this point: + # * alive in 0 interpreters + # * module def may or may not be loaded already + # * module def not in _PyRuntime.imports.extensions + # * mod init func has not run yet (since reset, at least) + # * m_copy not set (hasn't been loaded yet or already cleared) + # * module's global state has not been initialized yet + # (or already cleared) + + interpid1 = self.add_subinterpreter() + interpid2 = self.add_subinterpreter() + + # Use an interpreter that gets destroyed right away. + loaded = self.import_in_subinterp( + postscript=''' + # Attrs set after loading are not in m_copy. + mod.spam = 'spam, spam, mash, spam, eggs, and spam' + ''', + postcleanup=True, + ) + self.check_common(loaded) + self.check_fresh(loaded) + + # At this point: + # * alive in 0 interpreters + # * module def in _PyRuntime.imports.extensions + # * mod init func ran for the first time (since reset, at least) + # * m_copy is NULL (claered when the interpreter was destroyed) + # * module's global state was initialized, not reset + + # Use a subinterpreter that sticks around. + loaded = self.import_in_subinterp(interpid1, postcleanup=True) + self.check_common(loaded) + self.check_fresh(loaded) + + # At this point: + # * alive in 1 interpreter (interp1) + # * module def still in _PyRuntime.imports.extensions + # * mod init func ran again + # * m_copy was copied from interp1 (was NULL) + # * module's global state was initialized, not reset + + # Use a subinterpreter while the previous one is still alive. + loaded = self.import_in_subinterp(interpid2, postcleanup=True) + self.check_common(loaded) + self.check_fresh(loaded) + + # At this point: + # * alive in 2 interpreters (interp2, interp2) + # * module def still in _PyRuntime.imports.extensions + # * mod init func ran again + # * m_copy was copied from interp2 (was from interp1) + # * module's global state was initialized, not reset + + if __name__ == '__main__': # Test needs to be a package, so we can do relative imports. unittest.main() diff --git a/Lib/test/test_import/data/circular_imports/import_cycle.py b/Lib/test/test_import/data/circular_imports/import_cycle.py new file mode 100644 index 0000000000..cd9507b5f6 --- /dev/null +++ b/Lib/test/test_import/data/circular_imports/import_cycle.py @@ -0,0 +1,3 @@ +import test.test_import.data.circular_imports.import_cycle as m + +m.some_attribute diff --git a/Lib/test/test_import/data/circular_imports/singlephase.py b/Lib/test/test_import/data/circular_imports/singlephase.py new file mode 100644 index 0000000000..05618bc72f --- /dev/null +++ b/Lib/test/test_import/data/circular_imports/singlephase.py @@ -0,0 +1,13 @@ +"""Circular import involving a single-phase-init extension. + +This module is imported from the _testsinglephase_circular module from +_testsinglephase, and imports that module again. +""" + +import importlib +import _testsinglephase +from test.test_import import import_extension_from_file + +name = '_testsinglephase_circular' +filename = _testsinglephase.__file__ +mod = import_extension_from_file(name, filename) diff --git a/Lib/test/test_import/data/double_const.py b/Lib/test/test_import/data/double_const.py new file mode 100644 index 0000000000..67852aaf98 --- /dev/null +++ b/Lib/test/test_import/data/double_const.py @@ -0,0 +1,30 @@ +from test.support import TestFailed + +# A test for SF bug 422177: manifest float constants varied way too much in +# precision depending on whether Python was loading a module for the first +# time, or reloading it from a precompiled .pyc. The "expected" failure +# mode is that when test_import imports this after all .pyc files have been +# erased, it passes, but when test_import imports this from +# double_const.pyc, it fails. This indicates a woeful loss of precision in +# the marshal format for doubles. It's also possible that repr() doesn't +# produce enough digits to get reasonable precision for this box. + +PI = 3.14159265358979324 +TWOPI = 6.28318530717958648 + +PI_str = "3.14159265358979324" +TWOPI_str = "6.28318530717958648" + +# Verify that the double x is within a few bits of eval(x_str). +def check_ok(x, x_str): + assert x > 0.0 + x2 = eval(x_str) + assert x2 > 0.0 + diff = abs(x - x2) + # If diff is no larger than 3 ULP (wrt x2), then diff/8 is no larger + # than 0.375 ULP, so adding diff/8 to x2 should have no effect. + if x2 + (diff / 8.) != x2: + raise TestFailed("Manifest const %s lost too much precision " % x_str) + +check_ok(PI, PI_str) +check_ok(TWOPI, TWOPI_str) diff --git a/Lib/test/test_import/data/package3/__init__.py b/Lib/test/test_import/data/package3/__init__.py new file mode 100644 index 0000000000..7033c22a71 --- /dev/null +++ b/Lib/test/test_import/data/package3/__init__.py @@ -0,0 +1,2 @@ +"""Rebinding the package attribute after importing the module.""" +from .submodule import submodule diff --git a/Lib/test/test_import/data/package3/submodule.py b/Lib/test/test_import/data/package3/submodule.py new file mode 100644 index 0000000000..cd7b30db15 --- /dev/null +++ b/Lib/test/test_import/data/package3/submodule.py @@ -0,0 +1,7 @@ +attr = 'submodule' +class A: + attr = 'submodule' +class submodule: + attr = 'rebound' + class B: + attr = 'rebound' diff --git a/Lib/test/test_import/data/package4/__init__.py b/Lib/test/test_import/data/package4/__init__.py new file mode 100644 index 0000000000..d8af60ab38 --- /dev/null +++ b/Lib/test/test_import/data/package4/__init__.py @@ -0,0 +1,5 @@ +"""Binding the package attribute without importing the module.""" +class submodule: + attr = 'origin' + class B: + attr = 'origin' diff --git a/Lib/test/test_import/data/package4/submodule.py b/Lib/test/test_import/data/package4/submodule.py new file mode 100644 index 0000000000..c861417aec --- /dev/null +++ b/Lib/test/test_import/data/package4/submodule.py @@ -0,0 +1,3 @@ +attr = 'submodule' +class A: + attr = 'submodule' diff --git a/Lib/test/test_pkgutil.py b/Lib/test/test_pkgutil.py index ece4cf2d05..0d44092dab 100644 --- a/Lib/test/test_pkgutil.py +++ b/Lib/test/test_pkgutil.py @@ -1,5 +1,6 @@ +from pathlib import Path from test.support.import_helper import unload, CleanImport -from test.support.warnings_helper import check_warnings +from test.support.warnings_helper import check_warnings, ignore_warnings import unittest import sys import importlib @@ -11,6 +12,10 @@ import shutil import zipfile +from test.support.import_helper import DirsOnSysPath +from test.support.os_helper import FakePath +from test.test_importlib.util import uncache + # Note: pkgutil.walk_packages is currently tested in test_runpy. This is # a hack to get a major issue resolved for 3.3b2. Longer term, it should # be moved back here, perhaps by factoring out the helper code for @@ -91,6 +96,45 @@ def test_getdata_zipfile(self): del sys.modules[pkg] + def test_issue44061_iter_modules(self): + #see: issue44061 + zip = 'test_getdata_zipfile.zip' + pkg = 'test_getdata_zipfile' + + # Include a LF and a CRLF, to test that binary data is read back + RESOURCE_DATA = b'Hello, world!\nSecond line\r\nThird line' + + # Make a package with some resources + zip_file = os.path.join(self.dirname, zip) + z = zipfile.ZipFile(zip_file, 'w') + + # Empty init.py + z.writestr(pkg + '/__init__.py', "") + # Resource files, res.txt + z.writestr(pkg + '/res.txt', RESOURCE_DATA) + z.close() + + # Check we can read the resources + sys.path.insert(0, zip_file) + try: + res = pkgutil.get_data(pkg, 'res.txt') + self.assertEqual(res, RESOURCE_DATA) + + # make sure iter_modules accepts Path objects + names = [] + for moduleinfo in pkgutil.iter_modules([FakePath(zip_file)]): + self.assertIsInstance(moduleinfo, pkgutil.ModuleInfo) + names.append(moduleinfo.name) + self.assertEqual(names, [pkg]) + finally: + del sys.path[0] + sys.modules.pop(pkg, None) + + # assert path must be None or list of paths + expected_msg = "path must be None or list of paths to look for modules in" + with self.assertRaisesRegex(ValueError, expected_msg): + list(pkgutil.iter_modules("invalid_path")) + def test_unreadable_dir_on_syspath(self): # issue7367 - walk_packages failed if unreadable dir on sys.path package_name = "unreadable_package" @@ -187,8 +231,7 @@ def test_walk_packages_raises_on_string_or_bytes_input(self): with self.assertRaises((TypeError, ValueError)): list(pkgutil.walk_packages(bytes_input)) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_name_resolution(self): import logging import logging.handlers @@ -280,6 +323,38 @@ def test_name_resolution(self): with self.assertRaises(exc): pkgutil.resolve_name(s) + def test_name_resolution_import_rebinding(self): + # The same data is also used for testing import in test_import and + # mock.patch in test_unittest. + path = os.path.join(os.path.dirname(__file__), 'test_import', 'data') + with uncache('package3', 'package3.submodule'), DirsOnSysPath(path): + self.assertEqual(pkgutil.resolve_name('package3.submodule.attr'), 'submodule') + with uncache('package3', 'package3.submodule'), DirsOnSysPath(path): + self.assertEqual(pkgutil.resolve_name('package3.submodule:attr'), 'submodule') + with uncache('package3', 'package3.submodule'), DirsOnSysPath(path): + self.assertEqual(pkgutil.resolve_name('package3:submodule.attr'), 'rebound') + self.assertEqual(pkgutil.resolve_name('package3.submodule.attr'), 'submodule') + self.assertEqual(pkgutil.resolve_name('package3:submodule.attr'), 'rebound') + with uncache('package3', 'package3.submodule'), DirsOnSysPath(path): + self.assertEqual(pkgutil.resolve_name('package3:submodule.attr'), 'rebound') + self.assertEqual(pkgutil.resolve_name('package3.submodule:attr'), 'submodule') + self.assertEqual(pkgutil.resolve_name('package3:submodule.attr'), 'rebound') + + def test_name_resolution_import_rebinding2(self): + path = os.path.join(os.path.dirname(__file__), 'test_import', 'data') + with uncache('package4', 'package4.submodule'), DirsOnSysPath(path): + self.assertEqual(pkgutil.resolve_name('package4.submodule.attr'), 'submodule') + with uncache('package4', 'package4.submodule'), DirsOnSysPath(path): + self.assertEqual(pkgutil.resolve_name('package4.submodule:attr'), 'submodule') + with uncache('package4', 'package4.submodule'), DirsOnSysPath(path): + self.assertEqual(pkgutil.resolve_name('package4:submodule.attr'), 'origin') + self.assertEqual(pkgutil.resolve_name('package4.submodule.attr'), 'submodule') + self.assertEqual(pkgutil.resolve_name('package4:submodule.attr'), 'submodule') + with uncache('package4', 'package4.submodule'), DirsOnSysPath(path): + self.assertEqual(pkgutil.resolve_name('package4:submodule.attr'), 'origin') + self.assertEqual(pkgutil.resolve_name('package4.submodule:attr'), 'submodule') + self.assertEqual(pkgutil.resolve_name('package4:submodule.attr'), 'submodule') + class PkgutilPEP302Tests(unittest.TestCase): @@ -391,7 +466,7 @@ def test_iter_importers(self): importers = list(iter_importers(fullname)) expected_importer = get_importer(pathitem) for finder in importers: - spec = pkgutil._get_spec(finder, fullname) + spec = finder.find_spec(fullname) loader = spec.loader try: loader = loader.loader @@ -403,7 +478,7 @@ def test_iter_importers(self): self.assertEqual(finder, expected_importer) self.assertIsInstance(loader, importlib.machinery.SourceFileLoader) - self.assertIsNone(pkgutil._get_spec(finder, pkgname)) + self.assertIsNone(finder.find_spec(pkgname)) with self.assertRaises(ImportError): list(iter_importers('invalid.module')) @@ -448,7 +523,43 @@ def test_mixed_namespace(self): del sys.modules['foo.bar'] del sys.modules['foo.baz'] - # XXX: test .pkg files + + def test_extend_path_argument_types(self): + pkgname = 'foo' + dirname_0 = self.create_init(pkgname) + + # If the input path is not a list it is returned unchanged + self.assertEqual('notalist', pkgutil.extend_path('notalist', 'foo')) + self.assertEqual(('not', 'a', 'list'), pkgutil.extend_path(('not', 'a', 'list'), 'foo')) + self.assertEqual(123, pkgutil.extend_path(123, 'foo')) + self.assertEqual(None, pkgutil.extend_path(None, 'foo')) + + # Cleanup + shutil.rmtree(dirname_0) + del sys.path[0] + + + def test_extend_path_pkg_files(self): + pkgname = 'foo' + dirname_0 = self.create_init(pkgname) + + with open(os.path.join(dirname_0, 'bar.pkg'), 'w') as pkg_file: + pkg_file.write('\n'.join([ + 'baz', + '/foo/bar/baz', + '', + '#comment' + ])) + + extended_paths = pkgutil.extend_path(sys.path, 'bar') + + self.assertEqual(extended_paths[:-2], sys.path) + self.assertEqual(extended_paths[-2], 'baz') + self.assertEqual(extended_paths[-1], '/foo/bar/baz') + + # Cleanup + shutil.rmtree(dirname_0) + del sys.path[0] class NestedNamespacePackageTest(unittest.TestCase): @@ -491,36 +602,50 @@ def test_nested(self): self.assertEqual(c, 1) self.assertEqual(d, 2) + +class ImportlibMigrationTests(unittest.TestCase): + # With full PEP 302 support in the standard import machinery, the + # PEP 302 emulation in this module is in the process of being + # deprecated in favour of importlib proper + @unittest.skipIf(__name__ == '__main__', 'not compatible with __main__') + @ignore_warnings(category=DeprecationWarning) def test_get_loader_handles_missing_loader_attribute(self): global __loader__ this_loader = __loader__ del __loader__ try: - with check_warnings() as w: - self.assertIsNotNone(pkgutil.get_loader(__name__)) - self.assertEqual(len(w.warnings), 0) + self.assertIsNotNone(pkgutil.get_loader(__name__)) finally: __loader__ = this_loader + @ignore_warnings(category=DeprecationWarning) def test_get_loader_handles_missing_spec_attribute(self): name = 'spam' mod = type(sys)(name) del mod.__spec__ with CleanImport(name): - sys.modules[name] = mod - loader = pkgutil.get_loader(name) + try: + sys.modules[name] = mod + loader = pkgutil.get_loader(name) + finally: + sys.modules.pop(name, None) self.assertIsNone(loader) + @ignore_warnings(category=DeprecationWarning) def test_get_loader_handles_spec_attribute_none(self): name = 'spam' mod = type(sys)(name) mod.__spec__ = None with CleanImport(name): - sys.modules[name] = mod - loader = pkgutil.get_loader(name) + try: + sys.modules[name] = mod + loader = pkgutil.get_loader(name) + finally: + sys.modules.pop(name, None) self.assertIsNone(loader) + @ignore_warnings(category=DeprecationWarning) def test_get_loader_None_in_sys_modules(self): name = 'totally bogus' sys.modules[name] = None @@ -530,24 +655,38 @@ def test_get_loader_None_in_sys_modules(self): del sys.modules[name] self.assertIsNone(loader) + def test_get_loader_is_deprecated(self): + with check_warnings( + (r".*\bpkgutil.get_loader\b.*", DeprecationWarning), + ): + res = pkgutil.get_loader("sys") + self.assertIsNotNone(res) + + def test_find_loader_is_deprecated(self): + with check_warnings( + (r".*\bpkgutil.find_loader\b.*", DeprecationWarning), + ): + res = pkgutil.find_loader("sys") + self.assertIsNotNone(res) + + @ignore_warnings(category=DeprecationWarning) def test_find_loader_missing_module(self): name = 'totally bogus' loader = pkgutil.find_loader(name) self.assertIsNone(loader) - def test_find_loader_avoids_emulation(self): - with check_warnings() as w: - self.assertIsNotNone(pkgutil.find_loader("sys")) - self.assertIsNotNone(pkgutil.find_loader("os")) - self.assertIsNotNone(pkgutil.find_loader("test.support")) - self.assertEqual(len(w.warnings), 0) - def test_get_importer_avoids_emulation(self): # We use an illegal path so *none* of the path hooks should fire with check_warnings() as w: self.assertIsNone(pkgutil.get_importer("*??")) self.assertEqual(len(w.warnings), 0) + def test_issue44061(self): + try: + pkgutil.get_importer(Path("/home")) + except AttributeError: + self.fail("Unexpected AttributeError when calling get_importer") + def test_iter_importers_avoids_emulation(self): with check_warnings() as w: for importer in pkgutil.iter_importers(): pass diff --git a/Lib/test/test_unittest/test_async_case.py b/Lib/test/test_unittest/test_async_case.py index 1d2b87f161..f77a5888ee 100644 --- a/Lib/test/test_unittest/test_async_case.py +++ b/Lib/test/test_unittest/test_async_case.py @@ -48,8 +48,7 @@ def setUp(self): # starting a new event loop self.addCleanup(support.gc_collect) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_full_cycle(self): expected = ['setUp', 'asyncSetUp', @@ -296,8 +295,7 @@ async def on_cleanup2(self): test.doCleanups() self.assertEqual(events, ['asyncSetUp', 'test', 'asyncTearDown', 'cleanup2', 'cleanup1']) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_deprecation_of_return_val_from_test(self): # Issue 41322 - deprecate return of value that is not None from a test class Nothing: @@ -488,8 +486,7 @@ async def test_demo1(self): result = test.run() self.assertTrue(result.wasSuccessful()) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_loop_factory(self): asyncio.set_event_loop_policy(None) diff --git a/Lib/test/test_unittest/test_discovery.py b/Lib/test/test_unittest/test_discovery.py index 9e231ff8d2..a44b18406c 100644 --- a/Lib/test/test_unittest/test_discovery.py +++ b/Lib/test/test_unittest/test_discovery.py @@ -364,8 +364,6 @@ def __eq__(self, other): [(loader, [], 'test*.py'), (loader, [], 'test*.py')]) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_discover(self): loader = unittest.TestLoader() @@ -412,8 +410,6 @@ def _find_tests(start_dir, pattern): self.assertEqual(_find_tests_args, [(start_dir, 'pattern')]) self.assertIn(top_level_dir, sys.path) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_discover_should_not_persist_top_level_dir_between_calls(self): original_isfile = os.path.isfile original_isdir = os.path.isdir diff --git a/Lib/test/test_unittest/test_loader.py b/Lib/test/test_unittest/test_loader.py index 9881335318..83dd25ca54 100644 --- a/Lib/test/test_unittest/test_loader.py +++ b/Lib/test/test_unittest/test_loader.py @@ -90,8 +90,6 @@ def test_loadTestsFromTestCase__from_TestCase(self): self.assertIsInstance(suite, loader.suiteClass) self.assertEqual(list(suite), []) - # TODO: RUSTPYTHON - @unittest.expectedFailure # "Do not load any tests from `FunctionTestCase` class." def test_loadTestsFromTestCase__from_FunctionTestCase(self): loader = unittest.TestLoader() @@ -121,8 +119,6 @@ def test(self): expected = [loader.suiteClass([MyTestCase('test')])] self.assertEqual(list(suite), expected) - # TODO: RUSTPYTHON - @unittest.expectedFailure # "This test ensures that internal `TestCase` subclasses are not loaded" def test_loadTestsFromModule__TestCase_subclass_internals(self): # See https://github.com/python/cpython/issues/84867 @@ -187,8 +183,6 @@ class NotAModule(object): self.assertEqual(list(suite), reference) - # TODO: RUSTPYTHON - @unittest.expectedFailure # Check that loadTestsFromModule honors a module # with a load_tests function. def test_loadTestsFromModule__load_tests(self): diff --git a/Lib/test/test_unittest/test_program.py b/Lib/test/test_unittest/test_program.py index 8256aaeb8c..2e3a750847 100644 --- a/Lib/test/test_unittest/test_program.py +++ b/Lib/test/test_unittest/test_program.py @@ -75,6 +75,14 @@ def testUnexpectedSuccess(self): class Empty(unittest.TestCase): pass + class SetUpClassFailure(unittest.TestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + raise Exception + def testPass(self): + pass + class TestLoader(unittest.TestLoader): """Test loader that returns a suite containing the supplied testcase.""" @@ -191,6 +199,18 @@ def test_ExitEmptySuite(self): out = stream.getvalue() self.assertIn('\nNO TESTS RAN\n', out) + def test_ExitSetUpClassFailureSuite(self): + stream = BufferedWriter() + with self.assertRaises(SystemExit) as cm: + unittest.main( + argv=["setup_class_failure"], + testRunner=unittest.TextTestRunner(stream=stream), + testLoader=self.TestLoader(self.SetUpClassFailure)) + self.assertEqual(cm.exception.code, 1) + out = stream.getvalue() + self.assertIn("ERROR: setUpClass", out) + self.assertIn("SetUpClassFailure", out) + class InitialisableProgram(unittest.TestProgram): exit = False @@ -485,8 +505,8 @@ def testParseArgsSelectedTestNames(self): self.assertEqual(program.testNamePatterns, ['*foo*', '*bar*', '*pat*']) - # TODO: RUSTPYTHON - @unittest.expectedFailureIf(sys.platform != 'win32', "TODO: RUSTPYTHON") + @unittest.expectedFailureIf(sys.platform != 'win32', 'TODO: RUSTPYTHON') + def testSelectedTestNamesFunctionalTest(self): def run_unittest(args): # Use -E to ignore PYTHONSAFEPATH env var diff --git a/Lib/test/test_unittest/test_runner.py b/Lib/test/test_unittest/test_runner.py index a652c71513..790c4d29ca 100644 --- a/Lib/test/test_unittest/test_runner.py +++ b/Lib/test/test_unittest/test_runner.py @@ -1297,8 +1297,7 @@ def _makeResult(self): expected = ['startTestRun', 'stopTestRun'] self.assertEqual(events, expected) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_pickle_unpickle(self): # Issue #7197: a TextTestRunner should be (un)pickleable. This is # required by test_multiprocessing under Windows (in verbose mode). diff --git a/Lib/test/test_unittest/testmock/testasync.py b/Lib/test/test_unittest/testmock/testasync.py index 06ec7ea204..ddc6f0599c 100644 --- a/Lib/test/test_unittest/testmock/testasync.py +++ b/Lib/test/test_unittest/testmock/testasync.py @@ -340,8 +340,7 @@ def test_spec_async_attributes_instance(self): # only the shape of the spec at the time of mock construction matters self.assertNotIsInstance(mock_async_instance.later_async_func_attr, AsyncMock) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_spec_mock_type_kw(self): def inner_test(mock_type): async_mock = mock_type(spec=async_func) @@ -356,8 +355,7 @@ def inner_test(mock_type): with self.subTest(f"test spec kwarg with {mock_type}"): inner_test(mock_type) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_spec_mock_type_positional(self): def inner_test(mock_type): async_mock = mock_type(async_func) @@ -736,8 +734,7 @@ def __aiter__(self): pass async def __anext__(self): pass - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_aiter_set_return_value(self): mock_iter = AsyncMock(name="tester") mock_iter.__aiter__.return_value = [1, 2, 3] @@ -763,8 +760,7 @@ def inner_test(mock_type): inner_test(mock_type) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_mock_async_for(self): async def iterate(iterator): accumulator = [] @@ -810,8 +806,7 @@ async def _runnable_test(self, *args, **kwargs): async def _await_coroutine(self, coroutine): return await coroutine - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_assert_called_but_not_awaited(self): mock = AsyncMock(AsyncClass) with assertNeverAwaited(self): @@ -852,8 +847,7 @@ def test_assert_called_and_awaited_at_same_time(self): self.mock.assert_called_once() self.mock.assert_awaited_once() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_assert_called_twice_and_awaited_once(self): mock = AsyncMock(AsyncClass) coroutine = mock.async_method() @@ -868,8 +862,7 @@ def test_assert_called_twice_and_awaited_once(self): mock.async_method.assert_awaited() mock.async_method.assert_awaited_once() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_assert_called_once_and_awaited_twice(self): mock = AsyncMock(AsyncClass) coroutine = mock.async_method() @@ -894,8 +887,7 @@ def test_assert_awaited_but_not_called(self): with self.assertRaises(AssertionError): self.mock.assert_called() - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_assert_has_calls_not_awaits(self): kalls = [call('foo')] with assertNeverAwaited(self): @@ -904,8 +896,7 @@ def test_assert_has_calls_not_awaits(self): with self.assertRaises(AssertionError): self.mock.assert_has_awaits(kalls) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_assert_has_mock_calls_on_async_mock_no_spec(self): with assertNeverAwaited(self): self.mock() @@ -919,8 +910,7 @@ def test_assert_has_mock_calls_on_async_mock_no_spec(self): mock_kalls = ([call(), call('foo'), call('baz')]) self.assertEqual(self.mock.mock_calls, mock_kalls) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_assert_has_mock_calls_on_async_mock_with_spec(self): a_class_mock = AsyncMock(AsyncClass) with assertNeverAwaited(self): @@ -936,8 +926,7 @@ def test_assert_has_mock_calls_on_async_mock_with_spec(self): self.assertEqual(a_class_mock.async_method.mock_calls, method_kalls) self.assertEqual(a_class_mock.mock_calls, mock_kalls) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_async_method_calls_recorded(self): with assertNeverAwaited(self): self.mock.something(3, fish=None) @@ -953,8 +942,7 @@ def test_async_method_calls_recorded(self): [("something", (6,), {'cake': sentinel.Cake})], "method calls not recorded correctly") - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_async_arg_lists(self): def assert_attrs(mock): names = ('call_args_list', 'method_calls', 'mock_calls') diff --git a/Lib/test/test_unittest/testmock/testhelpers.py b/Lib/test/test_unittest/testmock/testhelpers.py index c83068beb1..a19f04eb88 100644 --- a/Lib/test/test_unittest/testmock/testhelpers.py +++ b/Lib/test/test_unittest/testmock/testhelpers.py @@ -43,8 +43,7 @@ def test_any_and_datetime(self): mock.assert_called_with(ANY, foo=ANY) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_any_mock_calls_comparison_order(self): mock = Mock() class Foo(object): diff --git a/Lib/test/test_unittest/testmock/testpatch.py b/Lib/test/test_unittest/testmock/testpatch.py index 62c6221f77..87424e07e4 100644 --- a/Lib/test/test_unittest/testmock/testpatch.py +++ b/Lib/test/test_unittest/testmock/testpatch.py @@ -1778,8 +1778,6 @@ def test(mock): 'exception traceback not propagated') - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_name_resolution_import_rebinding(self): # Currently mock.patch uses pkgutil.resolve_name(), but repeat # similar tests just for the case. @@ -1814,8 +1812,6 @@ def check_error(name): check('package3:submodule.B.attr') check_error('package3:submodule.A.attr') - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_name_resolution_import_rebinding2(self): path = os.path.join(os.path.dirname(test.__file__), 'test_import', 'data') def check(name): @@ -2016,8 +2012,7 @@ def test_patch_and_patch_dict_stopall(self): self.assertEqual(dic2, origdic2) - # TODO: RUSTPYTHON - @unittest.expectedFailure + @unittest.expectedFailure # TODO: RUSTPYTHON def test_special_attrs(self): def foo(x=0): """TEST""" diff --git a/Lib/unittest/__init__.py b/Lib/unittest/__init__.py index 7ab3f5e87b..6878c2a8f5 100644 --- a/Lib/unittest/__init__.py +++ b/Lib/unittest/__init__.py @@ -51,10 +51,6 @@ def testMultiply(self): 'registerResult', 'removeResult', 'removeHandler', 'addModuleCleanup', 'doModuleCleanups', 'enterModuleContext'] -# Expose obsolete functions for backwards compatibility -# bpo-5846: Deprecated in Python 3.11, scheduled for removal in Python 3.13. -__all__.extend(['getTestCaseNames', 'makeSuite', 'findTestCases']) - __unittest = True from .result import TestResult @@ -67,20 +63,6 @@ def testMultiply(self): from .runner import TextTestRunner, TextTestResult from .signals import installHandler, registerResult, removeResult, removeHandler # IsolatedAsyncioTestCase will be imported lazily. -from .loader import makeSuite, getTestCaseNames, findTestCases - -# deprecated -_TextTestResult = TextTestResult - - -# There are no tests here, so don't try to run anything discovered from -# introspecting the symbols (e.g. FunctionTestCase). Instead, all our -# tests come from within unittest.test. -def load_tests(loader, tests, pattern): - import os.path - # top level directory cached on loader instance - this_dir = os.path.dirname(__file__) - return loader.discover(start_dir=this_dir, pattern=pattern) # Lazy import of IsolatedAsyncioTestCase from .async_case @@ -98,6 +80,7 @@ def __getattr__(name): raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + # XXX: RUSTPYTHON # This is very useful to reduce platform difference boilerplates in tests. def expectedFailureIf(condition, reason): @@ -111,4 +94,4 @@ def expectedFailureIf(condition, reason): # Even more useful because most of them are windows only. def expectedFailureIfWindows(reason): import sys - return expectedFailureIf(sys.platform == 'win32', reason) + return expectedFailureIf(sys.platform == 'win32', reason) \ No newline at end of file diff --git a/Lib/unittest/async_case.py b/Lib/unittest/async_case.py index bd2a471156..e761ba7e53 100644 --- a/Lib/unittest/async_case.py +++ b/Lib/unittest/async_case.py @@ -5,6 +5,7 @@ from .case import TestCase +__unittest = True class IsolatedAsyncioTestCase(TestCase): # Names intentionally have a long prefix @@ -25,12 +26,15 @@ class IsolatedAsyncioTestCase(TestCase): # them inside the same task. # Note: the test case modifies event loop policy if the policy was not instantiated - # yet. + # yet, unless loop_factory=asyncio.EventLoop is set. # asyncio.get_event_loop_policy() creates a default policy on demand but never # returns None # I believe this is not an issue in user level tests but python itself for testing # should reset a policy in every test module # by calling asyncio.set_event_loop_policy(None) in tearDownModule() + # or set loop_factory=asyncio.EventLoop + + loop_factory = None def __init__(self, methodName='runTest'): super().__init__(methodName) @@ -118,7 +122,7 @@ def _callMaybeAsync(self, func, /, *args, **kwargs): def _setupAsyncioRunner(self): assert self._asyncioRunner is None, 'asyncio runner is already initialized' - runner = asyncio.Runner(debug=True) + runner = asyncio.Runner(debug=True, loop_factory=self.loop_factory) self._asyncioRunner = runner def _tearDownAsyncioRunner(self): diff --git a/Lib/unittest/loader.py b/Lib/unittest/loader.py index 7e6ce2f224..22797b83a6 100644 --- a/Lib/unittest/loader.py +++ b/Lib/unittest/loader.py @@ -6,7 +6,6 @@ import traceback import types import functools -import warnings from fnmatch import fnmatch, fnmatchcase @@ -57,9 +56,7 @@ def testSkipped(self): TestClass = type("ModuleSkipped", (case.TestCase,), attrs) return suiteClass((TestClass(methodname),)) -def _jython_aware_splitext(path): - if path.lower().endswith('$py.class'): - return path[:-9] +def _splitext(path): return os.path.splitext(path)[0] @@ -87,40 +84,26 @@ def loadTestsFromTestCase(self, testCaseClass): raise TypeError("Test cases should not be derived from " "TestSuite. Maybe you meant to derive from " "TestCase?") - testCaseNames = self.getTestCaseNames(testCaseClass) - if not testCaseNames and hasattr(testCaseClass, 'runTest'): - testCaseNames = ['runTest'] + if testCaseClass in (case.TestCase, case.FunctionTestCase): + # We don't load any tests from base types that should not be loaded. + testCaseNames = [] + else: + testCaseNames = self.getTestCaseNames(testCaseClass) + if not testCaseNames and hasattr(testCaseClass, 'runTest'): + testCaseNames = ['runTest'] loaded_suite = self.suiteClass(map(testCaseClass, testCaseNames)) return loaded_suite - # XXX After Python 3.5, remove backward compatibility hacks for - # use_load_tests deprecation via *args and **kws. See issue 16662. - def loadTestsFromModule(self, module, *args, pattern=None, **kws): + def loadTestsFromModule(self, module, *, pattern=None): """Return a suite of all test cases contained in the given module""" - # This method used to take an undocumented and unofficial - # use_load_tests argument. For backward compatibility, we still - # accept the argument (which can also be the first position) but we - # ignore it and issue a deprecation warning if it's present. - if len(args) > 0 or 'use_load_tests' in kws: - warnings.warn('use_load_tests is deprecated and ignored', - DeprecationWarning) - kws.pop('use_load_tests', None) - if len(args) > 1: - # Complain about the number of arguments, but don't forget the - # required `module` argument. - complaint = len(args) + 1 - raise TypeError('loadTestsFromModule() takes 1 positional argument but {} were given'.format(complaint)) - if len(kws) != 0: - # Since the keyword arguments are unsorted (see PEP 468), just - # pick the alphabetically sorted first argument to complain about, - # if multiple were given. At least the error message will be - # predictable. - complaint = sorted(kws)[0] - raise TypeError("loadTestsFromModule() got an unexpected keyword argument '{}'".format(complaint)) tests = [] for name in dir(module): obj = getattr(module, name) - if isinstance(obj, type) and issubclass(obj, case.TestCase): + if ( + isinstance(obj, type) + and issubclass(obj, case.TestCase) + and obj not in (case.TestCase, case.FunctionTestCase) + ): tests.append(self.loadTestsFromTestCase(obj)) load_tests = getattr(module, 'load_tests', None) @@ -189,7 +172,11 @@ def loadTestsFromName(self, name, module=None): if isinstance(obj, types.ModuleType): return self.loadTestsFromModule(obj) - elif isinstance(obj, type) and issubclass(obj, case.TestCase): + elif ( + isinstance(obj, type) + and issubclass(obj, case.TestCase) + and obj not in (case.TestCase, case.FunctionTestCase) + ): return self.loadTestsFromTestCase(obj) elif (isinstance(obj, types.FunctionType) and isinstance(parent, type) and @@ -267,6 +254,7 @@ def discover(self, start_dir, pattern='test*.py', top_level_dir=None): Paths are sorted before being imported to ensure reproducible execution order even on filesystems with non-alphabetical ordering like ext3/4. """ + original_top_level_dir = self._top_level_dir set_implicit_top = False if top_level_dir is None and self._top_level_dir is not None: # make top_level_dir optional if called from load_tests in a package @@ -320,6 +308,7 @@ def discover(self, start_dir, pattern='test*.py', top_level_dir=None): raise ImportError('Start directory is not importable: %r' % start_dir) tests = list(self._find_tests(start_dir, pattern)) + self._top_level_dir = original_top_level_dir return self.suiteClass(tests) def _get_directory_containing_module(self, module_name): @@ -337,7 +326,7 @@ def _get_directory_containing_module(self, module_name): def _get_name_from_path(self, path): if path == self._top_level_dir: return '.' - path = _jython_aware_splitext(os.path.normpath(path)) + path = _splitext(os.path.normpath(path)) _relpath = os.path.relpath(path, self._top_level_dir) assert not os.path.isabs(_relpath), "Path must be within the project" @@ -415,13 +404,13 @@ def _find_test_path(self, full_path, pattern): else: mod_file = os.path.abspath( getattr(module, '__file__', full_path)) - realpath = _jython_aware_splitext( + realpath = _splitext( os.path.realpath(mod_file)) - fullpath_noext = _jython_aware_splitext( + fullpath_noext = _splitext( os.path.realpath(full_path)) if realpath.lower() != fullpath_noext.lower(): module_dir = os.path.dirname(realpath) - mod_name = _jython_aware_splitext( + mod_name = _splitext( os.path.basename(full_path)) expected_dir = os.path.dirname(full_path) msg = ("%r module incorrectly imported from %r. Expected " @@ -462,47 +451,3 @@ def _find_test_path(self, full_path, pattern): defaultTestLoader = TestLoader() - - -# These functions are considered obsolete for long time. -# They will be removed in Python 3.13. - -def _makeLoader(prefix, sortUsing, suiteClass=None, testNamePatterns=None): - loader = TestLoader() - loader.sortTestMethodsUsing = sortUsing - loader.testMethodPrefix = prefix - loader.testNamePatterns = testNamePatterns - if suiteClass: - loader.suiteClass = suiteClass - return loader - -def getTestCaseNames(testCaseClass, prefix, sortUsing=util.three_way_cmp, testNamePatterns=None): - import warnings - warnings.warn( - "unittest.getTestCaseNames() is deprecated and will be removed in Python 3.13. " - "Please use unittest.TestLoader.getTestCaseNames() instead.", - DeprecationWarning, stacklevel=2 - ) - return _makeLoader(prefix, sortUsing, testNamePatterns=testNamePatterns).getTestCaseNames(testCaseClass) - -def makeSuite(testCaseClass, prefix='test', sortUsing=util.three_way_cmp, - suiteClass=suite.TestSuite): - import warnings - warnings.warn( - "unittest.makeSuite() is deprecated and will be removed in Python 3.13. " - "Please use unittest.TestLoader.loadTestsFromTestCase() instead.", - DeprecationWarning, stacklevel=2 - ) - return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromTestCase( - testCaseClass) - -def findTestCases(module, prefix='test', sortUsing=util.three_way_cmp, - suiteClass=suite.TestSuite): - import warnings - warnings.warn( - "unittest.findTestCases() is deprecated and will be removed in Python 3.13. " - "Please use unittest.TestLoader.loadTestsFromModule() instead.", - DeprecationWarning, stacklevel=2 - ) - return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromModule(\ - module) diff --git a/Lib/unittest/main.py b/Lib/unittest/main.py index c3869de3f6..a0cd8a9f7e 100644 --- a/Lib/unittest/main.py +++ b/Lib/unittest/main.py @@ -269,12 +269,12 @@ def runTests(self): testRunner = self.testRunner self.result = testRunner.run(self.test) if self.exit: - if self.result.testsRun == 0 and len(self.result.skipped) == 0: + if not self.result.wasSuccessful(): + sys.exit(1) + elif self.result.testsRun == 0 and len(self.result.skipped) == 0: sys.exit(_NO_TESTS_EXITCODE) - elif self.result.wasSuccessful(): - sys.exit(0) else: - sys.exit(1) + sys.exit(0) main = TestProgram