• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

blue-marble / gridpath / 21263611097

22 Jan 2026 08:21PM UTC coverage: 89.055%. Remained the same
21263611097

push

github

anamileva
Lint with black==26.1.0

90 of 143 new or added lines in 74 files covered. (62.94%)

20 existing lines in 10 files now uncovered.

27550 of 30936 relevant lines covered (89.05%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.77
/gridpath/common_functions.py
1
# Copyright 2016-2023 Blue Marble Analytics LLC.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import os.path
1✔
16
import sys
1✔
17
import warnings
1✔
18

19
from argparse import ArgumentParser
1✔
20

21
import pandas as pd
1✔
22

23

24
def determine_scenario_directory(scenario_location, scenario_name):
1✔
25
    """
26
    :param scenario_location: string, the base directory
27
    :param scenario_name: string, the scenario name
28
    :return: the scenario directory (string)
29

30
    Determine the scenario directory given a base directory and the scenario
31
    name. If no base directory is specified, use a directory named
32
    'scenarios' in the root directory (one level down from the current
33
    working directory).
34
    """
35
    if scenario_location is None:
1✔
36
        main_directory = os.path.join(os.getcwd(), "..", "scenarios")
×
37
    else:
38
        main_directory = scenario_location
1✔
39

40
    scenario_directory = os.path.join(main_directory, str(scenario_name))
1✔
41

42
    return scenario_directory
1✔
43

44

45
def create_directory_if_not_exists(directory):
1✔
46
    """
47
    :param directory: string; the directory path
48

49
    Check if a directory exists and create it if not.
50
    """
51
    if not os.path.exists(directory):
1✔
52
        os.makedirs(directory)
×
53

54

55
def get_required_e2e_arguments_parser():
1✔
56
    """
57
    :return: the common parser for all e2e arguments
58

59
    Create ArgumentParser object which has the common set of arguments all
60
    end-to-end scripts. This includes the information for accessing local
61
    scenario data and whether to print run output.
62

63
    We can then simply add 'parents=[get_required_e2e_arguments_parser()]'
64
    when we create a parser for a script to inherit these common arguments.
65

66
    Note that 'add_help' is set to 'False' to avoid multiple `-h/--help` options
67
    (one for parent and one for each child), which will throw an error.
68
    """
69

70
    parser = ArgumentParser(add_help=False)
1✔
71
    parser.add_argument(
1✔
72
        "--scenario_location",
73
        default="../scenarios",
74
        help="The path to the directory in which to create "
75
        "the scenario directory. Defaults to "
76
        "'../scenarios' if not specified.",
77
    )
78
    parser.add_argument(
1✔
79
        "--quiet", default=False, action="store_true", help="Don't print run output."
80
    )
81

82
    parser.add_argument(
1✔
83
        "--verbose",
84
        default=False,
85
        action="store_true",
86
        help="Print extra output, e.g. current module info.",
87
    )
88

89
    return parser
1✔
90

91

92
def get_scenario_name_parser():
1✔
93
    """
94
    Create ArgumentParser object which has the common set of arguments for
95
    getting the scenario name
96

97
    We can then simply add 'parents=[get_scenario_names_parser()]' when we
98
    create a parser for a script to inherit these common arguments.
99

100
    Note that 'add_help' is set to 'False' to avoid multiple `-h/--help` options
101
    (one for parent and one for each child), which will throw an error.
102
    :return:
103
    """
104

105
    parser = ArgumentParser(add_help=False)
1✔
106
    required = parser.add_argument_group("required arguments")
1✔
107
    required.add_argument(
1✔
108
        "--scenario",
109
        required=True,
110
        type=str,
111
        help="Name of the scenario problem to solve.",
112
    )
113

114
    return parser
1✔
115

116

117
def get_db_parser():
1✔
118
    """
119
    Create ArgumentParser object which has the common set of arguments for
120
    accessing scenario data from the database.
121

122
    We can then simply add 'parents=[get_db_parser()]' when we create a
123
    parser for a script to inherit these common arguments.
124

125
    Note that 'add_help' is set to 'False' to avoid multiple `-h/--help` options
126
    (one for parent and one for each child), which will throw an error.
127
    :return:
128
    """
129

130
    parser = ArgumentParser(add_help=False)
1✔
131
    parser.add_argument(
1✔
132
        "--database",
133
        default="../db/io.db",
134
        help="The database file path relative to the current "
135
        "working directory. Defaults to ../db/io.db ",
136
    )
137
    parser.add_argument(
1✔
138
        "--scenario_id",
139
        type=int,
140
        help="The scenario_id from the database. Not needed "
141
        "if scenario is specified.",
142
    )
143
    parser.add_argument(
1✔
144
        "--scenario",
145
        type=str,
146
        help="The scenario_name from the database. Not "
147
        "needed if scenario_id is specified.",
148
    )
149

150
    return parser
1✔
151

152

153
def get_get_inputs_parser():
1✔
154
    """ """
155

156
    parser = ArgumentParser(add_help=False)
1✔
157
    parser.add_argument(
1✔
158
        "--n_parallel_get_inputs",
159
        default=1,
160
        help="Get inputs for n subproblems in parallel.",
161
    )
162

163
    return parser
1✔
164

165

166
def get_run_scenario_parser():
1✔
167
    """
168
    Create ArgumentParser object which has the common set of arguments for
169
    solving a scenario (see run_scenario.py and run_end_to_end.py).
170

171
    We can then simply add 'parents=[get_solve_parser()]' when we create a
172
    parser for a script to inherit these common arguments.
173

174
    Note that 'add_help' is set to 'False' to avoid multiple `-h/--help` options
175
    (one for parent and one for each child), which will throw an error.
176
    :return:
177
    """
178

179
    parser = ArgumentParser(add_help=False)
1✔
180

181
    # Output options
182
    parser.add_argument(
1✔
183
        "--log",
184
        default=False,
185
        action="store_true",
186
        help="Log output to a file in the scenario's 'logs' "
187
        "directory as well as the terminal.",
188
    )
189
    # Problem files and solutions
190
    parser.add_argument(
1✔
191
        "--create_lp_problem_file_only",
192
        default=False,
193
        action="store_true",
194
        help="Create and save the problem file, but don't solve yet.",
195
    )
196
    parser.add_argument(
1✔
197
        "--load_cplex_solution",
198
        default=False,
199
        action="store_true",
200
        help="Skip solve and load results from a CPLEX solution file instead.",
201
    )
202
    parser.add_argument(
1✔
203
        "--load_gurobi_solution",
204
        default=False,
205
        action="store_true",
206
        help="Skip solve and load results from a Gurobi solution file instead.",
207
    )
208
    # Solver options
209
    parser.add_argument(
1✔
210
        "--solver",
211
        help="Name of the solver to use. "
212
        "GridPath will use Cbc if solver is "
213
        "not specified here and a "
214
        "'solver_options.csv' file does not "
215
        "exist in the scenario directory.",
216
    )
217
    parser.add_argument(
1✔
218
        "--solver_executable",
219
        help="The path to the solver executable to use. This "
220
        "is optional; if you don't specify it, "
221
        "Pyomo will look for the solver executable in "
222
        "your PATH. The solver specified with the "
223
        "--solver option must be the same as the solver "
224
        "for which you are providing an executable.",
225
    )
226
    parser.add_argument(
1✔
227
        "--mute_solver_output",
228
        default=False,
229
        action="store_true",
230
        help="Don't print solver output.",
231
    )
232
    parser.add_argument(
1✔
233
        "--write_solver_files_to_logs_dir",
234
        default=False,
235
        action="store_true",
236
        help="Write the temporary " "solver files to the logs " "directory.",
237
    )
238
    parser.add_argument(
1✔
239
        "--keepfiles",
240
        default=False,
241
        action="store_true",
242
        help="Save temporary solver files.",
243
    )
244
    parser.add_argument(
1✔
245
        "--symbolic",
246
        default=False,
247
        action="store_true",
248
        help="Use symbolic labels in solver files.",
249
    )
250
    parser.add_argument(
1✔
251
        "--report_timing",
252
        default=False,
253
        action="store_true",
254
    )
255
    # Flag for test runs (various changes in behavior)
256
    parser.add_argument(
1✔
257
        "--testing",
258
        default=False,
259
        action="store_true",
260
        help="Flag for test suite runs.",
261
    )
262

263
    # Parallel solve
264
    parser.add_argument(
1✔
265
        "--n_parallel_solve",
266
        default=1,
267
        help="Solve n subproblems in parallel.",
268
    )
269

270
    # Solve only incomplete subproblems
271
    parser.add_argument(
1✔
272
        "--incomplete_only",
273
        default=False,
274
        action="store_true",
275
        help="Solve only incomplete subproblems, i.e. do no re-solve if "
276
        "results are found. The subproblem is assumed complete if the"
277
        "termination_condition.txt file is found.",
278
    )
279

280
    # Results export rule name
281
    parser.add_argument(
1✔
282
        "--results_export_rule",
283
        help="The name of the rule to use to decide whether to export results.",
284
    )
285

286
    parser.add_argument(
1✔
287
        "--results_export_summary_rule",
288
        help="The name of the rule to use to decide whether to export "
289
        "summary results.",
290
    )
291

292
    parser.add_argument(
1✔
293
        "--skip_quick_summary",
294
        default=False,
295
        action="store_true",
296
        help="Skip quick summary text file",
297
    )
298

299
    return parser
1✔
300

301

302
def get_import_results_parser():
1✔
303
    parser = ArgumentParser(add_help=False)
1✔
304
    parser.add_argument(
1✔
305
        "--results_import_rule",
306
        help="The name of the rule to use to decide whether to import results.",
307
    )
308

309
    return parser
1✔
310

311

312
def ensure_empty_string(string):
1✔
313
    empty_string_ensured = "" if string == "empty_string" else string
1✔
314

315
    return empty_string_ensured
1✔
316

317

318
def create_logs_directory_if_not_exists(
1✔
319
    scenario_directory,
320
    weather_iteration,
321
    hydro_iteration,
322
    availability_iteration,
323
    subproblem,
324
    stage,
325
):
326
    """
327
    Create a logs directory if it doesn't exist already
328
    :param scenario_directory:
329
    :param subproblem:
330
    :param stage:
331
    :return:
332
    """
333
    logs_directory = os.path.join(
×
334
        scenario_directory,
335
        weather_iteration,
336
        hydro_iteration,
337
        availability_iteration,
338
        subproblem,
339
        stage,
340
        "logs",
341
    )
342
    if not os.path.exists(logs_directory):
×
343
        os.makedirs(logs_directory)
×
344
    return logs_directory
×
345

346

347
class Logging(object):
1✔
348
    """
349
    Log output to both standard output and a log file. This will be
350
    accomplished by assigning this class to sys.stdout.
351
    """
352

353
    def __init__(self, logs_dir, start_time, e2e, process_id):
1✔
354
        """
355
        Assign sys.stdout and a log file as output destinations
356

357
        :param logs_dir:
358
        """
359
        self.terminal = sys.stdout
×
360

361
        # If logging only run_scenario, print to a file starting with opt_
362
        # and the datetime
363
        # If logging run_e2e, print to a file starting with e2e_, with the
364
        # datetime, and the process ID
365
        if not e2e:
×
366
            self.log_file_path = os.path.join(
×
367
                logs_dir, "opt_{}.log".format(string_from_time(start_time))
368
            )
369
        else:
370
            self.log_file_path = os.path.join(
×
371
                logs_dir,
372
                "e2e_{}_pid_{}.log".format(
373
                    string_from_time(start_time), str(process_id)
374
                ),
375
            )
376

377
        self.log_file = open(self.log_file_path, "a", buffering=1)
×
378

379
    def __getattr__(self, attr):
1✔
380
        """
381
        Default to sys.stdout when calling attributes for this class
382

383
        :param attr:
384
        :return:
385
        """
386
        return getattr(self.terminal, attr)
×
387

388
    def write(self, message):
1✔
389
        """
390
        Output to both terminal and a log file. The print statement will
391
        call the write() method of any object you assign to sys.stdout
392
        (in this case the Logging object)
393

394
        :param message:
395
        :return:
396
        """
397
        self.terminal.write(message)
×
398
        self.log_file.write(message)
×
399

400
        # Find a print statement
401
        # import collections
402
        # import inspect
403

404
        # if message.strip():
405
        #     Record = collections.namedtuple(
406
        #         'Record',
407
        #         'frame filename line_number function_name lines index')
408
        #
409
        #     record = Record(*inspect.getouterframes(inspect.currentframe())[1])
410
        #     self.terminal.write(
411
        #         '{f} {n}: '.format(f=record.filename, n=record.line_number))
412
        # self.terminal.write(message)
413

414
    def flush(self):
1✔
415
        """
416
        Flush both the terminal and the log file
417

418
        :return:
419
        """
420
        self.terminal.flush()
×
421
        self.log_file.flush()
×
422

423

424
def string_from_time(datetime_string):
1✔
425
    """
426
    :param datetime_string: datetime string
427
    :return: formatted time string
428
    """
429
    return datetime_string.strftime("%Y-%m-%d_%H-%M-%S")
×
430

431

432
def create_results_df(index_columns, results_columns, data):
1✔
433
    df = pd.DataFrame(
1✔
434
        columns=index_columns + results_columns,
435
        data=data,
436
    ).set_index(index_columns)
437

438
    return df
1✔
439

440

441
def duals_wrapper(m, component, verbose=False):
1✔
442
    try:
1✔
443
        return m.dual[component]
1✔
444
    except KeyError:
×
445
        if verbose:
×
NEW
446
            warnings.warn(f"""
×
447
                KeyError caught when saving duals for {component}. Duals were 
448
                not exported. This is expected if solving a MIP with CPLEX (and 
449
                possibly other solvers), not otherwise.
450
                """)
UNCOV
451
        return None
×
452

453

454
def none_dual_type_error_wrapper(component, coefficient):
1✔
455
    try:
1✔
456
        return component / coefficient
1✔
457
    except TypeError:
×
458
        return None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc