From 1d30034ee03272db49fd7eb943647ade0a224f9d Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 30 Aug 2017 17:54:18 +0200 Subject: [PATCH 1/3] bpo-31310: multiprocessing's semaphore tracker should be launched again if crashed --- Lib/multiprocessing/semaphore_tracker.py | 24 ++++++++++++++--- Lib/test/_test_multiprocessing.py | 33 +++++++++++++++++++++--- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index d5f259c246b2a0..402d7620a37fab 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -29,6 +29,7 @@ class SemaphoreTracker(object): def __init__(self): self._lock = threading.Lock() self._fd = None + self._pid = None def getfd(self): self.ensure_running() @@ -40,8 +41,24 @@ def ensure_running(self): This can be run from any process. Usually a child process will use the semaphore created by its parent.''' with self._lock: - if self._fd is not None: - return + if self._pid is not None: + # semaphore tracker was launched before, is it still running? + try: + pid, status = os.waitpid(self._pid, os.WNOHANG) + except ChildProcessError: + # On Linux at least, this means the forkserver died, + # as Python forces SIGCHLD to SIG_DFL. 
+ # (https://linux.die.net/man/2/wait) + pass + else: + if not pid: + # semaphore tracker still alive + return + # semaphore tracker is dead, launch it again + os.close(self._fd) + self._fd = None + self._pid = None + fds_to_pass = [] try: fds_to_pass.append(sys.stderr.fileno()) @@ -55,12 +72,13 @@ def ensure_running(self): exe = spawn.get_executable() args = [exe] + util._args_from_interpreter_flags() args += ['-c', cmd % r] - util.spawnv_passfds(exe, args, fds_to_pass) + pid = util.spawnv_passfds(exe, args, fds_to_pass) except: os.close(w) raise else: self._fd = w + self._pid = pid finally: os.close(r) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d6fe7d62675631..2e1b7b987d0150 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4261,14 +4261,14 @@ def test_preload_resources(self): self.fail("failed spawning forkserver or grandchild") -# -# Check that killing process does not leak named semaphores -# - @unittest.skipIf(sys.platform == "win32", "test semantics don't make sense on Windows") class TestSemaphoreTracker(unittest.TestCase): + def test_semaphore_tracker(self): + # + # Check that killing process does not leak named semaphores + # import subprocess cmd = '''if 1: import multiprocessing as mp, time, os @@ -4302,6 +4302,31 @@ def test_semaphore_tracker(self): self.assertRegex(err, expected) self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1) + def check_semaphore_tracker_death(self, signum): + # bpo-31310: if the semaphore tracker process has died, it should + # be restarted implicitly. 
+ from multiprocessing.semaphore_tracker import _semaphore_tracker + _semaphore_tracker.ensure_running() + pid = _semaphore_tracker._pid + os.kill(pid, signum) + os.waitpid(pid, 0) + + ctx = multiprocessing.get_context("spawn") + sem = ctx.Semaphore() + sem.acquire() + sem.release() + wr = weakref.ref(sem) + # ensure `sem` gets collected, which triggers communication with + # the semaphore tracker + del sem + gc.collect() + self.assertIsNone(wr()) + + def test_semaphore_tracker_sigkill(self): + # Uncatchable signal. Note the semaphore tracker ignores SIGINT. + self.check_semaphore_tracker_death(signal.SIGKILL) + + class TestSimpleQueue(unittest.TestCase): @classmethod From 459efde25f1a0dfa2cc117d84281d04636632cdb Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 30 Aug 2017 18:21:53 +0200 Subject: [PATCH 2/3] Avoid mucking with process state in test. Add a warning if the semaphore process died, as semaphores may then be leaked. --- Lib/multiprocessing/semaphore_tracker.py | 20 ++++++------- Lib/test/_test_multiprocessing.py | 36 +++++++++++++++--------- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py index 402d7620a37fab..3e31bf8402ee2d 100644 --- a/Lib/multiprocessing/semaphore_tracker.py +++ b/Lib/multiprocessing/semaphore_tracker.py @@ -43,22 +43,18 @@ def ensure_running(self): with self._lock: if self._pid is not None: # semaphore tracker was launched before, is it still running? - try: - pid, status = os.waitpid(self._pid, os.WNOHANG) - except ChildProcessError: - # On Linux at least, this means the forkserver died, - # as Python forces SIGCHLD to SIG_DFL. 
- # (https://linux.die.net/man/2/wait) - pass - else: - if not pid: - # semaphore tracker still alive - return - # semaphore tracker is dead, launch it again + pid, status = os.waitpid(self._pid, os.WNOHANG) + if not pid: + # => still alive + return + # => dead, launch it again os.close(self._fd) self._fd = None self._pid = None + warnings.warn('semaphore_tracker: process died unexpectedly, ' + 'relaunching. Some semaphores might leak.') + fds_to_pass = [] try: fds_to_pass.append(sys.stderr.fileno()) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 2e1b7b987d0150..15729f0d690270 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -4,6 +4,7 @@ import unittest import queue as pyqueue +import contextlib import time import io import itertools @@ -4302,29 +4303,38 @@ def test_semaphore_tracker(self): self.assertRegex(err, expected) self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1) - def check_semaphore_tracker_death(self, signum): + def check_semaphore_tracker_death(self, signum, should_die): # bpo-31310: if the semaphore tracker process has died, it should # be restarted implicitly. 
from multiprocessing.semaphore_tracker import _semaphore_tracker _semaphore_tracker.ensure_running() pid = _semaphore_tracker._pid os.kill(pid, signum) - os.waitpid(pid, 0) + time.sleep(1.0) # give it time to die ctx = multiprocessing.get_context("spawn") - sem = ctx.Semaphore() - sem.acquire() - sem.release() - wr = weakref.ref(sem) - # ensure `sem` gets collected, which triggers communication with - # the semaphore tracker - del sem - gc.collect() - self.assertIsNone(wr()) + with contextlib.ExitStack() as stack: + if should_die: + stack.enter_context(self.assertWarnsRegex( + UserWarning, + "semaphore_tracker: process died")) + sem = ctx.Semaphore() + sem.acquire() + sem.release() + wr = weakref.ref(sem) + # ensure `sem` gets collected, which triggers communication with + # the semaphore tracker + del sem + gc.collect() + self.assertIsNone(wr()) + + def test_semaphore_tracker_sigint(self): + # Catchable signal (ignored by semaphore tracker) + self.check_semaphore_tracker_death(signal.SIGINT, False) def test_semaphore_tracker_sigkill(self): - # Uncatchable signal. Note the semaphore tracker ignores SIGINT. - self.check_semaphore_tracker_death(signal.SIGKILL) + # Uncatchable signal. 
+ self.check_semaphore_tracker_death(signal.SIGKILL, True) class TestSimpleQueue(unittest.TestCase): From ee72cba20c82da48b5f307c79f0f5f7f28c8576e Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Wed, 30 Aug 2017 18:23:56 +0200 Subject: [PATCH 3/3] Add NEWS entry --- .../NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst diff --git a/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst b/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst new file mode 100644 index 00000000000000..4d340f07364019 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-08-30-18-23-54.bpo-31310.7D1UNt.rst @@ -0,0 +1 @@ +multiprocessing's semaphore tracker is now launched again if it has crashed.