
NaturalHistoryMuseum / splitgill / #111

02 Apr 2025 10:05PM UTC · build #111 · push · coveralls-python · jrdh
fix: accommodate None values in lists during data rebuild

coverage: 94.977% (+0.03%) from 94.946%

2 of 2 new or added lines in 1 file covered. (100.0%)
15 existing lines in 3 files now uncovered.
1229 of 1294 relevant lines covered (94.98%)
0.95 hits per line

Source File: /splitgill/manager.py (96.17% covered)
from bisect import bisect_right
from enum import Enum
from typing import Optional, Iterable, List, Dict, Union

from cytoolz.dicttoolz import get_in
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from elasticsearch_dsl.query import Query
from pymongo import MongoClient, IndexModel, ASCENDING, DESCENDING
from pymongo.collection import Collection
from pymongo.database import Database

from splitgill.indexing.fields import (
    DocumentField,
    DataField,
    ParsedField,
)
from splitgill.indexing.index import IndexNames, generate_index_ops, ArcStatus
from splitgill.indexing.options import ParsingOptionsBuilder
from splitgill.indexing.syncing import write_ops, WriteResult, BulkOptions
from splitgill.indexing.templates import create_templates
from splitgill.ingest import generate_ops, generate_rollback_ops
from splitgill.locking import LockManager
from splitgill.model import Record, MongoRecord, ParsingOptions, IngestResult
from splitgill.search import version_query
from splitgill.utils import partition, now, iter_terms

OPTIONS_COLLECTION_NAME = "options"
LOCKS_COLLECTION_NAME = "locks"


class SplitgillClient:
    """
    Splitgill client class which holds a mongo connection, an elasticsearch connection
    and any other general information Splitgill needs to manage the databases.
    """

    def __init__(
        self,
        mongo: MongoClient,
        elasticsearch: Elasticsearch,
        mongo_database_name: str = "sg",
    ):
        self.mongo = mongo
        self.elasticsearch = elasticsearch
        self.mongo_database_name = mongo_database_name
        self.lock_manager = LockManager(self.get_lock_collection())

    def get_database(self, name: str) -> "SplitgillDatabase":
        """
        Returns a SplitgillDatabase object.

        :param name: the name of the database
        :return: a new SplitgillDatabase object
        """
        return SplitgillDatabase(name, self)

    def get_mongo_database(self) -> Database:
        """
        Returns the MongoDB database in use.

        :return: a pymongo Database object
        """
        return self.mongo.get_database(self.mongo_database_name)

    def get_options_collection(self) -> Collection:
        """
        Returns the options collection.

        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(OPTIONS_COLLECTION_NAME)

    def get_data_collection(self, name: str) -> Collection:
        """
        Returns the data collection for the given Splitgill database.

        :param name: the name of the Splitgill database
        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(f"data-{name}")

    def get_lock_collection(self) -> Collection:
        """
        Returns the locks collection.

        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(LOCKS_COLLECTION_NAME)
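
A minimal construction sketch (reusing this module's imports; the connection URLs are placeholders, not part of this module):

client = SplitgillClient(
    mongo=MongoClient("mongodb://localhost:27017"),
    elasticsearch=Elasticsearch("http://localhost:9200"),
)
database = client.get_database("specimens")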


class SearchVersion(Enum):
    """
    Indicator for the SplitgillDatabase.search method as to which version of the data
    you would like to search.
    """

    # searches the latest data
    latest = "latest"
    # searches all data
    all = "all"


class SplitgillDatabase:
    """
    Represents a single set of data to be managed by Splitgill.

    Under the hood, this data will exist in several MongoDB collections and
    Elasticsearch indices, but this object provides an abstraction layer to access all
    of that from one object.
    """

    def __init__(self, name: str, client: SplitgillClient):
        """
        :param name: the name of the database, needs to be a valid MongoDB collection
                     name and a valid Elasticsearch index name
        :param client: a SplitgillClient object
        """
        self.name = name
        self._client = client
        self.data_collection = self._client.get_data_collection(self.name)
        self.options_collection = self._client.get_options_collection()
        self.indices = IndexNames(self.name)
        self.locker = self._client.lock_manager

    def get_current_indices(self) -> List[str]:
        """
        Returns a list of the indices currently in use by this database in
        Elasticsearch, sorted in ascending alphabetical order.

        :return: a list of index names
        """
        res = self._client.elasticsearch.indices.get_alias(index=self.indices.wildcard)
        return sorted(res.keys())

    def get_arc_status(self) -> ArcStatus:
        """
        Get the status of the latest archive index in use by this database in
        Elasticsearch. Old versions of records are stored in the archive (arc) indices
        using an append strategy. This starts with arc-0 and, once that arc is filled,
        moves on to arc-1 and so on. This method returns the index of the most
        recently used arc (i.e. the arc with the highest index number) and the number
        of documents in it.

        :return: an ArcStatus object
        """
        # get all arc index names for this database
        result = self._client.elasticsearch.indices.get_alias(
            index=self.indices.arc_wildcard
        )
        if result:
            latest_arc_index = max(
                int(arc_index_name.split("-")[-1]) for arc_index_name in result.keys()
            )
            count_result = self._client.elasticsearch.count(
                index=self.indices.get_arc(latest_arc_index)
            )
            return ArcStatus(latest_arc_index, count_result["count"])
        else:
            return ArcStatus(0, 0)

    def get_committed_version(self) -> Optional[int]:
        """
        Returns the latest committed version of the data or config (whichever is
        higher). If no records or options exist, or if neither has any committed
        values, None is returned.

        :return: the max version or None
        """
        sort = [("version", DESCENDING)]
        last_data = self.data_collection.find_one(sort=sort)
        last_options = self.options_collection.find_one({"name": self.name}, sort=sort)

        last_data_version = last_data.get("version") if last_data is not None else None
        last_options_version = (
            last_options.get("version") if last_options is not None else None
        )
        # there's no committed data or options
        if last_data_version is None and last_options_version is None:
            return None

        return max(
            last
            for last in (last_data_version, last_options_version)
            if last is not None
        )

    def get_elasticsearch_version(self) -> Optional[int]:
        """
        Returns the latest version found in the Elasticsearch indices for this
        database. If no records exist in any index, None is returned. This method
        checks both the maximum value in the version field and the next field.
        Checking the next field accounts for updates that only include deletions.

        :return: the max version or None
        """
        version = None
        for field in (DocumentField.VERSION, DocumentField.NEXT):
            result = self._client.elasticsearch.search(
                aggs={"max_version": {"max": {"field": field}}},
                size=0,
                # search all indices so that we catch deletes which won't have a
                # document in latest
                index=self.indices.wildcard,
            )
            value = get_in(("aggregations", "max_version", "value"), result, None)
            if value is not None and (version is None or value > version):
                version = value

        # elasticsearch does max aggs using the double type apparently, so we need to
        # convert it back to an int to avoid returning a float and causing confusion
        return int(version) if version is not None else None
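
For reference, get_in above walks Elasticsearch's standard max-aggregation response shape; a standalone sketch with a made-up result:

result = {"aggregations": {"max_version": {"value": 1712095500000.0}}}
value = get_in(("aggregations", "max_version", "value"), result, None)
assert value == 1712095500000.0  # a float, hence the int() conversion above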

    def get_rounded_version(self, version: int) -> Optional[int]:
        """
        Given a target version, rounds the version down to the nearest available
        version. This in effect returns the version of the data that is applicable to
        the given target version.

        If the target version is below the earliest version of this database's
        indexed data, or no indexed versions are available, None is returned.

        :param version: the target version
        :return: a version or None
        """
        versions = self.get_versions()
        if not versions or version < versions[0]:
            return None

        return versions[bisect_right(versions, version) - 1]
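
The rounding is just bisect_right over the sorted version list; the same logic standalone, with made-up versions:

versions = [1000, 2000, 3000]  # hypothetical output of get_versions()
assert versions[bisect_right(versions, 2500) - 1] == 2000  # rounds down
assert versions[bisect_right(versions, 2000) - 1] == 2000  # exact match
# targets below versions[0] are caught by the None guard in the method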

    def has_data(self) -> bool:
        """
        Returns True if there is at least one committed record in this database,
        otherwise returns False. Note that this ignores options.

        :return: True if there is data, False if not
        """
        return self.data_collection.find_one({"version": {"$ne": None}}) is not None

    def has_options(self) -> bool:
        """
        Returns True if there is at least one set of committed options in this
        database, otherwise returns False. Note that this ignores data.

        :return: True if there are options, False if not
        """
        return self.options_collection.find_one({"version": {"$ne": None}}) is not None

    def commit(self) -> Optional[int]:
        """
        Commits the currently uncommitted data and options changes for this database.
        All new data/options will be given the same version, which is the current
        time. If no changes were made, None is returned, otherwise the new version is
        returned.

        If a commit is already ongoing this will raise an AlreadyLocked exception.

        :return: the new version or None if no uncommitted changes were found
        """
        # todo: global now?
        # todo: transaction/rollback? Can't do this without replicasets so who knows?
        with self.locker.lock(self.name, stage="commit"):
            if not self.has_uncommitted_data() and not self.has_uncommitted_options():
                # nothing to commit, so nothing to do
                return None

            if not self.has_options() and not self.has_uncommitted_options():
                # no existing options and no options to be committed, so create some
                # basic parsing options to use
                options = ParsingOptionsBuilder().build()
                self.update_options(options, commit=False)

            version = now()

            # assign the new version to all uncommitted data and options
            for collection in [self.data_collection, self.options_collection]:
                collection.update_many(
                    filter={"version": None}, update={"$set": {"version": version}}
                )
            return version

    def ingest(
        self,
        records: Iterable[Record],
        commit=True,
        modified_field: Optional[str] = None,
    ) -> IngestResult:
        """
        Ingests the given records to the database. This only adds the records to the
        MongoDB data collection, it doesn't trigger the indexing of this new data into
        the Elasticsearch cluster. All data will be added with a None version unless
        the commit parameter is True, in which case a version will be assigned.

        Use the commit keyword argument to either close the "transaction" after
        writing these records or leave it open. By default, the "transaction" is
        committed before the method returns, and the version is set then.

        If an error occurs, the "transaction" will not be committed, but the changes
        will not be rolled back.

        :param records: the records to add. These will be added in batches, so it is
                        safe to pass a very large stream of records
        :param commit: whether to commit the data added with a new version after
                       writing the records. Default: True.
        :param modified_field: a field name; if the only changes in a record's data
                               are in this field, the changes will be ignored. As you
                               can probably guess from the name, the root reason for
                               this parameter existing is to avoid committing a new
                               version of a record when all that has happened is the
                               record has been touched and the modified field's date
                               value updated even though the rest of the record
                               remains the same. Default: None, meaning all fields
                               are checked for changes.
        :return: an IngestResult object
        """
        # this does nothing if the indexes already exist
        self.data_collection.create_indexes(
            [IndexModel([("id", ASCENDING)]), IndexModel([("version", DESCENDING)])]
        )

        result = IngestResult()
        # this is used for the find size and the bulk ops partition size which both
        # need to be the same to ensure we can handle duplicates in the record stream
        size = 200

        for ops in partition(
            generate_ops(self.data_collection, records, modified_field, size), size
        ):
            bulk_result = self.data_collection.bulk_write(ops)
            result.update(bulk_result)

        if commit:
            result.version = self.commit()

        return result
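
A usage sketch for the staged-commit flow; the Record constructor arguments are assumed here, check splitgill.model for the real signature:

records = [
    Record("record-1", {"genus": "Vulpes", "species": "vulpes"}),  # assumed args
    Record("record-2", {"genus": "Lynx", "species": "lynx"}),
]
# stage both batches unversioned, then give everything one version at once
database.ingest(records[:1], commit=False)
database.ingest(records[1:], commit=False)
version = database.commit()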

    def update_options(self, options: ParsingOptions, commit=True) -> Optional[int]:
        """
        Update the parsing options for this database.

        :param options: the new parsing options
        :param commit: whether to commit the new config added with a new version after
                       writing the config. Default: True.
        :return: returns the new version if a commit happened, otherwise None. If a
                 commit was requested but nothing was changed, None is returned.
        """
        # get the latest options that have been committed (get_options ignores
        # uncommitted options)
        all_options = self.get_options()
        if all_options:
            latest_options = all_options[max(all_options)]
        else:
            latest_options = None

        if self.has_uncommitted_options():
            self.rollback_options()

        # if the options are the same as the latest existing ones, don't update
        if latest_options == options:
            return None

        # either the options are completely new or they differ from the existing
        # options, write a fresh entry
        new_doc = {
            "name": self.name,
            # version = None to indicate this is an uncommitted change
            "version": None,
            "options": options.to_doc(),
        }
        self.options_collection.insert_one(new_doc)

        if commit:
            return self.commit()
        return None
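
A usage sketch; only the bare builder is shown in this file, so any further builder configuration is out of scope here:

options = ParsingOptionsBuilder().build()
new_version = database.update_options(options)  # None if nothing changed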

    def rollback_options(self) -> int:
        """
        Remove any uncommitted option changes.

        There should only ever be one, but this deletes them all, ensuring everything
        is clean and tidy.

        :return: the number of documents deleted
        """
        return self.options_collection.delete_many({"version": None}).deleted_count

    def rollback_records(self):
        """
        Remove any uncommitted data changes.

        This method has to interrogate every uncommitted record in the data collection
        to perform the rollback and therefore, depending on how much uncommitted data
        there is, may take a bit of time to run.
        """
        if self.has_uncommitted_data():
            for ops in partition(generate_rollback_ops(self.data_collection), 200):
                self.data_collection.bulk_write(ops)

    def has_uncommitted_data(self) -> bool:
        """
        Check if there are any uncommitted records stored against this database.

        :return: returns True if there are any uncommitted records, False if not
        """
        return self.data_collection.find_one({"version": None}) is not None

    def has_uncommitted_options(self) -> bool:
        """
        Check if there are any uncommitted options stored against this database.

        :return: returns True if there are any uncommitted options, False if not
        """
        return self.options_collection.find_one({"version": None}) is not None

    def get_options(self, include_uncommitted=False) -> Dict[int, ParsingOptions]:
        """
        Retrieve all the parsing options configured for this database in a dict
        mapping int versions to ParsingOptions objects. Use the include_uncommitted
        parameter to indicate whether to include the uncommitted options or not.

        :param include_uncommitted: whether to include uncommitted options.
                                    Default: False.
        :return: a dict of versions and options
        """
        return {
            doc["version"]: ParsingOptions.from_doc(doc["options"])
            for doc in self.options_collection.find({"name": self.name})
            if include_uncommitted or doc["version"] is not None
        }

    def iter_records(self, **find_kwargs) -> Iterable[MongoRecord]:
        """
        Yields MongoRecord objects matching the given find kwargs. As you can probably
        guess, the find_kwargs argument is just passed directly to PyMongo's find
        method.

        :param find_kwargs: args to pass to the data collection's find method
        :return: yields matching MongoRecord objects
        """
        yield from (
            MongoRecord(**doc) for doc in self.data_collection.find(**find_kwargs)
        )

    def sync(
        self, bulk_options: Optional[BulkOptions] = None, resync: bool = False
    ) -> WriteResult:
        """
        Synchronise the data/options in MongoDB with the data in Elasticsearch by
        updating the latest and old data indices as required.

        To find the data that needs to be updated, the current version of the data in
        MongoDB is compared to the current version of the data in Elasticsearch, and
        the two are synced (assuming Elasticsearch's version is <= MongoDB's).

        While the data is being indexed, refreshing is paused on this database's
        indexes and only resumed once all the data has been indexed. If an error
        occurs during indexing then the refresh interval is not reset, meaning the
        updates that have made it to Elasticsearch will not impact searches until a
        refresh is triggered, or the refresh interval is reset. This effectively
        makes this function transactional. If no errors occur, the refresh interval
        is reset (along with the replica count) and a refresh is called. This means
        that if this function returns successfully, the data updated by it will be
        immediately available for searches.

        Note that if resync=True, the arc indices will be deleted before reindexing
        begins! The latest index is not deleted as the documents in the latest index
        have IDs so can be directly replaced.

        :param bulk_options: options determining how the bulk operations are sent to
                             Elasticsearch
        :param resync: whether to resync all records with Elasticsearch regardless of
                       the currently synced version. This won't delete any data in the
                       latest index as those records are replaced during resync, but
                       any arc indices will be deleted prior to re-indexing.
        :return: a WriteResult object
        """
        if not self.has_data():
            return WriteResult()

        all_options = self.get_options(include_uncommitted=False)
        last_sync = self.get_elasticsearch_version() if not resync else None
        if last_sync is None:
            # elasticsearch has nothing so find all committed records
            find_filter = {"version": {"$ne": None}}
        else:
            committed_version = self.get_committed_version()
            if last_sync >= committed_version:
                # elasticsearch and mongo are in sync (use >= rather than == just in
                # case some bizarro-ness has occurred)
                return WriteResult()
            if any(version > last_sync for version in all_options):
                # there's an options change ahead, this means we need to check all
                # records again, so filter out committed records only
                find_filter = {"version": {"$ne": None}}
            else:
                # find all the updated records that haven't had their updates synced
                # yet
                find_filter = {"version": {"$gt": last_sync}}

        if resync:
            # delete all arcs
            res = self._client.elasticsearch.indices.get_alias(
                index=self.indices.arc_wildcard
            )
            for arc_index in res.keys():
                self._client.elasticsearch.indices.delete(index=arc_index)

        create_templates(self._client.elasticsearch)

        return write_ops(
            self._client.elasticsearch,
            generate_index_ops(
                self.indices,
                self.get_arc_status(),
                self.iter_records(filter=find_filter),
                all_options,
                last_sync,
            ),
            bulk_options,
        )
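
A typical pipeline sketch, reusing database and records from the sketches above (default BulkOptions are used):

database.ingest(records)  # write to MongoDB and commit a new version
result = database.sync()  # push the committed changes to Elasticsearch
# result is a WriteResult summarising the bulk index operations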

    def search(
        self, version: Union[SearchVersion, int] = SearchVersion.latest
    ) -> Search:
        """
        Creates a Search DSL object to use on this database's indexed data. This
        Search object will be set up with the appropriate index and version filter
        depending on the given version parameter, and the Elasticsearch client object
        in use on this database.

        If a version number is passed as the version parameter, it will be checked
        against the latest version available in Elasticsearch. If it is below the
        latest version available in Elasticsearch, all indices will be searched and a
        term filter will be used to get the right data. If the version is equal to or
        above the latest version available in Elasticsearch, the latest index will be
        searched and no version term filter will be used. This is for Elasticsearch
        performance and caching.

        If version is not an int but is instead a SearchVersion, this is used to set
        the index on the search object. SearchVersion.latest sets the index to this
        database's latest index, whereas SearchVersion.all sets the index to a
        wildcard to search on all indices used by this database.

        :param version: the version to search at, this should either be a
                        SearchVersion enum option or an int version.
        :return: a Search DSL object
        """
        search = Search(using=self._client.elasticsearch)

        if isinstance(version, int):
            current_version = self.get_elasticsearch_version()
            if current_version is not None and current_version <= version:
                # the version requested is at or above the latest version, use the
                # latest index instead of a filter, it'll be faster and more easily
                # cacheable for elasticsearch
                search = search.index(self.indices.latest)
            else:
                search = search.index(self.indices.wildcard)
                search = search.filter(version_query(version))
        else:
            if version == SearchVersion.latest:
                search = search.index(self.indices.latest)
            elif version == SearchVersion.all:
                search = search.index(self.indices.wildcard)

        return search
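
A usage sketch; the filter field path is made up for illustration:

latest = database.search()  # latest data, no version filter

s = database.search(version=1712000000000)  # hypothetical historical version
s = s.filter("term", **{"data.genus": "Vulpes"})  # assumed field path
response = s.execute()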

    def get_version_changed_counts(self) -> Dict[int, int]:
        """
        Retrieves the available versions and the number of documents that changed in
        the database at each version. Changes can be additions, modifications, or
        deletions.

        :return: versions and associated change counts
        """
        search = self.search(version=SearchVersion.all)

        # aggregate over the version field first
        versions = {
            term.value: term.count for term in iter_terms(search, DocumentField.VERSION)
        }

        # then aggregate over the next field to catch deleted records and add those
        versions.update(
            {term.value: term.count for term in iter_terms(search, DocumentField.NEXT)}
        )

        return versions

    def get_versions(self) -> List[int]:
        """
        Returns a list of the available versions that have been indexed into
        Elasticsearch for this database. The versions are in ascending order and will
        be retrieved from both the version and next document fields to ensure we
        capture all versions.

        :return: the available versions in ascending order
        """
        return sorted(self.get_version_changed_counts().keys())

    def get_data_fields(
        self, version: Optional[int] = None, query: Optional[Query] = None
    ) -> List[DataField]:
        """
        Retrieves the available data fields for this database, optionally at the
        given version with the given query.

        :param version: the version to find data fields at, if None, the latest data
                        is searched
        :param query: the query to filter records with before finding the data
                      fields, if None, all record data is considered
        :return: a list of DataField objects with the most frequent field first
        """
        search = self.search(version if version is not None else SearchVersion.latest)
        if query is not None:
            search = search.filter(query)

        fields: Dict[str, DataField] = {}

        # create the basic field objects and add type counts
        for term in iter_terms(search, DocumentField.DATA_TYPES):
            path, raw_types = term.value.rsplit(".", 1)
            if path not in fields:
                fields[path] = DataField(path)
            fields[path].add(raw_types, term.count)

        # go through each field and link it with other fields to create hierarchy
        for field in fields.values():
            if not field.is_container:
                continue
            target_dot_count = field.path.count(".") + 1
            for child in fields.values():
                if child.path.count(".") == target_dot_count and child.path.startswith(
                    f"{field.path}."
                ):
                    field.children.append(child)
                    child.parent = field

        # return the data fields as a list in a specific order. Note that because
        # we're using multiple orderings in different directions, we do multiple
        # sorts in the reverse order of the order we want them applied.
        # descending depth (deepest paths first; applied first, so lowest precedence)
        data_fields = sorted(
            fields.values(), key=lambda f: f.path.count("."), reverse=True
        )
        # ascending alphabetical order
        data_fields.sort(key=lambda f: f.path)
        # descending frequency (so most frequent fields first)
        data_fields.sort(key=lambda f: f.count, reverse=True)
        return data_fields
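
The layered stable-sort trick above generalises; a standalone sketch on made-up (path, count) tuples:

stats = [("a", 10), ("b.c", 10), ("b", 7), ("a.x", 10)]
# apply orderings from least to most significant; Python's sort is stable,
# so earlier orderings survive as tie-breakers for later ones
stats.sort(key=lambda s: s[0].count("."), reverse=True)  # depth, descending
stats.sort(key=lambda s: s[0])                           # path, ascending
stats.sort(key=lambda s: s[1], reverse=True)             # count, descending
# most frequent first, ties broken alphabetically by path
assert stats == [("a", 10), ("a.x", 10), ("b.c", 10), ("b", 7)]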

    def get_parsed_fields(
        self, version: Optional[int] = None, query: Optional[Query] = None
    ) -> List[ParsedField]:
        """
        Retrieves the available parsed fields for this database, optionally at the
        given version with the given query.

        :param version: the version to find parsed fields at, if None, the latest
                        data is searched
        :param query: the query to filter records with before finding the parsed
                      fields, if None, all record data is considered
        :return: a list of ParsedField objects with the most frequent field first
        """
        search = self.search(version if version is not None else SearchVersion.latest)
        if query is not None:
            search = search.filter(query)

        fields: Dict[str, ParsedField] = {}

        # create the basic field objects and add type counts
        for term in iter_terms(search, DocumentField.PARSED_TYPES):
            path, raw_types = term.value.rsplit(".", 1)
            if path not in fields:
                fields[path] = ParsedField(path)
            fields[path].add(raw_types, term.count)

        # return the parsed fields as a list in a specific order. Note that because
        # we're using multiple orderings in different directions, we do multiple
        # sorts in the reverse order of the order we want them applied.
        # descending depth (deepest paths first; applied first, so lowest precedence)
        parsed_fields = sorted(
            fields.values(), key=lambda f: f.path.count("."), reverse=True
        )
        # ascending alphabetical order
        parsed_fields.sort(key=lambda f: f.path)
        # descending frequency (so most frequent fields first)
        parsed_fields.sort(key=lambda f: f.count, reverse=True)
        return parsed_fields