• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Clinical-Genomics / cg / 11290323218

11 Oct 2024 09:58AM UTC coverage: 84.504%. First build
11290323218

Pull #3839

github

web-flow
Merge c6917f55c into 560003d27
Pull Request #3839: fix(post processing when samples missing)

1 of 3 new or added lines in 2 files covered. (33.33%)

22861 of 27053 relevant lines covered (84.5%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.93
/cg/services/illumina/post_processing/post_processing_service.py
1
"""Module that holds the illumina post-processing service."""
2

3
import logging
1✔
4
from pathlib import Path
1✔
5

6
from cg.apps.housekeeper.hk import HousekeeperAPI
1✔
7
from cg.constants import SequencingFileTag
1✔
8
from cg.constants.devices import DeviceType
1✔
9
from cg.exc import FlowCellError, MissingFilesError
1✔
10
from cg.models.run_devices.illumina_run_directory_data import IlluminaRunDirectoryData
1✔
11
from cg.services.illumina.data_transfer.data_transfer_service import (
1✔
12
    IlluminaDataTransferService,
13
)
14
from cg.services.illumina.data_transfer.models import (
1✔
15
    IlluminaFlowCellDTO,
16
    IlluminaSampleSequencingMetricsDTO,
17
    IlluminaSequencingRunDTO,
18
)
19
from cg.services.illumina.post_processing.housekeeper_storage import (
1✔
20
    add_demux_logs_to_housekeeper,
21
    add_run_parameters_file_to_housekeeper,
22
    add_sample_fastq_files_to_housekeeper,
23
    delete_sequencing_data_from_housekeeper,
24
    store_undetermined_fastq_files,
25
)
26
from cg.services.illumina.post_processing.utils import (
1✔
27
    combine_sample_metrics_with_undetermined,
28
    create_delivery_file_in_flow_cell_directory,
29
)
30
from cg.services.illumina.post_processing.validation import (
1✔
31
    is_flow_cell_ready_for_postprocessing,
32
)
33
from cg.store.exc import EntryNotFoundError
1✔
34
from cg.store.models import IlluminaFlowCell, IlluminaSequencingRun
1✔
35
from cg.store.store import Store
1✔
36
from cg.utils.files import get_directories_in_path
1✔
37

38
LOG = logging.getLogger(__name__)
1✔
39

40

41
class IlluminaPostProcessingService:
1✔
42
    def __init__(
1✔
43
        self,
44
        status_db: Store,
45
        housekeeper_api: HousekeeperAPI,
46
        demultiplexed_runs_dir: Path,
47
        dry_run: bool,
48
    ) -> None:
49
        self.status_db: Store = status_db
1✔
50
        self.hk_api: HousekeeperAPI = housekeeper_api
1✔
51
        self.demultiplexed_runs_dir = demultiplexed_runs_dir
1✔
52
        self.dry_run: bool = dry_run
1✔
53

54
    def store_illumina_flow_cell(
        self,
        run_directory_data: IlluminaRunDirectoryData,
    ) -> IlluminaFlowCell:
        """
        Build a flow cell DTO from the run directory data and add it to the status database.

        The flow cell model is read from the run parameters and may be None. The value
        returned by the store's ``add_illumina_flow_cell`` is passed back to the caller.
        """
        flow_cell_model: str | None = run_directory_data.run_parameters.get_flow_cell_model()
        return self.status_db.add_illumina_flow_cell(
            IlluminaFlowCellDTO(
                internal_id=run_directory_data.id,
                type=DeviceType.ILLUMINA,
                model=flow_cell_model,
            )
        )
1✔
67

68
    def store_illumina_sequencing_run(
        self,
        run_directory_data: IlluminaRunDirectoryData,
        flow_cell: IlluminaFlowCell,
    ) -> IlluminaSequencingRun:
        """Create the sequencing run DTO for the run and add it to the status database.

        The DTO is produced by a fresh IlluminaDataTransferService from the run
        directory data and stored together with the given flow cell.
        """
        transfer_service = IlluminaDataTransferService()
        run_dto: IlluminaSequencingRunDTO = transfer_service.create_illumina_sequencing_dto(
            run_directory_data
        )
        return self.status_db.add_illumina_sequencing_run(
            sequencing_run_dto=run_dto,
            flow_cell=flow_cell,
        )
81

82
    def store_illumina_sample_sequencing_metrics(
        self,
        run_directory_data: IlluminaRunDirectoryData,
        sequencing_run: IlluminaSequencingRun,
    ) -> list[IlluminaSampleSequencingMetricsDTO]:
        """Add the sample sequencing metrics of a run to the status database.

        The per-sample metrics and the metrics for undetermined reads are created
        separately, combined, and then added one entry at a time for the given
        sequencing run. The combined list is returned.
        """
        transfer_service = IlluminaDataTransferService()
        per_sample: list[IlluminaSampleSequencingMetricsDTO] = (
            transfer_service.create_sample_sequencing_metrics_dto_for_flow_cell(
                flow_cell_directory=run_directory_data.get_demultiplexed_runs_dir(),
            )
        )
        undetermined: list[IlluminaSampleSequencingMetricsDTO] = (
            transfer_service.create_sample_run_dto_for_undetermined_reads(run_directory_data)
        )
        all_metrics: list[IlluminaSampleSequencingMetricsDTO] = (
            combine_sample_metrics_with_undetermined(
                sample_metrics=per_sample,
                undetermined_metrics=undetermined,
            )
        )
        for metrics_dto in all_metrics:
            self.status_db.add_illumina_sample_metrics_entry(
                metrics_dto=metrics_dto,
                sequencing_run=sequencing_run,
            )
        return all_metrics
1✔
108

109
    def store_sequencing_data_in_status_db(
        self, run_directory_data: IlluminaRunDirectoryData
    ) -> None:
        """Store the flow cell, sequencing run and sample metrics of a run in StatusDB.

        The steps run in order: flow cell, sequencing run, sample metrics, sample
        updates. The store is committed once, after all of them have completed.
        """
        LOG.info(f"Add sequencing and demux data to StatusDB for run {run_directory_data.id}")
        stored_flow_cell: IlluminaFlowCell = self.store_illumina_flow_cell(
            run_directory_data=run_directory_data
        )
        stored_run: IlluminaSequencingRun = self.store_illumina_sequencing_run(
            run_directory_data=run_directory_data,
            flow_cell=stored_flow_cell,
        )
        stored_metrics: list[IlluminaSampleSequencingMetricsDTO] = (
            self.store_illumina_sample_sequencing_metrics(
                run_directory_data=run_directory_data,
                sequencing_run=stored_run,
            )
        )
        self.update_samples_for_metrics(sample_metrics=stored_metrics, sequencing_run=stored_run)
        self.status_db.commit_to_store()
1✔
129

130
    def update_samples_for_metrics(
        self,
        sample_metrics: list[IlluminaSampleSequencingMetricsDTO],
        sequencing_run: IlluminaSequencingRun,
    ) -> None:
        """Update reads and sequenced-at date for every distinct sample in the metrics.

        Each sample id found in the metrics is handled once, regardless of how many
        metrics entries refer to it.
        """
        for sample_internal_id in self.get_unique_samples_from_run(sample_metrics):
            self.status_db.update_sample_reads_illumina(
                internal_id=sample_internal_id,
                sequencer_type=sequencing_run.sequencer_type,
            )
            self.status_db.update_sample_sequenced_at(
                sample_internal_id,
                date=sequencing_run.sequencing_completed_at,
            )
143

144
    @staticmethod
1✔
145
    def get_unique_samples_from_run(
1✔
146
        sample_metrics: list[IlluminaSampleSequencingMetricsDTO],
147
    ) -> list[str]:
148
        """Get unique samples from the run."""
149
        return list({sample_metric.sample_id for sample_metric in sample_metrics})
1✔
150

151
    def store_sequencing_data_in_housekeeper(
        self,
        run_directory_data: IlluminaRunDirectoryData,
        store: Store,
    ) -> None:
        """Add the run's fastq files, demux logs and run parameters file to Housekeeper.

        A bundle and version for the run id, plus the tags used for the files, are
        created first when they do not exist yet.
        """
        LOG.info(f"Add sequencing and demux data to Housekeeper for run {run_directory_data.id}")

        self.hk_api.add_bundle_and_version_if_non_existent(run_directory_data.id)
        required_tags: list[str] = [
            SequencingFileTag.FASTQ,
            SequencingFileTag.RUN_PARAMETERS,
            run_directory_data.id,
        ]
        self.hk_api.add_tags_if_non_existent(required_tags)

        # Fastq storage needs the status database in addition to Housekeeper.
        for store_fastq_files in (
            add_sample_fastq_files_to_housekeeper,
            store_undetermined_fastq_files,
        ):
            store_fastq_files(
                run_directory_data=run_directory_data, hk_api=self.hk_api, store=store
            )

        # Demux logs and the run parameters file only need Housekeeper.
        for add_run_file in (
            add_demux_logs_to_housekeeper,
            add_run_parameters_file_to_housekeeper,
        ):
            add_run_file(
                run_directory_data=run_directory_data,
                hk_api=self.hk_api,
            )
180

181
    def post_process_illumina_flow_cell(
        self,
        sequencing_run_name: str,
    ) -> None:
        """Store data for an Illumina demultiplexed run and mark it as ready for delivery.

        This function:
            - Stores the run data in the status database
            - Stores sequencing metrics in the status database
            - Updates sample read counts in the status database
            - Stores the run data in the Housekeeper database
            - Creates a delivery file in the sequencing run directory

        Existing StatusDB and Housekeeper data for the run is deleted before the new
        data is stored. A run that fails validation with a FlowCellError or
        MissingFilesError is logged and skipped without raising, and in dry-run mode
        the method returns after validation without storing anything.

        Args:
            sequencing_run_name: Name of the run directory inside the demultiplexed
                runs directory.

        Raises:
            Exception: Any error raised while storing the run in StatusDB or
                Housekeeper is logged and re-raised unchanged.
        """

        LOG.info(f"Post-process Illumina run {sequencing_run_name}")
        demux_run_dir = Path(self.demultiplexed_runs_dir, sequencing_run_name)
        run_directory_data = IlluminaRunDirectoryData(demux_run_dir)
        sample_sheet_path: Path = self.hk_api.get_sample_sheet_path(run_directory_data.id)
        run_directory_data.set_sample_sheet_path_hk(hk_path=sample_sheet_path)
        # Defaults used when the run has no previous entry in StatusDB.
        sequencing_run: IlluminaSequencingRun | None = None
        has_backup: bool = False

        LOG.debug("Set path for Housekeeper sample sheet in run directory")
        try:
            is_flow_cell_ready_for_postprocessing(
                flow_cell_output_directory=demux_run_dir,
                flow_cell=run_directory_data,
            )
        except (FlowCellError, MissingFilesError) as e:
            # Validation failures are not errors for the caller: the run is skipped.
            LOG.warning(f"Run {sequencing_run_name} will be skipped: {e}")
            return
        if self.dry_run:
            LOG.info(f"Dry run: will not post-process Illumina run {sequencing_run_name}")
            return
        try:
            # Remember the backup flag of a previously stored run so it can be
            # re-applied once the run has been deleted and stored again.
            sequencing_run = self.status_db.get_illumina_sequencing_run_by_device_internal_id(
                run_directory_data.id
            )
            has_backup = sequencing_run.has_backup
        except EntryNotFoundError as error:
            LOG.info(f"Run {sequencing_run_name} not found in StatusDB: {str(error)}")
        self.delete_sequencing_run_data(flow_cell_id=run_directory_data.id)
        try:
            self.store_sequencing_data_in_status_db(run_directory_data)
            self.store_sequencing_data_in_housekeeper(
                run_directory_data=run_directory_data,
                store=self.status_db,
            )
        except Exception as e:
            LOG.error(f"Failed to store Illumina run: {str(e)}")
            raise
        if sequencing_run:
            # NOTE(review): sequencing_run was fetched before delete_sequencing_run_data
            # ran; confirm the store applies the flag to the newly stored run.
            self.status_db.update_illumina_sequencing_run_has_backup(
                sequencing_run=sequencing_run, has_backup=has_backup
            )

        create_delivery_file_in_flow_cell_directory(demux_run_dir)
×
241

242
    def get_all_demultiplexed_runs(self) -> list[Path]:
        """Return the directories found in the demultiplexed runs directory."""
        run_directories: list[Path] = get_directories_in_path(self.demultiplexed_runs_dir)
        return run_directories
1✔
245

246
    def post_process_all_runs(self) -> bool:
        """Post-process every demultiplexed Illumina run directory.

        A failure in one run is logged and does not stop the remaining runs from
        being processed.

        Returns:
            True if post-processing raised for at least one run, otherwise False.
        """
        any_run_failed: bool = False
        for run_dir in self.get_all_demultiplexed_runs():
            try:
                self.post_process_illumina_flow_cell(run_dir.name)
            except Exception as error:
                LOG.error(
                    f"Failed to post process demultiplexed Illumina run {run_dir.name}: {str(error)}"
                )
                any_run_failed = True
        return any_run_failed
1✔
260

261
    def delete_sequencing_run_data(self, flow_cell_id: str):
        """Remove the data stored for a flow cell from StatusDB and from Housekeeper.

        A flow cell that is missing from StatusDB only produces a warning; the
        Housekeeper deletion is attempted in that case as well.
        """
        try:
            self.status_db.delete_illumina_flow_cell(flow_cell_id)
        except EntryNotFoundError:
            # Nothing to delete in StatusDB; carry on with Housekeeper.
            LOG.warning(f"Flow cell {flow_cell_id} not found in StatusDB.")
        delete_sequencing_data_from_housekeeper(
            flow_cell_id=flow_cell_id,
            hk_api=self.hk_api,
        )
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc