# Source code for gd.events.utils

import asyncio
import atexit
import logging
import threading

from gd import tasks
from gd.events.listener import all_listeners, get_loop, run_loop, set_loop, shutdown_loop
from gd.typing import List, Optional

# Names exported by ``from gd.events.utils import *``; every entry is a
# function defined in this module.
__all__ = (
    "attach_to_loop",
    "cancel_tasks",
    "disable",
    "enable",
    "run",
    "shutdown_loops",
    "shutdown_threads",
    "start",
)

# these are used internally
# Name of the attribute ``enable`` sets to ``True`` on a loop once listeners
# are attached to it; ``enable`` returns early when the attribute is truthy.
_GD_EVENT_RUNNING = "_gd_event_running"

# Number used in the name of the next thread created by ``start``
# ("ListenerThread-<n>"); incremented on every ``start`` call.
_current_thread = 1
# Loops that ``enable`` attached listeners to; emptied by ``shutdown_loops``.
_loops: List[asyncio.AbstractEventLoop] = []
# Task loops created and started by ``enable``; emptied by ``cancel_tasks``.
_tasks: List[tasks.Loop] = []
# Threads created by ``start``; emptied by ``shutdown_threads``.
_threads: List[threading.Thread] = []


def cancel_tasks() -> None:
    """Cancel all running task-loop objects.

    Every task registered in ``_tasks`` is cancelled on a best-effort basis:
    a failure to cancel one task never prevents the remaining tasks from
    being cancelled and is never propagated to the caller. Failures are
    reported on this module's logger at debug level (with traceback) instead
    of being dropped silently. The registry is emptied afterwards.
    """
    for task in _tasks:
        try:
            task.cancel()
        except Exception:  # noqa
            # Deliberately best effort: this is part of ``disable``, which is
            # also run at interpreter exit, so it must not raise.
            logging.getLogger(__name__).debug(
                "Failed to cancel task %r.", task, exc_info=True
            )

    _tasks.clear()


def shutdown_loops() -> None:
    """Shutdown all loops used for event listening.

    Each loop registered in ``_loops`` is passed to ``shutdown_loop`` on a
    best-effort basis: a failure for one loop never prevents the remaining
    loops from being shut down and is never propagated to the caller.
    Failures are reported on this module's logger at debug level (with
    traceback) instead of being dropped silently. The registry is emptied
    afterwards.
    """
    for loop in _loops:
        try:
            shutdown_loop(loop)

        except Exception:  # noqa
            # Deliberately best effort: this is part of ``disable``, which is
            # also run at interpreter exit, so it must not raise.
            logging.getLogger(__name__).debug(
                "Failed to shut down loop %r.", loop, exc_info=True
            )

    _loops.clear()


def shutdown_threads() -> None:
    """Join all threads (should be daemon). Call this function after shutting loops down.

    Threads registered in ``_threads`` that are still alive are joined, then
    the registry is emptied. Two kinds of threads are skipped because
    ``Thread.join`` raises ``RuntimeError`` for them, which would abort the
    loop and leave the registry uncleared:

    * the calling thread itself (a thread cannot join itself);
    * threads that are not alive, i.e. never started or already finished
      (joining a finished thread would be a no-op anyway).
    """
    current = threading.current_thread()

    for thread in _threads:
        if thread is current or not thread.is_alive():
            continue

        thread.join()

    _threads.clear()


def disable() -> None:
    """Tear down everything this module started, in a fixed order.

    Equivalent to running:

    .. code-block:: python3

        gd.events.cancel_tasks()
        gd.events.shutdown_loops()
        gd.events.shutdown_threads()

    Meant to be called when a program is finishing; it is also run
    automatically on shutdown.
    """
    # Order matters: tasks first, then their loops, then the threads.
    for teardown_step in (cancel_tasks, shutdown_loops, shutdown_threads):
        teardown_step()
def enable(loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
    """Attach every known listener to ``loop``.

    When ``loop`` is ``None`` the loop returned by ``get_loop()`` is used.
    A loop that is already marked as running listeners is left untouched, so
    calling this function repeatedly with the same loop is harmless.
    """
    target_loop = get_loop() if loop is None else loop

    already_enabled = getattr(target_loop, _GD_EVENT_RUNNING, False)
    if already_enabled:
        return

    _loops.append(target_loop)

    for listener in all_listeners:
        # Wrap the listener entry point into a periodic task bound to the loop.
        as_task = tasks.loop(seconds=listener.delay, loop=target_loop)
        listener_task = as_task(listener.main)

        _tasks.append(listener_task)
        listener_task.start()

    # Mark the loop only after every listener task has been started.
    setattr(target_loop, _GD_EVENT_RUNNING, True)
def attach_to_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Make ``loop`` the default loop in gd.events.

    Thin wrapper that forwards ``loop`` to ``set_loop``.
    """
    set_loop(loop)
def run(loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
    """Attach listeners to a loop and run that loop.

    When ``loop`` is ``None`` the loop returned by ``get_loop()`` is used.
    A ``KeyboardInterrupt`` raised while the loop is running is swallowed.
    """
    target_loop = get_loop() if loop is None else loop

    enable(target_loop)

    try:
        run_loop(target_loop)
    except KeyboardInterrupt:
        # Interrupting the run is not treated as an error.
        return
def start(loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
    """Do the same as :func:`.run`, but in a separate daemon thread.

    The thread is named ``ListenerThread-<n>`` using a module-wide counter
    and is recorded in ``_threads`` before it is started.
    """
    global _current_thread

    thread_name = f"ListenerThread-{_current_thread}"
    listener_thread = threading.Thread(
        target=run, args=(loop,), name=thread_name, daemon=True
    )

    _current_thread += 1
    _threads.append(listener_thread)

    listener_thread.start()
# Run ``disable`` automatically at interpreter exit so that tasks, loops and
# threads registered by this module are cleaned up without an explicit call.
atexit.register(disable)