Source code for asynctest.mock

# coding: utf-8
"""
Module ``mock``
---------------

Wrapper to unittest.mock reducing the boilerplate when testing asyncio powered
code.

A mock can behave as a coroutine, as specified in the documentation of
:class:`~asynctest.mock.Mock`.
"""

import asyncio
import asyncio.coroutines
import contextlib
import enum
import functools
import inspect
import sys
import types
import unittest.mock


if sys.version_info >= (3, 5):
    from . import _awaitable

    async_magic_coroutines = ("__aenter__", "__aexit__", "__anext__")
    _async_magics = async_magic_coroutines + ("__aiter__", )

    async_magic_coroutines = set(async_magic_coroutines)
    _async_magics = set(_async_magics)

    # We use unittest.mock.MagicProxy which works well, but it's not aware that
    # we want __aexit__ to return a falsy value by default.
    # We add the entry in unittest internal dict as it will not change the
    # normal behavior of unittest.
    unittest.mock._return_values["__aexit__"] = False

    def _get_async_iter(mock):
        def __aiter__():
            return_value = mock.__aiter__._mock_return_value
            if return_value is DEFAULT:
                iterator = iter([])
            else:
                iterator = iter(return_value)

            return _awaitable.AsyncIterator(iterator)

        if asyncio.iscoroutinefunction(mock.__aiter__):
            return asyncio.coroutine(__aiter__)

        return __aiter__

    unittest.mock._side_effect_methods["__aiter__"] = _get_async_iter
else:
    _awaitable = None
    async_magic_coroutines = _async_magics = set()

# From python 3.6, a sentinel object is used to mark coroutines (rather than
# a boolean) to prevent a mock/proxy object to return a truthy value.
# see: https://github.com/python/asyncio/commit/ea776a11f632a975ad3ebbb07d8981804aa292db
try:
    _is_coroutine = asyncio.coroutines._is_coroutine
except AttributeError:
    _is_coroutine = True


try:
    # Python 3.5+
    _isawaitable = inspect.isawaitable
except AttributeError:
    _isawaitable = asyncio.iscoroutine


def _raise(exception):
    raise exception


def _is_started(patching):
    if isinstance(patching, _patch_dict):
        return patching._is_started
    else:
        return unittest.mock._is_started(patching)


class FakeInheritanceMeta(type):
    """
    A metaclass which recreates the original inheritance model from
    unittest.mock.

    - NonCallableMock > NonCallableMagicMock
    - NonCallable > Mock
    - Mock > MagicMock
    """
    def __init__(self, name, bases, attrs):
        attrs['__new__'] = types.MethodType(self.__new, self)
        super().__init__(name, bases, attrs)

    @staticmethod
    def __new(cls, *args, **kwargs):
        new = type(cls.__name__, (cls, ), {'__doc__': cls.__doc__})
        return object.__new__(new, *args, **kwargs)

    def __instancecheck__(cls, obj):
        # That's tricky, each type(mock) is actually a subclass of the actual
        # Mock type (see __new__)
        if super().__instancecheck__(obj):
            return True

        _type = type(obj)
        if issubclass(cls, NonCallableMock):
            if issubclass(_type, (NonCallableMagicMock, Mock, )):
                return True

        if issubclass(cls, Mock) and not issubclass(cls, CoroutineMock):
            if issubclass(_type, (MagicMock, )):
                return True

        return False


def _get_is_coroutine(self):
    return self.__dict__['_mock_is_coroutine']


def _set_is_coroutine(self, value):
    # property setters and getters are overridden by Mock(), we need to
    # update the dict to add values
    value = _is_coroutine if bool(value) else False
    self.__dict__['_mock_is_coroutine'] = value


def _mock_add_spec(self, spec, *args, **kwargs):
    unittest.mock.NonCallableMock._mock_add_spec(self, spec, *args, **kwargs)

    _spec_coroutines = []
    for attr in dir(spec):
        if asyncio.iscoroutinefunction(getattr(spec, attr)):
            _spec_coroutines.append(attr)

    self.__dict__['_spec_coroutines'] = _spec_coroutines


def _get_child_mock(self, *args, **kwargs):
    _new_name = kwargs.get("_new_name")
    if _new_name in self.__dict__['_spec_coroutines']:
        return CoroutineMock(*args, **kwargs)

    _type = type(self)

    if issubclass(_type, MagicMock) and _new_name in async_magic_coroutines:
        klass = CoroutineMock
    elif issubclass(_type, CoroutineMock):
        klass = MagicMock
    elif not issubclass(_type, unittest.mock.CallableMixin):
        if issubclass(_type, unittest.mock.NonCallableMagicMock):
            klass = MagicMock
        elif issubclass(_type, NonCallableMock):
            klass = Mock
    else:
        klass = _type.__mro__[1]

    return klass(*args, **kwargs)


class MockMetaMixin(FakeInheritanceMeta):
    def __new__(meta, name, base, namespace):
        if not any((isinstance(baseclass, meta) for baseclass in base)):
            # this ensures that inspect.iscoroutinefunction() doesn't return
            # True when testing a mock.
            code_mock = unittest.mock.NonCallableMock(spec_set=types.CodeType)
            code_mock.co_flags = 0

            namespace.update({
                '_mock_add_spec': _mock_add_spec,
                '_get_child_mock': _get_child_mock,
                '__code__': code_mock,
            })

        return super().__new__(meta, name, base, namespace)


class IsCoroutineArgMeta(MockMetaMixin):
    def __new__(meta, name, base, namespace):
        if not any((isinstance(baseclass, meta) for baseclass in base)):
            namespace.update({
                '_asynctest_get_is_coroutine': _get_is_coroutine,
                '_asynctest_set_is_coroutine': _set_is_coroutine,
                'is_coroutine': property(_get_is_coroutine, _set_is_coroutine,
                                         doc="True if the object mocked is a coroutine"),
                '_is_coroutine': property(_get_is_coroutine),
            })

            def __setattr__(self, name, value):
                if name == 'is_coroutine':
                    self._asynctest_set_is_coroutine(value)
                else:
                    return base[0].__setattr__(self, name, value)

            namespace['__setattr__'] = __setattr__

        return super().__new__(meta, name, base, namespace)


class AsyncMagicMixin:
    """
    Add support for async magic methods to :class:`MagicMock` and
    :class:`NonCallableMagicMock`.

    Actually, it's a shameless copy-paste of :class:`unittest.mock.MagicMixin`:
        when added to our classes, it will just do exactly what its
        :mod:`unittest` counterpart does, but for magic methods. It adds some
        behavior but should be compatible with future additions of
        :class:`MagicMock`.
    """
    # Magic methods are invoked as type(obj).__magic__(obj), as seen in
    # PEP-343 (with) and PEP-492 (async with)
    def __init__(self, *args, **kwargs):
        self._mock_set_async_magics()  # make magic work for kwargs in init
        unittest.mock._safe_super(AsyncMagicMixin, self).__init__(*args, **kwargs)
        self._mock_set_async_magics()  # fix magic broken by upper level init

    def _mock_set_async_magics(self):
        these_magics = _async_magics

        if getattr(self, "_mock_methods", None) is not None:
            these_magics = _async_magics.intersection(self._mock_methods)
            remove_magics = _async_magics - these_magics

            for entry in remove_magics:
                if entry in type(self).__dict__:
                    # remove unneeded magic methods
                    delattr(self, entry)

        # don't overwrite existing attributes if called a second time
        these_magics = these_magics - set(type(self).__dict__)

        _type = type(self)
        for entry in these_magics:
            setattr(_type, entry, unittest.mock.MagicProxy(entry, self))

    def mock_add_spec(self, *args, **kwargs):
        unittest.mock.MagicMock.mock_add_spec(self, *args, **kwargs)
        self._mock_set_async_magics()

    def __setattr__(self, name, value):
        _mock_methods = getattr(self, '_mock_methods', None)
        if _mock_methods is None or name in _mock_methods:
            if name in _async_magics:
                if not unittest.mock._is_instance_mock(value):
                    setattr(type(self), name,
                            unittest.mock._get_method(name, value))
                    original = value

                    def value(*args, **kwargs):
                        return original(self, *args, **kwargs)
                else:
                    unittest.mock._check_and_set_parent(self, value, None, name)
                    setattr(type(self), name, value)
                    self._mock_children[name] = value

                return object.__setattr__(self, name, value)

        unittest.mock._safe_super(AsyncMagicMixin, self).__setattr__(name, value)


# Notes about unittest.mock:
#  - MagicMock > Mock > NonCallableMock (where ">" means inherits from)
#  - when a mock instance is created, a new class (type) is created
#    dynamically,
#  - we *must* use magic or object's internals when we want to add our own
#    properties, and often override __getattr__/__setattr__ which are used
#    in unittest.mock.NonCallableMock.
[docs]class NonCallableMock(unittest.mock.NonCallableMock, metaclass=IsCoroutineArgMeta): """ Enhance :class:`unittest.mock.NonCallableMock` with features allowing to mock a coroutine function. If ``is_coroutine`` is set to ``True``, the :class:`NonCallableMock` object will behave so :func:`asyncio.iscoroutinefunction` will return ``True`` with ``mock`` as parameter. If ``spec`` or ``spec_set`` is defined and an attribute is get, :class:`~asynctest.CoroutineMock` is returned instead of :class:`~asynctest.Mock` when the matching spec attribute is a coroutine function. The test author can also specify a wrapped object with ``wraps``. In this case, the :class:`~asynctest.Mock` object behavior is the same as with an :class:`unittest.mock.Mock` object: the wrapped object may have methods defined as coroutine functions. See :class:`unittest.mock.NonCallableMock` """ def __init__(self, spec=None, wraps=None, name=None, spec_set=None, is_coroutine=None, parent=None, **kwargs): super().__init__(spec=spec, wraps=wraps, name=name, spec_set=spec_set, parent=parent, **kwargs) self._asynctest_set_is_coroutine(is_coroutine)
class NonCallableMagicMock(AsyncMagicMixin, unittest.mock.NonCallableMagicMock, metaclass=IsCoroutineArgMeta): """ A version of :class:`~asynctest.MagicMock` that isn't callable. """ def __init__(self, spec=None, wraps=None, name=None, spec_set=None, is_coroutine=None, parent=None, **kwargs): super().__init__(spec=spec, wraps=wraps, name=name, spec_set=spec_set, parent=parent, **kwargs) self._asynctest_set_is_coroutine(is_coroutine)
[docs]class Mock(unittest.mock.Mock, metaclass=MockMetaMixin): """ Enhance :class:`unittest.mock.Mock` so it returns a :class:`~asynctest.CoroutineMock` object instead of a :class:`~asynctest.Mock` object where a method on a ``spec`` or ``spec_set`` object is a coroutine. For instance: >>> class Foo: ... @asyncio.coroutine ... def foo(self): ... pass ... ... def bar(self): ... pass >>> type(asynctest.mock.Mock(Foo()).foo) <class 'asynctest.mock.CoroutineMock'> >>> type(asynctest.mock.Mock(Foo()).bar) <class 'asynctest.mock.Mock'> The test author can also specify a wrapped object with ``wraps``. In this case, the :class:`~asynctest.Mock` object behavior is the same as with an :class:`unittest.mock.Mock` object: the wrapped object may have methods defined as coroutine functions. If you want to mock a coroutine function, use :class:`CoroutineMock` instead. See :class:`~asynctest.NonCallableMock` for details about :mod:`asynctest` features, and :mod:`unittest.mock` for the comprehensive documentation about mocking. """
[docs]class MagicMock(AsyncMagicMixin, unittest.mock.MagicMock, metaclass=MockMetaMixin): """ Enhance :class:`unittest.mock.MagicMock` so it returns a :class:`~asynctest.CoroutineMock` object instead of a :class:`~asynctest.Mock` object where a method on a ``spec`` or ``spec_set`` object is a coroutine. If you want to mock a coroutine function, use :class:`CoroutineMock` instead. :class:`MagicMock` allows to mock ``__aenter__``, ``__aexit__``, ``__aiter__`` and ``__anext__``. When mocking an asynchronous iterator, you can set the ``return_value`` of ``__aiter__`` to an iterable to define the list of values to be returned during iteration. You can not mock ``__await__``. If you want to mock an object implementing __await__, :class:`CoroutineMock` will likely be sufficient. see :class:`~asynctest.Mock`. .. versionadded:: 0.11 support of asynchronous iterators and asynchronous context managers. """
class _AwaitEvent: def __init__(self, mock): self._mock = mock self._condition = None @asyncio.coroutine def wait(self, skip=0): """ Wait for await. :param skip: How many awaits will be skipped. As a result, the mock should be awaited at least ``skip + 1`` times. """ def predicate(mock): return mock.await_count > skip return (yield from self.wait_for(predicate)) @asyncio.coroutine def wait_next(self, skip=0): """ Wait for the next await. Unlike :meth:`wait` that counts any await, mock has to be awaited once more, disregarding to the current :attr:`asynctest.CoroutineMock.await_count`. :param skip: How many awaits will be skipped. As a result, the mock should be awaited at least ``skip + 1`` more times. """ await_count = self._mock.await_count def predicate(mock): return mock.await_count > await_count + skip return (yield from self.wait_for(predicate)) @asyncio.coroutine def wait_for(self, predicate): """ Wait for a given predicate to become True. :param predicate: A callable that receives mock which result will be interpreted as a boolean value. The final predicate value is the return value. """ condition = self._get_condition() try: yield from condition.acquire() def _predicate(): return predicate(self._mock) return (yield from condition.wait_for(_predicate)) finally: condition.release() @asyncio.coroutine def _notify(self): condition = self._get_condition() try: yield from condition.acquire() condition.notify_all() finally: condition.release() def _get_condition(self): """ Creation of condition is delayed, to minimize the change of using the wrong loop. A user may create a mock with _AwaitEvent before selecting the execution loop. Requiring a user to delay creation is error-prone and inflexible. Instead, condition is created when user actually starts to use the mock. """ # No synchronization is needed: # - asyncio is thread unsafe # - there are no awaits here, method will be executed without # switching asyncio context. if self._condition is None: self._condition = asyncio.Condition() return self._condition def __bool__(self): return self._mock.await_count != 0
[docs]class CoroutineMock(Mock): """ Enhance :class:`~asynctest.mock.Mock` with features allowing to mock a coroutine function. The :class:`~asynctest.CoroutineMock` object will behave so the object is recognized as coroutine function, and the result of a call as a coroutine: >>> mock = CoroutineMock() >>> asyncio.iscoroutinefunction(mock) True >>> asyncio.iscoroutine(mock()) True The result of ``mock()`` is a coroutine which will have the outcome of ``side_effect`` or ``return_value``: - if ``side_effect`` is a function, the coroutine will return the result of that function, - if ``side_effect`` is an exception, the coroutine will raise the exception, - if ``side_effect`` is an iterable, the coroutine will return the next value of the iterable, however, if the sequence of result is exhausted, ``StopIteration`` is raised immediately, - if ``side_effect`` is not defined, the coroutine will return the value defined by ``return_value``, hence, by default, the coroutine returns a new :class:`~asynctest.CoroutineMock` object. If the outcome of ``side_effect`` or ``return_value`` is a coroutine, the mock coroutine obtained when the mock object is called will be this coroutine itself (and not a coroutine returning a coroutine). The test author can also specify a wrapped object with ``wraps``. In this case, the :class:`~asynctest.Mock` object behavior is the same as with an :class:`unittest.mock.Mock` object: the wrapped object may have methods defined as coroutine functions. """ #: Property which is set when the mock is awaited. Its ``wait`` and #: ``wait_next`` coroutine methods can be used to synchronize execution. #: #: .. versionadded:: 0.12 awaited = unittest.mock._delegating_property('awaited') #: Number of times the mock has been awaited. #: #: .. versionadded:: 0.12 await_count = unittest.mock._delegating_property('await_count') await_args = unittest.mock._delegating_property('await_args') await_args_list = unittest.mock._delegating_property('await_args_list') def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # asyncio.iscoroutinefunction() checks this property to say if an # object is a coroutine # It is set through __dict__ because when spec_set is True, this # attribute is likely undefined. self.__dict__['_is_coroutine'] = _is_coroutine self.__dict__['_mock_awaited'] = _AwaitEvent(self) self.__dict__['_mock_await_count'] = 0 self.__dict__['_mock_await_args'] = None self.__dict__['_mock_await_args_list'] = unittest.mock._CallList() def _mock_call(_mock_self, *args, **kwargs): try: result = super()._mock_call(*args, **kwargs) except StopIteration as e: side_effect = _mock_self.side_effect if side_effect is not None and not callable(side_effect): raise result = asyncio.coroutine(_raise)(e) except BaseException as e: result = asyncio.coroutine(_raise)(e) _call = _mock_self.call_args @asyncio.coroutine def proxy(): try: if _isawaitable(result): return (yield from result) else: return result finally: _mock_self.await_count += 1 _mock_self.await_args = _call _mock_self.await_args_list.append(_call) yield from _mock_self.awaited._notify() return proxy()
[docs] def assert_awaited(_mock_self): """ Assert that the mock was awaited at least once. .. versionadded:: 0.12 """ self = _mock_self if self.await_count == 0: msg = ("Expected '%s' to have been awaited." % self._mock_name or 'mock') raise AssertionError(msg)
[docs] def assert_awaited_once(_mock_self, *args, **kwargs): """ Assert that the mock was awaited exactly once. .. versionadded:: 0.12 """ self = _mock_self if not self.await_count == 1: msg = ("Expected '%s' to have been awaited once. Awaited %s times." % (self._mock_name or 'mock', self.await_count)) raise AssertionError(msg)
[docs] def assert_awaited_with(_mock_self, *args, **kwargs): """ Assert that the last await was with the specified arguments. .. versionadded:: 0.12 """ self = _mock_self if self.await_args is None: expected = self._format_mock_call_signature(args, kwargs) raise AssertionError('Expected await: %s\nNot awaited' % (expected,)) def _error_message(): msg = self._format_mock_failure_message(args, kwargs) return msg expected = self._call_matcher((args, kwargs)) actual = self._call_matcher(self.await_args) if expected != actual: cause = expected if isinstance(expected, Exception) else None raise AssertionError(_error_message()) from cause
[docs] def assert_awaited_once_with(_mock_self, *args, **kwargs): """ Assert that the mock was awaited exactly once and with the specified arguments. .. versionadded:: 0.12 """ self = _mock_self if not self.await_count == 1: msg = ("Expected '%s' to be awaited once. Awaited %s times." % (self._mock_name or 'mock', self.await_count)) raise AssertionError(msg) return self.assert_awaited_with(*args, **kwargs)
[docs] def assert_any_await(_mock_self, *args, **kwargs): """ Assert the mock has ever been awaited with the specified arguments. .. versionadded:: 0.12 """ self = _mock_self expected = self._call_matcher((args, kwargs)) actual = [self._call_matcher(c) for c in self.await_args_list] if expected not in actual: cause = expected if isinstance(expected, Exception) else None expected_string = self._format_mock_call_signature(args, kwargs) raise AssertionError( '%s await not found' % expected_string ) from cause
[docs] def assert_has_awaits(_mock_self, calls, any_order=False): """ Assert the mock has been awaited with the specified calls. The :attr:`await_args_list` list is checked for the awaits. If `any_order` is False (the default) then the awaits must be sequential. There can be extra calls before or after the specified awaits. If `any_order` is True then the awaits can be in any order, but they must all appear in :attr:`await_args_list`. .. versionadded:: 0.12 """ self = _mock_self expected = [self._call_matcher(c) for c in calls] cause = expected if isinstance(expected, Exception) else None all_awaits = unittest.mock._CallList(self._call_matcher(c) for c in self.await_args_list) if not any_order: if expected not in all_awaits: raise AssertionError( 'Awaits not found.\nExpected: %r\n' 'Actual: %r' % (unittest.mock._CallList(calls), self.await_args_list) ) from cause return all_awaits = list(all_awaits) not_found = [] for kall in expected: try: all_awaits.remove(kall) except ValueError: not_found.append(kall) if not_found: raise AssertionError( '%r not all found in await list' % (tuple(not_found),) ) from cause
[docs] def assert_not_awaited(_mock_self): """ Assert that the mock was never awaited. .. versionadded:: 0.12 """ self = _mock_self if self.await_count != 0: msg = ("Expected '%s' to not have been awaited. Awaited %s times." % (self._mock_name or 'mock', self.await_count)) raise AssertionError(msg)
[docs] def reset_mock(self, *args, **kwargs): """ See :func:`unittest.mock.Mock.reset_mock()` """ super().reset_mock(*args, **kwargs) self.awaited = _AwaitEvent(self) self.await_count = 0 self.await_args = None self.await_args_list = unittest.mock._CallList()
[docs]def create_autospec(spec, spec_set=False, instance=False, _parent=None, _name=None, **kwargs): """ Create a mock object using another object as a spec. Attributes on the mock will use the corresponding attribute on the spec object as their spec. ``spec`` can be a coroutine function, a class or object with coroutine functions as attributes. If ``spec`` is a coroutine function, and ``instance`` is not ``False``, a :exc:`RuntimeError` is raised. .. versionadded:: 0.12 """ if unittest.mock._is_list(spec): spec = type(spec) is_type = isinstance(spec, type) is_coroutine_func = asyncio.iscoroutinefunction(spec) _kwargs = {'spec': spec} if spec_set: _kwargs = {'spec_set': spec} elif spec is None: # None we mock with a normal mock without a spec _kwargs = {} if _kwargs and instance: _kwargs['_spec_as_instance'] = True _kwargs.update(kwargs) Klass = MagicMock if inspect.isdatadescriptor(spec): _kwargs = {} elif is_coroutine_func: if instance: raise RuntimeError("Instance can not be True when create_autospec " "is mocking a coroutine function") Klass = CoroutineMock elif not unittest.mock._callable(spec): Klass = NonCallableMagicMock elif is_type and instance and not unittest.mock._instance_callable(spec): Klass = NonCallableMagicMock _name = _kwargs.pop('name', _name) _new_name = _name if _parent is None: _new_name = '' mock = Klass(parent=_parent, _new_parent=_parent, _new_name=_new_name, name=_name, **_kwargs) if isinstance(spec, unittest.mock.FunctionTypes): wrapped_mock = mock # _set_signature returns an object wrapping the mock, not the mock # itself. mock = unittest.mock._set_signature(mock, spec) if is_coroutine_func: # Can't wrap the mock with asyncio.coroutine because it doesn't # detect a CoroWrapper as an awaitable in debug mode. # It is safe to do so because the mock object wrapped by # _set_signature returns the result of the CoroutineMock itself, # which is a Coroutine (as defined in CoroutineMock._mock_call) mock._is_coroutine = _is_coroutine mock.awaited = _AwaitEvent(mock) mock.await_count = 0 mock.await_args = None mock.await_args_list = unittest.mock._CallList() for a in ('assert_awaited', 'assert_awaited_once', 'assert_awaited_with', 'assert_awaited_once_with', 'assert_any_await', 'assert_has_awaits', 'assert_not_awaited'): setattr(mock, a, getattr(wrapped_mock, a)) else: unittest.mock._check_signature(spec, mock, is_type, instance) if _parent is not None and not instance: _parent._mock_children[_name] = mock if is_type and not instance and 'return_value' not in kwargs: mock.return_value = create_autospec(spec, spec_set, instance=True, _name='()', _parent=mock) for entry in dir(spec): if unittest.mock._is_magic(entry): continue try: original = getattr(spec, entry) except AttributeError: continue kwargs = {'spec': original} if spec_set: kwargs = {'spec_set': original} if not isinstance(original, unittest.mock.FunctionTypes): new = unittest.mock._SpecState(original, spec_set, mock, entry, instance) mock._mock_children[entry] = new else: parent = mock if isinstance(spec, unittest.mock.FunctionTypes): parent = mock.mock skipfirst = unittest.mock._must_skip(spec, entry, is_type) kwargs['_eat_self'] = skipfirst if asyncio.iscoroutinefunction(original): child_klass = CoroutineMock else: child_klass = MagicMock new = child_klass(parent=parent, name=entry, _new_name=entry, _new_parent=parent, **kwargs) mock._mock_children[entry] = new unittest.mock._check_signature(original, new, skipfirst=skipfirst) if isinstance(new, unittest.mock.FunctionTypes): setattr(mock, entry, new) return mock
[docs]def mock_open(mock=None, read_data=''): """ A helper function to create a mock to replace the use of :func:`open()`. It works for :func:`open()` called directly or used as a context manager. :param mock: mock object to configure, by default a :class:`~asynctest.MagicMock` object is created with the API limited to methods or attributes available on standard file handles. :param read_data: string for the :func:`read()` and :func:`readlines()` of the file handle to return. This is an empty string by default. """ if mock is None: mock = MagicMock(name='open', spec=open) return unittest.mock.mock_open(mock, read_data)
ANY = unittest.mock.ANY DEFAULT = unittest.mock.sentinel.DEFAULT def _update_new_callable(patcher, new, new_callable): if new == DEFAULT and not new_callable: original = patcher.get_original()[0] if isinstance(original, (classmethod, staticmethod)): # the original object is the raw descriptor, if it's a classmethod # or a static method, we need to unwrap it original = original.__get__(None, object) if asyncio.iscoroutinefunction(original): patcher.new_callable = CoroutineMock else: patcher.new_callable = MagicMock return patcher PatchScope = enum.Enum('PatchScope', 'LIMITED GLOBAL') #: Value of ``scope``, deactivating a patch when a decorated generator or #: a coroutine pauses (``yield`` or ``await``). LIMITED = PatchScope.LIMITED #: Value of ``scope``, activating a patch until the decorated generator or #: coroutine returns or raises an exception. GLOBAL = PatchScope.GLOBAL def _decorate_coroutine_callable(func, new_patching): if hasattr(func, 'patchings'): func.patchings.append(new_patching) return func # Python 3.5 returns True for is_generator_func(new_style_coroutine) if # there is an "await" statement in the function body, which is wrong. It is # fixed in 3.6, but I can't find which commit fixes this. # The only way to work correctly with 3.5 and 3.6 seems to use # inspect.iscoroutinefunction() is_generator_func = inspect.isgeneratorfunction(func) is_coroutine_func = asyncio.iscoroutinefunction(func) try: is_native_coroutine_func = inspect.iscoroutinefunction(func) except AttributeError: is_native_coroutine_func = False if not (is_generator_func or is_coroutine_func): return None patchings = [new_patching] def patched_factory(*args, **kwargs): extra_args = [] patchers_to_exit = [] patch_dict_with_limited_scope = [] exc_info = tuple() try: for patching in patchings: arg = patching.__enter__() if patching.scope == LIMITED: patchers_to_exit.append(patching) if isinstance(patching, _patch_dict): if patching.scope == GLOBAL: for limited_patching in patch_dict_with_limited_scope: if limited_patching.in_dict is patching.in_dict: limited_patching._keep_global_patch(patching) else: patch_dict_with_limited_scope.append(patching) else: if patching.attribute_name is not None: kwargs.update(arg) if patching.new is DEFAULT: patching.new = arg[patching.attribute_name] elif patching.new is DEFAULT: patching.mock_to_reuse = arg extra_args.append(arg) args += tuple(extra_args) gen = func(*args, **kwargs) return _PatchedGenerator(gen, patchings, asyncio.iscoroutinefunction(func)) except BaseException: if patching not in patchers_to_exit and _is_started(patching): # the patcher may have been started, but an exception # raised whilst entering one of its additional_patchers patchers_to_exit.append(patching) # Pass the exception to __exit__ exc_info = sys.exc_info() # re-raise the exception raise finally: for patching in reversed(patchers_to_exit): patching.__exit__(*exc_info) # wrap the factory in a native coroutine or a generator to respect # introspection. if is_native_coroutine_func: # inspect.iscoroutinefunction() returns True patched = _awaitable.make_native_coroutine(patched_factory) elif is_generator_func: # inspect.isgeneratorfunction() returns True def patched_generator(*args, **kwargs): return (yield from patched_factory(*args, **kwargs)) patched = patched_generator if is_coroutine_func: # asyncio.iscoroutinefunction() returns True patched = asyncio.coroutine(patched) else: patched = patched_factory patched.patchings = patchings return functools.wraps(func)(patched) class _PatchedGenerator(asyncio.coroutines.CoroWrapper): # Inheriting from asyncio.CoroWrapper gives us a comprehensive wrapper # implementing one or more workarounds for cpython bugs def __init__(self, gen, patchings, is_coroutine): self.gen = gen self._is_coroutine = is_coroutine self.__name__ = getattr(gen, '__name__', None) self.__qualname__ = getattr(gen, '__qualname__', None) self.patchings = patchings self.global_patchings = [p for p in patchings if p.scope == GLOBAL] self.limited_patchings = [p for p in patchings if p.scope == LIMITED] # GLOBAL patches have been started in the _patch/patched() wrapper def _limited_patchings_stack(self): with contextlib.ExitStack() as stack: for patching in self.limited_patchings: stack.enter_context(patching) return stack.pop_all() def _stop_global_patchings(self): for patching in reversed(self.global_patchings): if _is_started(patching): patching.stop() def __repr__(self): return repr(self.generator) def __next__(self): try: with self._limited_patchings_stack(): return self.gen.send(None) except BaseException: # the generator/coroutine terminated, stop the patchings self._stop_global_patchings() raise def send(self, value): with self._limited_patchings_stack(): return super().send(value) def throw(self, exc, value=None, traceback=None): with self._limited_patchings_stack(): return self.gen.throw(exc, value, traceback) def close(self): try: with self._limited_patchings_stack(): return self.gen.close() finally: self._stop_global_patchings() def __del__(self): # The generator/coroutine is deleted before it terminated, we must # still stop the patchings self._stop_global_patchings() class _patch(unittest.mock._patch): def __init__(self, *args, scope=GLOBAL, **kwargs): super().__init__(*args, **kwargs) self.scope = scope self.mock_to_reuse = None def copy(self): patcher = _patch( self.getter, self.attribute, self.new, self.spec, self.create, self.spec_set, self.autospec, self.new_callable, self.kwargs, scope=self.scope) patcher.attribute_name = self.attribute_name patcher.additional_patchers = [ p.copy() for p in self.additional_patchers ] return patcher def __enter__(self): # When patching a coroutine, we reuse the same mock object if self.mock_to_reuse is not None: self.target = self.getter() self.temp_original, self.is_local = self.get_original() setattr(self.target, self.attribute, self.mock_to_reuse) if self.attribute_name is not None: for patching in self.additional_patchers: patching.__enter__() return self.mock_to_reuse else: return self._perform_patch() def _perform_patch(self): # This will intercept the result of super().__enter__() if we need to # override the default behavior (ie: we need to use our own autospec). original, local = self.get_original() result = super().__enter__() if self.autospec is None or not self.autospec: # no need to override the default behavior return result if self.autospec is True: autospec = original else: autospec = self.autospec new = create_autospec(autospec, spec_set=bool(self.spec_set), _name=self.attribute, **self.kwargs) self.temp_original = original self.is_local = local setattr(self.target, self.attribute, new) if self.attribute_name is not None: if self.new is DEFAULT: result[self.attribute_name] = new return result return new def decorate_callable(self, func): wrapped = _decorate_coroutine_callable(func, self) if wrapped is None: return super().decorate_callable(func) else: return wrapped
[docs]def patch(target, new=DEFAULT, spec=None, create=False, spec_set=None, autospec=None, new_callable=None, scope=GLOBAL, **kwargs): """ A context manager, function decorator or class decorator which patches the target with the value given by the ``new`` argument. ``new`` specifies which object will replace the ``target`` when the patch is applied. By default, the target will be patched with an instance of :class:`~asynctest.CoroutineMock` if it is a coroutine, or a :class:`~asynctest.MagicMock` object. It is a replacement to :func:`unittest.mock.patch`, but using :mod:`asynctest.mock` objects. When a generator or a coroutine is patched using the decorator, the patch is activated or deactivated according to the ``scope`` argument value: * :const:`asynctest.GLOBAL`: the default, enables the patch until the generator or the coroutine finishes (returns or raises an exception), * :const:`asynctest.LIMITED`: the patch will be activated when the generator or coroutine is being executed, and deactivated when it yields a value and pauses its execution (with ``yield``, ``yield from`` or ``await``). The behavior differs from :func:`unittest.mock.patch` for generators. When used as a context manager, the patch is still active even if the generator or coroutine is paused, which may affect concurrent tasks:: @asyncio.coroutine def coro(): with asynctest.mock.patch("module.function"): yield from asyncio.get_event_loop().sleep(1) @asyncio.coroutine def independent_coro(): assert not isinstance(module.function, asynctest.mock.Mock) asyncio.create_task(coro()) asyncio.create_task(independent_coro()) # this will raise an AssertionError(coro() is scheduled first)! loop.run_forever() :param scope: :const:`asynctest.GLOBAL` or :const:`asynctest.LIMITED`, controls when the patch is activated on generators and coroutines When used as a decorator with a generator based coroutine, the order of the decorators matters. The order of the ``@patch()`` decorators is in the reverse order of the parameters produced by these patches for the patched function. And the ``@asyncio.coroutine`` decorator should be the last since ``@patch()`` conceptually patches the coroutine, not the function:: @patch("module.function2") @patch("module.function1") @asyncio.coroutine def test_coro(self, mock_function1, mock_function2): yield from asyncio.get_event_loop().sleep(1) see :func:`unittest.mock.patch()`. .. versionadded:: 0.6 patch into generators and coroutines with a decorator. """ getter, attribute = unittest.mock._get_target(target) patcher = _patch(getter, attribute, new, spec, create, spec_set, autospec, new_callable, kwargs, scope=scope) return _update_new_callable(patcher, new, new_callable)
def _patch_object(target, attribute, new=DEFAULT, spec=None, create=False, spec_set=None, autospec=None, new_callable=None, scope=GLOBAL, **kwargs): patcher = _patch(lambda: target, attribute, new, spec, create, spec_set, autospec, new_callable, kwargs, scope=scope) return _update_new_callable(patcher, new, new_callable) def _patch_multiple(target, spec=None, create=False, spec_set=None, autospec=None, new_callable=None, scope=GLOBAL, **kwargs): if type(target) is str: def getter(): return unittest.mock._importer(target) else: def getter(): return target if not kwargs: raise ValueError('Must supply at least one keyword argument with ' 'patch.multiple') items = list(kwargs.items()) attribute, new = items[0] patcher = _patch(getter, attribute, new, spec, create, spec_set, autospec, new_callable, {}, scope=scope) patcher.attribute_name = attribute for attribute, new in items[1:]: this_patcher = _patch(getter, attribute, new, spec, create, spec_set, autospec, new_callable, {}, scope=scope) this_patcher.attribute_name = attribute patcher.additional_patchers.append(this_patcher) def _update(patcher): return _update_new_callable(patcher, patcher.new, new_callable) patcher = _update(patcher) patcher.additional_patchers = list(map(_update, patcher.additional_patchers)) return patcher class _patch_dict(unittest.mock._patch_dict): # documentation is in doc/asynctest.mock.rst def __init__(self, in_dict, values=(), clear=False, scope=GLOBAL, **kwargs): super().__init__(in_dict, values, clear, **kwargs) self.scope = scope self._is_started = False self._global_patchings = [] def _keep_global_patch(self, other_patching): self._global_patchings.append(other_patching) def decorate_class(self, klass): for attr in dir(klass): attr_value = getattr(klass, attr) if (attr.startswith(patch.TEST_PREFIX) and hasattr(attr_value, "__call__")): decorator = _patch_dict(self.in_dict, self.values, self.clear) decorated = decorator(attr_value) setattr(klass, attr, decorated) return klass def __call__(self, func): if isinstance(func, type): return self.decorate_class(func) wrapper = _decorate_coroutine_callable(func, self) if wrapper is None: return super().__call__(func) else: return wrapper def _patch_dict(self): self._is_started = True # Since Python 3.7.3, the moment when a dict specified by a target # string has been corrected. (see #115) if isinstance(self.in_dict, str): self.in_dict = unittest.mock._importer(self.in_dict) try: self._original = self.in_dict.copy() except AttributeError: # dict like object with no copy method # must support iteration over keys self._original = {} for key in self.in_dict: self._original[key] = self.in_dict[key] if self.clear: _clear_dict(self.in_dict) try: self.in_dict.update(self.values) except AttributeError: # dict like object with no update method for key in self.values: self.in_dict[key] = self.values[key] def _unpatch_dict(self): self._is_started = False if self.scope == LIMITED: # add to self.values the updated values which where not in # the original dict, as the patch may be reactivated for key in self.in_dict: if (key not in self._original or self._original[key] is not self.in_dict[key]): self.values[key] = self.in_dict[key] _clear_dict(self.in_dict) originals = [self._original] for patching in self._global_patchings: if patching._is_started: # keep the values of global patches originals.append(patching.values) for original in originals: try: self.in_dict.update(original) except AttributeError: for key in original: self.in_dict[key] = original[key] _clear_dict = unittest.mock._clear_dict patch.object = _patch_object patch.dict = _patch_dict patch.multiple = _patch_multiple patch.stopall = unittest.mock._patch_stopall patch.TEST_PREFIX = 'test' sentinel = unittest.mock.sentinel call = unittest.mock.call PropertyMock = unittest.mock.PropertyMock
[docs]def return_once(value, then=None): """ Helper to use with ``side_effect``, so a mock will return a given value only once, then return another value. When used as a ``side_effect`` value, if one of ``value`` or ``then`` is an :class:`Exception` type, an instance of this exception will be raised. >>> mock.recv = Mock(side_effect=return_once(b"data")) >>> mock.recv() b"data" >>> repr(mock.recv()) 'None' >>> repr(mock.recv()) 'None' >>> mock.recv = Mock(side_effect=return_once(b"data", then=BlockingIOError)) >>> mock.recv() b"data" >>> mock.recv() Traceback BlockingIOError :param value: value to be returned once by the mock when called. :param then: value returned for any subsequent call. .. versionadded:: 0.4 """ yield value while True: yield then