• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Clinical-Genomics / trailblazer / 7018400035

28 Nov 2023 11:59AM UTC coverage: 78.018%. First build
7018400035

Pull #322

github

seallard
Add test
Pull Request #322: Retrieve latest failed job via query

9 of 10 new or added lines in 3 files covered. (90.0%)

1118 of 1433 relevant lines covered (78.02%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/trailblazer/server/api.py
1
import datetime
×
2
import multiprocessing
×
3
import os
×
4
from http import HTTPStatus
×
5
from typing import Mapping
×
6

7
from flask import Blueprint, Response, abort, g, jsonify, make_response, request
×
8
from google.auth import jwt
×
9
from http import HTTPStatus
×
10
from pydantic import ValidationError
×
11
from sqlalchemy.orm import Query
×
12

13
from trailblazer.constants import (
×
14
    ONE_MONTH_IN_DAYS,
15
    TRAILBLAZER_TIME_STAMP,
16
    TrailblazerStatus,
17
)
18
from trailblazer.server.ext import store
×
19
from trailblazer.server.schemas import AnalysisUpdate
×
20
from trailblazer.store.models import Analysis, Info, Job, User
×
21
from trailblazer.utils.datetime import get_date_number_of_days_ago
×
22

23
ANALYSIS_HOST: str = os.environ.get("ANALYSIS_HOST")
×
24

25
blueprint = Blueprint("api", __name__, url_prefix="/api/v1")
×
26

27

28
def stringify_timestamps(data: dict) -> dict[str, str]:
×
29
    """Convert datetime into string before dumping in order to avoid information loss"""
30
    for key, val in data.items():
×
31
        if isinstance(val, datetime.datetime):
×
32
            data[key] = str(val)
×
33
    return data
×
34

35

36
@blueprint.before_request
×
37
def before_request():
×
38
    """Authentication that is run before processing requests to the application"""
39
    if request.method == "OPTIONS":
×
40
        return make_response(jsonify(ok=True), 204)
×
41
    if os.environ.get("SCOPE") == "DEVELOPMENT":
×
42
        return
×
43
    auth_header = request.headers.get("Authorization")
×
44
    if auth_header:
×
45
        jwt_token = auth_header.split("Bearer ")[-1]
×
46
    else:
47
        return abort(403, "no JWT token found on request")
×
48

49
    user_data: Mapping = jwt.decode(jwt_token, verify=False)
×
50
    user: User = store.get_user(email=user_data["email"], exclude_archived=True)
×
51
    if not user:
×
52
        return abort(403, f"{user_data['email']} doesn't have access")
×
53
    g.current_user = user
×
54

55

56
@blueprint.route("/analyses")
×
57
def analyses():
×
58
    """Display analyses."""
59
    per_page = int(request.args.get("per_page", 100))
×
60
    page = int(request.args.get("page", 1))
×
61
    analyses: Query = store.get_analyses_query_by_search_term_and_is_visible(
×
62
        search_term=request.args.get("query"),
63
        is_visible=bool(request.args.get("is_visible")),
64
    )
65

66
    query_page: Query = store.paginate_query(query=analyses, page=page, per_page=per_page)
×
67
    response_data = []
×
68
    for analysis in query_page.all():
×
69
        analysis_data = analysis.to_dict()
×
70
        analysis_data["user"] = analysis.user.to_dict() if analysis.user else None
×
NEW
71
        failed_job: Job | None = store.get_latest_failed_job_for_analysis(analysis.id)
×
72
        analysis_data["failed_job"] = failed_job.to_dict() if failed_job else None
×
73
        response_data.append(analysis_data)
×
74
    return jsonify(analyses=response_data)
×
75

76

77
@blueprint.route("/analyses/<int:analysis_id>", methods=["GET", "PUT"])
×
78
def analysis(analysis_id):
×
79
    """Retrieve or update an analysis."""
80
    analysis: Analysis = store.get_analysis_with_id(analysis_id)
×
81
    if analysis is None:
×
82
        return abort(404)
×
83

84
    if request.method == "PUT":
×
85
        try:
×
86
            analysis_update = AnalysisUpdate.model_validate(request.json)
×
87
            store.update_analysis(
×
88
                analysis_id=analysis_id,
89
                comment=analysis_update.comment,
90
                status=analysis_update.status,
91
                is_visible=analysis_update.is_visible,
92
            )
93
        except ValidationError as error:
×
94
            return jsonify(error=str(error)), HTTPStatus.BAD_REQUEST
×
95

96
    data = analysis.to_dict()
×
97
    data["jobs"] = [job.to_dict() for job in analysis.jobs]
×
98
    data["user"] = analysis.user.to_dict() if analysis.user else None
×
99
    return jsonify(**data)
×
100

101

102
@blueprint.route("/info")
×
103
def info():
×
104
    """Display metadata about database."""
105
    info: Info = store.get_query(table=Info).first()
×
106
    return jsonify(**info.to_dict())
×
107

108

109
@blueprint.route("/me")
×
110
def me():
×
111
    """Return information about a logged in user."""
112
    return jsonify(**g.current_user.to_dict())
×
113

114

115
@blueprint.route("/aggregate/jobs")
×
116
def aggregate_jobs():
×
117
    """Return stats about failed jobs."""
118
    time_window: datetime = get_date_number_of_days_ago(
×
119
        number_of_days_ago=int(request.args.get("days_back", ONE_MONTH_IN_DAYS))
120
    )
121
    failed_jobs: list[dict[str, str | int]] = store.get_nr_jobs_with_status_per_category(
×
122
        status=TrailblazerStatus.FAILED, since_when=time_window
123
    )
124
    return jsonify(jobs=failed_jobs)
×
125

126

127
@blueprint.route("/update-all")
×
128
def update_analyses():
×
129
    """Update all ongoing analysis by querying SLURM."""
130
    process = multiprocessing.Process(
×
131
        target=store.update_ongoing_analyses,
132
        kwargs={"analysis_host": ANALYSIS_HOST},
133
    )
134
    process.start()
×
135
    return jsonify(f"Success! Trailblazer updated {datetime.datetime.now()}"), HTTPStatus.CREATED
×
136

137

138
@blueprint.route("/update/<int:analysis_id>", methods=["PUT"])
×
139
def update_analysis(analysis_id):
×
140
    """Update a specific analysis."""
141
    try:
×
142
        process = multiprocessing.Process(
×
143
            target=store.update_run_status,
144
            kwargs={"analysis_id": analysis_id, "analysis_host": ANALYSIS_HOST},
145
        )
146
        process.start()
×
147
        return jsonify("Success! Update request sent"), HTTPStatus.CREATED
×
148
    except Exception as error:
×
149
        return jsonify(f"Exception: {error}"), HTTPStatus.CONFLICT
×
150

151

152
@blueprint.route("/cancel/<int:analysis_id>", methods=["PUT"])
×
153
def cancel(analysis_id):
×
154
    """Cancel an analysis and all slurm jobs associated with it."""
155
    auth_header = request.headers.get("Authorization")
×
156
    jwt_token = auth_header.split("Bearer ")[-1]
×
157
    user_data = jwt.decode(jwt_token, verify=False)
×
158
    try:
×
159
        process = multiprocessing.Process(
×
160
            target=store.cancel_ongoing_analysis,
161
            kwargs={
162
                "analysis_id": analysis_id,
163
                "analysis_host": ANALYSIS_HOST,
164
                "email": user_data["email"],
165
            },
166
        )
167
        process.start()
×
168
        return jsonify("Success! Cancel request sent"), HTTPStatus.CREATED
×
169
    except Exception as error:
×
170
        return jsonify(f"Exception: {error}"), HTTPStatus.CONFLICT
×
171

172

173
@blueprint.route("/delete/<int:analysis_id>", methods=["PUT"])
×
174
def delete(analysis_id):
×
175
    """Delete an analysis and all slurm jobs associated with it."""
176
    try:
×
177
        process = multiprocessing.Process(
×
178
            target=store.delete_analysis,
179
            kwargs={"analysis_id": analysis_id, "force": True},
180
        )
181
        process.start()
×
182
        return jsonify("Success! Delete request sent!"), HTTPStatus.CREATED
×
183
    except Exception as error:
×
184
        return jsonify(f"Exception: {error}"), HTTPStatus.CONFLICT
×
185

186

187
# CG REST INTERFACE ###
188
# ONLY POST routes which accept messages in specific format
189
# NOT for use with GUI (for now)
190

191

192
@blueprint.route("/get-latest-analysis", methods=["POST"])
×
193
def post_get_latest_analysis():
×
194
    """Return latest analysis entry for specified case id."""
195
    post_request: Response.json = request.json
×
196
    latest_case_analysis: Analysis | None = store.get_latest_analysis_for_case(
×
197
        case_id=post_request.get("case_id")
198
    )
199
    if latest_case_analysis:
×
200
        raw_analysis: dict[str, str] = stringify_timestamps(latest_case_analysis.to_dict())
×
201
        return jsonify(**raw_analysis), HTTPStatus.OK
×
202
    return jsonify(None), HTTPStatus.OK
×
203

204

205
@blueprint.route("/find-analysis", methods=["POST"])
×
206
def post_find_analysis():
×
207
    """Find analysis using case id, date, and status."""
208
    post_request: Response.json = request.json
×
209
    analysis: Analysis = store.get_analysis(
×
210
        case_id=post_request.get("case_id"),
211
        started_at=datetime.strptime(post_request.get("started_at"), TRAILBLAZER_TIME_STAMP).date(),
212
        status=post_request.get("status"),
213
    )
214
    if analysis:
×
215
        raw_analysis: dict[str, str] = stringify_timestamps(analysis.to_dict())
×
216
        return jsonify(**raw_analysis), HTTPStatus.OK
×
217
    return jsonify(None), HTTPStatus.OK
×
218

219

220
@blueprint.route("/delete-analysis", methods=["POST"])
×
221
def post_delete_analysis():
×
222
    """Delete analysis using analysis_id. If analysis is ongoing, an error will be raised.
223
    To delete ongoing analysis, --force flag should also be passed.
224
    If an ongoing analysis is deleted in ths manner, all ongoing jobs will be cancelled"""
225
    post_request: Response.json = request.json
×
226
    try:
×
227
        store.delete_analysis(
×
228
            analysis_id=post_request.get("analysis_id"), force=post_request.get("force")
229
        )
230
        return jsonify(None), HTTPStatus.CREATED
×
231
    except Exception as error:
×
232
        return jsonify(f"Exception: {error}"), HTTPStatus.CONFLICT
×
233

234

235
@blueprint.route("/mark-analyses-deleted", methods=["POST"])
×
236
def post_mark_analyses_deleted():
×
237
    """Mark all analysis belonging to a case as deleted."""
238
    post_request: Response.json = request.json
×
239
    case_analyses: list[Analysis] | None = store.update_case_analyses_as_deleted(
×
240
        case_id=post_request.get("case_id")
241
    )
242
    raw_analysis = [
×
243
        stringify_timestamps(case_analysis.to_dict()) for case_analysis in case_analyses
244
    ]
245
    if raw_analysis:
×
246
        return jsonify(*raw_analysis), HTTPStatus.CREATED
×
247
    return jsonify(None), HTTPStatus.CREATED
×
248

249

250
@blueprint.route("/add-pending-analysis", methods=["POST"])
×
251
def post_add_pending_analysis():
×
252
    """Add new analysis with status: pending."""
253
    post_request: Response.json = request.json
×
254
    try:
×
255
        analysis: Analysis = store.add_pending_analysis(
×
256
            case_id=post_request.get("case_id"),
257
            email=post_request.get("email"),
258
            type=post_request.get("type"),
259
            config_path=post_request.get("config_path"),
260
            out_dir=post_request.get("out_dir"),
261
            priority=post_request.get("priority"),
262
            data_analysis=post_request.get("data_analysis"),
263
            ticket_id=post_request.get("ticket"),
264
            workflow_manager=post_request.get("workflow_manager"),
265
        )
266
        raw_analysis: dict = stringify_timestamps(analysis.to_dict())
×
267
        return jsonify(**raw_analysis), 201
×
268
    except Exception as exception:
×
269
        return jsonify(f"Exception: {exception}"), 409
×
270

271

272
@blueprint.route("/set-analysis-uploaded", methods=["PUT"])
×
273
def set_analysis_uploaded():
×
274
    """Set the analysis uploaded at attribute."""
275
    put_request: Response.json = request.json
×
276
    try:
×
277
        store.update_analysis_uploaded_at(
×
278
            case_id=put_request.get("case_id"), uploaded_at=put_request.get("uploaded_at")
279
        )
280
        return jsonify("Success! Uploaded at request sent"), HTTPStatus.CREATED
×
281
    except Exception as error:
×
282
        return jsonify(f"Exception: {error}"), HTTPStatus.CONFLICT
×
283

284

285
@blueprint.route("/set-analysis-status", methods=["PUT"])
×
286
def set_analysis_status():
×
287
    """Update analysis status of a case with supplied status."""
288
    put_request: Response.json = request.json
×
289
    try:
×
290
        store.update_analysis_status(
×
291
            case_id=put_request.get("case_id"), status=put_request.get("status")
292
        )
293
        return (
×
294
            jsonify(f"Success! Analysis set to {put_request.get('status')} request sent"),
295
            HTTPStatus.CREATED,
296
        )
297
    except Exception as error:
×
298
        return jsonify(f"Exception: {error}"), HTTPStatus.CONFLICT
×
299

300

301
@blueprint.route("/add-comment", methods=["PUT"])
×
302
def add_comment():
×
303
    """Updating comment on analysis."""
304
    put_request: Response.json = request.json
×
305
    try:
×
306
        store.update_analysis_comment(
×
307
            case_id=put_request.get("case_id"), comment=put_request.get("comment")
308
        )
309
        return jsonify("Success! Adding comment request sent"), HTTPStatus.CREATED
×
310
    except Exception as error:
×
311
        return jsonify(f"Exception: {error}"), HTTPStatus.CONFLICT
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc