• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

NaturalHistoryMuseum / splitgill / #76

04 Aug 2024 07:46PM UTC coverage: 34.976% (-59.7%) from 94.693%
#76

push

coveralls-python

jrdh
feat: remove default config values

BREAKING CHANGE: remove default config values

3 of 19 new or added lines in 3 files covered. (15.79%)

613 existing lines in 13 files now uncovered.

369 of 1055 relevant lines covered (34.98%)

0.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.42
/splitgill/manager.py
1
from enum import Enum
1✔
2
from typing import Optional, Iterable, List, Dict, Union
1✔
3

4
from cytoolz.dicttoolz import get_in
1✔
5
from elasticsearch import Elasticsearch
1✔
6
from elasticsearch_dsl import Search, A
1✔
7
from elasticsearch_dsl.query import Query
1✔
8
from pymongo import MongoClient, IndexModel, ASCENDING, DESCENDING
1✔
9
from pymongo.collection import Collection
1✔
10
from pymongo.database import Database
1✔
11

12
from splitgill.indexing.fields import DocumentField, FieldInfo
1✔
13
from splitgill.indexing.index import IndexNames, generate_index_ops
1✔
14
from splitgill.indexing.options import ParsingOptionsBuilder
1✔
15
from splitgill.indexing.syncing import write_ops, WriteResult, BulkOptions
1✔
16
from splitgill.indexing.templates import DATA_TEMPLATE
1✔
17
from splitgill.ingest import generate_ops, generate_rollback_ops
1✔
18
from splitgill.locking import LockManager
1✔
19
from splitgill.model import Record, MongoRecord, ParsingOptions, IngestResult
1✔
20
from splitgill.search import create_version_query
1✔
21
from splitgill.utils import partition, now
1✔
22

23
# name of the single MongoDB database that Splitgill stores all of its data in
MONGO_DATABASE_NAME = "sg"
# name of the collection holding the versioned parsing options documents (one doc per
# database name + version, see SplitgillDatabase.update_options)
OPTIONS_COLLECTION_NAME = "options"
# name of the collection backing the LockManager used to serialise commits
LOCKS_COLLECTION_NAME = "locks"
26

27

28
class SplitgillClient:
    """
    Splitgill client class which holds a mongo connection, an elasticsearch connection
    and any other general information Splitgill needs to manage the databases.
    """

    def __init__(self, mongo: MongoClient, elasticsearch: Elasticsearch):
        # store both connections first; the lock manager is backed by a MongoDB
        # collection, so it can only be built once the mongo client is available
        self.mongo = mongo
        self.elasticsearch = elasticsearch
        self.lock_manager = LockManager(self.get_lock_collection())

    def get_database(self, name: str) -> "SplitgillDatabase":
        """
        Returns a SplitgillDatabase object with the given name backed by this client.

        :param name: the name of the database
        :return: a new SplitgillDatabase object
        """
        return SplitgillDatabase(name, self)

    def get_mongo_database(self) -> Database:
        """
        Returns the MongoDB database in use.

        :return: a pymongo Database object
        """
        # client[name] is pymongo shorthand for client.get_database(name)
        return self.mongo[MONGO_DATABASE_NAME]

    def get_options_collection(self) -> Collection:
        """
        Returns the options collection.

        :return: a pymongo Collection object
        """
        # db[name] is pymongo shorthand for db.get_collection(name)
        return self.get_mongo_database()[OPTIONS_COLLECTION_NAME]

    def get_data_collection(self, name: str) -> Collection:
        """
        Returns the data collection for the given Splitgill database.

        :param name: the name of the Splitgill database
        :return: a pymongo Collection object
        """
        return self.get_mongo_database()[f"data-{name}"]

    def get_lock_collection(self) -> Collection:
        """
        Returns the locks collection.

        :return: a pymongo Collection object
        """
        return self.get_mongo_database()[LOCKS_COLLECTION_NAME]
80

81

82
class SearchVersion(Enum):
    """
    Indicator for the SplitgillDatabase.search method as to which version of the data
    you would like to search.
    """

    # searches the latest data only (targets the "latest" index, no version filter)
    latest = "latest"
    # searches all data across every version (targets all indices via a wildcard)
    all = "all"
92

93

94
class SplitgillDatabase:
    """
    Represents a single set of data to be managed by Splitgill.

    Under the hood, this data will exist in several MongoDB collections and
    Elasticsearch indices, but this object provides an abstraction layer to access all
    of that from one object.
    """

    def __init__(self, name: str, client: SplitgillClient):
        """
        :param name: the name of the database, needs to be a valid MongoDB collection
                     name and a valid Elasticsearch index name
        :param client: a SplitgillClient object
        """
        self.name = name
        self._client = client
        # Mongo collection objects
        self.data_collection = self._client.get_data_collection(self.name)
        self.options_collection = self._client.get_options_collection()
        # index names
        self.indices = IndexNames(self.name)
        # lock manager shared with the client, used to serialise commits
        self.locker = self._client.lock_manager

    def get_committed_version(self) -> Optional[int]:
        """
        Returns the latest committed version of the data or config (whichever is
        higher). If no records or options exist, or if neither has any committed values,
        None is returned.

        :return: the max version or None
        """
        # sort by version descending so find_one returns the highest version doc
        sort = [("version", DESCENDING)]
        last_data = self.data_collection.find_one(sort=sort)
        last_options = self.options_collection.find_one({"name": self.name}, sort=sort)

        # uncommitted docs have version = None, so these may still be None even when a
        # doc was found
        last_data_version = last_data.get("version") if last_data is not None else None
        last_options_version = (
            last_options.get("version") if last_options is not None else None
        )
        # there's no committed data or options
        if last_data_version is None and last_options_version is None:
            return None

        return max(
            last
            for last in (last_data_version, last_options_version)
            if last is not None
        )

    def get_elasticsearch_version(self) -> Optional[int]:
        """
        Returns the latest version found in the Elasticsearch indices for this database.
        If no records exist in any index, None is returned.

        :return: the max version or None
        """
        result = self._client.elasticsearch.search(
            aggs={"max_version": {"max": {"field": DocumentField.VERSION}}},
            size=0,
            # search all data indices for this database (really the most recent version
            # will always be in the latest index, but using a wildcard on all indices
            # means we can avoid doing a check that the latest index exists first and
            # just go for it).
            index=self.indices.wildcard,
        )

        version = get_in(("aggregations", "max_version", "value"), result, None)
        if version is None:
            return None

        # elasticsearch does max aggs using the double type apparently, so we need to
        # convert it back to an int to avoid returning a float and causing confusion
        return int(version)

    def has_data(self) -> bool:
        """
        Returns True if there is at least one committed record in this database,
        otherwise returns False. Note that this ignores options.

        :return: True if there is data, False if not
        """
        # committed docs are exactly those with a non-None version
        return self.data_collection.find_one({"version": {"$ne": None}}) is not None

    def has_options(self) -> bool:
        """
        Returns True if there is at least one committed options entry in this database,
        otherwise returns False. Note that this ignores data.

        :return: True if there are options, False if not
        """
        return self.options_collection.find_one({"version": {"$ne": None}}) is not None

    def commit(self) -> Optional[int]:
        """
        Commits the currently uncommitted data and options changes for this database.
        All new data/options will be given the same version which is the current time.
        If no changes were made, None is returned, otherwise the new version is
        returned.

        If a commit is already ongoing this will raise an AlreadyLocked exception.

        :return: the new version or None if no uncommitted changes were found
        """
        # TODO: global now?
        # TODO: transaction/rollback? Can't do this without replicasets so who knows?
        with self.locker.lock(self.name, stage="commit"):
            if not self.has_uncommitted_data() and not self.has_uncommitted_options():
                # nothing to commit, so nothing to do
                return None

            if not self.has_options() and not self.has_uncommitted_options():
                # no existing options and no options to be committed, so create some
                # basic, valid parsing options to use. The options just contain the
                # required settings with some sensible values and nothing more
                options = (
                    ParsingOptionsBuilder()
                    .with_keyword_length(256)
                    # use 15 significant digits here which roughly matches how a float
                    # is actually stored in elasticsearch and therefore gives a somewhat
                    # sensible representative idea to users of what the number actually
                    # is and how it can be searched. This format will produce string
                    # representations of numbers in scientific notation if it decides it
                    # needs to.
                    .with_float_format("{0:.15g}")
                    .build()
                )
                # commit=False so the options get stamped with this commit's version
                # in the update below rather than a separate one
                self.update_options(options, commit=False)

            version = now()

            # stamp the new version onto all uncommitted data and options docs. NOTE:
            # these are two separate update_many calls, not a transaction - a failure
            # between them can leave data committed but options not
            for collection in [self.data_collection, self.options_collection]:
                collection.update_many(
                    filter={"version": None}, update={"$set": {"version": version}}
                )
            return version

    def ingest(
        self,
        records: Iterable[Record],
        commit: bool = True,
        modified_field: Optional[str] = None,
    ) -> IngestResult:
        """
        Ingests the given records to the database. This only adds the records to the
        MongoDB data collection, it doesn't trigger the indexing of this new data into
        the Elasticsearch cluster. All data will be added with a None version unless the
        commit parameter is True in which case a version will be assigned.

        Use the commit keyword argument to either close the "transaction" after writing
        these records or leave it open. By default, the "transaction" is committed
        before the method returns, and the version is set then.

        If an error occurs, the "transaction" will not be committed, but the changes
        will not be rolled back.

        :param records: the records to add. These will be added in batches, so it is
                        safe to pass a very large stream of records
        :param commit: whether to commit the data added with a new version after writing
                       the records. Default: True.
        :param modified_field: a field name which, if the only changes in the record
                               data are in this field means the changes will be ignored.
                               As you can probably guess from the name, the root reason
                               for this parameter existing is to avoid committing a new
                               version of a record when all that has happened is the
                               record has been touched and the modified field's date
                               value updated even though the rest of the record remains
                               the same. Default: None, meaning all fields are checked
                               for changes.
        :return: returns a IngestResult object
        """
        # this does nothing if the indexes already exist
        self.data_collection.create_indexes(
            [IndexModel([("id", ASCENDING)]), IndexModel([("version", DESCENDING)])]
        )

        result = IngestResult()
        # this is used for the find size and the bulk ops partition size which both need
        # to be the same to ensure we can handle duplicates in the record stream
        size = 200

        for ops in partition(
            generate_ops(self.data_collection, records, modified_field, size), size
        ):
            bulk_result = self.data_collection.bulk_write(ops)
            result.update(bulk_result)

        if commit:
            result.version = self.commit()

        return result

    def update_options(
        self, options: ParsingOptions, commit: bool = True
    ) -> Optional[int]:
        """
        Update the parsing options for this database.

        :param options: the new parsing options
        :param commit: whether to commit the new config added with a new version after
                       writing the config. Default: True.
        :return: returns the new version if a commit happened, otherwise None. If a
                 commit was requested but nothing was changed, None is returned.
        """
        # get the latest options that have been committed (get_options ignores
        # uncommitted options)
        all_options = self.get_options()
        if all_options:
            # the dict is keyed by version, so max(all_options) is the latest version
            latest_options = all_options[max(all_options)]
        else:
            latest_options = None

        # any pending uncommitted options are superseded by this call, drop them
        if self.has_uncommitted_options():
            self.rollback_options()

        # if the options are the same as the latest existing ones, don't update
        if latest_options == options:
            return None

        # either the options are completely new or they differ from the existing
        # options, write a fresh entry
        new_doc = {
            "name": self.name,
            # version = None to indicate this is an uncommitted change
            "version": None,
            "options": options.to_doc(),
        }
        self.options_collection.insert_one(new_doc)

        if commit:
            return self.commit()
        return None

    def rollback_options(self) -> int:
        """
        Remove any uncommitted option changes.

        There should only ever be one, but this deletes them all ensuring everything is
        clean and tidy.

        :return: the number of documents deleted
        """
        return self.options_collection.delete_many({"version": None}).deleted_count

    def rollback_records(self):
        """
        Remove any uncommitted data changes.

        This method has to interrogate every uncommitted record in the data collection
        to perform the rollback and therefore, depending on how much uncommitted data
        there is, may take a bit of time to run.
        """
        if self.has_uncommitted_data():
            # batch the rollback writes, 200 at a time
            for ops in partition(generate_rollback_ops(self.data_collection), 200):
                self.data_collection.bulk_write(ops)

    def has_uncommitted_data(self) -> bool:
        """
        Check if there are any uncommitted records stored against this database.

        :return: returns True if there are any uncommitted records, False if not
        """
        return self.data_collection.find_one({"version": None}) is not None

    def has_uncommitted_options(self) -> bool:
        """
        Check if there are any uncommitted options stored against this database.

        :return: returns True if there are any uncommitted options, False if not
        """
        return self.options_collection.find_one({"version": None}) is not None

    def get_options(
        self, include_uncommitted: bool = False
    ) -> Dict[int, ParsingOptions]:
        """
        Retrieve all the parsing options configured for this database in a dict mapping
        int versions to ParsingOptions objects. Use the include_uncommitted parameter to
        indicate whether to include the uncommitted options or not.

        :return: a dict of versions and options
        """
        # note: an uncommitted entry has version None, so when included the returned
        # dict can contain a None key despite the declared Dict[int, ...] type
        return {
            doc["version"]: ParsingOptions.from_doc(doc["options"])
            for doc in self.options_collection.find({"name": self.name})
            if include_uncommitted or doc["version"] is not None
        }

    def iter_records(self, **find_kwargs) -> Iterable[MongoRecord]:
        """
        Yields MongoRecord objects matching the given find kwargs. As you can probably
        guess, the find_kwargs argument is just passed directly to PyMongo's find
        method.

        :param find_kwargs: args to pass to the data collection's find method
        :return: yields matching MongoRecord objects
        """
        yield from (
            MongoRecord(**doc) for doc in self.data_collection.find(**find_kwargs)
        )

    def sync(
        self, bulk_options: Optional[BulkOptions] = None, resync: bool = False
    ) -> WriteResult:
        """
        Synchronise the data/options in MongoDB with the data in Elasticsearch by
        updating the latest and old data indices as required.

        To find the data that needs to be updated, the current version of the data in
        MongoDB is compared to the current version of the data in Elasticsearch, and the
        two are synced (assuming MongoDB's version is <= Elasticsearch).

        While the data is being indexed, refreshing is paused on this database's indexes
        and only resumed once all the data has been indexed. If an error occurs during
        indexing then the refresh interval is not reset meaning the updates that have
        made it to Elasticsearch will not impact searches until a refresh is triggered,
        or the refresh interval is reset. This kinda makes this function transactional.
        If no errors occur, the refresh interval is reset (along with the replica count)
        and a refresh is called. This means that if this function returns successfully,
        the data updated by it will be immediately available for searches.

        :param bulk_options: options determining how the bulk operations are sent to
                             Elasticsearch
        :param resync: whether to resync all records with Elasticsearch regardless of
                       the currently synced version. This won't delete any data first
                       and just replaces documents in Elasticsearch as needed.
        :return: a WriteResult object
        """
        if not self.has_data():
            return WriteResult()

        all_options = self.get_options(include_uncommitted=False)
        # resync forces last_sync to None which makes us re-index everything committed
        last_sync = self.get_elasticsearch_version() if not resync else None
        if last_sync is None:
            # elasticsearch has nothing so find all committed records
            find_filter = {"version": {"$ne": None}}
        else:
            committed_version = self.get_committed_version()
            if last_sync >= committed_version:
                # elasticsearch and mongo are in sync (use >= rather than == just in
                # case some bizarro-ness has occurred)
                return WriteResult()
            if any(version > last_sync for version in all_options):
                # there's an options change ahead, this means we need to check all
                # records again, so filter out committed records only
                find_filter = {"version": {"$ne": None}}
            else:
                # find all the updated records that haven't had their updates synced yet
                find_filter = {"version": {"$gt": last_sync}}

        client = self._client.elasticsearch

        # make sure the template and all the indices exist before writing anything
        client.indices.put_index_template(name="data-template", body=DATA_TEMPLATE)
        for index in self.indices.all:
            if not client.indices.exists(index=index):
                client.indices.create(index=index)

        # apply optimal indexing settings to all the indices we may update (no
        # refreshing, no replicas while we bulk load)
        client.indices.put_settings(
            body={"index": {"refresh_interval": -1, "number_of_replicas": 0}},
            index=self.indices.all,
        )

        result = write_ops(
            client,
            generate_index_ops(
                self.indices,
                self.iter_records(filter=find_filter),
                all_options,
                last_sync,
            ),
            bulk_options,
        )

        # refresh all indices to make the changes visible all at once
        client.indices.refresh(index=self.indices.all)

        # reset the settings we changed (None forces them to revert to defaults)
        client.indices.put_settings(
            body={"index": {"refresh_interval": None, "number_of_replicas": None}},
            index=self.indices.all,
        )

        # do a bit of a tidy up by deleting any indexes without docs
        for index in self.indices.all:
            if not any(client.search(index=index, size=1)["hits"]["hits"]):
                client.indices.delete(index=index)

        return result

    def search(
        self, version: Union[SearchVersion, int] = SearchVersion.latest
    ) -> Search:
        """
        Creates a Search DSL object to use on this database's indexed data. This Search
        object will be setup with the appropriate index and version filter depending on
        the given version parameter, and the Elasticsearch client object in use on this
        database.

        :param version: the version to search at, this should either be a SearchVersion
                        enum option or an int. SearchVersion.latest will result in a
                        search on the latest index with no version filter thus searching
                        the latest data. SearchVersion.all will result in a search on
                        all indices using a wildcard and no version filter. Passing an
                        int version will search at the given timestamp. The default is
                        SearchVersion.latest.
        :return: a Search DSL object
        """
        search = Search(using=self._client.elasticsearch)

        if isinstance(version, int):
            # a specific timestamp: search every index but filter to that version
            search = search.index(self.indices.wildcard)
            search = search.filter(create_version_query(version))
        else:
            if version == SearchVersion.latest:
                search = search.index(self.indices.latest)
            elif version == SearchVersion.all:
                search = search.index(self.indices.wildcard)

        return search

    def get_versions(self) -> List[int]:
        """
        Returns a list of the available versions that have been indexed into
        Elasticsearch for this database. The versions are in ascending order.

        :return: the available versions in ascending order
        """
        versions = set()
        # composite agg paging cursor, None means start from the beginning
        after = None
        while True:
            # [:0] ensures we get no search hits back, only aggregation results, and
            # gives us a fresh copy of the search each iteration
            search = self.search(version=SearchVersion.all)[:0]
            search.aggs.bucket(
                "versions",
                "composite",
                size=50,
                sources={
                    "version": A("terms", field=DocumentField.VERSION, order="asc")
                },
            )
            if after is not None:
                search.aggs["versions"].after = after
            result = search.execute().aggs.to_dict()
            buckets = get_in(("versions", "buckets"), result, [])
            after = get_in(("versions", "after_key"), result, None)
            if not buckets:
                # no more buckets means we've paged through every version
                break
            versions.update(bucket["key"]["version"] for bucket in buckets)
        return sorted(versions)

    def get_fields(
        self, version: Optional[int] = None, query: Optional[Query] = None
    ) -> FieldInfo:
        """
        Get information about the fields available in this database at the given version
        under the given query criteria. The FieldInfo object that is returned contains
        information about both fields in the source data (data fields) and fields in the
        searchable data (parsed fields).

        :param version: the version to search at, if None (the default), search at the
                        latest version
        :param query: a filter to apply to the result, if None (the default), all
                      records at the given version are searched
        :return: a FieldInfo object
        """
        base = self.search(version if version is not None else SearchVersion.latest)
        if query is not None:
            base = base.filter(query)

        field_info = FieldInfo()

        # pairs of (document field to aggregate on, FieldInfo method to record results)
        work = [
            (DocumentField.DATA_TYPES, field_info.add_data_type),
            (DocumentField.PARSED_TYPES, field_info.add_parsed_type),
        ]

        for document_field, add_method in work:
            # composite agg paging cursor, None means start from the beginning
            after = None

            while True:
                # this has a dual purpose, it ensures we don't get any search results
                # when we don't need them, and it ensures we get a fresh copy of the
                # search to work with
                search = base[:0]
                search.aggs.bucket(
                    "paths",
                    "composite",
                    # let's try and get all the fields in one go if we can
                    size=100,
                    sources={"path": A("terms", field=document_field)},
                )
                if after is not None:
                    search.aggs["paths"].after = after

                result = search.execute().aggs.to_dict()

                buckets = get_in(("paths", "buckets"), result, [])
                after = get_in(("paths", "after_key"), result, None)
                if not buckets:
                    break
                else:
                    for bucket in buckets:
                        add_method(bucket["key"]["path"], bucket["doc_count"])

        return field_info
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc