• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

csingley / ofxtools / 27392199904

12 Jun 2026 03:14AM UTC coverage: 93.873% (+0.06%) from 93.817%
27392199904

push

github

csingley
Format test_utils.py with ruff

4566 of 4864 relevant lines covered (93.87%)

3.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.09
/ofxtools/scripts/ofxget.py
1
#!/usr/bin/env python
2
"""
3
Configurable CLI front end for ``ofxtools.Client``
4
"""
5

6
# stdlib imports
7
import argparse
4✔
8
import concurrent.futures
4✔
9
import configparser
4✔
10
import datetime
4✔
11
import getpass
4✔
12
import itertools
4✔
13
import json
4✔
14
import logging
4✔
15
import logging.config
4✔
16
import pydoc
4✔
17
import sys
4✔
18
import warnings
4✔
19
import xml.etree.ElementTree as ET
4✔
20
from collections import ChainMap, defaultdict
4✔
21
from collections.abc import (
4✔
22
    Callable,
23
    Iterable,
24
    Iterator,
25
    Mapping,
26
    MutableMapping,
27
    Sequence,
28
)
29
from io import BytesIO
4✔
30
from operator import attrgetter
4✔
31
from typing import Any
4✔
32
from urllib import parse as urllib_parse
4✔
33
from urllib.error import HTTPError, URLError
4✔
34

35
# 3rd party imports
36
try:
4✔
37
    # No library stub file for module 'keyring'
38
    import keyring  # type: ignore[import-not-found]
4✔
39

40
    HAS_KEYRING = True
×
41
except ImportError:
4✔
42
    HAS_KEYRING = False
4✔
43

44

45
# local imports
46
from ofxtools import config, models, utils
4✔
47
from ofxtools.Client import (
4✔
48
    CcStmtEndRq,
49
    CcStmtRq,
50
    InvStmtRq,
51
    OFXClient,
52
    StmtEndRq,
53
    StmtRq,
54
)
55
from ofxtools.header import OFXHeaderError
4✔
56
from ofxtools.Parser import OFXTree, ParseError
4✔
57
from ofxtools.Types import DateTime
4✔
58

59
CONFIGPATH = config.CONFIGDIR / "fi.cfg"
4✔
60
USERCONFIGPATH = config.USERCONFIGDIR / "ofxget.cfg"
4✔
61

62

63
logger = logging.getLogger(__name__)
4✔
64

65

66
###############################################################################
67
# TYPE ALIASES
68
###############################################################################
69
# Parsed ArgParser arg
70
ArgType = list[str] | bool | int | str
4✔
71

72
# Common data structure used for loading, combining, and converting between
73
# ArgParser and ConfigParser
74
ArgsType = ChainMap[str, Any]
4✔
75

76
# OFX connection params (OFX version, prettyprint, unclosedelements) tagged
77
# onto the OFXClient.request_profile() job submitted to the ThreadPoolExecutor
78
# during a profile scan
79
OFXVersion = int
4✔
80
MarkupFormat = Mapping[str, bool]  # keys are "pretty", "unclosedelements"
4✔
81
ScanMetadata = tuple[OFXVersion, MarkupFormat]
4✔
82

83
# All working FormatArgs for a given OFX version
84
FormatMap = Mapping[OFXVersion, list[MarkupFormat]]
4✔
85

86
# Scan result of a single OFX protocol version
87
ScanResult = Mapping[str, list]
4✔
88

89
# Auth information parsed out of SIGNONINFO during a profile scan -
90
# CLIENTUIDREQ et al.
91
SignoninfoReport = Mapping[str, bool]
4✔
92

93
# Full set of profile scan results
94
ScanResults = tuple[ScanResult, ScanResult, SignoninfoReport]
4✔
95

96
AcctInfo = models.BANKACCTINFO | models.CCACCTINFO | models.INVACCTINFO
4✔
97
ParsedAcctinfo = dict[str, str | list[Any]]
4✔
98

99

100
###############################################################################
101
# DEFINE CLI
102
###############################################################################
103
class UuidAction(argparse.Action):
4✔
104
    """
105
    Generates a random UUID4 each time called
106
    """
107

108
    def __call__(self, parser, namespace, values, option_string=None):
4✔
109
        uuid = values if values else OFXClient.uuid()
×
110
        setattr(namespace, self.dest, uuid)
×
111

112

113
def make_argparser() -> argparse.ArgumentParser:
4✔
114
    main_parser = argparse.ArgumentParser(
4✔
115
        description="Download OFX financial data", prog="ofxget"
116
    )
117
    subparsers_ = main_parser.add_subparsers(
4✔
118
        title="commands", description=None, help=None
119
    )
120
    subparsers = {}
4✔
121

122
    subparsers["list"] = add_subparser(
4✔
123
        subparsers_, "list", help="List known reachable OFX servers"
124
    )
125
    subparsers["scan"] = add_subparser(
4✔
126
        subparsers_,
127
        "scan",
128
        server=True,
129
        help=("Probe OFX server for working connection parameters"),
130
    )
131
    subparsers["prof"] = add_subparser(
4✔
132
        subparsers_,
133
        "prof",
134
        signon=True,
135
        help=("Download OFX service profile for server"),
136
    )
137
    subparsers["acctinfo"] = add_subparser(
4✔
138
        subparsers_,
139
        "acctinfo",
140
        signon=True,
141
        acctinforq=True,
142
        help=("Download account information for a user login"),
143
    )
144
    subparsers["stmt"] = add_subparser(
4✔
145
        subparsers_,
146
        "stmt",
147
        stmt=True,
148
        help=("Download statement(s) for bank/CC/investment acct(s)"),
149
    )
150
    subparsers["stmtend"] = add_subparser(
4✔
151
        subparsers_,
152
        "stmtend",
153
        stmtend=True,
154
        help=("Download closing statement(s) for bank/CC account(s)"),
155
    )
156
    subparsers["tax1099"] = add_subparser(
4✔
157
        subparsers_,
158
        "tax1099",
159
        tax=True,
160
        help=("(EXPERIMENTAL) Download US income tax data on f1099"),
161
    )
162
    main_parser.subparsers = subparsers  # type: ignore[attr-defined]
4✔
163
    return main_parser
4✔
164

165

166
def add_subparser(
4✔
167
    subparsers: argparse._SubParsersAction,
168
    cmd: str,
169
    server: bool = False,
170
    format: bool = False,
171
    signon: bool = False,
172
    stmtend: bool = False,
173
    stmt: bool = False,
174
    acctinforq: bool = False,
175
    tax: bool = False,
176
    help: str | None = None,
177
) -> argparse.ArgumentParser:
178
    parser = subparsers.add_parser(cmd, help=help, description=help)
4✔
179
    parser.set_defaults(request=cmd)
4✔
180
    parser.add_argument("server", nargs="?", help="OFX server nickname")
4✔
181
    parser.add_argument(
4✔
182
        "--verbose",
183
        "-v",
184
        action="count",
185
        default=0,
186
        help="Give more output (option can be repeated)",
187
    )
188
    # Higher-level configs (e.g. account #s)
189
    # imply lower-level configs (e.g. username/passwd)
190
    if stmt:
4✔
191
        stmtend = True
4✔
192
    if stmtend or tax:
4✔
193
        signon = True
4✔
194
        acctinforq = True  # Support internally generated ACCTINFORQ for --all accounts
4✔
195
    if signon:
4✔
196
        format = True
4✔
197
    if format:
4✔
198
        server = True
4✔
199

200
    if server:
4✔
201
        parser.add_argument("--url", help="OFX server URL")
4✔
202
        parser.add_argument(
4✔
203
            "-w",
204
            "--write",
205
            action="store_true",
206
            default=None,
207
            help="Write working parameters to config file",
208
        )
209
        parser.add_argument(
4✔
210
            "--useragent",
211
            dest="useragent",
212
            help="Value to use in HTTP 'User-Agent' header (defaults to 'InetClntApp/3.0')",
213
        )
214
        parser.add_argument(
4✔
215
            "--skipprofile",
216
            action="store_true",
217
            default=None,
218
            help="Skip sending PROFRQ to look up service URLs",
219
        )
220

221
    if format:
4✔
222
        parser.add_argument(
4✔
223
            "-n",
224
            "--dryrun",
225
            action="store_true",
226
            default=None,
227
            help="Display OFX request and exit without sending",
228
        )
229
        add_format_group(parser)
4✔
230

231
    if signon:
4✔
232
        parser.add_argument(
4✔
233
            "--savepass",
234
            action="store_true",
235
            default=None,
236
            help="Store password in system keyring (requires python-keyring)",
237
        )
238
        parser.add_argument(
4✔
239
            "--nokeyring",
240
            action="store_true",
241
            default=None,
242
            help="Don't use system keyring to store/retrieve passwords",
243
        )
244
        add_signon_group(parser)
4✔
245

246
    if stmtend:
4✔
247
        add_bank_acct_group(parser)
4✔
248
        stmt_group = add_stmt_group(parser)
4✔
249
        if stmt:
4✔
250
            add_stmt_args(stmt_group)
4✔
251
            add_inv_acct_group(parser)
4✔
252
            add_inv_stmt_group(parser)
4✔
253

254
    if acctinforq:
4✔
255
        add_acctinforq_group(parser)
4✔
256

257
    if tax:
4✔
258
        add_tax_group(parser)
4✔
259

260
    return parser
4✔
261

262

263
def add_format_group(parser: argparse.ArgumentParser) -> argparse._ArgumentGroup:
4✔
264
    group = parser.add_argument_group(title="format options")
4✔
265
    group.add_argument("--version", help="OFX version", type=int)
4✔
266
    group.add_argument(
4✔
267
        "--unclosedelements",
268
        action="store_true",
269
        default=None,
270
        help="Omit end tags for elements (OFXv1 only)",
271
    )
272
    group.add_argument(
4✔
273
        "--pretty",
274
        action="store_true",
275
        default=None,
276
        help="Insert newlines and whitespace indentation",
277
    )
278
    group.add_argument(
4✔
279
        "--nonewfileuid",
280
        action="store_true",
281
        default=None,
282
        help="Use 'NONE' instead of generating a UID for NEWFILEUID in header",
283
    )
284

285
    return group
4✔
286

287

288
def add_signon_group(parser: argparse.ArgumentParser) -> argparse._ArgumentGroup:
4✔
289
    group = parser.add_argument_group(title="signon options")
4✔
290
    group.add_argument("-u", "--user", help="FI login username")
4✔
291
    group.add_argument(
4✔
292
        "--password",
293
        help="Password. Used for scripting. Eg: --password $(/usr/bin/pass mybank/login). Use with "
294
        "caution to avoid exposing passwords to the shell and its history.",
295
    )
296
    group.add_argument(
4✔
297
        "--clientuid",
298
        nargs="?",
299
        action=UuidAction,
300
        metavar="UUID4",
301
        help="Override default CLIENTUID with the specified ID, or with a random ID if "
302
        "left unspecified",
303
    )
304
    group.add_argument("--org", help="FI.ORG")
4✔
305
    group.add_argument("--fid", help="FI.FID")
4✔
306
    group.add_argument("--appid", help="OFX client app identifier")
4✔
307
    group.add_argument("--appver", help="OFX client app version")
4✔
308
    group.add_argument("--language", help="OFX language")
4✔
309

310
    return group
4✔
311

312

313
def add_bank_acct_group(parser: argparse.ArgumentParser) -> argparse._ArgumentGroup:
4✔
314
    group = parser.add_argument_group(title="bank/CC account options")
4✔
315
    group.add_argument("--bankid", help="ABA routing#")
4✔
316
    group.add_argument(
4✔
317
        "-C",
318
        "--checking",
319
        metavar="#",
320
        action="append",
321
        help="Account number (option can be repeated)",
322
    )
323
    group.add_argument(
4✔
324
        "-S",
325
        "--savings",
326
        metavar="#",
327
        action="append",
328
        help="Account number (option can be repeated)",
329
    )
330
    group.add_argument(
4✔
331
        "-M",
332
        "--moneymrkt",
333
        metavar="#",
334
        action="append",
335
        help="Account number (option can be repeated)",
336
    )
337
    group.add_argument(
4✔
338
        "-L",
339
        "--creditline",
340
        metavar="#",
341
        action="append",
342
        help="Account number (option can be repeated)",
343
    )
344
    group.add_argument(
4✔
345
        "-c",
346
        "--creditcard",
347
        "--cc",
348
        metavar="#",
349
        action="append",
350
        help="Account number (option can be repeated)",
351
    )
352
    group.add_argument(
4✔
353
        "--all",
354
        dest="all",
355
        action="store_true",
356
        default=None,
357
        help="Request ACCTINFO; download statements for all",
358
    )
359

360
    return group
4✔
361

362

363
def add_stmt_group(parser: argparse.ArgumentParser) -> argparse._ArgumentGroup:
4✔
364
    group = parser.add_argument_group(
4✔
365
        title="general statement options (both bank and investment)"
366
    )
367
    group.add_argument(
4✔
368
        "-s",
369
        "--start",
370
        metavar="DATE",
371
        dest="dtstart",
372
        help="(YYYYmmdd) Transactions list start date",
373
    )
374
    group.add_argument(
4✔
375
        "-e",
376
        "--end",
377
        metavar="DATE",
378
        dest="dtend",
379
        help="(YYYYmmdd) Transactions list end date",
380
    )
381
    return group
4✔
382

383

384
def add_stmt_args(group: argparse._ArgumentGroup) -> argparse._ArgumentGroup:
4✔
385
    group.add_argument(
4✔
386
        "-a",
387
        "--asof",
388
        metavar="DATE",
389
        dest="dtasof",
390
        help="(YYYYmmdd) As-of date for balances and investment positions",
391
    )
392
    group.add_argument(
4✔
393
        "--no-transactions",
394
        dest="inctran",
395
        action="store_false",
396
        default=None,
397
        help="Omit transactions (config 'inctran: false')",
398
    )
399
    group.add_argument(
4✔
400
        "--no-balances",
401
        dest="incbal",
402
        action="store_false",
403
        default=None,
404
        help="Omit balances (config 'incbal: false')",
405
    )
406

407
    return group
4✔
408

409

410
def add_inv_acct_group(parser: argparse.ArgumentParser) -> argparse._ArgumentGroup:
4✔
411
    group = parser.add_argument_group(title="investment account options")
4✔
412
    group.add_argument("--brokerid", help="Broker ID string")
4✔
413
    group.add_argument(
4✔
414
        "-i",
415
        "--investment",
416
        metavar="#",
417
        action="append",
418
        help="Account number (option can be repeated)",
419
    )
420
    return group
4✔
421

422

423
def add_inv_stmt_group(parser: argparse.ArgumentParser) -> argparse._ArgumentGroup:
4✔
424
    group = parser.add_argument_group(title="investment statement options")
4✔
425
    group.add_argument(
4✔
426
        "--no-positions",
427
        dest="incpos",
428
        action="store_false",
429
        default=None,
430
        help="Omit investment positions (config 'incpos: false')",
431
    )
432
    group.add_argument(
4✔
433
        "--open-orders",
434
        dest="incoo",
435
        action="store_true",
436
        default=None,
437
        help="Include open orders (config 'incoo: true')",
438
    )
439
    return group
4✔
440

441

442
def add_acctinforq_group(parser: argparse.ArgumentParser) -> argparse._ArgumentGroup:
4✔
443
    group = parser.add_argument_group(title="account info request options")
4✔
444
    group.add_argument(
4✔
445
        "--dtacctup",
446
        metavar="DATE",
447
        dest="dtacctup",
448
        help="(YYYYmmdd) date of last account update",
449
    )
450
    return group
4✔
451

452

453
def add_tax_group(parser: argparse.ArgumentParser) -> argparse._ArgumentGroup:
4✔
454
    group = parser.add_argument_group(title="tax form options")
4✔
455
    group.add_argument(
4✔
456
        "-y",
457
        "--year",
458
        metavar="YEAR",
459
        dest="years",
460
        type=int,
461
        action="append",
462
        help="(YYYY) Tax year (option can be repeated)",
463
    )
464
    group.add_argument(
4✔
465
        "--acctnum",
466
        dest="acctnum",
467
        help="Account # of recipient, if different than tax ID",
468
    )
469
    group.add_argument("--recid", dest="recid", help="ID of recipient")
4✔
470

471
    return group
4✔
472

473

474
###############################################################################
475
# CLI METHODS
476
###############################################################################
477
def scan_profile(args: ArgsType) -> None:
4✔
478
    """
479
    Report working connection parameters
480
    """
481
    if args["dryrun"]:
4✔
482
        raise SyntaxError("Can't reasonably show a dry run for a profile scan")
×
483

484
    url = args["url"]
4✔
485
    org = args["org"]
4✔
486
    fid = args["fid"]
4✔
487
    useragent = args["useragent"]
4✔
488
    gen_newfileuid = not args["nonewfileuid"]
4✔
489
    timeout = 2.0
4✔
490

491
    scan_results = _scan_profile(
4✔
492
        url=url,
493
        org=org,
494
        fid=fid,
495
        useragent=useragent,
496
        gen_newfileuid=gen_newfileuid,
497
        timeout=timeout,
498
    )
499

500
    v1, v2, signoninfo = scan_results
4✔
501
    if (not v2["versions"]) and (not v1["versions"]):
4✔
502
        msg = f"Scan found no working formats for {url}"
4✔
503
        print(msg)
4✔
504
    else:
505
        print(json.dumps(scan_results))
4✔
506

507
        if args["write"]:
4✔
508
            extra_args = _best_scan_format(scan_results)
4✔
509
            write_config(ChainMap(extra_args, dict(args)))
4✔
510

511

512
def _best_scan_format(scan_results: ScanResults) -> MutableMapping:
4✔
513
    """
514
    Given the results of _scan_profile(), choose the best parameters;
515
    return as dict (compatible with ArgParser/ ChainMap).
516

517
    "Best" here means highest working version with the minimal configuration
518
    delta, i.e. we prefer formats with "pretty"/"unclosedelements" given as
519
    False (the default) over True.
520
    """
521
    logger.info(f"Choosing best scan result from {scan_results}")
4✔
522
    v1, v2, signoninfo = scan_results
4✔
523
    if v2["versions"]:
4✔
524
        logger.debug("Found working OFX version 2")
4✔
525
        result = v2
4✔
526
    elif v1["versions"]:
×
527
        logger.debug("Found working OFX version 1")
×
528
        result = v1
×
529
    else:
530
        logger.info("Found no working OFX versions; returning")
×
531
        return {}
×
532

533
    formats = sorted(result["formats"], key=lambda f: sum(f.values()))
4✔
534
    logger.debug(f"Choose best format {formats[0]} from {formats}")
4✔
535
    args = {k: v for k, v in formats[0].items() if v}
4✔
536

537
    versions = result["versions"]
4✔
538
    args["version"] = versions[-1]
4✔
539
    logger.debug(f"Choose best version{versions[-1]} from {versions}")
4✔
540
    logger.info(f"Best scan result: {args}")
4✔
541
    return args
4✔
542

543

544
def init_client(args: ArgsType) -> OFXClient:
4✔
545
    """
546
    Initialize OFXClient with connection info from args
547
    """
548
    client = OFXClient(
4✔
549
        args["url"],
550
        userid=args["user"] or None,
551
        clientuid=args["clientuid"] or None,
552
        org=args["org"] or None,
553
        fid=args["fid"] or None,
554
        version=args["version"],
555
        appid=args["appid"] or None,
556
        appver=args["appver"] or None,
557
        language=args["language"] or None,
558
        prettyprint=args["pretty"],
559
        close_elements=not args["unclosedelements"],
560
        bankid=args["bankid"] or None,
561
        brokerid=args["brokerid"] or None,
562
        useragent=args["useragent"] or None,
563
    )
564
    logger.debug(f"Initialized {client}")
4✔
565
    return client
4✔
566

567

568
def request_profile(args: ArgsType) -> None:
4✔
569
    """
570
    Send PROFRQ
571
    """
572
    client = init_client(args)
4✔
573

574
    with client.request_profile(
4✔
575
        dryrun=args["dryrun"],
576
        gen_newfileuid=not args["nonewfileuid"],
577
    ) as f:
578
        response = f.read()
4✔
579

580
    print(response.decode())
4✔
581

582
    if args["write"]:
4✔
583
        write_config(args)
×
584

585

586
def request_acctinfo(args: ArgsType) -> None:
4✔
587
    """
588
    Send ACCTINFORQ
589
    """
590

591
    if not args["user"]:
4✔
592
        msg = "'user' not configured"
×
593
        logger.error(msg)
×
594
        raise ValueError(msg)
×
595

596
    password = get_passwd(args)
4✔
597
    acctinfo = _request_acctinfo(args, password)
4✔
598

599
    print(acctinfo.read().decode())
4✔
600
    acctinfo.seek(0)
4✔
601

602
    if args["write"]:
4✔
603
        _merge_acctinfo(args, acctinfo)
×
604
        write_config(args)
×
605

606
    if args["savepass"]:
4✔
607
        save_passwd(args, password)
×
608

609

610
def _request_acctinfo(args: ArgsType, password: str) -> BytesIO:
4✔
611
    client = init_client(args)
4✔
612
    dtacctup = args["dtacctup"] or datetime.datetime(1990, 1, 1, tzinfo=utils.UTC)
4✔
613

614
    with client.request_accounts(
4✔
615
        password,
616
        dtacctup,
617
        dryrun=args["dryrun"],
618
        gen_newfileuid=not args["nonewfileuid"],
619
        skip_profile=args["skipprofile"],
620
    ) as f:
621
        response = f.read()
4✔
622

623
    return BytesIO(response)
4✔
624

625

626
def _merge_acctinfo(args: ArgsType, markup: BytesIO) -> None:
4✔
627
    # *ACCTINFO classes don't have rich comparison methods;
628
    # can't sort by class
629
    sortKey = attrgetter("__class__.__name__")
4✔
630
    acctinfos: list[AcctInfo] = sorted(extract_acctinfos(markup), key=sortKey)
4✔
631

632
    def parse_acctinfos(clsName, acctinfos):
4✔
633
        dispatcher = {
4✔
634
            "BANKACCTINFO": parse_bankacctinfos,
635
            "CCACCTINFO": parse_ccacctinfos,
636
            "INVACCTINFO": parse_invacctinfos,
637
        }
638
        parser = dispatcher.get(clsName, lambda x: {})
4✔
639
        return parser(acctinfos)
4✔
640

641
    parsed_args: list[ParsedAcctinfo] = [
4✔
642
        parse_acctinfos(clsnm, infos)
643
        for clsnm, infos in itertools.groupby(acctinfos, key=sortKey)
644
    ]
645

646
    # Insert extracted ACCTINFO after CLI commands, but before config files
647
    args.maps.insert(1, ChainMap(*parsed_args))
4✔
648

649

650
def request_stmt(args: ArgsType) -> None:
4✔
651
    """
652
    Send *STMTRQ
653
    """
654
    dt = convert_datetime(args)
4✔
655
    password = get_passwd(args)
4✔
656

657
    if args["all"]:
4✔
658
        acctinfo = _request_acctinfo(args, password)
×
659
        _merge_acctinfo(args, acctinfo)
×
660

661
    stmtrqs: list[StmtRq | CcStmtRq | InvStmtRq] = []
4✔
662
    for accttype in ("checking", "savings", "moneymrkt", "creditline"):
4✔
663
        stmtrqs.extend(
4✔
664
            [
665
                StmtRq(
666
                    acctid=acctid,
667
                    accttype=accttype.upper(),
668
                    dtstart=dt["start"],
669
                    dtend=dt["end"],
670
                    inctran=args["inctran"],
671
                )
672
                for acctid in args[accttype]
673
            ]
674
        )
675

676
    for acctid in args["creditcard"]:
4✔
677
        stmtrqs.append(
4✔
678
            CcStmtRq(
679
                acctid=acctid,
680
                dtstart=dt["start"],
681
                dtend=dt["end"],
682
                inctran=args["inctran"],
683
            )
684
        )
685

686
    for acctid in args["investment"]:
4✔
687
        stmtrqs.append(
4✔
688
            InvStmtRq(
689
                acctid=acctid,
690
                dtstart=dt["start"],
691
                dtend=dt["end"],
692
                dtasof=dt["asof"],
693
                inctran=args["inctran"],
694
                incoo=args["incoo"],
695
                incpos=args["incpos"],
696
                incbal=args["incbal"],
697
            )
698
        )
699

700
    if not stmtrqs:
4✔
701
        accttypes = [
4✔
702
            "checking",
703
            "savings",
704
            "moneymrkt",
705
            "creditline",
706
            "creditcard",
707
            "investment",
708
        ]
709
        msg = f"No accounts specified; configure at least one of {accttypes}"
4✔
710
        warnings.warn(msg, category=SyntaxWarning)
4✔
711

712
    client = init_client(args)
4✔
713
    with client.request_statements(
4✔
714
        password,
715
        *stmtrqs,
716
        dryrun=args["dryrun"],
717
        gen_newfileuid=not args["nonewfileuid"],
718
        skip_profile=args["skipprofile"],
719
    ) as f:
720
        response = f.read()
4✔
721

722
    print(response.decode())
4✔
723

724
    if args["write"]:
4✔
725
        write_config(args)
×
726

727
    if args["savepass"]:
4✔
728
        save_passwd(args, password)
×
729

730

731
def request_stmtend(args: ArgsType) -> None:
4✔
732
    """
733
    Send *STMTENDRQ
734
    """
735
    dt = convert_datetime(args)
4✔
736
    password = get_passwd(args)
4✔
737

738
    if args["all"]:
4✔
739
        acctinfo = _request_acctinfo(args, password)
×
740
        _merge_acctinfo(args, acctinfo)
×
741

742
    stmtendrqs: list[StmtEndRq | CcStmtEndRq] = []
4✔
743
    for accttype in ("checking", "savings", "moneymrkt", "creditline"):
4✔
744
        acctids = args[accttype]
4✔
745
        stmtendrqs.extend(
4✔
746
            [
747
                StmtEndRq(
748
                    acctid=acctid,
749
                    accttype=accttype.upper(),
750
                    dtstart=dt["start"],
751
                    dtend=dt["end"],
752
                )
753
                for acctid in acctids
754
            ]
755
        )
756

757
    for acctid in args["creditcard"]:
4✔
758
        stmtendrqs.append(
4✔
759
            CcStmtEndRq(acctid=acctid, dtstart=dt["start"], dtend=dt["end"])
760
        )
761

762
    if not stmtendrqs:
4✔
763
        accttypes = ["checking", "savings", "moneymrkt", "creditline", "creditcard"]
×
764
        msg = f"No accounts specified; configure at least one of {accttypes}"
×
765
        warnings.warn(msg, category=SyntaxWarning)
×
766

767
    client = init_client(args)
4✔
768
    with client.request_statements(
4✔
769
        password,
770
        *stmtendrqs,
771
        dryrun=args["dryrun"],
772
        gen_newfileuid=not args["nonewfileuid"],
773
        skip_profile=args["skipprofile"],
774
    ) as f:
775
        response = f.read()
4✔
776

777
    print(response.decode())
4✔
778

779
    if args["write"]:
4✔
780
        write_config(args)
×
781

782
    if args["savepass"]:
4✔
783
        save_passwd(args, password)
×
784

785

786
def request_tax1099(args: ArgsType) -> None:
4✔
787
    """
788
    Send TAX1099RQ
789
    """
790
    client = init_client(args)
4✔
791

792
    password = get_passwd(args)
4✔
793

794
    with client.request_tax1099(
4✔
795
        password,
796
        *args["years"],
797
        acctnum=args["acctnum"],
798
        recid=args["recid"],
799
        dryrun=args["dryrun"],
800
        gen_newfileuid=not args["nonewfileuid"],
801
        skip_profile=args["skipprofile"],
802
    ) as f:
803
        response = f.read()
4✔
804

805
    print(response.decode())
4✔
806

807

808
###############################################################################
809
# ARGUMENT/CONFIG HANDLERS
810
###############################################################################
811
def convert_list(string: str) -> list[str]:
4✔
812
    """
813
    Deserialize INI representation to a Python list
814
    """
815
    return [sub.strip() for sub in string.split(",")]
×
816

817

818
class UserConfig(configparser.ConfigParser):
4✔
819
    def __init__(self, *args, **kwargs):
4✔
820
        kwargs["converters"] = {"list": convert_list}
4✔
821
        super().__init__(*args, **kwargs)
4✔
822

823

824
class LibraryConfig(configparser.ConfigParser):
4✔
825
    def __init__(self, *args, **kwargs):
4✔
826
        kwargs["converters"] = {"list": convert_list}
4✔
827
        super().__init__(*args, **kwargs)
4✔
828

829

830
USERCFG = UserConfig()
4✔
831
USERCFG.read([CONFIGPATH, USERCONFIGPATH])
4✔
832

833

834
LIBCFG = LibraryConfig()
4✔
835
LIBCFG.read(CONFIGPATH)
4✔
836

837

838
DEFAULTS: dict[str, ArgType] = {
4✔
839
    "verbose": 0,
840
    "server": "",
841
    "url": "",
842
    "version": 203,
843
    "org": "",
844
    "fid": "",
845
    "appid": "",
846
    "appver": "",
847
    "language": "",
848
    "bankid": "",
849
    "brokerid": "",
850
    "unclosedelements": False,
851
    "pretty": False,
852
    "user": "",
853
    "password": "",
854
    "clientuid": "",
855
    "checking": [],
856
    "savings": [],
857
    "moneymrkt": [],
858
    "creditline": [],
859
    "creditcard": [],
860
    "investment": [],
861
    "dtstart": "",
862
    "dtend": "",
863
    "dtasof": "",
864
    "dtacctup": "",
865
    "inctran": True,
866
    "incbal": True,
867
    "incpos": True,
868
    "incoo": False,
869
    "all": False,
870
    "years": [],
871
    "acctnum": "",
872
    "recid": "",
873
    "dryrun": False,
874
    "unsafe": False,
875
    "write": False,
876
    "savepass": False,
877
    "nokeyring": False,
878
    "nonewfileuid": False,
879
    "useragent": "",
880
    "skipprofile": False,
881
}
882

883

884
NULL_ARGS: Iterable = (None, "", [])
4✔
885

886

887
# "Configurable" means "will be read from / written to config file".
888
# Subdivided into -
889
#   "Configurable server", i.e. parameters used establish an OFX connection
890
#       (the kind of thing you'd pass to OFXClient.__init__(), which is
891
#       (the kind of thing you'd pass to OFXClient.__init__()); and
892
#   "Configurable user", i.e. auth/account parameters that are completely
893
#       user-specific and won't be shared by different users of the library.
894
configurable_srvr = (
4✔
895
    "url",
896
    "version",
897
    "pretty",
898
    "unclosedelements",
899
    "org",
900
    "fid",
901
    "brokerid",
902
    "bankid",
903
    "appid",
904
    "appver",
905
    "language",
906
    "nonewfileuid",
907
    "useragent",
908
    "skipprofile",
909
)
910
CONFIGURABLE_SRVR = {k: type(v) for k, v in DEFAULTS.items() if k in configurable_srvr}
4✔
911

912

913
configurable_user = (
4✔
914
    "user",
915
    "clientuid",
916
    "checking",
917
    "savings",
918
    "moneymrkt",
919
    "creditline",
920
    "creditcard",
921
    "investment",
922
)
923
CONFIGURABLE_USER = {k: type(v) for k, v in DEFAULTS.items() if k in configurable_user}
4✔
924

925

926
CONFIGURABLE = CONFIGURABLE_SRVR
4✔
927
CONFIGURABLE.update(CONFIGURABLE_USER)
4✔
928

929

930
def read_config(cfg: configparser.ConfigParser, section: str) -> Mapping[str, ArgType]:
4✔
931
    logger.info(f"Loading {cfg.__class__.__name__}")
4✔
932
    args: Mapping = {}
4✔
933
    if section not in cfg:
4✔
934
        return args
4✔
935

936
    proxy = cfg[section]
×
937
    handlers: dict[type | None, Callable[..., ArgType | None]] = {
×
938
        bool: proxy.getboolean,
939
        int: proxy.getint,
940
        list: proxy.getlist,
941
        str: proxy.get,
942
        None: lambda x: None,
943
    }
944

945
    args = {
×
946
        opt: handlers[CONFIGURABLE.get(opt, None)](opt)
947
        for opt in proxy
948
        if opt in CONFIGURABLE
949
    }
950
    logger.debug(f"Loaded {args}")
×
951

952
    return args
×
953

954

955
def write_config(args: ArgsType) -> None:
4✔
956
    if args["dryrun"]:
×
957
        msg = "Dry run; won't store password"
×
958
        warnings.warn(msg, category=SyntaxWarning)
×
959
        return
×
960

961
    mk_server_cfg(args)
×
962
    logger.info(f"Writing user configs to {USERCONFIGPATH}")
×
963

964
    config.USERCONFIGDIR.mkdir(parents=True, exist_ok=True)
×
965
    with open(USERCONFIGPATH, "w") as f:
×
966
        USERCFG.write(f)
×
967

968

969
def mk_server_cfg(args: ArgsType) -> configparser.SectionProxy:
4✔
970
    """
971
    Load user config from disk; apply key args to the section corresponding to
972
    the server nickname.
973
    """
974
    logger.info("Creating user config")
4✔
975
    logger.debug(f"Args to populate config: {args}")
4✔
976

977
    logger.debug(f"Reloading user config from {USERCONFIGPATH}")
4✔
978
    USERCFG.clear()
4✔
979
    USERCFG.read(USERCONFIGPATH)
4✔
980

981
    defaults = USERCFG[USERCFG.default_section]
4✔
982
    if "clientuid" not in defaults:
4✔
983
        clientuid = OFXClient.uuid()
4✔
984
        logger.debug(f"No global default CLIENTUID found; choosing {clientuid}")
4✔
985
        defaults["clientuid"] = clientuid
4✔
986

987
    server = args.get("server", None)
4✔
988
    # args.server might actually be a URL from CLI, not a nickname
989
    if (not server) or server == args["url"]:
4✔
990
        msg = "No server nickname provided; can't create config"
4✔
991
        logger.error(msg)
4✔
992
        raise ValueError(msg)
4✔
993
    logger.debug(f"Configuring {server}")
4✔
994

995
    if not USERCFG.has_section(server):
4✔
996
        USERCFG[server] = {}
4✔
997
    cfg = USERCFG[server]
4✔
998
    logger.debug(f"Existing user config section: {dict(cfg)}")
4✔
999

1000
    lib_cfg = read_config(LIBCFG, server)
4✔
1001

1002
    def test_cfg_val(opt: str, value: ArgType) -> bool:
4✔
1003
        """Select CLI args to write to config file"""
1004
        if value in NULL_ARGS:
4✔
1005
            return False
×
1006
        # Don't include CLIENTUID in the server section if it's sourced
1007
        # from USERCFG.default_section
1008
        if opt == "clientuid" and value == defaults["clientuid"]:
4✔
1009
            return False
×
1010
        # Don't include configs that are the same as defaults
1011
        elif value == lib_cfg.get(opt, DEFAULTS[opt]):
4✔
1012
            return False
4✔
1013

1014
        return True
4✔
1015

1016
    for opt, opt_type in CONFIGURABLE.items():
4✔
1017
        if opt in args:
4✔
1018
            value = args[opt]
4✔
1019
            if test_cfg_val(opt, value):
4✔
1020
                cfg[opt] = arg2config(opt, opt_type, value)
4✔
1021

1022
    return cfg
4✔
1023

1024

1025
def arg2config(key: str, cfg_type: type, value: ArgType) -> str:
4✔
1026
    """
1027
    Transform a config value from ArgParser format to ConfigParser format
1028
    """
1029

1030
    def write_string(value: str) -> str:
4✔
1031
        return value
4✔
1032

1033
    def write_int(value: int) -> str:
4✔
1034
        return str(value)
4✔
1035

1036
    def write_bool(value: bool) -> str:
4✔
1037
        return {True: "true", False: "false"}[value]
4✔
1038

1039
    def write_list(value: list) -> str:
4✔
1040
        # Serialized string representation of Python list type
1041
        return str(value).strip("[]").replace("'", "")
4✔
1042

1043
    handlers: dict[type, Callable[..., str]] = {
4✔
1044
        str: write_string,
1045
        bool: write_bool,
1046
        list: write_list,
1047
        int: write_int,
1048
    }
1049

1050
    if cfg_type not in handlers:
4✔
1051
        msg = f"Don't know how to write config for {key} type={cfg_type}"
×
1052
        logger.error(msg)
×
1053
        raise ValueError(msg)
×
1054

1055
    return handlers[cfg_type](value)
4✔
1056

1057

1058
def merge_config(
4✔
1059
    args: argparse.Namespace, config: configparser.ConfigParser
1060
) -> ArgsType:
1061
    """
1062
    Merge CLI args > user config > OFX Home > defaults
1063
    """
1064
    logger.info("Merging args")
4✔
1065
    # All ArgumentParser args that have a value set
1066
    _args = extractns(args)
4✔
1067
    logger.debug(f"CLI args: {_args}")
4✔
1068

1069
    if "server" in _args:
4✔
1070
        user_cfg = read_config(config, _args["server"])
4✔
1071
    else:
1072
        user_cfg = {}
×
1073
    logger.debug(f"Configs: {user_cfg}")
4✔
1074
    merged: ArgsType = ChainMap(_args, user_cfg, DEFAULTS)  # type: ignore[arg-type]
4✔
1075
    #  logger.debug(f"CLI args merged with user configs and defaults: {extrargs(merged)}")
1076

1077
    if not (
4✔
1078
        merged.get("url", None)
1079
        or merged.get("dryrun", False)
1080
        or merged.get("request", None) == "list"
1081
    ):
1082
        err = "Missing URL"
4✔
1083

1084
        if "server" not in _args:
4✔
1085
            logger.error(err)
×
1086
            msg = f"{err} - please provide a server nickname, or configure 'url'\n"
×
1087
            print(msg)
×
1088
            command = merged["request"]
×
1089
            make_argparser().subparsers[command].print_help()  # type: ignore[attr-defined]
×
1090
            sys.exit()
×
1091

1092
        server = _args["server"]
4✔
1093
        # Allow sloppy CLI args - passing URL as "server" positional arg
1094
        if urllib_parse.urlparse(server).scheme:
4✔
1095
            merged["url"] = server
×
1096
            merged["server"] = None
×
1097
        else:
1098
            logger.error(err)
4✔
1099
            msg = f"{err} - please configure 'url' for server '{server}'"
4✔
1100
            raise ValueError(msg)
4✔
1101

1102
    logger.info(f"Merged args: {extrargs(merged)}")
×
1103
    return merged
×
1104

1105

1106
def extrargs(args: ArgsType) -> dict:
4✔
1107
    """Extract non-null args"""
1108
    return {k: v for k, v in args.items() if v not in NULL_ARGS}
×
1109

1110

1111
def extractns(ns) -> dict:
4✔
1112
    """Extract non-null argparse.Namespace"""
1113
    return {k: v for k, v in vars(ns).items() if v is not None}
4✔
1114

1115

1116
###############################################################################
1117
# PROFILE SCAN
1118
###############################################################################
1119
def _scan_profile(
4✔
1120
    url: str,
1121
    org: str | None,
1122
    fid: str | None,
1123
    useragent: str | None,
1124
    gen_newfileuid: bool,
1125
    max_workers: int | None = None,
1126
    timeout: float | None = None,
1127
) -> ScanResults:
1128
    """
1129
    Report permutations of OFX version/prettyprint/unclosedelements that
1130
    successfully download OFX profile from server.
1131

1132
    Returns a 3-tuple of (OFXv1 results, OFXv2 results, signoninfo), each
1133
    type(dict).  OFX results provide ``ofxget`` configs that will work to
1134
    make a basic OFX connection. SIGNONINFO reports further information
1135
    that may be helpful to authenticate successfully.
1136
    """
1137
    logger.info(
4✔
1138
        f"Scanning url={url} org={org} fid={fid} "
1139
        f"max_workers={max_workers} timeout={timeout}"
1140
    )
1141
    client = OFXClient(url, org=org, fid=fid, useragent=useragent)
4✔
1142
    futures = _queue_scans(client, gen_newfileuid, max_workers, timeout)
4✔
1143

1144
    # The primary data we keep is actually the metadata (i.e. connection
1145
    # parameters - OFX version; prettyprint; unclosedelements) tagged on
1146
    # the Future by _queue_scans() that gave us a successful OFX connection.
1147
    success_params: FormatMap = defaultdict(list)
4✔
1148
    # If possible, we also parse out some data from SIGNONINFO included in
1149
    # the PROFRS.
1150
    signoninfo: SignoninfoReport = {}
4✔
1151

1152
    # Assume that SIGNONINFO is the same for each successful OFX PROFRS.
1153
    # Tell _read_scan_response() to stop parsing out SIGNONINFO once
1154
    # it's successfully extracted one.
1155
    for future in concurrent.futures.as_completed(futures):
4✔
1156
        version, format = futures[future]
4✔
1157
        valid, signoninfo_ = _read_scan_response(future, not signoninfo)
4✔
1158

1159
        if not valid:
4✔
1160
            continue
4✔
1161
        if not signoninfo and signoninfo_:
4✔
1162
            signoninfo = signoninfo_
4✔
1163

1164
        logger.debug(f"OFX connection success, version={version}, format={format}")
4✔
1165
        success_params[version].append(format)
4✔
1166

1167
    v1_result, v2_result = [
4✔
1168
        collate_scan_results(ver)
1169
        for ver in utils.partition(lambda it: it[0] >= 200, success_params.items())
1170
    ]
1171

1172
    # V2 always has closing tags for elements; just report prettyprint
1173
    for fmt in v2_result["formats"]:
4✔
1174
        if fmt["unclosedelements"]:
4✔
1175
            raise ValueError(f"OFX v2 format has unclosed elements: {fmt}")
×
1176
        del fmt["unclosedelements"]
4✔
1177

1178
    results = (v1_result, v2_result, signoninfo)
4✔
1179
    logger.info(f"Scan results: {results}")
4✔
1180
    return results
4✔
1181

1182

1183
def _queue_scans(
4✔
1184
    client: OFXClient,
1185
    gen_newfileuid: bool,
1186
    max_workers: int | None,
1187
    timeout: float | None,
1188
) -> Mapping[concurrent.futures.Future, ScanMetadata]:
1189
    ofxv1 = [102, 103, 151, 160]
4✔
1190
    ofxv2 = [200, 201, 202, 203, 210, 211, 220]
4✔
1191

1192
    BOOLS = (False, True)
4✔
1193

1194
    futures = {}
4✔
1195
    with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
4✔
1196
        for version, pretty, close in itertools.product(ofxv1, BOOLS, BOOLS):
4✔
1197
            future = executor.submit(
4✔
1198
                client.request_profile,
1199
                gen_newfileuid=gen_newfileuid,
1200
                version=version,
1201
                prettyprint=pretty,
1202
                close_elements=close,
1203
                timeout=timeout,
1204
            )
1205
            futures[future] = (
4✔
1206
                version,
1207
                {"pretty": pretty, "unclosedelements": not close},
1208
            )
1209

1210
        for version, pretty in itertools.product(ofxv2, BOOLS):
4✔
1211
            future = executor.submit(
4✔
1212
                client.request_profile,
1213
                version=version,
1214
                prettyprint=pretty,
1215
                close_elements=True,
1216
                timeout=timeout,
1217
            )
1218
            futures[future] = (
4✔
1219
                version,
1220
                {"pretty": pretty, "unclosedelements": not close},
1221
            )
1222

1223
    return futures
4✔
1224

1225

1226
def _read_scan_response(
4✔
1227
    future: concurrent.futures.Future, read_signoninfo: bool = False
1228
) -> tuple[bool, SignoninfoReport]:
1229
    valid: bool = False
4✔
1230
    signoninfo: SignoninfoReport = {}
4✔
1231

1232
    try:
4✔
1233
        # ``future.result()`` returns an http.client.HTTPResponse
1234
        response = future.result()
4✔
1235
    except (TimeoutError, URLError, HTTPError, ConnectionError, OSError) as exc:
4✔
1236
        logger.error(f"Didn't receive response: {exc}")
4✔
1237
        return valid, signoninfo
4✔
1238
    except (
×
1239
        ParseError,
1240
        ET.ParseError,
1241
        OFXHeaderError,
1242
    ):
1243
        logger.error("Response contains invalid OFX: {exc}")
×
1244
        return valid, signoninfo
×
1245

1246
    if read_signoninfo:
4✔
1247
        # ``response`` is an HTTPResponse; doesn't have seek() method used
1248
        # by ``header.parse_header()``.  Repackage as BytesIO for parsing.
1249
        with response as f:
4✔
1250
            response_ = f.read()
4✔
1251
        try:
4✔
1252
            if not response_:
4✔
1253
                return valid, signoninfo
×
1254

1255
            signoninfos: Iterator[models.SIGNONINFO] = extract_signoninfos(
4✔
1256
                BytesIO(response_)
1257
            )
1258

1259
            # Assume that all the SIGNONINFOs have the same content
1260
            valid = True
4✔
1261
            info = next(signoninfos)
4✔
1262
            bool_attrs = (
4✔
1263
                "chgpinfirst",
1264
                "clientuidreq",
1265
                "authtokenfirst",
1266
                "mfachallengefirst",
1267
            )
1268
            signoninfo = {
4✔
1269
                attr: getattr(info, attr, None) or False for attr in bool_attrs
1270
            }
1271

1272
            logger.debug(
4✔
1273
                f"Received HTTP response with valid OFX; signoninfo={signoninfo}"
1274
            )
1275
        except (ParseError, ET.ParseError, OFXHeaderError) as exc:
4✔
1276
            # We didn't receive valid OFX in the response
1277
            logger.error(f"Response contains invalid OFX {exc}")
4✔
1278
            valid = False
4✔
1279
        except (ValueError,):
4✔
1280
            # We received OFX; can't find SIGNONIFO (probably no PROFRS)
1281
            logger.warning("Response with valid OFX; can't parse SIGNONINFO")
4✔
1282
            valid = True
4✔
1283
    else:
1284
        # IF we're not parsing the PROFRS, then we interpret receiving a good
1285
        # HTTP response as valid.
1286
        logger.debug("Received HTTP response; not parsing SIGNONINFO")
4✔
1287
        valid = True
4✔
1288

1289
    logger.info(f"valid: {valid}, signoninfo: {signoninfo}")
4✔
1290
    return valid, signoninfo
4✔
1291

1292

1293
def collate_scan_results(
4✔
1294
    scan_results: Iterable[tuple[OFXVersion, Sequence[MarkupFormat]]],
1295
) -> ScanResult:
1296
    """
1297
    Input ``scan_results`` needs to be a complete set for either OFXv1 or v2,
1298
    with no results for the other version admixed.
1299
    """
1300
    results_ = list(scan_results)
4✔
1301
    if not results_:
4✔
1302
        return {"versions": [], "formats": []}
4✔
1303
    versions, formats = zip(*results_)
4✔
1304

1305
    # Assumption: the same markup formatting requirements apply to all
1306
    # versions (e.g. 1.0.2 and 1.0.3, or 2.0.3 and 2.2.0).
1307
    # If markup format succeeds on some versions but fails on others,
1308
    # we'll chalk it up to network transmission errors.
1309
    #
1310
    # Translation: just pick the longest sequence of successful
1311
    # formats and assume it applies for all versions.
1312
    def sortKey(d):
4✔
1313
        return len(d)
4✔
1314

1315
    formats_ = list(max(formats, key=sortKey))
4✔
1316
    formats_.sort(key=lambda f: (f["pretty"], f["unclosedelements"]))
4✔
1317
    return {"versions": sorted(versions), "formats": formats_}
4✔
1318

1319

1320
###############################################################################
1321
# OFX PARSING
1322
###############################################################################
1323
def verify_status(
4✔
1324
    trnrs: models.SONRS | models.PROFTRNRS | models.ACCTINFOTRNRS,
1325
) -> None:
1326
    """
1327
    Input a models.Aggregate instance representing a transaction wrapper.
1328
    """
1329
    status = trnrs.status
4✔
1330
    if status.code != 0:
4✔
1331
        cls = trnrs.__class__.__name__
4✔
1332
        msg = (
4✔
1333
            f"{cls}: Request failed, code={status.code}, "
1334
            f"severity={status.severity}, message={status.message!r}"
1335
        )
1336
        logger.error(msg)
4✔
1337
        raise ValueError(msg)
4✔
1338

1339

1340
def _acctIsActive(acctinfo: AcctInfo) -> bool:
4✔
1341
    return acctinfo.svcstatus == "ACTIVE"
4✔
1342

1343

1344
def extract_signoninfos(markup: BytesIO) -> Iterator[models.SIGNONINFO]:
4✔
1345
    """
1346
    Input seralized OFX containing PROFRS
1347
    Output list of ofxtools.models.SIGNONINFO instances
1348
    """
1349
    parser = OFXTree()
4✔
1350
    parser.parse(markup)
4✔
1351
    ofx = parser.convert()
4✔
1352

1353
    sonrs: models.SONRS | None = ofx.signonmsgsrsv1.sonrs
4✔
1354
    if not isinstance(sonrs, models.SONRS):
4✔
1355
        raise ValueError(f"Expected SONRS, got {type(sonrs).__name__}")
×
1356
    verify_status(sonrs)
4✔
1357

1358
    msgs: models.PROFMSGSRSV1 | None = ofx.profmsgsrsv1
4✔
1359
    if msgs is None:
4✔
1360
        raise ValueError("OFX response missing PROFMSGSRSV1")
×
1361

1362
    def extract_signoninfo(trnrs: models.PROFTRNRS) -> list[models.SIGNONINFO]:
4✔
1363
        verify_status(trnrs)
4✔
1364
        rs: models.PROFRS | None = trnrs.profrs
4✔
1365
        if rs is None:
4✔
1366
            raise ValueError("PROFTRNRS missing PROFRS")
×
1367

1368
        list_: models.SIGNONINFOLIST | None = rs.signoninfolist
4✔
1369
        if list_ is None:
4✔
1370
            raise ValueError("PROFRS missing SIGNONINFOLIST")
×
1371
        return list_
4✔
1372

1373
    return itertools.chain.from_iterable(extract_signoninfo(trnrs) for trnrs in msgs)
4✔
1374

1375

1376
def extract_acctinfos(markup: BytesIO) -> Iterator[AcctInfo]:
4✔
1377
    """
1378
    Input seralized OFX containing ACCTINFORS
1379
    Output dict-like object containing parsed *ACCTINFOs
1380
    """
1381
    parser = OFXTree()
4✔
1382
    parser.parse(markup)
4✔
1383
    ofx = parser.convert()
4✔
1384

1385
    sonrs = ofx.signonmsgsrsv1.sonrs
4✔
1386
    if not isinstance(sonrs, models.SONRS):
4✔
1387
        raise ValueError(f"Expected SONRS, got {type(sonrs).__name__}")
×
1388
    verify_status(sonrs)
4✔
1389

1390
    msgs = ofx.signupmsgsrsv1
4✔
1391
    if msgs is None or len(msgs) != 1:
4✔
1392
        raise ValueError(
×
1393
            f"Expected exactly 1 SIGNUPMSGSRSV1, got {len(msgs) if msgs is not None else 0}"
1394
        )
1395
    trnrs = msgs[0]
4✔
1396
    if not isinstance(trnrs, models.ACCTINFOTRNRS):
4✔
1397
        raise ValueError(f"Expected ACCTINFOTRNRS, got {type(trnrs).__name__}")
×
1398
    verify_status(trnrs)
4✔
1399

1400
    acctinfors = trnrs.acctinfors
4✔
1401
    if not isinstance(acctinfors, models.ACCTINFORS):
4✔
1402
        raise ValueError(f"Expected ACCTINFORS, got {type(acctinfors).__name__}")
×
1403

1404
    # ACCTINFOs are ListAggregates of ACCTINFORS
1405
    # *ACCTINFOs are ListAggregates of ACCTINFO
1406
    # The data we want is in a nested list
1407
    return itertools.chain.from_iterable(acctinfors)
4✔
1408

1409

1410
def parse_bankacctinfos(acctinfos: Sequence[models.BANKACCTINFO]) -> ParsedAcctinfo:
4✔
1411
    bankids = []
4✔
1412
    args_: MutableMapping = defaultdict(list)
4✔
1413
    for inf in acctinfos:
4✔
1414
        if _acctIsActive(inf):
4✔
1415
            bankids.append(inf.bankid)
4✔
1416
            args_[inf.accttype.lower()].append(inf.acctid)
4✔
1417

1418
    args_["bankid"] = utils.collapseToSingle(bankids, "BANKIDs")
4✔
1419
    return dict(args_)
4✔
1420

1421

1422
def parse_invacctinfos(acctinfos: Sequence[models.INVACCTINFO]) -> ParsedAcctinfo:
4✔
1423
    brokerids = []
4✔
1424
    args_: MutableMapping = defaultdict(list)
4✔
1425
    for inf in acctinfos:
4✔
1426
        if _acctIsActive(inf):
4✔
1427
            acctfrom = inf.invacctfrom
4✔
1428
            brokerids.append(acctfrom.brokerid)
4✔
1429
            args_["investment"].append(acctfrom.acctid)
4✔
1430

1431
    args_["brokerid"] = utils.collapseToSingle(brokerids, "BROKERIDs")
4✔
1432
    return dict(args_)
4✔
1433

1434

1435
def parse_ccacctinfos(acctinfos: Sequence[models.CCACCTINFO]) -> ParsedAcctinfo:
4✔
1436
    return {"creditcard": [i.acctid for i in acctinfos if _acctIsActive(i)]}
4✔
1437

1438

1439
###############################################################################
1440
# CLI UTILITIES
1441
###############################################################################
1442
def list_fis(args: ArgsType) -> None:
4✔
1443
    server = args["server"]
4✔
1444
    if server in NULL_ARGS:
4✔
1445
        entries = ["{:<40}{:<30}".format(*srv) for srv in fi_index()]
4✔
1446
        entries.insert(0, " ".join(("=" * 39, "=" * 29)))
4✔
1447
        entries.insert(0, "{:^40}{:^30}".format("Name", "Nickname"))
4✔
1448
        pydoc.pager("\n".join(entries))
4✔
1449
    elif server not in USERCFG:
4✔
1450
        msg = f"Unknown server '{server}'"
4✔
1451
        raise ValueError(msg)
4✔
1452
    else:
1453
        name = USERCFG["NAMES"].get(server, "")
4✔
1454
        config = [" = ".join(pair) for pair in USERCFG[server].items()]
4✔
1455
        print()
4✔
1456
        if name:
4✔
1457
            print(name)
4✔
1458
        print("\n".join(config))
4✔
1459
        print()
4✔
1460

1461

1462
def fi_index() -> Sequence[tuple[str, str]]:
4✔
1463
    """All FIs known to ofxget"""
1464
    names = {nick: name for nick, name in USERCFG["NAMES"].items()}
4✔
1465
    cfg_default_sect = USERCFG.default_section
4✔
1466
    servers = [
4✔
1467
        (names.get(nick, ""), nick)
1468
        for nick, sct in USERCFG.items()
1469
        if nick not in (cfg_default_sect, "NAMES") and "url" in sct
1470
    ]
1471

1472
    def sortkey(srv):
4✔
1473
        key = srv[0].lower()
4✔
1474
        if key.startswith("the "):
4✔
1475
            key = key[4:]
×
1476
        return key
4✔
1477

1478
    servers.sort(key=sortkey)
4✔
1479
    return servers
4✔
1480

1481

1482
def convert_datetime(args: ArgsType) -> Mapping[str, datetime.datetime | None]:
4✔
1483
    """Convert dtstart/dtend/dtasof to Python datetime type for request"""
1484
    D = DateTime().convert
4✔
1485
    return {d[2:]: D(args[d] or None) for d in ("dtstart", "dtend", "dtasof")}
4✔
1486

1487

1488
def get_passwd(args: ArgsType) -> str:
4✔
1489
    """
1490
    1.  For dry run, use dummy password from OFX spec
1491
    2.  If python-keyring is installed and neither --nokeyring nor --savepass is set,
1492
        try to use it!
1493
    3.  Prompt for password in terminal
1494
    """
1495
    if args["dryrun"]:
4✔
1496
        logger.debug("Dry run; using dummy password")
4✔
1497
        password = "{:0<32}".format("anonymous")
4✔
1498
    elif args["password"]:
4✔
1499
        return args["password"]
×
1500
    else:
1501
        password = ""
4✔
1502
        if all(
4✔
1503
            (
1504
                HAS_KEYRING,
1505
                "nokeyring" in args and not args["nokeyring"],
1506
                "savepass" in args and not args["savepass"],
1507
            )
1508
        ):
1509
            server = args["server"]
×
1510
            logger.debug("Found python-keyring; loading password for {server}")
×
1511
            try:
×
1512
                password = keyring.get_password("ofxtools", server) or ""
×
1513
            except keyring.errors.KeyringError as err:
×
1514
                msg = (
×
1515
                    f"keyring.get_password('ofxtools', {server}) failed: "
1516
                    f"'{err.args[0]}'"
1517
                )
1518
                logger.error(msg)
×
1519
        if not password:
4✔
1520
            password = getpass.getpass()
4✔
1521
    return password
4✔
1522

1523

1524
def save_passwd(args: Mapping, password: str) -> None:
4✔
1525
    if "dryrun" in args and args["dryrun"]:
4✔
1526
        msg = "Dry run; won't store password"
4✔
1527
        warnings.warn(msg, category=SyntaxWarning)
4✔
1528
        return
4✔
1529
    if "nokeyring" in args and args["nokeyring"]:
4✔
1530
        msg = "python-keyring disabled; won't store password"
×
1531
        warnings.warn(msg, category=SyntaxWarning)
×
1532
        return
×
1533
    if not password:
4✔
1534
        msg = "Empty password; won't store"
4✔
1535
        warnings.warn(msg, category=SyntaxWarning)
4✔
1536
        return
4✔
1537
    if not HAS_KEYRING:
4✔
1538
        msg = "Can't find python-keyring pacakge; can't save password"
4✔
1539
        logger.error(msg)
4✔
1540
        raise RuntimeError(msg)
4✔
1541

1542
    server = args["server"]
×
1543
    logger.debug("Found python-keyring; storing password for {server}")
×
1544
    keyring.set_password("ofxtools", server, password)
×
1545

1546

1547
LOG_LEVELS = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG}
4✔
1548

1549

1550
# Map "request" arg to handler function
1551
REQUEST_HANDLERS = {
4✔
1552
    "list": list_fis,
1553
    "scan": scan_profile,
1554
    "prof": request_profile,
1555
    "acctinfo": request_acctinfo,
1556
    "stmt": request_stmt,
1557
    "stmtend": request_stmtend,
1558
    "tax1099": request_tax1099,
1559
}
1560

1561

1562
def main() -> None:
4✔
1563
    argparser = make_argparser()
4✔
1564
    args_ = argparser.parse_args()
4✔
1565

1566
    verbosity = getattr(args_, "verbose", 0)
4✔
1567
    log_level = LOG_LEVELS.get(verbosity, logging.DEBUG)
4✔
1568
    config.configure_logging(log_level)
4✔
1569

1570
    logger.debug(f"Parsed CLI args: {extractns(args_)}")
4✔
1571

1572
    if not hasattr(args_, "request"):
4✔
1573
        argparser.print_help()
×
1574
        sys.exit()
×
1575

1576
    args = merge_config(args_, USERCFG)
4✔
1577
    REQUEST_HANDLERS[args["request"]](args)
4✔
1578

1579

1580
if __name__ == "__main__":
4✔
1581
    main()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc