Sage-Bionetworks-IT / lambda-mips-api / build 15838468732
24 Jun 2025 12:51AM UTC coverage: 91.738% (-4.2%) from 95.968%

Pull Request #50: [IT-4373] Add endpoint to emit current balances as CSV
Merge 51397c2fd into 192557697 (github, committed by web-flow)

149 of 170 new or added lines in 1 file covered (87.65%)
29 existing lines in 1 file now uncovered
322 of 351 relevant lines covered (91.74%)
0.92 hits per line

Source File: /mips_api/__init__.py (91.74% covered)
import csv
import io
import json
import logging
import os
import re
from datetime import date


import backoff
import boto3
import requests
from requests.exceptions import RequestException
from urllib3.exceptions import RequestError

LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)

_mip_url_login = "https://login.mip.com/api/v1/sso/mipadv/login"
_mip_url_coa_segments = "https://api.mip.com/api/coa/segments"
_mip_url_coa_accounts = "https://api.mip.com/api/coa/segments/accounts"
_mip_url_current_balance = (
    "https://api.mip.com/api/model/CBODispBal/methods/GetAccountBalances"
)
_mip_url_logout = "https://api.mip.com/api/security/logout"

# These are global so that they can be stubbed in test.
# Because they are global their value will be retained
# in the lambda environment and re-used on warm runs.
ssm_client = None
s3_client = None


def _get_os_var(varnam):
    try:
        return os.environ[varnam]
    except KeyError as exc:
        raise Exception(f"The environment variable '{varnam}' must be set")


def _parse_codes(codes):
    data = []
    if codes:
        data = codes.split(",")
    return data


def _param_bool(params, param):
    if params and param in params:
        if params[param].lower() not in ["false", "no", "off"]:
            return True
    return False


def _param_hide_inactive_bool(params):
    # default True
    return not _param_bool(params, "show_inactive_codes")


def _param_show_other_bool(params):
    # default False
    return _param_bool(params, "show_other_code")


def _param_show_no_program_bool(params):
    # default True
    return not _param_bool(params, "hide_no_program_code")


def _param_limit_int(params):
    if params and "limit" in params:
        try:
            return int(params["limit"])
        except ValueError as exc:
            err_str = "QueryStringParameter 'limit' must be an Integer"
            raise ValueError(err_str) from exc
    return 0


def _param_priority_list(params):
    if params and "priority_codes" in params:
        return _parse_codes(params["priority_codes"])

    return None

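For reference, a minimal sketch of how these query-string helpers resolve a request. It assumes the module is importable as mips_api; the parameter values and program codes below are made up for illustration.

from mips_api import (
    _param_hide_inactive_bool,
    _param_limit_int,
    _param_priority_list,
    _param_show_other_bool,
)

# hypothetical queryStringParameters as delivered by API Gateway
params = {"limit": "10", "show_inactive_codes": "yes", "priority_codes": "112233,445566"}

print(_param_limit_int(params))           # 10
print(_param_hide_inactive_bool(params))  # False: inactive codes will be shown
print(_param_show_other_bool(params))     # False: parameter absent, default applies
print(_param_priority_list(params))       # ['112233', '445566']
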
# helper functions to encapsulate the body, headers, and status code
def _build_return_json(code, body):
    return {
        "statusCode": code,
        "body": json.dumps(body, indent=2),
    }


def _build_return_text(code, body):
    return {
        "statusCode": code,
        "body": body,
    }


def collect_secrets(ssm_path):
    """Collect secure parameters from SSM"""

    # create boto client
    global ssm_client
    if ssm_client is None:
        ssm_client = boto3.client("ssm")

    # object to return
    ssm_secrets = {}

    # get secret parameters from ssm
    params = ssm_client.get_parameters_by_path(
        Path=ssm_path,
        Recursive=True,
        WithDecryption=True,
    )
    if "Parameters" in params:
        for p in params["Parameters"]:
            # strip leading path plus / char
            if len(p["Name"]) > len(ssm_path):
                name = p["Name"][len(ssm_path) + 1 :]
            else:
                name = p["Name"]
            ssm_secrets[name] = p["Value"]
            LOG.info(f"Loaded secret: {name}")
    else:
        raise Exception(f"Invalid response from SSM client")

    for reqkey in ["user", "pass"]:
        if reqkey not in ssm_secrets:
            raise Exception(f"Missing required secure parameter: {reqkey}")

    return ssm_secrets

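Because ssm_client is a module-level global, tests can inject a fake client before calling collect_secrets(). A minimal sketch, assuming the package is importable as mips_api and using a made-up SSM path and values:

import mips_api

class FakeSSM:
    """Stand-in for the boto3 SSM client used by collect_secrets()."""

    def get_parameters_by_path(self, Path, Recursive, WithDecryption):
        # mimics the shape of a real get_parameters_by_path response
        return {
            "Parameters": [
                {"Name": f"{Path}/user", "Value": "mip-api-user"},
                {"Name": f"{Path}/pass", "Value": "mip-api-pass"},
            ]
        }

mips_api.ssm_client = FakeSSM()
print(mips_api.collect_secrets("/lambda-mips-api"))
# {'user': 'mip-api-user', 'pass': 'mip-api-pass'}
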
@backoff.on_exception(backoff.expo, (RequestError, RequestException), max_time=11)
def _request_login(creds):
    """
    Wrap login request with backoff decorator, using exponential backoff
    and running for at most 11 seconds. With a connection timeout of 4
    seconds, this allows two attempts.
    """
    timeout = 4
    LOG.info("Logging in to upstream API")

    login_response = requests.post(
        _mip_url_login,
        json=creds,
        timeout=timeout,
    )
    login_response.raise_for_status()
    token = login_response.json()["AccessToken"]
    return token


@backoff.on_exception(backoff.expo, (RequestError, RequestException), max_time=11)
def _request_program_segment(access_token):
    """
    Wrap the request for chart segment IDs with backoff decorator, using
    exponential backoff and running for at most 11 seconds. With a
    connection timeout of 4 seconds, this allows two attempts.
    Only return the ID of the "Program" segment needed for filtering.
    """
    timeout = 4
    LOG.info("Getting chart segments")

    # get segments from api
    segment_response = requests.get(
        _mip_url_coa_segments,
        headers={"Authorization-Token": access_token},
        timeout=timeout,
    )
    segment_response.raise_for_status()
    json_response = segment_response.json()
    LOG.debug(f"Raw segment json: {json_response}")

    # get the segment ID for the "Program" segment
    seg_id = None
    for segment in json_response["COA_SEGID"]:
        if segment["TITLE"] == "Program":
            seg_id = segment["COA_SEGID"]
            break

    if seg_id is None:
        raise ValueError("Program segment not found")

    return seg_id


@backoff.on_exception(backoff.expo, (RequestError, RequestException), max_time=11)
def _request_accounts(access_token, program_id, hide_inactive):
    """
    Wrap the request for chart of accounts with backoff decorator, using
    exponential backoff and running for at most 11 seconds. With a
    connection timeout of 4 seconds, this allows two attempts.
    Only return results for active accounts in the program segment.
    """
    timeout = 4
    LOG.info("Getting chart of accounts")

    # get segments from api
    account_response = requests.get(
        _mip_url_coa_accounts,
        headers={"Authorization-Token": access_token},
        timeout=timeout,
    )
    account_response.raise_for_status()
    json_response = account_response.json()
    LOG.debug(f"Raw account json: {json_response}")

    accounts = {}
    for account in json_response["COA_SEGID"]:
        # require "Program" segment
        if account["COA_SEGID"] == program_id:
            if hide_inactive:
                # require (A)ctive status
                if account["COA_STATUS"] == "A":
                    accounts[account["COA_CODE"]] = account["COA_TITLE"]
            else:
                accounts[account["COA_CODE"]] = account["COA_TITLE"]

    LOG.info(f"Chart of accounts: {accounts}")
    return accounts


@backoff.on_exception(backoff.expo, (RequestError, RequestException), max_time=11)
def _request_balance(access_token, period_from, period_to):
    timeout = 4
    LOG.info("Getting balances")

    # copied from chrome dev tools while clicking through the web ui
    body = {
        "BOInformation": {
            "constructor": {},
            "ModelFields": {
                "fields": [
                    {"DISPBAL_DATEFROM": period_from},
                    {"DISPBAL_DATETO": period_to},
                ],
                "DISPBAL_SEGINFO": [
                    {
                        "fields": [
                            {"GRID_PHY_ID": 0},
                            {"FILTER_SELECT": True},
                            {"FILTER_ORDER": -1},
                            {"FILTER_FIX": False},
                            {"FILTER_DATATYPE": 10},
                            {"FILTER_FIELDTYPE": 40},
                            {"FILTER_ITEM": "Program"},
                            {"FILTER_OPERATOR": "<>"},
                            {"FILTER_CRITERIA1": "<Blank>"},
                            {"FILTER_CRITERIA2": ""},
                        ]
                    }
                ],
            },
        },
        "MethodParameters": {"strJson": '{"level":1}'},
    }

    api_response = requests.post(
        _mip_url_current_balance,
        headers={"Authorization-Token": access_token},
        timeout=timeout,
        json=body,
    )
    api_response.raise_for_status()
    json_response = api_response.json()
    LOG.debug(f"Raw balance documents json: {json_response}")

    balance = json_response
    return balance


@backoff.on_exception(backoff.fibo, (RequestError, RequestException), max_time=28)
def _request_logout(access_token):
    """
    Wrap logout request with backoff decorator, using fibonacci backoff
    and running for at most 28 seconds. With a connection timeout of 6
    seconds, this allows three attempts.

    Prioritize spending time logging out over the other requests because
    failing to log out after successfully logging in will lock us out of
    the API; but CloudFront will only wait a maximum of 60 seconds for a
    response from this lambda.
    """
    timeout = 6
    LOG.info("Logging out of upstream API")

    requests.post(
        _mip_url_logout,
        headers={"Authorization-Token": access_token},
        timeout=timeout,
    )


def _chart_requests(org_name, secrets, hide_inactive):
    """
    Log into MIPS, get the chart of accounts, and log out
    """

    coa_dict = {}
    access_token = None

    mip_creds = {
        "username": secrets["user"],
        "password": secrets["pass"],
        "org": org_name,
    }

    try:
        # get mip access token
        access_token = _request_login(mip_creds)

        # get the chart segments
        program_id = _request_program_segment(access_token)

        # get the chart of accounts
        coa_dict = _request_accounts(access_token, program_id, hide_inactive)

    except Exception as exc:
        LOG.exception("Error interacting with upstream API")

    finally:
        # It's important to logout. Logging in a second time without
        # logging out will lock us out of the upstream API
        try:
            _request_logout(access_token)
        except Exception as exc:
            LOG.exception("Error logging out")

    return coa_dict


def _balance_requests(org_name, secrets):
    bal_dict = {}
    access_token = None

    mip_creds = {
        "username": secrets["user"],
        "password": secrets["pass"],
        "org": org_name,
    }

    today = date.today()
    LOG.info(f"Today is {today}")

    end = today.replace(day=1)  # first of this month
    end_str = end.strftime("%m/%d/%Y")
    LOG.info(f"End day is {end_str}")

    start = end.replace(month=end.month - 1)
    start_str = start.strftime("%m/%d/%Y")  # first day of last month
    LOG.info(f"Start day is {start_str}")

    try:
        # get mip access token
        access_token = _request_login(mip_creds)
        bal_dict = _request_balance(access_token, start_str, end_str)
    except Exception as exc:
        LOG.exception(exc)
    finally:
        # It's important to logout. Logging in a second time without
        # logging out will lock us out of the upstream API
        try:
            _request_logout(access_token)
        except Exception as exc:
            LOG.exception("Error logging out")

    bal_dict["period_from"] = start_str
    bal_dict["period_to"] = end_str

    LOG.debug(f"Balance dict: {bal_dict}")

    return bal_dict


def _s3_cache_read(bucket, path):
    """
    Read MIP response from S3 cache object
    """
    global s3_client
    if s3_client is None:
        s3_client = boto3.client("s3")

    data = s3_client.get_object(Bucket=bucket, Key=path)
    return json.loads(data["Body"].read())


def _s3_cache_write(data, bucket, path):
    """
    Write MIP response to S3 cache object
    """
    global s3_client
    if s3_client is None:
        s3_client = boto3.client("s3")

    body = json.dumps(data)
    s3_client.put_object(Bucket=bucket, Key=path, Body=body)


def s3_cache(src_dict, bucket, path):
    """
    Access the Chart of Accounts from MIP Cloud, and implement a write-through
    cache of successful responses to tolerate long-term faults in the upstream
    API.

    A successful API response will be stored in S3 indefinitely, to be retrieved
    and used in the case of an API failure.

    The S3 bucket has versioning enabled for disaster recovery, but this means
    that every PUT request will create a new S3 object. In order to minimize
    the number of objects in the bucket, read the cache value on every run and
    only update the S3 object if it changes.
    """

    out_dict = None
    cache_dict = None

    # always read cached value
    LOG.info("Read cached json from S3")
    try:
        cache_dict = _s3_cache_read(bucket, path)
        LOG.debug(f"Cached API response: {cache_dict}")
    except Exception as exc:
        LOG.exception("S3 read failure")

    if src_dict:
        # if we received a non-empty response from the upstream API, compare it
        # to our cached response and update the S3 write-through cache if needed
        if src_dict == cache_dict:
            LOG.debug("No change in chart of accounts")
        else:
            # store write-through cache
            LOG.info("Write updated chart of accounts to S3")
            try:
                _s3_cache_write(src_dict, bucket, path)
            except Exception as exc:
                LOG.exception("S3 write failure")
        out_dict = src_dict
    else:
        # no response (or an empty response) from the upstream API,
        # rely on our response cached in S3.
        out_dict = cache_dict

    if not out_dict:
        # make sure we don't return an empty value
        raise ValueError("No valid chart of accounts found")

    return out_dict

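The same stubbing pattern works for the S3 write-through cache. A minimal sketch of s3_cache() with an in-memory fake client; the bucket name, object key, and chart values are assumptions:

import io
import mips_api

class FakeS3:
    """In-memory stand-in for the boto3 S3 client used by the cache helpers."""

    def __init__(self):
        self.objects = {}

    def get_object(self, Bucket, Key):
        return {"Body": io.BytesIO(self.objects[(Bucket, Key)].encode())}

    def put_object(self, Bucket, Key, Body):
        self.objects[(Bucket, Key)] = Body

mips_api.s3_client = FakeS3()

# upstream succeeded: the response is returned and written through to the cache
# (the first call logs an "S3 read failure" because the cache starts empty; that miss is tolerated)
print(mips_api.s3_cache({"112233": "Example Program"}, "cache-bucket", "chart.json"))

# upstream returned nothing: the cached copy is served instead
print(mips_api.s3_cache({}, "cache-bucket", "chart.json"))
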
def chart_cache(org_name, secrets, bucket, path, inactive):
    """
    Access the Chart of Accounts from MIP Cloud, and implement a
    write-through cache of successful responses to tolerate long-term
    faults in the upstream API.

    A successful API response will be stored in S3 indefinitely, to be
    retrieved and used in the case of an API failure.

    The S3 bucket has versioning enabled for disaster recovery, but this
    means that every PUT request will create a new S3 object. In order
    to minimize the number of objects in the bucket, read the cache
    value on every run and only update the S3 object if it changes.
    """

    # get the upstream API response
    LOG.info("Read chart of accounts from upstream API")
    upstream_dict = _chart_requests(org_name, secrets, inactive)
    LOG.debug(f"Upstream API response: {upstream_dict}")

    chart_dict = s3_cache(upstream_dict, bucket, path)
    return chart_dict


def balance_cache(org_name, secrets, bucket, path):
    LOG.info("Read trial balances from upstream API")
    upstream_dict = _balance_requests(org_name, secrets)
    LOG.debug(f"Upstream API response: {upstream_dict}")

    bal_dict = s3_cache(upstream_dict, bucket, path)
    return bal_dict


def process_chart(
    chart_dict,
    omit_list,
    priority_codes,
    hide_inactive,
    other_code,
    show_other,
    no_program_code,
    show_no_program,
):
    """
    Process chart of accounts to remove unneeded programs,
    and inject some extra (meta) programs.

    5-digit codes are inactive and should be ignored in most cases.
    8-digit codes are active, but only the first 6 digits are significant,
      i.e. 12345601 and 12345602 should be deduplicated as 123456.
    """

    # deduplicate on shortened numeric codes
    # pre-populate with codes to omit to short-circuit their processing
    found_codes = []
    found_codes.extend(omit_list)

    # output object
    out_chart = {}

    # whether to show inactive codes
    code_len = 5
    if hide_inactive:
        code_len = 6

    # add short codes
    for code, _name in chart_dict.items():
        if len(code) >= code_len:
            # truncate active codes to the first 6 significant digits
            short = code[:6]
            # enforce AWS tags limitations
            # https://docs.aws.amazon.com/tag-editor/latest/userguide/best-practices-and-strats.html
            # enforce removing special characters globally for consistency,
            # only enforce string limit when listing tag values because
            # the string size will change.
            regex = r"[^\d\w\s.:/=+\-@]+"
            name = re.sub(regex, "", _name)

            if short in found_codes:
                LOG.info(f"Code {short} has already been processed")
                continue

            if priority_codes is not None:
                if short in priority_codes:
                    # Since Python 3.7, python dictionaries preserve
                    # insertion order, so to prepend an item to the top
                    # of the dictionary, we create a new dictionary
                    # inserting the target code first, then add the
                    # previous output, and finally save the new
                    # dictionary as our output.
                    new_chart = {short: name}
                    new_chart.update(out_chart)
                    out_chart = new_chart
                    found_codes.append(short)
                else:
                    out_chart[short] = name
                    found_codes.append(short)
            else:
                out_chart[short] = name
                found_codes.append(short)

    # inject "other" code
    if show_other:
        new_chart = {other_code: "Other"}
        new_chart.update(out_chart)
        out_chart = new_chart

    # inject "no program" code
    if show_no_program:
        new_chart = {no_program_code: "No Program"}
        new_chart.update(out_chart)
        out_chart = new_chart

    return out_chart

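A worked example of process_chart() on a toy chart. Every code and name here is invented, and the "other" and "no program" codes normally come from the OtherCode and NoProgramCode environment variables:

from mips_api import process_chart

chart = {
    "12345601": "Program Alpha",
    "12345602": "Program Alpha (secondary)",  # same first 6 digits: deduplicated
    "65432108": "Beta (Pilot)",               # special characters are stripped
    "99999": "Old Inactive Program",          # 5-digit code: dropped when hiding inactive
}

print(process_chart(
    chart,
    omit_list=[],
    priority_codes=["654321"],   # prepended to the front of the output
    hide_inactive=True,
    other_code="000000",
    show_other=True,
    no_program_code="000001",
    show_no_program=True,
))
# {'000001': 'No Program', '000000': 'Other',
#  '654321': 'Beta Pilot', '123456': 'Program Alpha'}
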
def limit_chart(coa_dict, limit):
    """
    Optionally limit the size of the chart based on a query-string parameter.
    """

    # if a 'limit' query-string parameter is defined, "slice" the dictionary
    if limit > 0:
        # https://stackoverflow.com/a/66535220/1742875
        # broken into two steps
        _coa_dict_list = list(coa_dict.items())
        coa_dict = dict(_coa_dict_list[:limit])

    return coa_dict


def list_tags(chart_dict, limit):
    """
    Generate a list of valid AWS tags. Only active codes are listed.

    The string format is `{Program Name} / {Program Code}`.

    Returns
        A list of strings.
    """

    tags = []

    # build tags from chart of accounts
    for code, name in chart_dict.items():
        # enforce AWS tags limitations
        # https://docs.aws.amazon.com/tag-editor/latest/userguide/best-practices-and-strats.html
        # max tag value length is 256, truncate
        # only enforce when listing tag values
        tag = f"{name[:245]} / {code[:6]}"
        tags.append(tag)

    if limit > 0:
        LOG.info(f"limiting output to {limit} values")
        return tags[0:limit]
    else:
        return tags

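Continuing the example, limit_chart() and list_tags() shape a processed chart for the two JSON endpoints; the dictionary below is the (hypothetical) output of the previous sketch:

from mips_api import limit_chart, list_tags

processed = {
    "000001": "No Program",
    "000000": "Other",
    "654321": "Beta Pilot",
    "123456": "Program Alpha",
}

print(limit_chart(processed, 2))
# {'000001': 'No Program', '000000': 'Other'}

print(list_tags(processed, 0))
# ['No Program / 000001', 'Other / 000000',
#  'Beta Pilot / 654321', 'Program Alpha / 123456']
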
def process_balance(bal_dict, coa_dict):

    # check for success
    if "executionResult" not in bal_dict:
        LOG.error(f"No execution result found: '{bal_dict}'")
        raise KeyError("No 'executionResult' found")

    result = bal_dict["executionResult"]
    if result != "SUCCESS":
        LOG.error(f"Execution result is not 'SUCCESS': '{result}'")
        raise ValueError("Execution result is not 'SUCCESS'")

    # collate api response into a dict
    _data = {}

    _detail = []
    for k, v in bal_dict["extraInformation"].items():
        if k != "Level1":
            LOG.info(f"Unexpected key (not 'Level1'): {k}")
        else:
            _detail = v

    for d in _detail:
        program_id = d["DBDETAIL_SUM_SEGMENT_N2"]
        if program_id not in _data:
            _data[program_id] = {}

        if d["DBDETAIL_SUM_TYPE"] == 1:
            _data[program_id]["balance_start"] = d["DBDETAIL_SUM_POSTEDAMT"]
        elif d["DBDETAIL_SUM_TYPE"] == 2:
            _data[program_id]["activity"] = d["DBDETAIL_SUM_POSTEDAMT"]
        elif d["DBDETAIL_SUM_TYPE"] == 3:
            _data[program_id]["balance_end"] = d["DBDETAIL_SUM_POSTEDAMT"]
        else:
            LOG.error(f"Unknown balance type: {d['DBDETAIL_SUM_DESC']}")

    LOG.debug(f"Raw internal balance dict: {_data}")

    # List of rows in CSV
    out_rows = []

    # Add header row
    headers = [
        "AccountNumber",
        "AccountName",
        "PeriodStart",
        "PeriodEnd",
        "StartBalance",
        "Activity",
        "EndBalance",
    ]
    out_rows.append(headers)

    # Generate rows from input dict
    for k, v in _data.items():
        name = None
        if k not in coa_dict:
            LOG.error(f"Key {k} not found in chart of accounts")
            name = k
        else:
            name = coa_dict[k]

        row = [
            k,
            name,
            bal_dict["period_from"],
            bal_dict["period_to"],
            v["balance_start"],
            v["activity"],
            v["balance_end"],
        ]
        out_rows.append(row)

    return out_rows


def format_balance(bal_dict, coa_dict):
    csv_out = io.StringIO()
    csv_writer = csv.writer(csv_out)

    csv_rows = process_balance(bal_dict, coa_dict)
    for row in csv_rows:
        csv_writer.writerow(row)

    return csv_out.getvalue()

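A minimal sketch of the CSV path added by this pull request: format_balance() run against a hand-built balance payload. Only the fields read by process_balance() are included, and the program code, amounts, and periods are made up; in the real flow the periods are added by _balance_requests():

from mips_api import format_balance

bal = {
    "executionResult": "SUCCESS",
    "extraInformation": {
        "Level1": [
            # per process_balance(): type 1 = starting balance, 2 = activity, 3 = ending balance
            {"DBDETAIL_SUM_SEGMENT_N2": "123456", "DBDETAIL_SUM_TYPE": 1, "DBDETAIL_SUM_POSTEDAMT": 1000.0},
            {"DBDETAIL_SUM_SEGMENT_N2": "123456", "DBDETAIL_SUM_TYPE": 2, "DBDETAIL_SUM_POSTEDAMT": -250.0},
            {"DBDETAIL_SUM_SEGMENT_N2": "123456", "DBDETAIL_SUM_TYPE": 3, "DBDETAIL_SUM_POSTEDAMT": 750.0},
        ]
    },
    "period_from": "05/01/2025",
    "period_to": "06/01/2025",
}

print(format_balance(bal, {"123456": "Program Alpha"}))
# AccountNumber,AccountName,PeriodStart,PeriodEnd,StartBalance,Activity,EndBalance
# 123456,Program Alpha,05/01/2025,06/01/2025,1000.0,-250.0,750.0
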
def lambda_handler(event, context):
    """Sample pure Lambda function

    Parameters
    ----------
    event: dict, required
        API Gateway Lambda Proxy Input Format

        Event doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format

    context: object, required
        Lambda Context runtime methods and attributes

        Context doc: https://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html

    Returns
    ------
    API Gateway Lambda Proxy Output Format: dict

        Return doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html
    """

    try:
        # collect environment variables
        mip_org = _get_os_var("MipsOrg")
        ssm_path = _get_os_var("SsmPath")
        s3_bucket = _get_os_var("CacheBucket")
        s3_chart_path = _get_os_var("CacheBucketPathChart")
        s3_balance_path = _get_os_var("CacheBucketPathBalance")

        code_other = _get_os_var("OtherCode")
        code_no_program = _get_os_var("NoProgramCode")

        api_routes = {
            "ApiChartOfAccounts": _get_os_var("ApiChartOfAccounts"),
            "ApiValidTags": _get_os_var("ApiValidTags"),
            "ApiTrialBalances": _get_os_var("ApiTrialBalances"),
        }

        _to_omit = _get_os_var("CodesToOmit")
        omit_codes_list = _parse_codes(_to_omit)

        # collect secure parameters
        ssm_secrets = collect_secrets(ssm_path)

        # collect query-string parameters
        params = {}
        if "queryStringParameters" in event:
            params = event["queryStringParameters"]
            LOG.debug(f"Query-string parameters: {params}")

        limit_length = _param_limit_int(params)
        priority_codes = _param_priority_list(params)
        hide_inactive = _param_hide_inactive_bool(params)
        show_no_program = _param_show_no_program_bool(params)
        show_other = _param_show_other_bool(params)

        # Process the chart of accounts from MIP, it's used in all cases
        raw_chart = chart_cache(
            mip_org,
            ssm_secrets,
            s3_bucket,
            s3_chart_path,
            hide_inactive,
        )
        LOG.debug(f"Raw chart data: {raw_chart}")
        coa_chart = process_chart(
            raw_chart,
            omit_codes_list,
            priority_codes,
            hide_inactive,
            code_other,
            show_other,
            code_no_program,
            show_no_program,
        )

        # parse the path and return appropriate data
        if "path" in event:
            event_path = event["path"]

            if event_path == api_routes["ApiTrialBalances"]:
                # Process current balances
                raw_bal = balance_cache(
                    mip_org, ssm_secrets, s3_bucket, s3_balance_path
                )
                bal_csv = format_balance(raw_bal, coa_chart)

                return _build_return_text(200, bal_csv)
            else:

                if event_path == api_routes["ApiChartOfAccounts"]:
                    # conditionally filter the output
                    _coa_chart = limit_chart(coa_chart, limit_length)
                    return _build_return_json(200, _coa_chart)

                elif event_path == api_routes["ApiValidTags"]:
                    # build a list of strings from the processed dictionary
                    valid_tags = list_tags(coa_chart, limit_length)
                    return _build_return_json(200, valid_tags)
                else:
                    return _build_return_json(404, {"error": "Invalid request path"})

        else:
            return _build_return_json(
                400, {"error": f"Invalid event: No path found: {event}"}
            )

    except Exception as exc:
        LOG.exception(exc)
        return _build_return_json(500, {"error": str(exc)})