NaturalHistoryMuseum / splitgill / build #94

21 Aug 2024 11:11PM UTC · coverage: 96.025% (+1.2%) from 94.809%
push · coveralls-python · jrdh
fix: fixes get_versions bug where versions containing only deletes were missed

This is the same fix as was implemented on the get_elasticsearch_version method in a previous commit: check the next field as well as the version field.
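
For context, a minimal sketch of the bug (the document shape here is an illustrative assumption, not the real index schema): when the only change at a version is a deletion, no document carries that version in its version field; the deletion is recorded only in the next field of the superseded document, so aggregating the version field alone misses it.

    # one record, created at 100 and deleted at 300: no document has version=300
    docs = [{"id": "r1", "version": 100, "next": 300}]
    from_version_only = {d["version"] for d in docs}  # {100}, misses 300
    from_both = {v for d in docs for v in (d["version"], d.get("next")) if v is not None}
    assert from_both == {100, 300}  # the fix: check next too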

12 of 12 new or added lines in 1 file covered. (100.0%)

9 existing lines in 2 files now uncovered.

1063 of 1107 relevant lines covered (96.03%)

0.96 hits per line

Source File: /splitgill/manager.py (96.41% covered)
Lines the report marks as uncovered are annotated with "# uncovered" below.

from bisect import bisect_right
from enum import Enum
from typing import Optional, Iterable, List, Dict, Union

from cytoolz.dicttoolz import get_in
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, A
from elasticsearch_dsl.query import Query
from pymongo import MongoClient, IndexModel, ASCENDING, DESCENDING
from pymongo.collection import Collection
from pymongo.database import Database

from splitgill.indexing.fields import DocumentField, FieldInfo
from splitgill.indexing.index import IndexNames, generate_index_ops
from splitgill.indexing.options import ParsingOptionsBuilder
from splitgill.indexing.syncing import write_ops, WriteResult, BulkOptions
from splitgill.indexing.templates import DATA_TEMPLATE
from splitgill.ingest import generate_ops, generate_rollback_ops
from splitgill.locking import LockManager
from splitgill.model import Record, MongoRecord, ParsingOptions, IngestResult
from splitgill.search import create_version_query
from splitgill.utils import partition, now

MONGO_DATABASE_NAME = "sg"
OPTIONS_COLLECTION_NAME = "options"
LOCKS_COLLECTION_NAME = "locks"


class SplitgillClient:
    """
    Splitgill client class which holds a mongo connection, an elasticsearch connection
    and any other general information Splitgill needs to manage the databases.
    """

    def __init__(self, mongo: MongoClient, elasticsearch: Elasticsearch):
        self.mongo = mongo
        self.elasticsearch = elasticsearch
        self.lock_manager = LockManager(self.get_lock_collection())

    def get_database(self, name: str) -> "SplitgillDatabase":
        """
        Returns a SplitgillDatabase object.

        :param name: the name of the database
        :return: a new SplitgillDatabase object
        """
        return SplitgillDatabase(name, self)

    def get_mongo_database(self) -> Database:
        """
        Returns the MongoDB database in use.

        :return: a pymongo Database object
        """
        return self.mongo.get_database(MONGO_DATABASE_NAME)

    def get_options_collection(self) -> Collection:
        """
        Returns the options collection.

        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(OPTIONS_COLLECTION_NAME)

    def get_data_collection(self, name: str) -> Collection:
        """
        Returns the data collection for the given Splitgill database.

        :param name: the name of the Splitgill database
        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(f"data-{name}")

    def get_lock_collection(self) -> Collection:
        """
        Returns the locks collection.

        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(LOCKS_COLLECTION_NAME)
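
# Example construction (a sketch, not part of the original file; the connection URLs
# are assumptions):
#
#     client = SplitgillClient(
#         MongoClient("mongodb://localhost:27017"),
#         Elasticsearch("http://localhost:9200"),
#     )
#     database = client.get_database("my-database")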


class SearchVersion(Enum):
    """
    Indicator for the SplitgillDatabase.search method as to which version of the data
    you would like to search.
    """

    # searches the latest data
    latest = "latest"
    # searches all data
    all = "all"


class SplitgillDatabase:
    """
    Represents a single set of data to be managed by Splitgill.

    Under the hood, this data will exist in several MongoDB collections and
    Elasticsearch indices, but this object provides an abstraction layer to access all
    of that from one object.
    """

    def __init__(self, name: str, client: SplitgillClient):
        """
        :param name: the name of the database, needs to be a valid MongoDB collection
                     name and a valid Elasticsearch index name
        :param client: a SplitgillClient object
        """
        self.name = name
        self._client = client
        # Mongo collection objects
        self.data_collection = self._client.get_data_collection(self.name)
        self.options_collection = self._client.get_options_collection()
        # index names
        self.indices = IndexNames(self.name)
        self.locker = self._client.lock_manager

    def get_committed_version(self) -> Optional[int]:
        """
        Returns the latest committed version of the data or config (whichever is
        higher). If no records or options exist, or if neither has any committed
        values, None is returned.

        :return: the max version or None
        """
        sort = [("version", DESCENDING)]
        last_data = self.data_collection.find_one(sort=sort)
        last_options = self.options_collection.find_one({"name": self.name}, sort=sort)

        last_data_version = last_data.get("version") if last_data is not None else None
        last_options_version = (
            last_options.get("version") if last_options is not None else None
        )
        # there's no committed data or options
        if last_data_version is None and last_options_version is None:
            return None

        return max(
            last
            for last in (last_data_version, last_options_version)
            if last is not None
        )

    def get_elasticsearch_version(self) -> Optional[int]:
        """
        Returns the latest version found in the Elasticsearch indices for this
        database. If no records exist in any index, None is returned. This method
        checks both the maximum value in the version field and the next field.
        Checking the next field accounts for updates that only include deletions.

        :return: the max version or None
        """
        version = None
        for field in (DocumentField.VERSION, DocumentField.NEXT):
            result = self._client.elasticsearch.search(
                aggs={"max_version": {"max": {"field": field}}},
                size=0,
                # search all indices so that we catch deletes which won't have a
                # document in latest
                index=self.indices.wildcard,
            )
            value = get_in(("aggregations", "max_version", "value"), result, None)
            if value is not None and (version is None or value > version):
                version = value

        # elasticsearch does max aggs using the double type apparently, so we need to
        # convert it back to an int to avoid returning a float and causing confusion
        return int(version) if version is not None else None

    def get_rounded_version(self, version: int) -> Optional[int]:
        """
        Given a target version, rounds the version down to the nearest available
        version. This in effect returns the version of the data that is applicable at
        the given target version.

        If the target version is below the earliest version of this database's
        indexed data, or no indexed versions are available, None is returned.

        :param version: the target version
        :return: a version or None
        """
        versions = self.get_versions()
        if not versions or version < versions[0]:
            return None

        return versions[bisect_right(versions, version) - 1]
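
    # Example (an illustrative sketch, not in the original file): if get_versions()
    # returns [100, 200, 300], then get_rounded_version(250) -> 200 (the version in
    # effect at 250), get_rounded_version(300) -> 300, and get_rounded_version(50) ->
    # None, since 50 predates the earliest indexed version.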

    def has_data(self) -> bool:
        """
        Returns True if there is at least one committed record in this database,
        otherwise returns False. Note that this ignores options.

        :return: True if there is data, False if not
        """
        return self.data_collection.find_one({"version": {"$ne": None}}) is not None

    def has_options(self) -> bool:
        """
        Returns True if there is at least one committed set of options in this
        database, otherwise returns False. Note that this ignores data.

        :return: True if there are options, False if not
        """
        return self.options_collection.find_one({"version": {"$ne": None}}) is not None

    def commit(self) -> Optional[int]:
        """
        Commits the currently uncommitted data and options changes for this database.
        All new data/options will be given the same version which is the current time.
        If no changes were made, None is returned, otherwise the new version is
        returned.

        If a commit is already ongoing this will raise an AlreadyLocked exception.

        :return: the new version or None if no uncommitted changes were found
        """
        # todo: global now?
        # todo: transaction/rollback? Can't do this without replicasets so who knows?
        with self.locker.lock(self.name, stage="commit"):
            if not self.has_uncommitted_data() and not self.has_uncommitted_options():
                # nothing to commit, so nothing to do
                return None

            if not self.has_options() and not self.has_uncommitted_options():
                # no existing options and no options to be committed, so create some
                # basic parsing options to use
                options = ParsingOptionsBuilder().build()
                self.update_options(options, commit=False)

            version = now()

            # assign the new version to the uncommitted data and options
            for collection in [self.data_collection, self.options_collection]:
                collection.update_many(
                    filter={"version": None}, update={"$set": {"version": version}}
                )
            return version

    def ingest(
        self,
        records: Iterable[Record],
        commit=True,
        modified_field: Optional[str] = None,
    ) -> IngestResult:
        """
        Ingests the given records to the database. This only adds the records to the
        MongoDB data collection, it doesn't trigger the indexing of this new data into
        the Elasticsearch cluster. All data will be added with a None version unless
        the commit parameter is True, in which case a version will be assigned.

        Use the commit keyword argument to either close the "transaction" after
        writing these records or leave it open. By default, the "transaction" is
        committed before the method returns, and the version is set then.

        If an error occurs, the "transaction" will not be committed, but the changes
        will not be rolled back either.

        :param records: the records to add. These will be added in batches, so it is
                        safe to pass a very large stream of records
        :param commit: whether to commit the data added with a new version after
                       writing the records. Default: True.
        :param modified_field: a field name which, if the only changes in the record
                               data are in this field, means the changes will be
                               ignored. As you can probably guess from the name, the
                               root reason for this parameter existing is to avoid
                               committing a new version of a record when all that has
                               happened is the record has been touched and the
                               modified field's date value updated even though the
                               rest of the record remains the same. Default: None,
                               meaning all fields are checked for changes.
        :return: an IngestResult object
        """
        # this does nothing if the indexes already exist
        self.data_collection.create_indexes(
            [IndexModel([("id", ASCENDING)]), IndexModel([("version", DESCENDING)])]
        )

        result = IngestResult()
        # this is used for the find size and the bulk ops partition size which both
        # need to be the same to ensure we can handle duplicates in the record stream
        size = 200

        for ops in partition(
            generate_ops(self.data_collection, records, modified_field, size), size
        ):
            bulk_result = self.data_collection.bulk_write(ops)
            result.update(bulk_result)

        if commit:
            result.version = self.commit()

        return result
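
    # Example usage (an illustrative sketch, not part of the original file):
    #
    #     result = database.ingest(records, commit=False)  # staged with version=None
    #     version = database.commit()  # stamps all staged data/options with now()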

    def update_options(self, options: ParsingOptions, commit=True) -> Optional[int]:
        """
        Update the parsing options for this database.

        :param options: the new parsing options
        :param commit: whether to commit the new config added with a new version after
                       writing the config. Default: True.
        :return: returns the new version if a commit happened, otherwise None. If a
                 commit was requested but nothing was changed, None is returned.
        """
        # get the latest options that have been committed (get_options ignores
        # uncommitted options)
        all_options = self.get_options()
        if all_options:
            latest_options = all_options[max(all_options)]
        else:
            latest_options = None

        if self.has_uncommitted_options():
            self.rollback_options()  # uncovered

        # if the options are the same as the latest existing ones, don't update
        if latest_options == options:
            return None

        # either the options are completely new or they differ from the existing
        # options, write a fresh entry
        new_doc = {
            "name": self.name,
            # version = None to indicate this is an uncommitted change
            "version": None,
            "options": options.to_doc(),
        }
        self.options_collection.insert_one(new_doc)

        if commit:
            return self.commit()
        return None
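
    # Example (a sketch, not part of the original file; ParsingOptionsBuilder is used
    # here exactly as in commit() above, i.e. with no configuration before build()):
    #
    #     options = ParsingOptionsBuilder().build()
    #     new_version = database.update_options(options)  # None if nothing changed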

    def rollback_options(self) -> int:
        """
        Remove any uncommitted option changes.

        There should only ever be one, but this deletes them all ensuring everything
        is clean and tidy.

        :return: the number of documents deleted
        """
        return self.options_collection.delete_many({"version": None}).deleted_count  # uncovered

    def rollback_records(self):
        """
        Remove any uncommitted data changes.

        This method has to interrogate every uncommitted record in the data collection
        to perform the rollback and therefore, depending on how much uncommitted data
        there is, may take a bit of time to run.
        """
        if self.has_uncommitted_data():  # uncovered
            for ops in partition(generate_rollback_ops(self.data_collection), 200):  # uncovered
                self.data_collection.bulk_write(ops)  # uncovered

    def has_uncommitted_data(self) -> bool:
        """
        Check if there are any uncommitted records stored against this database.

        :return: returns True if there are any uncommitted records, False if not
        """
        return self.data_collection.find_one({"version": None}) is not None

    def has_uncommitted_options(self) -> bool:
        """
        Check if there are any uncommitted options stored against this database.

        :return: returns True if there are any uncommitted options, False if not
        """
        return self.options_collection.find_one({"version": None}) is not None

    def get_options(self, include_uncommitted=False) -> Dict[int, ParsingOptions]:
        """
        Retrieve all the parsing options configured for this database in a dict
        mapping int versions to ParsingOptions objects. Use the include_uncommitted
        parameter to indicate whether to include the uncommitted options or not.

        :return: a dict of versions and options
        """
        return {
            doc["version"]: ParsingOptions.from_doc(doc["options"])
            for doc in self.options_collection.find({"name": self.name})
            if include_uncommitted or doc["version"] is not None
        }

    def iter_records(self, **find_kwargs) -> Iterable[MongoRecord]:
        """
        Yields MongoRecord objects matching the given find kwargs. As you can probably
        guess, the find_kwargs argument is just passed directly to PyMongo's find
        method.

        :param find_kwargs: args to pass to the data collection's find method
        :return: yields matching MongoRecord objects
        """
        yield from (
            MongoRecord(**doc) for doc in self.data_collection.find(**find_kwargs)
        )

    def sync(
        self, bulk_options: Optional[BulkOptions] = None, resync: bool = False
    ) -> WriteResult:
        """
        Synchronise the data/options in MongoDB with the data in Elasticsearch by
        updating the latest and old data indices as required.

        To find the data that needs to be updated, the current version of the data in
        MongoDB is compared to the current version of the data in Elasticsearch, and
        the two are synced (assuming Elasticsearch's version is behind MongoDB's).

        While the data is being indexed, refreshing is paused on this database's
        indexes and only resumed once all the data has been indexed. If an error
        occurs during indexing then the refresh interval is not reset, meaning the
        updates that have made it to Elasticsearch will not impact searches until a
        refresh is triggered, or the refresh interval is reset. This kinda makes this
        function transactional. If no errors occur, the refresh interval is reset
        (along with the replica count) and a refresh is called. This means that if
        this function returns successfully, the data updated by it will be
        immediately available for searches.

        :param bulk_options: options determining how the bulk operations are sent to
                             Elasticsearch
        :param resync: whether to resync all records with Elasticsearch regardless of
                       the currently synced version. This won't delete any data first
                       and just replaces documents in Elasticsearch as needed.
        :return: a WriteResult object
        """
        if not self.has_data():
            return WriteResult()

        all_options = self.get_options(include_uncommitted=False)
        last_sync = self.get_elasticsearch_version() if not resync else None
        if last_sync is None:
            # elasticsearch has nothing so find all committed records
            find_filter = {"version": {"$ne": None}}
        else:
            committed_version = self.get_committed_version()
            if last_sync >= committed_version:
                # elasticsearch and mongo are in sync (use >= rather than == just in
                # case some bizarro-ness has occurred)
                return WriteResult()  # uncovered
            if any(version > last_sync for version in all_options):
                # there's an options change ahead, this means we need to check all
                # records again, so filter out committed records only
                find_filter = {"version": {"$ne": None}}  # uncovered
            else:
                # find all the updated records that haven't had their updates synced
                # yet
                find_filter = {"version": {"$gt": last_sync}}

        client = self._client.elasticsearch

        client.indices.put_index_template(name="data-template", body=DATA_TEMPLATE)
        for index in self.indices.all:
            if not client.indices.exists(index=index):
                client.indices.create(index=index)

        # apply optimal indexing settings to all the indices we may update
        client.indices.put_settings(
            body={"index": {"refresh_interval": -1, "number_of_replicas": 0}},
            index=self.indices.all,
        )

        result = write_ops(
            client,
            generate_index_ops(
                self.indices,
                self.iter_records(filter=find_filter),
                all_options,
                last_sync,
            ),
            bulk_options,
        )

        # refresh all indices to make the changes visible all at once
        client.indices.refresh(index=self.indices.all)

        # reset the settings we changed (None forces them to revert to defaults)
        client.indices.put_settings(
            body={"index": {"refresh_interval": None, "number_of_replicas": None}},
            index=self.indices.all,
        )

        # do a bit of a tidy up by deleting any indexes without docs
        for index in self.indices.all:
            if not any(client.search(index=index, size=1)["hits"]["hits"]):
                client.indices.delete(index=index)

        return result
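
    # Example (a sketch, not part of the original file):
    #
    #     database.ingest(records)  # write to MongoDB and commit
    #     result = database.sync()  # index the committed changes into Elasticsearch
    #     # once sync() returns successfully, the new data is searchable immediately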

    def search(
        self, version: Union[SearchVersion, int] = SearchVersion.latest
    ) -> Search:
        """
        Creates a Search DSL object to use on this database's indexed data. This
        Search object will be set up with the appropriate index and version filter
        depending on the given version parameter, and the Elasticsearch client object
        in use on this database.

        :param version: the version to search at, this should either be a
                        SearchVersion enum option or an int. SearchVersion.latest will
                        result in a search on the latest index with no version filter,
                        thus searching the latest data. SearchVersion.all will result
                        in a search on all indices using a wildcard and no version
                        filter. Passing an int version will search at the given
                        timestamp. The default is SearchVersion.latest.
        :return: a Search DSL object
        """
        search = Search(using=self._client.elasticsearch)

        if isinstance(version, int):
            search = search.index(self.indices.wildcard)
            search = search.filter(create_version_query(version))
        else:
            if version == SearchVersion.latest:
                search = search.index(self.indices.latest)
            elif version == SearchVersion.all:
                search = search.index(self.indices.wildcard)

        return search
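
    # Example (a sketch, not part of the original file; the int timestamp is
    # hypothetical):
    #
    #     latest = database.search().count()  # latest data only
    #     as_of = database.search(version=1692659460000).count()  # data at that time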

    def get_versions(self) -> List[int]:
        """
        Returns a list of the available versions that have been indexed into
        Elasticsearch for this database. The versions are in ascending order and will
        be retrieved from both the version and next document fields to ensure we
        capture all versions.

        :return: the available versions in ascending order
        """
        versions = set()

        # get all versions present in the version and next document fields
        for field in (DocumentField.VERSION, DocumentField.NEXT):
            after = None
            while True:
                search = self.search(version=SearchVersion.all)[:0]
                search.aggs.bucket(
                    "versions",
                    "composite",
                    size=50,
                    sources={"version": A("terms", field=field, order="asc")},
                )
                if after is not None:
                    search.aggs["versions"].after = after
                result = search.execute().aggs.to_dict()
                buckets = get_in(("versions", "buckets"), result, [])
                after = get_in(("versions", "after_key"), result, None)
                if not buckets:
                    break
                versions.update(bucket["key"]["version"] for bucket in buckets)

        return sorted(versions)

    def get_fields(
        self, version: Optional[int] = None, query: Optional[Query] = None
    ) -> FieldInfo:
        """
        Get information about the fields available in this database at the given
        version under the given query criteria. The FieldInfo object that is returned
        contains information about both fields in the source data (data fields) and
        fields in the searchable data (parsed fields).

        :param version: the version to search at, if None (the default), search at the
                        latest version
        :param query: a filter to apply to the result, if None (the default), all
                      records at the given version are searched
        :return: a FieldInfo object
        """
        base = self.search(version if version is not None else SearchVersion.latest)
        if query is not None:
            base = base.filter(query)

        field_info = FieldInfo()

        work = [
            (DocumentField.DATA_TYPES, field_info.add_data_type),
            (DocumentField.PARSED_TYPES, field_info.add_parsed_type),
        ]

        for document_field, add_method in work:
            after = None

            while True:
                # this has a dual purpose, it ensures we don't get any search results
                # when we don't need them, and it ensures we get a fresh copy of the
                # search to work with
                search = base[:0]
                search.aggs.bucket(
                    "paths",
                    "composite",
                    # let's try and get all the fields in one go if we can
                    size=100,
                    sources={"path": A("terms", field=document_field)},
                )
                if after is not None:
                    search.aggs["paths"].after = after

                result = search.execute().aggs.to_dict()

                buckets = get_in(("paths", "buckets"), result, [])
                after = get_in(("paths", "after_key"), result, None)
                if not buckets:
                    break
                else:
                    for bucket in buckets:
                        add_method(bucket["key"]["path"], bucket["doc_count"])

        return field_info
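
Putting it together, a minimal end-to-end sketch (not part of the covered file; it assumes locally reachable MongoDB and Elasticsearch instances, and the Record constructor arguments are a guess to be checked against splitgill.model):

    from elasticsearch import Elasticsearch
    from pymongo import MongoClient

    from splitgill.manager import SearchVersion, SplitgillClient
    from splitgill.model import Record

    # connect to the two backing stores (URLs are assumptions)
    client = SplitgillClient(
        MongoClient("mongodb://localhost:27017"),
        Elasticsearch("http://localhost:9200"),
    )
    database = client.get_database("demo")

    # write and commit some records, then index them into Elasticsearch
    database.ingest([Record("r1", {"name": "A"}), Record("r2", {"name": "B"})])
    database.sync()

    # the available versions now include the commit timestamp
    print(database.get_versions())

    # search the latest data, or everything ever indexed
    print(database.search().count())
    print(database.search(version=SearchVersion.all).count())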