• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

NaturalHistoryMuseum / splitgill / #160

30 Dec 2025 03:17PM UTC coverage: 94.675% (-0.4%) from 95.118%
#160

push

coveralls-python

1280 of 1352 relevant lines covered (94.67%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.5
/splitgill/utils.py
1
from dataclasses import dataclass
1✔
2
from datetime import date, datetime, timezone
1✔
3
from itertools import islice
1✔
4
from time import time
1✔
5
from typing import Iterable, Optional, Union
1✔
6

7
from cytoolz import get_in
1✔
8
from elasticsearch_dsl import A, Search
1✔
9

10

11
def to_timestamp(moment: Union[datetime, date]) -> int:
    """
    Convert a datetime or date object into an integer timestamp.

    The timestamp is the number of milliseconds elapsed between the UNIX epoch and
    the given moment. If the moment is a plain date, midnight (00:00:00) on that day
    is used. Any precision beyond milliseconds is truncated, not rounded.

    NOTE(review): a plain date produces a naive datetime, which `timestamp()`
    interprets in the local timezone — confirm this is intended for callers outside
    UTC.

    :param moment: a datetime or date object
    :return: the number of milliseconds between the UNIX epoch and the moment, as an
        int
    """
    if not isinstance(moment, datetime):
        # promote the bare date to a naive datetime at midnight on that day
        moment = datetime(moment.year, moment.month, moment.day)
    return int(moment.timestamp() * 1000)
29

30

31
def parse_to_timestamp(
    datetime_string: str, datetime_format: str, tz: timezone = timezone.utc
) -> int:
    """
    Parse the given string using the given format and return a millisecond timestamp.

    If the datetime produced by parsing the string carries no tzinfo component, the
    tz parameter is attached to it as a replacement value; this defaults to UTC.

    :param datetime_string: the datetime as a string
    :param datetime_format: the format as a string
    :param tz: the timezone to use when the parsed value is naive (default: UTC)
    :return: the parsed datetime as the number of milliseconds since the UNIX epoch,
        as an int
    """
    parsed = datetime.strptime(datetime_string, datetime_format)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=tz)
    # milliseconds since the epoch, truncating any sub-millisecond precision
    return int(parsed.timestamp() * 1000)
51

52

53
def now() -> int:
    """
    Return the current time as the number of milliseconds since the UNIX epoch.

    :return: the current timestamp as an int, truncating sub-millisecond precision
    """
    millis = time() * 1000
    return int(millis)
58

59

60
def partition(iterable: Iterable, size: int) -> Iterable[list]:
    """
    Split the given iterable into chunks, yielding each chunk as a list.

    Every yielded list contains at most `size` elements; only the final list may be
    shorter, when the iterable's length isn't wholly divisible by the size. Nothing
    is yielded for an empty iterable.

    :param iterable: the iterable to partition
    :param size: the maximum number of elements per yielded list
    :return: yields lists
    """
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            # the source is exhausted (or size was 0), stop iterating
            return
        yield chunk
73

74

75
@dataclass
class Term:
    """
    Represents a single bucket in a terms aggregation result: one distinct field
    value paired with how many documents contained it.
    """

    # the field value
    value: Union[str, int, float, bool]
    # the number of documents this value appeared in
    count: int
85

86

87
def iter_terms(
    search: Search,
    field: str,
    chunk_size: int = 50,
    sample_probability: float = 1.0,
    seed: Optional[int] = None,
) -> Iterable[Term]:
    """
    Yields Term objects, each representing a value and the number of documents which
    contain that value in the given field. The Terms are yielded in descending order of
    value frequency.

    :param search: a Search instance to use to run the aggregation
    :param field: the name of the field to get the terms for
    :param chunk_size: the number of buckets to retrieve per request
    :param sample_probability: the probability that a given record will be included in a
        random sample; set to 1 to use all records (default 1)
    :param seed: sets the seed manually (if None or not set, defaults to current date
        timestamp / 3600)
    :return: yields Term objects
    """
    use_sampling = sample_probability < 1
    # the paths to the buckets and the after key in the response depend only on whether
    # the composite agg is nested under the sampler, so resolve them (and the seed) once
    # up front instead of rebuilding them on every page of the pagination loop; this
    # also guarantees the sampler seed can't change mid-pagination if the date rolls
    # over while we're iterating
    if use_sampling:
        if seed is None:
            # this should stay relatively constant for caching purposes, but we can
            # change it once a day.
            # divide it by 3600 just to make it fit under the ES seed max (2147483647)
            # for longer - otherwise this stops working in 2038. The actual number isn't
            # important.
            seed = int(
                datetime.now()
                .replace(hour=0, minute=0, second=0, microsecond=0)
                .timestamp()
                / 3600
            )
        result_keys = ['sampling', 'values', 'buckets']
        after_keys = ['sampling', 'values', 'after_key']
    else:
        result_keys = ['values', 'buckets']
        after_keys = ['values', 'after_key']

    after = None
    while True:
        # this has a dual purpose, it ensures we don't get any search results when we
        # don't need them, and it ensures we get a fresh copy of the search to work with
        agg_search = search[:0]

        # this is the core aggregation; it is created fresh for each page because the
        # after key is set on it as an attribute below
        composite_agg = A(
            'composite', size=chunk_size, sources={'value': A('terms', field=field)}
        )

        if use_sampling:
            # if we're sampling, the core agg gets nested underneath the sampler
            agg_search.aggs.bucket(
                'sampling', 'random_sampler', probability=sample_probability, seed=seed
            ).bucket('values', composite_agg)
            if after is not None:
                agg_search.aggs['sampling'].aggs['values'].after = after
        else:
            agg_search.aggs.bucket('values', composite_agg)
            if after is not None:
                agg_search.aggs['values'].after = after

        result = agg_search.execute().aggs.to_dict()

        buckets = get_in(result_keys, result, [])
        after = get_in(after_keys, result, None)
        if not buckets:
            # no more buckets means we've paged through every distinct value
            break
        yield from (
            Term(bucket['key']['value'], bucket['doc_count']) for bucket in buckets
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc