• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

NaturalHistoryMuseum / splitgill / #68

20 Mar 2024 09:46PM UTC coverage: 95.833% (+0.2%) from 95.643%
#68

push

coveralls-python

jrdh
feat: lock during database commit

9 of 9 new or added lines in 1 file covered. (100.0%)

9 existing lines in 1 file now uncovered.

966 of 1008 relevant lines covered (95.83%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.22
/splitgill/manager.py
1
from collections import deque
1✔
2
from dataclasses import asdict
1✔
3
from enum import Enum
1✔
4
from typing import Optional, Iterable, List, Dict, Union
1✔
5

6
from cytoolz.dicttoolz import get_in
1✔
7
from elasticsearch import Elasticsearch
1✔
8
from elasticsearch.helpers import parallel_bulk, streaming_bulk
1✔
9
from elasticsearch_dsl import Search, A
1✔
10
from pymongo import MongoClient, IndexModel, ASCENDING, DESCENDING
1✔
11
from pymongo.collection import Collection
1✔
12
from pymongo.database import Database
1✔
13

14
from splitgill.indexing import fields
1✔
15
from splitgill.indexing.index import generate_index_ops, IndexNames
1✔
16
from splitgill.indexing.options import ParsingOptionsBuilder
1✔
17
from splitgill.indexing.templates import DATA_TEMPLATE
1✔
18
from splitgill.ingest import generate_ops, generate_rollback_ops
1✔
19
from splitgill.locking import LockManager
1✔
20
from splitgill.model import Record, MongoRecord, ParsingOptions, IngestResult
1✔
21
from splitgill.profiles import Profile, build_profile
1✔
22
from splitgill.search import create_version_query
1✔
23
from splitgill.utils import partition, now
1✔
24

25
# name of the single MongoDB database Splitgill keeps all of its collections in
MONGO_DATABASE_NAME = "sg"
# name of the MongoDB collection holding the per-database parsing options documents
OPTIONS_COLLECTION_NAME = "options"
# name of the Elasticsearch index the ProfileManager stores database profiles in
PROFILES_INDEX_NAME = "profiles"
# name of the MongoDB collection backing the LockManager's locks
LOCKS_COLLECTION_NAME = "locks"
29

30

31
class SplitgillClient:
    """
    Top-level Splitgill client.

    Bundles a MongoDB connection and an Elasticsearch connection together with the
    shared profile and lock managers that Splitgill needs to manage its databases.
    """

    def __init__(self, mongo: MongoClient, elasticsearch: Elasticsearch):
        self.mongo = mongo
        self.elasticsearch = elasticsearch
        self.profile_manager = ProfileManager(elasticsearch)
        # the lock manager persists its locks in a shared Mongo collection
        self.lock_manager = LockManager(self.get_lock_collection())

    def get_mongo_database(self) -> Database:
        """
        Returns the MongoDB database Splitgill stores everything in.

        :return: a pymongo Database object
        """
        return self.mongo.get_database(MONGO_DATABASE_NAME)

    def get_options_collection(self) -> Collection:
        """
        Returns the collection in which parsing options are stored.

        :return: a pymongo Collection object
        """
        database = self.get_mongo_database()
        return database.get_collection(OPTIONS_COLLECTION_NAME)

    def get_lock_collection(self) -> Collection:
        """
        Returns the collection backing the lock manager.

        :return: a pymongo Collection object
        """
        database = self.get_mongo_database()
        return database.get_collection(LOCKS_COLLECTION_NAME)

    def get_data_collection(self, name: str) -> Collection:
        """
        Returns the data collection for the named Splitgill database.

        :param name: the name of the Splitgill database
        :return: a pymongo Collection object
        """
        database = self.get_mongo_database()
        return database.get_collection(f"data-{name}")

    def get_database(self, name: str) -> "SplitgillDatabase":
        """
        Creates a SplitgillDatabase object for the given name using this client.

        :param name: the name of the database
        :return: a new SplitgillDatabase object
        """
        return SplitgillDatabase(name, self)
84

85

86
class SearchVersion(Enum):
    """
    Indicator for the SplitgillDatabase.search method as to which version of the data
    you would like to search.

    Pass an explicit int timestamp to SplitgillDatabase.search instead of one of
    these members to search the data as it stood at that version.
    """

    # searches the latest data only (the latest index, no version filter)
    latest = "latest"
    # searches all data across every index for the database (no version filter)
    all = "all"
96

97

98
class SplitgillDatabase:
    """
    Represents a single set of data to be managed by Splitgill.

    Under the hood, this data will exist in several MongoDB collections and
    Elasticsearch indices, but this object provides an abstraction layer to access all
    of that from one object.
    """

    def __init__(self, name: str, client: SplitgillClient):
        """
        :param name: the name of the database, needs to be a valid MongoDB collection
                     name and a valid Elasticsearch index name
        :param client: a SplitgillClient object
        """
        self.name = name
        self._client = client
        # Mongo collection objects
        self.data_collection = self._client.get_data_collection(self.name)
        self.options_collection = self._client.get_options_collection()
        # index names
        self.indices = IndexNames(self.name)
        # shared lock manager from the client, used to serialise commits
        self.locker = self._client.lock_manager

    def get_committed_version(self) -> Optional[int]:
        """
        Returns the latest committed version of the data or config (whichever is
        higher). If no records or options exist, or if neither has any committed values,
        None is returned.

        :return: the max version or None
        """
        sort = [("version", DESCENDING)]
        last_data = self.data_collection.find_one(sort=sort)
        last_options = self.options_collection.find_one({"name": self.name}, sort=sort)

        last_data_version = last_data.get("version") if last_data is not None else None
        last_options_version = (
            last_options.get("version") if last_options is not None else None
        )
        # there's no committed data or options
        if last_data_version is None and last_options_version is None:
            return None

        # note: the sort above can surface an uncommitted doc (version=None), hence
        # the None filtering here before taking the max
        return max(
            last
            for last in (last_data_version, last_options_version)
            if last is not None
        )

    def get_elasticsearch_version(self) -> Optional[int]:
        """
        Returns the latest version found in the Elasticsearch indices for this database.
        If no records exist in any index, None is returned.

        :return: the max version or None
        """
        result = self._client.elasticsearch.search(
            aggs={"max_version": {"max": {"field": fields.VERSION}}},
            size=0,
            # search all data indices for this database (really the most recent version
            # will always be in the latest index, but using a wildcard on all indices
            # means we can avoid doing a check that the latest index exists first and
            # just go for it).
            index=self.indices.wildcard,
        )

        version = get_in(("aggregations", "max_version", "value"), result, None)
        if version is None:
            return None

        # elasticsearch does max aggs using the double type apparently, so we need to
        # convert it back to an int to avoid returning a float and causing confusion
        return int(version)

    def has_data(self) -> bool:
        """
        Returns True if there is at least one committed record in this database,
        otherwise returns False. Note that this ignores options.

        :return: True if there is data, False if not
        """
        return self.data_collection.find_one({"version": {"$ne": None}}) is not None

    def has_options(self) -> bool:
        """
        Returns True if there is at least one committed options in this database,
        otherwise returns False. Note that this ignores data.

        :return: True if there is options, False if not
        """
        return self.options_collection.find_one({"version": {"$ne": None}}) is not None

    def commit(self) -> Optional[int]:
        """
        Commits the currently uncommitted data and options changes for this database.
        All new data/options will be given the same version which is the current time.
        If no changes were made, None is returned, otherwise the new version is
        returned.

        If a commit is already ongoing this will raise an AlreadyLocked exception.

        :return: the new version or None if no uncommitted changes were found
        """
        # TODO: global now?
        # TODO: transaction/rollback? Can't do this without replicasets so who knows?
        with self.locker.lock(self.name, stage="commit"):
            if not self.has_uncommitted_data() and not self.has_uncommitted_options():
                # nothing to commit, so nothing to do
                return None

            if not self.has_options() and not self.has_uncommitted_options():
                # no existing options and no options to be committed, create a default
                self.update_options(ParsingOptionsBuilder().build(), commit=False)

            version = now()

            # stamp the new version onto the uncommitted data and options; note that
            # these two updates are sequential, not atomic across the collections (see
            # the transaction TODO above)
            for collection in [self.data_collection, self.options_collection]:
                collection.update_many(
                    filter={"version": None}, update={"$set": {"version": version}}
                )
            return version

    def ingest(
        self,
        records: Iterable[Record],
        commit: bool = True,
        modified_field: Optional[str] = None,
    ) -> IngestResult:
        """
        Ingests the given records to the database. This only adds the records to the
        MongoDB data collection, it doesn't trigger the indexing of this new data into
        the Elasticsearch cluster. All data will be added with a None version unless the
        commit parameter is True in which case a version will be assigned.

        Use the commit keyword argument to either close the "transaction" after writing
        these records or leave it open. By default, the "transaction" is committed
        before the method returns, and the version is set then.

        If an error occurs, the "transaction" will not be committed, but the changes
        will not be rolled back.

        :param records: the records to add. These will be added in batches, so it is
                        safe to pass a very large stream of records
        :param commit: whether to commit the data added with a new version after writing
                       the records. Default: True.
        :param modified_field: a field name which, if the only changes in the record
                               data are in this field means the changes will be ignored.
                               As you can probably guess from the name, the root reason
                               for this parameter existing is to avoid committing a new
                               version of a record when all that has happened is the
                               record has been touched and the modified field's date
                               value updated even though the rest of the record remains
                               the same. Default: None, meaning all fields are checked
                               for changes.
        :return: returns a IngestResult object
        """
        # this does nothing if the indexes already exist
        self.data_collection.create_indexes(
            [IndexModel([("id", ASCENDING)]), IndexModel([("version", DESCENDING)])]
        )

        result = IngestResult()
        # this is used for the find size and the bulk ops partition size which both need
        # to be the same to ensure we can handle duplicates in the record stream
        size = 200

        for ops in partition(
            generate_ops(self.data_collection, records, modified_field, size), size
        ):
            bulk_result = self.data_collection.bulk_write(ops)
            result.update(bulk_result)

        if commit:
            result.version = self.commit()

        return result

    def update_options(
        self, options: ParsingOptions, commit: bool = True
    ) -> Optional[int]:
        """
        Update the parsing options for this database.

        :param options: the new parsing options
        :param commit: whether to commit the new config added with a new version after
                       writing the config. Default: True.
        :return: returns the new version if a commit happened, otherwise None. If a
                 commit was requested but nothing was changed, None is returned.
        """
        # get the latest options that have been committed (get_options ignores
        # uncommitted options)
        all_options = self.get_options()
        if all_options:
            latest_options = all_options[max(all_options)]
        else:
            latest_options = None

        # any pending uncommitted options are superseded by this call, remove them
        if self.has_uncommitted_options():
            self.rollback_options()

        # if the options are the same as the latest existing ones, don't update
        if latest_options == options:
            return None

        # either the options are completely new or they differ from the existing
        # options, write a fresh entry
        new_doc = {
            "name": self.name,
            # version = None to indicate this is an uncommitted change
            "version": None,
            "options": options.to_doc(),
        }
        self.options_collection.insert_one(new_doc)

        if commit:
            return self.commit()
        return None

    def rollback_options(self) -> int:
        """
        Remove any uncommitted option changes.

        There should only ever be one, but this deletes them all ensuring everything is
        clean and tidy.

        :return: the number of documents deleted
        """
        return self.options_collection.delete_many({"version": None}).deleted_count

    def rollback_records(self):
        """
        Remove any uncommitted data changes.

        This method has to interrogate every uncommitted record in the data collection
        to perform the rollback and therefore, depending on how much uncommitted data
        there is, may take a bit of time to run.
        """
        if self.has_uncommitted_data():
            # batch the rollback writes, 200 ops per bulk_write call
            for ops in partition(generate_rollback_ops(self.data_collection), 200):
                self.data_collection.bulk_write(ops)

    def has_uncommitted_data(self) -> bool:
        """
        Check if there are any uncommitted records stored against this database.

        :return: returns True if there are any uncommitted records, False if not
        """
        return self.data_collection.find_one({"version": None}) is not None

    def has_uncommitted_options(self) -> bool:
        """
        Check if there are any uncommitted options stored against this database.

        :return: returns True if there are any uncommitted options, False if not
        """
        return self.options_collection.find_one({"version": None}) is not None

    def get_options(self, include_uncommitted: bool = False) -> Dict[int, ParsingOptions]:
        """
        Retrieve all the parsing options configured for this database in a dict mapping
        int versions to ParsingOptions objects. Use the include_uncommitted parameter to
        indicate whether to include the uncommitted options or not.

        :param include_uncommitted: whether to include uncommitted options (which are
                                    keyed under None). Default: False.
        :return: a dict of versions and options
        """
        return {
            doc["version"]: ParsingOptions.from_doc(doc["options"])
            for doc in self.options_collection.find({"name": self.name})
            if include_uncommitted or doc["version"] is not None
        }

    def iter_records(self, **find_kwargs) -> Iterable[MongoRecord]:
        """
        Yields MongoRecord objects matching the given find kwargs. As you can probably
        guess, the find_kwargs argument is just passed directly to PyMongo's find
        method.

        :param find_kwargs: args to pass to the data collection's find method
        :return: yields matching MongoRecord objects
        """
        yield from (
            MongoRecord(**doc) for doc in self.data_collection.find(**find_kwargs)
        )

    def sync(self, parallel: bool = True, chunk_size: int = 500, resync: bool = False):
        """
        Synchronise the data/options in MongoDB with the data in Elasticsearch by
        updating the latest and old data indices as required.

        To find the data that needs to be updated, the current version of the data in
        MongoDB is compared to the current version of the data in Elasticsearch, and the
        two are synced (assuming MongoDB's version is <= Elasticsearch).

        While the data is being indexed, refreshing is paused on this database's indexes
        and only resumed once all the data has been indexed. If an error occurs during
        indexing then the refresh interval is not reset meaning the updates that have
        made it to Elasticsearch will not impact searches until a refresh is triggered,
        or the refresh interval is reset. This kinda makes this function transactional.
        If no errors occur, the refresh interval is reset (along with the replica count)
        and a refresh is called. This means that if this function returns successfully,
        the data updated by it will be immediately available for searches.

        :param parallel: send the data to Elasticsearch using multiple threads if True,
                         otherwise use a single thread if False
        :param chunk_size: the number of docs to send to Elasticsearch in each bulk
                           request
        :param resync: whether to resync all records with Elasticsearch regardless of
                       the currently synced version. This won't delete any data first
                       and just replaces documents in Elasticsearch as needed.
        """
        if not self.has_data():
            return

        all_options = self.get_options(include_uncommitted=False)
        last_sync = self.get_elasticsearch_version() if not resync else None
        if last_sync is None:
            # elasticsearch has nothing so find all committed records
            find_filter = {"version": {"$ne": None}}
        else:
            committed_version = self.get_committed_version()
            if last_sync >= committed_version:
                # elasticsearch and mongo are in sync (use >= rather than == just in case
                # some bizarro-ness has occurred)
                return
            if any(version > last_sync for version in all_options):
                # there's an options change ahead, this means we need to check all
                # records again, so filter out committed records only
                find_filter = {"version": {"$ne": None}}
            else:
                # find all the updated records that haven't had their updates synced yet
                find_filter = {"version": {"$gt": last_sync}}

        client = self._client.elasticsearch
        bulk_function = parallel_bulk if parallel else streaming_bulk

        client.indices.put_index_template(name="data-template", body=DATA_TEMPLATE)
        # make sure all the indices we may write to exist
        for index in self.indices.all:
            if not client.indices.exists(index=index):
                client.indices.create(index=index)

        # apply optimal indexing settings to all the indices we may update
        client.indices.put_settings(
            body={"index": {"refresh_interval": -1, "number_of_replicas": 0}},
            index=self.indices.all,
        )

        # we don't care about the results so just throw them away into a 0-sized
        # deque (errors will be raised directly)
        deque(
            bulk_function(
                client,
                generate_index_ops(
                    self.indices,
                    self.iter_records(filter=find_filter),
                    all_options,
                    last_sync,
                ),
                raise_on_error=True,
                chunk_size=chunk_size,
            ),
            maxlen=0,
        )

        # refresh all indices to make the changes visible all at once
        client.indices.refresh(index=self.indices.all)

        # reset the settings we changed (None forces them to revert to defaults)
        client.indices.put_settings(
            body={"index": {"refresh_interval": None, "number_of_replicas": None}},
            index=self.indices.all,
        )

        # do a bit of a tidy up by deleting any indexes without docs
        for index in self.indices.all:
            if not any(client.search(index=index, size=1)["hits"]["hits"]):
                client.indices.delete(index=index)

        # keep the profiles up to date with whatever was just indexed
        self._client.profile_manager.update_profiles(self)

    def search(
        self, version: Union[SearchVersion, int] = SearchVersion.latest
    ) -> Search:
        """
        Creates a Search DSL object to use on this database's indexed data. This Search
        object will be setup with the appropriate index and version filter depending on
        the given version parameter, and the Elasticsearch client object in use on this
        database.

        :param version: the version to search at, this should either be a SearchVersion
                        enum option or an int. SearchVersion.latest will result in a
                        search on the latest index with no version filter thus searching
                        the latest data. SearchVersion.all will result in a search on
                        all indices using a wildcard and no version filter. Passing an
                        int version will search at the given timestamp. The default is
                        SearchVersion.latest.
        :return: a Search DSL object
        """
        search = Search(using=self._client.elasticsearch)

        if isinstance(version, int):
            search = search.index(self.indices.wildcard)
            search = search.filter(create_version_query(version))
        else:
            if version == SearchVersion.latest:
                search = search.index(self.indices.latest)
            elif version == SearchVersion.all:
                search = search.index(self.indices.wildcard)

        return search

    def get_available_versions(self) -> List[int]:
        """
        Returns a list of the available versions that have been indexed into
        Elasticsearch for this database. The versions are in ascending order.

        :return: the available versions in ascending order
        """
        versions = set()
        after = None
        # page through the versions using a composite aggregation, 50 buckets at a
        # time, until a page comes back empty
        while True:
            search = self.search(version=SearchVersion.all)[:0]
            search.aggs.bucket(
                "versions",
                "composite",
                size=50,
                sources={"version": A("terms", field=fields.VERSION, order="asc")},
            )
            if after is not None:
                search.aggs["versions"].after = after
            result = search.execute().aggs.to_dict()
            buckets = get_in(("versions", "buckets"), result, [])
            after = get_in(("versions", "after_key"), result, None)
            if not buckets:
                break
            versions.update(bucket["key"]["version"] for bucket in buckets)
        return sorted(versions)

    def get_profile(self, version: int) -> Optional[Profile]:
        """
        Given a version, gets the data profile that applies, if there is one available.

        :param version: the data version to get the profile for
        :return: a Profile object or None
        """
        return self._client.profile_manager.get_profile(self.name, version)

    def update_profiles(self, rebuild: bool = False):
        """
        Force an update of the profiles for this database, optionally rebuilding them.

        :param rebuild: whether to rebuild the profiles completely
        """
        self._client.profile_manager.update_profiles(self, rebuild)
551

552

553
class ProfileManager:
    """
    Manages the storage and retrieval of all database profiles in Elasticsearch.
    """

    def __init__(self, elasticsearch: Elasticsearch):
        """
        :param elasticsearch: an Elasticsearch client
        """
        self._elasticsearch = elasticsearch

    def _index_exists(self) -> bool:
        """
        Checks whether the profiles index currently exists.

        :return: True if the profiles index exists, False if not
        """
        return self._elasticsearch.indices.exists(index=PROFILES_INDEX_NAME)

    def _create_index(self):
        """
        Creates the profiles index with an explicit mapping, doing nothing if the
        index already exists.
        """
        if not self._index_exists():
            # mapping for the per-field stats nested under "fields"
            field_properties = {
                "name": {"type": "keyword"},
                "path": {"type": "keyword"},
                "count": {"type": "long"},
                "boolean_count": {"type": "long"},
                "date_count": {"type": "long"},
                "number_count": {"type": "long"},
                "array_count": {"type": "long"},
                "is_value": {"type": "boolean"},
                "is_parent": {"type": "boolean"},
            }
            mappings = {
                "properties": {
                    "name": {"type": "keyword"},
                    "version": {"type": "long"},
                    "total": {"type": "long"},
                    "changes": {"type": "long"},
                    "fields": {"properties": field_properties},
                },
            }
            self._elasticsearch.indices.create(
                index=PROFILES_INDEX_NAME, mappings=mappings
            )

    def get_profile_versions(self, name: str) -> List[int]:
        """
        Returns a list of the profile versions that are currently available for the
        given database name. The list of versions is sorted in ascending order.

        :param name: the database name
        :return: the versions in ascending order
        """
        return [prof.version for prof in self.get_profiles(name)]

    def get_profile(self, name: str, version: int) -> Optional[Profile]:
        """
        Returns the profile that applies to the given version of the database with the
        given name, or None if no profile applies. A profile applies from its own
        version (inclusive) until it is superseded by the next available version
        (exclusive); an unsuperseded profile applies to everything at or above its
        version.

        :param name: the database name
        :param version: the version to get a profile for
        :return: a profile if one can be found, otherwise None
        """
        # the profiles come back in ascending version order, so walking from the
        # newest end, the first profile at or below the requested version applies
        return next(
            (
                prof
                for prof in reversed(self.get_profiles(name))
                if prof.version <= version
            ),
            None,
        )

    def get_profiles(self, name: str) -> List[Profile]:
        """
        Return a list of all the profiles available for this database, sorted in
        ascending version order.

        :param name: the name of the database
        :return: a list of Profile objects
        """
        if not self._index_exists():
            return []
        search = Search(using=self._elasticsearch, index=PROFILES_INDEX_NAME).filter(
            "term", name=name
        )
        found = (Profile.from_dict(hit.to_dict()) for hit in search.scan())
        return sorted(found, key=lambda prof: prof.version)

    def update_profiles(self, database: SplitgillDatabase, rebuild: bool = False):
        """
        Updates the profiles for the given database by building a profile for every
        indexed version that doesn't already have one. If all versions have profiles,
        nothing happens.

        If the rebuild option is True, all the profiles for this database are deleted
        first and then recreated, which may take a while depending on the number of
        versions and the size of the database.

        :param database: the database to profile
        :param rebuild: whether to rebuild all the profiles for all versions
        """
        if rebuild and self._index_exists():
            # wipe this database's existing profiles so they all get rebuilt below
            delete_query = Search().filter("term", name=database.name).to_dict()
            self._elasticsearch.delete_by_query(
                index=PROFILES_INDEX_NAME,
                refresh=True,
                query=delete_query,
            )

        already_profiled = set(self.get_profile_versions(database.name))
        for version in database.get_available_versions():
            if version not in already_profiled:
                # no profile available for this version, build one
                profile = build_profile(self._elasticsearch, database.indices, version)

                # turn the profile into a doc and attach the identifying extras
                doc = dict(asdict(profile), name=database.name, version=version)

                # make sure the index exists
                self._create_index()

                # add to the profiles index
                self._elasticsearch.create(
                    index=PROFILES_INDEX_NAME,
                    id=f"{database.name}-{version}",
                    document=doc,
                    refresh=True,
                )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc