• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Clinical-Genomics / cg / 9579469021

19 Jun 2024 09:20AM UTC coverage: 84.558%. First build
9579469021

Pull #3128

github

web-flow
Merge 387cfc7b6 into c943ec944
Pull Request #3128: RAREDISEASE: add store, store available and store housekeeper commands

16 of 17 new or added lines in 6 files covered. (94.12%)

20562 of 24317 relevant lines covered (84.56%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.44
/cg/meta/workflow/nf_analysis.py
1
import logging
1✔
2
from datetime import datetime
1✔
3
from pathlib import Path
1✔
4
from typing import Any, Iterator
1✔
5

6
from pydantic.v1 import ValidationError
1✔
7

8
from cg.constants import Workflow
1✔
9
from cg.constants.constants import (
1✔
10
    CaseActions,
11
    FileExtensions,
12
    FileFormat,
13
    GenomeVersion,
14
    MultiQC,
15
    WorkflowManager,
16
)
17
from cg.constants.gene_panel import GenePanelGenomeBuild
1✔
18
from cg.constants.nextflow import NFX_WORK_DIR
1✔
19
from cg.constants.nf_analysis import NfTowerStatus
1✔
20
from cg.constants.tb import AnalysisStatus
1✔
21
from cg.exc import CgError, HousekeeperStoreError, MetricsQCError
1✔
22
from cg.io.config import write_config_nextflow_style
1✔
23
from cg.io.controller import ReadFile, WriteFile
1✔
24
from cg.io.json import read_json
1✔
25
from cg.io.txt import concat_txt, write_txt
1✔
26
from cg.io.yaml import write_yaml_nextflow_style
1✔
27
from cg.meta.workflow.analysis import AnalysisAPI
1✔
28
from cg.meta.workflow.nf_handlers import NextflowHandler, NfTowerHandler
1✔
29
from cg.models.analysis import NextflowAnalysis
1✔
30
from cg.models.cg_config import CGConfig
1✔
31
from cg.models.deliverables.metric_deliverables import (
1✔
32
    MetricsBase,
33
    MetricsDeliverablesCondition,
34
    MultiqcDataJson,
35
)
36
from cg.models.fastq import FastqFileMeta
1✔
37
from cg.models.nf_analysis import (
1✔
38
    FileDeliverable,
39
    NfCommandArgs,
40
    WorkflowDeliverables,
41
    WorkflowParameters,
42
)
43
from cg.store.models import Case, CaseSample, Sample
1✔
44
from cg.utils import Process
1✔
45

46
LOG = logging.getLogger(__name__)
1✔
47

48

49
class NfAnalysisAPI(AnalysisAPI):
    """Parent class for handling NF-core analyses."""

    def __init__(self, config: CGConfig, workflow: Workflow):
        """Initialise the API for a given workflow.

        Every workflow-specific setting starts out as None and is expected to
        be populated by subclasses or by configuration loading.
        """
        super().__init__(workflow=workflow, config=config)
        self.workflow: Workflow = workflow
        # All remaining settings are `str | None` and default to None until
        # a subclass or configuration loader assigns them.
        for attribute in (
            "root_dir",
            "nfcore_workflow_path",
            "references",
            "profile",
            "conda_env",
            "conda_binary",
            "config_platform",
            "config_params",
            "config_resources",
            "tower_binary_path",
            "tower_workflow",
            "account",
            "email",
            "compute_env_base",
            "revision",
            "nextflow_binary_path",
        ):
            setattr(self, attribute, None)
71

72
    @property
    def root(self) -> str:
        """Root directory under which case directories live."""
        return self.root_dir

    @property
    def process(self):
        """Lazily created Process wrapping the Tower binary."""
        if not self._process:
            self._process = Process(binary=self.tower_binary_path)
        return self._process

    @process.setter
    def process(self, process: Process):
        self._process = process

    @property
    def use_read_count_threshold(self) -> bool:
        """Whether all samples must pass the adequate-read-count threshold
        before an analysis for a case may be started automatically."""
        return True

    @property
    def sample_sheet_headers(self) -> list[str]:
        """Column headers for the sample sheet; subclasses must provide them."""
        raise NotImplementedError

    @property
    def is_params_appended_to_nextflow_config(self) -> bool:
        """True when parameters belong in the nextflow config file instead of
        the params file."""
        return False

    @property
    def is_multiqc_pattern_search_exact(self) -> bool:
        """True when collecting metrics from MultiQC requires an exact pattern
        match; when False the pattern only needs to be contained."""
        return False

    @property
    def is_gene_panel_required(self) -> bool:
        """True when a gene panel must be created from StatusDB information and
        exported from Scout."""
        return False
114

115
    def get_profile(self, profile: str | None = None) -> str:
        """Return the given NF profile, falling back to the configured default."""
        return profile or self.profile

    def get_workflow_manager(self) -> str:
        """Return the workflow manager identifier (Tower)."""
        return WorkflowManager.Tower.value

    def get_workflow_version(self, case_id: str) -> str:
        """Return the configured workflow revision."""
        return self.revision

    def get_workflow_parameters(self, case_id: str) -> WorkflowParameters:
        """Return workflow parameters; implemented by subclasses."""
        raise NotImplementedError
130

131
    def get_nextflow_config_content(self, case_id: str) -> str:
        """Assemble the nextflow config content for a case.

        Concatenates the platform, params and resources config files with the
        per-case cluster options, optionally appending the workflow parameters
        when they belong in the config file rather than the params file.
        """
        config_files: list[str] = [
            self.config_platform,
            self.config_params,
            self.config_resources,
        ]
        extra_content: list[str] = [self.set_cluster_options(case_id=case_id)]
        if self.is_params_appended_to_nextflow_config:
            params_content: str = write_config_nextflow_style(
                self.get_workflow_parameters(case_id=case_id).dict()
            )
            extra_content.append(params_content)
        return concat_txt(file_paths=config_files, str_content=extra_content)
149

150
    def get_case_path(self, case_id: str) -> Path:
        """Return the working directory for a case."""
        return Path(self.root_dir, case_id)

    def get_sample_sheet_path(self, case_id: str) -> Path:
        """Return the CSV sample-sheet path for a case."""
        sample_sheet = Path(self.get_case_path(case_id), f"{case_id}_samplesheet")
        return sample_sheet.with_suffix(FileExtensions.CSV)

    def get_compute_env(self, case_id: str) -> str:
        """Return the head-job compute environment derived from case priority."""
        qos: str = self.get_slurm_qos_for_case(case_id=case_id)
        return f"{self.compute_env_base}-{qos}"

    def get_nextflow_config_path(
        self, case_id: str, nextflow_config: Path | str | None = None
    ) -> Path:
        """Return the nextflow config path, preferring an explicitly given one."""
        if nextflow_config:
            return Path(nextflow_config).absolute()
        config_path = Path(self.get_case_path(case_id), f"{case_id}_nextflow_config")
        return config_path.with_suffix(FileExtensions.JSON)

    def get_job_ids_path(self, case_id: str) -> Path:
        """Return the Trailblazer config file path holding Tower IDs."""
        return Path(self.root_dir, case_id, "tower_ids").with_suffix(FileExtensions.YAML)

    def get_deliverables_file_path(self, case_id: str) -> Path:
        """Return the deliverables file path for a case."""
        deliverables = Path(self.get_case_path(case_id), f"{case_id}_deliverables")
        return deliverables.with_suffix(FileExtensions.YAML)

    def get_metrics_deliverables_path(self, case_id: str) -> Path:
        """Return the expected location of the <case>_metrics_deliverables.yaml file."""
        metrics = Path(self.root_dir, case_id, f"{case_id}_metrics_deliverables")
        return metrics.with_suffix(FileExtensions.YAML)

    def get_params_file_path(self, case_id: str, params_file: Path | None = None) -> Path:
        """Return the params file path, preferring an explicitly given one."""
        if params_file:
            return Path(params_file).absolute()
        default_params = Path(self.get_case_path(case_id), f"{case_id}_params_file")
        return default_params.with_suffix(FileExtensions.YAML)
197

198
    def create_case_directory(self, case_id: str, dry_run: bool = False) -> None:
        """Create the case working directory unless this is a dry run."""
        if not dry_run:
            Path(self.get_case_path(case_id=case_id)).mkdir(parents=True, exist_ok=True)

    def get_log_path(self, case_id: str, workflow: str, log: str = None) -> Path:
        """Return the NF log path, preferring an explicitly given one.

        The default path is timestamped so every launch gets its own log file.
        """
        if log:
            # Fix: wrap in Path so the declared `-> Path` return type also
            # holds for an explicitly supplied log (the raw str was returned
            # before).
            return Path(log)
        launch_time: str = datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
        return Path(
            self.get_case_path(case_id),
            f"{case_id}_{workflow}_nextflow_{launch_time}",
        ).with_suffix(FileExtensions.LOG)

    def get_workdir_path(self, case_id: str, work_dir: Path | None = None) -> Path:
        """Return the NF work directory, preferring an explicitly given one."""
        if work_dir:
            return work_dir.absolute()
        return Path(self.get_case_path(case_id), NFX_WORK_DIR)

    def get_gene_panels_path(self, case_id: str) -> Path:
        """Return the path of the gene panels BED file exported from Scout."""
        return Path(self.get_case_path(case_id=case_id), "gene_panels").with_suffix(
            FileExtensions.BED
        )

    def set_cluster_options(self, case_id: str) -> str:
        """Return the nextflow clusterOptions line with SLURM account and QOS."""
        return f'process.clusterOptions = "-A {self.account} --qos={self.get_slurm_qos_for_case(case_id=case_id)}"\n'
227

228
    @staticmethod
    def extract_read_files(
        metadata: list[FastqFileMeta], forward_read: bool = False, reverse_read: bool = False
    ) -> list[str]:
        """Extract fastq file paths for either forward or reverse reads.

        Exactly one of forward_read / reverse_read must be True.

        Raises:
            ValueError: if not exactly one read direction is requested.
        """
        if forward_read and not reverse_read:
            read_direction = 1
        elif reverse_read and not forward_read:
            read_direction = 2
        else:
            raise ValueError("Either forward or reverse needs to be specified")
        # Sort by path so the returned order is deterministic.
        sorted_metadata: list = sorted(metadata, key=lambda k: k.path)
        return [
            fastq_file.path
            for fastq_file in sorted_metadata
            if fastq_file.read_direction == read_direction
        ]

    def get_paired_read_paths(self, sample: Sample) -> tuple[list[str], list[str]]:
        """Return paired fastq file paths (forward, reverse) for a sample.

        Fix: the original signature read `sample=Sample`, which made the Sample
        class itself the default value instead of annotating the parameter.
        """
        sample_metadata: list[FastqFileMeta] = self.gather_file_metadata_for_sample(sample=sample)
        fastq_forward_read_paths: list[str] = self.extract_read_files(
            metadata=sample_metadata, forward_read=True
        )
        fastq_reverse_read_paths: list[str] = self.extract_read_files(
            metadata=sample_metadata, reverse_read=True
        )
        return fastq_forward_read_paths, fastq_reverse_read_paths
256

257
    def get_sample_sheet_content_per_sample(self, case_sample: CaseSample) -> list[list[str]]:
        """Collect and format sample-sheet rows for one sample; implemented by subclasses."""
        raise NotImplementedError

    def get_sample_sheet_content(self, case_id: str) -> list[list[Any]]:
        """Return sample-sheet rows for every sample linked to the case."""
        case: Case = self.get_validated_case(case_id)
        LOG.info(f"Samples linked to case {case_id}: {len(case.links)}")
        LOG.debug("Getting sample sheet information")
        content: list = []
        for link in case.links:
            content.extend(self.get_sample_sheet_content_per_sample(case_sample=link))
        return content

    def verify_sample_sheet_exists(self, case_id: str, dry_run: bool = False) -> None:
        """Raise ValueError when the sample sheet file is missing (skipped on dry runs)."""
        if not dry_run and not Path(self.get_sample_sheet_path(case_id=case_id)).exists():
            raise ValueError(f"No config file found for case {case_id}")

    def verify_deliverables_file_exists(self, case_id: str) -> None:
        """Raise CgError when the deliverables file is missing."""
        if not Path(self.get_deliverables_file_path(case_id=case_id)).exists():
            raise CgError(f"No deliverables file found for case {case_id}")
281

282
    def write_params_file(self, case_id: str, workflow_parameters: dict = None) -> None:
        """Write the params file for an analysis; create an empty file when no
        parameters are given (they then live in the nextflow config instead)."""
        LOG.debug("Writing parameters file")
        params_path: Path = self.get_params_file_path(case_id=case_id)
        if not workflow_parameters:
            params_path.touch()
            return
        write_yaml_nextflow_style(content=workflow_parameters, file_path=params_path)

    @staticmethod
    def write_sample_sheet(
        content: list[list[Any]],
        file_path: Path,
        header: list[str],
    ) -> None:
        """Write the sample sheet CSV.

        NOTE: when a header is given it is inserted in place, mutating the
        caller's `content` list.
        """
        LOG.debug("Writing sample sheet")
        if header:
            content.insert(0, header)
        WriteFile.write_file_from_content(
            content=content,
            file_format=FileFormat.CSV,
            file_path=file_path,
        )

    @staticmethod
    def write_deliverables_file(
        deliverables_content: dict, file_path: Path, file_format=FileFormat.YAML
    ) -> None:
        """Write the deliverables file in the given format (YAML by default)."""
        WriteFile.write_file_from_content(
            content=deliverables_content, file_format=file_format, file_path=file_path
        )

    def write_trailblazer_config(self, case_id: str, tower_id: str) -> None:
        """Write the Tower ID to the YAML file used as the Trailblazer config."""
        config_path: Path = self.get_job_ids_path(case_id=case_id)
        LOG.info(f"Writing Tower ID to {config_path.as_posix()}")
        WriteFile.write_file_from_content(
            content={case_id: [tower_id]},
            file_format=FileFormat.YAML,
            file_path=config_path,
        )
327

328
    def create_sample_sheet(self, case_id: str, dry_run: bool):
        """Build the sample-sheet content for a case and write it unless dry running."""
        content: list[list[Any]] = self.get_sample_sheet_content(case_id=case_id)
        if dry_run:
            return
        self.write_sample_sheet(
            content=content,
            file_path=self.get_sample_sheet_path(case_id=case_id),
            header=self.sample_sheet_headers,
        )

    def create_params_file(self, case_id: str, dry_run: bool):
        """Create the parameters file for a case.

        The parameters are left empty (None) when they are appended to the
        nextflow config instead of the params file.
        """
        LOG.debug("Getting parameters information")
        workflow_parameters: dict | None = None
        if not self.is_params_appended_to_nextflow_config:
            workflow_parameters = self.get_workflow_parameters(case_id=case_id).dict()
        if not dry_run:
            self.write_params_file(case_id=case_id, workflow_parameters=workflow_parameters)

    def create_nextflow_config(self, case_id: str, dry_run: bool = False) -> None:
        """Write the nextflow config file when there is content for it."""
        content: str = self.get_nextflow_config_content(case_id=case_id)
        if not content:
            return
        LOG.debug("Writing nextflow config file")
        if dry_run:
            return
        write_txt(
            content=content,
            file_path=self.get_nextflow_config_path(case_id=case_id),
        )

    def create_gene_panel(self, case_id: str, dry_run: bool) -> None:
        """Create and write the aggregated gene panel file exported from Scout."""
        LOG.info("Creating gene panel file")
        bed_lines: list[str] = self.get_gene_panel(case_id=case_id, dry_run=dry_run)
        if dry_run:
            LOG.debug("\n".join(bed_lines))
            return
        self.write_panel(case_id=case_id, content=bed_lines)

    def config_case(self, case_id: str, dry_run: bool):
        """Create the directory and every config file a workflow needs for a case."""
        if dry_run:
            LOG.info("Dry run: Config files will not be written")
        self.status_db.verify_case_exists(case_internal_id=case_id)
        self.create_case_directory(case_id=case_id, dry_run=dry_run)
        self.create_sample_sheet(case_id=case_id, dry_run=dry_run)
        self.create_params_file(case_id=case_id, dry_run=dry_run)
        self.create_nextflow_config(case_id=case_id, dry_run=dry_run)
        if self.is_gene_panel_required:
            self.create_gene_panel(case_id=case_id, dry_run=dry_run)
379

380
    def _run_analysis_with_nextflow(
        self, case_id: str, command_args: NfCommandArgs, dry_run: bool
    ) -> None:
        """Run the analysis with Nextflow via a SLURM head job."""
        case_directory: Path = self.get_case_path(case_id=case_id)
        self.process = Process(
            binary=self.nextflow_binary_path,
            environment=self.conda_env,
            conda_binary=self.conda_binary,
            launch_directory=case_directory,
        )
        LOG.info("Workflow will be executed using Nextflow")
        run_parameters: list[str] = NextflowHandler.get_nextflow_run_parameters(
            case_id=case_id,
            workflow_path=self.nfcore_workflow_path,
            root_dir=self.root_dir,
            command_args=command_args.dict(),
        )
        self.process.export_variables(export=NextflowHandler.get_variables_to_export())
        command: str = self.process.get_command(parameters=run_parameters)
        LOG.info(f"{command}")
        sbatch_number: int = NextflowHandler.execute_head_job(
            case_id=case_id,
            case_directory=case_directory,
            slurm_account=self.account,
            email=self.email,
            qos=self.get_slurm_qos_for_case(case_id=case_id),
            commands=command,
            dry_run=dry_run,
        )
        LOG.info(f"Nextflow head job running as job: {sbatch_number}")
412

413
    def _run_analysis_with_tower(
        self, case_id: str, command_args: NfCommandArgs, dry_run: bool
    ) -> None:
        """Run the analysis with NF-Tower, resuming a previous run when requested."""
        LOG.info("Workflow will be executed using Tower")
        args_dict: dict = command_args.dict()
        if command_args.resume:
            # Fall back to the last recorded Tower ID when none was given.
            from_tower_id: int = command_args.id or NfTowerHandler.get_last_tower_id(
                case_id=case_id,
                trailblazer_config=self.get_job_ids_path(case_id=case_id),
            )
            LOG.info(f"Workflow will be resumed from run with Tower id: {from_tower_id}.")
            parameters: list[str] = NfTowerHandler.get_tower_relaunch_parameters(
                from_tower_id=from_tower_id, command_args=args_dict
            )
        else:
            parameters: list[str] = NfTowerHandler.get_tower_launch_parameters(
                tower_workflow=self.tower_workflow, command_args=args_dict
            )
        self.process.run_command(parameters=parameters, dry_run=dry_run)
        if self.process.stderr:
            LOG.error(self.process.stderr)
        if not dry_run:
            # Persist the new Tower ID so that future runs can be resumed.
            tower_id = NfTowerHandler.get_tower_id(stdout_lines=self.process.stdout_lines())
            self.write_trailblazer_config(case_id=case_id, tower_id=tower_id)
        LOG.info(self.process.stdout)
438

439
    def get_command_args(
        self,
        case_id: str,
        log: str,
        work_dir: str,
        from_start: bool,
        profile: str,
        config: str,
        params_file: str | None,
        revision: str,
        compute_env: str,
        nf_tower_id: str | None,
    ) -> NfCommandArgs:
        """Assemble the NF command arguments, resolving per-case defaults for
        every value the caller did not supply explicitly."""
        return NfCommandArgs(
            log=self.get_log_path(case_id=case_id, workflow=self.workflow, log=log),
            work_dir=self.get_workdir_path(case_id=case_id, work_dir=work_dir),
            resume=not from_start,
            profile=self.get_profile(profile=profile),
            config=self.get_nextflow_config_path(case_id=case_id, nextflow_config=config),
            params_file=self.get_params_file_path(case_id=case_id, params_file=params_file),
            name=case_id,
            compute_env=compute_env or self.get_compute_env(case_id=case_id),
            revision=revision or self.revision,
            wait=NfTowerStatus.SUBMITTED,
            id=nf_tower_id,
        )
468

469
    def run_nextflow_analysis(
        self,
        case_id: str,
        use_nextflow: bool,
        log: str,
        work_dir: str,
        from_start: bool,
        profile: str,
        config: str,
        params_file: str | None,
        revision: str,
        compute_env: str,
        nf_tower_id: str | None = None,
        dry_run: bool = False,
    ) -> None:
        """Prepare and start run analysis: check existence of all input files
        generated by config-case and sync with trailblazer.

        Raises:
            FileNotFoundError: when files needed to resume are missing.
            ValueError: when e.g. the sample sheet is missing.
            CgError: for other workflow errors (e.g. analysis already ongoing).
        """
        self.status_db.verify_case_exists(case_internal_id=case_id)

        command_args = self.get_command_args(
            case_id=case_id,
            log=log,
            work_dir=work_dir,
            from_start=from_start,
            profile=profile,
            config=config,
            params_file=params_file,
            revision=revision,
            compute_env=compute_env,
            nf_tower_id=nf_tower_id,
        )

        try:
            self.verify_sample_sheet_exists(case_id=case_id, dry_run=dry_run)
            self.check_analysis_ongoing(case_id=case_id)
            LOG.info(f"Running analysis for {case_id}")
            self.run_analysis(
                case_id=case_id,
                command_args=command_args,
                use_nextflow=use_nextflow,
                dry_run=dry_run,
            )
            self.set_statusdb_action(case_id=case_id, action=CaseActions.RUNNING, dry_run=dry_run)
        # Fix: previously `raise FileNotFoundError` (etc.) raised a NEW, empty
        # exception, discarding the original message and traceback. A bare
        # `raise` re-raises the original exception, so callers still catch the
        # same type but keep its details.
        except FileNotFoundError as error:
            LOG.error(f"Could not resume analysis: {error}")
            raise
        except ValueError as error:
            LOG.error(f"Could not run analysis: {error}")
            raise
        except CgError as error:
            LOG.error(f"Could not run analysis: {error}")
            raise

        if not dry_run:
            self.add_pending_trailblazer_analysis(case_id=case_id)
523

524
    def run_analysis(
        self,
        case_id: str,
        command_args: NfCommandArgs,
        use_nextflow: bool,
        dry_run: bool = False,
    ) -> None:
        """Execute the run analysis, dispatching to Nextflow or NF-Tower."""
        runner = (
            self._run_analysis_with_nextflow if use_nextflow else self._run_analysis_with_tower
        )
        runner(case_id=case_id, command_args=command_args, dry_run=dry_run)
544

545
    def get_deliverables_template_content(self) -> list[dict[str, str]]:
        """Read and return the YAML deliverables file template."""
        LOG.debug("Getting deliverables file template content")
        return ReadFile.get_content_from_file(
            file_format=FileFormat.YAML,
            file_path=self.get_bundle_filenames_path(),
        )

    @staticmethod
    def get_bundle_filenames_path() -> Path:
        """Return the bundle filenames path; implemented by subclasses."""
        raise NotImplementedError
557

558
    @staticmethod
    def get_formatted_file_deliverable(
        file_template: dict[str | None, str | None],
        case_id: str,
        sample_id: str,
        sample_name: str,
        case_path: str,
    ) -> FileDeliverable:
        """Fill in a file-deliverable template with case and sample attributes.

        Replaces the CASEID / SAMPLEID / SAMPLENAME / PATHTOCASE placeholders
        in every non-None template value; None values are kept as-is.
        """
        substitutions = (
            ("CASEID", case_id),
            ("SAMPLEID", sample_id),
            ("SAMPLENAME", sample_name),
            ("PATHTOCASE", case_path),
        )

        def _fill(value: str | None) -> str | None:
            # None marks an intentionally absent field; leave it untouched.
            if value is None:
                return None
            for placeholder, replacement in substitutions:
                value = value.replace(placeholder, replacement)
            return value

        deliverables = {field: _fill(value) for field, value in file_template.items()}
        return FileDeliverable(**deliverables)
579

580
    def get_deliverables_for_sample(
        self, sample: Sample, case_id: str, template: list[dict[str, str]]
    ) -> list[FileDeliverable]:
        """Return one FileDeliverable per template entry for the given sample."""
        case_path: str = str(self.get_case_path(case_id=case_id))
        return [
            self.get_formatted_file_deliverable(
                file_template=entry,
                case_id=case_id,
                sample_id=sample.internal_id,
                sample_name=sample.name,
                case_path=case_path,
            )
            for entry in template
        ]

    def get_deliverables_for_case(self, case_id: str) -> WorkflowDeliverables:
        """Return workflow deliverables for a case, one set per sample."""
        template: list[dict] = self.get_deliverables_template_content()
        samples: list[Sample] = self.status_db.get_samples_by_case_id(case_id=case_id)
        files: list[FileDeliverable] = []
        for sample in samples:
            sample_bundles = self.get_deliverables_for_sample(
                sample=sample, case_id=case_id, template=template
            )
            # Keep only the first occurrence of identical deliverables across samples.
            files.extend(bundle for bundle in sample_bundles if bundle not in files)
        return WorkflowDeliverables(files=files)
612

613
    def get_multiqc_json_path(self, case_id: str) -> Path:
        """Return the path of the multiqc_data.json file for a case."""
        return Path(
            self.root_dir,
            case_id,
            MultiQC.MULTIQC,
            MultiQC.MULTIQC_DATA,
            MultiQC.MULTIQC_DATA + FileExtensions.JSON,
        )

    def get_workflow_metrics(self, metric_id: str) -> dict:
        """Return nf-core workflow metric conditions; empty by default."""
        return {}

    def get_multiqc_search_patterns(self, case_id: str) -> dict:
        """Map each MultiQC search pattern to the metric ID it is stored under.

        By default every sample ID serves as both the search pattern and the
        metric ID, i.e. {sample_id_1: sample_id_1, sample_id_2: sample_id_2}.
        """
        sample_ids: Iterator[str] = self.status_db.get_sample_ids_by_case_id(case_id=case_id)
        return {sample_id: sample_id for sample_id in sample_ids}
635

636
    @staticmethod
    def get_deduplicated_metrics(metrics: list[MetricsBase]) -> list[MetricsBase]:
        """Return metrics deduplicated on (id, name), keeping the first occurrence."""
        seen: set = set()
        unique_metrics: list = []
        for metric in metrics:
            key = (metric.id, metric.name)
            if key in seen:
                continue
            seen.add(key)
            unique_metrics.append(metric)
        return unique_metrics
647

648
    def get_multiqc_json_metrics(self, case_id: str) -> list[MetricsBase]:
        """Return the deduplicated metrics parsed from a case's MultiQC JSON file."""
        multiqc_json = MultiqcDataJson(
            **read_json(file_path=self.get_multiqc_json_path(case_id=case_id))
        )
        collected: list[MetricsBase] = []
        search_patterns: dict = self.get_multiqc_search_patterns(case_id=case_id)
        for search_pattern, metric_id in search_patterns.items():
            collected.extend(
                self.get_metrics_from_multiqc_json_with_pattern(
                    search_pattern=search_pattern,
                    multiqc_json=multiqc_json,
                    metric_id=metric_id,
                    exact_match=self.is_multiqc_pattern_search_exact,
                )
            )
        return self.get_deduplicated_metrics(metrics=collected)
666

667
    def get_metrics_from_multiqc_json_with_pattern(
        self,
        search_pattern: str,
        multiqc_json: MultiqcDataJson,
        metric_id: str,
        exact_match: bool = False,
    ) -> list[MetricsBase]:
        """Return the metrics of every general-stats section matching the pattern.

        With exact_match the section name must equal the pattern; otherwise the
        pattern only needs to be contained in the section name.
        """
        metrics: list[MetricsBase] = []
        for section in multiqc_json.report_general_stats_data:
            for section_name, section_values in section.items():
                matched: bool = (
                    search_pattern == section_name
                    if exact_match
                    else search_pattern in section_name
                )
                if not matched:
                    continue
                for metric_name, metric_value in section_values.items():
                    metrics.append(
                        self.get_multiqc_metric(
                            metric_name=metric_name,
                            metric_value=metric_value,
                            metric_id=metric_id,
                        )
                    )
        return metrics
689

690
    def get_multiqc_metric(
        self, metric_name: str, metric_value: str | int | float, metric_id: str
    ) -> MetricsBase:
        """Build a MetricsBase entry for a single MultiQC metric."""
        # QC condition configured for this metric in the workflow, if any.
        qc_condition = self.get_workflow_metrics(metric_id).get(metric_name, None)
        source_file: str = MultiQC.MULTIQC_DATA + FileExtensions.JSON
        return MetricsBase(
            header=None,
            id=metric_id,
            input=source_file,
            name=metric_name,
            step=MultiQC.MULTIQC,
            value=metric_value,
            condition=qc_condition,
        )
703

704
    @staticmethod
    def ensure_mandatory_metrics_present(metrics: list[MetricsBase]) -> None:
        """Validate that all mandatory metrics are present.

        No-op here; presumably a hook for workflow-specific subclasses to
        override and raise when a required metric is missing — confirm against
        subclass implementations.
        """
        return None
707

708
    def create_metrics_deliverables_content(self, case_id: str) -> dict[str, list[dict[str, Any]]]:
        """Build the content of the metrics deliverables file for a case."""
        parsed_metrics: list[MetricsBase] = self.get_multiqc_json_metrics(case_id=case_id)
        # Hook for mandatory-metric validation (no-op in the base class).
        self.ensure_mandatory_metrics_present(metrics=parsed_metrics)
        serialized: list[dict[str, Any]] = [metric.dict() for metric in parsed_metrics]
        return {"metrics": serialized}
713

714
    def write_metrics_deliverables(self, case_id: str, dry_run: bool = False) -> None:
        """Write <case>_metrics_deliverables.yaml file."""
        out_path: Path = self.get_metrics_deliverables_path(case_id=case_id)
        # Content is generated even on dry-run so errors surface either way.
        deliverables: dict = self.create_metrics_deliverables_content(case_id=case_id)
        if dry_run:
            LOG.info(
                f"Dry-run: metrics deliverables file would be written to {out_path.as_posix()}"
            )
            return
        LOG.info(f"Writing metrics deliverables file to {out_path.as_posix()}")
        WriteFile.write_file_from_content(
            content=deliverables,
            file_format=FileFormat.YAML,
            file_path=out_path,
        )
730

731
    def validate_qc_metrics(self, case_id: str, dry_run: bool = False) -> None:
        """Validate the information from a QC metrics deliverable file.

        Reads the metrics deliverables YAML and feeds it to
        MetricsDeliverablesCondition, then records the outcome in Trailblazer:
        FAILED on a QC failure, ERROR on a read/parse problem, COMPLETED on
        success.

        Raises:
            MetricsQCError: if the QC metric conditions are not fulfilled.
            CgError: if the metrics deliverables file could not be read or parsed.
        """

        if dry_run:
            LOG.info("Dry-run: QC metrics validation would be performed")
            return

        LOG.info("Validating QC metrics")
        try:
            metrics_deliverables_path: Path = self.get_metrics_deliverables_path(case_id=case_id)
            qc_metrics_raw: dict = ReadFile.get_content_from_file(
                file_format=FileFormat.YAML, file_path=metrics_deliverables_path
            )
            # Constructing the model presumably runs the QC condition checks
            # and raises MetricsQCError on failure — confirm in the model.
            MetricsDeliverablesCondition(**qc_metrics_raw)
        except MetricsQCError as error:
            # QC failed: mark the analysis FAILED and surface the reason in Trailblazer.
            LOG.error(f"QC metrics failed for {case_id}, with: {error}")
            self.trailblazer_api.set_analysis_status(case_id=case_id, status=AnalysisStatus.FAILED)
            self.trailblazer_api.add_comment(case_id=case_id, comment=str(error))
            raise MetricsQCError from error
        except CgError as error:
            # NOTE(review): handler order matters — MetricsQCError must be caught
            # first, presumably because it subclasses CgError; confirm in cg.exc.
            LOG.error(f"Could not create metrics deliverables file: {error}")
            self.trailblazer_api.set_analysis_status(case_id=case_id, status=AnalysisStatus.ERROR)
            raise CgError from error
        self.trailblazer_api.set_analysis_status(case_id=case_id, status=AnalysisStatus.COMPLETED)
755

756
    def metrics_deliver(self, case_id: str, dry_run: bool):
        """Write the metrics deliverables file for a case and run QC validation on it."""
        # Fail fast on unknown cases before producing any files.
        self.status_db.verify_case_exists(case_internal_id=case_id)
        self.write_metrics_deliverables(case_id=case_id, dry_run=dry_run)
        self.validate_qc_metrics(case_id=case_id, dry_run=dry_run)
761

762
    def report_deliver(self, case_id: str, dry_run: bool = False, force: bool = False) -> None:
        """Write the deliverables file for a case.

        Verifies that the case exists and (via Trailblazer, unless relaxed by
        `force`) that the latest analysis is completed before writing.

        Args:
            case_id: Internal id of the case.
            dry_run: Log what would be done without writing anything.
            force: Forwarded to the Trailblazer completed-analysis check.
        """
        self.status_db.verify_case_exists(case_id)
        self.trailblazer_api.verify_latest_analysis_is_completed(case_id=case_id, force=force)
        if dry_run:
            LOG.info(f"Dry-run: Would have created delivery files for case {case_id}")
            return
        workflow_content: WorkflowDeliverables = self.get_deliverables_for_case(case_id=case_id)
        # Resolve the path once (the original resolved it twice) and log before
        # writing so the message is not misleading if the write fails.
        deliverables_path: Path = self.get_deliverables_file_path(case_id=case_id)
        LOG.info(f"Writing deliverables file in {deliverables_path.as_posix()}")
        self.write_deliverables_file(
            deliverables_content=workflow_content.dict(),
            file_path=deliverables_path,
        )
777

778
    def store_analysis_housekeeper(
        self, case_id: str, dry_run: bool = False, force: bool = False
    ) -> None:
        """
        Store a finished Nextflow analysis in Housekeeper and StatusDB.

        Raises:
            HousekeeperStoreError: If the deliverables file is malformed or if the bundle could not be stored.
        """
        try:
            self.status_db.verify_case_exists(case_id)
            self.trailblazer_api.verify_latest_analysis_is_completed(case_id=case_id, force=force)
            self.verify_deliverables_file_exists(case_id)
            self.upload_bundle_housekeeper(case_id=case_id, dry_run=dry_run, force=force)
            self.upload_bundle_statusdb(case_id=case_id, dry_run=dry_run)
            self.set_statusdb_action(case_id=case_id, action=None, dry_run=dry_run)
        except ValidationError as error:
            # Explicit chaining (PEP 3134) preserves the pydantic traceback as __cause__.
            raise HousekeeperStoreError(f"Deliverables file is malformed: {error}") from error
        except Exception as error:
            # Roll back both stores so a partial upload does not leave them inconsistent.
            self.housekeeper_api.rollback()
            self.status_db.session.rollback()
            raise HousekeeperStoreError(
                f"Could not store bundle in Housekeeper and StatusDB: {error}"
            ) from error
802

803
    def store(self, case_id: str, dry_run: bool = False, force: bool = False):
        """Generate deliverable files for a case and store in Housekeeper if they
        pass QC metrics checks.

        Raises:
            ValueError: if the latest Trailblazer analysis status is neither QC
                nor COMPLETE and `force` is not set.
        """
        is_latest_analysis_qc: bool = self.trailblazer_api.is_latest_analysis_qc(case_id)
        is_latest_analysis_completed: bool = self.trailblazer_api.is_latest_analysis_completed(
            case_id
        )
        if not (is_latest_analysis_qc or is_latest_analysis_completed or force):
            LOG.error(
                "Case not stored. Trailblazer status must be either QC or COMPLETE to be able to store"
            )
            # Give the exception itself a message instead of raising a bare ValueError.
            raise ValueError(
                "Case not stored. Trailblazer status must be either QC or COMPLETE to be able to store"
            )
        # Parentheses make the original implicit precedence explicit (`and` binds
        # tighter than `or`): `force` only skips metrics generation when the
        # metrics file is missing, never when the analysis is in QC status.
        # NOTE(review): confirm this grouping is intended and not
        # `(qc or file-missing) and not force`.
        if is_latest_analysis_qc or (
            not self.get_metrics_deliverables_path(case_id=case_id).exists() and not force
        ):
            LOG.info(f"Generating metrics file and performing QC checks for {case_id}")
            self.metrics_deliver(case_id=case_id, dry_run=dry_run)
        LOG.info(f"Storing analysis for {case_id}")
        self.report_deliver(case_id=case_id, dry_run=dry_run, force=force)
        self.store_analysis_housekeeper(case_id=case_id, dry_run=dry_run, force=force)
826

827
    def get_cases_to_store(self) -> list[Case]:
        """Return cases where analysis finished successfully,
        and is ready to be stored in Housekeeper."""
        running_cases = self.status_db.get_running_cases_in_workflow(workflow=self.workflow)
        storable: list[Case] = []
        for case in running_cases:
            # A case is storable when Trailblazer reports COMPLETED or QC.
            if self.trailblazer_api.is_latest_analysis_completed(
                case_id=case.internal_id
            ) or self.trailblazer_api.is_latest_analysis_qc(case_id=case.internal_id):
                storable.append(case)
        return storable
836

837
    def get_genome_build(self, case_id: str) -> GenomeVersion:
        """Return reference genome version for a case.
        Raises CgError if this information is missing or inconsistent for the samples linked to a case.
        """
        genome_versions: set[str] = {
            sample.reference_genome
            for sample in self.status_db.get_samples_by_case_id(case_id=case_id)
        }
        # Guard clauses: exactly one distinct version is required.
        if not genome_versions:
            raise CgError(f"No reference genome specified for case {case_id}")
        if len(genome_versions) > 1:
            raise CgError(
                f"Samples linked to case {case_id} have different reference genome versions set"
            )
        return genome_versions.pop()
852

853
    def get_gene_panel_genome_build(self, case_id: str) -> GenePanelGenomeBuild:
        """Return build version of the gene panel for a case."""
        genome_version: GenomeVersion = self.get_genome_build(case_id=case_id)
        try:
            # Attribute lookup maps the genome version onto the panel build enum.
            return getattr(GenePanelGenomeBuild, genome_version)
        except AttributeError as error:
            raise CgError(
                f"Reference {genome_version} has no associated genome build for panels: {error}"
            ) from error
862

863
    def get_gene_panel(self, case_id: str, dry_run: bool = False) -> list[str]:
        """Create and return the aggregated gene panel file."""
        genome_build: GenePanelGenomeBuild = self.get_gene_panel_genome_build(case_id=case_id)
        return self._get_gene_panel(case_id=case_id, genome_build=genome_build, dry_run=dry_run)
870

871
    def parse_analysis(self, qc_metrics_raw: list[MetricsBase], **kwargs) -> NextflowAnalysis:
        """Parse Nextflow output analysis files and return an analysis model."""
        sample_metrics: dict[str, dict] = {}
        for metric in qc_metrics_raw:
            # Group metric values per sample id, lower-casing the metric names.
            sample_metrics.setdefault(metric.id, {})[metric.name.lower()] = metric.value
        return NextflowAnalysis(sample_metrics=sample_metrics)
880

881
    def get_latest_metadata(self, case_id: str) -> NextflowAnalysis:
        """Return analysis output of a Nextflow case."""
        raw_metrics: list[MetricsBase] = self.get_multiqc_json_metrics(case_id)
        return self.parse_analysis(qc_metrics_raw=raw_metrics)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc