• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Clinical-Genomics / cg / 9061894859

13 May 2024 11:13AM UTC coverage: 84.442%. First build
9061894859

Pull #3210

github

web-flow
Merge 8ef11bc06 into 4d7c6a137
Pull Request #3210: fix(downsample api)

11 of 12 new or added lines in 5 files covered. (91.67%)

19680 of 23306 relevant lines covered (84.44%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.31
/cg/apps/downsample/downsample.py
1
"""API that handles downsampling of samples."""
2

3
import logging
1✔
4
from pathlib import Path
1✔
5

6
from cg.apps.demultiplex.sample_sheet.validators import validate_sample_id
1✔
7
from cg.exc import DownsampleFailedError
1✔
8
from cg.meta.meta import MetaAPI
1✔
9
from cg.meta.workflow.downsample.downsample import DownsampleWorkflow
1✔
10
from cg.models.cg_config import CGConfig
1✔
11
from cg.models.downsample.downsample_data import DownsampleData
1✔
12
from cg.store.models import Case, CaseSample, Sample
1✔
13
from cg.utils.calculations import multiply_by_million
1✔
14

15
LOG = logging.getLogger(__name__)
1✔
16

17

18
class DownsampleAPI(MetaAPI):
1✔
19
    def __init__(
1✔
20
        self,
21
        config: CGConfig,
22
        dry_run: bool = False,
23
    ):
24
        """Initialize the API."""
25
        super().__init__(config)
1✔
26
        self.dry_run: bool = dry_run
1✔
27

28
    def get_downsample_data(
1✔
29
        self, sample_id: str, number_of_reads: float, case_id: str, case_name: str
30
    ) -> DownsampleData:
31
        """Return the DownSampleData.
32
        Raises:
33
            DownsampleFailedError
34
        """
35
        try:
1✔
36
            return DownsampleData(
1✔
37
                status_db=self.status_db,
38
                hk_api=self.housekeeper_api,
39
                sample_id=sample_id,
40
                number_of_reads=number_of_reads,
41
                case_id=case_id,
42
                case_name=case_name,
43
                out_dir=Path(self.config.downsample.downsample_dir),
44
            )
45
        except Exception as error:
×
46
            raise DownsampleFailedError(repr(error))
×
47

48
    def store_downsampled_sample(self, downsample_data: DownsampleData) -> None:
1✔
49
        """
50
        Add a downsampled sample entry to StatusDB.
51
        Raises:
52
            ValueError
53
        """
54
        downsampled_sample: Sample = downsample_data.downsampled_sample
1✔
55
        if self.status_db.sample_with_id_exists(sample_id=downsampled_sample.internal_id):
1✔
56
            raise ValueError(f"Sample {downsampled_sample.internal_id} already exists in StatusDB.")
1✔
57
        LOG.info(
1✔
58
            f"New downsampled sample created: {downsampled_sample.internal_id} from {downsampled_sample.from_sample}"
59
            f"Application tag set to: {downsampled_sample.application_version.application.tag}"
60
            f"Customer set to: {downsampled_sample.customer}"
61
        )
62
        if not self.dry_run:
1✔
63
            self.status_db.session.add(downsampled_sample)
1✔
64
            self.status_db.session.commit()
1✔
65
            LOG.info(f"Added {downsampled_sample.name} to StatusDB.")
1✔
66

67
    def store_downsampled_case(self, downsample_data: DownsampleData) -> None:
1✔
68
        """
69
        Add a down sampled case entry to StatusDB.
70
        """
71
        downsampled_case: Case = downsample_data.downsampled_case
1✔
72
        if self.status_db.case_with_name_exists(case_name=downsampled_case.name):
1✔
73
            LOG.info(f"Case with name {downsampled_case.name} already exists in StatusDB.")
1✔
74
            return
1✔
75
        if not self.dry_run:
1✔
76
            self.status_db.session.add(downsampled_case)
1✔
77
            self.status_db.session.commit()
1✔
78
            LOG.info(f"New down sampled case created: {downsampled_case.internal_id}")
1✔
79

80
    def _link_downsampled_sample_to_case(
1✔
81
        self, downsample_data: DownsampleData, sample: Sample, case: Case
82
    ) -> None:
83
        """Create a link between sample and case in statusDB."""
84
        sample_case_link: CaseSample = self.status_db.relate_sample(
1✔
85
            case=case,
86
            sample=sample,
87
            status=downsample_data.get_sample_status(),
88
        )
89
        if self.dry_run:
1✔
90
            return
1✔
91
        self.status_db.session.add(sample_case_link)
1✔
92
        self.status_db.session.commit()
1✔
93
        LOG.info(f"Related sample {sample.internal_id} to {case.internal_id}")
1✔
94

95
    def store_downsampled_sample_case(self, downsample_data: DownsampleData) -> None:
1✔
96
        """
97
        Add the downsampled sample and case to statusDB and generate the sample case link.
98
        """
99
        self.store_downsampled_sample(downsample_data)
1✔
100
        self.store_downsampled_case(downsample_data)
1✔
101
        self._link_downsampled_sample_to_case(
1✔
102
            downsample_data=downsample_data,
103
            sample=downsample_data.downsampled_sample,
104
            case=downsample_data.downsampled_case,
105
        )
106

107
    def start_downsample_job(
1✔
108
        self,
109
        downsample_data: DownsampleData,
110
        account: str,
111
    ) -> int:
112
        """
113
        Start a down sample job for a sample.
114
        -Retrieves input directory from the latest version of a sample bundle in Housekeeper
115
        -Starts a down sample job
116
        """
117
        downsample_work_flow = DownsampleWorkflow(
1✔
118
            number_of_reads=multiply_by_million(downsample_data.number_of_reads),
119
            config=self.config,
120
            output_fastq_dir=str(downsample_data.fastq_file_output_directory),
121
            input_fastq_dir=str(downsample_data.fastq_file_input_directory),
122
            original_sample=downsample_data.original_sample,
123
            downsampled_sample=downsample_data.downsampled_sample,
124
            account=account,
125
            dry_run=self.dry_run,
126
        )
127
        return downsample_work_flow.write_and_submit_sbatch_script()
1✔
128

129
    def downsample_sample(
1✔
130
        self,
131
        sample_id: str,
132
        case_name: str,
133
        case_id: str,
134
        number_of_reads: float,
135
        account: str | None = None,
136
    ) -> int | None:
137
        """Downsample a sample."""
138
        LOG.info(f"Starting Downsampling for sample {sample_id}.")
1✔
139
        validate_sample_id(sample_id)
1✔
140
        downsample_data: DownsampleData = self.get_downsample_data(
1✔
141
            sample_id=sample_id,
142
            case_id=case_id,
143
            number_of_reads=number_of_reads,
144
            case_name=case_name,
145
        )
146
        if self.prepare_fastq_api.is_sample_decompression_needed(
1✔
147
            downsample_data.original_sample.internal_id
148
        ):
149
            self.prepare_fastq_api.compress_api.decompress_spring(
×
150
                downsample_data.original_sample.internal_id
151
            )
152
            return
×
153
        LOG.debug("No Decompression needed.")
1✔
154
        self.store_downsampled_sample_case(downsample_data=downsample_data)
1✔
155
        if not self.dry_run:
1✔
NEW
156
            downsample_data.create_down_sampling_working_directory()
×
157
        submitted_job: int = self.start_downsample_job(
1✔
158
            downsample_data=downsample_data, account=account
159
        )
160
        return submitted_job
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc