Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 21 additions & 61 deletions Lib/concurrent/interpreters/_crossinterp.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,79 +29,39 @@ def __get__(self, obj, cls):
return self.getter(None, cls)


class UnboundItem:
"""Represents a cross-interpreter item no longer bound to an interpreter.
class UnboundBehavior:
__slots__ = ('name', '_unboundop')

An item is unbound when the interpreter that added it to the
cross-interpreter container is destroyed.
"""

__slots__ = ()

@classonly
def singleton(cls, kind, module, name='UNBOUND'):
doc = cls.__doc__
if doc:
doc = doc.replace(
'cross-interpreter container', kind,
).replace(
'cross-interpreter', kind,
)
subclass = type(
f'Unbound{kind.capitalize()}Item',
(cls,),
{
"_MODULE": module,
"_NAME": name,
"__doc__": doc,
},
)
return object.__new__(subclass)

_MODULE = __name__
_NAME = 'UNBOUND'

def __new__(cls):
raise Exception(f'use {cls._MODULE}.{cls._NAME}')
def __init__(self, name, *, _unboundop):
self.name = name
self._unboundop = _unboundop

def __repr__(self):
return f'{self._MODULE}.{self._NAME}'
# return f'interpreters._queues.UNBOUND'
return f'<{self.name}>'


UNBOUND = object.__new__(UnboundItem)
UNBOUND_ERROR = object()
UNBOUND_REMOVE = object()
UNBOUND = UnboundBehavior('UNBOUND', _unboundop=3)
UNBOUND_ERROR = UnboundBehavior('UNBOUND_ERROR', _unboundop=2)
UNBOUND_REMOVE = UnboundBehavior('UNBOUND_REMOVE', _unboundop=1)

_UNBOUND_CONSTANT_TO_FLAG = {
UNBOUND_REMOVE: 1,
UNBOUND_ERROR: 2,
UNBOUND: 3,
}
_UNBOUND_FLAG_TO_CONSTANT = {v: k
for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}


def serialize_unbound(unbound):
op = unbound
def unbound_to_flag(unbounditems):
if unbounditems is None:
return -1
try:
flag = _UNBOUND_CONSTANT_TO_FLAG[op]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
return flag,

return unbounditems._unboundop
except AttributeError:
raise NotImplementedError(
f'unsupported unbound replacement object {unbounditems!r}')

def resolve_unbound(flag, exctype_destroyed):
try:
op = _UNBOUND_FLAG_TO_CONSTANT[flag]
except KeyError:
raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
if op is UNBOUND_REMOVE:
if flag == UNBOUND_REMOVE._unboundop:
# "remove" not possible here
raise NotImplementedError
elif op is UNBOUND_ERROR:
elif flag == UNBOUND_ERROR._unboundop:
raise exctype_destroyed("item's original interpreter destroyed")
elif op is UNBOUND:
elif flag == UNBOUND._unboundop:
return UNBOUND
else:
raise NotImplementedError(repr(op))
raise NotImplementedError(
f'unsupported unbound replacement op {flag!r}')
35 changes: 8 additions & 27 deletions Lib/concurrent/interpreters/_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
QueueError, QueueNotFoundError,
)
from ._crossinterp import (
UNBOUND_ERROR, UNBOUND_REMOVE,
UNBOUND, UNBOUND_ERROR, UNBOUND_REMOVE,
)

__all__ = [
Expand Down Expand Up @@ -42,24 +42,12 @@ class ItemInterpreterDestroyed(QueueError,
"""Raised from get() and get_nowait()."""


_SHARED_ONLY = 0
_PICKLED = 1


UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)


def _serialize_unbound(unbound):
if unbound is UNBOUND:
unbound = _crossinterp.UNBOUND
return _crossinterp.serialize_unbound(unbound)
def _resolve_unbound(flag):
return _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)


def _resolve_unbound(flag):
resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
if resolved is _crossinterp.UNBOUND:
resolved = UNBOUND
return resolved
_SHARED_ONLY = 0
_PICKLED = 1


def create(maxsize=0, *, unbounditems=UNBOUND):
Expand All @@ -71,8 +59,7 @@ def create(maxsize=0, *, unbounditems=UNBOUND):
supported values. The default value is UNBOUND, which replaces
the unbound item.
"""
unbound = _serialize_unbound(unbounditems)
unboundop, = unbound
unboundop = _crossinterp.unbound_to_flag(unbounditems)
qid = _queues.create(maxsize, unboundop, -1)
self = Queue(qid)
self._set_unbound(unboundop, unbounditems)
Expand Down Expand Up @@ -211,10 +198,7 @@ def put(self, obj, block=True, timeout=None, *,
"""
if not block:
return self.put_nowait(obj, unbounditems=unbounditems)
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbounditems)
unboundop = _crossinterp.unbound_to_flag(unbounditems)
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
Expand All @@ -231,10 +215,7 @@ def put(self, obj, block=True, timeout=None, *,
break

def put_nowait(self, obj, *, unbounditems=None):
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbounditems)
unboundop = _crossinterp.unbound_to_flag(unbounditems)
_queues.put(self._id, obj, unboundop)

def get(self, block=True, timeout=None, *,
Expand Down
52 changes: 9 additions & 43 deletions Lib/test/support/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
ChannelEmptyError, ChannelNotEmptyError, # noqa: F401
)
from concurrent.interpreters._crossinterp import (
UNBOUND_ERROR, UNBOUND_REMOVE,
UNBOUND, UNBOUND_ERROR, UNBOUND_REMOVE,
)


Expand All @@ -28,20 +28,8 @@ class ItemInterpreterDestroyed(ChannelError,
"""Raised from get() and get_nowait()."""


UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)


def _serialize_unbound(unbound):
if unbound is UNBOUND:
unbound = _crossinterp.UNBOUND
return _crossinterp.serialize_unbound(unbound)


def _resolve_unbound(flag):
resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
if resolved is _crossinterp.UNBOUND:
resolved = UNBOUND
return resolved
return _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)


def create(*, unbounditems=UNBOUND):
Expand All @@ -53,8 +41,9 @@ def create(*, unbounditems=UNBOUND):
See SendChannel.send() for supported values. The default value
is UNBOUND, which replaces the unbound item when received.
"""
unbound = _serialize_unbound(unbounditems)
unboundop, = unbound
if unbounditems is None:
raise TypeError(f'unbounditems must not be None')
unboundop = _crossinterp.unbound_to_flag(unbounditems)
cid = _channels.create(unboundop, -1)
recv, send = RecvChannel(cid), SendChannel(cid)
send._set_unbound(unboundop, unbounditems)
Expand Down Expand Up @@ -179,17 +168,6 @@ class SendChannel(_ChannelEnd):

_end = 'send'

# def __new__(cls, cid, *, _unbound=None):
# if _unbound is None:
# try:
# op = _channels.get_channel_defaults(cid)
# _unbound = (op,)
# except ChannelNotFoundError:
# _unbound = _serialize_unbound(UNBOUND)
# self = super().__new__(cls, cid)
# self._unbound = _unbound
# return self

def _set_unbound(self, op, items=None):
assert not hasattr(self, '_unbound')
if items is None:
Expand Down Expand Up @@ -219,10 +197,7 @@ def send(self, obj, timeout=None, *,

This blocks until the object is received.
"""
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbounditems)
unboundop = _crossinterp.unbound_to_flag(unbounditems)
_channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)

def send_nowait(self, obj, *,
Expand All @@ -233,10 +208,7 @@ def send_nowait(self, obj, *,
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbounditems)
unboundop = _crossinterp.unbound_to_flag(unbounditems)
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
Expand All @@ -249,10 +221,7 @@ def send_buffer(self, obj, timeout=None, *,

This blocks until the object is received.
"""
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbounditems)
unboundop = _crossinterp.unbound_to_flag(unbounditems)
_channels.send_buffer(self._id, obj, unboundop,
timeout=timeout, blocking=True)

Expand All @@ -264,10 +233,7 @@ def send_buffer_nowait(self, obj, *,
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbounditems)
unboundop = _crossinterp.unbound_to_flag(unbounditems)
return _channels.send_buffer(self._id, obj, unboundop, blocking=False)

def close(self):
Expand Down
2 changes: 1 addition & 1 deletion Lib/test/test__interpchannels.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
)


REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
REPLACE = _crossinterp.UNBOUND._unboundop


# Additional tests are found in Lib/test/test_interpreters/test_channels.py.
Expand Down
2 changes: 1 addition & 1 deletion Lib/test/test_interpreters/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .utils import _run_output, TestBase as _TestBase

HUGE_TIMEOUT = 3600
REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
REPLACE = _crossinterp.UNBOUND._unboundop


def get_num_queues():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix spurious :exc:`KeyError` when :mod:`!concurrent.interpreters._queues` is
reloaded after import.
Loading