From 08ab9d1d0357e5dc94838c24fc9cfbe6918a0804 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Mon, 23 Feb 2026 16:36:34 +0100 Subject: [PATCH] Implement eager loop runner --- Lib/asyncio/base_events.py | 59 ++++++++++++++----- Lib/asyncio/events.py | 16 +++++ Lib/test/test_asyncio/test_base_events.py | 23 +++++++- Lib/test/test_asyncio/test_futures2.py | 5 +- Lib/test/test_asyncio/test_proactor_events.py | 2 + Lib/test/test_asyncio/test_tasks.py | 2 + 6 files changed, 90 insertions(+), 17 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 6619c87bcf5b93..17f21e3d2b910d 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -447,6 +447,9 @@ def __init__(self): self._asyncgens_shutdown_called = False # Set to True when `loop.shutdown_default_executor` is called. self._executor_shutdown_called = False + # Set up eager behavior for processing ready handlers + self._settings = events.EventLoopSettings(eager_timeout=1e-6, + eager_bunch_size=100) def __repr__(self): return ( @@ -2029,23 +2032,40 @@ def _run_once(self): ntodo = len(self._ready) for i in range(ntodo): handle = self._ready.popleft() - if handle._cancelled: - continue - if self._debug: - try: - self._current_handle = handle - t0 = self.time() - handle._run() - dt = self.time() - t0 - if dt >= self.slow_callback_duration: - logger.warning('Executing %s took %.3f seconds', - _format_handle(handle), dt) - finally: - self._current_handle = None - else: - handle._run() + self._run_handle(handle) + + if self._settings.eager_timeout > 0 and self._ready: + deadline = (self.time() + self._settings.eager_timeout + + self._clock_resolution) + while True: + ntodo = min(self._settings.eager_bunch_size, len(self._ready)) + for i in range(ntodo): + handle = self._ready.popleft() + self._run_handle(handle) + if not self._ready: + break + if self.time() > deadline: + break + handle = None # Needed to break cycles when an exception occurs. + def _run_handle(self, handle): + if handle._cancelled: + return + if self._debug: + try: + self._current_handle = handle + t0 = self.time() + handle._run() + dt = self.time() - t0 + if dt >= self.slow_callback_duration: + logger.warning('Executing %s took %.3f seconds', + _format_handle(handle), dt) + finally: + self._current_handle = None + else: + handle._run() + def _set_coroutine_origin_tracking(self, enabled): if bool(enabled) == bool(self._coroutine_origin_tracking_enabled): return @@ -2069,3 +2089,12 @@ def set_debug(self, enabled): if self.is_running(): self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled) + + def get_settings(self): + return self._settings + + def set_settings(self, settings): + self._settings = settings + + # Wake up the loop to apply settings immediatelly + self.call_soon_threadsafe(lambda: None) diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index a7fb55982abe9c..77cf3a421725bf 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -7,6 +7,7 @@ __all__ = ( "AbstractEventLoop", "AbstractServer", + "EventLoopSettings", "Handle", "TimerHandle", "get_event_loop_policy", @@ -20,6 +21,7 @@ ) import contextvars +import dataclasses import os import signal import socket @@ -251,6 +253,12 @@ async def __aexit__(self, *exc): await self.wait_closed() +@dataclasses.dataclass(frozen=True, kw_only=True) +class EventLoopSettings: + eager_timeout: float + eager_bunch_size: int + + class AbstractEventLoop: """Abstract event loop.""" @@ -661,6 +669,14 @@ def get_debug(self): def set_debug(self, enabled): raise NotImplementedError + # Settings management. + + def get_settings(self): + raise NotImplementedError + + def set_settings(self, settings): + raise NotImplementedError + class _AbstractEventLoopPolicy: """Abstract policy for accessing the event loop.""" diff --git a/Lib/test/test_asyncio/test_base_events.py b/Lib/test/test_asyncio/test_base_events.py index 8c02de77c24740..ba844aaa3694d8 100644 --- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -425,9 +425,11 @@ def test_set_debug(self): self.loop.set_debug(False) self.assertFalse(self.loop.get_debug()) - def test__run_once_schedule_handle(self): + def test__run_once_schedule_handle_non_eager(self): handle = None processed = False + self.loop._settings = asyncio.EventLoopSettings(eager_timeout=0, + eager_bunch_size=0) def cb(loop): nonlocal processed, handle @@ -444,6 +446,25 @@ def cb(loop): self.assertTrue(processed) self.assertEqual([handle], list(self.loop._ready)) + def test__run_once_schedule_handle_eager(self): + handle = None + processed = False + + def cb(loop): + nonlocal processed, handle + processed = True + handle = loop.call_soon(lambda: True) + + h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,), + self.loop, None) + + self.loop._process_events = mock.Mock() + self.loop._scheduled.append(h) + self.loop._run_once() + + self.assertTrue(processed) + self.assertEqual([], list(self.loop._ready)) + def test__run_once_cancelled_event_cleanup(self): self.loop._process_events = mock.Mock() diff --git a/Lib/test/test_asyncio/test_futures2.py b/Lib/test/test_asyncio/test_futures2.py index c7c0ebdac1b676..e14b21ae8ea7b3 100644 --- a/Lib/test/test_asyncio/test_futures2.py +++ b/Lib/test/test_asyncio/test_futures2.py @@ -31,10 +31,12 @@ async def raise_exc(): async def test_task_exc_handler_correct_context(self): # see https://github.com/python/cpython/issues/96704 name = contextvars.ContextVar('name', default='foo') + name_val = name.get() exc_handler_called = False def exc_handler(*args): - self.assertEqual(name.get(), 'bar') + nonlocal name_val + name_val = name.get() nonlocal exc_handler_called exc_handler_called = True @@ -47,6 +49,7 @@ async def task(): self.cls(task()) await asyncio.sleep(0) self.assertTrue(exc_handler_called) + self.assertEqual(name_val, 'bar') async def test_handle_exc_handler_correct_context(self): # see https://github.com/python/cpython/issues/96704 diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py index edfad5e11db35e..e0ea2fde77eeca 100644 --- a/Lib/test/test_asyncio/test_proactor_events.py +++ b/Lib/test/test_asyncio/test_proactor_events.py @@ -393,6 +393,8 @@ def test_write_eof_duplex_pipe(self): close_transport(tr) def test_pause_resume_reading(self): + self.loop.set_settings(asyncio.EventLoopSettings(eager_timeout=0, + eager_bunch_size=0)) tr = self.socket_transport() index = 0 msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b''] diff --git a/Lib/test/test_asyncio/test_tasks.py b/Lib/test/test_asyncio/test_tasks.py index dc179acd86e8a6..09881ceb883e23 100644 --- a/Lib/test/test_asyncio/test_tasks.py +++ b/Lib/test/test_asyncio/test_tasks.py @@ -100,6 +100,8 @@ def setUp(self): self.loop = self.new_test_loop() self.loop.set_task_factory(self.new_task) self.loop.create_future = lambda: self.new_future(self.loop) + self.loop.set_settings(asyncio.EventLoopSettings(eager_timeout=0, + eager_bunch_size=0)) def test_generic_alias(self): task = self.__class__.Task[str]