
inspirehep / inspire-next / build 12418 (pending completion)

Pull Request #3419: e2e: create a test for core harvest with RT
Commit: e2e: assert that robotupload was called once
Signed-off-by: David Caro <david@dcaro.es>
Committed via web-flow; built on travis-ci

7238 of 9243 relevant lines covered (78.31%)
2.42 hits per line

Source File: /inspirehep/modules/workflows/tasks/matching.py
File coverage: 96.92%
# -*- coding: utf-8 -*-
#
# This file is part of INSPIRE.
# Copyright (C) 2014-2017 CERN.
#
# INSPIRE is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# INSPIRE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with INSPIRE. If not, see <http://www.gnu.org/licenses/>.
#
# In applying this license, CERN does not waive the privileges and immunities
# granted to it by virtue of its status as an Intergovernmental Organization
# or submit itself to any jurisdiction.

"""Tasks to check if the incoming record already exists."""

from __future__ import absolute_import, division, print_function

from flask import current_app

from invenio_db import db
from invenio_workflows import workflow_object_class, WorkflowEngine
from invenio_workflows.errors import WorkflowsError


from inspire_matcher.api import match
from inspire_utils.dedupers import dedupe_list
from inspirehep.utils.record import get_arxiv_categories, get_value
from inspirehep.modules.workflows.tasks.actions import mark

from ..utils import with_debug_logging


@with_debug_logging
def exact_match(obj, eng):
    """Return ``True`` if the record is already present in the system.

    Uses the default configuration of the ``inspire-matcher`` to find
    duplicates of the current workflow object in the system.

    Also sets the ``matches.exact`` property in ``extra_data`` to the list of
    control numbers that matched.

    Arguments:
        obj: a workflow object.
        eng: a workflow engine.

    Returns:
        bool: ``True`` if the workflow object has a duplicate in the system,
        ``False`` otherwise.

    """
    exact_match_config = current_app.config['EXACT_MATCH']
    matches = dedupe_list(match(obj.data, exact_match_config))
    record_ids = [el['_source']['control_number'] for el in matches]
    obj.extra_data.setdefault('matches', {})['exact'] = record_ids
    return bool(record_ids)


@with_debug_logging
def fuzzy_match(obj, eng):
    """Return ``True`` if a similar record is found in the system.

    Uses a custom configuration for ``inspire-matcher`` to find records
    similar to the current workflow object's payload in the system.

    Also sets the ``matches.fuzzy`` property in ``extra_data`` to the list of
    briefs of the first 5 records that matched.

    Arguments:
        obj: a workflow object.
        eng: a workflow engine.

    Returns:
        bool: ``True`` if the workflow object has a duplicate in the system,
        ``False`` otherwise.

    """
    if not current_app.config.get('FEATURE_FLAG_ENABLE_FUZZY_MATCHER'):
        return False

    fuzzy_match_config = current_app.config['FUZZY_MATCH']
    matches = dedupe_list(match(obj.data, fuzzy_match_config))
    record_ids = [_get_hep_record_brief(el['_source']) for el in matches]
    obj.extra_data.setdefault('matches', {})['fuzzy'] = record_ids[0:5]
    return bool(record_ids)
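# Illustrative example (hypothetical values): after ``exact_match`` and
# ``fuzzy_match`` run, ``obj.extra_data['matches']`` may look like
#
#     {
#         'exact': [1234567],
#         'fuzzy': [{'control_number': 1234568, 'title': 'Some title', ...}],
#     }
#
# where the exact matches are control numbers and the fuzzy matches are the
# briefs built by ``_get_hep_record_brief`` below.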


def _get_hep_record_brief(hep_record):
    """Return a summary ("brief") of a HEP record with a few selected fields."""
    brief = {
        'control_number': hep_record['control_number'],
        'title': get_value(hep_record, 'titles[0].title'),
    }

    abstract = get_value(hep_record, 'abstracts[0].value')
    if abstract is not None:
        brief['abstract'] = abstract

    arxiv_eprint = get_value(hep_record, 'arxiv_eprints[0].value')
    if arxiv_eprint is not None:
        brief['arxiv_eprint'] = arxiv_eprint

    number_of_pages = get_value(hep_record, 'number_of_pages')
    if number_of_pages is not None:
        brief['number_of_pages'] = number_of_pages

    earliest_date = get_value(hep_record, 'earliest_date')
    if earliest_date is not None:
        brief['earliest_date'] = earliest_date

    authors = hep_record.get('authors')
    if authors is not None:
        brief['authors_count'] = len(authors)
        author_briefs = []
        for author in authors[:3]:
            author_briefs.append({'full_name': author['full_name']})
        brief['authors'] = author_briefs

    public_notes = hep_record.get('public_notes')
    if public_notes is not None:
        public_notes_value = []
        for public_note in public_notes:
            public_notes_value.append({'value': public_note['value']})
        brief['public_notes'] = public_notes_value

    publication_info = hep_record.get('publication_info')
    if publication_info is not None:
        brief['publication_info'] = publication_info

    return brief
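# Illustrative example (hypothetical values): for a record that has an
# abstract, an arXiv eprint and three authors, ``_get_hep_record_brief``
# returns something like
#
#     {
#         'control_number': 1234567,
#         'title': 'A title',
#         'abstract': 'An abstract.',
#         'arxiv_eprint': '1802.01234',
#         'authors_count': 3,
#         'authors': [{'full_name': 'Doe, Jane'}, ...],
#     }
#
# Optional fields are only included when present in the source record.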


@with_debug_logging
def is_fuzzy_match_approved(obj, eng):
    """Check if a fuzzy match has been approved by a human."""
    return obj.extra_data.get('fuzzy_match_approved_id')


@with_debug_logging
def set_fuzzy_match_approved_in_extradata(obj, eng):
    """Set the human-approved match in `matches.approved` in extra_data."""
    approved_match = obj.extra_data.get('fuzzy_match_approved_id')
    obj.extra_data.setdefault('matches', {})['approved'] = approved_match


@with_debug_logging
def set_exact_match_as_approved_in_extradata(obj, eng):
    """Set the best match in `matches.approved` in extra_data."""
    best_match = obj.extra_data['matches']['exact'][0]
    obj.extra_data.setdefault('matches', {})['approved'] = best_match


def auto_approve(obj, eng):
    """Check whether the currently ingested article can be auto-approved.

    Arguments:
        obj: a workflow object.
        eng: a workflow engine.

    Returns:
        bool: True when the record belongs to an arXiv category that is fully
        harvested or if the primary category is `physics.data-an`, otherwise
        False.
    """
    return has_fully_harvested_category(obj.data) or physics_data_an_is_primary_category(obj.data)


def has_fully_harvested_category(record):
    """Check if the record belongs to at least one fully harvested arXiv category.

    Arguments:
        record(dict): the ingested article.

    Returns:
        bool: True when the record belongs to an arXiv category that is fully
        harvested, otherwise False.
    """
    record_categories = set(get_arxiv_categories(record))
    harvested_categories = current_app.config.get('ARXIV_CATEGORIES', {})
    return len(
        record_categories &
        set(
            harvested_categories.get('core') +
            harvested_categories.get('non-core')
        )
    ) > 0
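# Note (illustrative): ``ARXIV_CATEGORIES`` is expected to map 'core' and
# 'non-core' to lists of arXiv category names, e.g. (hypothetical values)
#
#     ARXIV_CATEGORIES = {'core': ['hep-ex', 'hep-th'], 'non-core': ['nucl-ex']}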


def physics_data_an_is_primary_category(record):
    """Check if the primary arXiv category of the record is `physics.data-an`."""
    record_categories = get_arxiv_categories(record)
    if record_categories:
        return record_categories[0] == 'physics.data-an'
    return False


@with_debug_logging
def set_core_in_extra_data(obj, eng):
    """Set `core=True` in `obj.extra_data` if the record belongs to a core arXiv category."""
    def _is_core(record):
        return set(get_arxiv_categories(record)) & \
            set(current_app.config.get('ARXIV_CATEGORIES', {}).get('core'))

    if _is_core(obj.data):
        obj.extra_data['core'] = True


def match_non_completed_wf_in_holdingpen(obj, eng):
    """Return ``True`` if a matching wf is processing in the HoldingPen.

    Uses a custom configuration of the ``inspire-matcher`` to find duplicates
    of the current workflow object in the Holding Pen that are not in the
    COMPLETED state.

    Also sets ``holdingpen_matches`` in ``extra_data`` to the list of ids that
    matched.

    Arguments:
        obj: a workflow object.
        eng: a workflow engine.

    Returns:
        bool: ``True`` if the workflow object has a duplicate in the Holding
        Pen that is not COMPLETED, ``False`` otherwise.

    """
    def _non_completed(base_record, match_result):
        return not get_value(match_result, '_source._workflow.status') == 'COMPLETED'

    matched_ids = pending_in_holding_pen(obj, _non_completed)
    obj.extra_data['holdingpen_matches'] = matched_ids
    return bool(matched_ids)


def raise_if_match_wf_in_error_or_initial(obj, eng):
    """Raise if a matching wf is in ERROR or INITIAL state in the HoldingPen.

    Uses a custom configuration of the ``inspire-matcher`` to find duplicates
    of the current workflow object in the Holding Pen that are in the ERROR
    or INITIAL state.

    If any match is found, it sets ``error_workflows_matched`` in
    ``extra_data`` to the list of ids that matched and raises an error.

    Arguments:
        obj: a workflow object.
        eng: a workflow engine.

    Returns:
        None

    """
    def _filter(base_record, match_result):
        return get_value(
            match_result, '_source._workflow.status'
        ) in ('ERROR', 'INITIAL')

    matched_ids = pending_in_holding_pen(obj, _filter)
    if bool(matched_ids):
        obj.extra_data['error_workflows_matched'] = matched_ids
        raise WorkflowsError(
            'Cannot continue processing. Found workflows in ERROR or INITIAL '
            'state: {}'.format(matched_ids)
        )


def match_previously_rejected_wf_in_holdingpen(obj, eng):
    """Return ``True`` if it matches a COMPLETED and rejected wf in the HoldingPen.

    Uses a custom configuration of the ``inspire-matcher`` to find duplicates
    of the current workflow object in the Holding Pen that are in the
    COMPLETED state and marked as ``approved = False``.

    Also sets ``previously_rejected_matches`` in ``extra_data`` to the list of
    ids that matched.

    Arguments:
        obj: a workflow object.
        eng: a workflow engine.

    Returns:
        bool: ``True`` if the workflow object has a duplicate in the Holding
        Pen that is COMPLETED and was rejected, ``False`` otherwise.

    """
    def _rejected_and_completed(base_record, match_result):
        return get_value(match_result, '_source._workflow.status') == 'COMPLETED' and \
            get_value(match_result, '_source._extra_data.approved') is False

    matched_ids = pending_in_holding_pen(obj, _rejected_and_completed)
    obj.extra_data['previously_rejected_matches'] = matched_ids
    return bool(matched_ids)


@with_debug_logging
def pending_in_holding_pen(obj, validation_func):
    """Return the list of matching workflows in the holdingpen.

    Matches the holdingpen records by their ``arxiv_eprint``, their ``doi``,
    and by a custom validator function.

    Args:
        obj: a workflow object.
        validation_func: a function used to filter the matched records.

    Returns:
        (list): the ids matching the current ``obj`` that satisfy
        ``validation_func``.

    """
    config = {
        'algorithm': [
            {
                'queries': [
                    {
                        'path': 'arxiv_eprints.value',
                        'search_path': 'metadata.arxiv_eprints.value.raw',
                        'type': 'exact',
                    },
                    {
                        'path': 'dois.value',
                        'search_path': 'metadata.dois.value.raw',
                        'type': 'exact',
                    },
                ],
                'validator': validation_func,
            },
        ],
        'doc_type': 'hep',
        'index': 'holdingpen-hep',
    }
    matches = dedupe_list(match(obj.data, config))
    return [int(el['_id']) for el in matches if int(el['_id']) != obj.id]
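# Note: the ``config`` above is an ``inspire-matcher`` configuration; the
# ``validator`` callback receives the ``(base_record, match_result)`` pair,
# as in the ``_non_completed``, ``_filter`` and ``_rejected_and_completed``
# callbacks defined above.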


@with_debug_logging
def delete_self_and_stop_processing(obj, eng):
    """Delete both versions of itself and stop the workflow."""
    db.session.delete(obj.model)
    eng.skip_token()


@with_debug_logging
def stop_processing(obj, eng):
    """Stop processing the given workflow.

    Stops the given workflow engine, which causes all the workflows related
    to it to stop as well.

    Args:
        obj: a workflow object.
        eng: a workflow engine.

    Returns:
        None
    """
    eng.stop()


def has_same_source(extra_data_key):
    """Match a workflow in obj.extra_data[`extra_data_key`] by the source.

    Takes a list of workflows from extra_data, using `extra_data_key` as the
    key, and goes through them checking whether at least one workflow has the
    same source as the current workflow object.

    Args:
        extra_data_key: the key to retrieve a workflow list from the current
        workflow object.

    Returns:
        bool: True if a workflow, whose id is in obj.extra_data[
        `extra_data_key`], matches the current workflow by the source.
    """

    def _get_wfs_same_source(obj, eng):
        current_source = get_value(obj.data, 'acquisition_source.source').lower()

        try:
            workflows = obj.extra_data[extra_data_key]
        except KeyError:
            workflows = []

        for wf_id in workflows:
            wf = workflow_object_class.get(wf_id)
            wf_source = get_value(wf.data, 'acquisition_source.source').lower()
            if wf_source == current_source:
                return True
        return False

    return _get_wfs_same_source
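# Illustrative usage: ``has_same_source`` is a task factory; the closure it
# returns is meant to be used as a workflow step, e.g.
#
#     has_same_source('holdingpen_matches')
#
# returns a task that is True when one of the workflows matched in the
# Holding Pen shares the current object's ``acquisition_source.source``.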


def stop_matched_holdingpen_wfs(obj, eng):
    """Stop the matched workflow objects in the holdingpen.

    Stops the matched workflows in the holdingpen by replacing their steps
    with a new one defined on the fly, containing a ``stop`` step, and
    executing it. For traceability reasons, these workflows are also marked
    with ``'stopped-by-wf'``, whose value is the current workflow's id.

    In the use case of an article harvested twice, this function is used to
    stop the first workflow and let the current one be processed, since it
    has the latest metadata.

    Args:
        obj: a workflow object.
        eng: a workflow engine.

    Returns:
        None
    """
    stopping_steps = [mark('stopped-by-wf', int(obj.id)), stop_processing]

    obj.save()

    for holdingpen_wf_id in obj.extra_data['holdingpen_matches']:
        holdingpen_wf = workflow_object_class.get(holdingpen_wf_id)
        holdingpen_wf_eng = WorkflowEngine.from_uuid(holdingpen_wf.id_workflow)

        # stop this holdingpen workflow by replacing its steps with a stop step
        holdingpen_wf_eng.callbacks.replace(stopping_steps)
        holdingpen_wf_eng.process([holdingpen_wf])