• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

NaturalHistoryMuseum / splitgill / #76

04 Aug 2024 07:46PM UTC coverage: 34.976% (-59.7%) from 94.693%
#76

push

coveralls-python

jrdh
feat: remove default config values

BREAKING CHANGE: remove default config values

3 of 19 new or added lines in 3 files covered. (15.79%)

613 existing lines in 13 files now uncovered.

369 of 1055 relevant lines covered (34.98%)

0.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

24.8
/splitgill/diffing.py
1
import abc
1✔
2
from collections import deque
1✔
3
from dataclasses import dataclass
1✔
4
from datetime import datetime
1✔
5
from itertools import zip_longest
1✔
6
from typing import (
1✔
7
    Iterable,
8
    Tuple,
9
    Any,
10
    Union,
11
    NamedTuple,
12
    Dict,
13
    Deque,
14
    List,
15
    TypeVar,
16
    Generic,
17
    Optional,
18
    Collection,
19
)
20

21
import regex as rx
1✔
22

23
# this regex matches invalid characters which we would like to remove from all string
24
# values as they are ingested into the system. It matches unicode control characters
25
# (i.e. category C*) but not \n, \r, or \t).
26
invalid_value_char_regex = rx.compile(r"[^\P{C}\n\r\t]")
1✔
27
# this regex matches invalid characters which we would like to remove from all field
28
# names as they are ingested into the system. It matches all unicode control characters,
29
# so it's a bit stricter than the value regex which allows new lines and tabs
30
invalid_key_char_regex = rx.compile(r"[^\P{C}]")
1✔
31

32

33
def prepare_data(value: Any) -> Union[str, dict, list, int, float, bool, None]:
1✔
34
    """
35
    Prepares the given value for storage in MongoDB. Conversions are completed like so:
36

37
        - None values are just returned as is
38
        - str values have invalid characters removed and are then returned. The
39
          characters are currently all unicode control characters except \n, \r, and \t.
40
        - int, float, bool, and None values are returned with no changes made
41
        - datetime values are converted to isoformat strings
42
        - dict values are returned as a new dict instance, with all the keys converted
43
          to strings and all the values recursively prepared using this function.
44
        - lists, sets, and tuples are converted to lists with each element of the value
45
          prepared by this function.
46

47
    :param value: the value to be stored in MongoDB
48
    :return: None, str, int, float, bool, tuple, or dict depending on the input value
49
    """
UNCOV
50
    if value is None:
×
UNCOV
51
        return None
×
UNCOV
52
    if isinstance(value, str):
×
53
        # replace any invalid characters in the string with the empty string
UNCOV
54
        return invalid_value_char_regex.sub("", value)
×
UNCOV
55
    if isinstance(value, (int, float, bool)):
×
UNCOV
56
        return value
×
UNCOV
57
    if isinstance(value, dict):
×
UNCOV
58
        return {
×
59
            prepare_field_name(key): prepare_data(value) for key, value in value.items()
60
        }
UNCOV
61
    if isinstance(value, (list, set, tuple)):
×
UNCOV
62
        return list(map(prepare_data, value))
×
UNCOV
63
    if isinstance(value, datetime):
×
64
        # stringifying this ensures the tz info is recorded and won't change going
65
        # in/out mongo
UNCOV
66
        return value.isoformat()
×
67
    # fallback
UNCOV
68
    return str(value)
×
69

70

71
def prepare_field_name(name: Any) -> str:
1✔
72
    """
73
    Cleans up a field name for ingestion into the system. There are a few steps to this:
74

75
        - convert the name to a str, MongoDB only accepts str keys in objects
76
        - remove any control characters from the str
77
        - replace . with _ as Elasticsearch doesn't like dots in keys
78

79
    :param name: the field name
80
    :return: a clean str field name
81
    """
UNCOV
82
    return invalid_key_char_regex.sub("", str(name)).replace(".", "_").strip()
×
83

84

85
class DiffOp(NamedTuple):
1✔
86
    """
87
    A namedtuple describing the differences found by the diff function.
88
    """
89

90
    # the path where the changes should be applied at in the root dict
91
    path: Tuple[Union[str, int], ...]
1✔
92
    # a dict of the changes made at that path
93
    ops: Dict[str, Any]
1✔
94

95

96
_T = TypeVar("_T")
1✔
97

98

99
@dataclass
1✔
100
class Comparison(abc.ABC, Generic[_T]):
1✔
101
    """
102
    A comparison between two objects of the same type.
103
    """
104

105
    path: Tuple[Union[str, int], ...]
1✔
106
    left: _T
1✔
107
    right: _T
1✔
108

109
    @abc.abstractmethod
1✔
110
    def compare(self) -> Tuple[Optional[DiffOp], List["Comparison"]]:
1✔
111
        """
112
        Compare the two objects and return a 2-tuple containing a DiffOp and a list of
113
        further comparisons which need to be handled. If no differences are found, then
114
        the first element of the returned 2-tuple will be None.
115

116
        :return: A 2-tuple containing a DiffOp and a list of further Comparison objects
117
        """
118
        pass
×
119

120

121
class DictComparison(Comparison[dict]):
1✔
122
    """
123
    A comparison between two dicts.
124
    """
125

126
    def compare(self) -> Tuple[Optional[DiffOp], List["Comparison"]]:
1✔
127
        """
128
        Compares the two dicts and return a 2-tuple containing a DiffOp and a list of
129
        further comparisons which need to be handled. If no differences are found, then
130
        the first element of the returned 2-tuple will be None.
131

132
        :return: A 2-tuple containing a DiffOp and a list of further Comparison objects
133
        """
UNCOV
134
        missing = object()
×
UNCOV
135
        ops = {}
×
UNCOV
136
        further_comparisons = []
×
137

UNCOV
138
        new_values = {
×
139
            key: value for key, value in self.right.items() if key not in self.left
140
        }
UNCOV
141
        if new_values:
×
UNCOV
142
            ops["dn"] = new_values
×
143

UNCOV
144
        deleted_keys = [key for key in self.left if key not in self.right]
×
UNCOV
145
        if deleted_keys:
×
UNCOV
146
            ops["dd"] = deleted_keys
×
147

UNCOV
148
        changes = {}
×
UNCOV
149
        for key, left_value in self.left.items():
×
UNCOV
150
            right_value = self.right.get(key, missing)
×
151

152
            # deletion or equality, nothing to do
UNCOV
153
            if right_value is missing or left_value == right_value:
×
UNCOV
154
                continue
×
155

156
            # check for nested container objects and add Comparison objects to the list
157
            # if any are found of the same types
UNCOV
158
            if isinstance(left_value, dict) and isinstance(right_value, dict):
×
UNCOV
159
                further_comparisons.append(
×
160
                    DictComparison((*self.path, key), left_value, right_value)
161
                )
UNCOV
162
            elif isinstance(left_value, list) and isinstance(right_value, list):
×
UNCOV
163
                further_comparisons.append(
×
164
                    ListComparison((*self.path, key), left_value, right_value)
165
                )
166
            else:
UNCOV
167
                changes[key] = right_value
×
UNCOV
168
        if changes:
×
UNCOV
169
            ops["dc"] = changes
×
170

UNCOV
171
        return DiffOp(self.path, ops) if ops else None, further_comparisons
×
172

173

174
@dataclass
1✔
175
class ListComparison(Comparison[list]):
1✔
176
    """
177
    A comparison between two lists.
178
    """
179

180
    def compare(self) -> Tuple[DiffOp, List["Comparison"]]:
1✔
181
        """
182
        Compares the two lists and return a 2-tuple containing a DiffOp and a list of
183
        further comparisons which need to be handled. If no differences are found, then
184
        the first element of the returned 2-tuple will be None.
185

186
        :return: A 2-tuple containing a DiffOp and a list of further Comparison objects
187
        """
UNCOV
188
        missing = object()
×
UNCOV
189
        ops = {}
×
UNCOV
190
        further_comparisons = []
×
191

UNCOV
192
        changes = []
×
UNCOV
193
        for index, (left_value, right_value) in enumerate(
×
194
            zip_longest(self.left, self.right, fillvalue=missing)
195
        ):
UNCOV
196
            if left_value == right_value:
×
UNCOV
197
                continue
×
198

UNCOV
199
            if left_value is missing:
×
200
                # the right list is longer, so store all the new values so that they can
201
                # just be added to the left list to patch it, and stop
UNCOV
202
                ops["ln"] = self.right[index:]
×
UNCOV
203
                break
×
UNCOV
204
            elif right_value is missing:
×
205
                # the left value is longer, so store the index from which elements in
206
                # the left list will be deleted to shorten it to the length of the right
207
                # list, and stop
UNCOV
208
                ops["ld"] = index
×
UNCOV
209
                break
×
210
            else:
211
                # a change in the values at this index in each list, check for nested
212
                # container objects and add Comparison objects to the list if any are
213
                # found of the same types
UNCOV
214
                if isinstance(left_value, dict) and isinstance(right_value, dict):
×
UNCOV
215
                    further_comparisons.append(
×
216
                        DictComparison((*self.path, index), left_value, right_value)
217
                    )
UNCOV
218
                elif isinstance(left_value, list) and isinstance(right_value, list):
×
UNCOV
219
                    further_comparisons.append(
×
220
                        ListComparison((*self.path, index), left_value, right_value)
221
                    )
222
                else:
UNCOV
223
                    changes.append((index, right_value))
×
UNCOV
224
        if changes:
×
UNCOV
225
            ops["lc"] = changes
×
226

UNCOV
227
        return DiffOp(self.path, ops) if ops else None, further_comparisons
×
228

229

230
class DiffingTypeComparisonException(Exception):
1✔
231
    """
232
    Exception raised if the base type and the new type passed to the diff function below
233
    are not both dicts.
234
    """
235

236
    pass
1✔
237

238

239
def diff(base: dict, new: dict) -> Iterable[DiffOp]:
1✔
240
    """
241
    Finds the differences between the two dicts, yielding DiffOps. Each DiffOp describes
242
    specific differences between the base dict and the new dict. By applying them all
243
    using the patch function below, the new dict can be recreated from the base dict.
244

245
    For efficiency, the DiffOps represent all the changes at a container level (e.g. a
246
    dict or list) not each specific change to every version at a specific key or index.
247
    This saves not only database space, but also allows for a faster patch function as
248
    changes can be applied en masse instead of individually.
249

250
    :param base: the base dict
251
    :param new: the new version of the base dict
252
    :return: yields DiffOps (if any changes are found)
253
    """
UNCOV
254
    if base == new:
×
UNCOV
255
        return
×
256

UNCOV
257
    if not isinstance(base, dict) or not isinstance(new, dict):
×
UNCOV
258
        raise DiffingTypeComparisonException("Both base and new must be dicts")
×
259

260
    # TODO: we could write a shortcut when one of the dicts is empty
261

UNCOV
262
    queue: Deque[Comparison] = deque([DictComparison(tuple(), base, new)])
×
UNCOV
263
    while queue:
×
UNCOV
264
        comparison: Comparison = queue.popleft()
×
UNCOV
265
        diff_op, further_comparisons = comparison.compare()
×
UNCOV
266
        if diff_op:
×
UNCOV
267
            yield diff_op
×
UNCOV
268
        if further_comparisons:
×
UNCOV
269
            queue.extend(further_comparisons)
×
270

271

272
# # dynamically figure out the typing of the DiffOp based on it's annotations
273
# _diff_op_typing = get_type_hints(DiffOp)
274
# _DiffOpType = Tuple[_diff_op_typing["path"], _diff_op_typing["ops"]]
275

276

277
def patch(base: dict, ops: Collection[DiffOp]) -> dict:
1✔
278
    """
279
    Applies the operations in the ops iterable to the base dict, returning a new dict.
280
    If there are no operations to apply, the base dict is returned unchanged.
281

282
    Note that although the returned dict is new, the nested container values in it may
283
    or may not be new and could reference the same exact object as in the passed base
284
    dict. A nested container will be copied to avoid modifying the same container
285
    referenced in the base dict if there are any modifications made to it directly, or
286
    to any nested containers below it at any depth. If the nested container contains no
287
    changes to itself or its nested containers, it is not copied and the original
288
    reference to it from the base dict is used.
289

290
    :param base: the starting dict
291
    :param ops: the DiffOps to apply to the base dict (can be pure tuples, doesn't have
292
                to be DiffOp namedtuples)
293
    :return: a new dict with the changes applied
294
    """
295
    # nothing to do
UNCOV
296
    if len(ops) == 0:
×
UNCOV
297
        return base
×
298

299
    # create a copy of the base dict so that we don't modify it and can return a new one
UNCOV
300
    new = base.copy()
×
301

UNCOV
302
    for path, op in ops:
×
303
        # loop through the path finding the target of the operations we're going to
304
        # perform. At every point in the path, replace the container with a copy to
305
        # ensure we don't modify the container object from the base.
UNCOV
306
        target = new
×
UNCOV
307
        for key_or_index in path:
×
UNCOV
308
            target_copy = target[key_or_index].copy()
×
UNCOV
309
            target[key_or_index] = target_copy
×
UNCOV
310
            target = target_copy
×
311

312
        # dict ops
UNCOV
313
        if "dc" in op:
×
UNCOV
314
            target.update(op["dc"])
×
UNCOV
315
        if "dn" in op:
×
UNCOV
316
            target.update(op["dn"])
×
UNCOV
317
        if "dd" in op:
×
UNCOV
318
            for key in op["dd"]:
×
UNCOV
319
                del target[key]
×
320

321
        # list ops
UNCOV
322
        if "lc" in op:
×
UNCOV
323
            for index, value in op["lc"]:
×
UNCOV
324
                target[index] = value
×
UNCOV
325
        if "ln" in op:
×
UNCOV
326
            target.extend(op["ln"])
×
UNCOV
327
        if "ld" in op:
×
UNCOV
328
            del target[op["ld"] :]
×
329

UNCOV
330
    return new
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc