• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ProteinsWebTeam / interpro7-api / 9887981828

11 Jul 2024 08:20AM UTC coverage: 93.827% (-0.2%) from 94.023%
9887981828

Pull #151

github

web-flow
Update webfront/serializers/content_serializers.py

Co-authored-by: Matthias Blum <mat.blum@gmail.com>
Pull Request #151: Replacing `_subset` with `_url` in filter by DB requests

429 of 436 new or added lines in 29 files covered. (98.39%)

6 existing lines in 5 files now uncovered.

9089 of 9687 relevant lines covered (93.83%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.01
/webfront/views/queryset_manager.py
1
from webfront.models import (
1✔
2
    Entry,
3
    Protein,
4
    Structure,
5
    Taxonomy,
6
    Proteome,
7
    Set,
8
)
9
from django.db.models import Q
1✔
10
from functools import reduce
1✔
11
from operator import or_
1✔
12
import re
1✔
13

14

15
def escape(text):
1✔
16
    return re.sub(r'([-+!(){}[\]^"~*?:\\\/])', r"\\\1", str(text))
1✔
17

18

19
def merge_two_dicts(x, y):
1✔
20
    z = x.copy()  # start with x's keys and values
1✔
21
    z.update(y)  # modifies z with y's keys and values & returns None
1✔
22
    return z
1✔
23

24

25
def uses_wildcards(query):
1✔
26
    return re.match(r'[-+!(){}[\]^"~*?:\\\/]', query) is not None
1✔
27

28

29
class QuerysetManager:
1✔
30
    main_endpoint = None
1✔
31
    filters = {}
1✔
32
    exclusions = {}
1✔
33
    endpoints = []
1✔
34
    order_field = None
1✔
35
    order_field_in_pagination = True
1✔
36
    other_fields = None
1✔
37
    show_subset = False
1✔
38

39
    def reset_filters(self, endpoint, endpoint_levels=[]):
1✔
40
        self.main_endpoint = endpoint
1✔
41
        self.endpoints = endpoint_levels
1✔
42
        self.filters = {
1✔
43
            "search": {},
44
            "searcher": {},
45
            "entry": {},
46
            "structure": {},
47
            "protein": {},
48
            "taxonomy": {},
49
            "proteome": {},
50
            "set": {},
51
        }
52
        self.exclusions = self.filters.copy()
1✔
53
        self.order_field = None
1✔
54
        self.order_field_in_pagination = True
1✔
55

56
    def set_main_endpoint(self, endpoint):
1✔
57
        self.main_endpoint = endpoint
×
58

59
    def add_filter(self, endpoint, **kwargs):
1✔
60
        self.filters[endpoint] = merge_two_dicts(self.filters[endpoint], kwargs)
1✔
61

62
    def add_exclusion(self, endpoint, **kwargs):
1✔
63
        self.exclusions[endpoint] = merge_two_dicts(self.exclusions[endpoint], kwargs)
×
64

65
    def remove_filter(self, endpoint, f):
1✔
66
        tmp = self.filters[endpoint][f]
1✔
67
        del self.filters[endpoint][f]
1✔
68
        return tmp
1✔
69

70
    def order_by(self, field, for_pagination=True):
1✔
71
        self.order_field = field
1✔
72
        self.order_field_in_pagination = for_pagination
1✔
73

74
    def get_order(self):
1✔
75
        return self.order_field if self.order_field_in_pagination else None
1✔
76

77
    # Generates a query string for elasticsearch from the registered queryset filters.
78
    # It explicitely goes through all the filters and create the query string case by case.
79
    def get_searcher_query(self, include_search=False, use_lineage=False):
1✔
80
        blocks = []
1✔
81
        search_blocks = []
1✔
82
        for ep in self.filters:
1✔
83
            for k, v in self.filters[ep].items():
1✔
84
                if ep == "searcher":
1✔
85
                    search_blocks.append("{}:{}".format(k, escape(v)))
1✔
86
                elif ep == "proteome" and k == "is_reference":
1✔
87
                    blocks.append("proteome_is_reference:{}".format(escape(v).strip()))
1✔
88
                elif include_search and ep == "search":
1✔
89
                    main_ep = self.main_endpoint
1✔
90
                    if uses_wildcards(v):
1✔
91
                        blocks.append(
×
92
                            "text_{}:{}".format(main_ep, v.replace(" ", "%20"))
93
                        )
94
                    else:
95
                        for token in v.split():
1✔
96
                            blocks.append("text_{}:{}~0".format(main_ep, token))
1✔
97
                elif k == "source_database__isnull":
1✔
98
                    blocks.append("{}_exists_:{}_db".format("!" if v else "", ep))
1✔
99
                elif k == "accession" or k == "accession__iexact":
1✔
100
                    if ep == "taxonomy":
1✔
101
                        blocks.append("tax_lineage:{}".format(escape(v)))
1✔
102
                    else:
103
                        blocks.append("{}_acc:{}".format(ep, escape(v)))
1✔
104
                elif k == "accession__isnull":
1✔
105
                    if ep == "structure":
1✔
106
                        blocks.append("{}_exists_:{}_acc".format("!" if v else "", ep))
1✔
107
                    elif ep == "taxonomy":
1✔
108
                        blocks.append("{}_exists_:tax_id".format("!" if v else ""))
1✔
109
                    elif ep == "proteome":
1✔
110
                        blocks.append(
1✔
111
                            "{}_exists_:proteome_acc".format("!" if v else "")
112
                        )
113
                    else:
114
                        blocks.append("{}_exists_:{}_db".format("!" if v else "", ep))
1✔
115

116
                elif (
1✔
117
                    k == "accession__in"
118
                    and ep == "taxonomy"
119
                    and isinstance(v, list)
120
                    and len(v) > 0
121
                ):
122
                    template = "tax_lineage:{}" if use_lineage else "tax_id:{}"
×
123
                    blocks.append(
×
124
                        "({})".format(
125
                            " || ".join([template.format(value) for value in v])
126
                        )
127
                    )
128
                elif (
1✔
129
                    k == "integrated"
130
                    or k == "integrated__accession__iexact"
131
                    or k == "integrated__iexact"
132
                    or k == "integrated__contains"
133
                ):
134
                    blocks.append("entry_integrated:{}".format(escape(v)))
1✔
135
                elif k == "integrated__isnull":
1✔
136
                    blocks.append(
1✔
137
                        "{}_exists_:entry_integrated".format(
138
                            "!entry_db:interpro && !" if v else ""
139
                        )
140
                    )
141
                elif k == "type" or k == "type__iexact" or k == "type__exact":
1✔
UNCOV
142
                    blocks.append("{}_type:{}".format(ep, escape(v)))
×
143
                elif k == "is_fragment":
1✔
144
                    blocks.append("{}_{}:{}".format(ep, k, escape(v)))
1✔
145
                elif k == "tax_id" or k == "tax_id__iexact" or k == "tax_id__contains":
1✔
146
                    blocks.append("tax_id:{}".format(escape(v)))
×
147
                elif "tax_lineage__contains" in k:
1✔
148
                    blocks.append("tax_lineage:{}".format(escape(v).strip()))
×
149
                elif "experiment_type__" in k:
1✔
150
                    blocks.append("structure_evidence:{}".format(escape(v).strip()))
×
151
                elif "__gt" in k:
1✔
152
                    filter_k = "protein_" + k if k.startswith("length_") else k
×
153
                    blocks.append(
×
154
                        "{}:{}{} TO *]".format(
155
                            re.sub(r"__gte?", "", filter_k),
156
                            "[" if "__gte" in filter_k else "{",
157
                            escape(v),
158
                        )
159
                    )
160
                elif "__lt" in k:
1✔
161
                    filter_k = "protein_" + k if k.startswith("length_") else k
×
162
                    blocks.append(
×
163
                        "{}:[* TO {}{}".format(
164
                            re.sub(r"__lte?", "", filter_k),
165
                            escape(v),
166
                            "]" if "__lte" in filter_k else "}",
167
                        )
168
                    )
169
                elif ep != "structure":
1✔
170
                    if k == "source_database" or k == "source_database__iexact":
1✔
171
                        blocks.append("{}_db:{}".format(ep, escape(v)))
1✔
172

173
        # Normalizes the blocks(sorts and lower) and joins them with ' && '.
174
        blocks = list(set(blocks))
1✔
175
        blocks.sort()
1✔
176
        q = " && ".join(blocks).lower()
1✔
177
        if len(search_blocks) > 0:
1✔
178
            sq = " && ".join(search_blocks)
1✔
179
            if len(q) > 0:
1✔
180
                q += " && " + sq
1✔
181
            else:
182
                q = sq
×
183
        if (
1✔
184
            self.order_field is not None
185
            and self.order_field != "num_proteins"
186
            and self.order_field != "-num_proteins"
187
        ):
188
            q += "&sort=" + self.order_field
×
189
        return q
1✔
190

191
    def get_base_queryset(self, endpoint):
1✔
192
        queryset = Entry.objects.all()
1✔
193
        if endpoint == "entry":
1✔
194
            queryset = Entry.objects.all()
1✔
195
        elif endpoint == "structure":
1✔
196
            queryset = Structure.objects.all()
1✔
197
        elif endpoint == "protein":
1✔
198
            queryset = Protein.objects.all()
1✔
199
        elif endpoint == "proteome":
1✔
200
            queryset = Proteome.objects.all()
1✔
201
        elif endpoint == "taxonomy":
1✔
202
            queryset = Taxonomy.objects.all()
1✔
203
        elif endpoint == "set":
1✔
204
            queryset = Set.objects.all()
1✔
205
        return queryset
1✔
206

207
    @staticmethod
1✔
208
    def get_current_filters(filters, endpoint, only_main_endpoint):
1✔
209
        current_filters = {}
1✔
210
        for ep in filters:
1✔
211
            if ep == "search" or ep == "searcher":
1✔
212
                continue
1✔
213
            if ep == endpoint:
1✔
214
                current_filters = merge_two_dicts(
1✔
215
                    current_filters, {k: v for k, v in filters[ep].items()}
216
                )
217
            elif not only_main_endpoint:
1✔
218
                current_filters = merge_two_dicts(
1✔
219
                    current_filters, {ep + "__" + k: v for k, v in filters[ep].items()}
220
                )
221
        return current_filters
1✔
222

223
    def get_queryset(self, endpoint=None, only_main_endpoint=False):
1✔
224
        if endpoint is None:
1✔
225
            endpoint = self.main_endpoint
1✔
226
        queryset = self.get_base_queryset(endpoint)
1✔
227
        current_filters = self.get_current_filters(
1✔
228
            self.filters, endpoint, only_main_endpoint
229
        )
230
        if "accession__isnull" in current_filters:
1✔
231
            del current_filters["accession__isnull"]
1✔
232
        current_exclusions = self.get_current_filters(
1✔
233
            self.exclusions, endpoint, only_main_endpoint
234
        )
235

236
        # creates an `OR` filter for the search fields
237
        search_filters = self.filters.get("search")
1✔
238
        if search_filters:
1✔
239
            or_filter = reduce(or_, (Q(**{f[0]: f[1]}) for f in search_filters.items()))
1✔
240
            queryset = queryset.filter(or_filter, **current_filters)
1✔
241
        queryset = queryset.filter(**current_filters)
1✔
242
        if len(current_exclusions) > 0:
1✔
243
            queryset = queryset.exclude(**current_exclusions)
×
244
        if self.order_field is not None:
1✔
245
            queryset = queryset.order_by(self.order_field)
1✔
246
        return queryset
1✔
247

248
    def update_integrated_filter(self, endpoint):
1✔
249
        c = self.filters[endpoint].copy()
1✔
250
        for k, f in c.items():
1✔
251
            if k == "source_database" or k == "source_database__iexact":
1✔
252
                if endpoint == "set" and "integrated" in c:
1✔
253
                    del self.filters[endpoint]["integrated"]
×
254
                else:
255
                    self.filters[endpoint]["integrated__isnull"] = False
1✔
256
                    del self.filters[endpoint][k]
1✔
257
            elif k == "accession" or k == "accession__iexact":
1✔
258
                self.filters[endpoint]["integrated__accession__iexact"] = f.lower()
1✔
259
                del self.filters[endpoint][k]
1✔
260

261
    def is_single_endpoint(self):
1✔
262
        main_ep = self.main_endpoint
1✔
263
        filters = [
1✔
264
            f
265
            for f in self.filters
266
            if f != main_ep and f != "search" and self.filters[f] != {}
267
        ]
268
        return len(filters) == 0
1✔
269

270

271
def can_use_taxonomy_per_entry(filters):
1✔
272
    for key, value in filters.items():
1✔
273
        if key not in ["entry", "taxonomy"] and value:
1✔
274
            return False
1✔
275

276
    return (
1✔
277
        "accession" in filters["entry"] and "integrated__isnull" not in filters["entry"]
278
    )
279

280

281
def can_use_taxonomy_per_db(filters):
1✔
282
    for key, value in filters.items():
1✔
283
        if key not in ["entry", "taxonomy"] and value:
1✔
284
            return False
1✔
285

286
    return (
1✔
287
        "source_database" in filters["entry"]
288
        and "integrated__isnull" not in filters["entry"]
289
    )
290

291

292
def can_use_proteome_per_entry(filters):
1✔
293
    for key, value in filters.items():
1✔
294
        if key not in ["entry", "proteome"] and value:
1✔
295
            return False
1✔
296

297
    return (
1✔
298
        "accession" in filters["entry"] and "integrated__isnull" not in filters["entry"]
299
    )
300

301

302
def can_use_proteome_per_db(filters):
1✔
303
    for key, value in filters.items():
1✔
304
        if key not in ["entry", "proteome"] and value:
1✔
305
            return False
1✔
306

307
    return (
1✔
308
        "source_database" in filters["entry"]
309
        and "integrated__isnull" not in filters["entry"]
310
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc