• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

blue-marble / gridpath / 16035649147

02 Jul 2025 08:57PM UTC coverage: 89.027% (-0.01%) from 89.037%
16035649147

push

github

anamileva
Loose ends from previous two PRs

26798 of 30101 relevant lines covered (89.03%)

2.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.53
/gridpath/common_functions.py
1
# Copyright 2016-2023 Blue Marble Analytics LLC.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import os.path
3✔
16
import sys
3✔
17
import warnings
3✔
18

19
from argparse import ArgumentParser
3✔
20

21
import pandas as pd
3✔
22

23

24
def determine_scenario_directory(scenario_location, scenario_name):
    """
    Build the path to a scenario's directory.

    :param scenario_location: string or None, the base directory that holds
        the scenario directories
    :param scenario_name: the scenario name; it is converted with str()
        before being joined to the base directory
    :return: the scenario directory (string)

    If no base directory is specified (None), the base is a directory named
    'scenarios' located under the parent ('..') of the current working
    directory.
    """
    base_directory = (
        scenario_location
        if scenario_location is not None
        else os.path.join(os.getcwd(), "..", "scenarios")
    )

    return os.path.join(base_directory, str(scenario_name))
43

44

45
def create_directory_if_not_exists(directory):
    """
    :param directory: string; the directory path

    Create the directory (including any missing parent directories) unless
    something already exists at that path.

    The creation is attempted directly instead of after an
    ``os.path.exists`` check: with a separate check, another process could
    create the directory between the check and the ``os.makedirs`` call,
    which would make ``os.makedirs`` raise FileExistsError.
    """
    try:
        os.makedirs(directory)
    except FileExistsError:
        # The path already exists, so there is nothing to create.
        pass
53

54

55
def get_required_e2e_arguments_parser():
    """
    :return: the common parser for all e2e arguments

    Build an ArgumentParser holding the arguments shared by all end-to-end
    scripts: where the scenario directory is created (--scenario_location)
    and how much run output is printed (--quiet, --verbose).

    A script's parser inherits these arguments when it is created with
    'parents=[get_required_e2e_arguments_parser()]'.

    'add_help' is set to 'False' here; otherwise the parent and each child
    would all define `-h/--help`, which throws an error.
    """
    e2e_parser = ArgumentParser(add_help=False)

    e2e_parser.add_argument(
        "--scenario_location",
        default="../scenarios",
        help="The path to the directory in which to create "
        "the scenario directory. Defaults to "
        "'../scenarios' if not specified.",
    )

    # Boolean switches controlling how much output is printed.
    output_switches = (
        ("--quiet", "Don't print run output."),
        ("--verbose", "Print extra output, e.g. current module info."),
    )
    for flag, help_text in output_switches:
        e2e_parser.add_argument(
            flag, default=False, action="store_true", help=help_text
        )

    return e2e_parser
90

91

92
def get_scenario_name_parser():
    """
    Build an ArgumentParser holding the argument used to get the scenario
    name (--scenario, which is required).

    A script's parser inherits this argument when it is created with
    'parents=[get_scenario_name_parser()]'.

    'add_help' is set to 'False' here; otherwise the parent and each child
    would all define `-h/--help`, which throws an error.
    :return: the parser
    """
    name_parser = ArgumentParser(add_help=False)

    # Listed under its own heading in the help output.
    name_parser.add_argument_group("required arguments").add_argument(
        "--scenario",
        type=str,
        required=True,
        help="Name of the scenario problem to solve.",
    )

    return name_parser
115

116

117
def get_db_parser():
    """
    Build an ArgumentParser holding the arguments used to access scenario
    data from the database: the database file (--database) and the scenario
    to look up, given by ID (--scenario_id) or by name (--scenario).

    A script's parser inherits these arguments when it is created with
    'parents=[get_db_parser()]'.

    'add_help' is set to 'False' here; otherwise the parent and each child
    would all define `-h/--help`, which throws an error.
    :return: the parser
    """
    db_parser = ArgumentParser(add_help=False)

    # Flag -> add_argument() options, registered in this order.
    argument_options = {
        "--database": dict(
            default="../db/io.db",
            help="The database file path relative to the current "
            "working directory. Defaults to ../db/io.db ",
        ),
        "--scenario_id": dict(
            type=int,
            help="The scenario_id from the database. Not needed "
            "if scenario is specified.",
        ),
        "--scenario": dict(
            type=str,
            help="The scenario_name from the database. Not "
            "needed if scenario_id is specified.",
        ),
    }
    for flag, options in argument_options.items():
        db_parser.add_argument(flag, **options)

    return db_parser
151

152

153
def get_get_inputs_parser():
    """
    Create ArgumentParser object which has the argument controlling how
    inputs are obtained (--n_parallel_get_inputs).

    We can then simply add 'parents=[get_get_inputs_parser()]' when we
    create a parser for a script to inherit this argument.

    Note that 'add_help' is set to 'False' to avoid multiple `-h/--help`
    options (one for parent and one for each child), which will throw an
    error.
    :return: the parser
    """

    parser = ArgumentParser(add_help=False)
    parser.add_argument(
        "--n_parallel_get_inputs",
        default=1,
        # Convert command-line values to int so the parsed value has the
        # same type as the default (without this, "4" would stay a string).
        type=int,
        help="Get inputs for n subproblems in parallel.",
    )

    return parser
164

165

166
def get_run_scenario_parser():
    """
    Create ArgumentParser object which has the common set of arguments for
    solving a scenario (see run_scenario.py and run_end_to_end.py).

    We can then simply add 'parents=[get_run_scenario_parser()]' when we
    create a parser for a script to inherit these common arguments.

    Note that 'add_help' is set to 'False' to avoid multiple `-h/--help` options
    (one for parent and one for each child), which will throw an error.
    :return: the parser
    """

    parser = ArgumentParser(add_help=False)

    # Output options
    parser.add_argument(
        "--log",
        default=False,
        action="store_true",
        help="Log output to a file in the scenario's 'logs' "
        "directory as well as the terminal.",
    )
    # Problem files and solutions
    parser.add_argument(
        "--create_lp_problem_file_only",
        default=False,
        action="store_true",
        help="Create and save the problem file, but don't solve yet.",
    )
    parser.add_argument(
        "--load_cplex_solution",
        default=False,
        action="store_true",
        help="Skip solve and load results from a CPLEX solution file instead.",
    )
    parser.add_argument(
        "--load_gurobi_solution",
        default=False,
        action="store_true",
        help="Skip solve and load results from a Gurobi solution file instead.",
    )
    # Solver options
    parser.add_argument(
        "--solver",
        help="Name of the solver to use. "
        "GridPath will use Cbc if solver is "
        "not specified here and a "
        "'solver_options.csv' file does not "
        "exist in the scenario directory.",
    )
    parser.add_argument(
        "--solver_executable",
        help="The path to the solver executable to use. This "
        "is optional; if you don't specify it, "
        "Pyomo will look for the solver executable in "
        "your PATH. The solver specified with the "
        "--solver option must be the same as the solver "
        "for which you are providing an executable.",
    )
    parser.add_argument(
        "--mute_solver_output",
        default=False,
        action="store_true",
        help="Don't print solver output.",
    )
    parser.add_argument(
        "--write_solver_files_to_logs_dir",
        default=False,
        action="store_true",
        help="Write the temporary solver files to the logs directory.",
    )
    parser.add_argument(
        "--keepfiles",
        default=False,
        action="store_true",
        help="Save temporary solver files.",
    )
    parser.add_argument(
        "--symbolic",
        default=False,
        action="store_true",
        help="Use symbolic labels in solver files.",
    )
    # NOTE(review): this flag has no help text, so nothing is shown for it
    # in the usage message -- consider adding a description.
    parser.add_argument(
        "--report_timing",
        default=False,
        action="store_true",
    )
    # Flag for test runs (various changes in behavior)
    parser.add_argument(
        "--testing",
        default=False,
        action="store_true",
        help="Flag for test suite runs.",
    )

    # Parallel solve
    parser.add_argument(
        "--n_parallel_solve",
        default=1,
        # Convert command-line values to int so the parsed value has the
        # same type as the default (without this, "4" would stay a string).
        type=int,
        help="Solve n subproblems in parallel.",
    )

    # Solve only incomplete subproblems
    parser.add_argument(
        "--incomplete_only",
        default=False,
        action="store_true",
        # Each fragment but the last ends with a space so that the implicit
        # string concatenation does not run words together.
        help="Solve only incomplete subproblems, i.e. do not re-solve if "
        "results are found. The subproblem is assumed complete if the "
        "termination_condition.txt file is found.",
    )

    # Results export rule name
    parser.add_argument(
        "--results_export_rule",
        help="The name of the rule to use to decide whether to export results.",
    )

    parser.add_argument(
        "--results_export_summary_rule",
        help="The name of the rule to use to decide whether to export "
        "summary results.",
    )

    return parser
293

294

295
def get_import_results_parser():
    """
    Build an ArgumentParser holding the argument used when importing
    results (--results_import_rule).

    The parser is created with 'add_help' set to 'False', like the other
    parent parsers in this module.
    :return: the parser
    """
    import_parser = ArgumentParser(add_help=False)

    rule_help = "The name of the rule to use to decide whether to import results."
    import_parser.add_argument("--results_import_rule", help=rule_help)

    return import_parser
303

304

305
def ensure_empty_string(string):
    """
    Translate the "empty_string" placeholder into an actual empty string.

    :param string: the value to check
    :return: "" if the value equals "empty_string"; otherwise the value
        itself, unchanged
    """
    if string == "empty_string":
        return ""

    return string
309

310

311
def create_logs_directory_if_not_exists(
    scenario_directory,
    weather_iteration,
    hydro_iteration,
    availability_iteration,
    subproblem,
    stage,
):
    """
    Create a logs directory if it doesn't exist already

    The logs directory is the 'logs' directory under the path obtained by
    joining, in order, the scenario directory, the weather, hydro and
    availability iteration directories, the subproblem directory and the
    stage directory. The parts are combined with os.path.join, so a part
    that is an empty string adds no directory level.

    :param scenario_directory: string; the scenario directory
    :param weather_iteration: string; the weather iteration directory name
    :param hydro_iteration: string; the hydro iteration directory name
    :param availability_iteration: string; the availability iteration
        directory name
    :param subproblem: string; the subproblem directory name
    :param stage: string; the stage directory name
    :return: the path to the logs directory (string)
    """
    logs_directory = os.path.join(
        scenario_directory,
        weather_iteration,
        hydro_iteration,
        availability_iteration,
        subproblem,
        stage,
        "logs",
    )

    # Attempt the creation directly instead of checking os.path.exists
    # first: with a separate check, another process could create the
    # directory between the check and the os.makedirs call, which would
    # make os.makedirs raise FileExistsError.
    try:
        os.makedirs(logs_directory)
    except FileExistsError:
        # The path already exists, so there is nothing to create.
        pass

    return logs_directory
338

339

340
class Logging(object):
    """
    Log output to both standard output and a log file. This will be
    accomplished by assigning an instance of this class to sys.stdout.
    """

    def __init__(self, logs_dir, start_time, e2e, process_id):
        """
        Assign sys.stdout and a log file as output destinations

        :param logs_dir: the directory in which the log file is opened
        :param start_time: passed to string_from_time() to build the
            timestamp part of the log file name
        :param e2e: if falsy, the log file is named 'opt_<time>.log'
            (logging only run_scenario); otherwise it is named
            'e2e_<time>_pid_<process_id>.log' (logging run_e2e)
        :param process_id: the process ID included in the e2e log file name
        """
        self.terminal = sys.stdout

        if not e2e:
            log_file_name = "opt_{}.log".format(string_from_time(start_time))
        else:
            log_file_name = "e2e_{}_pid_{}.log".format(
                string_from_time(start_time), str(process_id)
            )
        self.log_file_path = os.path.join(logs_dir, log_file_name)

        # Opened in append mode with line buffering (buffering=1).
        self.log_file = open(self.log_file_path, "a", buffering=1)

    def __getattr__(self, attr):
        """
        Default to the terminal (sys.stdout at construction time) when
        calling attributes that are not found on this class

        :param attr: the name of the attribute being looked up
        :return: the attribute of the same name on the terminal
        :raises AttributeError: if the terminal has not been assigned, or
            if the terminal does not have the attribute either
        """
        # __getattr__ only runs when normal lookup fails. If 'terminal'
        # itself is missing (e.g. on an instance created without running
        # __init__), evaluating 'self.terminal' here would call
        # __getattr__ again and recurse until RecursionError; read the
        # instance dictionary directly instead.
        try:
            terminal = self.__dict__["terminal"]
        except KeyError:
            raise AttributeError(
                "'{}' object has no attribute '{}'".format(
                    type(self).__name__, attr
                )
            ) from None
        return getattr(terminal, attr)

    def write(self, message):
        """
        Output to both terminal and a log file. The print statement will
        call the write() method of any object you assign to sys.stdout
        (in this case the Logging object)

        :param message: the text to write
        :return:
        """
        self.terminal.write(message)
        self.log_file.write(message)

        # Debugging tip: to find the source of a print statement, the
        # caller's frame record can be obtained here with
        # inspect.getouterframes(inspect.currentframe())[1] and its
        # filename and line number written to the terminal ahead of any
        # non-blank message.

    def flush(self):
        """
        Flush both the terminal and the log file

        :return:
        """
        self.terminal.flush()
        self.log_file.flush()
415

416

417
def string_from_time(datetime_string):
    """
    :param datetime_string: despite the name, an object providing
        strftime() (e.g. a datetime.datetime), not a string
    :return: the time formatted as 'YYYY-MM-DD_HH-MM-SS' (string)
    """
    timestamp_format = "%Y-%m-%d_%H-%M-%S"

    return datetime_string.strftime(timestamp_format)
423

424

425
def create_results_df(index_columns, results_columns, data):
    """
    Build a results DataFrame indexed by the index columns.

    :param index_columns: list of the column names to use as the index
    :param results_columns: list of the remaining column names
    :param data: the data passed to pandas.DataFrame; its columns are
        labelled with the index columns followed by the results columns
    :return: the DataFrame with index_columns set as its index
    """
    all_columns = index_columns + results_columns
    unindexed_df = pd.DataFrame(data=data, columns=all_columns)

    return unindexed_df.set_index(index_columns)
432

433

434
def duals_wrapper(m, component, verbose=False):
    """
    Look up the dual value of a component, tolerating missing duals.

    :param m: object whose 'dual' attribute is indexed by component
    :param component: the key to look up in m.dual
    :param verbose: if True, issue a warning when the lookup raises
        KeyError
    :return: m.dual[component], or None if the lookup raises KeyError
    """
    try:
        dual_value = m.dual[component]
    except KeyError:
        dual_value = None
        if verbose:
            # The warning text starts with a newline and every line
            # (including the final, otherwise empty one) carries a
            # 16-space indent.
            pad = " " * 16
            warnings.warn(
                f"\n{pad}KeyError caught when saving duals for {component}. "
                f"Duals were \n{pad}not exported. This is expected if "
                f"solving a MIP with CPLEX (and \n{pad}possibly other "
                f"solvers), not otherwise.\n{pad}"
            )

    return dual_value
447

448

449
def none_dual_type_error_wrapper(component, coefficient):
    """
    Divide component by coefficient, tolerating operands that cannot be
    divided.

    :param component: the numerator
    :param coefficient: the denominator
    :return: component / coefficient, or None if the division raises
        TypeError (as it does, for example, when component is None)
    """
    try:
        quotient = component / coefficient
    except TypeError:
        quotient = None

    return quotient
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc