• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

blue-marble / gridpath / 26596344027

28 May 2026 07:09PM UTC coverage: 88.671% (-0.1%) from 88.813%
26596344027

Pull #1360

github

web-flow
Merge daf8e5e2a into 70957e555
Pull Request #1360: Optional BA-level project aggregation via --aggregate_projects

22 of 28 new or added lines in 8 files covered. (78.57%)

278 existing lines in 6 files now uncovered.

28318 of 31936 relevant lines covered (88.67%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/gridpath/common_functions.py
1
# Copyright 2016-2025 Blue Marble Analytics LLC.
2
# Copyright 2026 Sylvan Energy Analytics LLC.
3
#
4
# Licensed under the Apache License, Version 2.0 (the "License");
5
# you may not use this file except in compliance with the License.
6
# You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15

16
import os.path
1✔
17
import sys
1✔
18
import warnings
1✔
19

20
from argparse import ArgumentParser
1✔
21

22
import pandas as pd
1✔
23

24

25
def determine_scenario_directory(scenario_location, scenario_name):
1✔
26
    """
27
    :param scenario_location: string, the base directory
28
    :param scenario_name: string, the scenario name
29
    :return: the scenario directory (string)
30

31
    Determine the scenario directory given a base directory and the scenario
32
    name. If no base directory is specified, use a directory named
33
    'scenarios' in the root directory (one level down from the current
34
    working directory).
35
    """
36
    if scenario_location is None:
1✔
37
        main_directory = os.path.join(os.getcwd(), "..", "scenarios")
×
38
    else:
39
        main_directory = scenario_location
1✔
40

41
    scenario_directory = os.path.join(main_directory, str(scenario_name))
1✔
42

43
    return scenario_directory
1✔
44

45

46
def create_directory_if_not_exists(directory):
1✔
47
    """
48
    :param directory: string; the directory path
49

50
    Check if a directory exists and create it if not.
51
    """
52
    if not os.path.exists(directory):
1✔
53
        os.makedirs(directory)
×
54

55

56
def get_required_e2e_arguments_parser():
1✔
57
    """
58
    :return: the common parser for all e2e arguments
59

60
    Create ArgumentParser object which has the common set of arguments all
61
    end-to-end scripts. This includes the information for accessing local
62
    scenario data and whether to print run output.
63

64
    We can then simply add 'parents=[get_required_e2e_arguments_parser()]'
65
    when we create a parser for a script to inherit these common arguments.
66

67
    Note that 'add_help' is set to 'False' to avoid multiple `-h/--help` options
68
    (one for parent and one for each child), which will throw an error.
69
    """
70

71
    parser = ArgumentParser(add_help=False)
1✔
72
    parser.add_argument(
1✔
73
        "--scenario_location",
74
        default="../scenarios",
75
        help="The path to the directory in which to create "
76
        "the scenario directory. Defaults to "
77
        "'../scenarios' if not specified.",
78
    )
79
    parser.add_argument(
1✔
80
        "--quiet", default=False, action="store_true", help="Don't print run output."
81
    )
82

83
    parser.add_argument(
1✔
84
        "--verbose",
85
        default=False,
86
        action="store_true",
87
        help="Print extra output, e.g. current module info.",
88
    )
89

90
    return parser
1✔
91

92

93
def get_scenario_name_parser():
1✔
94
    """
95
    Create ArgumentParser object which has the common set of arguments for
96
    getting the scenario name
97

98
    We can then simply add 'parents=[get_scenario_names_parser()]' when we
99
    create a parser for a script to inherit these common arguments.
100

101
    Note that 'add_help' is set to 'False' to avoid multiple `-h/--help` options
102
    (one for parent and one for each child), which will throw an error.
103
    :return:
104
    """
105

106
    parser = ArgumentParser(add_help=False)
1✔
107
    required = parser.add_argument_group("required arguments")
1✔
108
    required.add_argument(
1✔
109
        "--scenario",
110
        required=True,
111
        type=str,
112
        help="Name of the scenario problem to solve.",
113
    )
114

115
    return parser
1✔
116

117

118
def get_db_parser():
1✔
119
    """
120
    Create ArgumentParser object which has the common set of arguments for
121
    accessing scenario data from the database.
122

123
    We can then simply add 'parents=[get_db_parser()]' when we create a
124
    parser for a script to inherit these common arguments.
125

126
    Note that 'add_help' is set to 'False' to avoid multiple `-h/--help` options
127
    (one for parent and one for each child), which will throw an error.
128
    :return:
129
    """
130

131
    parser = ArgumentParser(add_help=False)
1✔
132
    parser.add_argument(
1✔
133
        "--database",
134
        default="../db/io.db",
135
        help="The database file path relative to the current "
136
        "working directory. Defaults to ../db/io.db ",
137
    )
138
    parser.add_argument(
1✔
139
        "--scenario_id",
140
        type=int,
141
        help="The scenario_id from the database. Not needed "
142
        "if scenario is specified.",
143
    )
144
    parser.add_argument(
1✔
145
        "--scenario",
146
        type=str,
147
        help="The scenario_name from the database. Not "
148
        "needed if scenario_id is specified.",
149
    )
150

151
    return parser
1✔
152

153

154
def get_temporal_structure_csv_overwrite_parser():
1✔
155
    """ """
156
    parser = ArgumentParser(add_help=False)
1✔
157
    parser.add_argument(
1✔
158
        "--temporal_structure_csv_overwrite",
159
        default=False,
160
        action="store_true",
161
        help="Overwrite the temporal structure from the database with the "
162
        "provided CSV file.",
163
    )
164
    parser.add_argument(
1✔
165
        "--temporal_structure_csv_path",
166
        help="Path to the CSV where the temporal structure is defined.",
167
    )
168

169
    return parser
1✔
170

171

172
def get_get_inputs_parser():
1✔
173
    """ """
174

175
    parser = ArgumentParser(add_help=False)
1✔
176
    parser.add_argument(
1✔
177
        "--n_parallel_get_inputs",
178
        default=1,
179
        help="Get inputs for n subproblems in parallel.",
180
    )
181

182
    return parser
1✔
183

184

185
def get_run_scenario_parser():
1✔
186
    """
187
    Create ArgumentParser object which has the common set of arguments for
188
    solving a scenario (see run_scenario.py and run_end_to_end.py).
189

190
    We can then simply add 'parents=[get_solve_parser()]' when we create a
191
    parser for a script to inherit these common arguments.
192

193
    Note that 'add_help' is set to 'False' to avoid multiple `-h/--help` options
194
    (one for parent and one for each child), which will throw an error.
195
    :return:
196
    """
197

198
    parser = ArgumentParser(add_help=False)
1✔
199

200
    # Output options
201
    parser.add_argument(
1✔
202
        "--log",
203
        default=False,
204
        action="store_true",
205
        help="Log output to a file in the scenario's 'logs' "
206
        "directory as well as the terminal.",
207
    )
208
    # Problem files and solutions
209
    parser.add_argument(
1✔
210
        "--create_lp_problem_file_only",
211
        default=False,
212
        action="store_true",
213
        help="Create and save the problem file, but don't solve yet.",
214
    )
215
    parser.add_argument(
1✔
216
        "--load_cplex_solution",
217
        default=False,
218
        action="store_true",
219
        help="Skip solve and load results from a CPLEX solution file instead.",
220
    )
221
    parser.add_argument(
1✔
222
        "--load_gurobi_solution",
223
        default=False,
224
        action="store_true",
225
        help="Skip solve and load results from a Gurobi solution file instead.",
226
    )
227
    parser.add_argument(
1✔
228
        "--load_highs_solution",
229
        default=False,
230
        action="store_true",
231
        help="Skip solve and load results from a HiGHS solution file instead.",
232
    )
233
    # Solver options
234
    parser.add_argument(
1✔
235
        "--solver",
236
        help="Name of the solver to use. "
237
        "GridPath will use Cbc if solver is "
238
        "not specified here and a "
239
        "'solver_options.csv' file does not "
240
        "exist in the scenario directory.",
241
    )
242
    parser.add_argument(
1✔
243
        "--solver_executable",
244
        help="The path to the solver executable to use. This "
245
        "is optional; if you don't specify it, "
246
        "Pyomo will look for the solver executable in "
247
        "your PATH. The solver specified with the "
248
        "--solver option must be the same as the solver "
249
        "for which you are providing an executable.",
250
    )
251
    parser.add_argument(
1✔
252
        "--mute_solver_output",
253
        default=False,
254
        action="store_true",
255
        help="Don't print solver output.",
256
    )
257
    parser.add_argument(
1✔
258
        "--write_solver_files_to_logs_dir",
259
        default=False,
260
        action="store_true",
261
        help="Write the temporary " "solver files to the logs " "directory.",
262
    )
263
    parser.add_argument(
1✔
264
        "--keepfiles",
265
        default=False,
266
        action="store_true",
267
        help="Save temporary solver files.",
268
    )
269
    parser.add_argument(
1✔
270
        "--symbolic",
271
        default=False,
272
        action="store_true",
273
        help="Use symbolic labels in solver files.",
274
    )
275
    parser.add_argument(
1✔
276
        "--report_timing",
277
        default=False,
278
        action="store_true",
279
    )
280
    # Flag for test runs (various changes in behavior)
281
    parser.add_argument(
1✔
282
        "--testing",
283
        default=False,
284
        action="store_true",
285
        help="Flag for test suite runs.",
286
    )
287

288
    # Parallel solve
289
    parser.add_argument(
1✔
290
        "--n_parallel_solve",
291
        default=1,
292
        help="Solve n subproblems in parallel.",
293
    )
294

295
    # Solve only incomplete subproblems
296
    parser.add_argument(
1✔
297
        "--incomplete_only",
298
        default=False,
299
        action="store_true",
300
        help="Solve only incomplete subproblems, i.e. do no re-solve if "
301
        "results are found. The subproblem is assumed complete if the"
302
        "termination_condition.txt file is found.",
303
    )
304

305
    # Results export rule name
306
    parser.add_argument(
1✔
307
        "--results_export_rule",
308
        help="The name of the rule to use to decide whether to export results.",
309
    )
310

311
    parser.add_argument(
1✔
312
        "--results_export_summary_rule",
313
        help="The name of the rule to use to decide whether to export "
314
        "summary results.",
315
    )
316

317
    parser.add_argument(
1✔
318
        "--skip_quick_summary",
319
        default=False,
320
        action="store_true",
321
        help="Skip quick summary text file",
322
    )
323

324
    return parser
1✔
325

326

327
def get_import_results_parser():
1✔
328
    parser = ArgumentParser(add_help=False)
1✔
329
    parser.add_argument(
1✔
330
        "--results_import_rule",
331
        help="The name of the rule to use to decide whether to import results.",
332
    )
333
    parser.add_argument(
1✔
334
        "--ignore_incomplete",
335
        default=False,
336
        action="store_true",
337
        help="Ignore problems with no results. Can be used to import results "
338
        "for subproblems before all other subproblems have been solved. "
339
        "Proceed with caution.",
340
    )
341

342
    return parser
1✔
343

344

345
def ensure_empty_string(string):
1✔
346
    empty_string_ensured = "" if string == "empty_string" else string
1✔
347

348
    return empty_string_ensured
1✔
349

350

351
def create_logs_directory_if_not_exists(
1✔
352
    scenario_directory,
353
    weather_iteration,
354
    hydro_iteration,
355
    availability_iteration,
356
    subproblem,
357
    stage,
358
):
359
    """
360
    Create a logs directory if it doesn't exist already
361
    :param scenario_directory:
362
    :param subproblem:
363
    :param stage:
364
    :return:
365
    """
366
    logs_directory = os.path.join(
×
367
        scenario_directory,
368
        weather_iteration,
369
        hydro_iteration,
370
        availability_iteration,
371
        subproblem,
372
        stage,
373
        "logs",
374
    )
375
    if not os.path.exists(logs_directory):
×
376
        os.makedirs(logs_directory)
×
377
    return logs_directory
×
378

379

380
class Logging(object):
1✔
381
    """
382
    Log output to both standard output and a log file. This will be
383
    accomplished by assigning this class to sys.stdout.
384
    """
385

386
    def __init__(self, logs_dir, start_time, e2e, process_id):
1✔
387
        """
388
        Assign sys.stdout and a log file as output destinations
389

390
        :param logs_dir:
391
        """
392
        self.terminal = sys.stdout
×
393

394
        # If logging only run_scenario, print to a file starting with opt_
395
        # and the datetime
396
        # If logging run_e2e, print to a file starting with e2e_, with the
397
        # datetime, and the process ID
398
        if not e2e:
×
399
            self.log_file_path = os.path.join(
×
400
                logs_dir, "opt_{}.log".format(string_from_time(start_time))
401
            )
402
        else:
403
            self.log_file_path = os.path.join(
×
404
                logs_dir,
405
                "e2e_{}_pid_{}.log".format(
406
                    string_from_time(start_time), str(process_id)
407
                ),
408
            )
409

410
        self.log_file = open(self.log_file_path, "a", buffering=1)
×
411

412
    def __getattr__(self, attr):
1✔
413
        """
414
        Default to sys.stdout when calling attributes for this class
415

416
        :param attr:
417
        :return:
418
        """
419
        return getattr(self.terminal, attr)
×
420

421
    def write(self, message):
1✔
422
        """
423
        Output to both terminal and a log file. The print statement will
424
        call the write() method of any object you assign to sys.stdout
425
        (in this case the Logging object)
426

427
        :param message:
428
        :return:
429
        """
430
        self.terminal.write(message)
×
431
        self.log_file.write(message)
×
432

433
        # Find a print statement
434
        # import collections
435
        # import inspect
436

437
        # if message.strip():
438
        #     Record = collections.namedtuple(
439
        #         'Record',
440
        #         'frame filename line_number function_name lines index')
441
        #
442
        #     record = Record(*inspect.getouterframes(inspect.currentframe())[1])
443
        #     self.terminal.write(
444
        #         '{f} {n}: '.format(f=record.filename, n=record.line_number))
445
        # self.terminal.write(message)
446

447
    def flush(self):
1✔
448
        """
449
        Flush both the terminal and the log file
450

451
        :return:
452
        """
453
        self.terminal.flush()
×
454
        self.log_file.flush()
×
455

456
    def close(self):
1✔
457
        """
458
        Close the log file to release the file descriptor.
459
        Critical for preventing "too many open files" errors.
460
        """
UNCOV
461
        if hasattr(self, "log_file") and self.log_file and not self.log_file.closed:
×
462
            self.log_file.close()
×
463

464
    def __del__(self):
1✔
465
        """
466
        Ensure log file is closed when object is garbage collected
467
        """
UNCOV
468
        self.close()
×
469

470
    def __enter__(self):
1✔
471
        """
472
        Support context manager protocol
473
        """
UNCOV
474
        return self
×
475

476
    def __exit__(self, exc_type, exc_val, exc_tb):
1✔
477
        """
478
        Close log file when exiting context manager
479
        """
UNCOV
480
        self.close()
×
UNCOV
481
        return False
×
482

483

484
def string_from_time(datetime_string):
1✔
485
    """
486
    :param datetime_string: datetime string
487
    :return: formatted time string
488
    """
UNCOV
489
    return datetime_string.strftime("%Y-%m-%d_%H-%M-%S")
×
490

491

492
def create_results_df(index_columns, results_columns, data):
1✔
493
    df = pd.DataFrame(
1✔
494
        columns=index_columns + results_columns,
495
        data=data,
496
    ).set_index(index_columns)
497

498
    return df
1✔
499

500

501
def duals_wrapper(m, component, verbose=False):
1✔
502
    try:
1✔
503
        return m.dual[component]
1✔
UNCOV
504
    except KeyError:
×
UNCOV
505
        if verbose:
×
UNCOV
506
            warnings.warn(f"""
×
507
                KeyError caught when saving duals for {component}. Duals were 
508
                not exported. This is expected if solving a MIP with CPLEX (and 
509
                possibly other solvers), not otherwise.
510
                """)
UNCOV
511
        return None
×
512

513

514
def none_dual_type_error_wrapper(component, coefficient):
1✔
515
    try:
1✔
516
        return component / coefficient
1✔
UNCOV
517
    except TypeError:
×
UNCOV
518
        return None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc