• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

csingley / ofxtools / 27392199904

12 Jun 2026 03:14AM UTC coverage: 93.873% (+0.06%) from 93.817%
27392199904

push

github

csingley
Format test_utils.py with ruff

4566 of 4864 relevant lines covered (93.87%)

3.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.57
/ofxtools/utils.py
1
"""Utility functions and classes"""
2

3
# stdlib imports
4
import calendar
4✔
5
import datetime
4✔
6
import itertools
4✔
7
import math
4✔
8
import os
4✔
9
import xml.etree.ElementTree as ET
4✔
10
from collections.abc import Callable, Iterable, Sequence
4✔
11
from typing import Any
4✔
12

13
# local imports
14
from ofxtools.lib import NUMBERING_AGENCIES
4✔
15

16

17
class classproperty(property):
4✔
18
    """Descriptor that makes a classmethod behave like a property."""
19

20
    def __get__(self, cls, owner):
4✔
21
        return self.fget.__get__(None, owner)()
4✔
22

23

24
def fixpath(path: str) -> str:
4✔
25
    """Makes paths do the right thing."""
26
    path = os.path.expanduser(path)
4✔
27
    path = os.path.normpath(path)
4✔
28
    path = os.path.normcase(path)
4✔
29
    path = os.path.abspath(path)
4✔
30
    return path
4✔
31

32

33
def collapseToSingle(items: Sequence, label: str) -> Any:
4✔
34
    """
35
    Given a sequence of repeated items, return the item that's repeated.
36
    Throw an error if sequence is empty or contains >1 distinct item.
37

38
    ``label`` is the name used in error reporting.
39
    """
40
    items_ = set(items)
4✔
41
    if len(items_) == 0:
4✔
42
        raise ValueError(f"{label} is empty")
×
43
    if len(items_) > 1:
4✔
44
        raise ValueError(
×
45
            f"Multiple {label} {list(items)}; can't configure automatically"
46
        )
47
    return items_.pop()
4✔
48

49

50
###############################################################################
51
#  date/time utilities
52
###############################################################################
53
def gmt_offset(hours: int, minutes: int) -> datetime.timedelta:
4✔
54
    if hours not in range(-12, 15):
4✔
55
        raise ValueError(f"Invalid UTC offset hours: {hours}")
×
56
    if minutes < 0:
4✔
57
        raise ValueError(f"Invalid UTC offset minutes: {minutes}")
×
58
    offset_minutes = math.copysign(60 * abs(hours) + minutes, hours)
4✔
59
    return datetime.timedelta(minutes=offset_minutes)
4✔
60

61

62
TZS = {
4✔
63
    "EST": -5,
64
    "EDT": -4,
65
    "CST": -6,
66
    "CDT": -5,
67
    "MST": -7,
68
    "MDT": -6,
69
    "PST": -8,
70
    "PDT": -7,
71
}
72

73

74
###############################################################################
75
#  itertools recipes
76
#  https://docs.python.org/2/library/itertools.html#recipes
77
###############################################################################
78
def pairwise(iterable: Iterable) -> Iterable[tuple[Any, Any]]:
4✔
79
    """s -> (s0,s1), (s1,s2), (s2, s3), ..."""
80
    a, b = itertools.tee(iterable)
×
81
    next(b, None)
×
82
    return zip(a, b)
×
83

84

85
def all_equal(iterable: Iterable) -> bool:
4✔
86
    """Returns True if all the elements are equal to each other"""
87
    g = itertools.groupby(iterable)
4✔
88
    return next(g, True) and not next(g, False)
4✔
89

90

91
def partition(pred: Callable, iterable: Iterable) -> tuple[Iterable, Iterable]:
4✔
92
    """
93
    Use a predicate to partition entries into false entries and true entries
94
    """
95
    # partition(is_odd, range(10)) --> 0 2 4 6 8   and  1 3 5 7 9
96
    t1, t2 = itertools.tee(iterable)
4✔
97
    return itertools.filterfalse(pred, t1), filter(pred, t2)
4✔
98

99

100
###############################################################################
101
#  ElementTree utilities
102
###############################################################################
103
def indent(elem: ET.Element, level: int = 0) -> None:
4✔
104
    """
105
    Indent xml.etree.ElementTree.Element.text by nesting level.
106

107
    http://effbot.org/zone/element-lib.htm#prettyprint
108
    """
109
    i = "\n" + level * "  "
4✔
110
    if len(elem):
4✔
111
        if not elem.text or not elem.text.strip():
4✔
112
            elem.text = i + "  "
4✔
113
        if not elem.tail or not elem.tail.strip():
4✔
114
            elem.tail = i
4✔
115
        for elem in elem:
4✔
116
            indent(elem, level + 1)
4✔
117
        if not elem.tail or not elem.tail.strip():
4✔
118
            elem.tail = i
4✔
119
    else:
120
        if level and (not elem.tail or not elem.tail.strip()):
4✔
121
            elem.tail = i
4✔
122

123

124
def tostring_unclosed_elements(elem: ET.Element) -> bytes:
4✔
125
    """
126
    SGML-style string representation of xml.etree.ElementTree, without
127
    closing tags on leaf elements.
128

129
    In OFX v1 (SGML), aggregate elements retain closing tags but leaf
130
    (data-bearing) elements do not: <ACCTID>12345 rather than
131
    <ACCTID>12345</ACCTID>.
132

133
    Drop-in replacement for xml.etree.ElementTree.tostring().
134
    """
135
    if len(elem) == 0:
4✔
136
        # Leaf element: emit value, no closing tag
137
        text = f"<{elem.tag}>{elem.text or ''}{elem.tail or ''}"
4✔
138
        return bytes(text, "utf_8")
4✔
139
    else:
140
        # Container element: emit children between opening/closing tags.
141
        # elem.text is whitespace between the start tag and the first child
142
        # (set by utils.indent() when prettyprint=True).
143
        # elem.tail is whitespace after the closing tag (between siblings).
144
        output = bytes(f"<{elem.tag}>{elem.text or ''}", "utf_8")
4✔
145
        for child in elem:
4✔
146
            output += tostring_unclosed_elements(child)
4✔
147
        output += bytes(f"</{elem.tag}>{elem.tail or ''}", "utf_8")
4✔
148
        return output
4✔
149

150

151
###############################################################################
152
#  Securities identifier utilities (CUSIP, ISIN, etc.)
153
###############################################################################
154
def cusip_checksum(base: str) -> str:
4✔
155
    """
156
    Compute the check digit for a base Committee on Uniform Security
157
    Identification Procedures (CUSIP) securities identifier.
158
    Input an 8-digit alphanum str, output a single-char str.
159

160
    http://goo.gl/4TeWl
161
    """
162

163
    def encode(index, char):
4✔
164
        num = {"*": 36, "@": 37, "#": 38}.get(char, int(char, 36))
4✔
165
        return str(num * 2) if index % 2 else str(num)
4✔
166

167
    if len(base) != 8:
4✔
168
        raise ValueError(f"CUSIP base must be 8 characters, got {len(base)}")
×
169
    check = "".join([encode(index, char) for index, char in enumerate(base)])
4✔
170
    check_ = sum([int(digit) for digit in check])
4✔
171
    return str((10 - (check_ % 10)) % 10)
4✔
172

173

174
def validate_cusip(cusip: str) -> bool:
4✔
175
    """
176
    Validate a CUSIP
177
    """
178
    if len(cusip) == 9 and cusip_checksum(cusip[:8]) == cusip[8]:
4✔
179
        return True
4✔
180
    else:
181
        return False
4✔
182

183

184
def sedol_checksum(base: str) -> str:
4✔
185
    """
186
    Stock Exchange Daily Official List (SEDOL)
187
    http://goo.gl/HxFWL
188
    """
189
    weights = (1, 3, 1, 7, 3, 9)
4✔
190

191
    if len(base) != 6:
4✔
192
        raise ValueError(f"SEDOL base must be 6 characters, got {len(base)}")
×
193
    for badLetter in "AEIO":
4✔
194
        if badLetter in base:
4✔
195
            raise ValueError(f"SEDOL base must not contain vowel '{badLetter}'")
×
196
    check = sum([int(char, 36) * weights[n] for n, char in enumerate(base)])
4✔
197
    return str((10 - (check % 10)) % 10)
4✔
198

199

200
def isin_checksum(base: str) -> str:
4✔
201
    """
202
    Compute the check digit for a base International Securities Identification
203
    Number (ISIN).  Input an 11-char alphanum str, output a single-char str.
204

205
    http://goo.gl/8kPzD
206
    """
207
    if len(base) != 11:
4✔
208
        raise ValueError(f"ISIN base must be 11 characters, got {len(base)}")
×
209
    if base[:2] not in NUMBERING_AGENCIES.keys():
4✔
210
        raise ValueError(f"ISIN country code '{base[:2]}' not recognized")
×
211
    check = "".join([str(int(char, 36)) for char in base])
4✔
212
    check = check[::-1]  # string reversal
4✔
213
    check = "".join([d if n % 2 else str(int(d) * 2) for n, d in enumerate(check)])
4✔
214
    return str((10 - sum([int(d) for d in check]) % 10) % 10)
4✔
215

216

217
def validate_isin(isin: str) -> bool:
4✔
218
    """
219
    Validate an ISIN
220
    """
221
    if (
4✔
222
        len(isin) == 12
223
        and isin[:2] in NUMBERING_AGENCIES.keys()
224
        and isin_checksum(isin[:11]) == isin[11]
225
    ):
226
        return True
4✔
227
    else:
228
        return False
4✔
229

230

231
def cusip2isin(cusip: str, nation: str | None = None) -> str:
4✔
232
    # Validate inputs
233
    if not validate_cusip(cusip):
4✔
234
        raise ValueError(f"'{cusip}' is not a valid CUSIP")
4✔
235

236
    nation = nation or "US"
4✔
237
    if nation not in NUMBERING_AGENCIES.keys():
4✔
238
        raise ValueError(f"'{nation}' is not a valid country code")
4✔
239

240
    # Construct ISIN
241
    base = nation + cusip
4✔
242
    return base + isin_checksum(base)
4✔
243

244

245
def sedol2isin(sedol: str, nation: str | None = None) -> str:
4✔
246
    nation = nation or "GB"
4✔
247
    if len(sedol) != 7:
4✔
248
        raise ValueError(f"SEDOL must be 7 characters, got {len(sedol)}")
×
249
    if sedol_checksum(sedol[:6]) != sedol[6]:
4✔
250
        raise ValueError(f"Invalid SEDOL check digit in '{sedol}'")
×
251
    base = nation + sedol.zfill(9)
4✔
252
    return base + isin_checksum(base)
4✔
253

254

255
try:
4✔
256
    # If pytz is installed then use that.
257
    import pytz
4✔
258

259
    UTC = pytz.UTC
×
260
except ImportError:
4✔
261
    # Otherwise create our own UTC tzinfo.
262
    class _UTC(datetime.tzinfo):
4✔
263
        def tzname(self, dt: datetime.datetime | None) -> str | None:
4✔
264
            """datetime -> string name of time zone."""
265
            return "UTC"
4✔
266

267
        def utcoffset(self, dt: datetime.datetime | None) -> datetime.timedelta | None:
4✔
268
            """datetime -> minutes east of UTC (negative for west of UTC)"""
269
            return datetime.timedelta(0)
4✔
270

271
        def dst(self, dt: datetime.datetime | None) -> datetime.timedelta | None:
4✔
272
            """datetime -> DST offset in minutes east of UTC.
273

274
            Return 0 if DST not in effect.  utcoffset() must include the DST
275
            offset.
276
            """
277
            return datetime.timedelta(0)
4✔
278

279
        def __repr__(self) -> str:
4✔
280
            return "<UTC>"
4✔
281

282
    UTC = _UTC()  # type: ignore[assignment]
4✔
283

284

285
def findEaster(year: int) -> datetime.date:
4✔
286
    """
287
    Compute the date of Easter Sunday for the given Gregorian calendar year
288
    (valid 1583–4099).
289

290
    Copyright (c) 2003  Gustavo Niemeyer <niemeyer@conectiva.com>
291
    Licensed under the PSF license.
292
    Ported from GM Arts / Claus Tondering algorithm (Ouding 1940), as quoted
293
    in "Explanatory Supplement to the Astronomical Almanac", P. Kenneth
294
    Seidelmann, editor.
295
    """
296
    # g - Golden year - 1
297
    # c - Century
298
    # h - (23 - Epact) mod 30
299
    # i - Number of days from March 21 to Paschal Full Moon
300
    # j - Weekday for PFM (0=Sunday, etc)
301
    # p - Number of days from March 21 to Sunday on or before PFM (-6 to 28)
302
    y = year
4✔
303
    g = y % 19
4✔
304
    c = y // 100
4✔
305
    h = (c - c // 4 - (8 * c + 13) // 25 + 19 * g + 15) % 30
4✔
306
    i = h - (h // 28) * (1 - (h // 28) * (29 // (h + 1)) * ((21 - g) // 11))
4✔
307
    j = (y + y // 4 + i + 2 - c + c // 4) % 7
4✔
308
    p = i - j
4✔
309
    d = 1 + (p + 27 + (p + 6) // 40) % 31
4✔
310
    m = 3 + (p + 26) // 30
4✔
311
    return datetime.date(y, m, d)
4✔
312

313

314
class NYSEcalendar:
4✔
315
    """
316
    NYSE holiday calendar.
317

318
    The Exchange is closed on: New Year's Day, Martin Luther King Jr. Day,
319
    Washington's Birthday, Good Friday, Memorial Day, Independence Day,
320
    Labor Day, Thanksgiving Day, and Christmas Day.
321

322
    When a fixed-date holiday falls on Saturday, the preceding Friday is
323
    observed — except New Year's Day, where Dec 31 is the year-end accounting
324
    close and the holiday is simply skipped that year.  When a fixed-date
325
    holiday falls on Sunday, the following Monday is observed.
326
    """
327

328
    _cal = calendar.Calendar()
4✔
329

330
    @classmethod
4✔
331
    def _weekdays(cls, year: int, month: int, weekday: int) -> list[datetime.date]:
4✔
332
        """Return all dates in (year, month) falling on the given weekday (0=Mon)."""
333
        return [
4✔
334
            datetime.date(year, month, day)
335
            for day, wkday in cls._cal.itermonthdays2(year, month)
336
            if day > 0 and wkday == weekday
337
        ]
338

339
    @classmethod
4✔
340
    def mondays(cls, year: int, month: int) -> list[datetime.date]:
4✔
341
        return cls._weekdays(year, month, weekday=0)
4✔
342

343
    @classmethod
4✔
344
    def thursdays(cls, year: int, month: int) -> list[datetime.date]:
4✔
345
        return cls._weekdays(year, month, weekday=3)
4✔
346

347
    @classmethod
4✔
348
    def _observed(cls, date: datetime.date) -> datetime.date:
4✔
349
        """Return the NYSE-observed date for a fixed holiday falling on a weekend."""
350
        if date.weekday() == 5:  # Saturday → preceding Friday
4✔
351
            return date - datetime.timedelta(days=1)
4✔
352
        if date.weekday() == 6:  # Sunday → following Monday
4✔
353
            return date + datetime.timedelta(days=1)
4✔
354
        return date
4✔
355

356
    @classmethod
4✔
357
    def holidays(cls, year: int) -> list[datetime.date]:
4✔
358
        hols = [
4✔
359
            cls._observed(datetime.date(year, 7, 4)),  # Independence Day
360
            cls._observed(datetime.date(year, 12, 25)),  # Christmas
361
            cls.mondays(year, 1)[2],  # MLK Day (3rd Mon in Jan)
362
            findEaster(year) - datetime.timedelta(days=2),  # Good Friday
363
            cls.mondays(year, 2)[2],  # Washington's Birthday (3rd Mon in Feb)
364
            cls.mondays(year, 5)[-1],  # Memorial Day (last Mon in May)
365
            cls.mondays(year, 9)[0],  # Labor Day (1st Mon in Sep)
366
            cls.thursdays(year, 11)[3],  # Thanksgiving (4th Thu in Nov)
367
        ]
368
        # New Year's Day: Saturday → skipped (Dec 31 is year-end accounting close)
369
        #                 Sunday   → observed Monday Jan 2
370
        #                 weekday  → Jan 1 itself
371
        nyd = datetime.date(year, 1, 1)
4✔
372
        if nyd.weekday() == 6:
4✔
373
            hols.append(nyd + datetime.timedelta(days=1))
4✔
374
        elif nyd.weekday() != 5:
4✔
375
            hols.append(nyd)
4✔
376
        hols.sort()
4✔
377
        return hols
4✔
378

379

380
def nextBizDay(dt: datetime.date) -> datetime.date:
4✔
381
    """Return the next NYSE business day after dt."""
382
    dt += datetime.timedelta(days=1)
4✔
383
    while dt.weekday() in (5, 6) or dt in NYSEcalendar.holidays(dt.year):
4✔
384
        dt += datetime.timedelta(days=1)
4✔
385
    return dt
4✔
386

387

388
def settleDate(dt: datetime.date, n: int = 1) -> datetime.date:
4✔
389
    """
390
    Return the settlement date for a trade on dt.
391

392
    n is the number of business days to add (T+n).  Defaults to 1 (T+1),
393
    the US equity standard since May 2024.  Pass n=2 for instruments still
394
    settling T+2 (e.g. most bonds, some international markets).
395
    """
396
    for _ in range(n):
4✔
397
        dt = nextBizDay(dt)
4✔
398
    return dt
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc