Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Lib/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# This relies on each of the submodules having an __all__ variable.
from .base_events import *
from .cancelscope import *
from .coroutines import *
from .events import *
from .exceptions import *
Expand All @@ -24,6 +25,7 @@
from .transports import *

__all__ = (base_events.__all__ +
cancelscope.__all__ +
coroutines.__all__ +
events.__all__ +
exceptions.__all__ +
Expand Down
154 changes: 154 additions & 0 deletions Lib/asyncio/cancelscope.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""CancelScope — level-triggered cancellation for asyncio."""

__all__ = ('CancelScope', 'cancel_scope', 'cancel_scope_at')

from . import events
from . import exceptions
from . import tasks


class CancelScope:
"""Async context manager providing level-triggered cancellation.
Once cancelled (via :meth:`cancel` or deadline expiry), every subsequent
``await`` inside the scope raises :exc:`~asyncio.CancelledError` until the
scope exits. The coroutine cannot simply catch-and-ignore the error.
Parameters
----------
deadline : float or None
Absolute event-loop time after which the scope auto-cancels.
shield : bool
If ``True``, the level-triggered re-injection is suppressed while
the scope is the current scope.
"""

def __init__(self, *, deadline=None, shield=False):
self._deadline = deadline
self._shield = shield
self._cancel_called = False
self._task = None
self._parent_scope = None
self._timeout_handle = None
self._host_task_cancelling = 0
self._cancelled_caught = False

# -- public properties ---------------------------------------------------

@property
def deadline(self):
"""Absolute event-loop time of the deadline, or *None*."""
return self._deadline

@deadline.setter
def deadline(self, value):
self._deadline = value
if self._task is not None and not self._task.done():
self._reschedule()

@property
def shield(self):
"""Whether level-triggered re-injection is suppressed."""
return self._shield

@shield.setter
def shield(self, value):
self._shield = value

@property
def cancel_called(self):
"""``True`` after :meth:`cancel` has been called."""
return self._cancel_called

@property
def cancelled_caught(self):
"""``True`` if the scope caught the :exc:`CancelledError` on exit."""
return self._cancelled_caught

# -- control methods -----------------------------------------------------

def cancel(self):
"""Cancel this scope.
All subsequent awaits inside the scope will raise
:exc:`~asyncio.CancelledError`.
"""
if not self._cancel_called:
self._cancel_called = True
if self._task is not None and not self._task.done():
self._task.cancel()

def reschedule(self, deadline):
"""Change the deadline.
If *deadline* is ``None`` the timeout is removed.
"""
self._deadline = deadline
if self._task is not None:
self._reschedule()

# -- async context manager -----------------------------------------------

async def __aenter__(self):
task = tasks.current_task()
if task is None:
# Fallback: _PyTask uses Python-level tracking that the
# C current_task() does not see.
task = tasks._py_current_task()
if task is None:
raise RuntimeError("CancelScope requires a running task")
self._task = task
self._host_task_cancelling = task.cancelling()
self._parent_scope = task._current_cancel_scope
task._current_cancel_scope = self
if self._deadline is not None:
loop = events.get_running_loop()
self._timeout_handle = loop.call_at(
self._deadline, self._on_deadline)
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._timeout_handle is not None:
self._timeout_handle.cancel()
self._timeout_handle = None

# Pop scope stack
self._task._current_cancel_scope = self._parent_scope

if self._cancel_called:
# Consume the one cancel() we injected, bringing the task's
# cancellation counter back to where it was on __aenter__.
if self._task.cancelling() > self._host_task_cancelling:
self._task.uncancel()
if exc_type is not None and issubclass(
exc_type, exceptions.CancelledError):
self._cancelled_caught = True
return True # suppress the CancelledError

return False

# -- internal helpers ----------------------------------------------------

def _reschedule(self):
if self._timeout_handle is not None:
self._timeout_handle.cancel()
self._timeout_handle = None
if self._deadline is not None and not self._task.done():
loop = events.get_running_loop()
self._timeout_handle = loop.call_at(
self._deadline, self._on_deadline)

def _on_deadline(self):
self._timeout_handle = None
self.cancel()


def cancel_scope(delay, *, shield=False):
"""Return a :class:`CancelScope` that expires *delay* seconds from now."""
loop = events.get_running_loop()
return CancelScope(deadline=loop.time() + delay, shield=shield)


def cancel_scope_at(when, *, shield=False):
"""Return a :class:`CancelScope` that expires at absolute time *when*."""
return CancelScope(deadline=when, shield=shield)
61 changes: 60 additions & 1 deletion Lib/asyncio/taskgroups.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,40 @@
# license: PSFL.


__all__ = ("TaskGroup",)
__all__ = ("TaskGroup", "TaskStatus")

from . import events
from . import exceptions
from . import futures
from . import tasks


class TaskStatus:
"""Status object passed to tasks started via :meth:`TaskGroup.start`.

The task calls :meth:`started` to signal readiness, passing an
optional value back to the ``start()`` caller.
"""

def __init__(self):
self._future = None # set by TaskGroup.start()
self._started = False

def started(self, value=None):
"""Signal that the task is ready.

*value* is returned to the ``await TaskGroup.start(...)`` caller.
May only be called once.
"""
if self._started:
raise RuntimeError("task already signalled readiness")
if self._future is None or self._future.done():
raise RuntimeError(
"TaskStatus is not associated with a pending start()")
self._started = True
self._future.set_result(value)


class TaskGroup:
"""Asynchronous context manager for managing groups of tasks.

Expand Down Expand Up @@ -210,6 +236,39 @@ def create_task(self, coro, **kwargs):
# task.exception().__traceback__->TaskGroup.create_task->task
del task

async def start(self, coro_fn, *args, name=None, context=None):
"""Start a task and wait until it signals readiness.

*coro_fn* is called as ``coro_fn(*args, task_status=task_status)``.
The coroutine must call ``task_status.started(value)`` to signal
readiness. The *value* passed to ``started()`` is returned by
this method. The task continues running in the group after
``started()`` is called.
"""
if not self._entered:
raise RuntimeError(f"TaskGroup {self!r} has not been entered")
if self._exiting and not self._tasks:
raise RuntimeError(f"TaskGroup {self!r} is finished")
if self._aborting:
raise RuntimeError(f"TaskGroup {self!r} is shutting down")

task_status = TaskStatus()
task_status._future = self._loop.create_future()

coro = coro_fn(*args, task_status=task_status)
kwargs = {}
if name is not None:
kwargs['name'] = name
if context is not None:
kwargs['context'] = context
task = self.create_task(coro, **kwargs)

try:
return await task_status._future
except BaseException:
task.cancel()
raise

# Since Python 3.8 Tasks propagate all exceptions correctly,
# except for KeyboardInterrupt and SystemExit which are
# still considered special.
Expand Down
6 changes: 6 additions & 0 deletions Lib/asyncio/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def __init__(self, coro, *, loop=None, name=None, context=None,
self._must_cancel = False
self._fut_waiter = None
self._coro = coro
self._current_cancel_scope = None
if context is None:
self._context = contextvars.copy_context()
else:
Expand Down Expand Up @@ -271,6 +272,11 @@ def __step(self, exc=None):
if not isinstance(exc, exceptions.CancelledError):
exc = self._make_cancelled_error()
self._must_cancel = False
elif (self._current_cancel_scope is not None
and self._current_cancel_scope._cancel_called
and not self._current_cancel_scope._shield
and not isinstance(exc, exceptions.CancelledError)):
exc = self._make_cancelled_error()
self._fut_waiter = None

_py_enter_task(self._loop, self)
Expand Down
Loading
Loading