Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 44 additions & 15 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,9 @@ def __init__(self):
self._asyncgens_shutdown_called = False
# Set to True when `loop.shutdown_default_executor` is called.
self._executor_shutdown_called = False
# Set up eager behavior for processing ready handlers
self._settings = events.EventLoopSettings(eager_timeout=1e-6,
eager_bunch_size=100)

def __repr__(self):
return (
Expand Down Expand Up @@ -2029,23 +2032,40 @@ def _run_once(self):
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()
self._run_handle(handle)

if self._settings.eager_timeout > 0 and self._ready:
deadline = (self.time() + self._settings.eager_timeout
+ self._clock_resolution)
while True:
ntodo = min(self._settings.eager_bunch_size, len(self._ready))
for i in range(ntodo):
handle = self._ready.popleft()
self._run_handle(handle)
if not self._ready:
break
if self.time() > deadline:
break

handle = None # Needed to break cycles when an exception occurs.

def _run_handle(self, handle):
if handle._cancelled:
return
if self._debug:
try:
self._current_handle = handle
t0 = self.time()
handle._run()
dt = self.time() - t0
if dt >= self.slow_callback_duration:
logger.warning('Executing %s took %.3f seconds',
_format_handle(handle), dt)
finally:
self._current_handle = None
else:
handle._run()

def _set_coroutine_origin_tracking(self, enabled):
if bool(enabled) == bool(self._coroutine_origin_tracking_enabled):
return
Expand All @@ -2069,3 +2089,12 @@ def set_debug(self, enabled):

if self.is_running():
self.call_soon_threadsafe(self._set_coroutine_origin_tracking, enabled)

def get_settings(self):
return self._settings

def set_settings(self, settings):
self._settings = settings

# Wake up the loop to apply settings immediatelly
self.call_soon_threadsafe(lambda: None)
16 changes: 16 additions & 0 deletions Lib/asyncio/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
__all__ = (
"AbstractEventLoop",
"AbstractServer",
"EventLoopSettings",
"Handle",
"TimerHandle",
"get_event_loop_policy",
Expand All @@ -20,6 +21,7 @@
)

import contextvars
import dataclasses
import os
import signal
import socket
Expand Down Expand Up @@ -251,6 +253,12 @@ async def __aexit__(self, *exc):
await self.wait_closed()


@dataclasses.dataclass(frozen=True, kw_only=True)
class EventLoopSettings:
eager_timeout: float
eager_bunch_size: int


class AbstractEventLoop:
"""Abstract event loop."""

Expand Down Expand Up @@ -661,6 +669,14 @@ def get_debug(self):
def set_debug(self, enabled):
raise NotImplementedError

# Settings management.

def get_settings(self):
raise NotImplementedError

def set_settings(self, settings):
raise NotImplementedError


class _AbstractEventLoopPolicy:
"""Abstract policy for accessing the event loop."""
Expand Down
23 changes: 22 additions & 1 deletion Lib/test/test_asyncio/test_base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,11 @@ def test_set_debug(self):
self.loop.set_debug(False)
self.assertFalse(self.loop.get_debug())

def test__run_once_schedule_handle(self):
def test__run_once_schedule_handle_non_eager(self):
handle = None
processed = False
self.loop._settings = asyncio.EventLoopSettings(eager_timeout=0,
eager_bunch_size=0)

def cb(loop):
nonlocal processed, handle
Expand All @@ -444,6 +446,25 @@ def cb(loop):
self.assertTrue(processed)
self.assertEqual([handle], list(self.loop._ready))

def test__run_once_schedule_handle_eager(self):
handle = None
processed = False

def cb(loop):
nonlocal processed, handle
processed = True
handle = loop.call_soon(lambda: True)

h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
self.loop, None)

self.loop._process_events = mock.Mock()
self.loop._scheduled.append(h)
self.loop._run_once()

self.assertTrue(processed)
self.assertEqual([], list(self.loop._ready))

def test__run_once_cancelled_event_cleanup(self):
self.loop._process_events = mock.Mock()

Expand Down
5 changes: 4 additions & 1 deletion Lib/test/test_asyncio/test_futures2.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ async def raise_exc():
async def test_task_exc_handler_correct_context(self):
# see https://github.com/python/cpython/issues/96704
name = contextvars.ContextVar('name', default='foo')
name_val = name.get()
exc_handler_called = False

def exc_handler(*args):
self.assertEqual(name.get(), 'bar')
nonlocal name_val
name_val = name.get()
nonlocal exc_handler_called
exc_handler_called = True

Expand All @@ -47,6 +49,7 @@ async def task():
self.cls(task())
await asyncio.sleep(0)
self.assertTrue(exc_handler_called)
self.assertEqual(name_val, 'bar')

async def test_handle_exc_handler_correct_context(self):
# see https://github.com/python/cpython/issues/96704
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_asyncio/test_proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ def test_write_eof_duplex_pipe(self):
close_transport(tr)

def test_pause_resume_reading(self):
self.loop.set_settings(asyncio.EventLoopSettings(eager_timeout=0,
eager_bunch_size=0))
tr = self.socket_transport()
index = 0
msgs = [b'data1', b'data2', b'data3', b'data4', b'data5', b'']
Expand Down
2 changes: 2 additions & 0 deletions Lib/test/test_asyncio/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ def setUp(self):
self.loop = self.new_test_loop()
self.loop.set_task_factory(self.new_task)
self.loop.create_future = lambda: self.new_future(self.loop)
self.loop.set_settings(asyncio.EventLoopSettings(eager_timeout=0,
eager_bunch_size=0))

def test_generic_alias(self):
task = self.__class__.Task[str]
Expand Down
Loading