Source code for gd.utils.async_utils

import asyncio
import functools

import inspect

from gd.logging import get_logger
from gd.typing import (
    Any,
    Awaitable,
    Callable,
    Coroutine,
    Iterable,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
)

# Names exported by a star-import of this module.
__all__ = (
    "run_blocking_io",
    "wait",
    "run",
    "gather",
    "cancel_all_tasks",
    "shutdown_loop",
    "maybe_coroutine",
    "acquire_loop",
    "aiter",
    "anext",
)

# Module-level logger, obtained from gd.logging under the name "gd.async".
log = get_logger("gd.async")


async def run_blocking_io(func: Callable, *args, **kwargs) -> Any:
    """|coro|

    Run a blocking function without blocking the event loop.

    The loop is obtained with ``acquire_loop(running=True)``, installed as the
    current event loop, and ``func(*args, **kwargs)`` is then submitted to that
    loop's default executor. The coroutine resolves to whatever ``func`` returns.

    Example:

    .. code-block:: python3

        def make_image():
            ...  # long code of creating an image

        # somewhere in an async function:

        await run_blocking_io(make_image)
    """
    # Executors only accept positional arguments, so bind everything up front.
    bound_call = functools.partial(func, *args, **kwargs)

    event_loop = acquire_loop(running=True)
    asyncio.set_event_loop(event_loop)

    # ``None`` selects the loop's default executor.
    return await event_loop.run_in_executor(None, bound_call)
async def gather(
    *aws: Sequence[Awaitable],
    loop: Optional[asyncio.AbstractEventLoop] = None,
    return_exceptions: bool = False,
) -> List[Any]:
    """A function that is calling :func:`asyncio.gather`.

    Used for less imports inside and outside gd.py.

    One small addition is that a sequence of awaitables can be given
    as the only positional argument.

    This way, :func:`asyncio.gather` will be run on that sequence.

    Parameters
    ----------
    \\*aws: Sequence[Awaitable]
        Awaitables to run, or a single iterable of awaitables.

    loop: Optional[:class:`asyncio.AbstractEventLoop`]
        Event loop to forward to :func:`asyncio.gather`. It is only forwarded
        when explicitly given, because newer Python versions do not accept
        the ``loop`` argument at all.

    return_exceptions: :class:`bool`
        Forwarded to :func:`asyncio.gather`.

    Returns
    -------
    List[Any]
        Results of the awaitables, as produced by :func:`asyncio.gather`.
    """
    if len(aws) == 1:
        maybe_aw = aws[0]

        # A lone non-awaitable argument is treated as the collection itself.
        if not inspect.isawaitable(maybe_aw):
            aws = maybe_aw

    options = {"return_exceptions": return_exceptions}

    # asyncio.gather() rejects the ``loop`` keyword on Python 3.10+, even when
    # it is ``None``; pass it along only if the caller really asked for it.
    if loop is not None:
        options["loop"] = loop

    return await asyncio.gather(*aws, **options)
async def wait(
    *fs: Iterable[Coroutine],
    loop: Optional[asyncio.AbstractEventLoop] = None,
    timeout: Optional[Union[float, int]] = None,
    return_when: str = "ALL_COMPLETED",
) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]:
    """A function that is calling :func:`asyncio.wait`.

    Used for less imports inside and outside of this library.

    Wait for the Futures and coroutines given by fs to complete.

    The sequence futures must not be empty.

    Coroutines will be wrapped in Tasks.

    Returns two sets of Future: (done, pending).

    Usage:

    .. code-block:: python3

        done, pending = await gd.utils.wait(fs)

    .. note::

        This does not raise :exc:`TimeoutError`! Futures that aren't done
        when the timeout occurs are returned in the second set.

    Parameters
    ----------
    \\*fs: Iterable[Coroutine]
        Awaitables to wait for, or a single iterable of awaitables.

    loop: Optional[:class:`asyncio.AbstractEventLoop`]
        Loop used to wrap coroutines into tasks. It is forwarded to
        :func:`asyncio.wait` only when explicitly given, because newer
        Python versions do not accept the ``loop`` argument there.

    timeout: Optional[Union[:class:`float`, :class:`int`]]
        Forwarded to :func:`asyncio.wait`.

    return_when: :class:`str`
        Forwarded to :func:`asyncio.wait`.
    """
    if len(fs) == 1:
        maybe_aw = fs[0]

        # A lone non-awaitable argument is treated as the collection itself.
        if not inspect.isawaitable(maybe_aw):
            fs = maybe_aw

    # asyncio.wait() no longer accepts bare coroutines on recent Python
    # versions, so wrap everything ourselves. ``set`` drops duplicates first,
    # which keeps one coroutine from being scheduled twice.
    futures = {asyncio.ensure_future(aw, loop=loop) for aw in set(fs)}

    options = {"timeout": timeout, "return_when": return_when}

    # asyncio.wait() rejects the ``loop`` keyword on Python 3.10+, even when
    # it is ``None``; pass it along only if the caller really asked for it.
    if loop is not None:
        options["loop"] = loop

    return await asyncio.wait(futures, **options)
