• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

blue-marble / gridpath / 24209898320

09 Apr 2026 07:44PM UTC coverage: 88.907% (-0.05%) from 88.956%
24209898320

Pull #1351

github

web-flow
Merge de4a2efef into ad380fcdb
Pull Request #1351: RA Toolkit updates

143 of 182 new or added lines in 14 files covered. (78.57%)

1 existing line in 1 file now uncovered.

27507 of 30939 relevant lines covered (88.91%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.94
/data_toolkit/project/create_sync_gen_input_csvs_common.py
1
# Copyright 2016-2023 Blue Marble Analytics LLC.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14

15
import os.path
1✔
16
from argparse import Namespace
1✔
17
from multiprocessing import get_context
1✔
18
from sqlite3 import Connection
1✔
19

20
import pandas as pd
1✔
21

22
from data_toolkit.project.common_methods import create_iterations_csv
1✔
23
from db.common_functions import connect_to_database
1✔
24

25

26
def get_sync_project_pool_and_make_profile_csvs(
    db_path,
    param_name,
    raw_data_table_name,
    raw_data_units_table_name,
    profile_scenario_id,
    profile_scenario_name,
    stage_id,
    output_directory,
    overwrite,
    varies_by_weather,
    varies_by_hydro,
    include_hydro_iteration_column,
    n_parallel_projects,
):
    """
    Create the profile CSV for every project, in parallel.

    The distinct projects are read from ``raw_data_units_table_name`` in the
    database at ``db_path``. One work item is built per project and the work
    items are mapped over ``create_project_profile_csv_pool`` in a pool of
    ``int(n_parallel_projects)`` spawned worker processes.

    :param db_path: path to the database; also passed to each worker, which
        opens its own connection.
    :param param_name: passed through to the workers.
    :param raw_data_table_name: passed through to the workers.
    :param raw_data_units_table_name: table queried here for the distinct
        ``project`` values; also passed through to the workers.
    :param profile_scenario_id: passed through to the workers.
    :param profile_scenario_name: passed through to the workers.
    :param stage_id: passed through to the workers.
    :param output_directory: passed through to the workers.
    :param overwrite: passed through to the workers.
    :param varies_by_weather: passed through to the workers.
    :param varies_by_hydro: passed through to the workers.
    :param include_hydro_iteration_column: passed through to the workers.
    :param n_parallel_projects: number of worker processes (cast to int).
    :return: None
    """
    # The connection is only needed to list the projects, so release it
    # before the workers start, and release it even if the query fails.
    conn = connect_to_database(db_path=db_path)
    try:
        c = conn.cursor()
        # NOTE: table names cannot be bound as SQL parameters, so the table
        # name has to be interpolated; it must come from trusted code.
        projects = [
            prj[0]
            for prj in c.execute(
                f"SELECT DISTINCT project FROM {raw_data_units_table_name};"
            ).fetchall()
        ]
    finally:
        conn.close()

    # The order of the items in each work item must match the unpacking
    # order in create_project_profile_csv_pool
    pool_data = tuple(
        [
            db_path,
            prj,
            param_name,
            raw_data_table_name,
            raw_data_units_table_name,
            profile_scenario_id,
            profile_scenario_name,
            stage_id,
            output_directory,
            overwrite,
            varies_by_weather,
            varies_by_hydro,
            include_hydro_iteration_column,
        ]
        for prj in projects
    )

    # Pool must use spawn to work properly on Linux
    # The context manager terminates the workers if map() raises; on the
    # success path we close and join first so the workers exit cleanly.
    with get_context("spawn").Pool(int(n_parallel_projects)) as pool:
        pool.map(create_project_profile_csv_pool, pool_data)
        pool.close()
        pool.join()
1✔
80

81

82
def create_project_profile_csv(
    db_path,
    project,
    profile_scenario_id,
    profile_scenario_name,
    stage_id,
    output_directory,
    overwrite,
    param_name,
    raw_data_table_name,
    raw_data_units_table_name,
    varies_by_weather,
    varies_by_hydro,
    include_hydro_iteration_column,
):
    """
    Query the project's profile from the raw unit data and write it to CSV.

    The output file is
    ``<output_directory>/<project>-<profile_scenario_id>-<profile_scenario_name>.csv``.
    Its columns are ``weather_iteration`` (the data year), optionally
    ``hydro_iteration`` (always 0), ``stage_id``, ``timepoint`` (the
    zero-based day of year times 24, plus ``hour_of_day``) and
    ``<param_name>`` (the sum over the project's units of
    ``unit_weight * value``).

    After writing the profile, ``create_iterations_csv`` is called for the
    project with an ``iterations`` subdirectory of ``output_directory``.

    :param db_path: path to the database to query.
    :param project: the project whose units are aggregated.
    :param profile_scenario_id: used in the output filename and passed to
        ``create_iterations_csv``.
    :param profile_scenario_name: used in the output filename and passed to
        ``create_iterations_csv``.
    :param stage_id: written as a constant into the ``stage_id`` column.
    :param output_directory: directory the profile CSV is written to.
    :param overwrite: if truthy, replace an existing file; otherwise append
        to it (a header is written only if the file does not exist yet).
    :param param_name: name of the output value column.
    :param raw_data_table_name: table with the unit-level values.
    :param raw_data_units_table_name: table joined on ``unit`` that provides
        ``project`` and ``unit_weight`` (presumably; the query does not
        qualify which table each column comes from).
    :param varies_by_weather: passed to ``create_iterations_csv``.
    :param varies_by_hydro: passed to ``create_iterations_csv``.
    :param include_hydro_iteration_column: if truthy, add a constant
        ``hydro_iteration`` column with value 0.
    :return: None
    """
    # Get the weighted value for each of the project's constituent units,
    # get the UNION of these tables, and then find the project value
    # with SUM and GROUP BY
    # The project is bound as a query parameter (?) rather than spliced into
    # the SQL text, so names containing quotes cannot break the query. Table
    # and column names cannot be bound and are still interpolated.
    hydro_iter_sql = "0 AS hydro_iteration," if include_hydro_iteration_column else ""
    query = f"""
        SELECT year AS weather_iteration, 
        {hydro_iter_sql}
        {stage_id} AS stage_id, 
        hour_of_year as timepoint, sum(weighted_{param_name}) as {param_name}
            FROM (
            SELECT year, month, day_of_month, hour_of_day, unit, 
            project, unit_weight, value, unit_weight * value as 
            weighted_{param_name},
                (CAST(
                    strftime('%j',
                        year || '-' || 
                        CASE
                        WHEN month > 9 THEN month
                        ELSE '0' || month END
                        || '-' || 
                        CASE
                        WHEN day_of_month > 9 THEN day_of_month
                        ELSE '0' || day_of_month END
                        ) AS DECIMAL
                    ) - 1) * 24 + hour_of_day AS hour_of_year
            FROM {raw_data_table_name}
            JOIN {raw_data_units_table_name}
            USING (unit)
            WHERE project = ?
            )
        GROUP BY year, hour_of_year
        ORDER BY year, hour_of_year
    """

    # Put into a dataframe; the connection is only needed for the query, so
    # make sure it is closed even if the query fails
    conn = connect_to_database(db_path=db_path)
    try:
        df = pd.read_sql(query, con=conn, params=(project,))
    finally:
        conn.close()

    # Add to file
    filename = os.path.join(
        output_directory,
        f"{project}-{profile_scenario_id}-{profile_scenario_name}.csv",
    )

    mode = "w" if overwrite else "a"
    # When appending, only write the header if we are starting a new file
    write_header = mode == "w" or not os.path.exists(filename)

    df.to_csv(
        filename,
        mode=mode,
        header=write_header,
        index=False,
    )

    # Create iterations CSV
    iterations_directory = os.path.join(output_directory, "iterations")
    os.makedirs(iterations_directory, exist_ok=True)
    create_iterations_csv(
        iterations_directory=iterations_directory,
        project=project,
        profile_id=profile_scenario_id,
        profile_name=profile_scenario_name,
        varies_by_weather=varies_by_weather,
        varies_by_hydro=varies_by_hydro,
        overwrite=overwrite,
    )
×
169

170

171
def create_project_profile_csv_pool(pool_datum):
    """
    Pool adapter for ``create_project_profile_csv``.

    ``Pool.map`` hands each worker a single argument, so the work item
    arrives as one 13-item sequence. It is unpacked here and forwarded to
    ``create_project_profile_csv`` as keyword arguments.

    :param pool_datum: sequence of, in this order: db_path, project,
        param_name, raw_data_table_name, raw_data_units_table_name,
        profile_scenario_id, profile_scenario_name, stage_id,
        output_directory, overwrite, varies_by_weather, varies_by_hydro,
        include_hydro_iteration_column.
    :return: None
    """
    # The work item order differs from the keyword order below; unpacking
    # by position fails with a ValueError if the item has the wrong length
    (
        db_path,
        project,
        param_name,
        raw_data_table_name,
        raw_data_units_table_name,
        profile_scenario_id,
        profile_scenario_name,
        stage_id,
        output_directory,
        overwrite,
        varies_by_weather,
        varies_by_hydro,
        include_hydro_iteration_column,
    ) = pool_datum

    profile_kwargs = {
        "db_path": db_path,
        "project": project,
        "profile_scenario_id": profile_scenario_id,
        "profile_scenario_name": profile_scenario_name,
        "stage_id": stage_id,
        "output_directory": output_directory,
        "overwrite": overwrite,
        "param_name": param_name,
        "raw_data_table_name": raw_data_table_name,
        "raw_data_units_table_name": raw_data_units_table_name,
        "varies_by_weather": varies_by_weather,
        "varies_by_hydro": varies_by_hydro,
        "include_hydro_iteration_column": include_hydro_iteration_column,
    }
    create_project_profile_csv(**profile_kwargs)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc