diff --git a/Lib/concurrent/interpreters/_crossinterp.py b/Lib/concurrent/interpreters/_crossinterp.py index a5f46b20fbb4c5..81f52e859d7cd8 100644 --- a/Lib/concurrent/interpreters/_crossinterp.py +++ b/Lib/concurrent/interpreters/_crossinterp.py @@ -29,79 +29,39 @@ def __get__(self, obj, cls): return self.getter(None, cls) -class UnboundItem: - """Represents a cross-interpreter item no longer bound to an interpreter. +class UnboundBehavior: + __slots__ = ('name', '_unboundop') - An item is unbound when the interpreter that added it to the - cross-interpreter container is destroyed. - """ - - __slots__ = () - - @classonly - def singleton(cls, kind, module, name='UNBOUND'): - doc = cls.__doc__ - if doc: - doc = doc.replace( - 'cross-interpreter container', kind, - ).replace( - 'cross-interpreter', kind, - ) - subclass = type( - f'Unbound{kind.capitalize()}Item', - (cls,), - { - "_MODULE": module, - "_NAME": name, - "__doc__": doc, - }, - ) - return object.__new__(subclass) - - _MODULE = __name__ - _NAME = 'UNBOUND' - - def __new__(cls): - raise Exception(f'use {cls._MODULE}.{cls._NAME}') + def __init__(self, name, *, _unboundop): + self.name = name + self._unboundop = _unboundop def __repr__(self): - return f'{self._MODULE}.{self._NAME}' -# return f'interpreters._queues.UNBOUND' + return f'<{self.name}>' -UNBOUND = object.__new__(UnboundItem) -UNBOUND_ERROR = object() -UNBOUND_REMOVE = object() +UNBOUND = UnboundBehavior('UNBOUND', _unboundop=3) +UNBOUND_ERROR = UnboundBehavior('UNBOUND_ERROR', _unboundop=2) +UNBOUND_REMOVE = UnboundBehavior('UNBOUND_REMOVE', _unboundop=1) -_UNBOUND_CONSTANT_TO_FLAG = { - UNBOUND_REMOVE: 1, - UNBOUND_ERROR: 2, - UNBOUND: 3, -} -_UNBOUND_FLAG_TO_CONSTANT = {v: k - for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()} - -def serialize_unbound(unbound): - op = unbound +def unbound_to_flag(unbounditems): + if unbounditems is None: + return -1 try: - flag = _UNBOUND_CONSTANT_TO_FLAG[op] - except KeyError: - raise NotImplementedError(f'unsupported unbound replacement op {op!r}') - return flag, - + return unbounditems._unboundop + except AttributeError: + raise NotImplementedError( + f'unsupported unbound replacement object {unbounditems!r}') def resolve_unbound(flag, exctype_destroyed): - try: - op = _UNBOUND_FLAG_TO_CONSTANT[flag] - except KeyError: - raise NotImplementedError(f'unsupported unbound replacement op {flag!r}') - if op is UNBOUND_REMOVE: + if flag == UNBOUND_REMOVE._unboundop: # "remove" not possible here raise NotImplementedError - elif op is UNBOUND_ERROR: + elif flag == UNBOUND_ERROR._unboundop: raise exctype_destroyed("item's original interpreter destroyed") - elif op is UNBOUND: + elif flag == UNBOUND._unboundop: return UNBOUND else: - raise NotImplementedError(repr(op)) + raise NotImplementedError( + f'unsupported unbound replacement op {flag!r}') diff --git a/Lib/concurrent/interpreters/_queues.py b/Lib/concurrent/interpreters/_queues.py index ee159d7de63827..e4d7bd143a5ef8 100644 --- a/Lib/concurrent/interpreters/_queues.py +++ b/Lib/concurrent/interpreters/_queues.py @@ -11,7 +11,7 @@ QueueError, QueueNotFoundError, ) from ._crossinterp import ( - UNBOUND_ERROR, UNBOUND_REMOVE, + UNBOUND, UNBOUND_ERROR, UNBOUND_REMOVE, ) __all__ = [ @@ -42,24 +42,12 @@ class ItemInterpreterDestroyed(QueueError, """Raised from get() and get_nowait().""" -_SHARED_ONLY = 0 -_PICKLED = 1 - - -UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) - - -def _serialize_unbound(unbound): - if unbound is UNBOUND: - unbound = _crossinterp.UNBOUND - return _crossinterp.serialize_unbound(unbound) +def _resolve_unbound(flag): + return _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) -def _resolve_unbound(flag): - resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) - if resolved is _crossinterp.UNBOUND: - resolved = UNBOUND - return resolved +_SHARED_ONLY = 0 +_PICKLED = 1 def create(maxsize=0, *, unbounditems=UNBOUND): @@ -71,8 +59,7 @@ def create(maxsize=0, *, unbounditems=UNBOUND): supported values. The default value is UNBOUND, which replaces the unbound item. """ - unbound = _serialize_unbound(unbounditems) - unboundop, = unbound + unboundop = _crossinterp.unbound_to_flag(unbounditems) qid = _queues.create(maxsize, unboundop, -1) self = Queue(qid) self._set_unbound(unboundop, unbounditems) @@ -211,10 +198,7 @@ def put(self, obj, block=True, timeout=None, *, """ if not block: return self.put_nowait(obj, unbounditems=unbounditems) - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) + unboundop = _crossinterp.unbound_to_flag(unbounditems) if timeout is not None: timeout = int(timeout) if timeout < 0: @@ -231,10 +215,7 @@ def put(self, obj, block=True, timeout=None, *, break def put_nowait(self, obj, *, unbounditems=None): - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) + unboundop = _crossinterp.unbound_to_flag(unbounditems) _queues.put(self._id, obj, unboundop) def get(self, block=True, timeout=None, *, diff --git a/Lib/test/support/channels.py b/Lib/test/support/channels.py index fab1797659b312..caf94e49868f88 100644 --- a/Lib/test/support/channels.py +++ b/Lib/test/support/channels.py @@ -10,7 +10,7 @@ ChannelEmptyError, ChannelNotEmptyError, # noqa: F401 ) from concurrent.interpreters._crossinterp import ( - UNBOUND_ERROR, UNBOUND_REMOVE, + UNBOUND, UNBOUND_ERROR, UNBOUND_REMOVE, ) @@ -28,20 +28,8 @@ class ItemInterpreterDestroyed(ChannelError, """Raised from get() and get_nowait().""" -UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__) - - -def _serialize_unbound(unbound): - if unbound is UNBOUND: - unbound = _crossinterp.UNBOUND - return _crossinterp.serialize_unbound(unbound) - - def _resolve_unbound(flag): - resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) - if resolved is _crossinterp.UNBOUND: - resolved = UNBOUND - return resolved + return _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed) def create(*, unbounditems=UNBOUND): @@ -53,8 +41,9 @@ def create(*, unbounditems=UNBOUND): See SendChannel.send() for supported values. The default value is UNBOUND, which replaces the unbound item when received. """ - unbound = _serialize_unbound(unbounditems) - unboundop, = unbound + if unbounditems is None: + raise TypeError(f'unbounditems must not be None') + unboundop = _crossinterp.unbound_to_flag(unbounditems) cid = _channels.create(unboundop, -1) recv, send = RecvChannel(cid), SendChannel(cid) send._set_unbound(unboundop, unbounditems) @@ -179,17 +168,6 @@ class SendChannel(_ChannelEnd): _end = 'send' -# def __new__(cls, cid, *, _unbound=None): -# if _unbound is None: -# try: -# op = _channels.get_channel_defaults(cid) -# _unbound = (op,) -# except ChannelNotFoundError: -# _unbound = _serialize_unbound(UNBOUND) -# self = super().__new__(cls, cid) -# self._unbound = _unbound -# return self - def _set_unbound(self, op, items=None): assert not hasattr(self, '_unbound') if items is None: @@ -219,10 +197,7 @@ def send(self, obj, timeout=None, *, This blocks until the object is received. """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) + unboundop = _crossinterp.unbound_to_flag(unbounditems) _channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True) def send_nowait(self, obj, *, @@ -233,10 +208,7 @@ def send_nowait(self, obj, *, If the object is immediately received then return True (else False). Otherwise this is the same as send(). """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) + unboundop = _crossinterp.unbound_to_flag(unbounditems) # XXX Note that at the moment channel_send() only ever returns # None. This should be fixed when channel_send_wait() is added. # See bpo-32604 and gh-19829. @@ -249,10 +221,7 @@ def send_buffer(self, obj, timeout=None, *, This blocks until the object is received. """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) + unboundop = _crossinterp.unbound_to_flag(unbounditems) _channels.send_buffer(self._id, obj, unboundop, timeout=timeout, blocking=True) @@ -264,10 +233,7 @@ def send_buffer_nowait(self, obj, *, If the object is immediately received then return True (else False). Otherwise this is the same as send(). """ - if unbounditems is None: - unboundop = -1 - else: - unboundop, = _serialize_unbound(unbounditems) + unboundop = _crossinterp.unbound_to_flag(unbounditems) return _channels.send_buffer(self._id, obj, unboundop, blocking=False) def close(self): diff --git a/Lib/test/test__interpchannels.py b/Lib/test/test__interpchannels.py index 2b0aba42896c06..8a68f1e530b65c 100644 --- a/Lib/test/test__interpchannels.py +++ b/Lib/test/test__interpchannels.py @@ -17,7 +17,7 @@ ) -REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND] +REPLACE = _crossinterp.UNBOUND._unboundop # Additional tests are found in Lib/test/test_interpreters/test_channels.py. diff --git a/Lib/test/test_interpreters/test_queues.py b/Lib/test/test_interpreters/test_queues.py index 77334aea3836b9..d1be70ca4c9892 100644 --- a/Lib/test/test_interpreters/test_queues.py +++ b/Lib/test/test_interpreters/test_queues.py @@ -8,11 +8,12 @@ # Raise SkipTest if subinterpreters not supported. _queues = import_helper.import_module('_interpqueues') from concurrent import interpreters +from concurrent.futures import InterpreterPoolExecutor from concurrent.interpreters import _queues as queues, _crossinterp from .utils import _run_output, TestBase as _TestBase HUGE_TIMEOUT = 3600 -REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND] +REPLACE = _crossinterp.UNBOUND._unboundop def get_num_queues(): @@ -93,6 +94,14 @@ def test_bind_release(self): with self.assertRaises(queues.QueueError): _queues.release(qid) + def test_interpreter_pool_executor_after_reload(self): + # Regression test for gh-142414 (KeyError in serialize_unbound). + importlib.reload(queues) + code = "import struct" + with InterpreterPoolExecutor(max_workers=1) as executor: + results = executor.map(exec, [code] * 1) + self.assertEqual(list(results), [None] * 1) + class QueueTests(TestBase): diff --git a/Misc/NEWS.d/next/Library/2026-02-24-17-00-13.gh-issue-142414.YnybWr.rst b/Misc/NEWS.d/next/Library/2026-02-24-17-00-13.gh-issue-142414.YnybWr.rst new file mode 100644 index 00000000000000..4c8e085724b33c --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-02-24-17-00-13.gh-issue-142414.YnybWr.rst @@ -0,0 +1,2 @@ +Fix spurious :exc:`KeyError` when :mod:`!concurrent.interpreters._queues` is +reloaded after import.