• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

adsabs / ADSCitationCapture / 11921709590

19 Nov 2024 09:02PM UTC coverage: 70.135%. First build
11921709590

Pull #70

github

web-flow
Merge 47217066b into 497a59c1b
Pull Request #70: Concept doi metadata updates

7 of 14 new or added lines in 3 files covered. (50.0%)

2501 of 3566 relevant lines covered (70.13%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.12
/ADSCitationCapture/tasks.py
1
import os
1✔
2
from kombu import Queue
1✔
3
from google.protobuf.json_format import MessageToDict
1✔
4
from datetime import datetime
1✔
5
import ADSCitationCapture.app as app_module
1✔
6
import ADSCitationCapture.webhook as webhook
1✔
7
import ADSCitationCapture.doi as doi
1✔
8
import ADSCitationCapture.url as url
1✔
9
import ADSCitationCapture.db as db
1✔
10
import ADSCitationCapture.forward as forward
1✔
11
import ADSCitationCapture.api as api
1✔
12
import adsmsg
1✔
13
import json
1✔
14

15
# ============================= INITIALIZATION ==================================== #
16

17
# Project root: the parent of the directory that contains this module.
proj_home = os.path.realpath(os.path.join(os.path.dirname(__file__), '../'))
app = app_module.ADSCitationCaptureCelery(
    'ads-citation-capture',
    proj_home=proj_home,
    local_config=globals().get('local_config', {}),
)
logger = app.logger

# Every queue is bound to the application exchange and uses its own name as
# the routing key.
app.conf.CELERY_QUEUES = tuple(
    Queue(queue_name, app.exchange, routing_key=queue_name)
    for queue_name in (
        'process-citation-changes',
        'process-github-urls',
        'process-new-citation',
        'process-updated-citation',
        'process-deleted-citation',
        'maintenance_canonical',
        'maintenance_metadata',
        'maintenance_resend',
        'maintenance_reevaluate',
        'maintenance_associated_works',
        'output-results',
    )
)

# Limit github API queries to keep below the rate limit
github_api_limit = app.conf.get('GITHUB_API_LIMIT', '80/m')
37

38
# ============================= TASKS ============================================= #
39

40
@app.task(queue='process-new-citation')
def task_process_new_citation(citation_change, force=False):
    """
    Process new citation:
    - Skip the change if the citing bibcode has no canonical bibcode yet
    - DOI targets not yet in the database: retrieve and parse metadata from
      doi.org; software records with a bibcode become REGISTERED
    - PID targets: only the link liveness is checked, nothing is stored here
    - URL targets: GitHub URLs are delegated to 'task_process_github_urls',
      any other URL is stored as DISCARDED
    - Store the citation target (if new), emit events / forward results for
      REGISTERED targets and finally store the citation itself

    `force` is accepted for interface compatibility; it is not used here.
    """
    canonical_citing_bibcode = api.get_canonical_bibcode(app, citation_change.citing)
    if canonical_citing_bibcode is None:
        logger.error("The citing bibcode '%s' is not in the system yet, it will be skipped in this ingestion", citation_change.citing)
        return
    content_type = None
    is_link_alive = False
    status = "DISCARDED"

    # Check if we already have the citation target in the DB
    metadata = db.get_citation_target_metadata(app, citation_change.content)
    citation_target_in_db = bool(metadata)  # False if dict is empty
    raw_metadata = metadata.get('raw', None)
    parsed_metadata = metadata.get('parsed', {})
    associated_version_bibcodes = metadata.get('associated', None)

    if citation_target_in_db:
        status = metadata.get('status', 'DISCARDED')  # "REGISTERED" if it is a software record

    has_content = citation_change.content not in ["", None]

    # Zenodo
    if citation_change.content_type == adsmsg.CitationChangeContentType.doi and has_content:
        content_type = "DOI"
        if not citation_target_in_db:
            # Fetch DOI metadata (if HTTP request fails, an exception is raised
            # and the task will be re-queued (see app.py and adsputils))
            raw_metadata = doi.fetch_metadata(app.conf['DOI_URL'], app.conf['DATACITE_URL'], citation_change.content)
            if raw_metadata:
                parsed_metadata = doi.parse_metadata(raw_metadata)
                is_software = parsed_metadata.get('doctype', '').lower() == "software"
                if parsed_metadata.get('bibcode') not in (None, "") and is_software:
                    status = "REGISTERED"
                    associated_version_bibcodes = _collect_associated_works(citation_change, parsed_metadata)

    # PID
    elif citation_change.content_type == adsmsg.CitationChangeContentType.pid and has_content:
        content_type = "PID"
        # A None status means nothing is stored or emitted for PIDs below.
        status = None
        is_link_alive = url.is_alive(app.conf['ASCL_URL'] + citation_change.content)
        parsed_metadata = {'link_alive': is_link_alive, "doctype": "software"}

    # URL
    elif citation_change.content_type == adsmsg.CitationChangeContentType.url and has_content:
        content_type = "URL"
        is_link_alive = url.is_alive(citation_change.content)
        if url.is_github(citation_change.content):
            # GitHub URLs are stored/emitted by the rate-limited github task,
            # so "EMITTABLE" makes this task skip the storage section below.
            status = "EMITTABLE"
            task_process_github_urls.delay(citation_change, metadata)
        else:
            status = "DISCARDED"
        # No license information is gathered in this task.
        parsed_metadata = {'link_alive': is_link_alive, 'doctype': 'unknown', 'license_name': "", 'license_url': ""}

    else:
        # Logging uses %-style lazy formatting; a '{}' placeholder would make
        # the record fail to format and the message would be lost.
        logger.error("Citation change should have doi, pid or url informed: %s", citation_change)
        status = None

    # Generates entry for Zenodo citations and notifies web broker
    if status not in [None, "EMITTABLE"]:
        if not citation_target_in_db:
            # Create citation target in the DB
            target_stored = db.store_citation_target(app, citation_change, content_type, raw_metadata, parsed_metadata, status, associated_version_bibcodes)
            # If citation target successfully created, update associated records.
            if target_stored:
                _update_associated_citation_targets(citation_change, parsed_metadata, associated_version_bibcodes)

        if status == "REGISTERED":
            # Connects new bibcode to canonical bibcode and DOI
            if citation_change.content_type == adsmsg.CitationChangeContentType.doi:

                if canonical_citing_bibcode != citation_change.citing:
                    # These two bibcodes are identical (point to same source) and we can signal the broker
                    event_data = webhook.identical_bibcodes_event_data(citation_change.citing, canonical_citing_bibcode)
                    if event_data:
                        dump_prefix = citation_change.timestamp.ToDatetime().strftime("%Y%m%d_%H%M%S")
                        logger.debug("Calling 'task_emit_event' for '%s' IsIdenticalTo '%s'", citation_change.citing, canonical_citing_bibcode)
                        task_emit_event.delay(event_data, dump_prefix)

                citation_target_bibcode = parsed_metadata.get('bibcode')

                # The new bibcode and the DOI are identical
                event_data = webhook.identical_bibcode_and_doi_event_data(citation_target_bibcode, citation_change.content)
                if event_data:
                    dump_prefix = citation_change.timestamp.ToDatetime().strftime("%Y%m%d_%H%M%S")
                    logger.debug("Calling 'task_emit_event' for '%s' IsIdenticalTo '%s'", citation_target_bibcode, citation_change.content)
                    task_emit_event.delay(event_data, dump_prefix)

                # Get citations from the database and transform the stored bibcodes into their canonical ones as registered in Solr.
                original_citations = db.get_citations_by_bibcode(app, citation_target_bibcode)
                citations = api.get_canonical_bibcodes(app, original_citations)
                # Get readers from db if available.
                readers = db.get_citation_target_readers(app, citation_target_bibcode, parsed_metadata.get('alternate_bibcode', []))

                # Add canonical bibcode of current detected citation
                if canonical_citing_bibcode and canonical_citing_bibcode not in citations:
                    citations.append(canonical_citing_bibcode)

                logger.debug("Calling 'task_output_results' with '%s'", citation_change)
                task_output_results.delay(citation_change, parsed_metadata, citations, associated_version_bibcodes, readers=readers)

            logger.debug("Calling '_emit_citation_change' with '%s'", citation_change)
            _emit_citation_change(citation_change, parsed_metadata)

        # Store the citation at the very end, so that if an exception is raised before
        # this task can be re-run in the future without key collisions in the database
        db.store_citation(app, citation_change, content_type, raw_metadata, parsed_metadata, status)
154
    
155
@app.task(queue='process-github-urls', rate_limit=github_api_limit)
def task_process_github_urls(citation_change, metadata):
    """
    Process new github urls
    Emit to broker only if it is EMITTABLE
    Do not forward to Master

    `metadata` is the citation target metadata the caller read from the
    database (an empty dict when the target was not stored at that time);
    only its 'raw' entry is reused here.
    """
    logger.info("Processing citation to github url: {}".format(citation_change.content))
    github_api_mode = app.conf.get('GITHUB_API_MODE', False)
    raw_metadata = metadata.get('raw', None)
    content_type = "URL"
    is_link_alive = url.is_alive(citation_change.content)
    status = "EMITTABLE"
    license_info = {'license_name': "", 'license_url': ""}
    if url.is_github(citation_change.content):
        # Only query the GitHub API for license info when the link is alive
        # and API mode is enabled; otherwise keep the empty license.
        if is_link_alive and github_api_mode:
            license_info = api.get_github_metadata(app, citation_change.content)
    else:
        status = "DISCARDED"
        logger.debug("Citation to github url {} discarded".format(citation_change.content))
    parsed_metadata = {'link_alive': is_link_alive, 'doctype': "unknown", 'license_name': license_info.get('license_name', ""), 'license_url': license_info.get('license_url', "")}

    # Re-check the database: the time between this task being queued and it
    # actually running is potentially long, so the target may exist by now.
    citation_target_in_db = bool(db.get_citation_target_metadata(app, citation_change.content))

    # Saves citations to database, and emits citations with "EMITTABLE"
    if not citation_target_in_db:
        # Create citation target in the DB
        db.store_citation_target(app, citation_change, content_type, raw_metadata, parsed_metadata, status)
    if status == "EMITTABLE":
        logger.debug("Reached 'call _emit_citation_change' with '%s'", citation_change)
        # Emits citation change to broker.
        _emit_citation_change(citation_change, parsed_metadata)
    # Store the citation at the very end, so that if an exception is raised before
    # this task can be re-run in the future without key collisions in the database
    db.store_citation(app, citation_change, content_type, raw_metadata, parsed_metadata, status)
196

197
@app.task(queue='process-updated-citation')
def task_process_updated_citation(citation_change, force=False):
    """
    Update citation record
    Emit/forward the update only if it is REGISTERED

    `force` is accepted but not used in this task.
    """
    updated = db.update_citation(app, citation_change)
    target_metadata = db.get_citation_target_metadata(app, citation_change.content)
    parsed_metadata = target_metadata.get('parsed', {})
    target_bibcode = parsed_metadata.get('bibcode', None)
    status = target_metadata.get('status', 'DISCARDED')
    readers = db.get_citation_target_readers(app, target_bibcode, parsed_metadata.get('alternate_bibcode', []))

    # Nothing to emit/forward unless the record was updated and is "REGISTERED"
    if not (updated and status == 'REGISTERED'):
        return

    if citation_change.content_type == adsmsg.CitationChangeContentType.doi:
        associated_works = _collect_associated_works(citation_change, parsed_metadata)
        # Drop the entry that points back at the target itself.
        if associated_works:
            no_self_ref_versions = {
                label: bibcode
                for label, bibcode in associated_works.items()
                if bibcode != target_bibcode
            }
        else:
            no_self_ref_versions = None
        # Get citations from the database and transform the stored bibcodes into their canonical ones as registered in Solr.
        stored_citations = db.get_citations_by_bibcode(app, target_bibcode)
        citations = api.get_canonical_bibcodes(app, stored_citations)
        logger.debug("Calling 'task_output_results' with '%s'", citation_change)
        task_output_results.delay(citation_change, parsed_metadata, citations, db_versions=no_self_ref_versions, readers=readers)

    logger.debug("Calling '_emit_citation_change' with '%s'", citation_change)
    _emit_citation_change(citation_change, parsed_metadata)
221

222
def _collect_associated_works(citation_change, parsed_metadata):
    """
    Fetches metadata for concept doi and searches database for associated versions for the given record.

    Returns whatever 'db.get_associated_works_by_doi' finds for the fetched
    versions, or None when the versions could not be fetched or none were
    reported.
    """
    try:
        all_versions_doi = doi.fetch_all_versions_doi(app.conf['DOI_URL'], app.conf['DATACITE_URL'], parsed_metadata)
    except Exception:
        # Best effort: a failure to fetch versions must not abort the caller.
        logger.exception("Unable to recover related versions for %s", citation_change)
        return None

    # Tolerate a missing result or a missing 'versions' entry instead of
    # failing on the subscript.
    versions = all_versions_doi.get('versions') if all_versions_doi else None
    if versions in (None, []):
        return None

    # fetch additional versions from db if they exist.
    logger.info("Found {} versions for {}".format(len(versions), citation_change.content))
    return db.get_associated_works_by_doi(app, all_versions_doi)
238

239
def _update_associated_citation_targets(citation_change, parsed_metadata, versions_in_db):
    """
    Updates associated works for all associated records of citation_change.content in database.

    One 'task_process_updated_associated_works' task is queued per bibcode
    found in `versions_in_db`; nothing happens when it is None or [None].
    """
    if versions_in_db in (None, [None]):
        return

    logger.info("Found {} versions in database for {}".format(len(versions_in_db),citation_change.content))
    # The new citation target is not in the db yet, so its own version goes
    # first and the versions already in the db are merged on top.
    own_version_label = 'Version ' + str(parsed_metadata.get('version'))
    all_versions = {own_version_label: parsed_metadata.get('bibcode')}
    all_versions.update(versions_in_db)
    logger.debug("{}: associated_versions_bibcodes".format(all_versions))

    for version_bibcode in versions_in_db.values():
        record = db.get_citation_targets_by_bibcode(app, [version_bibcode])[0]
        record_content_type = getattr(adsmsg.CitationChangeContentType, record['content_type'].lower())
        record_change = adsmsg.CitationChange(
            content=record['content'],
            content_type=record_content_type,
            status=adsmsg.Status.updated,
            timestamp=datetime.now(),
        )
        # update associated works for all versions in db
        logger.info('Calling task process_updated_associated_works')
        task_process_updated_associated_works.delay(record_change, all_versions)
260

261
@app.task(queue='process-updated-citation')
def task_process_updated_associated_works(citation_change, associated_versions, force=False):
    """
    Update associated works in citation record
    Do not emit to broker as changes to associated works are not propagated

    `force` is accepted but not used in this task.
    """
    # An empty/None set of associated works means there is nothing to forward
    has_associated_versions = bool(associated_versions)
    target_metadata = db.get_citation_target_metadata(app, citation_change.content, curate=False)
    raw_metadata = target_metadata.get('raw', {})
    if not raw_metadata:
        return

    target_bibcode = db.get_citation_targets_by_doi(app,[citation_change.content])[0].get('bibcode', None)
    parsed_metadata = target_metadata.get('parsed', {})
    curated_metadata = target_metadata.get('curated', {})
    # Exclude the entry that refers to this very record.
    no_self_ref_versions = {
        label: bibcode
        for label, bibcode in associated_versions.items()
        if bibcode != target_bibcode
    }
    status = target_metadata.get('status', 'DISCARDED')

    # Forward the update only if status is "REGISTERED" and associated works is not None.
    if status != 'REGISTERED' or not has_associated_versions:
        return
    if citation_change.content_type != adsmsg.CitationChangeContentType.doi:
        return

    # Get citations from the database and transform the stored bibcodes into their canonical ones as registered in Solr.
    stored_citations = db.get_citations_by_bibcode(app, target_bibcode)
    citations = api.get_canonical_bibcodes(app, stored_citations)
    logger.debug("Calling 'task_output_results' with '%s'", citation_change)
    task_output_results.delay(citation_change, parsed_metadata, citations, db_versions=no_self_ref_versions)
    logger.info("Updating associated works for %s", citation_change.content)
    db.update_citation_target_metadata(
        app,
        citation_change.content,
        raw_metadata,
        parsed_metadata,
        curated_metadata=curated_metadata,
        associated=no_self_ref_versions,
        bibcode=target_bibcode,
    )
288
        
289
@app.task(queue='process-deleted-citation')
def task_process_deleted_citation(citation_change, force=False):
    """
    Mark a citation as deleted

    The deletion is emitted/forwarded only when the citation was marked as
    deleted and its previous status was REGISTERED. `force` is not used.
    """
    marked_as_deleted, previous_status = db.mark_citation_as_deleted(app, citation_change)
    target_metadata = db.get_citation_target_metadata(app, citation_change.content)
    parsed_metadata = target_metadata.get('parsed', {})
    target_bibcode = parsed_metadata.get('bibcode', None)

    # Emit/forward the update only if the previous status was "REGISTERED"
    if not (marked_as_deleted and previous_status == 'REGISTERED'):
        return

    if citation_change.content_type == adsmsg.CitationChangeContentType.doi:
        # Get citations from the database and transform the stored bibcodes into their canonical ones as registered in Solr.
        stored_citations = db.get_citations_by_bibcode(app, target_bibcode)
        citations = api.get_canonical_bibcodes(app, stored_citations)
        readers = db.get_citation_target_readers(app, target_bibcode, parsed_metadata.get('alternate_bibcode', []))
        target_record = db.get_citation_targets_by_doi(app, [citation_change.content])[0]
        associated_works = target_record.get('associated_works', {"":""})
        logger.debug("Calling 'task_output_results' with '%s'", citation_change)
        task_output_results.delay(citation_change, parsed_metadata, citations, db_versions=associated_works, readers=readers)

    logger.debug("Calling '_emit_citation_change' with '%s'", citation_change)
    _emit_citation_change(citation_change, parsed_metadata)
310

311
def _protobuf_to_adsmsg_citation_change(pure_protobuf):
    """
    Transforms pure citation_change protobuf to adsmsg.CitationChange,
    which can be safely sent via Celery/RabbitMQ.
    """
    fields = MessageToDict(pure_protobuf, preserving_proto_field_name=True)

    # content_type comes back as a string name; convert it to its value and
    # fall back to 0 (adsmsg.CitationChangeContentType.doi) when it is absent.
    if 'content_type' not in fields:
        fields['content_type'] = 0
    else:
        fields['content_type'] = getattr(adsmsg.CitationChangeContentType, fields['content_type'])

    # The dict carries the timestamp in string format
    # ('timestamp': '2019-01-03T21:00:02.010610Z'); drop it from the
    # constructor arguments and restore the original value afterwards.
    had_timestamp = 'timestamp' in fields
    fields.pop('timestamp', None)

    citation_change = adsmsg.CitationChange(**fields)
    if had_timestamp:
        # Recover timestamp in google.protobuf.timestamp_pb2.Timestamp format
        # 'timestamp': seconds: 1546549202 nanos: 10610000
        citation_change.timestamp = pure_protobuf.timestamp
    return citation_change
334

335
@app.task(queue='process-citation-changes')
def task_process_citation_changes(citation_changes, force=False):
    """
    Process citation changes

    Each change is converted to an adsmsg.CitationChange and dispatched to
    the new/updated/deleted task according to its status, provided that its
    presence in the database is consistent with that status.
    """
    logger.debug('Checking content: %s', citation_changes)

    for raw_change in citation_changes.changes:
        citation_change = _protobuf_to_adsmsg_citation_change(raw_change)
        # Check: Is this citation already stored in the DB?
        citation_in_db = db.citation_already_exists(app, citation_change)
        change_status = citation_change.status

        if change_status == adsmsg.Status.new:
            if not citation_in_db:
                logger.debug("Calling 'task_process_new_citation' with '%s'", citation_change)
                task_process_new_citation.delay(citation_change, force=force)
            else:
                logger.error("Ignoring new citation (citing '%s', content '%s' and timestamp '%s') because it already exists in the database", citation_change.citing, citation_change.content, citation_change.timestamp.ToJsonString())
        elif change_status == adsmsg.Status.updated:
            if citation_in_db:
                logger.debug("Calling 'task_process_updated_citation' with '%s'", citation_change)
                task_process_updated_citation.delay(citation_change, force=force)
            else:
                logger.error("Ignoring updated citation (citing '%s', content '%s' and timestamp '%s') because it does not exist in the database", citation_change.citing, citation_change.content, citation_change.timestamp.ToJsonString())
        elif change_status == adsmsg.Status.deleted:
            if citation_in_db:
                logger.debug("Calling 'task_process_deleted_citation' with '%s'", citation_change)
                # Note: `force` is not passed on to the deletion task.
                task_process_deleted_citation.delay(citation_change)
            else:
                logger.error("Ignoring deleted citation (citing '%s', content '%s' and timestamp '%s') because it does not exist in the database", citation_change.citing, citation_change.content, citation_change.timestamp.ToJsonString())
365

366
@app.task(queue='process-citation-changes')
def task_process_reader_updates(reader_changes, **kwargs):
    """
    Store reader changes and forward the resulting reader list.

    Each change is a mapping with 'bibcode', 'status' and 'reader' entries.
    A "NEW" change is stored as REGISTERED (or DISCARDED when the target has
    no parsed metadata) and a "DELETED" change is marked as deleted. Changes
    whose bibcode is not a citation target are stored as DISCARDED.

    After all changes are stored, a single nonbib-only update is sent to
    'task_output_results' for the target of the first change.
    NOTE(review): this presumably relies on every change in a batch
    referring to the same bibcode -- confirm against the caller.
    """
    if not reader_changes:
        # Nothing to store and no bibcode to forward an update for.
        logger.warning("No reader changes received. Nothing to process.")
        return

    for change in reader_changes:
        registered_records = (
            db.get_citation_targets_by_bibcode(app, [change['bibcode']])
            or db.get_citation_targets_by_alt_bibcode(app, [change['bibcode']])
        )

        if registered_records:
            registered_record = registered_records[0]
            logger.info("Updating reader data for {}.".format(change['bibcode']))

            custom_citation_change = adsmsg.CitationChange(content=registered_record['content'],
                                                       content_type=getattr(adsmsg.CitationChangeContentType, registered_record['content_type'].lower()),
                                                       status=adsmsg.Status.updated,
                                                       timestamp=datetime.now()
                                                       )
            parsed_metadata = db.get_citation_target_metadata(app, custom_citation_change.content).get('parsed', {})

            if change['status'] == "NEW":
                status = "REGISTERED"
                logger.info("Adding new reader for bibcode: {} to database.".format(change['bibcode']))

                if parsed_metadata:
                    logger.debug("Calling 'task_output_results' with '%s'", custom_citation_change)
                else:
                    logger.warning("No parsed metadata for citation_target: {}. Marking reader as discarded.".format(custom_citation_change.content))
                    status = "DISCARDED"
                db.store_reader_data(app, change, status)

            elif change['status'] == "DELETED":
                logger.info("Deleting reader {} for bibcode: {} from db.".format(change['reader'], change['bibcode']))
                db.mark_reader_as_deleted(app, change)

        else:
            logger.info("{} is not a citation_target in the database. Discarding.".format(change['bibcode']))
            db.store_reader_data(app, change, "DISCARDED")

    # Output all the reader changes at once. Both lookups use the same
    # bibcode so the fallback cannot resolve a different target than the
    # primary lookup (or than the one named in the warning below).
    target_bibcode = reader_changes[0]['bibcode']
    registered_records = (
        db.get_citation_targets_by_bibcode(app, [target_bibcode])
        or db.get_citation_targets_by_alt_bibcode(app, [target_bibcode])
    )

    if registered_records:
        registered_record = registered_records[0]
        custom_citation_change = adsmsg.CitationChange(content=registered_record['content'],
                                                        content_type=getattr(adsmsg.CitationChangeContentType, registered_record['content_type'].lower()),
                                                        status=adsmsg.Status.updated,
                                                        timestamp=datetime.now()
                                                        )
        parsed_metadata = db.get_citation_target_metadata(app, custom_citation_change.content).get('parsed', {})

        citations = db.get_citations_by_bibcode(app, registered_record['bibcode'])
        readers = db.get_citation_target_readers(app, registered_record['bibcode'], parsed_metadata.get('alternate_bibcode', []))
        associated_works = registered_record.get('associated_works', {"":""})
        logger.debug("Calling 'task_output_results' with '%s'", custom_citation_change)
        task_output_results.delay(custom_citation_change, parsed_metadata, citations, readers=readers, only_nonbib=True, db_versions=associated_works)
    else:
        logger.warning("Bibcode: {} is not a target in the database. Cannot forward nonbib record to master.".format(target_bibcode))
427

428
# NOTE(review): this queue name uses an underscore ('process-citation_changes')
# and is not among the queues declared in app.conf.CELERY_QUEUES, which lists
# 'process-citation-changes' -- confirm whether this is intentional.
@app.task(queue='process-citation_changes')
def task_write_nonbib_files(results):
    """
    Write the citation target data of REGISTERED targets to disk.

    `results` is accepted but not used.
    """
    logger.info("Writing nonbib files to disk")
    db.write_citation_target_data(app, only_status='REGISTERED')
432

433
def _emit_citation_change(citation_change, parsed_metadata):
    """
    Emit a citation change event when the target link is alive and the
    target is either a software record or a URL citation.
    """
    link_alive = parsed_metadata and parsed_metadata.get("link_alive", False)
    software_record = parsed_metadata and parsed_metadata.get("doctype", "").lower() == "software"
    url_citation = parsed_metadata and citation_change.content_type == adsmsg.CitationChangeContentType.url

    if not link_alive or not (software_record or url_citation):
        return

    # Software records take precedence over plain URL citations; the two
    # cases differ only in the debug message.
    if software_record:
        debug_message = "Calling 'task_emit_event' for '%s'"
    else:
        debug_message = "Calling 'task_emit_event' for EMITTABLE citation '%s'"

    event_data = webhook.citation_change_to_event_data(citation_change, parsed_metadata)
    if event_data:
        dump_prefix = citation_change.timestamp.ToDatetime().strftime("%Y%m%d_%H%M%S")
        logger.debug(debug_message, citation_change)
        task_emit_event.delay(event_data, dump_prefix)
454

455
@app.task(queue='process-emit-event')
def task_emit_event(event_data, dump_prefix):
    """
    Emit event

    The event is sent to the webhook unless TESTING_MODE is set, in which
    case the emission is only emulated. In both cases the event is dumped
    and stored in the database.
    """
    relationship = event_data.get("RelationshipType", {}).get("SubType", None)
    source_id = event_data.get("Source", {}).get("Identifier", {}).get("ID", None)
    target_id = event_data.get("Target", {}).get("Identifier", {}).get("ID", None)
    testing_mode = app.conf['TESTING_MODE']

    if testing_mode:
        prefix = os.path.join("emulated", relationship)
        emitted = True
    else:
        prefix = os.path.join("emitted", relationship)
        emitted = webhook.emit_event(app.conf['ADS_WEBHOOK_URL'], app.conf['ADS_WEBHOOK_AUTH_TOKEN'], event_data)

    # Only a string dump prefix is appended to the dump location.
    if isinstance(dump_prefix, str):
        prefix = os.path.join(prefix, dump_prefix)
    webhook.dump_event(event_data, prefix=prefix)
    db.store_event(app, event_data)

    if testing_mode and emitted:
        outcome_message = "Emulated emission of event due to 'testing mode' (relationship '%s', source '%s' and target '%s')"
    elif emitted:
        outcome_message = "Emitted event (relationship '%s', source '%s' and target '%s')"
    else:
        outcome_message = "Non-emitted event (relationship '%s', source '%s' and target '%s')"
    logger.debug(outcome_message, relationship, source_id, target_id)
482

483
def _remove_duplicated_dict_in_list(l):
1✔
484
    return [x for x in l if x['content'] in set([r['content'] for r in l])]
×
485

486
@app.task(queue='maintenance_canonical')
def task_maintenance_canonical(dois, bibcodes):
    """
    Maintenance operation:
    - Get all the registered citation targets (or only a subset of them if DOIs and/or bibcodes are specified)
    - For each, get their citations bibcodes and transform them to their canonical form
    - Send to master an update with the new list of citations canonical bibcodes

    - dois: list of DOIs to restrict the operation to
    - bibcodes: list of bibcodes to restrict the operation to

    If both lists are empty, every REGISTERED citation target is processed.
    A record whose citations cannot be retrieved or canonicalized is logged
    and skipped; the remaining records are still processed. Records without
    parsed metadata are not sent.
    """
    n_requested = len(dois) + len(bibcodes)
    if n_requested == 0:
        registered_records = db.get_citation_targets(app, only_status='REGISTERED')
    else:
        registered_records = db.get_citation_targets_by_bibcode(app, bibcodes, only_status='REGISTERED')
        registered_records += db.get_citation_targets_by_doi(app, dois, only_status='REGISTERED')
        registered_records = _remove_duplicated_dict_in_list(registered_records)

    for registered_record in registered_records:
        try:
            # Get citations from the database and transform the stored bibcodes into their canonical ones as registered in Solr.
            original_citations = db.get_citations_by_bibcode(app, registered_record['bibcode'])
            existing_citation_bibcodes = api.get_canonical_bibcodes(app, original_citations)

        except Exception:
            # Only ordinary errors are treated as "skip this record";
            # SystemExit/KeyboardInterrupt must still be able to stop the worker.
            logger.exception("Failed API request to retreive existing citations for bibcode '{}'".format(registered_record['bibcode']))
            continue
        custom_citation_change = adsmsg.CitationChange(content=registered_record['content'],
                                                       content_type=getattr(adsmsg.CitationChangeContentType, registered_record['content_type'].lower()),
                                                       status=adsmsg.Status.updated,
                                                       timestamp=datetime.now()
                                                       )
        parsed_metadata = db.get_citation_target_metadata(app, custom_citation_change.content).get('parsed', {})
        readers = db.get_citation_target_readers(app, registered_record['bibcode'], parsed_metadata.get('alternate_bibcode', []))

        if parsed_metadata:
            logger.debug("Calling 'task_output_results' with '%s'", custom_citation_change)
            task_output_results.delay(custom_citation_change, parsed_metadata, existing_citation_bibcodes, db_versions=registered_record.get('associated_works', {"":""}), readers=readers)
×
522
   
523
@app.task(queue='maintenance_metadata')
def task_maintenance_metadata(dois, bibcodes, reset=False):
    """
    Maintenance operation:
    - Get all the registered citation targets (or only a subset of them if DOIs and/or bibcodes are specified)
    - For each, retrieve metadata and if it is different to what we have in our database:
        - Get the citations bibcodes and transform them to their canonical form
        - Send to master an update with the new metadata and the current list of citations canonical bibcodes

    - dois: list of DOIs to restrict the operation to
    - bibcodes: list of bibcodes to restrict the operation to
    - reset: accepted for interface compatibility; not referenced in this function body

    If both lists are empty, every REGISTERED citation target is processed.
    Records whose new metadata is no longer 'software', or for which no
    bibcode could be computed, are logged as errors and left untouched.
    When a record has curated metadata, it is re-applied on top of the
    freshly parsed metadata before the update is sent.
    """
    n_requested = len(dois) + len(bibcodes)
    if n_requested == 0:
        registered_records = db.get_citation_targets(app, only_status='REGISTERED')
    else:
        registered_records = db.get_citation_targets_by_bibcode(app, bibcodes, only_status='REGISTERED')
        registered_records += db.get_citation_targets_by_doi(app, dois, only_status='REGISTERED')
        registered_records = _remove_duplicated_dict_in_list(registered_records)

    for registered_record in registered_records:
        updated = False
        bibcode_replaced = {}
        # Fetch DOI metadata (if HTTP request fails, an exception is raised
        # and the task will be re-queued (see app.py and adsputils))

        curated_metadata = registered_record.get('curated_metadata', {})

        # Log the value fetched above rather than indexing the record again,
        # so a record without the 'curated_metadata' key does not raise KeyError.
        logger.debug("Curated metadata for {} is {}".format(registered_record['content'], curated_metadata))
        raw_metadata = doi.fetch_metadata(app.conf['DOI_URL'], app.conf['DATACITE_URL'], registered_record['content'])
        if raw_metadata:
            parsed_metadata = doi.parse_metadata(raw_metadata)
            is_software = parsed_metadata.get('doctype', '').lower() == "software"
            bibcode = registered_record.get('bibcode', None)
            if not is_software:
                logger.error("The new metadata for '%s' has changed its 'doctype' and it is not 'software' anymore", registered_record['bibcode'])
            elif parsed_metadata.get('bibcode') in (None, ""):
                logger.error("The new metadata for '%s' affected the metadata parser and it did not correctly compute a bibcode", registered_record['bibcode'])
            else:
                # Detect concept DOIs: they have one or more versions of the software
                # and they are not a version of something else
                concept_doi = len(parsed_metadata.get('version_of', [])) == 0 and len(parsed_metadata.get('versions', [])) >= 1
                if concept_doi:
                    concept_metadata=db.get_citation_target_metadata(app, registered_record['content'], curate=True, concept=concept_doi)['parsed']
                different_bibcodes = registered_record['bibcode'] != parsed_metadata['bibcode']
                if different_bibcodes and concept_doi:
                    # Concept DOI publication date changes with newer software version
                    # and authors can also change (i.e., first author last name initial)
                    # but we want to respect the year in the bibcode, which corresponds
                    # to the year of the latest release when it was first ingested
                    # by ADS
                    parsed_metadata['pubdate'] = concept_metadata['pubdate']
                    parsed_metadata['bibcode'] = concept_metadata['pubdate'][:4] + parsed_metadata['bibcode'][4:]
                    parsed_metadata['bibcode'] = parsed_metadata['bibcode'][:-1] + parsed_metadata['bibcode'][-1].upper()
                    # Re-verify if bibcodes are still different (they could be if
                    # name parsing has changed):
                    different_bibcodes = registered_record['bibcode'] != parsed_metadata['bibcode']
                    if not different_bibcodes:
                        logger.debug("bibcode change limited to bibcode year.")

                if different_bibcodes:
                    # These two bibcodes are identical and we can signal the broker
                    event_data = webhook.identical_bibcodes_event_data(registered_record['bibcode'], parsed_metadata['bibcode'])
                    if event_data:
                        dump_prefix = datetime.now().strftime("%Y%m%d") # "%Y%m%d_%H%M%S"
                        logger.debug("Calling 'task_emit_event' for '%s' IsIdenticalTo '%s'", registered_record['bibcode'], parsed_metadata['bibcode'])
                        task_emit_event.delay(event_data, dump_prefix)
                    # If there is no curated metadata modify record and note replaced bibcode
                    if not curated_metadata:
                        logger.warning("Parsing the new metadata for citation target '%s' produced a different bibcode: '%s'. The former will be moved to the 'alternate_bibcode' list, and the new one will be used as the main one.", registered_record['bibcode'], parsed_metadata.get('bibcode', None))
                        alternate_bibcode = parsed_metadata.get('alternate_bibcode', [])
                        alternate_bibcode += registered_record.get('alternate_bibcode', [])
                        if registered_record['bibcode'] not in alternate_bibcode:
                            alternate_bibcode.append(registered_record['bibcode'])
                        parsed_metadata['alternate_bibcode'] = alternate_bibcode
                        bibcode = parsed_metadata.get('bibcode', None)
                        bibcode_replaced = {'previous': registered_record['bibcode'], 'new': parsed_metadata['bibcode'] }

                #Protect curated metadata from being bulldozed by metadata updates.
                if curated_metadata:
                    logger.info("Re-applying curated metadata for {}".format(registered_record.get('bibcode')))
                    modified_metadata = db.generate_modified_metadata(parsed_metadata, curated_metadata)
                    zenodo_bibstem = "zndo"
                    #generate bibcode for modified metadata
                    bibcode = registered_record.get('bibcode')
                    new_bibcode = doi.build_bibcode(modified_metadata, doi.zenodo_doi_re, zenodo_bibstem)
                    #Make sure new bibcode still respects the original publication year.
                    new_bibcode = bibcode[:4] + new_bibcode[4:]
                    alternate_bibcode = registered_record.get('alternate_bibcode', [])
                    #confirm new_bibcode not in alternate_bibcode list
                    try:
                        alternate_bibcode.remove(new_bibcode)
                    except ValueError:
                        # list.remove signals "not present" with ValueError
                        logger.debug("{} not in alternate_bibcodes".format(new_bibcode))
                    #Add the clean alternate bibcode list to the parsed metadata
                    parsed_metadata['alternate_bibcode'] = list(set(alternate_bibcode))
                    if 'alternate_bibcode' in curated_metadata.keys():
                        alternate_bibcode = list(set(alternate_bibcode+curated_metadata['alternate_bibcode']))
                    #Checks if the new bibcode is now different from the one generated for parsed metadata
                    if new_bibcode != parsed_metadata.get('bibcode'):
                        if parsed_metadata.get('bibcode') not in alternate_bibcode:
                            #generate complete alt bibcode list including any curated entries
                            alternate_bibcode.append(parsed_metadata.get('bibcode'))
                            #Add the CC generated bibcode to the parsed metadata
                            parsed_metadata['alternate_bibcode'].append(parsed_metadata.get('bibcode'))
                            logger.warning("Parsing the curated metadata for citation target '%s' produced a different bibcode: '%s'. The former will be moved to the 'alternate_bibcode' list, and the new one will be used as the main one.", parsed_metadata['bibcode'], new_bibcode)
                        #Remove duplicate bibcodes
                        parsed_metadata['alternate_bibcode'] = list(set(parsed_metadata.get('alternate_bibcode')))
                        #Sort bibcodes so CC doesn't think the data has changed and call for an unnecessary update.
                        parsed_metadata['alternate_bibcode'].sort()
                        #set new bibcode
                        modified_metadata['bibcode'] = new_bibcode
                        #Only note bibcode is replaced if the bibcode actually differs from the registered record.
                        if new_bibcode != registered_record.get('bibcode'):
                            bibcode_replaced = {'previous': registered_record['bibcode'], 'new': parsed_metadata['bibcode'] }
                        else:
                            bibcode_replaced = {}
                        #set curated metadata alt bibcodes sort alt bibcodes or else CC thinks the data has changed.
                        alternate_bibcode.sort()
                        curated_metadata['alternate_bibcode'] = alternate_bibcode
                    modified_metadata['alternate_bibcode'] = alternate_bibcode
                else:
                    modified_metadata = parsed_metadata
                    #make sure old alternate bibcodes aren't clobbered.
                    if registered_record.get('alternate_bibcode'):
                        alternate_bibcode = parsed_metadata.get('alternate_bibcode',[])
                        alternate_bibcode += registered_record.get('alternate_bibcode')
                        parsed_metadata['alternate_bibcode'] = list(set(alternate_bibcode))

                updated = db.update_citation_target_metadata(app, registered_record['content'], raw_metadata, parsed_metadata, curated_metadata=curated_metadata, bibcode=bibcode, associated=registered_record.get('associated_works', {"":""}))

        if updated:
            citation_change = adsmsg.CitationChange(content=registered_record['content'],
                                                           content_type=getattr(adsmsg.CitationChangeContentType, registered_record['content_type'].lower()),
                                                           status=adsmsg.Status.updated,
                                                           timestamp=datetime.now()
                                                           )
            if citation_change.content_type == adsmsg.CitationChangeContentType.doi:
                # Get citations from the database and transform the stored bibcodes into their canonical ones as registered in Solr.
                original_citations = db.get_citations_by_bibcode(app, registered_record['bibcode'])
                citations = api.get_canonical_bibcodes(app, original_citations)
                readers = db.get_citation_target_readers(app, registered_record['bibcode'], parsed_metadata.get('alternate_bibcode', []))
                logger.debug("Calling 'task_output_results' with '%s'", citation_change)
                task_output_results.delay(citation_change, modified_metadata, citations, bibcode_replaced=bibcode_replaced, db_versions=registered_record.get('associated_works', {"":""}), readers=readers)
×
664

665
@app.task(queue='maintenance_metadata')
def task_maintenance_curation(dois, bibcodes, curated_entries, reset=False):
    """
    Maintenance operation:
    - Get all the registered citation targets for the entries specified in curated_entries
    - For each, retrieve metadata and if it is different to what we have in our database:
        - Get the citations bibcodes and transform them to their canonical form
        - Replace the retrieved metadata for values specified in curated_entries
        - Send to master an update with the new metadata and the current list of citations canonical bibcodes

    - dois, bibcodes: not referenced in this function body; targets are
      looked up from each curated entry instead
    - curated_entries: iterable of dicts; each is looked up by its 'doi' key
      or, failing that, its 'bibcode' key. Entries are modified in place
      ('doi'/'bibcode' are removed, previously curated keys are merged in).
    - reset: when true, drop the curated metadata of the record and go back
      to the parsed metadata instead of applying the entry

    Entries with neither 'doi' nor 'bibcode', or that match no REGISTERED
    record, are logged as errors and skipped.

    Any exception raised while applying an entry is recorded on the record
    through `db.update_citation_target_curator_message`, logged, and
    re-raised.
    """
    for curated_entry in curated_entries:
        updated = False
        bibcode_replaced = {}

        #Try by doi.
        if curated_entry.get('doi'):
            registered_records = db.get_citation_targets_by_doi(app, [curated_entry.get('doi')], only_status='REGISTERED')
        #If not, retrieve entry by bibcode.
        elif curated_entry.get('bibcode'):
            registered_records = db.get_citation_targets_by_bibcode(app, [curated_entry.get('bibcode')], only_status='REGISTERED')
        #report error
        else:
            logger.error('Unable to retrieve entry for {} from database. Please check input file.'.format(curated_entry))
            # Skip the entry: without this, `registered_records` would be
            # unbound on the first iteration or still hold the records of the
            # previous entry, and the curation would hit the wrong target.
            continue

        if registered_records:
            registered_record = registered_records[0]
            metadata = db.get_citation_target_metadata(app, registered_record.get('content', ''), curate=False)
            raw_metadata = metadata.get('raw', '')
            parsed_metadata = metadata.get('parsed', '')
            #remove doi and bibcode from metadata to be stored in db.
            for key in ['bibcode','doi']:
                try:
                    curated_entry.pop(key)
                except KeyError as e:
                    logger.warning("Failed to remove key: {} with error {}. Key likely not in curated_metadata.".format(key, e))
                    continue
            try:
                if not reset:
                    if 'authors' in curated_entry.keys():
                        #checks to make sure authors are in a list. Errors out if not.
                        if isinstance(curated_entry.get('authors', []), list):
                            curated_entry['normalized_authors'] = doi.renormalize_author_names(curated_entry.get('authors', None))
                        else:
                            logger.error("'author' key is not a list of authors. Stopping.")
                            err = "'authors' is not a valid list of strings"
                            raise TypeError(err)
                    #only check old metadata if we are adding updates, otherwise ignore.
                    if curated_entry != registered_record.get('curated_metadata'):
                        for key in registered_record['curated_metadata'].keys():
                            #first apply any previous edits to metadata that are not overwritten by new metadata.
                            if key != "error" and key not in curated_entry.keys():
                                curated_entry[key] = registered_record['curated_metadata'][key]
                    else:
                        logger.warning("Supplied metadata is identical to previously added metadata. No updates will occur.")
                    logger.debug("Curated entry: {}".format(curated_entry))
                    modified_metadata = db.generate_modified_metadata(parsed_metadata, curated_entry)
                    logger.debug("Modified bibcode {}".format(modified_metadata.get('bibcode')))
                    #regenerate bibcode with curated_metadata and append old bibcode to alternate_bibcode
                    zenodo_bibstem = "zndo"
                    #generates new bibcodes with manual curation data
                    new_bibcode = doi.build_bibcode(modified_metadata, doi.zenodo_doi_re, zenodo_bibstem)
                    modified_metadata['bibcode'] = new_bibcode
                    #get the original list of alt bibcodes
                    alternate_bibcode = registered_record.get('alternate_bibcode', [])
                    #set parsed_metadata alt bibcodes to match original list
                    parsed_metadata['alternate_bibcode'] = registered_record.get('alternate_bibcode', [])
                    #checks for provided alt bibcodes from manual curation
                    if 'alternate_bibcode' in curated_entry.keys():
                        #checks to make sure alternate_bibcodes are in a list. Errors out if not.
                        if isinstance(curated_entry.get('alternate_bibcode', []), list):
                            alternate_bibcode = list(set(alternate_bibcode+curated_entry['alternate_bibcode']))
                            logger.debug('alternate bibcodes are {}'.format(alternate_bibcode))
                        else:
                            logger.error("'alternate_bibcodes' key is not a list of alternate_bibcodes. Stopping.")
                            err = "'alternate_bibcodes' is not a valid list of bibcodes"
                            raise TypeError(err)

                    #checks to make sure the main bibcode is not in the alt bibcodes
                    try:
                        alternate_bibcode.remove(modified_metadata.get('bibcode'))
                    except (ValueError, AttributeError):
                        # ValueError: the main bibcode is not in the list.
                        # AttributeError: the record holds no list object here
                        # (e.g. None), which was silently tolerated before.
                        pass
                    #checks if bibcode has changed due to manual curation metadata
                    if new_bibcode != registered_record.get('bibcode'):
                        logger.warning("Parsing the new metadata for citation target '%s' produced a different bibcode: '%s'. The former will be moved to the 'alternate_bibcode' list, and the new one will be used as the main one.", registered_record['bibcode'],new_bibcode)
                        #Add old bibcode to alt bibcodes
                        if registered_record.get('bibcode') not in alternate_bibcode:
                            #generate complete alt bibcode list including any curated entries
                            alternate_bibcode.append(registered_record.get('bibcode'))
                            #Add the CC generated bibcode to the parsed metadata
                            parsed_metadata['alternate_bibcode'].append(registered_record.get('bibcode'))
                        #removes duplicates from parsed_metadata alt bibcodes
                        parsed_metadata['alternate_bibcode'] = list(set(parsed_metadata.get('alternate_bibcode')))
                        #sets new bibcode
                        modified_metadata['bibcode'] = new_bibcode
                        #removes duplicates from all alt bibcodes including ones provided by manual curation
                        alternate_bibcode = list(set(alternate_bibcode))
                        #updates curated entry alt bibcodes only if a new bibcode is generated due to manual curation
                        curated_entry['alternate_bibcode'] = alternate_bibcode
                        #marks bibcode as replaced
                        bibcode_replaced = {'previous': registered_record['bibcode'], 'new': new_bibcode}
                    #sets modified metadata alt bibcodes to match the full list of alt bibcodes.
                    modified_metadata['alternate_bibcode'] = alternate_bibcode

                else:
                    #Check to see if curated_metadata exists for the record.
                    if registered_record['curated_metadata']:
                        #Repopulate parsed_metadata with expected bibcode information from parsed_cited_metadata.
                        logger.debug("Resetting citation to original parsed metadata")
                        #regenerate bibcode with parsed_metadata and append old bibcode to alternate_bibcode
                        zenodo_bibstem = "zndo"
                        new_bibcode = doi.build_bibcode(parsed_metadata, doi.zenodo_doi_re, zenodo_bibstem)
                        parsed_metadata['bibcode'] = new_bibcode
                        #get original alt bibcodes
                        alternate_bibcode = registered_record.get('alternate_bibcode', [])
                        parsed_metadata['alternate_bibcode'] = registered_record.get('alternate_bibcode', [])
                        #reset bibcode if changed
                        if new_bibcode != registered_record.get('bibcode'):
                            logger.warning("Parsing the new metadata for citation target '%s' produced a different bibcode: '%s'. The former will be moved to the 'alternate_bibcode' list, and the new one will be used as the main one.", registered_record['bibcode'],new_bibcode)
                            #Add old bibcode to alt bibcodes
                            if registered_record.get('bibcode') not in alternate_bibcode:
                                alternate_bibcode.append(registered_record.get('bibcode'))
                            #set bibcode replaced if necessary
                            bibcode_replaced = {'previous': registered_record['bibcode'], 'new': parsed_metadata['bibcode'] }
                        #set alt bibcodes to full list but try and remove canonical bibcode from alt list
                        try:
                            alternate_bibcode.remove(parsed_metadata.get('bibcode'))
                        except ValueError:
                            #we pass because this just means the canonical bibcode is not in the list of alt bibcodes
                            pass
                        parsed_metadata['alternate_bibcode'] = list(set(alternate_bibcode))
                        #reset modified metadata
                        modified_metadata = parsed_metadata
                        #clear curated metadata
                        curated_entry = {}
                    else:
                        modified_metadata = parsed_metadata
                        logger.warning("Cannot delete curated metadata for {}. No curated metadata exists.".format(registered_record.get('content', '')))

                different_bibcodes = registered_record['bibcode'] != modified_metadata['bibcode']
                if different_bibcodes:
                    event_data = webhook.identical_bibcodes_event_data(registered_record['bibcode'], modified_metadata['bibcode'])
                    if event_data:
                        dump_prefix = datetime.now().strftime("%Y%m%d") # "%Y%m%d_%H%M%S"
                        logger.debug("Calling 'task_emit_event' for '%s' IsIdenticalTo '%s'", registered_record['bibcode'], modified_metadata['bibcode'])
                        task_emit_event.delay(event_data, dump_prefix)

                updated = db.update_citation_target_metadata(app, registered_record['content'], raw_metadata, parsed_metadata, curated_metadata=curated_entry, bibcode=modified_metadata.get('bibcode'), associated=registered_record.get('associated_works', {"":""}))
                if updated:
                    citation_change = adsmsg.CitationChange(content=registered_record['content'],
                                                                content_type=getattr(adsmsg.CitationChangeContentType, registered_record['content_type'].lower()),
                                                                status=adsmsg.Status.updated,
                                                                timestamp=datetime.now()
                                                                )
                    if citation_change.content_type == adsmsg.CitationChangeContentType.doi:
                        # Get citations from the database and transform the stored bibcodes into their canonical ones as registered in Solr.
                        original_citations = db.get_citations_by_bibcode(app, registered_record['bibcode'])
                        citations = api.get_canonical_bibcodes(app, original_citations)
                        readers = db.get_citation_target_readers(app, registered_record['bibcode'], parsed_metadata.get('alternate_bibcode', []))
                        logger.debug("Calling 'task_output_results' with '%s'", citation_change)
                        task_output_results.delay(citation_change, modified_metadata, citations, bibcode_replaced=bibcode_replaced, db_versions=registered_record.get('associated_works', {"":""}), readers=readers)
                else:
                    logger.warning("Curated metadata did not result in a change to recorded metadata for {}.".format(registered_record.get('content')))
            except Exception as e:
                # Record the failure on the citation target so it can be shown
                # to the curator later, then let the task fail.
                err = "task_maintenance_curation Failed to update metadata for {} with Exception: {}. Please check the input data and try again.".format(curated_entry, e)
                err_dict = registered_record.get('curated_metadata', {})
                err_dict['error'] = err
                db.update_citation_target_curator_message(app, registered_record['content'], err_dict)
                logger.exception(err)
                raise
        else:
            logger.error('Unable to retrieve entry for {} from database. Please check input file.'.format(curated_entry))
×
837

838
def _get_registered_record_for_entry(curated_entry):
    """
    Return the first REGISTERED citation target matching the entry's 'doi'
    or, when no DOI is given, its 'bibcode'. Raises IndexError when nothing matches.
    """
    if curated_entry.get('doi'):
        records = db.get_citation_targets_by_doi(app, [curated_entry.get('doi')], only_status='REGISTERED')
    else:
        records = db.get_citation_targets_by_bibcode(app, [curated_entry.get('bibcode')], only_status='REGISTERED')
    return records[0]


def _print_citation_target_metadata(content):
    """
    Print the parsed metadata stored for `content` as JSON, followed by the
    error of the last curation attempt when one is recorded.
    """
    metadata = db.get_citation_target_metadata(app, content)
    parsed = metadata.get('parsed', None)
    # A target that was never curated may carry no curated metadata at all;
    # treat that as "no error recorded" instead of failing on None.
    curated = metadata.get('curated', None) or {}
    if parsed:
        print(json.dumps(parsed))
    if "error" in curated:
        print("\n The most recent attempt to curate metadata failed with the following error: {}".format(curated.get("error", "")))


def maintenance_show_metadata(curated_entries):
    """
    Print current metadata for a given citation target to standard output.

    - curated_entries: iterable of dicts; each target is looked up by its
      'doi' key or, when no DOI is given, by its 'bibcode' key. Entries with
      neither key are silently skipped.

    Raises Exception when a citation target cannot be retrieved. Failures
    while loading or printing the metadata are only logged.
    """
    for curated_entry in curated_entries:
        if not (curated_entry.get('doi') or curated_entry.get('bibcode')):
            continue

        try:
            registered_record = _get_registered_record_for_entry(curated_entry)
        except Exception as e:
            msg = "Failed to retrieve citation target {}. Please confirm information is correct and citation target is in database.".format(curated_entry)
            logger.exception(msg)
            raise Exception(msg) from e

        custom_citation_change = adsmsg.CitationChange(content=registered_record['content'],
                                                content_type=getattr(adsmsg.CitationChangeContentType, registered_record['content_type'].lower()),
                                                status=adsmsg.Status.updated,
                                                timestamp=datetime.now()
                                                )
        try:
            _print_citation_target_metadata(custom_citation_change.content)
        except Exception:
            msg = "Failed to load metadata for citation {}. Please confirm information is correct and citation target is in database.".format(curated_entry)
            logger.exception(msg)
×
896

897
@app.task(queue='maintenance_metadata')
def task_maintenance_repopulate_bibcode_columns():
    """
    Maintenance operation:
    - Open a database session scope and hand it to `db.populate_bibcode_column`,
      which re-populates the bibcode column with the current canonical bibcode.

    :return: no return
    """
    with app.session_scope() as db_session:
        db.populate_bibcode_column(db_session)
904

905
def _resend_citation_events(record, citations, parsed_metadata):
    """
    Re-emit to the broker one event per citing bibcode of a citation target.

    :param record: citation target dict; its 'content' and 'content_type' keys are read
    :param citations: iterable of citing bibcodes
    :param parsed_metadata: parsed metadata of the citation target, passed
        through to `webhook.citation_change_to_event_data`
    :return: no return
    """
    content_type = getattr(adsmsg.CitationChangeContentType, record['content_type'].lower())
    for citing_bibcode in citations:
        emit_citation_change = adsmsg.CitationChange(citing=citing_bibcode,
                                                     content=record['content'],
                                                     content_type=content_type,
                                                     status=adsmsg.Status.new,
                                                     timestamp=datetime.now()
                                                     )
        # Signal that the citing bibcode cites the DOI
        event_data = webhook.citation_change_to_event_data(emit_citation_change, parsed_metadata)
        if event_data:
            dump_prefix = emit_citation_change.timestamp.ToDatetime().strftime("%Y%m%d_%H%M%S_resent")
            logger.debug("Calling 'task_emit_event' for '%s'", emit_citation_change)
            task_emit_event.delay(event_data, dump_prefix)


@app.task(queue='maintenance_resend')
def task_maintenance_resend(dois, bibcodes, broker, only_nonbib=False):
    """
    Maintenance operation:
    - Get all the registered citation targets (or only a subset of them if DOIs and/or bibcodes are specified)
    - For each:
        - Re-send to master (or broker) an update with the current metadata and the current list of citations canonical bibcodes
    - When re-sending to the broker, additionally re-emit the citation events
      of the citation targets with status 'EMITTABLE'.

    :param dois: list of DOIs to restrict the operation to (None or empty for no restriction by DOI)
    :param bibcodes: list of bibcodes to restrict the operation to (None or empty for no restriction by bibcode)
    :param broker: if true, only re-emit events to the broker; otherwise only update master
    :param only_nonbib: forwarded to 'task_output_results' when updating master
    :return: no return
    """
    # Treat a missing selection the same as an empty one
    dois = dois or []
    bibcodes = bibcodes or []

    if not dois and not bibcodes:
        registered_records = db.get_citation_targets(app, only_status='REGISTERED')
        if broker:
            emittable_records = db.get_citation_targets(app, only_status='EMITTABLE')
        else:
            emittable_records = []
    else:
        registered_records = db.get_citation_targets_by_bibcode(app, bibcodes, only_status='REGISTERED')
        registered_records += db.get_citation_targets_by_doi(app, dois, only_status='REGISTERED')
        registered_records = _remove_duplicated_dict_in_list(registered_records)

        if broker:
            # Select emittable targets by bibcode AND by DOI, exactly as it is
            # done for registered targets, so that targets requested via their
            # DOI are not silently skipped.
            emittable_records = db.get_citation_targets_by_bibcode(app, bibcodes, only_status='EMITTABLE')
            emittable_records += db.get_citation_targets_by_doi(app, dois, only_status='EMITTABLE')
            emittable_records = _remove_duplicated_dict_in_list(emittable_records)
        else:
            emittable_records = []

    for registered_record in registered_records:
        citations = db.get_citations_by_bibcode(app, registered_record['bibcode'])
        custom_citation_change = adsmsg.CitationChange(content=registered_record['content'],
                                                       content_type=getattr(adsmsg.CitationChangeContentType, registered_record['content_type'].lower()),
                                                       status=adsmsg.Status.updated,
                                                       timestamp=datetime.now()
                                                       )
        parsed_metadata = db.get_citation_target_metadata(app, custom_citation_change.content).get('parsed', {})
        if not parsed_metadata:
            # Nothing can be re-sent without parsed metadata
            continue

        if not broker:
            # Only update master
            readers = db.get_citation_target_readers(app, parsed_metadata.get('bibcode',''), parsed_metadata.get('alternate_bibcode', []))
            logger.debug("Calling 'task_output_results' with '%s'", custom_citation_change)
            task_output_results.delay(custom_citation_change, parsed_metadata, citations, db_versions=registered_record.get('associated_works',{"":""}), readers=readers, only_nonbib=only_nonbib)
            continue

        # Only re-emit to the broker
        # Signal that the target bibcode and the DOI are identical
        event_data = webhook.identical_bibcode_and_doi_event_data(registered_record['bibcode'], registered_record['content'])
        if event_data:
            dump_prefix = custom_citation_change.timestamp.ToDatetime().strftime("%Y%m%d_%H%M%S_resent")
            logger.debug("Calling 'task_emit_event' for '%s' IsIdenticalTo '%s'", registered_record['bibcode'], registered_record['content'])
            task_emit_event.delay(event_data, dump_prefix)
        # And for each citing bibcode to the target DOI
        _resend_citation_events(registered_record, citations, parsed_metadata)

    # emittable_records is always empty unless re-sending to the broker
    for emittable_record in emittable_records:
        citations = db.get_citations_by_bibcode(app, emittable_record['bibcode'])
        parsed_metadata = db.get_citation_target_metadata(app, emittable_record['content']).get('parsed', {})
        if parsed_metadata:
            # For each citing bibcode to the target DOI
            _resend_citation_events(emittable_record, citations, parsed_metadata)
992

993
@app.task(queue='maintenance_reevaluate')
def task_maintenance_reevaluate(dois, bibcodes):
    """
    Maintenance operation:
    - Get all the citation targets with status 'DISCARDED' (or only a subset of them if DOIs and/or bibcodes are specified)
    - For each DOI target, fetch and parse its metadata again and, if it is
      software with a computed bibcode and the database update succeeds:
        - Mark the target and its discarded citations as registered
        - Get the citations bibcodes and transform them to their canonical form
        - Send to master a new record with the metadata and the list of citations canonical bibcodes

    :param dois: list of DOIs to restrict the operation to
    :param bibcodes: list of bibcodes to restrict the operation to
    :return: no return
    """
    if len(dois) + len(bibcodes) == 0:
        discarded_records = db.get_citation_targets(app, only_status='DISCARDED')
    else:
        discarded_records = db.get_citation_targets_by_bibcode(app, bibcodes, only_status='DISCARDED')
        discarded_records += db.get_citation_targets_by_doi(app, dois, only_status='DISCARDED')
        discarded_records = _remove_duplicated_dict_in_list(discarded_records)

    for discarded in discarded_records:
        target_content = discarded['content']
        if discarded['content_type'] != 'DOI':
            continue

        # Fetch DOI metadata (if HTTP request fails, an exception is raised
        # and the task will be re-queued (see app.py and adsputils))
        raw_metadata = doi.fetch_metadata(app.conf['DOI_URL'], app.conf['DATACITE_URL'], target_content)
        if not raw_metadata:
            continue

        parsed_metadata = doi.parse_metadata(raw_metadata)
        if parsed_metadata.get('doctype', '').lower() != "software":
            logger.error("Discarded '%s', it is not 'software'", target_content)
            continue
        if parsed_metadata.get('bibcode') in (None, ""):
            logger.error("The metadata for '%s' could not be parsed correctly and it did not correctly compute a bibcode", target_content)
            continue

        # Create citation target in the DB
        if not db.update_citation_target_metadata(app, target_content, raw_metadata, parsed_metadata, status='REGISTERED'):
            continue
        db.mark_all_discarded_citations_as_registered(app, target_content)

        citation_change = adsmsg.CitationChange(
            content=target_content,
            content_type=getattr(adsmsg.CitationChangeContentType, discarded['content_type'].lower()),
            status=adsmsg.Status.new,
            timestamp=datetime.now(),
        )
        if citation_change.content_type != adsmsg.CitationChangeContentType.doi:
            continue

        # Get citations from the database and transform the stored bibcodes into their canonical ones as registered in Solr.
        stored_citations = db.get_citations_by_bibcode(app, parsed_metadata['bibcode'])
        canonical_citations = api.get_canonical_bibcodes(app, stored_citations)
        readers = db.get_citation_target_readers(app, parsed_metadata['bibcode'], parsed_metadata.get('alternate_bibcode', []))
        logger.debug("Calling 'task_output_results' with '%s'", citation_change)
        # No bibcode is replaced by this operation, hence the empty dict
        task_output_results.delay(citation_change, parsed_metadata, canonical_citations, bibcode_replaced={}, db_versions=discarded.get('associated_works',{"":""}), readers=readers)
1042

1043
@app.task(queue='maintenance_resend')
def task_maintenance_generate_nonbib_files():
    """
    Maintenance operation:
    - Rewrite the nonbib (DataPipeline) files to disk from the citation
      targets that currently have status 'REGISTERED'.

    :return: no return
    """
    logger.info("Rewriting nonbib files to disk")
    db.write_citation_target_data(app, only_status='REGISTERED')
1050

1051
@app.task(queue='maintenance_associated_works')
def task_maintenance_reevaluate_associated_works(dois, bibcodes):
    """
    Maintenance operation:
    - Get all the registered citation targets (or only a subset of them if DOIs and/or bibcodes are specified)
    - For each, retrieve the stored metadata and:
        - Get associated works (other versions) from metadata.
        - Determine which associated works are currently in the db.
        - Queue 'task_process_updated_associated_works' when they differ from
          the associated works stored for the target.

    Targets whose versions cannot be fetched are logged and skipped; they do
    not abort the processing of the remaining targets.

    :param dois: list of DOIs to restrict the operation to
    :param bibcodes: list of bibcodes to restrict the operation to
    :return: no return
    """
    n_requested = len(dois) + len(bibcodes)
    if n_requested == 0:
        registered_records = db.get_citation_targets(app, only_status='REGISTERED')
    else:
        registered_records = db.get_citation_targets_by_bibcode(app, bibcodes, only_status='REGISTERED')
        registered_records += db.get_citation_targets_by_doi(app, dois, only_status='REGISTERED')
        registered_records = _remove_duplicated_dict_in_list(registered_records)

    #convert record into citation_change message
    for registered_record in registered_records:
        custom_citation_change = adsmsg.CitationChange(content=registered_record['content'],
                                                       content_type=getattr(adsmsg.CitationChangeContentType, registered_record['content_type'].lower()),
                                                       status=adsmsg.Status.updated,
                                                       timestamp=datetime.now()
                                                       )
        metadata = db.get_citation_target_metadata(app, registered_record['content'])
        raw_metadata = metadata.get('raw', {})

        #confirm citation is registered software, then check for associated works.
        if not raw_metadata:
            continue
        parsed_metadata = metadata.get('parsed', {})
        is_software = parsed_metadata.get('doctype', '').lower() == "software"
        if not is_software:
            logger.error("Discarded '%s', it is not 'software'", registered_record['content'])
            continue
        if parsed_metadata.get('bibcode') in (None, ""):
            logger.error("The metadata for '%s' could not be parsed correctly and it did not correctly compute a bibcode", registered_record['content'])
            continue

        logger.debug("Checking associated records for '%s'", custom_citation_change)
        #Check for additional versions
        try:
            all_versions_doi = doi.fetch_all_versions_doi(app.conf['DOI_URL'], app.conf['DATACITE_URL'], parsed_metadata)
        except Exception:
            # Best effort: without the list of versions there is nothing to
            # compare against, so move on to the next target instead of
            # failing later on a missing result.
            logger.exception("Unable to recover related versions for %s", custom_citation_change)
            continue

        versions = all_versions_doi.get('versions') if all_versions_doi else None
        if versions in (None, []):
            continue

        #fetch additional versions from db if they exist.
        logger.debug("Found {} versions for {}".format(len(versions), custom_citation_change.content))
        versions_in_db = db.get_associated_works_by_doi(app, all_versions_doi)
        #Only add bibcodes if there are versions in db, otherwise leave as None.
        if versions_in_db not in (None, [None]) and registered_record.get('associated_works', None) != versions_in_db:
            logger.info("Found {} versions in database for {}".format(len(versions_in_db), custom_citation_change.content))
            logger.debug("{}: associated_versions_bibcodes".format(versions_in_db))
            task_process_updated_associated_works.delay(custom_citation_change, versions_in_db)
1105
                    
1106
@app.task(queue='output-results')
def task_output_results(citation_change, parsed_metadata, citations, db_versions={"":""}, bibcode_replaced={}, readers=[], only_nonbib=False):
    """
    This worker will forward results to the outside
    exchange (typically an ADSMasterPipeline) to be
    incorporated into the storage

    :param citation_change: contains citation changes; its content is read
        as an attribute first and, if that fails, as a dictionary key
    :param parsed_metadata: parsed metadata of the citation target
    :param citations: list of citing bibcodes
    :param db_versions: associated works used to build the main record
    :param bibcode_replaced: if not empty, dict with the keys 'previous' and
        'new'; a deletion of the 'previous' bibcode is forwarded before the
        main record
    :param readers: readers used to build the main record
    :param only_nonbib: if true, only the nonbib records are forwarded
    :return: no return
    """
    # NOTE(review): the mutable default arguments are kept to leave the task
    # signature untouched; they are only read in this function. Confirm that
    # 'forward.build_record' does not mutate them.

    # Default used when the entry date cannot be retrieved, so that building
    # the records does not fail on an undefined variable.
    # NOTE(review): assumes 'forward.build_record' accepts entry_date=None - confirm.
    entry_date = None
    try:
        entry_date = db.get_citation_target_entry_date(app, citation_change.content)
    except Exception:
        try:
            entry_date = db.get_citation_target_entry_date(app, citation_change['content'])
        except Exception:
            logger.exception("Failed to retrieve entry date for {}".format(citation_change))

    messages = []
    if bibcode_replaced:
        # Bibcode was replaced, this is not a simple update
        # we need to issue a deletion of the previous record
        logger.debug("Calling delete for record: {}".format(bibcode_replaced['previous']))
        custom_citation_change = adsmsg.CitationChange(content=citation_change.content,
                                                       content_type=citation_change.content_type,
                                                       status=adsmsg.Status.deleted,
                                                       timestamp=datetime.now()
                                                       )
        # Work on a copy so the metadata of the main record is left untouched
        delete_parsed_metadata = parsed_metadata.copy()
        delete_parsed_metadata['bibcode'] = bibcode_replaced['previous']
        delete_parsed_metadata['alternate_bibcode'] = [x for x in delete_parsed_metadata.get('alternate_bibcode', []) if x not in (bibcode_replaced['previous'], bibcode_replaced['new'])]
        delete_record, delete_nonbib_record = forward.build_record(app, custom_citation_change, delete_parsed_metadata, citations, db_versions=parsed_metadata.get('associated',{"":""}), entry_date=entry_date)
        messages.append((delete_record, delete_nonbib_record))
    # Main message:
    record, nonbib_record = forward.build_record(app, citation_change, parsed_metadata, citations, db_versions, readers=readers, entry_date=entry_date)
    messages.append((record, nonbib_record))

    for record, nonbib_record in messages:
        if not only_nonbib:
            logger.debug('Will forward bib record: %s', record)
            logger.debug("Calling 'app.forward_message' with '%s'", str(record.toJSON()))
            # Nothing is forwarded when CELERY_ALWAYS_EAGER is set
            if not app.conf['CELERY_ALWAYS_EAGER']:
                app.forward_message(record)
        else:
            logger.debug("Only asked to forward nonbib record")
        logger.debug('Will forward nonbib record: %s', nonbib_record)
        logger.debug("Calling 'app.forward_message' with '%s'", str(nonbib_record.toJSON()))
        if not app.conf['CELERY_ALWAYS_EAGER']:
            app.forward_message(nonbib_record)
1155

1156

1157
# Script entry point: start the application when this module is run directly.
if __name__ == "__main__":
    app.start()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc