NaturalHistoryMuseum / splitgill / #79

05 Aug 2024 10:32AM UTC coverage: 94.602% (+0.2%) from 94.429%

Build #79 · push · coveralls-python · jrdh
feat: make quad_segs circle creation option available as a parsing option per hint

4 of 4 new or added lines in 3 files covered. (100.0%)
16 existing lines in 1 file now uncovered.
999 of 1056 relevant lines covered (94.6%)
0.95 hits per line

Source File: /splitgill/manager.py (88.71% covered)
from enum import Enum
from typing import Optional, Iterable, List, Dict, Union

from cytoolz.dicttoolz import get_in
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, A
from elasticsearch_dsl.query import Query
from pymongo import MongoClient, IndexModel, ASCENDING, DESCENDING
from pymongo.collection import Collection
from pymongo.database import Database

from splitgill.indexing.fields import DocumentField, FieldInfo
from splitgill.indexing.index import IndexNames, generate_index_ops
from splitgill.indexing.options import ParsingOptionsBuilder
from splitgill.indexing.syncing import write_ops, WriteResult, BulkOptions
from splitgill.indexing.templates import DATA_TEMPLATE
from splitgill.ingest import generate_ops, generate_rollback_ops
from splitgill.locking import LockManager
from splitgill.model import Record, MongoRecord, ParsingOptions, IngestResult
from splitgill.search import create_version_query
from splitgill.utils import partition, now

MONGO_DATABASE_NAME = "sg"
OPTIONS_COLLECTION_NAME = "options"
LOCKS_COLLECTION_NAME = "locks"


class SplitgillClient:
    """
    Splitgill client class which holds a mongo connection, an elasticsearch connection
    and any other general information Splitgill needs to manage the databases.
    """

    def __init__(self, mongo: MongoClient, elasticsearch: Elasticsearch):
        self.mongo = mongo
        self.elasticsearch = elasticsearch
        self.lock_manager = LockManager(self.get_lock_collection())

    def get_database(self, name: str) -> "SplitgillDatabase":
        """
        Returns a SplitgillDatabase object.

        :param name: the name of the database
        :return: a new SplitgillDatabase object
        """
        return SplitgillDatabase(name, self)

    def get_mongo_database(self) -> Database:
        """
        Returns the MongoDB database in use.

        :return: a pymongo Database object
        """
        return self.mongo.get_database(MONGO_DATABASE_NAME)

    def get_options_collection(self) -> Collection:
        """
        Returns the options collection.

        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(OPTIONS_COLLECTION_NAME)

    def get_data_collection(self, name: str) -> Collection:
        """
        Returns the data collection for the given Splitgill database.

        :param name: the name of the Splitgill database
        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(f"data-{name}")

    def get_lock_collection(self) -> Collection:
        """
        Returns the locks collection.

        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(LOCKS_COLLECTION_NAME)
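

# Usage sketch (illustrative only, not part of the library): wiring everything up.
# The connection URLs and the database name are placeholders and assume MongoDB and
# Elasticsearch instances running locally.
def _example_setup() -> "SplitgillDatabase":
    client = SplitgillClient(
        MongoClient("mongodb://localhost:27017"),
        Elasticsearch("http://localhost:9200"),
    )
    # all work on a single named dataset goes through a SplitgillDatabase
    return client.get_database("specimens")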


class SearchVersion(Enum):
    """
    Indicator for the SplitgillDatabase.search method as to which version of the data
    you would like to search.
    """

    # searches the latest data
    latest = "latest"
    # searches all data
    all = "all"


class SplitgillDatabase:
    """
    Represents a single set of data to be managed by Splitgill.

    Under the hood, this data will exist in several MongoDB collections and
    Elasticsearch indices, but this object provides an abstraction layer to access all
    of that from one object.
    """

    def __init__(self, name: str, client: SplitgillClient):
        """
        :param name: the name of the database, needs to be a valid MongoDB collection
                     name and a valid Elasticsearch index name
        :param client: a SplitgillClient object
        """
        self.name = name
        self._client = client
        # Mongo collection objects
        self.data_collection = self._client.get_data_collection(self.name)
        self.options_collection = self._client.get_options_collection()
        # index names
        self.indices = IndexNames(self.name)
        self.locker = self._client.lock_manager

    def get_committed_version(self) -> Optional[int]:
        """
        Returns the latest committed version of the data or options (whichever is
        higher). If no records or options exist, or if neither has any committed
        values, None is returned.

        :return: the max version or None
        """
        sort = [("version", DESCENDING)]
        last_data = self.data_collection.find_one(sort=sort)
        last_options = self.options_collection.find_one({"name": self.name}, sort=sort)

        last_data_version = last_data.get("version") if last_data is not None else None
        last_options_version = (
            last_options.get("version") if last_options is not None else None
        )
        # there's no committed data or options
        if last_data_version is None and last_options_version is None:
            return None

        return max(
            last
            for last in (last_data_version, last_options_version)
            if last is not None
        )

    def get_elasticsearch_version(self) -> Optional[int]:
        """
        Returns the latest version found in the Elasticsearch indices for this
        database. If no records exist in any index, None is returned.

        :return: the max version or None
        """
        result = self._client.elasticsearch.search(
            aggs={"max_version": {"max": {"field": DocumentField.VERSION}}},
            size=0,
            # search all data indices for this database (really the most recent version
            # will always be in the latest index, but using a wildcard on all indices
            # means we can avoid doing a check that the latest index exists first and
            # just go for it).
            index=self.indices.wildcard,
        )

        version = get_in(("aggregations", "max_version", "value"), result, None)
        if version is None:
            return None

        # elasticsearch does max aggs using the double type apparently, so we need to
        # convert it back to an int to avoid returning a float and causing confusion
        return int(version)

    def has_data(self) -> bool:
        """
        Returns True if there is at least one committed record in this database,
        otherwise returns False. Note that this ignores options.

        :return: True if there is data, False if not
        """
        return self.data_collection.find_one({"version": {"$ne": None}}) is not None

    def has_options(self) -> bool:
        """
        Returns True if there is at least one committed set of options in this
        database, otherwise returns False. Note that this ignores data.

        :return: True if there are options, False if not
        """
        return self.options_collection.find_one({"version": {"$ne": None}}) is not None

    def commit(self) -> Optional[int]:
        """
        Commits the currently uncommitted data and options changes for this database.
        All new data/options will be given the same version, which is the current
        time. If no changes were made, None is returned, otherwise the new version is
        returned.

        If a commit is already ongoing this will raise an AlreadyLocked exception.

        :return: the new version or None if no uncommitted changes were found
        """
        # TODO: global now?
        # TODO: transaction/rollback? Can't do this without replica sets so who knows?
        with self.locker.lock(self.name, stage="commit"):
            if not self.has_uncommitted_data() and not self.has_uncommitted_options():
                # nothing to commit, so nothing to do
                return None

            if not self.has_options() and not self.has_uncommitted_options():
                # no existing options and no options to be committed, so create some
                # basic parsing options to use
                options = ParsingOptionsBuilder().build()
                self.update_options(options, commit=False)

            version = now()

            # stamp the new version onto the uncommitted data and options
            for collection in [self.data_collection, self.options_collection]:
                collection.update_many(
                    filter={"version": None}, update={"$set": {"version": version}}
                )
            return version
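
    # Usage sketch (illustrative): staging changes and then committing them
    # together. commit() stamps everything with now(), so versions are timestamps
    # and sort chronologically.
    #
    #   database.ingest(records, commit=False)          # staged with version=None
    #   database.update_options(options, commit=False)  # also staged
    #   version = database.commit()                     # one version for both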

    def ingest(
        self,
        records: Iterable[Record],
        commit=True,
        modified_field: Optional[str] = None,
    ) -> IngestResult:
        """
        Ingests the given records to the database. This only adds the records to the
        MongoDB data collection, it doesn't trigger the indexing of this new data into
        the Elasticsearch cluster. All data will be added with a None version unless
        the commit parameter is True, in which case a version will be assigned.

        Use the commit keyword argument to either close the "transaction" after
        writing these records or leave it open. By default, the "transaction" is
        committed before the method returns, and the version is set then.

        If an error occurs, the "transaction" will not be committed, but the changes
        will not be rolled back either.

        :param records: the records to add. These will be added in batches, so it is
                        safe to pass a very large stream of records
        :param commit: whether to commit the data added with a new version after
                       writing the records. Default: True.
        :param modified_field: a field name; if the only changes in a record's data
                               are in this field, the changes will be ignored. This
                               parameter exists to avoid committing a new version of a
                               record when all that has happened is that the record
                               was touched and its modified date updated while the
                               rest of the record remains the same. Default: None,
                               meaning all fields are checked for changes.
        :return: an IngestResult object
        """
        # this does nothing if the indexes already exist
        self.data_collection.create_indexes(
            [IndexModel([("id", ASCENDING)]), IndexModel([("version", DESCENDING)])]
        )

        result = IngestResult()
        # this is used for the find size and the bulk ops partition size, which both
        # need to be the same to ensure we can handle duplicates in the record stream
        size = 200

        for ops in partition(
            generate_ops(self.data_collection, records, modified_field, size), size
        ):
            bulk_result = self.data_collection.bulk_write(ops)
            result.update(bulk_result)

        if commit:
            result.version = self.commit()

        return result
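
    # Usage sketch (illustrative): ingesting a small batch of records. The Record
    # constructor shown is an assumption; check splitgill.model.Record for the real
    # signature.
    #
    #   records = [Record(id="r1", data={"genus": "Asplenium"})]
    #   result = database.ingest(records, modified_field="modified")
    #   result.version  # the new version, or None if nothing actually changed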

    def update_options(self, options: ParsingOptions, commit=True) -> Optional[int]:
        """
        Update the parsing options for this database.

        :param options: the new parsing options
        :param commit: whether to commit the new options added with a new version
                       after writing them. Default: True.
        :return: returns the new version if a commit happened, otherwise None. If a
                 commit was requested but nothing was changed, None is returned.
        """
        # get the latest options that have been committed (get_options ignores
        # uncommitted options)
        all_options = self.get_options()
        if all_options:
            latest_options = all_options[max(all_options)]
        else:
            latest_options = None

        if self.has_uncommitted_options():
            self.rollback_options()

        # if the options are the same as the latest existing ones, don't update
        if latest_options == options:
            return None

        # either the options are completely new or they differ from the existing
        # options, so write a fresh entry
        new_doc = {
            "name": self.name,
            # version = None to indicate this is an uncommitted change
            "version": None,
            "options": options.to_doc(),
        }
        self.options_collection.insert_one(new_doc)

        if commit:
            return self.commit()
        return None
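
    # Usage sketch (illustrative): replacing the parsing options. Only build() is
    # shown because the builder's option-specific methods aren't visible in this
    # module; see splitgill.indexing.options for what can be configured.
    #
    #   options = ParsingOptionsBuilder().build()
    #   version = database.update_options(options)  # None if identical to current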

    def rollback_options(self) -> int:
        """
        Remove any uncommitted option changes.

        There should only ever be one, but this deletes them all ensuring everything is
        clean and tidy.

        :return: the number of documents deleted
        """
        return self.options_collection.delete_many({"version": None}).deleted_count

    def rollback_records(self):
        """
        Remove any uncommitted data changes.

        This method has to interrogate every uncommitted record in the data collection
        to perform the rollback and therefore, depending on how much uncommitted data
        there is, may take a bit of time to run.
        """
        if self.has_uncommitted_data():
            for ops in partition(generate_rollback_ops(self.data_collection), 200):
                self.data_collection.bulk_write(ops)

    def has_uncommitted_data(self) -> bool:
        """
        Check if there are any uncommitted records stored against this database.

        :return: returns True if there are any uncommitted records, False if not
        """
        return self.data_collection.find_one({"version": None}) is not None

    def has_uncommitted_options(self) -> bool:
        """
        Check if there are any uncommitted options stored against this database.

        :return: returns True if there are any uncommitted options, False if not
        """
        return self.options_collection.find_one({"version": None}) is not None

    def get_options(self, include_uncommitted=False) -> Dict[int, ParsingOptions]:
        """
        Retrieve all the parsing options configured for this database in a dict
        mapping int versions to ParsingOptions objects.

        :param include_uncommitted: whether to include uncommitted options in the
                                    result. Default: False.
        :return: a dict of versions and options
        """
        return {
            doc["version"]: ParsingOptions.from_doc(doc["options"])
            for doc in self.options_collection.find({"name": self.name})
            if include_uncommitted or doc["version"] is not None
        }

    def iter_records(self, **find_kwargs) -> Iterable[MongoRecord]:
        """
        Yields MongoRecord objects matching the given find kwargs. As you can probably
        guess, the find_kwargs argument is just passed directly to PyMongo's find
        method.

        :param find_kwargs: kwargs to pass to the data collection's find method
        :return: yields matching MongoRecord objects
        """
        yield from (
            MongoRecord(**doc) for doc in self.data_collection.find(**find_kwargs)
        )
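
    # Usage sketch (illustrative): iterating only committed records. The filter is
    # passed straight through to PyMongo's find.
    #
    #   for record in database.iter_records(filter={"version": {"$ne": None}}):
    #       ...  # each one is a MongoRecord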

    def sync(
        self, bulk_options: Optional[BulkOptions] = None, resync: bool = False
    ) -> WriteResult:
        """
        Synchronise the data/options in MongoDB with the data in Elasticsearch by
        updating the latest and old data indices as required.

        To find the data that needs to be updated, the current version of the data in
        MongoDB is compared to the current version of the data in Elasticsearch, and
        the two are synced (assuming Elasticsearch's version is behind MongoDB's).

        While the data is being indexed, refreshing is paused on this database's
        indexes and only resumed once all the data has been indexed. If an error
        occurs during indexing then the refresh interval is not reset, meaning the
        updates that have made it to Elasticsearch will not impact searches until a
        refresh is triggered, or the refresh interval is reset. This makes this
        function roughly transactional. If no errors occur, the refresh interval is
        reset (along with the replica count) and a refresh is called. This means that
        if this function returns successfully, the data updated by it will be
        immediately available for searches.

        :param bulk_options: options determining how the bulk operations are sent to
                             Elasticsearch
        :param resync: whether to resync all records with Elasticsearch regardless of
                       the currently synced version. This won't delete any data first;
                       it just replaces documents in Elasticsearch as needed.
        :return: a WriteResult object
        """
        if not self.has_data():
            return WriteResult()

        all_options = self.get_options(include_uncommitted=False)
        last_sync = self.get_elasticsearch_version() if not resync else None
        if last_sync is None:
            # elasticsearch has nothing so find all committed records
            find_filter = {"version": {"$ne": None}}
        else:
            committed_version = self.get_committed_version()
            if last_sync >= committed_version:
                # elasticsearch and mongo are in sync (use >= rather than == just in
                # case some bizarro-ness has occurred)
                return WriteResult()
            if any(version > last_sync for version in all_options):
                # there's an options change ahead, this means we need to check all
                # records again, so only filter out uncommitted records
                find_filter = {"version": {"$ne": None}}
            else:
                # find all the updated records that haven't had their updates synced
                # yet
                find_filter = {"version": {"$gt": last_sync}}

        client = self._client.elasticsearch

        client.indices.put_index_template(name="data-template", body=DATA_TEMPLATE)
        for index in self.indices.all:
            if not client.indices.exists(index=index):
                client.indices.create(index=index)

        # apply optimal indexing settings to all the indices we may update
        client.indices.put_settings(
            body={"index": {"refresh_interval": -1, "number_of_replicas": 0}},
            index=self.indices.all,
        )

        result = write_ops(
            client,
            generate_index_ops(
                self.indices,
                self.iter_records(filter=find_filter),
                all_options,
                last_sync,
            ),
            bulk_options,
        )

        # refresh all indices to make the changes visible all at once
        client.indices.refresh(index=self.indices.all)

        # reset the settings we changed (None forces them to revert to defaults)
        client.indices.put_settings(
            body={"index": {"refresh_interval": None, "number_of_replicas": None}},
            index=self.indices.all,
        )

        # do a bit of a tidy up by deleting any indexes without docs
        for index in self.indices.all:
            if not any(client.search(index=index, size=1)["hits"]["hits"]):
                client.indices.delete(index=index)

        return result
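
    # Usage sketch (illustrative): pushing committed changes into Elasticsearch.
    # BulkOptions tuning is omitted because its fields aren't shown in this module;
    # see splitgill.indexing.syncing.
    #
    #   database.ingest(records)      # commit new data into MongoDB
    #   result = database.sync()      # index everything new into Elasticsearch
    #   database.sync(resync=True)    # or: reindex all records from scratch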

    def search(
        self, version: Union[SearchVersion, int] = SearchVersion.latest
    ) -> Search:
        """
        Creates a Search DSL object to use on this database's indexed data. This
        Search object will be set up with the appropriate index and version filter
        depending on the given version parameter, and with the Elasticsearch client
        object in use on this database.

        :param version: the version to search at; this should either be a
                        SearchVersion enum option or an int. SearchVersion.latest will
                        result in a search on the latest index with no version filter,
                        thus searching the latest data. SearchVersion.all will result
                        in a search on all indices using a wildcard and no version
                        filter. Passing an int version will search at the given
                        timestamp. The default is SearchVersion.latest.
        :return: a Search DSL object
        """
        search = Search(using=self._client.elasticsearch)

        if isinstance(version, int):
            search = search.index(self.indices.wildcard)
            search = search.filter(create_version_query(version))
        else:
            if version == SearchVersion.latest:
                search = search.index(self.indices.latest)
            elif version == SearchVersion.all:
                search = search.index(self.indices.wildcard)

        return search
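
    # Usage sketch (illustrative): querying the indexed data via the standard
    # elasticsearch-dsl Search API. The int version is a placeholder timestamp.
    #
    #   database.search().count()                       # latest data
    #   database.search(version=1722853920000).count()  # data as of that timestamp
    #   database.search(version=SearchVersion.all)      # across all versions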

    def get_versions(self) -> List[int]:
        """
        Returns a list of the available versions that have been indexed into
        Elasticsearch for this database. The versions are in ascending order.

        :return: the available versions in ascending order
        """
        versions = set()
        after = None
        while True:
            search = self.search(version=SearchVersion.all)[:0]
            search.aggs.bucket(
                "versions",
                "composite",
                size=50,
                sources={
                    "version": A("terms", field=DocumentField.VERSION, order="asc")
                },
            )
            if after is not None:
                search.aggs["versions"].after = after
            result = search.execute().aggs.to_dict()
            buckets = get_in(("versions", "buckets"), result, [])
            after = get_in(("versions", "after_key"), result, None)
            if not buckets:
                break
            versions.update(bucket["key"]["version"] for bucket in buckets)
        return sorted(versions)
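
    # Usage sketch (illustrative): combining get_versions with search to count the
    # records visible at each indexed version.
    #
    #   for version in database.get_versions():
    #       print(version, database.search(version=version).count())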

    def get_fields(
        self, version: Optional[int] = None, query: Optional[Query] = None
    ) -> FieldInfo:
        """
        Get information about the fields available in this database at the given
        version under the given query criteria. The FieldInfo object that is returned
        contains information about both fields in the source data (data fields) and
        fields in the searchable data (parsed fields).

        :param version: the version to search at; if None (the default), search at the
                        latest version
        :param query: a filter to apply to the result; if None (the default), all
                      records at the given version are searched
        :return: a FieldInfo object
        """
        base = self.search(version if version is not None else SearchVersion.latest)
        if query is not None:
            base = base.filter(query)

        field_info = FieldInfo()

        work = [
            (DocumentField.DATA_TYPES, field_info.add_data_type),
            (DocumentField.PARSED_TYPES, field_info.add_parsed_type),
        ]

        for document_field, add_method in work:
            after = None

            while True:
                # this has a dual purpose: it ensures we don't get any search results
                # when we don't need them, and it ensures we get a fresh copy of the
                # search to work with
                search = base[:0]
                search.aggs.bucket(
                    "paths",
                    "composite",
                    # let's try and get all the fields in one go if we can
                    size=100,
                    sources={"path": A("terms", field=document_field)},
                )
                if after is not None:
                    search.aggs["paths"].after = after

                result = search.execute().aggs.to_dict()

                buckets = get_in(("paths", "buckets"), result, [])
                after = get_in(("paths", "after_key"), result, None)
                if not buckets:
                    break
                else:
                    for bucket in buckets:
                        add_method(bucket["key"]["path"], bucket["doc_count"])

        return field_info
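

# Usage sketch (illustrative only): inspecting the fields available in a database.
# Reading the result depends on FieldInfo's API (see splitgill.indexing.fields), so
# only the call itself is shown.
def _example_inspect_fields(database: SplitgillDatabase) -> FieldInfo:
    # fields present in the latest data; a Query can be passed to narrow the records
    # considered, and version=<timestamp> inspects older data
    return database.get_fields()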