• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Clinical-Genomics / cg / 14170299167

31 Mar 2025 11:17AM UTC coverage: 85.56%. First build
14170299167

Pull #4319

github

web-flow
Merge 3d188a84b into e6ca0d0d6
Pull Request #4319: Add validation for capture kit

13 of 14 new or added lines in 3 files covered. (92.86%)

26183 of 30602 relevant lines covered (85.56%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.57
/cg/services/orders/validation/rules/case_sample/utils.py
1
import re
1✔
2
from collections import Counter
1✔
3

4
from cg.constants.constants import StatusOptions
1✔
5
from cg.constants.sequencing import SeqLibraryPrepCategory
1✔
6
from cg.constants.subject import Sex
1✔
7
from cg.models.orders.sample_base import ContainerEnum, SexEnum
1✔
8
from cg.services.orders.validation.errors.case_errors import RepeatedCaseNameError
1✔
9
from cg.services.orders.validation.errors.case_sample_errors import (
1✔
10
    FatherNotInCaseError,
11
    InvalidConcentrationIfSkipRCError,
12
    InvalidFatherSexError,
13
    InvalidMotherSexError,
14
    MotherNotInCaseError,
15
    OccupiedWellError,
16
    SubjectIdSameAsCaseNameError,
17
)
18
from cg.services.orders.validation.models.case import Case
1✔
19
from cg.services.orders.validation.models.case_aliases import (
1✔
20
    CaseContainingRelatives,
21
    CaseWithSkipRC,
22
)
23
from cg.services.orders.validation.models.existing_sample import ExistingSample
1✔
24
from cg.services.orders.validation.models.order_with_cases import OrderWithCases
1✔
25
from cg.services.orders.validation.models.sample import Sample
1✔
26
from cg.services.orders.validation.models.sample_aliases import (
1✔
27
    HumanSample,
28
    SampleInCase,
29
    SampleWithRelatives,
30
)
31
from cg.services.orders.validation.order_types.balsamic.models.sample import BalsamicSample
1✔
32
from cg.services.orders.validation.order_types.balsamic_umi.models.sample import BalsamicUmiSample
1✔
33
from cg.services.orders.validation.rules.case.utils import is_sample_in_case
1✔
34
from cg.services.orders.validation.rules.utils import (
1✔
35
    get_concentration_interval,
36
    has_sample_invalid_concentration,
37
    is_in_container,
38
    is_sample_on_plate,
39
    is_volume_within_allowed_interval,
40
)
41
from cg.store.models import Application, Customer
1✔
42
from cg.store.models import Sample as DbSample
1✔
43
from cg.store.store import Store
1✔
44

45

46
def is_concentration_missing(sample: SampleWithRelatives) -> bool:
1✔
47
    return not sample.concentration_ng_ul
1✔
48

49

50
def is_well_position_missing(sample: SampleWithRelatives) -> bool:
1✔
51
    return sample.container == ContainerEnum.plate and not sample.well_position
1✔
52

53

54
def is_container_name_missing(sample: SampleWithRelatives) -> bool:
1✔
55
    return sample.container == ContainerEnum.plate and not sample.container_name
1✔
56

57

58
def get_invalid_panels(panels: list[str], store: Store) -> list[str]:
1✔
59
    invalid_panels: list[str] = [
1✔
60
        panel for panel in panels if not store.does_gene_panel_exist(panel)
61
    ]
62
    return invalid_panels
1✔
63

64

65
def is_volume_invalid(sample: Sample) -> bool:
1✔
66
    in_container: bool = is_in_container(sample.container)
×
67
    allowed_volume: bool = is_volume_within_allowed_interval(sample.volume)
×
68
    return in_container and not allowed_volume
×
69

70

71
def get_well_sample_map(
1✔
72
    order: OrderWithCases, **kwargs
73
) -> dict[tuple[str, str], list[tuple[int, int]]]:
74
    """
75
    Constructs a dict with keys being a (container_name, well_position) pair. For each such pair, the value will be
76
    a list of (case index, sample index) pairs corresponding to all samples with matching container_name and
77
    well_position, provided the sample is on a plate.
78
    """
79
    well_position_to_sample_map = {}
1✔
80
    for case_index, case in order.enumerated_new_cases:
1✔
81
        for sample_index, sample in case.enumerated_new_samples:
1✔
82
            if is_sample_on_plate(sample):
1✔
83
                key: tuple[str, str] = (sample.container_name, sample.well_position)
1✔
84
                value: tuple[int, int] = (case_index, sample_index)
1✔
85
                if not well_position_to_sample_map.get(key):
1✔
86
                    well_position_to_sample_map[key] = []
1✔
87
                well_position_to_sample_map[key].append(value)
1✔
88
    return well_position_to_sample_map
1✔
89

90

91
def get_occupied_well_errors(colliding_samples: list[tuple[int, int]]) -> list[OccupiedWellError]:
1✔
92
    errors: list[OccupiedWellError] = []
1✔
93
    for case_index, sample_index in colliding_samples:
1✔
94
        error = OccupiedWellError(case_index=case_index, sample_index=sample_index)
1✔
95
        errors.append(error)
1✔
96
    return errors
1✔
97

98

99
def get_indices_for_repeated_case_names(order: OrderWithCases) -> list[int]:
1✔
100
    counter = Counter([case.name for _, case in order.enumerated_new_cases])
1✔
101
    indices: list[int] = []
1✔
102

103
    for index, case in order.enumerated_new_cases:
1✔
104
        if counter.get(case.name) > 1:
1✔
105
            indices.append(index)
1✔
106

107
    return indices
1✔
108

109

110
def get_repeated_case_name_errors(order: OrderWithCases) -> list[RepeatedCaseNameError]:
1✔
111
    case_indices: list[int] = get_indices_for_repeated_case_names(order)
1✔
112
    return [RepeatedCaseNameError(case_index=case_index) for case_index in case_indices]
1✔
113

114

115
def get_father_sex_errors(
1✔
116
    case: CaseContainingRelatives, case_index: int, store: Store
117
) -> list[InvalidFatherSexError]:
118
    errors: list[InvalidFatherSexError] = []
1✔
119
    children: list[tuple[SampleWithRelatives, int]] = case.get_samples_with_father()
1✔
120
    for child, child_index in children:
1✔
121
        if is_father_sex_invalid(child=child, case=case, store=store):
1✔
122
            error: InvalidFatherSexError = create_father_sex_error(
1✔
123
                case_index=case_index, sample_index=child_index
124
            )
125
            errors.append(error)
1✔
126
    return errors
1✔
127

128

129
def is_father_sex_invalid(
1✔
130
    child: SampleWithRelatives, case: CaseContainingRelatives, store: Store
131
) -> bool:
132
    father: SampleWithRelatives | None = case.get_new_sample(child.father)
1✔
133
    if not father:
1✔
134
        father: DbSample | None = case.get_existing_sample_from_db(
1✔
135
            sample_name=child.father, store=store
136
        )
137
    return father and father.sex != Sex.MALE
1✔
138

139

140
def create_father_sex_error(case_index: int, sample_index: int) -> InvalidFatherSexError:
1✔
141
    return InvalidFatherSexError(case_index=case_index, sample_index=sample_index)
1✔
142

143

144
def get_father_case_errors(
1✔
145
    case: CaseContainingRelatives, case_index: int, store: Store
146
) -> list[FatherNotInCaseError]:
147
    errors: list[FatherNotInCaseError] = []
1✔
148
    children: list[tuple[SampleWithRelatives | ExistingSample, int]] = (
1✔
149
        case.get_samples_with_father()
150
    )
151
    for child, child_index in children:
1✔
152
        if not is_sample_in_case(case=case, sample_name=child.father, store=store):
1✔
153
            error: FatherNotInCaseError = create_father_case_error(
1✔
154
                case_index=case_index,
155
                sample_index=child_index,
156
            )
157
            errors.append(error)
1✔
158
    return errors
1✔
159

160

161
def get_mother_sex_errors(
1✔
162
    case: CaseContainingRelatives, case_index: int, store: Store
163
) -> list[InvalidMotherSexError]:
164
    errors: list[InvalidMotherSexError] = []
1✔
165
    children: list[tuple[SampleWithRelatives, int]] = case.get_samples_with_mother()
1✔
166
    for child, child_index in children:
1✔
167
        if is_mother_sex_invalid(child=child, case=case, store=store):
1✔
168
            error: InvalidMotherSexError = create_mother_sex_error(
1✔
169
                case_index=case_index,
170
                sample_index=child_index,
171
            )
172
            errors.append(error)
1✔
173
    return errors
1✔
174

175

176
def get_mother_case_errors(
1✔
177
    case: CaseContainingRelatives, case_index: int, store: Store
178
) -> list[MotherNotInCaseError]:
179
    errors: list[MotherNotInCaseError] = []
1✔
180
    children: list[tuple[SampleWithRelatives, int]] = case.get_samples_with_mother()
1✔
181
    for child, child_index in children:
1✔
182
        if not is_sample_in_case(case=case, sample_name=child.mother, store=store):
1✔
183
            error: MotherNotInCaseError = create_mother_case_error(
×
184
                case_index=case_index, sample_index=child_index
185
            )
186
            errors.append(error)
×
187
    return errors
1✔
188

189

190
def create_father_case_error(case_index: int, sample_index: int) -> FatherNotInCaseError:
1✔
191
    return FatherNotInCaseError(case_index=case_index, sample_index=sample_index)
1✔
192

193

194
def create_mother_case_error(case_index: int, sample_index: int) -> MotherNotInCaseError:
1✔
195
    return MotherNotInCaseError(case_index=case_index, sample_index=sample_index)
×
196

197

198
def is_mother_sex_invalid(
1✔
199
    child: SampleWithRelatives, case: CaseContainingRelatives, store: Store
200
) -> bool:
201
    mother: SampleWithRelatives | None = case.get_new_sample(child.mother)
1✔
202
    if not mother:
1✔
203
        mother: DbSample | None = case.get_existing_sample_from_db(
1✔
204
            sample_name=child.mother, store=store
205
        )
206
    return mother and mother.sex != Sex.FEMALE
1✔
207

208

209
def create_mother_sex_error(case_index: int, sample_index: int) -> InvalidMotherSexError:
1✔
210
    return InvalidMotherSexError(case_index=case_index, sample_index=sample_index)
1✔
211

212

213
def has_sex_and_subject(sample: HumanSample) -> bool:
1✔
214
    return bool(sample.subject_id and sample.sex != SexEnum.unknown)
1✔
215

216

217
def validate_subject_ids_in_case(
1✔
218
    case: CaseContainingRelatives, case_index: int
219
) -> list[SubjectIdSameAsCaseNameError]:
220
    errors: list[SubjectIdSameAsCaseNameError] = []
1✔
221
    for sample_index, sample in case.enumerated_new_samples:
1✔
222
        if sample.subject_id == case.name:
1✔
223
            error = SubjectIdSameAsCaseNameError(case_index=case_index, sample_index=sample_index)
1✔
224
            errors.append(error)
1✔
225
    return errors
1✔
226

227

228
def validate_concentration_in_case(
1✔
229
    case: CaseWithSkipRC, case_index: int, store: Store
230
) -> list[InvalidConcentrationIfSkipRCError]:
231
    errors: list[InvalidConcentrationIfSkipRCError] = []
1✔
232
    for sample_index, sample in case.enumerated_new_samples:
1✔
233
        if application := store.get_application_by_tag(sample.application):
1✔
234
            allowed_interval = get_concentration_interval(sample=sample, application=application)
1✔
235
            if has_sample_invalid_concentration(sample=sample, allowed_interval=allowed_interval):
1✔
236
                error: InvalidConcentrationIfSkipRCError = create_invalid_concentration_error(
1✔
237
                    case_index=case_index,
238
                    sample_index=sample_index,
239
                    allowed_interval=allowed_interval,
240
                )
241
                errors.append(error)
1✔
242
    return errors
1✔
243

244

245
def create_invalid_concentration_error(
1✔
246
    case_index: int, sample_index: int, allowed_interval: tuple[float, float]
247
) -> InvalidConcentrationIfSkipRCError:
248
    return InvalidConcentrationIfSkipRCError(
1✔
249
        case_index=case_index,
250
        sample_index=sample_index,
251
        allowed_interval=allowed_interval,
252
    )
253

254

255
def is_invalid_plate_well_format(sample: Sample) -> bool:
1✔
256
    """Check if a sample has an invalid well format."""
257
    correct_well_position_pattern: str = r"^[A-H]:([1-9]|1[0-2])$"
1✔
258
    if sample.is_on_plate:
1✔
259
        return not bool(re.match(correct_well_position_pattern, sample.well_position))
1✔
260
    return False
×
261

262

263
def is_sample_tube_name_reused(sample: Sample, counter: Counter) -> bool:
1✔
264
    """Check if a tube container name is reused across samples."""
265
    return sample.container == ContainerEnum.tube and counter.get(sample.container_name) > 1
1✔
266

267

268
def get_counter_container_names(order: OrderWithCases) -> Counter:
1✔
269
    counter = Counter(
1✔
270
        sample.container_name
271
        for case_index, case in order.enumerated_new_cases
272
        for sample_index, sample in case.enumerated_new_samples
273
    )
274
    return counter
1✔
275

276

277
def get_existing_sample_names(order: OrderWithCases, status_db: Store) -> set[str]:
1✔
278
    existing_sample_names: set[str] = set()
1✔
279
    for case in order.cases:
1✔
280
        if case.is_new:
1✔
281
            for sample_index, sample in case.enumerated_existing_samples:
1✔
282
                db_sample = status_db.get_sample_by_internal_id(sample.internal_id)
×
283
                existing_sample_names.add(db_sample.name)
×
284
        else:
285
            db_case = status_db.get_case_by_internal_id(case.internal_id)
×
286
            for sample in db_case.samples:
×
287
                existing_sample_names.add(sample.name)
×
288
    return existing_sample_names
1✔
289

290

291
def are_all_samples_unknown(case: Case) -> bool:
1✔
292
    """Check if all samples in a case are unknown."""
293
    return all(sample.status == StatusOptions.UNKNOWN for sample in case.samples)
1✔
294

295

296
def is_buffer_missing(sample: SampleInCase) -> bool:
1✔
297
    applications_requiring_buffer: tuple = ("PAN", "EX", "WGSWPF", "METWPF")
1✔
298
    return bool(
1✔
299
        sample.application.startswith(tuple(applications_requiring_buffer))
300
        and not sample.elution_buffer
301
    )
302

303

304
def is_sample_missing_capture_kit(sample: BalsamicSample | BalsamicUmiSample, store: Store) -> bool:
1✔
305
    """Returns whether a TGS sample has an application and is missing a capture kit."""
306
    application: Application | None = store.get_application_by_tag(sample.application)
1✔
307
    return (
1✔
308
        application
309
        and application.prep_category == SeqLibraryPrepCategory.TARGETED_GENOME_SEQUENCING
310
        and not sample.capture_kit
311
    )
312

313

314
def is_invalid_capture_kit(sample: BalsamicSample | BalsamicUmiSample, store: Store) -> bool:
1✔
315
    if not sample.capture_kit:
1✔
NEW
316
        return False
×
317

318
    valid_beds = [bed.name for bed in store.get_active_beds()]
1✔
319
    return not sample.capture_kit in valid_beds
1✔
320

321

322
def is_sample_not_from_collaboration(
1✔
323
    customer_id: str, sample: ExistingSample, store: Store
324
) -> bool:
325
    db_sample: DbSample | None = store.get_sample_by_internal_id(sample.internal_id)
1✔
326
    customer: Customer | None = store.get_customer_by_internal_id(customer_id)
1✔
327
    return db_sample and customer and db_sample.customer not in customer.collaborators
1✔
328

329

330
def get_existing_case_names(order: OrderWithCases, status_db: Store) -> set[str]:
1✔
331
    existing_case_names: set[str] = set()
1✔
332
    for _, case in order.enumerated_existing_cases:
1✔
333
        if db_case := status_db.get_case_by_internal_id(case.internal_id):
1✔
334
            existing_case_names.add(db_case.name)
1✔
335
    return existing_case_names
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc