Source code for django_delayed_union.base

import abc
import inspect
from builtins import object
from functools import partial

from django.db.models import QuerySet
from future.utils import with_metaclass

from .utils import get_formatted_function_signature


class DelayedQuerySetDescriptor(with_metaclass(abc.ABCMeta, object)):
    """
    A base class for the descriptors which are used for
    :class:`DelayedQuerySet`.

    Used in conjunction with :class:`DelayedQuerySetBase` in order to set
    their names upon class creation.
    """
    def __init__(self, name=None):
        self.set_name(name)

    def set_name(self, name):
        """
        Sets the name of the descriptor to *name*.

        :param str name: the name of the descriptor
        """
        self.name = name
        if name is not None:
            self.__doc__ = self.get_docstring()

    @abc.abstractmethod
    def get_base_docstring(self):
        """
        Returns a string which will be prepended to the docstring for
        the corresponding method on :class:`django.db.models.QuerySet`.
        """

    def get_docstring(self):
        """
        Returns a docstring for this descriptor based on the corresponding
        docstring for :class:`django.db.models.QuerySet`.
        """
        docstring = self.get_base_docstring().strip()

        queryset_attr = getattr(QuerySet, self.name, None)
        queryset_doc = getattr(queryset_attr, '__doc__', '')
        if queryset_doc:
            if docstring:
                docstring += '  Documentation for *{name}*:\n'
            docstring += queryset_doc

        if inspect.ismethod(queryset_attr):
            docstring = '{}{}\n{}'.format(
                self.name,
                get_formatted_function_signature(queryset_attr),
                docstring
            )
        return docstring.strip().format(name=self.name)


class DelayedQuerySetMethod(DelayedQuerySetDescriptor):
    """
    A descriptor which acts like a method on a class.  When accessed
    as an attribute on *obj*, it returns its ``__call__`` method with
    *obj* being passed in as the first argument.
    """
    def __get__(self, obj, cls=None):
        if obj is None:
            return self
        return partial(self.__call__, obj)


class PostApplyMethod(DelayedQuerySetMethod):
    """
    When this descriptor is called, it runs :meth:`DelayedQuerySet._apply`
    first, and then calls the corresponding method on that result of
    that operation.

    For example, with a :class:`DelayedUnionQuerySet`, when we call
    ``count()`` on it, then this descriptor will first take the union
    of its component querysets and then call ``.count()`` on the result.
    """
    def __call__(self, obj, *args, **kwargs):
        return getattr(obj._apply(), self.name)(*args, **kwargs)

    def get_base_docstring(self):
        return """
        Returns the output of ``{name}(...)`` after having applied the delayed
        operation.
        """


class PostApplyProperty(DelayedQuerySetDescriptor):
    """
    When this descriptor is called, it runs :meth:`DelayedQuerySet._apply`
    first, and then returns the corresponding property on the result of
    that operation.

    For example, with a :class:`DelayedUnionQuerySet`, when we access
    ``db`` on it, then this descriptor will first take the union
    of its component querysets and then return the ``db`` property on the
    result.
    """
    def __get__(self, obj, cls=None):
        if obj is None:
            return self
        return getattr(obj._apply(), self.name)

    def __set__(self, obj, value):
        return setattr(obj._apply(), self.name, value)

    def get_base_docstring(self):
        return ""


class PassthroughMethod(DelayedQuerySetMethod):
    """
    When this descriptor is called, it calls the corresponding operation
    on all of the component querysets and then returns a clone of *obj*
    with those querysets.

    For example, when ``filter()`` is called on a :class:`DelayedUnionQuerySet`,
    it calls the corresponding filter method on all of the component
    querysets and then returns a :class:`DelayedUnionQuerySet` using
    those.
    """
    def __call__(self, obj, *args, **kwargs):
        return obj._clone([
            getattr(qs, self.name)(*args, **kwargs) for qs in obj._querysets
        ])

    def get_base_docstring(self):
        return """
        Returns the a new delayed queryset with ``{name}(...)`` having been called
        on each of the component querysets.:
        """


class FirstQuerySetPassthroughMethod(DelayedQuerySetMethod):
    """
    When this descriptor is called, returns a :class:`DelayedQuerySet`
    where the corresponding method has been applied to just the first
    component queryset.

    For example, when doing ``prefetch_related`` on a
    :class:`DelayedUnionQuerySet`, we only need to call ``prefetch_related``
    on the first queryset.  After the union is applied, then Django will
    use the prefetches from just that first queryset.
    """
    def __call__(self, obj, *args, **kwargs):
        querysets = (
            getattr(obj._querysets[0], self.name)(*args, **kwargs),
        ) + obj._querysets[1:]
        return obj._clone(querysets)

    def get_base_docstring(self):
        return """
        Returns the a new delayed queryset with ``{name}(...)`` having been
        called on the first component queryset, while the rest remain unchanged.
        """


class FirstQuerySetMethod(DelayedQuerySetMethod):
    """
    When this descriptor is called, returns the result when calling the
    corresponding method on the first queryset.
    """
    def __call__(self, obj, *args, **kwargs):
        return getattr(obj._querysets[0], self.name)(*args, **kwargs)

    def get_base_docstring(self):
        return """
        Returns the result of calling ``{name}(...)`` on the first component
        queryset.
        """


class NotImplementedMethod(DelayedQuerySetMethod):
    """
    A descriptor which raises a :class:`NotImplementedError` when called.
    """
    def __call__(self, obj, *args, **kwargs):
        raise NotImplementedError()

    def get_base_docstring(self):
        return """
        Raises :class:`NotImplementedError`.
        """


class CountPostApplyMethod(PostApplyMethod):
    def __call__(self, obj, *args, **kwargs):
        # We make sure there are no select_related calls before calling
        # count to ensure we don't get an error on MySQL when doing
        # SELECT COUNT(*) from subquery where there are multiple columns
        # with the same name in subquery.
        if obj._result_cache is not None:
            return len(obj._result_cache)

        obj = obj.select_related(None)
        return super(CountPostApplyMethod, self).__call__(obj, *args, **kwargs)


class DelayedQuerySetBase(abc.ABCMeta):
    """
    This is the metaclass for :`DelayedQuerySet`.  It's purpose is to make
    sure that the names are set for all :class:`DelayedQuerySetDescriptor`
    instances in the class.
    """
    def __new__(cls, name, bases, attrs):
        for key, value in attrs.items():
            if isinstance(value, DelayedQuerySetDescriptor):
                value.set_name(key)
        return super(DelayedQuerySetBase, cls).__new__(cls, name, bases, attrs)


[docs]class DelayedQuerySet(with_metaclass(DelayedQuerySetBase, object)): """ A class used to work around some of the issues with Django's built-in support for set operations with querysets (such as ``UNION``). The primary issue is that after ```.union()`` call is made any subsequent filtering will silently fail. This class works around that issue by maintaing all of the individual querysets and not applying an operation like ``.union()`` until it's needed. For example, suppose we have ``qs = DelayedUnionQuerySet(qs0, qs1)```, then running ``qs = qs.filter(id=42)`` will be equivalent to doing ``qs = DelayedUnionQuerySet(qs0.filter(id=42), qs1.filter(id=42))``. Then, when we actually need to evaluate the queryset say by doing ``obj = qs.first()``, it will return ``qs0.union(qs1).first()`` behind the scenes. Subclasses need to implement the :meth:`_apply_operation`, which performs the operation such as ``.union()`` that is being delayed. """ def __init__(self, *querysets, **kwargs): """ :param tuple querysets: the component querysets :param dict kwargs: these are captured and made available for use in subclasses """ if not all(isinstance(qs, QuerySet) for qs in querysets): # I think this should be able to work with DelayedQuerySets # as well, but the _apply method would need to call _apply # on any DelayedQuerySet in self._querysets. Additionally, # only certain database engines support nested set operations. raise ValueError('can only pass in QuerySets for now') self._querysets = tuple(qs.order_by() for qs in querysets) self._kwargs = kwargs self._standard_ordering = True self._order_by = () self._applied = None # a cache for the queryset after the operation has been applied def _apply(self): """ Returns a :class:`django.db.models.QuerySet` where :meth:`_apply_operation` has been applied and the global ordering has been set. :rtype: :class:`django.db.models.QuerySet` """ if self._applied is not None: return self._applied qs = self._apply_operation().order_by(*self._order_by) qs.query.standard_ordering = self._standard_ordering self._applied = qs return self._applied @abc.abstractmethod def _apply_operation(self): """ Returns a proper :class:`django.db.models.QuerySet` from the component queryset. This is the operation that :class:`DelayedQuerySet` is delaying. For example, :class:`DelayedUnionQuerySet` runs :meth:`django.db.models.QuerySet.union` with all of the component querysets. :rtype: :class:`django.db.models.QuerySet` """ @property def model(self): """ Returns the model class for the :class:`DelayedQuerySet`. """ return self._querysets[0].model def _clone(self, querysets=None): """ Returns a copy of this :class:`DelayedQuerySet` with the ordering preserved. :parama querysets: an optional iterable of querysets to use in the in the clone """ if querysets is None: querysets = [qs._clone() for qs in self._querysets] clone = type(self)(*querysets, **self._kwargs) clone._order_by = self._order_by clone._standard_ordering = self._standard_ordering return clone __repr__ = PostApplyMethod() __len__ = PostApplyMethod() __iter__ = PostApplyMethod() __bool__ = PostApplyMethod() __nonzero__ = PostApplyMethod() __getitem__ = PostApplyMethod() __deepcopy__ = NotImplementedMethod() __getstate__ = NotImplementedMethod() __setstate__ = NotImplementedMethod() __and__ = NotImplementedMethod() __or__ = NotImplementedMethod() _result_cache = PostApplyProperty() _add_hints = PostApplyProperty() _hints = PostApplyProperty() query = PostApplyProperty() iterator = PostApplyMethod() count = CountPostApplyMethod() earliest = PostApplyMethod() latest = PostApplyMethod() first = PostApplyMethod() last = PostApplyMethod() delete = PostApplyMethod() exists = PostApplyMethod() none = PassthroughMethod() raw = PostApplyMethod() db = PostApplyProperty() all = PassthroughMethod() filter = PassthroughMethod() exclude = PassthroughMethod() values = PassthroughMethod() values_list = PassthroughMethod() annotate = PassthroughMethod() select_related = PassthroughMethod() defer = PassthroughMethod() only = PassthroughMethod() extra = PassthroughMethod() using = PassthroughMethod() complex_filter = PassthroughMethod() prefetch_related = FirstQuerySetPassthroughMethod() as_manager = FirstQuerySetPassthroughMethod() create = FirstQuerySetMethod() bulk_create = FirstQuerySetMethod() # These are left as not implemented at the moment. # We explicity put it here so that it is obvious to # the user of DelayedQuerysSet why things are not working. distinct = NotImplementedMethod() aggregate = NotImplementedMethod() union = NotImplementedMethod() intersection = NotImplementedMethod() difference = NotImplementedMethod() update = NotImplementedMethod() select_for_update = NotImplementedMethod() get_or_create = NotImplementedMethod() update_or_create = NotImplementedMethod() # These rely on sorting on an annotated field which is not available # once the union is applied dates = NotImplementedMethod() datetimes = NotImplementedMethod()
[docs] def get(self, *args, **kwargs): """ Performs the query and returns a single object matching the given keyword arguments. .. note:: We cannot use :class:`PostApplyMethod` for this since that does additional filtering which does not work with querysets that have been "unioned" for example. """ clone = self.filter(*args, **kwargs) values = list(clone) num = len(values) if num == 1: return values[0] if not num: raise self.model.DoesNotExist( "%s matching query does not exist." % self.model._meta.object_name ) raise self.model.MultipleObjectsReturned( "get() returned more than one %s -- it returned %s!" % (self.model._meta.object_name, num) )
[docs] def order_by(self, *field_names): """ Returns a new :class:`DelayedQuerySet`` instance with the ordering changed. .. note:: We need to have a custom implementation for this because we want to change the ordering of the final queryset, not just the ordering within each component queryset. """ qs = self._clone() qs._order_by = field_names return qs
@property def ordered(self): """ Returns True if the :class:`DelayedQuerySet` is ordered -- i.e. has an order_by() clause. :rtype: bool """ return bool(self._order_by)
[docs] def reverse(self): """ Reverses the ordering of the :class:`DelayedQuerySet`. .. note:: We need to have a custom implementation for this because we want to reverse the ordering of the final queryset, not just the ordering within each component queryset. """ qs = self._clone() qs._standard_ordering = not qs._standard_ordering return qs
[docs] def in_bulk(self, id_list=None): """ Returns a dictionary mapping each of the given IDs to the object with that ID. If `id_list` isn't provided, the entire :class:`DelayedQuerySet` is evaluated. """ if id_list is not None: if not id_list: return {} qs = self.filter(pk__in=id_list).order_by() else: qs = self._clone() return {obj._get_pk_val(): obj for obj in qs}