• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

NaturalHistoryMuseum / splitgill / #69

26 Mar 2024 10:25AM UTC coverage: 96.13% (+0.3%) from 95.833%
#69

push

coveralls-python

jrdh
build: switch pytest-asyncio into auto mode and remove now unnecessary decorators

1068 of 1111 relevant lines covered (96.13%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.19
/splitgill/manager.py
1
from dataclasses import asdict
1✔
2
from enum import Enum
1✔
3
from typing import Optional, Iterable, List, Dict, Union
1✔
4

5
from cytoolz.dicttoolz import get_in
1✔
6
from elasticsearch import Elasticsearch
1✔
7
from elasticsearch_dsl import Search, A
1✔
8
from pymongo import MongoClient, IndexModel, ASCENDING, DESCENDING
1✔
9
from pymongo.collection import Collection
1✔
10
from pymongo.database import Database
1✔
11

12
from splitgill.indexing import fields
1✔
13
from splitgill.indexing.index import IndexNames, generate_index_ops
1✔
14
from splitgill.indexing.options import ParsingOptionsBuilder
1✔
15
from splitgill.indexing.syncing import write_ops, WriteResult, BulkOptions
1✔
16
from splitgill.indexing.templates import DATA_TEMPLATE
1✔
17
from splitgill.ingest import generate_ops, generate_rollback_ops
1✔
18
from splitgill.locking import LockManager
1✔
19
from splitgill.model import Record, MongoRecord, ParsingOptions, IngestResult
1✔
20
from splitgill.profiles import Profile, build_profile
1✔
21
from splitgill.search import create_version_query
1✔
22
from splitgill.utils import partition, now
1✔
23

24
# name of the single MongoDB database Splitgill keeps all of its collections in
MONGO_DATABASE_NAME = "sg"
# MongoDB collection holding the parsing options docs for every Splitgill database
OPTIONS_COLLECTION_NAME = "options"
# Elasticsearch index where data profiles are stored (see ProfileManager)
PROFILES_INDEX_NAME = "profiles"
# MongoDB collection used by the LockManager to store lock documents
LOCKS_COLLECTION_NAME = "locks"
28

29

30
class SplitgillClient:
    """
    Splitgill client class which holds a mongo connection, an elasticsearch connection
    and any other general information Splitgill needs to manage the databases.
    """

    def __init__(self, mongo: MongoClient, elasticsearch: Elasticsearch):
        """
        :param mongo: a MongoClient object
        :param elasticsearch: an Elasticsearch client object
        """
        self.mongo = mongo
        self.elasticsearch = elasticsearch
        self.profile_manager = ProfileManager(elasticsearch)
        self.lock_manager = LockManager(self.get_lock_collection())

    def get_database(self, name: str) -> "SplitgillDatabase":
        """
        Creates and returns a SplitgillDatabase object with the given name, backed by
        this client.

        :param name: the name of the database
        :return: a new SplitgillDatabase object
        """
        return SplitgillDatabase(name, self)

    def get_mongo_database(self) -> Database:
        """
        Returns the MongoDB database Splitgill stores everything in.

        :return: a pymongo Database object
        """
        return self.mongo.get_database(MONGO_DATABASE_NAME)

    def get_options_collection(self) -> Collection:
        """
        Returns the collection where parsing options are stored.

        :return: a pymongo Collection object
        """
        database = self.get_mongo_database()
        return database.get_collection(OPTIONS_COLLECTION_NAME)

    def get_data_collection(self, name: str) -> Collection:
        """
        Returns the collection where the given Splitgill database's records live.

        :param name: the name of the Splitgill database
        :return: a pymongo Collection object
        """
        database = self.get_mongo_database()
        return database.get_collection(f"data-{name}")

    def get_lock_collection(self) -> Collection:
        """
        Returns the collection backing the lock manager.

        :return: a pymongo Collection object
        """
        database = self.get_mongo_database()
        return database.get_collection(LOCKS_COLLECTION_NAME)
83

84

85
class SearchVersion(Enum):
    """
    Indicator for the SplitgillDatabase.search method as to which version of the data
    you would like to search.
    """

    # search over the latest data only
    latest = "latest"
    # search over every version of the data
    all = "all"
95

96

97
class SplitgillDatabase:
    """
    Represents a single set of data to be managed by Splitgill.

    Under the hood, this data will exist in several MongoDB collections and
    Elasticsearch indices, but this object provides an abstraction layer to access all
    of that from one object.
    """

    def __init__(self, name: str, client: SplitgillClient):
        """
        :param name: the name of the database, needs to be a valid MongoDB collection
                     name and a valid Elasticsearch index name
        :param client: a SplitgillClient object
        """
        self.name = name
        self._client = client
        # Mongo collection objects
        self.data_collection = self._client.get_data_collection(self.name)
        self.options_collection = self._client.get_options_collection()
        # index names
        self.indices = IndexNames(self.name)
        self.locker = self._client.lock_manager

    def get_committed_version(self) -> Optional[int]:
        """
        Returns the latest committed version of the data or config (whichever is
        higher). If no records or options exist, or if neither has any committed values,
        None is returned.

        :return: the max version or None
        """
        sort = [("version", DESCENDING)]
        last_data = self.data_collection.find_one(sort=sort)
        last_options = self.options_collection.find_one({"name": self.name}, sort=sort)

        # either doc may be missing entirely, or present with version=None (uncommitted)
        last_data_version = last_data.get("version") if last_data is not None else None
        last_options_version = (
            last_options.get("version") if last_options is not None else None
        )
        # there's no committed data or options
        if last_data_version is None and last_options_version is None:
            return None

        return max(
            last
            for last in (last_data_version, last_options_version)
            if last is not None
        )

    def get_elasticsearch_version(self) -> Optional[int]:
        """
        Returns the latest version found in the Elasticsearch indices for this database.
        If no records exist in any index, None is returned.

        :return: the max version or None
        """
        result = self._client.elasticsearch.search(
            aggs={"max_version": {"max": {"field": fields.VERSION}}},
            size=0,
            # search all data indices for this database (really the most recent version
            # will always be in the latest index, but using a wildcard on all indices
            # means we can avoid doing a check that the latest index exists first and
            # just go for it).
            index=self.indices.wildcard,
        )

        version = get_in(("aggregations", "max_version", "value"), result, None)
        if version is None:
            return None

        # elasticsearch does max aggs using the double type apparently, so we need to
        # convert it back to an int to avoid returning a float and causing confusion
        return int(version)

    def has_data(self) -> bool:
        """
        Returns True if there is at least one committed record in this database,
        otherwise returns False. Note that this ignores options.

        :return: True if there is data, False if not
        """
        return self.data_collection.find_one({"version": {"$ne": None}}) is not None

    def has_options(self) -> bool:
        """
        Returns True if there is at least one committed options in this database,
        otherwise returns False. Note that this ignores data.

        :return: True if there is options, False if not
        """
        return self.options_collection.find_one({"version": {"$ne": None}}) is not None

    def commit(self) -> Optional[int]:
        """
        Commits the currently uncommitted data and options changes for this database.
        All new data/options will be given the same version which is the current time.
        If no changes were made, None is returned, otherwise the new version is
        returned.

        If a commit is already ongoing this will raise an AlreadyLocked exception.

        :return: the new version or None if no uncommitted changes were found
        """
        # TODO: global now?
        # TODO: transaction/rollback? Can't do this without replicasets so who knows?
        with self.locker.lock(self.name, stage="commit"):
            if not self.has_uncommitted_data() and not self.has_uncommitted_options():
                # nothing to commit, so nothing to do
                return None

            if not self.has_options() and not self.has_uncommitted_options():
                # no existing options and no options to be committed, create a default
                self.update_options(ParsingOptionsBuilder().build(), commit=False)

            version = now()

            # stamp the version onto all uncommitted docs. NOTE: these are two separate
            # update_many calls, not a transaction (see the TODO above) — a failure
            # between them can leave data committed but options not
            for collection in [self.data_collection, self.options_collection]:
                collection.update_many(
                    filter={"version": None}, update={"$set": {"version": version}}
                )
            return version

    def ingest(
        self,
        records: Iterable[Record],
        commit: bool = True,
        modified_field: Optional[str] = None,
    ) -> IngestResult:
        """
        Ingests the given records to the database. This only adds the records to the
        MongoDB data collection, it doesn't trigger the indexing of this new data into
        the Elasticsearch cluster. All data will be added with a None version unless the
        commit parameter is True in which case a version will be assigned.

        Use the commit keyword argument to either close the "transaction" after writing
        these records or leave it open. By default, the "transaction" is committed
        before the method returns, and the version is set then.

        If an error occurs, the "transaction" will not be committed, but the changes
        will not be rolled back.

        :param records: the records to add. These will be added in batches, so it is
                        safe to pass a very large stream of records
        :param commit: whether to commit the data added with a new version after writing
                       the records. Default: True.
        :param modified_field: a field name which, if the only changes in the record
                               data are in this field means the changes will be ignored.
                               As you can probably guess from the name, the root reason
                               for this parameter existing is to avoid committing a new
                               version of a record when all that has happened is the
                               record has been touched and the modified field's date
                               value updated even though the rest of the record remains
                               the same. Default: None, meaning all fields are checked
                               for changes.
        :return: returns a IngestResult object
        """
        # this does nothing if the indexes already exist
        self.data_collection.create_indexes(
            [IndexModel([("id", ASCENDING)]), IndexModel([("version", DESCENDING)])]
        )

        result = IngestResult()
        # this is used for the find size and the bulk ops partition size which both need
        # to be the same to ensure we can handle duplicates in the record stream
        size = 200

        for ops in partition(
            generate_ops(self.data_collection, records, modified_field, size), size
        ):
            bulk_result = self.data_collection.bulk_write(ops)
            result.update(bulk_result)

        if commit:
            result.version = self.commit()

        return result

    def update_options(self, options: ParsingOptions, commit: bool = True) -> Optional[int]:
        """
        Update the parsing options for this database.

        :param options: the new parsing options
        :param commit: whether to commit the new config added with a new version after
                       writing the config. Default: True.
        :return: returns the new version if a commit happened, otherwise None. If a
                 commit was requested but nothing was changed, None is returned.
        """
        # get the latest options that have been committed (get_options ignores
        # uncommitted options)
        all_options = self.get_options()
        if all_options:
            latest_options = all_options[max(all_options)]
        else:
            latest_options = None

        # any pending uncommitted options are superseded by this call, clear them out
        if self.has_uncommitted_options():
            self.rollback_options()

        # if the options are the same as the latest existing ones, don't update
        if latest_options == options:
            return None

        # either the options are completely new or they differ from the existing
        # options, write a fresh entry
        new_doc = {
            "name": self.name,
            # version = None to indicate this is an uncommitted change
            "version": None,
            "options": options.to_doc(),
        }
        self.options_collection.insert_one(new_doc)

        if commit:
            return self.commit()
        return None

    def rollback_options(self) -> int:
        """
        Remove any uncommitted option changes.

        There should only ever be one, but this deletes them all ensuring everything is
        clean and tidy.

        :return: the number of documents deleted
        """
        return self.options_collection.delete_many({"version": None}).deleted_count

    def rollback_records(self):
        """
        Remove any uncommitted data changes.

        This method has to interrogate every uncommitted record in the data collection
        to perform the rollback and therefore, depending on how much uncommitted data
        there is, may take a bit of time to run.
        """
        if self.has_uncommitted_data():
            # apply the rollback in batches of 200 ops at a time
            for ops in partition(generate_rollback_ops(self.data_collection), 200):
                self.data_collection.bulk_write(ops)

    def has_uncommitted_data(self) -> bool:
        """
        Check if there are any uncommitted records stored against this database.

        :return: returns True if there are any uncommitted records, False if not
        """
        return self.data_collection.find_one({"version": None}) is not None

    def has_uncommitted_options(self) -> bool:
        """
        Check if there are any uncommitted options stored against this database.

        :return: returns True if there are any uncommitted options, False if not
        """
        return self.options_collection.find_one({"version": None}) is not None

    def get_options(self, include_uncommitted: bool = False) -> Dict[int, ParsingOptions]:
        """
        Retrieve all the parsing options configured for this database in a dict mapping
        int versions to ParsingOptions objects. Use the include_uncommitted parameter to
        indicate whether to include the uncommitted options or not.

        Note: when include_uncommitted is True, any uncommitted options appear under the
        key None rather than an int version.

        :return: a dict of versions and options
        """
        return {
            doc["version"]: ParsingOptions.from_doc(doc["options"])
            for doc in self.options_collection.find({"name": self.name})
            if include_uncommitted or doc["version"] is not None
        }

    def iter_records(self, **find_kwargs) -> Iterable[MongoRecord]:
        """
        Yields MongoRecord objects matching the given find kwargs. As you can probably
        guess, the find_kwargs argument is just passed directly to PyMongo's find
        method.

        :param find_kwargs: args to pass to the data collection's find method
        :return: yields matching MongoRecord objects
        """
        yield from (
            MongoRecord(**doc) for doc in self.data_collection.find(**find_kwargs)
        )

    def sync(
        self, bulk_options: Optional[BulkOptions] = None, resync: bool = False
    ) -> WriteResult:
        """
        Synchronise the data/options in MongoDB with the data in Elasticsearch by
        updating the latest and old data indices as required.

        To find the data that needs to be updated, the current version of the data in
        MongoDB is compared to the current version of the data in Elasticsearch, and the
        two are synced (nothing is written if Elasticsearch is already at or ahead of
        MongoDB's committed version).

        While the data is being indexed, refreshing is paused on this database's indexes
        and only resumed once all the data has been indexed. If an error occurs during
        indexing then the refresh interval is not reset meaning the updates that have
        made it to Elasticsearch will not impact searches until a refresh is triggered,
        or the refresh interval is reset. This kinda makes this function transactional.
        If no errors occur, the refresh interval is reset (along with the replica count)
        and a refresh is called. This means that if this function returns successfully,
        the data updated by it will be immediately available for searches.

        :param bulk_options: options determining how the bulk operations are sent to
                             Elasticsearch
        :param resync: whether to resync all records with Elasticsearch regardless of
                       the currently synced version. This won't delete any data first
                       and just replaces documents in Elasticsearch as needed.
        :return: a WriteResult object
        """
        if not self.has_data():
            return WriteResult()

        all_options = self.get_options(include_uncommitted=False)
        # resync forces last_sync to None which makes us treat ES as empty
        last_sync = self.get_elasticsearch_version() if not resync else None
        if last_sync is None:
            # elasticsearch has nothing so find all committed records
            find_filter = {"version": {"$ne": None}}
        else:
            committed_version = self.get_committed_version()
            if last_sync >= committed_version:
                # elasticsearch and mongo are in sync (use >= rather than == just in case
                # some bizarro-ness has occurred)
                return WriteResult()
            if any(version > last_sync for version in all_options):
                # there's an options change ahead, this means we need to check all
                # records again, so filter out committed records only
                find_filter = {"version": {"$ne": None}}
            else:
                # find all the updated records that haven't had their updates synced yet
                find_filter = {"version": {"$gt": last_sync}}

        client = self._client.elasticsearch

        # ensure the template and all the indices exist before writing anything
        client.indices.put_index_template(name="data-template", body=DATA_TEMPLATE)
        for index in self.indices.all:
            if not client.indices.exists(index=index):
                client.indices.create(index=index)

        # apply optimal indexing settings to all the indices we may update
        client.indices.put_settings(
            body={"index": {"refresh_interval": -1, "number_of_replicas": 0}},
            index=self.indices.all,
        )

        result = write_ops(
            client,
            generate_index_ops(
                self.indices,
                self.iter_records(filter=find_filter),
                all_options,
                last_sync,
            ),
            bulk_options,
        )

        # refresh all indices to make the changes visible all at once
        client.indices.refresh(index=self.indices.all)

        # reset the settings we changed (None forces them to revert to defaults)
        client.indices.put_settings(
            body={"index": {"refresh_interval": None, "number_of_replicas": None}},
            index=self.indices.all,
        )

        # do a bit of a tidy up by deleting any indexes without docs
        for index in self.indices.all:
            if not any(client.search(index=index, size=1)["hits"]["hits"]):
                client.indices.delete(index=index)

        # keep the profiles up to date with the freshly synced data
        self._client.profile_manager.update_profiles(self)

        return result

    def search(
        self, version: Union[SearchVersion, int] = SearchVersion.latest
    ) -> Search:
        """
        Creates a Search DSL object to use on this database's indexed data. This Search
        object will be setup with the appropriate index and version filter depending on
        the given version parameter, and the Elasticsearch client object in use on this
        database.

        :param version: the version to search at, this should either be a SearchVersion
                        enum option or an int. SearchVersion.latest will result in a
                        search on the latest index with no version filter thus searching
                        the latest data. SearchVersion.all will result in a search on
                        all indices using a wildcard and no version filter. Passing an
                        int version will search at the given timestamp. The default is
                        SearchVersion.latest.
        :return: a Search DSL object
        """
        search = Search(using=self._client.elasticsearch)

        if isinstance(version, int):
            # search every index but filter down to the given point in time
            search = search.index(self.indices.wildcard)
            search = search.filter(create_version_query(version))
        else:
            if version == SearchVersion.latest:
                search = search.index(self.indices.latest)
            elif version == SearchVersion.all:
                search = search.index(self.indices.wildcard)

        return search

    def get_available_versions(self) -> List[int]:
        """
        Returns a list of the available versions that have been indexed into
        Elasticsearch for this database. The versions are in ascending order.

        :return: the available versions in ascending order
        """
        versions = set()
        after = None
        # page through the composite aggregation until a page comes back empty
        while True:
            search = self.search(version=SearchVersion.all)[:0]
            search.aggs.bucket(
                "versions",
                "composite",
                size=50,
                sources={"version": A("terms", field=fields.VERSION, order="asc")},
            )
            if after is not None:
                # resume from where the previous page left off
                search.aggs["versions"].after = after
            result = search.execute().aggs.to_dict()
            buckets = get_in(("versions", "buckets"), result, [])
            after = get_in(("versions", "after_key"), result, None)
            if not buckets:
                break
            versions.update(bucket["key"]["version"] for bucket in buckets)
        return sorted(versions)

    def get_profile(self, version: int) -> Optional[Profile]:
        """
        Given a version, gets the data profile that applies, if there is one available.

        :param version: the data version to get the profile for
        :return: a Profile object or None
        """
        return self._client.profile_manager.get_profile(self.name, version)

    def update_profiles(self, rebuild: bool = False):
        """
        Force an update of the profiles for this database, optionally rebuilding them.

        :param rebuild: whether to rebuild the profiles completely
        """
        self._client.profile_manager.update_profiles(self, rebuild)
546

547

548
class ProfileManager:
    """
    Class that manages all database profiles in Elasticsearch.
    """

    def __init__(self, elasticsearch: Elasticsearch):
        """
        :param elasticsearch: an Elasticsearch client
        """
        self._elasticsearch = elasticsearch

    def _index_exists(self) -> bool:
        """
        Check whether the profiles index is present in the cluster.

        :return: True if the profiles index exists, False if not
        """
        return self._elasticsearch.indices.exists(index=PROFILES_INDEX_NAME)

    def _create_index(self):
        """
        Create the profiles index with its mappings, doing nothing if it already exists.
        """
        if self._index_exists():
            return
        mappings = {
            "properties": {
                "name": {"type": "keyword"},
                "version": {"type": "long"},
                "total": {"type": "long"},
                "changes": {"type": "long"},
                "fields": {
                    "properties": {
                        "name": {"type": "keyword"},
                        "path": {"type": "keyword"},
                        "count": {"type": "long"},
                        "boolean_count": {"type": "long"},
                        "date_count": {"type": "long"},
                        "number_count": {"type": "long"},
                        "array_count": {"type": "long"},
                        "is_value": {"type": "boolean"},
                        "is_parent": {"type": "boolean"},
                    }
                },
            },
        }
        self._elasticsearch.indices.create(index=PROFILES_INDEX_NAME, mappings=mappings)

    def get_profile_versions(self, name: str) -> List[int]:
        """
        Returns a list of the profile versions that are currently available for the
        given database name. The list of versions is sorted in ascending order.

        :param name: the database name
        :return: the versions in ascending order
        """
        return [existing.version for existing in self.get_profiles(name)]

    def get_profile(self, name: str, version: int) -> Optional[Profile]:
        """
        Returns the profile that applies to the given version of the database with the
        given name. If no profile applies then None is returned. Each profile has a
        version and is applicable from that version (inclusive) until the next version
        (exclusive) that is available supersedes it. If no version supersedes then the
        version is the latest.

        :param name: the database name
        :param version: the version to get a profile for
        :return: a profile if one can be found, otherwise None
        """
        # profiles come back in ascending version order, so scanning from the newest
        # backwards, the first profile at or below the requested version is the match
        matches = (
            profile
            for profile in reversed(self.get_profiles(name))
            if version >= profile.version
        )
        return next(matches, None)

    def get_profiles(self, name: str) -> List[Profile]:
        """
        Return a list of all the profiles available for this database, sorted in
        ascending version order.

        :param name: the name of the database
        :return: a list of Profile objects
        """
        if not self._index_exists():
            return []
        search = Search(using=self._elasticsearch, index=PROFILES_INDEX_NAME).filter(
            "term", name=name
        )
        return sorted(
            (Profile.from_dict(hit.to_dict()) for hit in search.scan()),
            key=lambda profile: profile.version,
        )

    def update_profiles(self, database: SplitgillDatabase, rebuild: bool = False):
        """
        Updates the profiles for the given database. This will find all the available
        versions of the database, check if any versions don't have a profile, and then
        create those versions as needed. If all versions have profiles, nothing happens.

        If the rebuild option is True, all the profiles are deleted and recreated. This
        may take a bit of time depending on the number of versions and size of the
        database.

        :param database: the database to profile
        :param rebuild: whether to rebuild all the profiles for all versions
        """
        if rebuild and self._index_exists():
            # wipe this database's existing profiles so they all get rebuilt below
            self._elasticsearch.delete_by_query(
                index=PROFILES_INDEX_NAME,
                refresh=True,
                query=Search().filter("term", name=database.name).to_dict(),
            )

        existing_versions = set(self.get_profile_versions(database.name))
        for version in database.get_available_versions():
            if version in existing_versions:
                # this version already has a profile, nothing to do
                continue

            # no profile available for this version, build it
            profile = build_profile(self._elasticsearch, database.indices, version)

            # create a doc from the profile and then add some extras
            doc = {**asdict(profile), "name": database.name, "version": version}

            # make sure the index exists before writing to it
            self._create_index()

            # add to the profiles index
            self._elasticsearch.create(
                index=PROFILES_INDEX_NAME,
                id=f"{database.name}-{version}",
                document=doc,
                refresh=True,
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc