OCHA-DAP / hdx-data-freshness / build 17537176964

08 Sep 2025 01:43AM UTC coverage: 90.299% (-0.08%) from 90.379%

Pull Request #33: HDXDSYS-2205 Deactivate old change detection once new resource change detection goes live
Merge e50e5671e into 041724e27 (github / web-flow)

14 of 18 new or added lines in 1 file covered. (77.78%)
2 existing lines in 1 file now uncovered.
1815 of 2010 relevant lines covered (90.3%)
0.9 hits per line

The uncovered and new-but-uncovered lines are concentrated in process_resources (the dont_hash early-continue and the new updated_by_script netloc branches).

Source File: /src/hdx/freshness/app/datafreshness.py (coverage: 93.38%)
"""Determine freshness for all datasets in HDX"""

import logging
import re
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import urlparse

from dateutil.parser import ParserError
from sqlalchemy import exists, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Session

from ..database.dbdataset import DBDataset
from ..database.dbinfodataset import DBInfoDataset
from ..database.dborganization import DBOrganization
from ..database.dbresource import DBResource
from ..database.dbrun import DBRun
from ..testdata.serialize import (
    serialize_datasets,
    serialize_hashresults,
    serialize_now,
    serialize_results,
)
from ..utils.retrieval import Retrieval
from hdx.api.configuration import Configuration
from hdx.data.dataset import Dataset
from hdx.data.hdxobject import HDXError
from hdx.data.resource import Resource
from hdx.utilities.dateparse import now_utc, parse_date
from hdx.utilities.dictandlist import (
    dict_of_lists_add,
    list_distribute_contents,
)

logger = logging.getLogger(__name__)

default_no_urls_to_check = 1000


class DataFreshness:
    """Data freshness main class

    Args:
        configuration (Configuration): HDX configuration
        session (sqlalchemy.orm.Session): Session to use for queries
        testsession (Optional[sqlalchemy.orm.Session]): Session for test data or None
        datasets (Optional[List[Dataset]]): List of datasets or read from HDX if None
        now (Optional[datetime]): Date to use or take current time if None
        do_touch (bool): Whether to touch HDX resources whose hash has changed
        dont_hash (bool): Whether to skip downloading and hashing resources
    """

    bracketed_date = re.compile(r"\((.*)\)")

    def __init__(
        self,
        configuration: Configuration,
        session: Session,
        testsession: Optional[Session] = None,
        datasets: Optional[List[Dataset]] = None,
        now: Optional[datetime] = None,
        do_touch: bool = False,
        dont_hash: bool = True,
    ) -> None:
        self.session = session
        self.urls_to_check_count = 0
        self.updated_by_script_netlocs_checked = set()
        self.never_update = 0
        self.live_update = 0
        self.asneeded_update = 0
        self.dataset_what_updated = {}
        self.resource_what_updated = {}
        self.resource_last_modified_count = 0
        self.resource_broken_count = 0
        self.do_touch = do_touch
        self.dont_hash = dont_hash

        self.url_internal = "data.humdata.org"

        self.freshness_by_frequency = {}
        for key, value in configuration["aging"].items():
            update_frequency = int(key)
            freshness_frequency = {}
            for status in value:
                nodays = value[status]
                freshness_frequency[status] = timedelta(days=nodays)
            self.freshness_by_frequency[update_frequency] = freshness_frequency
        self.freshness_statuses = {
            0: "0: Fresh",
            1: "1: Due",
            2: "2: Overdue",
            3: "3: Delinquent",
            None: "Freshness Unavailable",
        }
        self.testsession: Optional[Session] = testsession
        if datasets is None:  # pragma: no cover
            Configuration.read().set_read_only(
                True
            )  # so that we only get public datasets
            logger.info("Retrieving all datasets from HDX")
            self.datasets: List[Dataset] = Dataset.get_all_datasets()
            Configuration.read().set_read_only(False)
            if self.testsession:
                serialize_datasets(self.testsession, self.datasets)
        else:
            self.datasets: List[Dataset] = datasets
        if now is None:  # pragma: no cover
            self.now = now_utc()
            if self.testsession:
                serialize_now(self.testsession, self.now)
        else:
            self.now = now
        self.previous_run_number = self.session.scalar(
            select(DBRun.run_number)
            .distinct()
            .order_by(DBRun.run_number.desc())
            .limit(1)
        )
        if self.previous_run_number is not None:
            self.run_number = self.previous_run_number + 1
            no_resources = self.no_resources_force_hash()
            if no_resources:
                self.no_urls_to_check = no_resources
            else:
                self.no_urls_to_check = default_no_urls_to_check
        else:
            self.previous_run_number = None
            self.run_number = 0
            self.no_urls_to_check = default_no_urls_to_check

        logger.info(f"Will force hash {self.no_urls_to_check} resources")

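    # A minimal sketch of the "aging" configuration consumed by __init__ above,
    # assuming a YAML layout like the following (the day counts here are
    # illustrative, not taken from the real configuration):
    #
    #   aging:
    #     "7":              # update_frequency in days
    #       Due: 7
    #       Overdue: 14
    #       Delinquent: 21
    #
    # which the loop above turns into:
    #
    #   self.freshness_by_frequency[7] == {
    #       "Due": timedelta(days=7),
    #       "Overdue": timedelta(days=14),
    #       "Delinquent": timedelta(days=21),
    #   }
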
    def no_resources_force_hash(self) -> Optional[int]:
        """Get number of resources to force hash

        Returns:
            Optional[int]: Number of resources to force hash or None
        """

        columns = [DBResource.id, DBDataset.updated_by_script]
        filters = [
            DBResource.dataset_id == DBDataset.id,
            DBResource.run_number == self.previous_run_number,
            DBDataset.run_number == self.previous_run_number,
            DBResource.url.notlike(f"%{self.url_internal}%"),
        ]
        results = self.session.execute(select(*columns).where(*filters))
        noscriptupdate = 0
        noresources = 0
        for result in results:
            updated_by_script = result[1]
            if updated_by_script is not None:
                noscriptupdate += 1
                continue
            noresources += 1
        if noscriptupdate == 0:
            return None
        return noresources

    def spread_datasets(self) -> None:
        """Try to arrange the list of datasets so that downloads don't keep hitting
        the same server by moving apart datasets from the same organisation

        Returns:
            None
        """
        self.datasets: List[Dataset] = list_distribute_contents(
            self.datasets, lambda x: x["organization"]["name"]
        )

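    # Illustrative sketch of the spreading performed above, using hypothetical
    # organisation names: list_distribute_contents moves apart items that share
    # a key so consecutive downloads hit different servers, turning keys
    # roughly like
    #
    #   ["orga", "orga", "orga", "orgb", "orgb"]
    #
    # into an interleaving such as ["orga", "orgb", "orga", "orgb", "orga"]
    # (the exact ordering is up to the library function).
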
    def add_new_run(self) -> None:
        """Add a new run number with corresponding date

        Returns:
            None
        """
        dbrun = DBRun(run_number=self.run_number, run_date=self.now)
        self.session.add(dbrun)
        self.session.commit()

    @staticmethod
    def prefix_what_updated(dbresource: DBResource, prefix: str) -> None:
        """Prefix the what_updated field of resource

        Args:
            dbresource (DBResource): DBResource object to change
            prefix (str): Prefix to prepend

        Returns:
            None
        """
        what_updated = f"{prefix}-{dbresource.what_updated}"
        dbresource.what_updated = what_updated

    def process_resources(
        self,
        dataset_id: str,
        previous_dbdataset: DBDataset,
        resources: List[Resource],
        updated_by_script: Optional[datetime],
        hash_ids: Optional[List[str]] = None,
    ) -> Tuple[List[Tuple], Optional[str], Optional[datetime]]:
        """Process HDX dataset's resources. If a resource has not been checked for
        30 days and we are below the threshold for resource checking, then the
        resource is flagged to be hashed even if the dataset is fresh.

        Args:
            dataset_id (str): Dataset id
            previous_dbdataset (DBDataset): DBDataset object from previous run
            resources (List[Resource]): HDX resources to process
            updated_by_script (Optional[datetime]): Time script updated or None
            hash_ids (Optional[List[str]]): Resource ids to hash for testing purposes

        Returns:
            Tuple[List[Tuple], Optional[str], Optional[datetime]]:
            (resources to download, id of last resource updated, time updated)
        """
        last_resource_updated = None
        last_resource_modified = None
        dataset_resources = []
        for resource in resources:
            resource_id = resource["id"]
            dict_of_lists_add(self.resource_what_updated, "total", resource_id)
            url = resource["url"]
            name = resource["name"]
            metadata_modified = parse_date(
                resource["metadata_modified"], include_microseconds=True
            )
            last_modified = parse_date(
                resource["last_modified"], include_microseconds=True
            )
            if last_resource_modified:
                if last_modified > last_resource_modified:
                    last_resource_updated = resource_id
                    last_resource_modified = last_modified
            else:
                last_resource_updated = resource_id
                last_resource_modified = last_modified
            dbresource = DBResource(
                run_number=self.run_number,
                id=resource_id,
                name=name,
                dataset_id=dataset_id,
                url=url,
                last_modified=last_modified,
                metadata_modified=metadata_modified,
                latest_of_modifieds=last_modified,
                what_updated="firstrun",
            )
            if previous_dbdataset is not None:
                try:
                    previous_dbresource = self.session.execute(
                        select(DBResource).where(
                            DBResource.run_number == previous_dbdataset.run_number,
                            DBResource.id == resource_id,
                        )
                    ).scalar_one()
                    if last_modified > previous_dbresource.last_modified:
                        dbresource.what_updated = "filestore"
                    else:
                        dbresource.last_modified = previous_dbresource.last_modified
                        dbresource.what_updated = "nothing"
                    if last_modified <= previous_dbresource.latest_of_modifieds:
                        dbresource.latest_of_modifieds = (
                            previous_dbresource.latest_of_modifieds
                        )
                    dbresource.http_last_modified = (
                        previous_dbresource.http_last_modified
                    )
                    dbresource.md5_hash = previous_dbresource.md5_hash
                    dbresource.hash_last_modified = (
                        previous_dbresource.hash_last_modified
                    )
                    dbresource.when_checked = previous_dbresource.when_checked
                except NoResultFound:
                    pass
            self.session.add(dbresource)

            if self.dont_hash:
                dict_of_lists_add(
                    self.resource_what_updated,
                    dbresource.what_updated,
                    resource_id,
                )
                continue
            else:
                should_hash = False
                if updated_by_script:
                    netloc = urlparse(url).netloc
                    if netloc in self.updated_by_script_netlocs_checked:
                        dict_of_lists_add(
                            self.resource_what_updated,
                            dbresource.what_updated,
                            resource_id,
                        )
                        continue
                    else:
                        should_hash = True
                        self.updated_by_script_netlocs_checked.add(netloc)
                if self.url_internal in url:
                    self.prefix_what_updated(dbresource, "internal")
                    dict_of_lists_add(
                        self.resource_what_updated,
                        dbresource.what_updated,
                        resource_id,
                    )
                    continue
                if hash_ids:
                    should_hash = resource_id in hash_ids
                elif not should_hash:
                    should_hash = self.urls_to_check_count < self.no_urls_to_check and (
                        dbresource.when_checked is None
                        or self.now - dbresource.when_checked > timedelta(days=30)
                    )
            resource_format = resource["format"].lower()
            dataset_resources.append(
                (
                    url,
                    resource_id,
                    resource_format,
                    dbresource.what_updated,
                    should_hash,
                )
            )
        return dataset_resources, last_resource_updated, last_resource_modified

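    # Condensed restatement of the should_hash decision above (a summary, not
    # extra logic): a non-internal resource is flagged for download and hashing
    # when any of the following hold:
    #
    #   - its id is in hash_ids (testing),
    #   - it is the first resource seen this run for the netloc of a
    #     script-updated dataset, or
    #   - fewer than no_urls_to_check resources have been queued and the
    #     resource was never checked or was last checked over 30 days ago.
    #
    # Internal data.humdata.org URLs are never queued for hashing here.
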
    def process_datasets(
        self, hash_ids: Optional[List[str]] = None
    ) -> Tuple[Dict[str, str], List[Tuple]]:
        """Process HDX datasets. Extract necessary metadata and store it in the
        freshness database. Calculate an initial freshness based on the metadata
        (last modified - which can change due to filestore resource changes,
        review date - when someone clicks the reviewed button in the UI,
        updated by script - scripts provide the date of update in HDX metadata).
        For datasets that are not initially fresh or which have resources that have
        not been checked in the last 30 days (up to the threshold for the number of
        resources to check), the resources are flagged to be downloaded and hashed.

        Args:
            hash_ids (Optional[List[str]]): Resource ids to hash for testing purposes

        Returns:
            Tuple[Dict[str, str], List[Tuple]]: (datasets to check, resources to check)
        """
        resources_to_check = []
        datasets_to_check = {}
        logger.info("Processing datasets")
        for dataset in self.datasets:
            resources = dataset.get_resources()
            if dataset.is_requestable():  # ignore requestable
                continue
            dataset_id = dataset["id"]
            dict_of_lists_add(self.dataset_what_updated, "total", dataset_id)
            organization_id = dataset["organization"]["id"]
            organization_name = dataset["organization"]["name"]
            organization_title = dataset["organization"]["title"]
            try:
                dborganization = self.session.execute(
                    select(DBOrganization).where(DBOrganization.id == organization_id)
                ).scalar_one()
                dborganization.name = organization_name
                dborganization.title = organization_title
            except NoResultFound:
                dborganization = DBOrganization(
                    name=organization_name,
                    id=organization_id,
                    title=organization_title,
                )
                self.session.add(dborganization)
            dataset_name = dataset["name"]
            dataset_title = dataset["title"]
            dataset_private = dataset["private"]
            dataset_maintainer = dataset["maintainer"]
            dataset_location = ",".join([x["name"] for x in dataset["groups"]])
            try:
                dbinfodataset = self.session.execute(
                    select(DBInfoDataset).where(DBInfoDataset.id == dataset_id)
                ).scalar_one()
                dbinfodataset.name = dataset_name
                dbinfodataset.title = dataset_title
                dbinfodataset.private = dataset_private
                dbinfodataset.organization_id = organization_id
                dbinfodataset.maintainer = dataset_maintainer
                dbinfodataset.location = dataset_location
            except NoResultFound:
                dbinfodataset = DBInfoDataset(
                    name=dataset_name,
                    id=dataset_id,
                    title=dataset_title,
                    private=dataset_private,
                    organization_id=organization_id,
                    maintainer=dataset_maintainer,
                    location=dataset_location,
                )
                self.session.add(dbinfodataset)
            try:
                previous_dbdataset = self.session.execute(
                    select(DBDataset).where(
                        DBDataset.run_number == self.previous_run_number,
                        DBDataset.id == dataset_id,
                    )
                ).scalar_one()
            except NoResultFound:
                previous_dbdataset = None

            update_frequency = dataset.get("data_update_frequency")
            updated_by_script = None
            if update_frequency is not None:
                update_frequency = int(update_frequency)
                updated_by_script = dataset.get("updated_by_script")
                if updated_by_script:
                    if "freshness_ignore" in updated_by_script:
                        updated_by_script = None
                    else:
                        match = self.bracketed_date.search(updated_by_script)
                        if match is None:
                            updated_by_script = None
                        else:
                            try:
                                updated_by_script = parse_date(
                                    match.group(1), include_microseconds=True
                                )
                            except ParserError:
                                updated_by_script = None
            (
                dataset_resources,
                last_resource_updated,
                last_resource_modified,
            ) = self.process_resources(
                dataset_id,
                previous_dbdataset,
                resources,
                updated_by_script,
                hash_ids=hash_ids,
            )
            time_period = dataset.get("dataset_date")
            metadata_modified = parse_date(
                dataset["metadata_modified"], include_microseconds=True
            )
            if "last_modified" in dataset:
                last_modified = parse_date(
                    dataset["last_modified"], include_microseconds=True
                )
            else:
                last_modified = datetime(1970, 1, 1, 0, 0, tzinfo=timezone.utc)
            if len(resources) == 0 and last_resource_updated is None:
                last_resource_updated = "NO RESOURCES"
                last_resource_modified = datetime(1970, 1, 1, 0, 0, tzinfo=timezone.utc)
                error = True
                what_updated = "no resources"
            else:
                error = False
                what_updated = "firstrun"
            review_date = dataset.get("review_date")
            if review_date is None:
                latest_of_modifieds = last_modified
            else:
                review_date = parse_date(review_date, include_microseconds=True)
                if review_date > last_modified:
                    latest_of_modifieds = review_date
                else:
                    latest_of_modifieds = last_modified
            if updated_by_script and updated_by_script > latest_of_modifieds:
                latest_of_modifieds = updated_by_script

            fresh = None
            if update_frequency is not None and not error:
                if update_frequency == 0:
                    fresh = 0
                    self.live_update += 1
                elif update_frequency == -1:
                    fresh = 0
                    self.never_update += 1
                elif update_frequency == -2:
                    fresh = 0
                    self.asneeded_update += 1
                else:
                    fresh = self.calculate_freshness(
                        latest_of_modifieds, update_frequency
                    )

            dbdataset = DBDataset(
                run_number=self.run_number,
                id=dataset_id,
                dataset_date=time_period,
                update_frequency=update_frequency,
                review_date=review_date,
                last_modified=last_modified,
                metadata_modified=metadata_modified,
                updated_by_script=updated_by_script,
                latest_of_modifieds=latest_of_modifieds,
                what_updated=what_updated,
                last_resource_updated=last_resource_updated,
                last_resource_modified=last_resource_modified,
                fresh=fresh,
                error=error,
            )
            if previous_dbdataset is not None and not error:
                dbdataset.what_updated = self.add_what_updated(
                    dbdataset.what_updated, "nothing"
                )
                if (
                    last_modified > previous_dbdataset.last_modified
                ):  # filestore update would cause this
                    dbdataset.what_updated = self.add_what_updated(
                        dbdataset.what_updated, "filestore"
                    )
                else:
                    dbdataset.last_modified = previous_dbdataset.last_modified
                if previous_dbdataset.review_date is None:
                    if review_date is not None:
                        dbdataset.what_updated = self.add_what_updated(
                            dbdataset.what_updated, "review date"
                        )
                else:
                    if (
                        review_date is not None
                        and review_date > previous_dbdataset.review_date
                    ):  # someone clicked the review button
                        dbdataset.what_updated = self.add_what_updated(
                            dbdataset.what_updated, "review date"
                        )
                    else:
                        dbdataset.review_date = previous_dbdataset.review_date
                if updated_by_script and (
                    previous_dbdataset.updated_by_script is None
                    or updated_by_script > previous_dbdataset.updated_by_script
                ):  # new script update of datasets
                    dbdataset.what_updated = self.add_what_updated(
                        dbdataset.what_updated, "script update"
                    )
                else:
                    dbdataset.updated_by_script = previous_dbdataset.updated_by_script
                if last_resource_modified <= previous_dbdataset.last_resource_modified:
                    # we keep this so that although we don't normally use it,
                    # we retain the ability to run without touching CKAN
                    dbdataset.last_resource_updated = (
                        previous_dbdataset.last_resource_updated
                    )
                    dbdataset.last_resource_modified = (
                        previous_dbdataset.last_resource_modified
                    )
                if latest_of_modifieds < previous_dbdataset.latest_of_modifieds:
                    dbdataset.latest_of_modifieds = (
                        previous_dbdataset.latest_of_modifieds
                    )
                    if update_frequency is not None and update_frequency > 0:
                        fresh = self.calculate_freshness(
                            previous_dbdataset.latest_of_modifieds,
                            update_frequency,
                        )
                        dbdataset.fresh = fresh
            self.session.add(dbdataset)

            update_string = (
                f"{self.freshness_statuses[fresh]}, Updated {dbdataset.what_updated}"
            )
            anyresourcestohash = False
            for (
                url,
                resource_id,
                resource_format,
                what_updated,
                should_hash,
            ) in dataset_resources:
                if not should_hash:
                    if (
                        fresh == 0 and update_frequency != 1
                    ) or update_frequency is None:
                        dict_of_lists_add(
                            self.resource_what_updated,
                            what_updated,
                            resource_id,
                        )
                        continue
                resources_to_check.append(
                    (url, resource_id, resource_format, what_updated)
                )
                self.urls_to_check_count += 1
                anyresourcestohash = True
            if anyresourcestohash:
                datasets_to_check[dataset_id] = update_string
            else:
                dict_of_lists_add(self.dataset_what_updated, update_string, dataset_id)
        self.session.commit()
        return datasets_to_check, resources_to_check

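    # Worked example of the updated_by_script parsing in process_datasets, using
    # a made-up script name: given
    #
    #   updated_by_script = "HDX Scraper: Example (2025-09-01T00:00:00)"
    #
    # bracketed_date.search() captures "2025-09-01T00:00:00" and parse_date
    # converts it to a datetime. A value containing "freshness_ignore", a
    # missing bracket or an unparseable bracketed date yields None instead.
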
    def check_urls(
        self,
        resources_to_check: List[Tuple],
        user_agent: str,
        results: Optional[Dict] = None,
        hash_results: Optional[Dict] = None,
    ) -> Tuple[Dict[str, Tuple], Dict[str, Tuple]]:
        """Download resources and hash them. If the hash has changed compared to the
        previous run, download and hash again. Return two dictionaries, the first
        with the hashes from the first downloads and the second with the hashes from
        the second downloads.

        Args:
            resources_to_check (List[Tuple]): List of resources to be checked
            user_agent (str): User agent string to use when downloading
            results (Optional[Dict]): Test results to use in place of first downloads
            hash_results (Optional[Dict]): Test results replacing second downloads

        Returns:
            Tuple[Dict[str, Tuple], Dict[str, Tuple]]:
            (results of first download, results of second download)
        """

        def get_netloc(x):
            return urlparse(x[0]).netloc

        retrieval = Retrieval(user_agent, self.url_internal)
        if results is None:  # pragma: no cover
            resources_to_check = list_distribute_contents(
                resources_to_check, get_netloc
            )
            results = retrieval.retrieve(resources_to_check)
            if self.testsession:
                serialize_results(self.testsession, results)

        hash_check = []
        for resource_id in results:
            (
                url,
                resource_format,
                err,
                http_last_modified,
                hash,
                xlsx_hash,
            ) = results[resource_id]
            if hash:
                dbresource = self.session.execute(
                    select(DBResource).where(
                        DBResource.run_number == self.run_number,
                        DBResource.id == resource_id,
                    )
                ).scalar_one()
                if dbresource.md5_hash == hash:  # File unchanged
                    continue
                if xlsx_hash and dbresource.md5_hash == xlsx_hash:  # File unchanged
                    continue
                hash_check.append((url, resource_id, resource_format))

        if hash_results is None:  # pragma: no cover
            hash_check = list_distribute_contents(hash_check, get_netloc)
            hash_results = retrieval.retrieve(hash_check)
            if self.testsession:
                serialize_hashresults(self.testsession, hash_results)

        return results, hash_results

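    # Shape of the results and hash_results dictionaries returned above, keyed
    # by resource id (field order taken from the tuple unpacking in this file):
    #
    #   results[resource_id] == (
    #       url, resource_format, err, http_last_modified, hash, xlsx_hash
    #   )
    #
    # where xlsx_hash is a secondary hash used for xlsx files (see the
    # comparison logic in process_results below).
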
    def process_results(
        self,
        results: Dict[str, Tuple],
        hash_results: Dict[str, Tuple],
        resourcecls: Union[Resource, Any] = Resource,
    ) -> Dict[str, Dict[str, Tuple]]:
        """Process the downloaded and hashed resources. If the two hashes are the
        same but different to the previous run's, the file has been changed. If the
        two hashes are different, it is an API (eg. editable Google sheet) where the
        hash constantly changes. If the file is determined to have been changed, then
        the resource on HDX is touched to update its last_modified field. Return a
        dictionary of dictionaries from dataset id to resource ids to update
        information about resources including their latest_of_modifieds.

        Args:
            results (Dict[str, Tuple]): Results of first downloads
            hash_results (Dict[str, Tuple]): Results of second downloads
            resourcecls (Union[Resource, Any]): Class to use. Defaults to Resource.

        Returns:
            Dict[str, Dict[str, Tuple]]: Dataset id to resource id to resource info
        """

        def check_broken(error):
            if error == Retrieval.toolargeerror:
                return False
            if Retrieval.notmatcherror in error:
                return True
            match_error = re.search(Retrieval.clienterror_regex, error)
            if match_error:
                return True
            return False

        datasets_resourcesinfo = {}
        for resource_id in sorted(results):
            url, _, err, http_last_modified, hash, xlsx_hash = results[resource_id]
            dbresource = self.session.execute(
                select(DBResource).where(
                    DBResource.run_number == self.run_number,
                    DBResource.id == resource_id,
                )
            ).scalar_one()
            dataset_id = dbresource.dataset_id
            resourcesinfo = datasets_resourcesinfo.get(dataset_id, dict())
            what_updated = dbresource.what_updated
            update_last_modified = False
            is_broken = False
            if http_last_modified:
                if (
                    dbresource.http_last_modified is None
                    or http_last_modified > dbresource.http_last_modified
                ):
                    dbresource.http_last_modified = http_last_modified
            if hash:
                dbresource.when_checked = self.now
                if dbresource.md5_hash == hash:  # File unchanged
                    what_updated = self.add_what_updated(what_updated, "same hash")
                elif xlsx_hash and dbresource.md5_hash == xlsx_hash:  # File unchanged
                    what_updated = self.add_what_updated(what_updated, "same hash")
                else:  # File updated
                    hash_to_set = hash
                    (
                        hash_url,
                        _,
                        hash_err,
                        hash_http_last_modified,
                        hash_hash,
                        hash_xlsx_hash,
                    ) = hash_results[resource_id]
                    if hash_http_last_modified:
                        if (
                            dbresource.http_last_modified is None
                            or hash_http_last_modified > dbresource.http_last_modified
                        ):
                            dbresource.http_last_modified = hash_http_last_modified
                    if hash_hash:
                        if hash_hash != hash:
                            if (  # Check if this is an xlsx file that has been hashed
                                hash_xlsx_hash and hash_xlsx_hash == xlsx_hash
                            ):
                                hash = xlsx_hash
                                hash_hash = hash_xlsx_hash
                                hash_to_set = hash
                        if hash_hash == hash:
                            if (
                                dbresource.md5_hash is None
                            ):  # First occurrence of resource eg. first run - don't
                                # use hash for last modified field (and hence
                                # freshness calculation)
                                dbresource.what_updated = self.add_what_updated(
                                    what_updated, "first hash"
                                )
                                what_updated = dbresource.what_updated
                            else:
                                # Check if hash has occurred before
                                # select distinct md5_hash from dbresources where id = '714ef7b5-a303-4e4f-be2f-03b2ce2933c7' and md5_hash='2f3cd6a6fce5ad4d7001780846ad87a7';
                                if self.session.scalar(
                                    select(
                                        exists().where(
                                            DBResource.id == resource_id,
                                            DBResource.md5_hash == hash,
                                        )
                                    )
                                ):
                                    dbresource.what_updated = self.add_what_updated(
                                        what_updated, "repeat hash"
                                    )
                                    what_updated = dbresource.what_updated
                                else:
                                    (
                                        what_updated,
                                        _,
                                    ) = self.set_latest_of_modifieds(
                                        dbresource, self.now, "hash"
                                    )
                                    dbresource.hash_last_modified = self.now
                                    update_last_modified = True
                            dbresource.api = False
                        else:
                            hash_to_set = hash_hash
                            what_updated = self.add_what_updated(what_updated, "api")
                            dbresource.api = True
                    if hash_err:
                        what_updated = self.add_what_updated(what_updated, "error")
                        dbresource.error = hash_err
                        if check_broken(hash_err):
                            is_broken = True
                    dbresource.md5_hash = hash_to_set
            if err:
                dbresource.when_checked = self.now
                what_updated = self.add_what_updated(what_updated, "error")
                dbresource.error = err
                if check_broken(err):
                    is_broken = True
            resourcesinfo[resource_id] = (
                dbresource.error,
                dbresource.latest_of_modifieds,
                dbresource.what_updated,
            )
            datasets_resourcesinfo[dataset_id] = resourcesinfo
            dict_of_lists_add(self.resource_what_updated, what_updated, resource_id)
            if update_last_modified and self.do_touch:  # Touch resource if needed
                try:
                    logger.info(f"Updating last modified for resource {resource_id}")
                    resource = resourcecls.read_from_hdx(resource_id)
                    if resource:
                        last_modified = parse_date(
                            resource["last_modified"],
                            include_microseconds=True,
                        )
                        dbdataset = self.session.execute(
                            select(DBDataset).where(
                                DBDataset.run_number == self.run_number,
                                DBDataset.id == dataset_id,
                            )
                        ).scalar_one()
                        update_frequency = dbdataset.update_frequency
                        if update_frequency > 0:
                            if (
                                self.calculate_freshness(
                                    last_modified, update_frequency
                                )
                                == 0
                            ):
                                dotouch = False
                            else:
                                dotouch = True
                        else:
                            dotouch = True
                        if dotouch:
                            dt_notz = dbresource.latest_of_modifieds.replace(
                                tzinfo=None
                            )
                            resource["last_modified"] = dt_notz.isoformat()
                            resource.update_in_hdx(
                                operation="patch",
                                batch_mode="KEEP_OLD",
                                skip_validation=True,
                                ignore_check=True,
                            )
                            self.resource_last_modified_count += 1
                            logger.info(
                                f"Resource last modified count: {self.resource_last_modified_count}"
                            )
                        else:
                            logger.info(
                                f"Didn't update last modified for resource {resource_id} as it is fresh!"
                            )
                    else:
                        logger.error(
                            f"Last modified update failed for id {resource_id}! Resource does not exist."
                        )
                except HDXError:
                    logger.exception(
                        f"Last modified update failed for id {resource_id}!"
                    )
            if is_broken and self.do_touch:
                try:
                    logger.info(f"Marking resource {resource_id} as broken")
                    resource = resourcecls.read_from_hdx(resource_id)
                    if resource:
                        resource.mark_broken()
                        self.resource_broken_count += 1
                        logger.info(
                            f"Resource broken count: {self.resource_broken_count}"
                        )
                    else:
                        logger.error(
                            f"Mark broken failed for id {resource_id}! Resource does not exist."
                        )
                except HDXError:
                    logger.exception(f"Mark broken failed for id {resource_id}!")
        self.session.commit()
        return datasets_resourcesinfo

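    # Summary of the hash comparison implemented above:
    #
    #   first hash == stored hash               -> "same hash": file unchanged
    #   first == second hash, no stored hash    -> "first hash": recorded but
    #                                              not used for freshness
    #   first == second hash, hash seen before  -> "repeat hash": not an update
    #   first == second hash, genuinely new     -> "hash": latest_of_modifieds
    #                                              set to now; if do_touch, the
    #                                              HDX resource is touched
    #   first hash != second hash               -> "api": content changes on
    #                                              every download (eg. editable
    #                                              Google sheet), not an update
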
    def update_dataset_latest_of_modifieds(
        self,
        datasets_to_check: Dict[str, str],
        datasets_resourcesinfo: Dict[str, Dict[str, Tuple]],
    ) -> None:
        """Given the dictionary of dictionaries from dataset id to resource ids to
        update information about resources including their latest_of_modifieds, work
        out latest_of_modifieds for datasets and calculate freshness.

        Args:
            datasets_to_check (Dict[str, str]): Datasets with resources that were hashed
            datasets_resourcesinfo (Dict[str, Dict[str, Tuple]]): Dataset id to
            resource id to resource info

        Returns:
            None
        """

        for dataset_id in datasets_resourcesinfo:
            dbdataset = self.session.execute(
                select(DBDataset).where(
                    DBDataset.run_number == self.run_number,
                    DBDataset.id == dataset_id,
                )
            ).scalar_one()
            dataset = datasets_resourcesinfo[dataset_id]
            dataset_latest_of_modifieds = dbdataset.latest_of_modifieds
            dataset_what_updated = dbdataset.what_updated
            last_resource_modified = dbdataset.last_resource_modified
            last_resource_updated = dbdataset.last_resource_updated
            all_errors = True
            for resource_id in sorted(dataset):
                (
                    err,
                    new_last_resource_modified,
                    new_last_resource_what_updated,
                ) = dataset[resource_id]
                if not err:
                    all_errors = False
                if new_last_resource_modified:
                    if new_last_resource_modified > last_resource_modified:
                        last_resource_updated = resource_id
                        last_resource_modified = new_last_resource_modified
                    if new_last_resource_modified > dataset_latest_of_modifieds:
                        dataset_latest_of_modifieds = new_last_resource_modified
                        dataset_what_updated = new_last_resource_what_updated
            dbdataset.last_resource_updated = last_resource_updated
            dbdataset.last_resource_modified = last_resource_modified
            self.set_latest_of_modifieds(
                dbdataset, dataset_latest_of_modifieds, dataset_what_updated
            )
            update_frequency = dbdataset.update_frequency
            if update_frequency is not None and update_frequency > 0:
                dbdataset.fresh = self.calculate_freshness(
                    dbdataset.latest_of_modifieds, update_frequency
                )
            dbdataset.error = all_errors
            status = f"{self.freshness_statuses[dbdataset.fresh]}, Updated {dbdataset.what_updated}"
            if all_errors:
                status = f"{status},error"
            dict_of_lists_add(self.dataset_what_updated, status, dataset_id)
        self.session.commit()
        for dataset_id in datasets_to_check:
            if dataset_id in datasets_resourcesinfo:
                continue
            dict_of_lists_add(
                self.dataset_what_updated,
                datasets_to_check[dataset_id],
                dataset_id,
            )

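    # In effect, for each dataset the method above computes (a sketch, not
    # extra logic):
    #
    #   dbdataset.latest_of_modifieds = max(
    #       dbdataset.latest_of_modifieds,
    #       *(latest_of_modifieds of its checked resources),
    #   )
    #
    # then recalculates freshness from that date and marks the dataset as in
    # error only when every checked resource returned an error.
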
    def output_counts(self) -> str:
        """Create and display output string

        Returns:
            str: Output string
        """

        def add_what_updated_str(hdxobject_what_updated):
            nonlocal output_str
            output_str += f"\n* total: {len(hdxobject_what_updated['total'])} *"
            for countstr in sorted(hdxobject_what_updated):
                if countstr != "total":
                    output_str += (
                        f",\n{countstr}: {len(hdxobject_what_updated[countstr])}"
                    )

        output_str = "\n*** Resources ***"
        add_what_updated_str(self.resource_what_updated)
        output_str += "\n\n*** Datasets ***"
        add_what_updated_str(self.dataset_what_updated)
        output_str += f"\n\n{self.live_update} datasets have update frequency of Live"
        output_str += f"\n{self.never_update} datasets have update frequency of Never"
        output_str += (
            f"\n{self.asneeded_update} datasets have update frequency of As Needed"
        )

        logger.info(output_str)
        return output_str

    @staticmethod
    def set_latest_of_modifieds(
        dbobject: Union[DBDataset, DBResource],
        modified_date: datetime,
        what_updated: str,
    ) -> Tuple[str, bool]:
        """Set latest of modifieds if the provided date is greater than the current
        one and add to the database object's what_updated field.

        Args:
            dbobject (Union[DBDataset, DBResource]): Database object to update
            modified_date (datetime): New modified date
            what_updated (str): What updated eg. hash

        Returns:
            Tuple[str, bool]: (DB object's what_updated, whether new date > current)
        """
        if modified_date > dbobject.latest_of_modifieds:
            dbobject.latest_of_modifieds = modified_date
            dbobject.what_updated = DataFreshness.add_what_updated(
                dbobject.what_updated, what_updated
            )
            update = True
        else:
            update = False
        return dbobject.what_updated, update

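    # Behaviour sketch for set_latest_of_modifieds, assuming a dbobject with
    # latest_of_modifieds of 1 Jan 2025 and what_updated of "nothing":
    #
    #   set_latest_of_modifieds(
    #       dbobject, datetime(2025, 2, 1, tzinfo=timezone.utc), "hash"
    #   )
    #   -> ("hash", True)    # date advanced, "nothing" replaced by "hash"
    #
    # An older or equal date leaves the object untouched and returns
    # (dbobject.what_updated, False).
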
    @staticmethod
    def add_what_updated(prev_what_updated: str, what_updated: str) -> str:
        """Add to the what_updated string any new cause of update (such as hash).
        "nothing" is removed if anything else is added.

        Args:
            prev_what_updated (str): Previous what_updated string
            what_updated (str): Additional what_updated string

        Returns:
            str: New what_updated string
        """
        if what_updated in prev_what_updated:
            return prev_what_updated
        if prev_what_updated != "nothing" and prev_what_updated != "firstrun":
            if what_updated != "nothing":
                return f"{prev_what_updated},{what_updated}"
            return prev_what_updated
        else:
            return what_updated

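    # Examples of add_what_updated, derived from the logic above:
    #
    #   add_what_updated("firstrun", "hash")        -> "hash"
    #   add_what_updated("nothing", "filestore")    -> "filestore"
    #   add_what_updated("filestore", "hash")       -> "filestore,hash"
    #   add_what_updated("filestore,hash", "hash")  -> "filestore,hash"
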
    def calculate_freshness(
        self, last_modified: datetime, update_frequency: int
    ) -> int:
        """Calculate freshness based on a last modified date and the expected update
        frequency. Returns 0 for fresh, 1 for due, 2 for overdue and 3 for delinquent.

        Args:
            last_modified (datetime): Last modified date
            update_frequency (int): Expected update frequency

        Returns:
            int: 0 for fresh, 1 for due, 2 for overdue and 3 for delinquent
        """
        delta = self.now - last_modified
        if delta >= self.freshness_by_frequency[update_frequency]["Delinquent"]:
            return 3
        elif delta >= self.freshness_by_frequency[update_frequency]["Overdue"]:
            return 2
        elif delta >= self.freshness_by_frequency[update_frequency]["Due"]:
            return 1
        return 0
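
# Worked example of calculate_freshness, assuming now is 8 Sep 2025 and a
# hypothetical aging entry for update_frequency 7 of Due: 7, Overdue: 14,
# Delinquent: 21 days:
#
#   last_modified 5 Sep 2025  -> delta  3 days -> 0 (Fresh)
#   last_modified 30 Aug 2025 -> delta  9 days -> 1 (Due)
#   last_modified 22 Aug 2025 -> delta 17 days -> 2 (Overdue)
#   last_modified 10 Aug 2025 -> delta 29 days -> 3 (Delinquent)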