• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Clinical-Genomics / cg / 18218493111

03 Oct 2025 09:27AM UTC coverage: 86.132%. First build
18218493111

Pull #4544

github

web-flow
Merge 80fb4d965 into 789863e18
Pull Request #4544: (Pipeline integration) dev Refactor start MIP-DNA (#4538)

353 of 360 new or added lines in 32 files covered. (98.06%)

27838 of 32320 relevant lines covered (86.13%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

32.53
/cg/apps/lims/api.py
1
"""API to communicate with LIMS."""
2

3
import logging
1✔
4
from datetime import date, datetime
1✔
5
from typing import Any
1✔
6

7
from dateutil.parser import parse as parse_date
1✔
8
from genologics.entities import Artifact, Process, Researcher, Sample
1✔
9
from genologics.lims import Lims
1✔
10
from requests.exceptions import HTTPError
1✔
11

12
from cg.constants.constants import ControlOptions, CustomerId
1✔
13
from cg.constants.lims import (
1✔
14
    MASTER_STEPS_UDFS,
15
    PROP2UDF,
16
    DocumentationMethod,
17
    LimsArtifactTypes,
18
    LimsProcess,
19
)
20
from cg.constants.priority import Priority
1✔
21
from cg.exc import LimsDataError
1✔
22

23
from .order import OrderHandler
1✔
24

25
# LIMS "Gender" UDF value -> sex label used in exported sample data.
SEX_MAP = {
    "F": "female",
    "M": "male",
    "Unknown": "unknown",
    "unknown": "unknown",
}
# Reverse lookup (label -> LIMS value); when a label occurs twice in SEX_MAP the last key wins.
REV_SEX_MAP = dict((label, lims_value) for lims_value, label in SEX_MAP.items())
# AM method document number -> method title.
AM_METHODS = {
    "1464": "Automated TruSeq DNA PCR-free library preparation method",
    "1317": "HiSeq X Sequencing method at Clinical Genomics",
    "1383": "MIP analysis for Whole genome and Exome",
    "1717": "NxSeq® AmpFREE Low DNA Library Kit (Lucigen)",
    "1060": "Raw data delivery",
    "1036": "HiSeq 2500 Rapid Run sequencing",
    "1314": "Automated SureSelect XT Target Enrichment for Illumina sequencing",
    "1518": "200 ng input Manual SureSelect XT Target Enrichment",
    "1079": "Manuel SureSelect XT Target Enrichment for Illumina sequencing",
    "1879": "Method - Manual Twist Target Enrichment",
    "2182": "Manual SARS-CoV-2 library preparation using Illumina COVIDseq Test",
    "2175": "Manual SARS-CoV-2 library preparation using Illumina DNA Prep",
    "1830": "NovaSeq 6000 Sequencing method",
    "2234": "Method - Illumina Stranded mRNA Library Preparation",
}
# Index constants used by LimsAPI._get_methods: METHOD_INDEX picks the first entry of the
# sorted method list, the remaining three pick fields out of a single method tuple.
METHOD_INDEX = 0
METHOD_DOCUMENT_INDEX = 1
METHOD_VERSION_INDEX = 2
METHOD_TYPE_INDEX = 3

LOG = logging.getLogger(__name__)
1✔
46

47

48
class LimsAPI(Lims, OrderHandler):
1✔
49
    """API to communicate with LIMS"""
50

51
    def __init__(self, config):
        """Initialise the LIMS connection from the ``lims`` section of the config.

        The section must provide the keys ``host``, ``username`` and ``password``,
        which are handed to the parent constructor in that order.
        """
        lims_config = config["lims"]
        host, username, password = (
            lims_config[key] for key in ("host", "username", "password")
        )
        super(LimsAPI, self).__init__(host, username, password)
1✔
54

55
    @property
1✔
56
    def user(self) -> Researcher:
1✔
57
        return self.get_researchers(username=self.username)[0]
×
58

59
    def sample(self, lims_id: str) -> dict[str, Any]:
        """Return the exported data of a LIMS sample.

        An HTTPError raised while reading the sample is logged as a warning and
        an empty dict is returned instead.
        """
        try:
            return self._export_sample(Sample(self, id=lims_id))
        except HTTPError as error:
            LOG.warning(f"Sample {lims_id} not found in LIMS: {error}")
            return {}
×
68

69
    def samples_in_pools(self, pool_name, projectname):
1✔
70
        """Fetch all samples from a pool"""
71
        return self.get_samples(udf={"pool name": str(pool_name)}, projectname=projectname)
×
72

73
    def get_source(self, lims_id: str) -> str | None:
1✔
74
        """Return the source from LIMS for a given sample ID.
75
        Return 'None' if no source information is set or
76
        if sample is not found or cannot be fetched from LIMS."""
77
        lims_sample: dict[str, Any] = self.sample(lims_id=lims_id)
×
78
        return lims_sample.get("source")
×
79

80
    @staticmethod
1✔
81
    def _export_project(lims_project) -> dict:
1✔
82
        """Fetch relevant information from a lims project object"""
83
        return {
×
84
            "id": lims_project.id,
85
            "name": lims_project.name,
86
            "date": parse_date(lims_project.open_date) if lims_project.open_date else None,
87
        }
88

89
    def _export_sample(self, lims_sample):
        """Return a dict with the sample fields and UDF values read from a LIMS sample.

        Derived entries:
            sex: the "Gender" UDF translated through SEX_MAP (None when not mapped).
            panels: the "Gene List" UDF split on ";" (None when the UDF is empty).
            application_version: the "Application Tag Version" UDF as int (None when empty).
            received: result of `get_received_date` for the sample id.
        """
        udfs = lims_sample.udf
        gene_list = udfs.get("Gene List")
        tag_version = udfs.get("Application Tag Version")
        return {
            "id": lims_sample.id,
            "name": lims_sample.name,
            "project": self._export_project(lims_sample.project),
            "family": udfs.get("familyID"),
            "customer": udfs.get("customer"),
            "sex": SEX_MAP.get(udfs.get("Gender")),
            "father": udfs.get("fatherID"),
            "mother": udfs.get("motherID"),
            "source": udfs.get("Source"),
            "status": udfs.get("Status"),
            "panels": gene_list.split(";") if gene_list else None,
            "priority": udfs.get("priority"),
            "received": self.get_received_date(lims_sample.id),
            "application": udfs.get("Sequencing Analysis"),
            "application_version": int(tag_version) if tag_version else None,
            "comment": udfs.get("comment"),
            "concentration_ng_ul": udfs.get("Concentration (ng/ul)"),
            "passed_initial_qc": udfs.get("Passed Initial QC"),
        }
116

117
    def get_received_date(self, lims_id: str) -> date | None:
        """Return the "Received at" UDF of a sample.

        Returns:
            The value `udf.get` yields for "Received at"; None when reading the
            sample raises an HTTPError (the error is logged as a warning).
        """
        sample = Sample(self, id=lims_id)
        try:
            # A dedicated local name keeps the imported `date` type usable in this scope.
            return sample.udf.get("Received at")
        except HTTPError as error:
            LOG.warning(f"Sample {lims_id} not found in LIMS: {error}")
            return None
1✔
126

127
    def get_prepared_date(self, lims_id: str) -> date | None:
        """Return the "Library Prep Finished" UDF of a sample.

        Returns:
            The value `udf.get` yields for "Library Prep Finished"; None when reading
            the sample raises an HTTPError (the error is logged as a warning).
        """
        sample = Sample(self, id=lims_id)
        try:
            # Returning directly avoids a local named `date` shadowing the imported type.
            return sample.udf.get("Library Prep Finished")
        except HTTPError as error:
            LOG.warning(f"Sample {lims_id} not found in LIMS: {error}")
            return None
1✔
136

137
    def get_delivery_date(self, lims_id: str) -> date | None:
        """Return the "Delivered at" UDF of a sample.

        Returns:
            The value `udf.get` yields for "Delivered at"; None when reading the
            sample raises an HTTPError (the error is logged as a warning).
        """
        sample = Sample(self, id=lims_id)
        try:
            # Returning directly avoids a local named `date` shadowing the imported type.
            return sample.udf.get("Delivered at")
        except HTTPError as error:
            LOG.warning(f"Sample {lims_id} not found in LIMS: {error}")
            return None
1✔
146

147
    def capture_kit(self, lims_id: str) -> str | None:
        """Return the capture kit of a LIMS sample.

        The sample's "Bait Set" UDF is used when it is set and not "NA". Otherwise the
        artifacts of every step listed under MASTER_STEPS_UDFS["capture_kit_step"] are
        searched, first on the parent process UDF and, if that yields nothing, on the
        artifact UDF itself.

        Returns:
            The single capture kit found, or None when nothing is found or LIMS
            answers with an HTTPError (logged as a warning).
        Raises:
            LimsDataError: if more than one distinct capture kit is found.
        """
        udf_key_by_step = MASTER_STEPS_UDFS["capture_kit_step"]
        found_kits: set[str] = set()
        try:
            lims_sample = Sample(self, id=lims_id)
            bait_set: str | None = lims_sample.udf.get("Bait Set")
            if bait_set and bait_set != "NA":
                return bait_set
            for process_type in udf_key_by_step:
                step_artifacts: list[Artifact] = self.get_artifacts(
                    samplelimsid=lims_id, process_type=process_type, type="Analyte"
                )
                udf_key = udf_key_by_step[process_type]
                step_kits = self._find_capture_kits(artifacts=step_artifacts, udf_key=udf_key)
                if not step_kits:
                    step_kits = self._find_twist_capture_kits(
                        artifacts=step_artifacts, udf_key=udf_key
                    )
                found_kits = found_kits.union(step_kits)
            if len(found_kits) > 1:
                raise LimsDataError(f"Capture kit error: {lims_sample.id} | {found_kits}")
            if len(found_kits) == 1:
                return found_kits.pop()
        except HTTPError as error:
            LOG.warning(f"Sample {lims_id} not found in LIMS: {error}")
        return None
×
173

174
    def get_capture_kit_strict(self, lims_id: str) -> str:
1✔
NEW
175
        if capture_kit := self.capture_kit(lims_id):
×
NEW
176
            return capture_kit
×
NEW
177
        raise LimsDataError(f"No capture kit found for sample {lims_id}")
×
178

179
    def get_samples(self, *args, map_ids=False, **kwargs) -> dict[str, str] | list[Sample]:
        """Fetch samples through the parent class' `get_samples`.

        All positional and keyword arguments except `map_ids` are forwarded unchanged.
        With `map_ids` truthy the result is a dict mapping sample name to sample id,
        otherwise the parent's result is returned as is.
        """
        lims_samples = super(LimsAPI, self).get_samples(*args, **kwargs)
        if not map_ids:
            return lims_samples
        return dict((lims_sample.name, lims_sample.id) for lims_sample in lims_samples)
×
185

186
    def family(self, customer: str, family: str):
1✔
187
        """Fetch information about a family of samples."""
188
        filters = {"customer": customer, "familyID": family}
×
189
        lims_samples = self.get_samples(udf=filters)
×
190
        samples_data = [self._export_sample(lims_sample) for lims_sample in lims_samples]
×
191
        # get family level data
192
        family_data = {"family": family, "customer": customer, "samples": []}
×
193
        priorities = set()
×
194
        panels = set()
×
195

196
        for sample_data in samples_data:
×
197
            priorities.add(sample_data["priority"])
×
198
            if sample_data["panels"]:
×
199
                panels.update(sample_data["panels"])
×
200
            family_data["samples"].append(sample_data)
×
201

202
        if len(priorities) == 1:
×
203
            family_data["priority"] = priorities.pop()
×
204
        elif len(priorities) < 1:
×
205
            raise LimsDataError(f"unable to determine family priority: {priorities}")
×
206
        else:
207
            for prio in [
×
208
                Priority.express,
209
                Priority.priority,
210
                Priority.standard,
211
                Priority.clinical_trials,
212
            ]:
213
                if prio in priorities:
×
214
                    family_data["priority"] = prio.name
×
215
                    break
×
216

217
        family_data["panels"] = list(panels)
×
218
        return family_data
×
219

220
    def process(self, process_id: str) -> Process:
        """Return the LIMS process entity for the given process id."""
        lims_process = Process(self, id=process_id)
        return lims_process
×
223

224
    def update_sample(
        self, lims_id: str, sex=None, target_reads: int = None, name: str = None, **kwargs
    ):
        """Update a sample in LIMS and save it with `put`.

        Args:
            lims_id: id of the sample to update.
            sex: translated through REV_SEX_MAP; ignored when falsy or not in the map.
            target_reads: written to its UDF only when it is an int.
            name: new sample name; ignored when falsy.
            **kwargs: further properties, each mapped to a UDF name through PROP2UDF.

        Raises:
            LimsDataError: if a keyword argument has no (truthy) entry in PROP2UDF.
        """
        lims_sample = Sample(self, id=lims_id)

        lims_gender = REV_SEX_MAP.get(sex) if sex else None
        if lims_gender:
            lims_sample.udf[PROP2UDF["sex"]] = lims_gender
        if name:
            lims_sample.name = name
        if isinstance(target_reads, int):
            lims_sample.udf[PROP2UDF["target_reads"]] = target_reads

        for prop, value in kwargs.items():
            udf_name = PROP2UDF.get(prop)
            if not udf_name:
                raise LimsDataError(
                    f"Unknown how to set {prop} in LIMS since it is not defined in {PROP2UDF}"
                )
            lims_sample.udf[udf_name] = value

        lims_sample.put()
×
247

248
    def get_sample_attribute(self, lims_id: str, key: str) -> str:
        """Return the sample UDF that PROP2UDF maps the given key to.

        Raises:
            LimsDataError: if the key has no (truthy) entry in PROP2UDF.
        """
        sample = Sample(self, id=lims_id)
        udf_name = PROP2UDF.get(key)
        if udf_name:
            return sample.udf[udf_name]
        raise LimsDataError(
            f"Unknown how to get {key} from LIMS since it is not defined in {PROP2UDF}"
        )
×
257

258
    def get_prep_method(self, lims_id: str) -> str | None:
        """Return the library preparation method of a LIMS sample.

        Delegates to `_get_methods` with the steps listed under
        MASTER_STEPS_UDFS["prep_method_step"].
        """
        return self._get_methods(
            step_names_udfs=MASTER_STEPS_UDFS["prep_method_step"], lims_id=lims_id
        )
×
265

266
    def get_sequencing_method(self, lims_id: str) -> str | None:
        """Return the sequencing method of a LIMS sample.

        Delegates to `_get_methods` with the steps listed under
        MASTER_STEPS_UDFS["sequencing_method_step"].
        """
        return self._get_methods(
            step_names_udfs=MASTER_STEPS_UDFS["sequencing_method_step"], lims_id=lims_id
        )
×
273

274
    @staticmethod
1✔
275
    def _sort_by_date_run(sort_list: list):
1✔
276
        """
277
        Sort list of tuples by parent process attribute date_run in descending order.
278

279
        Parameters:
280
            sort_list (list): a list of tuples in the format (date_run, elem1, elem2, ...)
281

282
        Returns:
283
            sorted list of tuples
284
        """
285
        return sorted(sort_list, key=lambda sort_tuple: sort_tuple[0], reverse=True)
×
286

287
    def _get_methods(self, step_names_udfs: dict[str, dict], lims_id: str) -> str | None:
1✔
288
        """
289
        Return the method name, number, and version for a given list of step names for AM documents.
290
        Only the name and Atlas version are returned if Atlas documentation has been used instead.
291
        """
292
        methods = []
×
293
        try:
×
294
            for process_name in step_names_udfs:
×
295
                artifacts: list[Artifact] = self.get_artifacts(
×
296
                    process_type=process_name, samplelimsid=lims_id
297
                )
298
                if not artifacts:
×
299
                    continue
×
300
                processes: list[Process] = self.get_processes_from_artifacts(artifacts)
×
301
                for process in processes:
×
302
                    method_type: str = self.get_method_type(
×
303
                        process=process, method_udfs=step_names_udfs[process_name]
304
                    )
305
                    udf_key_method_doc, udf_key_version = self.get_method_udf_values(
×
306
                        method_udfs=step_names_udfs[process_name], method_type=method_type
307
                    )
308
                    methods.append(
×
309
                        (
310
                            process.date_run,
311
                            self.get_method_document(process, udf_key_method_doc),
312
                            self.get_method_version(process, udf_key_version),
313
                            method_type,
314
                        )
315
                    )
316

317
            sorted_methods = self._sort_by_date_run(methods)
×
318

319
            if sorted_methods:
×
320
                method = sorted_methods[METHOD_INDEX]
×
321

322
                if (
×
323
                    method[METHOD_TYPE_INDEX] == DocumentationMethod.AM
324
                    and method[METHOD_DOCUMENT_INDEX] is not None
325
                ):
326
                    method_name = AM_METHODS.get(method[METHOD_DOCUMENT_INDEX])
×
327
                    return (
×
328
                        f"{method[METHOD_DOCUMENT_INDEX]}:{method[METHOD_VERSION_INDEX]} - "
329
                        f"{method_name}"
330
                    )
331
                elif (
×
332
                    method[METHOD_TYPE_INDEX] == DocumentationMethod.ATLAS
333
                    and method[METHOD_DOCUMENT_INDEX] is not None
334
                ):
335
                    return f"{method[METHOD_DOCUMENT_INDEX]} ({method[METHOD_VERSION_INDEX]})"
×
336
        except HTTPError as error:
×
337
            LOG.warning(f"Sample {lims_id} not found in LIMS: {error}")
×
338
        return None
×
339

340
    @staticmethod
1✔
341
    def get_processes_from_artifacts(artifacts: list[Artifact]) -> list[Process]:
1✔
342
        """
343
        Get a list of parent processes from a set of given artifacts.
344
        """
345
        processes = []
×
346
        for artifact in artifacts:
×
347
            parent_process: Process = artifact.parent_process
×
348
            if parent_process not in processes:
×
349
                processes.append(parent_process)
×
350
        return processes
×
351

352
    @staticmethod
    def get_method_type(process: Process, method_udfs: dict) -> str:
        """
        Return the documentation type used for a process.

        DocumentationMethod.ATLAS is returned when `method_udfs` names an
        "atlas_version" UDF and that UDF has a truthy value on the process,
        DocumentationMethod.AM in every other case.
        """
        if "atlas_version" not in method_udfs:
            return DocumentationMethod.AM
        atlas_version = process.udf.get(method_udfs["atlas_version"])
        return DocumentationMethod.ATLAS if atlas_version else DocumentationMethod.AM
×
360

361
    @staticmethod
    def get_method_udf_values(method_udfs: dict, method_type: str) -> tuple[str, str]:
        """
        Return the (method document, method version) UDF names for a method type.

        For DocumentationMethod.ATLAS the "atlas_document" and "atlas_version" entries of
        `method_udfs` are used, otherwise "method_number" and "method_version".
        """
        if method_type == DocumentationMethod.ATLAS:
            return method_udfs["atlas_document"], method_udfs["atlas_version"]
        return method_udfs["method_number"], method_udfs["method_version"]
×
373

374
    @staticmethod
1✔
375
    def get_method_document(process: Process, udf_key_method_doc: str) -> str:
1✔
376
        """
377
        Return method number for artifact.
378
        """
379
        return process.udf.get(udf_key_method_doc)
×
380

381
    @staticmethod
1✔
382
    def get_method_version(process: Process, udf_key_version: str) -> str:
1✔
383
        """
384
        Return method version for artifact.
385
        """
386
        return process.udf.get(udf_key_version)
×
387

388
    @staticmethod
1✔
389
    def _find_capture_kits(artifacts, udf_key):
1✔
390
        """
391
        get capture kit from parent process for non-TWIST samples
392
        """
393
        return {
×
394
            artifact.parent_process.udf.get(udf_key)
395
            for artifact in artifacts
396
            if artifact.parent_process.udf.get(udf_key) is not None
397
        }
398

399
    @staticmethod
1✔
400
    def _find_twist_capture_kits(artifacts, udf_key):
1✔
401
        """
402
        get capture kit from parent process for TWIST samples
403
        """
404
        return {
×
405
            artifact.udf.get(udf_key)
406
            for artifact in artifacts
407
            if artifact.udf.get(udf_key) is not None
408
        }
409

410
    def get_sample_project(self, sample_id: str) -> str | None:
1✔
411
        """Return the LIMS ID of the sample associated project if sample exists in LIMS."""
412
        lims_sample: dict[str, Any] = self.sample(sample_id)
×
413
        project_id = None
×
414
        if lims_sample:
×
415
            project_id: str = lims_sample.get("project").get("id")
×
416
        return project_id
×
417

418
    def get_sample_rin(self, sample_id: str) -> float | None:
        """Return the RIN value of a sample.

        The value is read from the UDF PROP2UDF["rin"] of the artifact with id
        "<sample_id>PA1". None is returned when LIMS answers with an HTTPError
        (logged as a warning).
        """
        try:
            sample_artifact: Artifact = Artifact(self, id=f"{sample_id}PA1")
            return sample_artifact.udf.get(PROP2UDF["rin"])
        except HTTPError as error:
            LOG.warning(f"Sample {sample_id} not found in LIMS: {error}")
            return None
×
427

428
    def get_sample_dv200(self, sample_id: str) -> float | None:
        """Return the sample's percentage of RNA fragments greater than 200 nucleotides.

        The value is read from the UDF PROP2UDF["dv200"] of the artifact with id
        "<sample_id>PA1". None is returned when LIMS answers with an HTTPError
        (logged as a warning).
        """
        try:
            sample_artifact: Artifact = Artifact(self, id=f"{sample_id}PA1")
            return sample_artifact.udf.get(PROP2UDF["dv200"])
        except HTTPError as error:
            LOG.warning(f"Sample {sample_id} not found in LIMS: {error}")
            return None
×
437

438
    def has_sample_passed_initial_qc(self, sample_id: str) -> bool | None:
1✔
439
        """Return the outcome of the initial QC protocol of the given sample."""
440
        lims_sample: dict[str, Any] = self.sample(sample_id)
1✔
441
        initial_qc_udf: str | None = lims_sample.get("passed_initial_qc")
1✔
442
        initial_qc: bool | None = eval(initial_qc_udf) if initial_qc_udf else None
1✔
443
        return initial_qc
1✔
444

445
    def _get_rna_input_amounts(self, sample_id: str) -> list[tuple[datetime, float]]:
        """Return (date_run, input amount) pairs for the RNA prep artifacts of a sample.

        Every step listed under MASTER_STEPS_UDFS["rna_prep_step"] is queried for analyte
        artifacts; the amount is read from the artifact UDF named for that step. When LIMS
        answers with an HTTPError it is logged as a warning and the pairs collected so far
        are returned.
        """
        udf_key_by_step: dict[str] = MASTER_STEPS_UDFS["rna_prep_step"]
        collected: list[tuple[datetime, float]] = []
        try:
            for process_type in udf_key_by_step:
                step_artifacts: list[Artifact] = self.get_artifacts(
                    samplelimsid=sample_id,
                    process_type=process_type,
                    type=LimsArtifactTypes.ANALYTE,
                )
                udf_key: str = udf_key_by_step[process_type]
                for artifact in step_artifacts:
                    date_run = artifact.parent_process.date_run
                    amount = artifact.udf.get(udf_key)
                    collected.append((date_run, amount))
        except HTTPError as error:
            LOG.warning(f"Sample {sample_id} not found in LIMS: {error}")
        return collected
×
468

469
    def _get_last_used_input_amount(
1✔
470
        self, input_amounts: list[tuple[datetime, float]]
471
    ) -> float | None:
472
        """Return the latest used input amount."""
473
        sorted_input_amounts: list[tuple[datetime, float]] = self._sort_by_date_run(input_amounts)
×
474
        if not sorted_input_amounts:
×
475
            return None
×
476
        return sorted_input_amounts[0][1]
×
477

478
    def get_latest_rna_input_amount(self, sample_id: str) -> float | None:
1✔
479
        """Return the input amount used in the latest preparation of an RNA sample."""
480
        input_amounts: list[tuple[datetime, float]] = self._get_rna_input_amounts(
×
481
            sample_id=sample_id
482
        )
483
        input_amount: float | None = self._get_last_used_input_amount(input_amounts=input_amounts)
×
484
        return input_amount
×
485

486
    def get_latest_artifact_for_sample(
1✔
487
        self,
488
        process_type: LimsProcess,
489
        sample_internal_id: str,
490
        artifact_type: LimsArtifactTypes | None = LimsArtifactTypes.ANALYTE,
491
    ) -> Artifact:
492
        """Return latest artifact for a given sample, process and artifact type."""
493

494
        artifacts: list[Artifact] = self.get_artifacts(
×
495
            process_type=process_type,
496
            type=artifact_type,
497
            samplelimsid=sample_internal_id,
498
        )
499

500
        if not artifacts:
×
501
            raise LimsDataError(
×
502
                f"No artifacts were found for process {process_type}, type {artifact_type} and sample {sample_internal_id}."
503
            )
504

505
        latest_artifact: Artifact = self._get_latest_artifact_from_list(artifact_list=artifacts)
×
506
        return latest_artifact
×
507

508
    def _get_latest_artifact_from_list(self, artifact_list: list[Artifact]) -> Artifact:
1✔
509
        """Returning the latest artifact in a list of artifacts."""
510
        artifacts = []
×
511
        for artifact in artifact_list:
×
512
            date = artifact.parent_process.date_run or datetime.today().strftime("%Y-%m-%d")
×
513
            artifacts.append((date, artifact.id, artifact))
×
514

515
        artifacts.sort()
×
516
        date, id, latest_artifact = artifacts[-1]
×
517
        return latest_artifact
×
518

519
    def get_internal_negative_control_id_from_sample_in_pool(
1✔
520
        self, sample_internal_id: str, pooling_step: LimsProcess
521
    ) -> str:
522
        """Retrieve from LIMS the sample ID for the internal negative control sample present in the same pool as the given sample."""
523
        artifact: Artifact = self.get_latest_artifact_for_sample(
×
524
            process_type=pooling_step,
525
            sample_internal_id=sample_internal_id,
526
        )
527
        negative_controls: list[Sample] = self._get_negative_controls_from_list(
×
528
            samples=artifact.samples
529
        )
530

531
        if not negative_controls:
×
532
            raise LimsDataError(
×
533
                f"No internal negative controls found in the pool of sample {sample_internal_id}."
534
            )
535

536
        if len(negative_controls) > 1:
×
537
            sample_ids = [sample.id for sample in negative_controls]
×
538
            raise LimsDataError(
×
539
                f"Multiple internal negative control samples found: {' '.join(sample_ids)}"
540
            )
541

542
        return negative_controls[0].id
×
543

544
    @staticmethod
1✔
545
    def _get_negative_controls_from_list(samples: list[Sample]) -> list[Sample]:
1✔
546
        """Filter and return a list of internal negative controls from a given sample list."""
547
        negative_controls = []
×
548
        for sample in samples:
×
549
            if (
×
550
                sample.udf.get("Control") == ControlOptions.NEGATIVE
551
                and sample.udf.get("customer") == CustomerId.CG_INTERNAL_CUSTOMER
552
            ):
553
                negative_controls.append(sample)
×
554
        return negative_controls
×
555

556
    def get_sample_region_and_lab_code(self, sample_id: str) -> str:
1✔
557
        """Return the region code and lab code for a sample formatted as a prefix string."""
558
        region_code: str = self.get_sample_attribute(lims_id=sample_id, key="region_code").split(
×
559
            " "
560
        )[0]
561
        lab_code: str = self.get_sample_attribute(lims_id=sample_id, key="lab_code").split(" ")[0]
×
562
        return f"{region_code}_{lab_code}_"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc