Source code for asynctest.selector

# coding: utf-8
"""
Module ``selector``
-------------------

Mock of :mod:`selectors` and compatible objects performing asynchronous IO.

This module provides classes to mock objects performing IO (files, sockets,
etc). These mocks are compatible with :class:`~asynctest.TestSelector`, which
can simulate the behavior of a selector on the mock objects, or forward actual
work to a real selector.
"""

import asyncio
try:
    import selectors
except ImportError:
    # In the case of Python 3.3, attempt to use the selectors
    # modules from within the asyncio package
    import asyncio.selectors as selectors
import socket

try:
    import ssl
except ImportError:
    # allow python to be compiled without ssl
    ssl = None

from . import mock
from . import _fail_on


[docs]class FileDescriptor(int): """ A subclass of int which allows to identify the virtual file-descriptor of a :class:`~asynctest.FileMock`. If :class:`~asynctest.FileDescriptor()` without argument, its value will be the value of :data:`~FileDescriptor.next_fd`. When an object is created, :data:`~FileDescriptor.next_fd` is set to the highest value for a :class:`~asynctest.FileDescriptor` object + 1. """ next_fd = 0 def __new__(cls, *args, **kwargs): if not args and not kwargs: s = super().__new__(cls, FileDescriptor.next_fd) else: s = super().__new__(cls, *args, **kwargs) FileDescriptor.next_fd = max(FileDescriptor.next_fd + 1, s + 1) return s def __hash__(self): # Return a different hash than the int so we can register both a # FileDescriptor object and an int of the same value return hash('__FileDescriptor_{}'.format(self))
[docs]def fd(fileobj): """ Return the :class:`~asynctest.FileDescriptor` value of ``fileobj``. If ``fileobj`` is a :class:`~asynctest.FileDescriptor`, ``fileobj`` is returned, else ``fileobj.fileno()`` is returned instead. Note that if fileobj is an int, :exc:`ValueError` is raised. :raise ValueError: if ``fileobj`` is not a :class:`~asynctest.FileMock`, a file-like object or a :class:`~asynctest.FileDescriptor`. """ try: return fileobj if isinstance(fileobj, FileDescriptor) else fileobj.fileno() except Exception: raise ValueError
[docs]def isfilemock(obj): """ Return ``True`` if the ``obj`` or ``obj.fileno()`` is a :class:`asynctest.FileDescriptor`. """ try: return (isinstance(obj, FileDescriptor) or isinstance(obj.fileno(), FileDescriptor)) except AttributeError: # obj has no attribute fileno() return False
[docs]class FileMock(mock.Mock): """ Mock a file-like object. A FileMock is an intelligent mock which can work with TestSelector to simulate IO events during tests. .. method:: fileno() Return a :class:`~asynctest.FileDescriptor` object. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.fileno.return_value = FileDescriptor() def _get_child_mock(self, *args, **kwargs): # A FileMock returns a Mock by default, not a FileMock return mock.Mock(**kwargs)
[docs]class SocketMock(FileMock): """ Mock a socket. See :class:`~asynctest.FileMock`. """ def __init__(self, side_effect=None, return_value=mock.DEFAULT, wraps=None, name=None, spec_set=None, parent=None, **kwargs): super().__init__(socket.socket, side_effect=side_effect, return_value=return_value, wraps=wraps, name=name, spec_set=spec_set, parent=parent, **kwargs)
if ssl:
[docs] class SSLSocketMock(SocketMock): """ Mock a socket wrapped by the :mod:`ssl` module. See :class:`~asynctest.FileMock`. .. versionadded:: 0.5 """ def __init__(self, side_effect=None, return_value=mock.DEFAULT, wraps=None, name=None, spec_set=None, parent=None, **kwargs): FileMock.__init__(self, ssl.SSLSocket, side_effect=side_effect, return_value=return_value, wraps=wraps, name=name, spec_set=spec_set, parent=parent, **kwargs)
def _set_event_ready(fileobj, loop, event): selector = loop._selector fd = selector._fileobj_lookup(fileobj) if fd in selector._fd_to_key: loop._process_events([(selector._fd_to_key[fd], event)])
[docs]def set_read_ready(fileobj, loop): """ Schedule callbacks registered on ``loop`` as if the selector notified that data is ready to be read on ``fileobj``. :param fileobj: file object or :class:`~asynctest.FileMock` on which the event is mocked. :param loop: :class:`asyncio.SelectorEventLoop` watching for events on ``fileobj``. :: mock = asynctest.SocketMock() mock.recv.return_value = b"Data" def read_ready(sock): print("received:", sock.recv(1024)) loop.add_reader(mock, read_ready, mock) set_read_ready(mock, loop) loop.run_forever() # prints received: b"Data" .. versionadded:: 0.4 """ # since the selector would notify of events at the beginning of the next # iteration, we let this iteration finish before actually scheduling the # reader (hence the call_soon) loop.call_soon_threadsafe(_set_event_ready, fileobj, loop, selectors.EVENT_READ)
[docs]def set_write_ready(fileobj, loop): """ Schedule callbacks registered on ``loop`` as if the selector notified that data can be written to ``fileobj``. :param fileobj: file object or :class:`~asynctest.FileMock` on which th event is mocked. :param loop: :class:`asyncio.SelectorEventLoop` watching for events on ``fileobj``. .. versionadded:: 0.4 """ loop.call_soon_threadsafe(_set_event_ready, fileobj, loop, selectors.EVENT_WRITE)
[docs]class TestSelector(selectors._BaseSelectorImpl): """ A selector which supports IOMock objects. It can wrap an actual implementation of a selector, so the selector will work both with mocks and real file-like objects. A common use case is to patch the selector loop:: loop._selector = asynctest.TestSelector(loop._selector) :param selector: optional, if provided, this selector will be used to work with real file-like objects. """ def __init__(self, selector=None): super().__init__() self._selector = selector def _fileobj_lookup(self, fileobj): if isfilemock(fileobj): return fd(fileobj) return super()._fileobj_lookup(fileobj)
[docs] def register(self, fileobj, events, data=None): """ Register a file object or a :class:`~asynctest.FileMock`. If a real selector object has been supplied to the :class:`~asynctest.TestSelector` object and ``fileobj`` is not a :class:`~asynctest.FileMock` or a :class:`~asynctest.FileDescriptor` returned by :meth:`FileMock.fileno()`, the object will be registered to the real selector. See :meth:`selectors.BaseSelector.register`. """ if isfilemock(fileobj) or self._selector is None: key = super().register(fileobj, events, data) else: key = self._selector.register(fileobj, events, data) if key: self._fd_to_key[key.fd] = key return key
[docs] def unregister(self, fileobj): """ Unregister a file object or a :class:`~asynctest.FileMock`. See :meth:`selectors.BaseSelector.unregister`. """ if isfilemock(fileobj) or self._selector is None: key = super().unregister(fileobj) else: key = self._selector.unregister(fileobj) if key and key.fd in self._fd_to_key: del self._fd_to_key[key.fd] return key
[docs] def modify(self, fileobj, events, data=None): """ Shortcut when calling :meth:`TestSelector.unregister` then :meth:`TestSelector.register` to update the registration of a an object to the selector. See :meth:`selectors.BaseSelector.modify`. """ if isfilemock(fileobj) or self._selector is None: key = super().modify(fileobj, events, data) else: # del the key first because modify() fails if events is incorrect fd = self._fileobj_lookup(fileobj) if fd in self._fd_to_key: del self._fd_to_key[fd] key = self._selector.modify(fileobj, events, data) if key: self._fd_to_key[key.fd] = key return key
[docs] def select(self, timeout=None): """ Perform the selection. This method is a no-op if no actual selector has been supplied. See :meth:`selectors.BaseSelector.select`. """ if self._selector is None: return [] return self._selector.select(timeout)
[docs] def close(self): """ Close the selector. Close the actual selector if supplied, unregister all mocks. See :meth:`selectors.BaseSelector.close`. """ if self._selector is not None: self._selector.close() super().close()
def get_registered_events(selector): watched_events = [] for event in selector.get_map().values(): watched_events.append(event) if selector._selector is not None: # this is our TestSelector, wrapping a true selector object for event in selector._selector.get_map().values(): watched_events.append(event) return set(watched_events) if hasattr(asyncio, "format_helpers"): # Python 3.7+ def _format_callback(handle): return asyncio.format_helpers._format_callback(handle._callback, handle._args, None) elif hasattr(asyncio.events, "_format_args_and_kwargs"): # Python 3.5, 3.6 def _format_callback(handle): return asyncio.events._format_callback(handle._callback, handle._args, None) else: # Python 3.4 def _format_callback(handle): return asyncio.events._format_callback(handle._callback, handle._args) def _format_event(event): callbacks = [] if event.events & selectors.EVENT_READ: callbacks.append("add_reader({}, {})".format( event.fileobj, _format_callback(event.data[0]))) if event.events & selectors.EVENT_WRITE: callbacks.append("add_writer({}, {})".format( event.fileobj, _format_callback(event.data[1]))) return callbacks def fail_on_before_test_active_selector_callbacks(case): case._active_selector_callbacks = get_registered_events( case.loop._selector) def fail_on_active_selector_callbacks(case): ignored_events = case._active_selector_callbacks active_events = get_registered_events(case.loop._selector) output = ["some events watched during the tests were not removed:"] for c in map(_format_event, active_events - ignored_events): output.extend(c) if len(output) > 1: case.fail("\n - ".join(output)) _fail_on.DEFAULTS["active_selector_callbacks"] = False _fail_on._fail_on.active_selector_callbacks = staticmethod(fail_on_active_selector_callbacks) _fail_on._fail_on.before_test_active_selector_callbacks = \ staticmethod(fail_on_before_test_active_selector_callbacks)