def run(
    coro: Coroutine,
    *,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    debug: bool = False,
    set_to_none: bool = False,
) -> Any:
    """Run a |coroutine_link|_.

    Drives ``coro`` to completion on an event loop and returns its result.

    When ``loop`` is ``None``, a new event loop is created for the call and
    shut down (via ``shutdown_loop``) afterwards. When a loop is supplied,
    it is used for :meth:`asyncio.AbstractEventLoop.run_until_complete`
    and is not shut down.

    After execution, the current event loop is replaced: with ``None``
    if ``set_to_none`` is true, otherwise with a newly created loop.

    This function cannot be called while another event loop is running
    in the same thread. It is meant to be a main entry point to asyncio
    programs, ideally called only once.

    Example:

    .. code-block:: python3

        async def test(pid):
            return pid

        one = gd.utils.run(test(1))

    Parameters
    ----------
    coro: |coroutine_link|_
        Coroutine to run.

    loop: Optional[:class:`asyncio.AbstractEventLoop`]
        A loop to run ``coro`` with. If ``None`` or omitted, a new event loop is created.

    debug: :class:`bool`
        Whether or not to run event loop in debug mode.

    set_to_none: :class:`bool`
        Indicates if the loop should be set to None after execution.

    Returns
    -------
    `Any`
        Anything that ``coro`` returns.

    Raises
    ------
    :exc:`RuntimeError`
        An event loop is already running in this thread.

    :exc:`ValueError`
        ``coro`` is not a coroutine object.
    """
    if asyncio._get_running_loop() is not None:
        raise RuntimeError("Can not perform gd.utils.run() in a running event loop.")

    if not asyncio.iscoroutine(coro):
        raise ValueError(f"A coroutine was expected, got {coro!r}.")

    # Only a loop created here is ours to tear down afterwards.
    owns_loop = loop is None
    if owns_loop:
        loop = asyncio.new_event_loop()

    try:
        asyncio.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(coro)

    finally:
        if owns_loop:
            shutdown_loop(loop)

        # Leave either no current loop or a brand-new one behind.
        replacement = None if set_to_none else asyncio.new_event_loop()
        asyncio.set_event_loop(replacement)
def cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None:
    """Cancels all tasks in a loop.

    Every task is cancelled, the loop is run until all of them have finished,
    and any task that ended with an exception other than cancellation is
    reported through the loop's exception handler.

    Parameters
    ----------
    loop: :class:`asyncio.AbstractEventLoop`
        Event loop to cancel tasks in. It must not be running, since
        ``run_until_complete`` is called on it.
    """
    all_tasks = getattr(asyncio, "all_tasks", None)

    if all_tasks is None:  # py < 3.7
        all_tasks = asyncio.Task.all_tasks

    to_cancel = all_tasks(loop)

    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

    # No ``loop=`` here: asyncio.gather() rejects that keyword on Python 3.10+
    # and, on every version, picks the loop up from the tasks it is given.
    loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True))

    for task in to_cancel:
        if task.cancelled():
            continue

        error = task.exception()

        if error is not None:
            loop.call_exception_handler(
                {
                    "message": "Unhandled exception during runner shutdown",
                    "exception": error,
                    "task": task,
                }
            )
def shutdown_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Shut down an event loop, closing it in the end.

    Does nothing if the loop is already closed. Otherwise all tasks are
    cancelled, asynchronous generators are shut down, and the loop is closed.
    Any exception raised during cleanup is logged as a warning instead of
    being propagated; the loop is closed regardless.

    Parameters
    ----------
    loop: :class:`asyncio.AbstractEventLoop`
        Event loop to shut down.
    """
    if not loop.is_closed():
        try:
            cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())

        except Exception as exc:
            # Best-effort cleanup: report the problem and still close the loop.
            log.warning(f"Error<{type(exc).__name__}> was raised. {exc}")

        finally:
            loop.close()


async def maybe_coroutine(func: Callable, *args, **kwargs) -> Any:
    """|coro|

    Call ``func(*args, **kwargs)`` and return its result,
    awaiting the result first if it happens to be awaitable.
    """
    outcome = func(*args, **kwargs)

    if inspect.isawaitable(outcome):
        outcome = await outcome

    return outcome
def acquire_loop(running: bool = False) -> asyncio.AbstractEventLoop:
    """Gracefully acquire a loop.

    If ``running`` is true and a loop is currently running, that loop is
    returned. Otherwise the function tries :func:`asyncio.get_event_loop`;
    if that fails, or if it yields a running loop while a non-running one
    was requested, a new loop from :func:`asyncio.new_event_loop` is returned.

    Parameters
    ----------
    running: :class:`bool`
        Indicates if the function should get a loop that is already running.

    Returns
    -------
    :class:`asyncio.AbstractEventLoop`
        The acquired event loop.
    """
    try:
        active = asyncio._get_running_loop()

    except Exception:  # an error might occur actually
        active = None

    if running and active is not None:
        return active

    try:
        candidate = asyncio.get_event_loop()

        if candidate.is_running() and not running:
            # A running loop was found while a non-running one is wanted:
            # raise to fall through to the <except> clause below.
            raise ValueError("Current event loop is already running.")

    except Exception:
        candidate = asyncio.new_event_loop()

    return candidate
async def aiter(some_object: Any) -> Any:
    """|coro|

    Return an asynchronous iterator for ``some_object``.

    Calls ``some_object.__aiter__()``. The result is returned as is, unless
    it is awaitable (the legacy protocol, where ``__aiter__`` returned
    an awaitable), in which case it is awaited first.
    """
    iterator = some_object.__aiter__()

    # Modern ``__aiter__`` returns the iterator itself, which is not awaitable;
    # awaiting it unconditionally raises TypeError.
    if inspect.isawaitable(iterator):
        iterator = await iterator

    return iterator


async def anext(async_generator: Any) -> Any:
    """|coro|

    Return the next item of ``async_generator`` by awaiting its ``__anext__()``.
    """
    return await async_generator.__anext__()