NaturalHistoryMuseum / splitgill / build #72 (push, via coveralls-python, by jrdh)
Commit: "build: swap docker-compose for docker compose"
02 Aug 2024 09:12PM UTC. Coverage: 35.289% (down 60.8 points from 96.047%).
379 of 1074 relevant lines covered (35.29%); 0.35 hits per line.

Source file: /splitgill/manager.py (27.57% covered)
from enum import Enum
from typing import Optional, Iterable, List, Dict, Union

from cytoolz.dicttoolz import get_in
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, A
from elasticsearch_dsl.query import Query
from pymongo import MongoClient, IndexModel, ASCENDING, DESCENDING
from pymongo.collection import Collection
from pymongo.database import Database

from splitgill.indexing.fields import DocumentField, FieldInfo
from splitgill.indexing.index import IndexNames, generate_index_ops
from splitgill.indexing.options import ParsingOptionsBuilder
from splitgill.indexing.syncing import write_ops, WriteResult, BulkOptions
from splitgill.indexing.templates import DATA_TEMPLATE
from splitgill.ingest import generate_ops, generate_rollback_ops
from splitgill.locking import LockManager
from splitgill.model import Record, MongoRecord, ParsingOptions, IngestResult
from splitgill.search import create_version_query
from splitgill.utils import partition, now

MONGO_DATABASE_NAME = "sg"
OPTIONS_COLLECTION_NAME = "options"
LOCKS_COLLECTION_NAME = "locks"


class SplitgillClient:
    """
    Splitgill client class which holds a mongo connection, an elasticsearch connection
    and any other general information Splitgill needs to manage the databases.
    """

    def __init__(self, mongo: MongoClient, elasticsearch: Elasticsearch):
        self.mongo = mongo
        self.elasticsearch = elasticsearch
        self.lock_manager = LockManager(self.get_lock_collection())

    def get_database(self, name: str) -> "SplitgillDatabase":
        """
        Returns a SplitgillDatabase object.

        :param name: the name of the database
        :return: a new SplitgillDatabase object
        """
        return SplitgillDatabase(name, self)

    def get_mongo_database(self) -> Database:
        """
        Returns the MongoDB database in use.

        :return: a pymongo Database object
        """
        return self.mongo.get_database(MONGO_DATABASE_NAME)

    def get_options_collection(self) -> Collection:
        """
        Returns the options collection.

        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(OPTIONS_COLLECTION_NAME)

    def get_data_collection(self, name: str) -> Collection:
        """
        Returns the data collection for the given Splitgill database.

        :param name: the name of the Splitgill database
        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(f"data-{name}")

    def get_lock_collection(self) -> Collection:
        """
        Returns the locks collection.

        :return: a pymongo Collection object
        """
        return self.get_mongo_database().get_collection(LOCKS_COLLECTION_NAME)


class SearchVersion(Enum):
    """
    Indicator for the SplitgillDatabase.search method as to which version of the data
    you would like to search.
    """

    # searches the latest data
    latest = "latest"
    # searches all data
    all = "all"


class SplitgillDatabase:
    """
    Represents a single set of data to be managed by Splitgill.

    Under the hood, this data will exist in several MongoDB collections and
    Elasticsearch indices, but this object provides an abstraction layer to access all
    of that from one object.
    """

    def __init__(self, name: str, client: SplitgillClient):
        """
        :param name: the name of the database, needs to be a valid MongoDB collection
                     name and a valid Elasticsearch index name
        :param client: a SplitgillClient object
        """
        self.name = name
        self._client = client
        # Mongo collection objects
        self.data_collection = self._client.get_data_collection(self.name)
        self.options_collection = self._client.get_options_collection()
        # index names
        self.indices = IndexNames(self.name)
        self.locker = self._client.lock_manager

    def get_committed_version(self) -> Optional[int]:
        """
        Returns the latest committed version of the data or config (whichever is
        higher). If no records or options exist, or if neither has any committed
        values, None is returned.

        :return: the max version or None
        """
        sort = [("version", DESCENDING)]
        last_data = self.data_collection.find_one(sort=sort)
        last_options = self.options_collection.find_one({"name": self.name}, sort=sort)

        last_data_version = last_data.get("version") if last_data is not None else None
        last_options_version = (
            last_options.get("version") if last_options is not None else None
        )
        # there's no committed data or options
        if last_data_version is None and last_options_version is None:
            return None

        return max(
            last
            for last in (last_data_version, last_options_version)
            if last is not None
        )

    def get_elasticsearch_version(self) -> Optional[int]:
        """
        Returns the latest version found in the Elasticsearch indices for this
        database. If no records exist in any index, None is returned.

        :return: the max version or None
        """
        result = self._client.elasticsearch.search(
            aggs={"max_version": {"max": {"field": DocumentField.VERSION}}},
            size=0,
            # search all data indices for this database (really the most recent version
            # will always be in the latest index, but using a wildcard on all indices
            # means we can avoid doing a check that the latest index exists first and
            # just go for it).
            index=self.indices.wildcard,
        )

        version = get_in(("aggregations", "max_version", "value"), result, None)
        if version is None:
            return None

        # elasticsearch does max aggs using the double type apparently, so we need to
        # convert it back to an int to avoid returning a float and causing confusion
        return int(version)

    def has_data(self) -> bool:
        """
        Returns True if there is at least one committed record in this database,
        otherwise returns False. Note that this ignores options.

        :return: True if there is data, False if not
        """
        return self.data_collection.find_one({"version": {"$ne": None}}) is not None

    def has_options(self) -> bool:
        """
        Returns True if there is at least one committed set of options in this
        database, otherwise returns False. Note that this ignores data.

        :return: True if there are options, False if not
        """
        return self.options_collection.find_one({"version": {"$ne": None}}) is not None

    def commit(self) -> Optional[int]:
        """
        Commits the currently uncommitted data and options changes for this database.
        All new data/options will be given the same version, which is the current
        time. If no changes were made, None is returned, otherwise the new version is
        returned.

        If a commit is already ongoing, this will raise an AlreadyLocked exception.

        :return: the new version or None if no uncommitted changes were found
        """
        # TODO: global now?
        # TODO: transaction/rollback? Can't do this without replicasets so who knows?
        with self.locker.lock(self.name, stage="commit"):
            if not self.has_uncommitted_data() and not self.has_uncommitted_options():
                # nothing to commit, so nothing to do
                return None

            if not self.has_options() and not self.has_uncommitted_options():
                # no existing options and no options to be committed, create a default
                self.update_options(ParsingOptionsBuilder().build(), commit=False)

            version = now()

            # assign the new version to the uncommitted data and options
            for collection in [self.data_collection, self.options_collection]:
                collection.update_many(
                    filter={"version": None}, update={"$set": {"version": version}}
                )
            return version
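    # A minimal usage sketch (assumes an existing SplitgillDatabase instance, db):
    # ingest(..., commit=False) can stage several batches which commit() then
    # publishes under one shared version, e.g.:
    #
    #     db.ingest(first_batch, commit=False)
    #     db.ingest(second_batch, commit=False)
    #     version = db.commit()  # both batches receive this single version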

    def ingest(
        self,
        records: Iterable[Record],
        commit=True,
        modified_field: Optional[str] = None,
    ) -> IngestResult:
        """
        Ingests the given records to the database. This only adds the records to the
        MongoDB data collection; it doesn't trigger the indexing of this new data into
        the Elasticsearch cluster. All data will be added with a None version unless
        the commit parameter is True, in which case a version will be assigned.

        Use the commit keyword argument to either close the "transaction" after
        writing these records or leave it open. By default, the "transaction" is
        committed before the method returns, and the version is set then.

        If an error occurs, the "transaction" will not be committed, but the changes
        will not be rolled back either.

        :param records: the records to add. These will be added in batches, so it is
                        safe to pass a very large stream of records
        :param commit: whether to commit the data added with a new version after
                       writing the records. Default: True.
        :param modified_field: a field name; if the only change in a record's data is
                               to this field, the change is ignored. This exists to
                               avoid committing a new version of a record when all
                               that has happened is that the record was touched and
                               its modified date updated while the rest of the record
                               remains the same. Default: None, meaning all fields are
                               checked for changes.
        :return: an IngestResult object
        """
        # this does nothing if the indexes already exist
        self.data_collection.create_indexes(
            [IndexModel([("id", ASCENDING)]), IndexModel([("version", DESCENDING)])]
        )

        result = IngestResult()
        # this is used for the find size and the bulk ops partition size, which both
        # need to be the same to ensure we can handle duplicates in the record stream
        size = 200

        for ops in partition(
            generate_ops(self.data_collection, records, modified_field, size), size
        ):
            bulk_result = self.data_collection.bulk_write(ops)
            result.update(bulk_result)

        if commit:
            result.version = self.commit()

        return result
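    # A minimal usage sketch (assumes an existing SplitgillDatabase instance, db, and
    # records whose data carries a "modified" timestamp field): records whose data
    # only differs in that field are skipped rather than re-versioned, e.g.:
    #
    #     result = db.ingest(records, modified_field="modified")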

    def update_options(self, options: ParsingOptions, commit=True) -> Optional[int]:
        """
        Update the parsing options for this database.

        :param options: the new parsing options
        :param commit: whether to commit the new config added with a new version after
                       writing the config. Default: True.
        :return: returns the new version if a commit happened, otherwise None. If a
                 commit was requested but nothing was changed, None is returned.
        """
        # get the latest options that have been committed (get_options ignores
        # uncommitted options)
        all_options = self.get_options()
        if all_options:
            latest_options = all_options[max(all_options)]
        else:
            latest_options = None

        if self.has_uncommitted_options():
            self.rollback_options()

        # if the options are the same as the latest existing ones, don't update
        if latest_options == options:
            return None

        # either the options are completely new or they differ from the existing
        # options, write a fresh entry
        new_doc = {
            "name": self.name,
            # version = None to indicate this is an uncommitted change
            "version": None,
            "options": options.to_doc(),
        }
        self.options_collection.insert_one(new_doc)

        if commit:
            return self.commit()
        return None

    def rollback_options(self) -> int:
        """
        Remove any uncommitted option changes.

        There should only ever be one, but this deletes them all, ensuring everything
        is clean and tidy.

        :return: the number of documents deleted
        """
        return self.options_collection.delete_many({"version": None}).deleted_count

    def rollback_records(self):
        """
        Remove any uncommitted data changes.

        This method has to interrogate every uncommitted record in the data collection
        to perform the rollback and therefore, depending on how much uncommitted data
        there is, may take a bit of time to run.
        """
        if self.has_uncommitted_data():
            for ops in partition(generate_rollback_ops(self.data_collection), 200):
                self.data_collection.bulk_write(ops)

    def has_uncommitted_data(self) -> bool:
        """
        Check if there are any uncommitted records stored against this database.

        :return: returns True if there are any uncommitted records, False if not
        """
        return self.data_collection.find_one({"version": None}) is not None

    def has_uncommitted_options(self) -> bool:
        """
        Check if there are any uncommitted options stored against this database.

        :return: returns True if there are any uncommitted options, False if not
        """
        return self.options_collection.find_one({"version": None}) is not None

    def get_options(self, include_uncommitted=False) -> Dict[int, ParsingOptions]:
        """
        Retrieve all the parsing options configured for this database in a dict
        mapping int versions to ParsingOptions objects.

        :param include_uncommitted: whether to include uncommitted options in the
                                    result. Default: False.
        :return: a dict of versions and options
        """
        return {
            doc["version"]: ParsingOptions.from_doc(doc["options"])
            for doc in self.options_collection.find({"name": self.name})
            if include_uncommitted or doc["version"] is not None
        }

    def iter_records(self, **find_kwargs) -> Iterable[MongoRecord]:
        """
        Yields MongoRecord objects matching the given find kwargs. The find_kwargs
        argument is passed directly to PyMongo's find method.

        :param find_kwargs: kwargs to pass to the data collection's find method
        :return: yields matching MongoRecord objects
        """
        yield from (
            MongoRecord(**doc) for doc in self.data_collection.find(**find_kwargs)
        )

    def sync(
        self, bulk_options: Optional[BulkOptions] = None, resync: bool = False
    ) -> WriteResult:
        """
        Synchronise the data/options in MongoDB with the data in Elasticsearch by
        updating the latest and old data indices as required.

        To find the data that needs to be updated, the current version of the data in
        MongoDB is compared to the current version of the data in Elasticsearch, and
        the two are synced (syncing only happens if Elasticsearch's version is behind
        MongoDB's).

        While the data is being indexed, refreshing is paused on this database's
        indices and only resumed once all the data has been indexed. If an error
        occurs during indexing, the refresh interval is not reset, meaning the updates
        that have made it to Elasticsearch will not impact searches until a refresh is
        triggered or the refresh interval is reset. This makes this function roughly
        transactional. If no errors occur, the refresh interval is reset (along with
        the replica count) and a refresh is called. This means that if this function
        returns successfully, the data updated by it will be immediately available
        for searches.

        :param bulk_options: options determining how the bulk operations are sent to
                             Elasticsearch
        :param resync: whether to resync all records with Elasticsearch regardless of
                       the currently synced version. This won't delete any data first;
                       it just replaces documents in Elasticsearch as needed.
        :return: a WriteResult object
        """
        if not self.has_data():
            return WriteResult()

        all_options = self.get_options(include_uncommitted=False)
        last_sync = self.get_elasticsearch_version() if not resync else None
        if last_sync is None:
            # elasticsearch has nothing so find all committed records
            find_filter = {"version": {"$ne": None}}
        else:
            committed_version = self.get_committed_version()
            if last_sync >= committed_version:
                # elasticsearch and mongo are in sync (use >= rather than == just in
                # case some bizarro-ness has occurred)
                return WriteResult()
            if any(version > last_sync for version in all_options):
                # there's an options change ahead, this means we need to check all
                # records again, so filter to committed records only
                find_filter = {"version": {"$ne": None}}
            else:
                # find all the updated records that haven't had their updates synced
                # yet
                find_filter = {"version": {"$gt": last_sync}}

        client = self._client.elasticsearch

        client.indices.put_index_template(name="data-template", body=DATA_TEMPLATE)
        for index in self.indices.all:
            if not client.indices.exists(index=index):
                client.indices.create(index=index)

        # apply optimal indexing settings to all the indices we may update
        client.indices.put_settings(
            body={"index": {"refresh_interval": -1, "number_of_replicas": 0}},
            index=self.indices.all,
        )

        result = write_ops(
            client,
            generate_index_ops(
                self.indices,
                self.iter_records(filter=find_filter),
                all_options,
                last_sync,
            ),
            bulk_options,
        )

        # refresh all indices to make the changes visible all at once
        client.indices.refresh(index=self.indices.all)

        # reset the settings we changed (None forces them to revert to defaults)
        client.indices.put_settings(
            body={"index": {"refresh_interval": None, "number_of_replicas": None}},
            index=self.indices.all,
        )

        # do a bit of a tidy up by deleting any indices without docs
        for index in self.indices.all:
            if not any(client.search(index=index, size=1)["hits"]["hits"]):
                client.indices.delete(index=index)

        return result
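    # A minimal usage sketch (assumes an existing SplitgillDatabase instance, db): a
    # full reindex can be requested without deleting anything from Elasticsearch
    # first, e.g.:
    #
    #     result = db.sync(resync=True)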

    def search(
        self, version: Union[SearchVersion, int] = SearchVersion.latest
    ) -> Search:
        """
        Creates a Search DSL object to use on this database's indexed data. This
        Search object will be set up with the appropriate index and version filter
        depending on the given version parameter, and with the Elasticsearch client
        object in use on this database.

        :param version: the version to search at, this should either be a
                        SearchVersion enum option or an int. SearchVersion.latest
                        will result in a search on the latest index with no version
                        filter, thus searching the latest data. SearchVersion.all
                        will result in a search on all indices using a wildcard and
                        no version filter. Passing an int version will search at the
                        given timestamp. The default is SearchVersion.latest.
        :return: a Search DSL object
        """
        search = Search(using=self._client.elasticsearch)

        if isinstance(version, int):
            search = search.index(self.indices.wildcard)
            search = search.filter(create_version_query(version))
        else:
            if version == SearchVersion.latest:
                search = search.index(self.indices.latest)
            elif version == SearchVersion.all:
                search = search.index(self.indices.wildcard)

        return search

    def get_versions(self) -> List[int]:
        """
        Returns a list of the versions that have been indexed into Elasticsearch for
        this database.

        :return: the available versions in ascending order
        """
        versions = set()
        after = None
        while True:
            search = self.search(version=SearchVersion.all)[:0]
            search.aggs.bucket(
                "versions",
                "composite",
                size=50,
                sources={
                    "version": A("terms", field=DocumentField.VERSION, order="asc")
                },
            )
            if after is not None:
                search.aggs["versions"].after = after
            result = search.execute().aggs.to_dict()
            buckets = get_in(("versions", "buckets"), result, [])
            after = get_in(("versions", "after_key"), result, None)
            if not buckets:
                break
            versions.update(bucket["key"]["version"] for bucket in buckets)
        return sorted(versions)
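    # A minimal usage sketch (assumes an existing SplitgillDatabase instance, db,
    # with indexed data): count the latest documents, then count them as they stood
    # at the oldest indexed version, e.g.:
    #
    #     latest_count = db.search().count()
    #     first_version = db.get_versions()[0]
    #     count_then = db.search(version=first_version).count()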

    def get_fields(
        self, version: Optional[int] = None, query: Optional[Query] = None
    ) -> FieldInfo:
        """
        Get information about the fields available in this database at the given
        version under the given query criteria. The FieldInfo object that is returned
        contains information about both fields in the source data (data fields) and
        fields in the searchable data (parsed fields).

        :param version: the version to search at, if None (the default), search at
                        the latest version
        :param query: a filter to apply to the result, if None (the default), all
                      records at the given version are searched
        :return: a FieldInfo object
        """
        base = self.search(version if version is not None else SearchVersion.latest)
        if query is not None:
            base = base.filter(query)

        field_info = FieldInfo()

        work = [
            (DocumentField.DATA_TYPES, field_info.add_data_type),
            (DocumentField.PARSED_TYPES, field_info.add_parsed_type),
        ]

        for document_field, add_method in work:
            after = None

            while True:
                # this has a dual purpose: it ensures we don't get any search results
                # when we don't need them, and it ensures we get a fresh copy of the
                # search to work with
                search = base[:0]
                search.aggs.bucket(
                    "paths",
                    "composite",
                    # let's try and get all the fields in one go if we can
                    size=100,
                    sources={"path": A("terms", field=document_field)},
                )
                if after is not None:
                    search.aggs["paths"].after = after

                result = search.execute().aggs.to_dict()

                buckets = get_in(("paths", "buckets"), result, [])
                after = get_in(("paths", "after_key"), result, None)
                if not buckets:
                    break
                for bucket in buckets:
                    add_method(bucket["key"]["path"], bucket["doc_count"])

        return field_info
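
Putting the pieces together, the typical flow is ingest into MongoDB, sync into
Elasticsearch, then search. The sketch below shows that flow end to end; the
connection URLs are placeholders, and the Record constructor arguments (an id and a
data dict) are an assumption based on this file's usage rather than a documented
signature.

    from elasticsearch import Elasticsearch
    from pymongo import MongoClient

    from splitgill.manager import SearchVersion, SplitgillClient
    from splitgill.model import Record

    # connect to the two backing stores (placeholder URLs)
    client = SplitgillClient(
        MongoClient("mongodb://localhost:27017"),
        Elasticsearch("http://localhost:9200"),
    )
    database = client.get_database("specimens")

    # stage two records and commit them under one version (Record construction here
    # is an assumption; check splitgill.model for the real signature)
    records = [Record("r1", {"genus": "Vulpes"}), Record("r2", {"genus": "Felis"})]
    ingest_result = database.ingest(records, commit=True)

    # push the committed version into Elasticsearch, then query the latest data
    database.sync()
    count = database.search(version=SearchVersion.latest).count()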