• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

barseghyanartur / django-mongoengine-filter / 4760522550

pending completion
4760522550

Pull #19

github

GitHub
Merge e0435c946 into ed5b2b52b
Pull Request #19: added is_valid() to BaseFilterSet class; fixing issue #8 allowing for…

1 of 1 new or added line in 1 file covered. (100.0%)

30 existing lines in 1 file now uncovered.

331 of 512 relevant lines covered (64.65%)

3.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.86
/django_mongoengine_filter/filterset.py
1
from __future__ import unicode_literals
5✔
2

3
from collections import OrderedDict
5✔
4
from copy import deepcopy
5✔
5

6
import mongoengine
5✔
7
import six
5✔
8
from django import forms
5✔
9
from django.core.validators import EMPTY_VALUES
5✔
10
from django.db import models
5✔
11
from django.db.models.constants import LOOKUP_SEP
5✔
12
from django.utils.text import capfirst
5✔
13
from django.utils.translation import gettext as _
5✔
14
from mongoengine.errors import LookUpError
5✔
15
from mongoengine.fields import EmbeddedDocumentField, ListField
5✔
16

17
from .filters import (  # DateFilter,; TimeFilter,
5✔
18
    BooleanFilter,
19
    ChoiceFilter,
20
    DateTimeFilter,
21
    Filter,
22
    ModelChoiceFilter,
23
    ModelMultipleChoiceFilter,
24
    NumberFilter,
25
    StringFilter,
26
)
27

28
# Default key under which the ordering choice field is added to the
# generated filter form (see ``BaseFilterSet.order_by_field``).
ORDER_BY_FIELD = "o"
5✔
29

30
# Names exported by ``from <this module> import *``.
__all__ = (
    "BaseFilterSet",
    "FILTER_FOR_DBFIELD_DEFAULTS",
    "filters_for_model",
    "FilterSet",
    "filterset_factory",
    "FilterSetMetaclass",
    "FilterSetOptions",
    "get_declared_filters",
    "get_model_field",
    "ORDER_BY_FIELD",
)
42

43

44
def get_declared_filters(bases, attrs, with_base_filters=True):
    """Collect the ``Filter`` instances declared in a class body.

    Every ``Filter`` value found in ``attrs`` is removed from ``attrs``,
    given the attribute name as its ``name`` when it has none, and ordered
    by ``creation_counter``.  Filters inherited from ``bases`` are placed in
    front of the declared ones.

    :param bases: sequence of base classes of the class being built.
    :param attrs: class namespace; mutated (declared filters are popped).
    :param with_base_filters: when true, inherit from each base's
        ``base_filters``; otherwise from each base's ``declared_filters``.
    :return: ``OrderedDict`` mapping filter name to filter.
    """
    # Snapshot the names first: ``attrs`` is mutated while collecting.
    declared_names = [
        key for key, value in attrs.items() if isinstance(value, Filter)
    ]

    declared = []
    for key in declared_names:
        filter_obj = attrs.pop(key)
        if getattr(filter_obj, "name", None) is None:
            filter_obj.name = key
        declared.append((key, filter_obj))
    declared.sort(key=lambda pair: pair[1].creation_counter)

    inherited_attr = (
        "base_filters" if with_base_filters else "declared_filters"
    )
    collected = declared
    # Walk the bases right-to-left so the left-most base ends up first.
    for base in reversed(bases):
        if hasattr(base, inherited_attr):
            inherited = list(getattr(base, inherited_attr).items())
            collected = inherited + collected

    return OrderedDict(collected)
5✔
64

65

66
def get_model_field(model, f):
    """Resolve the lookup path ``f`` to a field of ``model``.

    ``f`` is split on ``LOOKUP_SEP``.  A single-segment path returns the
    first element of ``model._lookup_field(f)``.  For longer paths every
    segment except the last is looked up on the current model and the
    final segment is passed to that field's ``lookup_member``.

    :return: the resolved field/member, or ``None`` when a lookup raises
        ``LookUpError``.
    """
    path = f.split(LOOKUP_SEP)

    if len(path) == 1:
        try:
            return model._lookup_field(f)[0]
        except LookUpError:
            return None

    resolved = None
    leaf_name = path[-1]
    # NOTE(review): every iteration asks for the *final* segment, so paths
    # with more than two segments may not resolve as intended - confirm.
    for segment in path[:-1]:
        try:
            container = model._lookup_field(segment)[0]
            resolved = container.lookup_member(leaf_name)
        except LookUpError:
            return None
        if isinstance(resolved, (EmbeddedDocumentField, ListField)):
            model = resolved
    return resolved
×
82

83

84
def filters_for_model(
    model,
    fields=None,
    exclude=None,
    filter_for_field=None,
    filter_for_reverse_field=None,
):
    """Build the automatic filters for ``model``.

    :param model: model class; its ``_meta`` is read and field names are
        resolved through ``get_model_field``.
    :param fields: iterable of field names, or a dict mapping a field name
        to an iterable of lookup types.  ``None`` means "derive from the
        model options".
    :param exclude: optional container of field names to skip.
    :param filter_for_field: callable ``(field, name[, lookup_type])``
        returning a filter or a falsy value.
    :param filter_for_reverse_field: accepted for interface compatibility;
        not used by this function.
    :return: ``OrderedDict`` of filter name -> filter.  A name that cannot
        be resolved to a model field maps to ``None`` so the caller can
        detect it.
    """
    field_dict = OrderedDict()
    opts = model._meta
    if fields is None:
        # NOTE(review): this branch expects ``_meta`` to expose ``fields``
        # and ``many_to_many`` (Django-style options) - confirm it is
        # reachable for the document classes this package targets.
        fields = [
            f.name
            for f in sorted(opts.fields + opts.many_to_many)
            if not isinstance(f, models.AutoField)
        ]

    per_lookup = isinstance(fields, dict)
    for f in fields:
        # Honour ``exclude``: previously the argument was silently ignored.
        if exclude is not None and f in exclude:
            continue

        field = get_model_field(model, f)
        if field is None:
            # Keep a placeholder so unknown names can be reported upstream.
            field_dict[f] = None
            continue

        if per_lookup:
            # Create a filter for each lookup type.
            for lookup_type in fields[f]:
                filter_ = filter_for_field(field, f, lookup_type)
                if filter_:
                    filter_name = f
                    # Don't add "exact" to filter names
                    if lookup_type != "exact":
                        filter_name = f + LOOKUP_SEP + lookup_type
                    field_dict[filter_name] = filter_
        else:
            filter_ = filter_for_field(field, f)
            if filter_:
                field_dict[f] = filter_
    return field_dict
5✔
119

120

121
class FilterSetOptions:
    """Holds the options read from a filter set's inner ``Meta`` class."""

    def __init__(self, options=None):
        """Copy the known options from ``options``, applying defaults.

        :param options: object (typically a ``Meta`` class) whose
            attributes are read; missing attributes fall back to the
            defaults listed below.  ``None`` yields all defaults.
        """
        defaults = (
            ("model", None),
            ("fields", None),
            ("exclude", None),
            ("order_by", False),
            ("form", forms.Form),
        )
        for attr_name, fallback in defaults:
            setattr(self, attr_name, getattr(options, attr_name, fallback))
5✔
128

129

130
class FilterSetMetaclass(type):
    """Metaclass that gathers declared and model-derived filters.

    For every class that has a ``FilterSet`` ancestor it sets ``_meta``
    (a ``FilterSetOptions``), ``declared_filters`` and ``base_filters``.
    """

    def __new__(cls, name, bases, attrs):
        """Create the class and attach its filter bookkeeping.

        :raises TypeError: when a name resolved to ``None``, i.e. it is
            neither a model field nor a declared filter.
        """
        try:
            filterset_bases = [
                base for base in bases if issubclass(base, FilterSet)
            ]
        except NameError:
            # ``FilterSet`` does not exist yet: it is being defined now.
            filterset_bases = None

        declared = get_declared_filters(bases, attrs, False)
        klass = super(FilterSetMetaclass, cls).__new__(
            cls, name, bases, attrs
        )

        if not filterset_bases:
            return klass

        options = FilterSetOptions(getattr(klass, "Meta", None))
        klass._meta = options

        if options.model:
            all_filters = filters_for_model(
                options.model,
                options.fields,
                options.exclude,
                klass.filter_for_field,
                klass.filter_for_reverse_field,
            )
            # Explicit declarations win over automatically built filters.
            all_filters.update(declared)
        else:
            all_filters = declared

        if None in all_filters.values():
            raise TypeError(
                "Meta.fields contains a field that isn't defined "
                "on this FilterSet"
            )

        klass.declared_filters = declared
        klass.base_filters = all_filters
        return klass
5✔
169

170

171
# Default filter configuration per mongoengine field class.  Each value is
# its own ``{"filter_class": ...}`` dict, consumed by
# ``BaseFilterSet.filter_for_field``.
FILTER_FOR_DBFIELD_DEFAULTS = {
    field_class: {"filter_class": filter_class}
    for field_class, filter_class in (
        # NOTE(review): a UUID mapped to NumberFilter looks suspicious -
        # confirm this is intended.
        (mongoengine.UUIDField, NumberFilter),
        (mongoengine.StringField, StringFilter),
        (mongoengine.BooleanField, BooleanFilter),
        (mongoengine.DateTimeField, DateTimeFilter),
        (mongoengine.DecimalField, NumberFilter),
        (mongoengine.IntField, NumberFilter),
        (mongoengine.FloatField, NumberFilter),
        (mongoengine.EmailField, StringFilter),
        (mongoengine.URLField, StringFilter),
    )
}
182

183

184
class BaseFilterSet:
    """Filter a queryset using a dynamically generated form.

    ``_meta`` and ``base_filters`` are read from the class; in this module
    they are attached by ``FilterSetMetaclass``.
    """

    # Extra/overriding entries merged over FILTER_FOR_DBFIELD_DEFAULTS.
    filter_overrides = {}
    # Key of the ordering choice field in the generated form.
    order_by_field = ORDER_BY_FIELD
    # When true, invalid input produces an empty queryset.
    strict = True

    def __init__(
        self,
        data=None,
        queryset=None,
        *,
        request=None,
        prefix=None,
        strict=None,
    ):
        """Bind the filter set to ``data`` and a starting ``queryset``.

        :param data: submitted values; ``None`` leaves the set unbound.
        :param queryset: starting queryset; defaults to
            ``self._meta.model.objects``.
        :param request: stored as ``self.request``.
        :param prefix: form prefix passed to the generated form.
        :param strict: per-instance override of the ``strict`` class
            attribute; ``None`` keeps the class default.
        """
        self.is_bound = data is not None
        self.data = data or {}
        if queryset is None:
            queryset = self._meta.model.objects
        self.queryset = queryset
        self.request = request
        self.form_prefix = prefix
        if strict is not None:
            self.strict = strict

        # Work on private copies so instances never share filter state.
        self.filters = deepcopy(self.base_filters)
        for filter_ in self.filters.values():
            # propagate the model being used through the filters
            filter_.model = self._meta.model
            # give each filter access to its owning filter set
            filter_.parent = self

    def __iter__(self):
        yield from self.qs

    def __len__(self):
        return len(self.qs)

    def __getitem__(self, key):
        return self.qs[key]

    def is_valid(self):
        """
        Return True if the underlying form has no errors, or False otherwise.
        """
        return self.is_bound and self.form.is_valid()

    @property
    def qs(self):
        """Return the filtered (and optionally ordered) queryset.

        The result is computed once and cached on ``self._qs``.
        """
        if not hasattr(self, "_qs"):
            self._qs = self._build_queryset()
        return self._qs

    def _build_queryset(self):
        """Apply every filter, then the ordering, to the base queryset."""
        valid = self.is_valid()

        if self.strict and self.is_bound and not valid:
            return self.queryset.none()

        # start with all the results and filter from there
        qs = self.queryset.all()
        for name, filter_ in self.filters.items():
            if valid:
                value = self.form.cleaned_data[name]
            else:
                raw_value = self.form[name].value()
                try:
                    value = self.form.fields[name].clean(raw_value)
                except forms.ValidationError:
                    # for invalid values either: strictly "apply" the
                    # filter yielding no results and get out of here ...
                    if self.strict:
                        return self.queryset.none()
                    # ... or ignore this filter altogether
                    value = None

            if value is not None:  # valid & clean data
                qs = filter_.filter(qs, value)

        if self._meta.order_by:
            qs = self._apply_ordering(qs)
        return qs

    def _apply_ordering(self, qs):
        """Order ``qs`` according to the submitted ordering choice."""
        order_field = self.form.fields[self.order_by_field]
        data = self.form[self.order_by_field].data
        ordered_value = None
        try:
            ordered_value = order_field.clean(data)
        except forms.ValidationError:
            # Deliberate best effort: an invalid choice means "no explicit
            # ordering"; strict mode falls back to the first choice below.
            pass

        if ordered_value in EMPTY_VALUES and self.strict:
            ordered_value = order_field.choices[0][0]

        if ordered_value:
            qs = qs.order_by(*self.get_order_by(ordered_value))
        return qs

    def count(self):
        return self.qs.count()

    @property
    def form(self):
        """Return the form built from the filters' fields (cached).

        The form class derives from ``self._meta.form``; it is bound to
        ``self.data`` only when the filter set itself is bound.
        """
        if not hasattr(self, "_form"):
            fields = OrderedDict(
                (name, filter_.field)
                for name, filter_ in self.filters.items()
            )
            fields[self.order_by_field] = self.ordering_field
            Form = type(
                str("%sForm" % self.__class__.__name__),
                (self._meta.form,),
                fields,
            )
            if self.is_bound:
                self._form = Form(self.data, prefix=self.form_prefix)
            else:
                self._form = Form(prefix=self.form_prefix)
        return self._form

    def get_ordering_field(self):
        """Build the ordering ``ChoiceField`` from ``Meta.order_by``.

        :return: a ``forms.ChoiceField``, or ``None`` when ordering is not
            enabled.
        """
        order_by = self._meta.order_by
        if not order_by:
            return None

        if isinstance(order_by, (list, tuple)):
            if isinstance(order_by[0], (list, tuple)):
                # e.g. (('field', 'Display name'), ...)
                choices = [(f[0], f[1]) for f in order_by]
            else:
                # Translate the template first and interpolate afterwards;
                # interpolating before the lookup would make the message
                # id depend on the field name and never match a catalog.
                descending = _("%s (descending)")
                choices = [
                    (
                        f,
                        (descending % capfirst(f[1:]))
                        if f.startswith("-")
                        else capfirst(f),
                    )
                    for f in order_by
                ]
        else:
            # add asc and desc field names
            # use the filter's label if provided
            descending = _("%s (descending)")
            choices = []
            for f, fltr in self.filters.items():
                key = fltr.name or f
                label = fltr.label or capfirst(f)
                choices.append((key, label))
                choices.append(("-%s" % key, descending % label))

        return forms.ChoiceField(
            label="Ordering", required=False, choices=choices
        )

    @property
    def ordering_field(self):
        """Return the cached result of ``get_ordering_field()``."""
        if not hasattr(self, "_ordering_field"):
            self._ordering_field = self.get_ordering_field()
        return self._ordering_field

    def get_order_by(self, order_choice):
        """Return the ``order_by`` arguments for the chosen ordering."""
        return [order_choice]

    @classmethod
    def filter_for_field(cls, f, name, lookup_type="exact"):
        """Create the default filter for model field ``f``.

        :param f: the model field.
        :param name: name given to the filter.
        :param lookup_type: lookup type given to the filter.
        :return: a filter instance, or ``None`` when no filter class is
            configured for the field's class or any of its parents.
        """
        filter_for_field = dict(FILTER_FOR_DBFIELD_DEFAULTS)
        filter_for_field.update(cls.filter_overrides)
        default = {
            "name": name,
            "label": capfirst(getattr(f, "verbose_name", None) or f.name),
            "lookup_type": lookup_type,
        }

        if f.choices:
            default["choices"] = f.choices
            return ChoiceFilter(**default)

        data = filter_for_field.get(f.__class__)
        if data is None:
            # could be a derived field, inspect parents
            for class_ in f.__class__.mro():
                # skip if class_ is models.Field or object
                # 1st item in mro() is original class
                if class_ in (f.__class__, models.Field, object):
                    continue
                data = filter_for_field.get(class_)
                if data:
                    break
            if data is None:
                return None

        filter_class = data.get("filter_class")
        # Optional "extra" callable contributes additional filter kwargs.
        default.update(data.get("extra", lambda f: {})(f))
        if filter_class is not None:
            return filter_class(**default)
        return None

    @classmethod
    def filter_for_reverse_field(cls, f, name):
        """Create a model-choice filter for the reverse relation ``f``.

        Returns a ``ModelMultipleChoiceFilter`` when ``rel.multiple`` is
        truthy, otherwise a ``ModelChoiceFilter``.
        """
        # NOTE(review): relies on ``f.field.rel`` - confirm the objects
        # passed here actually expose that attribute.
        rel = f.field.rel
        queryset = f.model.objects
        default = {
            "name": name,
            "label": capfirst(rel.related_name),
            "queryset": queryset,
        }
        if rel.multiple:
            return ModelMultipleChoiceFilter(**default)
        return ModelChoiceFilter(**default)
×
404

405

406
class FilterSet(six.with_metaclass(FilterSetMetaclass, BaseFilterSet)):
    """``BaseFilterSet`` whose subclasses are built by ``FilterSetMetaclass``."""
5✔
408

409

410
def filterset_factory(model):
    """Create a ``FilterSet`` subclass for ``model`` on the fly.

    The generated class is named ``<ModelName>FilterSet`` and carries an
    inner ``Meta`` whose only option is ``model``.

    :param model: the model class to build the filter set for.
    :return: the new ``FilterSet`` subclass.
    """
    meta = type(str("Meta"), (object,), {"model": model})
    # ``_meta`` may be an options object with ``object_name`` or a plain
    # mapping without it; fall back to the class name in the latter case
    # instead of raising AttributeError.
    model_name = (
        getattr(getattr(model, "_meta", None), "object_name", None)
        or model.__name__
    )
    return type(
        str("%sFilterSet" % model_name),
        (FilterSet,),
        {"Meta": meta},
    )
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc