• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

IGVF-DACC / igvfd / #6993

24 Jan 2025 07:02PM UTC coverage: 89.638% (-0.02%) from 89.662%
#6993

Pull #1279

coveralls-python

zhwshen
Addressed Jennifer's comment
Pull Request #1279: IGVF-2245-multi-seqspec-audit

17 of 17 new or added lines in 1 file covered. (100.0%)

14 existing lines in 3 files now uncovered.

7457 of 8319 relevant lines covered (89.64%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.06
/src/igvfd/report.py
1
from collections import OrderedDict
1✔
2
from pyramid.httpexceptions import HTTPBadRequest
1✔
3
from pyramid.view import view_config
1✔
4
from snovault import TYPES
1✔
5
from snovault.elasticsearch.searches.interfaces import SEARCH_CONFIG
1✔
6
from snosearch.parsers import QueryString
1✔
7
from snovault.compat import bytes_
1✔
8
from igvfd.searches.generator import search_generator
1✔
9

10
import datetime
1✔
11
import re
1✔
12

13
# Columns whose values contain an href that must be expanded to a full URL
# (see format_row_full_url); matched against report column keys.
HREF_COLUMN_KEYS = ['href', 'attachment', 'attachment.href', 'files.href']
15

16

17
def includeme(config):
    """Register the TSV report download routes and scan this module for views."""
    routes = (
        ('report_download', '/report.tsv'),
        ('multitype_report_download', '/multireport.tsv'),
    )
    for route_name, pattern in routes:
        config.add_route(route_name, pattern)
    config.scan(__name__, categories=None)
21

22

23
def lookup_column_value(value, path):
    """Resolve a dotted *path* against the item *value*.

    Lists encountered along the path are flattened; if the path dead-ends for
    every node, an empty string is returned. When the path resolves to
    embedded objects, their '@id' values are shown instead. The unique leaf
    values are returned as a single comma-joined string.
    """
    current = [value]
    for segment in path.split('.'):
        gathered = []
        for node in current:
            if segment not in node:
                continue
            child = node[segment]
            if isinstance(child, list):
                gathered.extend(child)
            else:
                gathered.append(child)
        current = gathered
        if not current:
            # The path did not resolve for any node.
            return ''
    # if we ended with an embedded object, show the @id
    if current and hasattr(current[0], '__contains__') and '@id' in current[0]:
        current = [node['@id'] for node in current]
    unique_leaves = []
    for leaf in current:
        if isinstance(leaf, dict):
            leaf = str(leaf)
        if leaf not in unique_leaves:
            unique_leaves.append(leaf)
    return u','.join(u'{}'.format(leaf) for leaf in unique_leaves)
49

50

51
def format_row(columns):
    """Format a list of text columns as a tab-separated byte string."""
    cells = []
    for column in columns:
        # Collapse internal whitespace and strip stray tabs/newlines so the
        # value cannot break the TSV structure.
        normalized = ' '.join(column.strip('\t\n\r').split())
        cells.append(bytes_(normalized, 'utf-8'))
    return b'\t'.join(cells) + b'\r\n'
54

55

56
def format_row_full_url(columns, href_index, host_url, id):
    """Format a list of text columns as a tab-separated byte string.

    Columns whose position appears in ``href_index`` hold href values; those
    are turned into full URLs by prefixing ``host_url`` (plus the item ``id``
    for '@@download' attachment hrefs and for hrefs embedded in a serialized
    object).
    """
    row = []
    for col_index, column in enumerate(columns):
        ls = column.strip('\t\n\r').split()
        if col_index in href_index:
            # href is not embedded, append host_url directly
            if len(ls) == 1:
                if ',' in ls[0]:
                    # files.href, FileSet can have more than one file in files
                    files_with_host_url = [
                        host_url + file_path.strip()
                        for file_path in ls[0].split(',')
                    ]
                    ls[0] = ','.join(files_with_host_url)
                # attachment.href
                elif ls[0].startswith('@@download'):
                    ls[0] = host_url + id + ls[0]
                # href from File or files.href when there is one file in FileSet
                else:
                    ls[0] = host_url + ls[0]
            # href is embedded in a serialized object: locate the token that
            # follows the 'href' key and splice the full URL into it.
            elif len(ls) > 1:
                # BUG FIX: embedded_index used to be left unbound when no
                # 'href' token was found (UnboundLocalError), or stale from a
                # previously processed column; the inner loop also shadowed
                # the outer enumerate variable.
                embedded_index = None
                for token_index, token in enumerate(ls):
                    if ''.join([ch for ch in token if ch.isalpha()]) == 'href':
                        embedded_index = token_index + 1
                        break
                # Bounds check guards against 'href' being the last token.
                if embedded_index is not None and embedded_index < len(ls):
                    ls[embedded_index] = ls[embedded_index][0] + host_url + id + ls[embedded_index][1:]

        row.append(bytes_(' '.join(ls), 'utf-8'))
    return b'\t'.join(row) + b'\r\n'
90

91

92
def _convert_camel_to_snake(type_str):
1✔
93
    tmp = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', type_str)
1✔
94
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', tmp).lower()
1✔
95

96

97
@view_config(route_name='report_download', request_method='GET')
def report_download(context, request):
    """Stream the search result for a single item type as a TSV download."""
    downloadtime = datetime.datetime.now()

    requested_types = request.params.getall('type')
    if len(requested_types) != 1:
        msg = 'Report view requires specifying a single type.'
        raise HTTPBadRequest(explanation=msg)

    # Make sure we get all results
    request.GET['limit'] = 'all'
    type_str = requested_types[0]
    schema = request.registry[TYPES][type_str].schema
    search_config = request.registry[SEARCH_CONFIG].as_dict()[type_str]
    columns = list_visible_columns_for_schemas(request, schema, search_config)
    snake_type = _convert_camel_to_snake(type_str).replace("'", '')
    results = search_generator(request)

    # Work around Excel bug; can't open single column TSV with 'ID' header
    if len(columns) == 1 and '@id' in columns:
        columns['@id']['title'] = 'id'

    column_titles = [column.get('title') or field for field, column in columns.items()]

    def generate_rows():
        # First line records the download time and the originating report URL.
        yield bytes(
            '%s\t%s%s?%s\r\n' % (downloadtime, request.host_url, '/report/', request.query_string),
            'utf-8',
        )
        yield format_row(column_titles)
        for item in results['@graph']:
            yield format_row([lookup_column_value(item, path) for path in columns])

    # Stream response using chunked encoding.
    request.response.content_type = 'text/tsv'
    request.response.content_disposition = 'attachment;filename="{}_report_{}_{}_{}_{}h_{}m.tsv"'.format(
        snake_type,
        downloadtime.year,
        downloadtime.month,
        downloadtime.day,
        downloadtime.hour,
        downloadtime.minute,
    )
    request.response.app_iter = generate_rows()
    return request.response
144

145

146
def list_visible_columns_for_schemas(request, schema, search_config):
    """
    Returns mapping of default columns for a set of schemas.
    """
    columns = OrderedDict({'@id': {'title': 'ID'}})
    if 'columns' in search_config:
        columns.update(search_config['columns'])
    else:
        # default columns if not explicitly specified
        for name in ['@id', 'title', 'description', 'name', 'accession', 'aliases']:
            if name in schema['properties']:
                columns[name] = {
                    'title': schema['properties'][name].get('title', name)
                }
    fields_requested = request.params.getall('field')
    if not fields_requested:
        return columns
    # An explicit 'field' query parameter narrows the report to those fields.
    limited_columns = OrderedDict()
    for field in fields_requested:
        if field in columns:
            limited_columns[field] = columns[field]
            continue
        # We don't currently traverse to other schemas for embedded
        # objects to find property titles. In this case we'll just
        # show the field's dotted path for now.
        limited_columns[field] = {'title': field}
        if field in schema['properties']:
            limited_columns[field] = {
                'title': schema['properties'][field]['title']
            }
    return limited_columns
181

182

183
@view_config(route_name='multitype_report_download', request_method='GET')
def multitype_report_download(context, request):
    """Stream a multi-type search result as a TSV download.

    Href-bearing columns are expanded to full URLs using the request host.
    """
    downloadtime = datetime.datetime.now()
    qs = QueryString(request)
    qs.drop('limit')
    # limit=0: we only need facets and result_columns from this embed call.
    qs.append(('limit', '0'))
    response = request.embed(f'/multireport?{qs.get_query_string()}')
    facets = response['facets']
    columns = response['result_columns']
    abstract_types = get_abstract_types(request)

    # Concrete item types that actually appear in the search result.
    concrete_types = []
    for facet in facets:
        if facet['field'] != 'type':
            continue
        concrete_types = [
            term['key'] for term in facet['terms']
            if term['key'] not in abstract_types
        ]
        break
    if len(concrete_types) == 1:
        report_type = _convert_camel_to_snake(concrete_types[0]).replace("'", '')
    else:
        report_type = 'mixed'

    # Make sure we get all results
    request.GET['limit'] = 'all'
    results = search_generator(request)

    # Work around Excel bug; can't open single column TSV with 'ID' header
    if len(columns) == 1 and '@id' in columns:
        columns['@id']['title'] = 'id'

    header_row = [column.get('title') or field for field, column in columns.items()]
    # Positions of columns whose values are hrefs needing URL expansion.
    href_index = [
        position for position, key in enumerate(columns)
        if key in HREF_COLUMN_KEYS
    ]

    def generate_rows():
        # First line records the download time and the originating report URL.
        yield bytes(
            '%s\t%s%s?%s\r\n' % (downloadtime, request.host_url, '/multireport/', request.query_string),
            'utf-8',
        )
        yield format_row(header_row)
        for item in results['@graph']:
            values = [lookup_column_value(item, path) for path in columns]
            yield format_row_full_url(values, href_index, request.host_url, item['@id'])

    # Stream response using chunked encoding.
    request.response.content_type = 'text/tsv'
    request.response.content_disposition = 'attachment;filename="igvf_{}_report_{}_{}_{}_{}h_{}m.tsv"'.format(
        report_type,
        downloadtime.year,
        downloadtime.month,
        downloadtime.day,
        downloadtime.hour,
        downloadtime.minute,
    )
    request.response.app_iter = generate_rows()
    return request.response
245

246
# Only return the columns of the concrete types if the type is present in the search result.
247

248

249
def get_result_columns(request, facets, report_response_columns):
    """Build the column mapping for a multi-type report.

    Only columns of the concrete types present in the search result are
    considered; an explicit 'config' query parameter substitutes the report
    response columns, and 'field' parameters narrow the final selection.
    """
    columns = OrderedDict({'@id': {'title': 'ID'}})
    configs = request.params.getall('config')
    abstract_types = get_abstract_types(request)

    # Concrete item types that actually appear in the search result.
    concrete_types = []
    for facet in facets:
        if facet['field'] != 'type':
            continue
        concrete_types = [
            term['key'] for term in facet['terms']
            if term['key'] not in abstract_types
        ]
        break

    # if config in query string
    if configs:
        columns.update(report_response_columns)
    else:
        for type_str in concrete_types:
            schema = request.registry[TYPES][type_str].schema
            search_config = request.registry[SEARCH_CONFIG].as_dict()[type_str]
            if 'columns' in search_config:
                columns.update(search_config['columns'])
            else:
                # default columns if not explicitly specified
                for name in ['@id', 'title', 'description', 'name', 'accession', 'aliases']:
                    if name in schema['properties']:
                        columns[name] = {
                            'title': schema['properties'][name].get('title', name)
                        }

    fields_requested = request.params.getall('field')
    # if field in query string
    if fields_requested:
        limited_columns = OrderedDict()
        for field in fields_requested:
            if field in columns:
                limited_columns[field] = columns[field]
                continue
            # We don't currently traverse to other schemas for embedded
            # objects to find property titles. In this case we'll just
            # show the field's dotted path for now.
            limited_columns[field] = {'title': field}
            # Use the first concrete type whose schema declares the field.
            for type_str in concrete_types:
                schema = request.registry[TYPES][type_str].schema
                if field in schema['properties']:
                    limited_columns[field] = {
                        'title': schema['properties'][field]['title']
                    }
                    break
        columns = limited_columns
    return columns
304

305

306
def get_abstract_types(request):
    """Return names of abstract item types that have more than one subtype."""
    item_registry = request.registry[TYPES]
    return [
        name
        for name, item in item_registry.abstract.items()
        if len(item.subtypes) > 1
    ]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc