From c974afb488cda515736e398caab5a00be7607f9e Mon Sep 17 00:00:00 2001
From: "Gregory P. Smith"
Date: Mon, 23 Mar 2026 04:47:08 +0000
Subject: [PATCH] gh-146313: Fix ResourceTracker deadlock after os.fork()

Problem

ResourceTracker.__del__ (added in gh-88887) calls os.waitpid(pid, 0) which
blocks indefinitely if a process created via os.fork() still holds the
tracker pipe's write end. The tracker never sees EOF, never exits, and the
parent hangs at interpreter shutdown.

Root cause

Three requirements conflict:
- gh-88887 wants the parent to reap the tracker to prevent zombies
- gh-80849 wants mp.Process(fork) children to reuse the parent's tracker
  via the inherited pipe fd
- gh-146313 shows the parent can't block in waitpid() if a child holds the
  fd -- the tracker won't see EOF until all copies close

Fix

Two layers:

Timeout safety-net. _stop_locked() gains a wait_timeout parameter. When
called from __del__, it polls with WNOHANG using exponential backoff for
up to 1 second instead of blocking indefinitely.

At-fork handler. An os.register_at_fork(after_in_child=...) handler closes
the inherited pipe fd in the child unless a preserve flag is set.
popen_fork.Popen._launch() sets the flag before its fork so
mp.Process(fork) children keep the fd and reuse the parent's tracker
(preserving gh-80849). Raw os.fork() children close the fd, letting the
parent reap promptly.

Result

Scenario                                       | Before     | After
Raw os.fork(), parent exits while child alive  | deadlock   | ~30ms reap
mp.Process(fork), parent joins then exits      | ~30ms reap | ~30ms reap
mp.Process(fork), parent exits abnormally      | deadlock   | 1s bounded wait
No fork (gh-88887 scenario)                    | ~30ms reap | ~30ms reap

The at-fork handler makes the timeout unreachable in all well-behaved
paths. The timeout remains as a safety net for abnormal shutdowns.
--- Lib/multiprocessing/popen_fork.py | 12 +- Lib/multiprocessing/resource_tracker.py | 88 ++++++- Lib/test/_test_multiprocessing.py | 236 +++++++++++++++++- ...-03-22-23-42-22.gh-issue-146313.RtDeAd.rst | 4 + 4 files changed, 331 insertions(+), 9 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2026-03-22-23-42-22.gh-issue-146313.RtDeAd.rst diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py index 7affa1b985f091..a02a53b6a176da 100644 --- a/Lib/multiprocessing/popen_fork.py +++ b/Lib/multiprocessing/popen_fork.py @@ -67,7 +67,17 @@ def _launch(self, process_obj): code = 1 parent_r, child_w = os.pipe() child_r, parent_w = os.pipe() - self.pid = os.fork() + # gh-146313: Tell the resource tracker's at-fork handler to keep + # the inherited pipe fd so this child reuses the parent's tracker + # (gh-80849) rather than closing it and launching its own. + from .resource_tracker import _fork_intent + _fork_intent.preserve_fd = True + try: + self.pid = os.fork() + finally: + # Reset in both parent and child so the flag does not leak + # into a subsequent raw os.fork() or nested Process launch. + _fork_intent.preserve_fd = False if self.pid == 0: try: atexit._clear() diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 3606d1effb495b..9c46df922ff40f 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -20,6 +20,7 @@ import signal import sys import threading +import time import warnings from collections import deque @@ -75,6 +76,10 @@ def __init__(self): # The reader should understand all formats. self._use_simple_format = False + # Set to True by _stop_locked() if the waitpid polling loop ran to + # its timeout without reaping the tracker. Exposed for tests. 
+ self._waitpid_timed_out = False + def _reentrant_call_error(self): # gh-109629: this happens if an explicit call to the ResourceTracker # gets interrupted by a garbage collection, invoking a finalizer (*) @@ -87,16 +92,50 @@ def __del__(self): # making sure child processess are cleaned before ResourceTracker # gets destructed. # see https://github.com/python/cpython/issues/88887 - self._stop(use_blocking_lock=False) + # gh-146313: use a timeout to avoid deadlocking if a forked child + # still holds the pipe's write end open. + self._stop(use_blocking_lock=False, wait_timeout=1.0) + + def _after_fork_in_child(self): + # gh-146313: Called in the child right after os.fork(). + # + # The tracker process is a child of the *parent*, not of us, so we + # could never waitpid() it anyway. Clearing _pid means our __del__ + # becomes a no-op (the early return for _pid is None). + # + # Whether we keep the inherited _fd depends on who forked us: + # + # - multiprocessing.Process with the 'fork' start method sets + # _fork_intent.preserve_fd before forking. The child keeps the + # fd and reuses the parent's tracker (gh-80849). This is safe + # because multiprocessing's atexit handler joins all children + # before the parent's __del__ runs, so by then the fd copies + # are gone and the parent can reap the tracker promptly. + # + # - A raw os.fork() leaves the flag unset. We close the fd so + # the parent's __del__ can reap the tracker without waiting + # for us to exit. If we later need a tracker, ensure_running() + # will launch a fresh one. 
+ self._lock._at_fork_reinit() + self._reentrant_messages.clear() + self._pid = None + self._exitcode = None + if (self._fd is not None and + not getattr(_fork_intent, 'preserve_fd', False)): + try: + os.close(self._fd) + except OSError: + pass + self._fd = None - def _stop(self, use_blocking_lock=True): + def _stop(self, use_blocking_lock=True, wait_timeout=None): if use_blocking_lock: with self._lock: - self._stop_locked() + self._stop_locked(wait_timeout=wait_timeout) else: acquired = self._lock.acquire(blocking=False) try: - self._stop_locked() + self._stop_locked(wait_timeout=wait_timeout) finally: if acquired: self._lock.release() @@ -106,6 +145,10 @@ def _stop_locked( close=os.close, waitpid=os.waitpid, waitstatus_to_exitcode=os.waitstatus_to_exitcode, + monotonic=time.monotonic, + sleep=time.sleep, + WNOHANG=os.WNOHANG, + wait_timeout=None, ): # This shouldn't happen (it might when called by a finalizer) # so we check for it anyway. @@ -122,7 +165,30 @@ def _stop_locked( self._fd = None try: - _, status = waitpid(self._pid, 0) + if wait_timeout is None: + _, status = waitpid(self._pid, 0) + else: + # gh-146313: A forked child may still hold the pipe's write + # end open, preventing the tracker from seeing EOF and + # exiting. Poll with WNOHANG to avoid blocking forever. + deadline = monotonic() + wait_timeout + delay = 0.001 + while True: + result_pid, status = waitpid(self._pid, WNOHANG) + if result_pid != 0: + break + remaining = deadline - monotonic() + if remaining <= 0: + # The tracker is still running; it will be + # reparented to PID 1 (or the nearest subreaper) + # when we exit, and reaped there once all pipe + # holders release their fd. 
+ self._pid = None + self._exitcode = None + self._waitpid_timed_out = True + return + delay = min(delay * 2, remaining, 0.1) + sleep(delay) except ChildProcessError: self._pid = None self._exitcode = None @@ -308,12 +374,24 @@ def _send(self, cmd, name, rtype): self._ensure_running_and_write(msg) +# gh-146313: Per-thread flag set by .popen_fork.Popen._launch() just before +# os.fork(), telling _after_fork_in_child() to keep the inherited pipe fd so +# the child can reuse this tracker (gh-80849). Unset for raw os.fork() calls, +# where the child instead closes the fd so the parent's __del__ can reap the +# tracker. Using threading.local() keeps multiple threads calling +# popen_fork.Popen._launch() at once from clobbering each other's intent. +_fork_intent = threading.local() + _resource_tracker = ResourceTracker() ensure_running = _resource_tracker.ensure_running register = _resource_tracker.register unregister = _resource_tracker.unregister getfd = _resource_tracker.getfd +# gh-146313: See _after_fork_in_child docstring. +if hasattr(os, 'register_at_fork'): + os.register_at_fork(after_in_child=_resource_tracker._after_fork_in_child) + def _decode_message(line): if line.startswith(b'{'): diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 490c7ae5e8076c..77f603363d9fc9 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -207,10 +207,38 @@ def spawn_check_wrapper(*args, **kwargs): return decorator +def only_run_in_forkserver_testsuite(reason): + """Returns a decorator: raises SkipTest unless fork is supported + and the current start method is forkserver. + + Combines @support.requires_fork() with the single-run semantics of + only_run_in_spawn_testsuite(), but uses the forkserver testsuite as + the single-run target.
Appropriate for tests that exercise + os.fork() directly (raw fork or mp.set_start_method("fork") in a + subprocess) and don't vary by start method, since forkserver is + only available on platforms that support fork. + """ + + def decorator(test_item): + + @functools.wraps(test_item) + def forkserver_check_wrapper(*args, **kwargs): + if not support.has_fork_support: + raise unittest.SkipTest("requires working os.fork()") + if (start_method := multiprocessing.get_start_method()) != "forkserver": + raise unittest.SkipTest( + f"{start_method=}, not 'forkserver'; {reason}") + return test_item(*args, **kwargs) + + return forkserver_check_wrapper + + return decorator + + class TestInternalDecorators(unittest.TestCase): """Logic within a test suite that could errantly skip tests? Test it!""" - @unittest.skipIf(sys.platform == "win32", "test requires that fork exists.") + @support.requires_fork() def test_only_run_in_spawn_testsuite(self): if multiprocessing.get_start_method() != "spawn": raise unittest.SkipTest("only run in test_multiprocessing_spawn.") @@ -234,6 +262,30 @@ def return_four_if_spawn(): finally: multiprocessing.set_start_method(orig_start_method, force=True) + @support.requires_fork() + def test_only_run_in_forkserver_testsuite(self): + if multiprocessing.get_start_method() != "forkserver": + raise unittest.SkipTest("only run in test_multiprocessing_forkserver.") + + try: + @only_run_in_forkserver_testsuite("testing this decorator") + def return_four_if_forkserver(): + return 4 + except Exception as err: + self.fail(f"expected decorated `def` not to raise; caught {err}") + + orig_start_method = multiprocessing.get_start_method(allow_none=True) + try: + multiprocessing.set_start_method("forkserver", force=True) + self.assertEqual(return_four_if_forkserver(), 4) + multiprocessing.set_start_method("spawn", force=True) + with self.assertRaises(unittest.SkipTest) as ctx: + return_four_if_forkserver() + self.assertIn("testing this decorator", str(ctx.exception)) 
+ self.assertIn("start_method=", str(ctx.exception)) + finally: + multiprocessing.set_start_method(orig_start_method, force=True) + # # Creates a wrapper for a function which records the time it takes to finish @@ -6269,8 +6321,9 @@ def test_resource_tracker_sigkill(self): def _is_resource_tracker_reused(conn, pid): from multiprocessing.resource_tracker import _resource_tracker _resource_tracker.ensure_running() - # The pid should be None in the child process, expect for the fork - # context. It should not be a new value. + # The pid should be None in the child (the at-fork handler clears + # it for fork; spawn/forkserver children never had it set). It + # should not be a new value. reused = _resource_tracker._pid in (None, pid) reused &= _resource_tracker._check_alive() conn.send(reused) @@ -6356,6 +6409,183 @@ def test_resource_tracker_blocked_signals(self): # restore sigmask to what it was before executing test signal.pthread_sigmask(signal.SIG_SETMASK, orig_sigmask) + @only_run_in_forkserver_testsuite("avoids redundant testing.") + def test_resource_tracker_fork_deadlock(self): + # gh-146313: ResourceTracker.__del__ used to deadlock if a forked + # child still held the pipe's write end open when the parent + # exited, because the parent would block in waitpid() waiting for + # the tracker to exit, but the tracker would never see EOF. 
+ cmd = '''if 1: + import os, signal + from multiprocessing.resource_tracker import ensure_running + ensure_running() + if os.fork() == 0: + signal.pause() + os._exit(0) + # parent falls through and exits, triggering __del__ + ''' + proc = subprocess.Popen([sys.executable, '-c', cmd], + start_new_session=True) + try: + try: + proc.wait(timeout=support.SHORT_TIMEOUT) + except subprocess.TimeoutExpired: + self.fail( + "Parent process deadlocked in ResourceTracker.__del__" + ) + self.assertEqual(proc.returncode, 0) + finally: + try: + os.killpg(proc.pid, signal.SIGKILL) + except ProcessLookupError: + pass + proc.wait() + + @only_run_in_forkserver_testsuite("avoids redundant testing.") + def test_resource_tracker_mp_fork_reuse_and_prompt_reap(self): + # gh-146313 / gh-80849: A child started via multiprocessing.Process + # with the 'fork' start method should reuse the parent's resource + # tracker (the at-fork handler preserves the inherited pipe fd), + # *and* the parent should be able to reap the tracker promptly + # after joining the child, without hitting the waitpid timeout. + cmd = textwrap.dedent(''' + import multiprocessing as mp + from multiprocessing.resource_tracker import _resource_tracker + + def child(conn): + # Prove we can talk to the parent's tracker by registering + # and unregistering a dummy resource over the inherited fd. + # If the fd were closed, ensure_running would launch a new + # tracker and _pid would be non-None. 
+ _resource_tracker.register("x", "dummy") + _resource_tracker.unregister("x", "dummy") + conn.send((_resource_tracker._fd is not None, + _resource_tracker._pid is None, + _resource_tracker._check_alive())) + + if __name__ == "__main__": + mp.set_start_method("fork") + _resource_tracker.ensure_running() + r, w = mp.Pipe(duplex=False) + p = mp.Process(target=child, args=(w,)) + p.start() + child_has_fd, child_pid_none, child_alive = r.recv() + p.join() + w.close(); r.close() + + # Now simulate __del__: the child has exited and released + # its fd copy, so the tracker should see EOF and exit + # promptly -- no timeout. + _resource_tracker._stop(wait_timeout=5.0) + print(child_has_fd, child_pid_none, child_alive, + _resource_tracker._waitpid_timed_out, + _resource_tracker._exitcode) + ''') + rc, out, err = script_helper.assert_python_ok('-c', cmd) + parts = out.decode().split() + self.assertEqual(parts, ['True', 'True', 'True', 'False', '0'], + f"unexpected: {parts!r} stderr={err!r}") + + @only_run_in_forkserver_testsuite("avoids redundant testing.") + def test_resource_tracker_raw_fork_prompt_reap(self): + # gh-146313: After a raw os.fork() the at-fork handler closes the + # child's inherited fd, so the parent can reap the tracker + # immediately -- even while the child is still alive -- rather + # than waiting out the 1s timeout. + cmd = textwrap.dedent(''' + import os, signal + from multiprocessing.resource_tracker import _resource_tracker + + _resource_tracker.ensure_running() + r, w = os.pipe() + pid = os.fork() + if pid == 0: + os.close(r) + # Report whether our fd was closed by the at-fork handler. + os.write(w, b"1" if _resource_tracker._fd is None else b"0") + os.close(w) + signal.pause() # stay alive so parent's reap is meaningful + os._exit(0) + os.close(w) + child_fd_closed = os.read(r, 1) == b"1" + os.close(r) + + # Child is still alive and paused. Because it closed its fd + # copy, our close below is the last one and the tracker exits. 
+ _resource_tracker._stop(wait_timeout=5.0) + + os.kill(pid, signal.SIGKILL) + os.waitpid(pid, 0) + print(child_fd_closed, + _resource_tracker._waitpid_timed_out, + _resource_tracker._exitcode) + ''') + rc, out, err = script_helper.assert_python_ok('-c', cmd) + parts = out.decode().split() + self.assertEqual(parts, ['True', 'False', '0'], + f"unexpected: {parts!r} stderr={err!r}") + + @only_run_in_forkserver_testsuite("avoids redundant testing.") + def test_resource_tracker_lock_reinit_after_fork(self): + # gh-146313: If a parent thread held the tracker's lock at fork + # time, the child would inherit the held lock and deadlock on + # its next ensure_running(). The at-fork handler reinits it. + cmd = textwrap.dedent(''' + import os, threading + from multiprocessing.resource_tracker import _resource_tracker + + held = threading.Event() + release = threading.Event() + def hold(): + with _resource_tracker._lock: + held.set() + release.wait() + t = threading.Thread(target=hold) + t.start() + held.wait() + + pid = os.fork() + if pid == 0: + ok = _resource_tracker._lock.acquire(timeout=5.0) + os._exit(0 if ok else 1) + + release.set() + t.join() + _, status = os.waitpid(pid, 0) + print(os.waitstatus_to_exitcode(status)) + ''') + rc, out, err = script_helper.assert_python_ok( + '-W', 'ignore::DeprecationWarning', '-c', cmd) + self.assertEqual(out.strip(), b'0', + f"child failed to acquire lock: stderr={err!r}") + + @only_run_in_forkserver_testsuite("avoids redundant testing.") + def test_resource_tracker_safety_net_timeout(self): + # gh-146313: When an mp.Process(fork) child holds the preserved + # fd and the parent calls _stop() without joining (simulating + # abnormal shutdown), the safety-net timeout should fire rather + # than deadlocking. 
+ cmd = textwrap.dedent(''' + import multiprocessing as mp + import signal + from multiprocessing.resource_tracker import _resource_tracker + + if __name__ == "__main__": + mp.set_start_method("fork") + _resource_tracker.ensure_running() + p = mp.Process(target=signal.pause) + p.start() + # Stop WITHOUT joining -- child still holds preserved fd + _resource_tracker._stop(wait_timeout=0.5) + print(_resource_tracker._waitpid_timed_out) + p.terminate() + p.join() + ''') + rc, out, err = script_helper.assert_python_ok('-c', cmd) + self.assertEqual(out.strip(), b'True', + f"safety-net timeout did not fire: stderr={err!r}") + + class TestSimpleQueue(unittest.TestCase): @classmethod diff --git a/Misc/NEWS.d/next/Library/2026-03-22-23-42-22.gh-issue-146313.RtDeAd.rst b/Misc/NEWS.d/next/Library/2026-03-22-23-42-22.gh-issue-146313.RtDeAd.rst new file mode 100644 index 00000000000000..1e44ba3bdb1b7b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-03-22-23-42-22.gh-issue-146313.RtDeAd.rst @@ -0,0 +1,4 @@ +Fix a deadlock in :class:`multiprocessing.resource_tracker.ResourceTracker` +where the parent process could hang indefinitely in :func:`os.waitpid` +during interpreter shutdown if a child created via :func:`os.fork` still +held the resource tracker's pipe open.