• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Clinical-Genomics / cg / 9301536289

30 May 2024 11:09AM UTC coverage: 84.337%. First build
9301536289

Pull #3293

github

web-flow
Merge d928420c2 into bfb8146c5
Pull Request #3293: add(illumina device store flow)

168 of 169 new or added lines in 10 files covered. (99.41%)

18808 of 22301 relevant lines covered (84.34%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.65
/cg/store/crud/create.py
1
import logging
1✔
2
from datetime import datetime
1✔
3

4
import petname
1✔
5
from sqlalchemy import Insert
1✔
6
from sqlalchemy.orm import Session
1✔
7

8
from cg.constants import DataDelivery, FlowCellStatus, Priority, Workflow
1✔
9
from cg.constants.archiving import PDC_ARCHIVE_LOCATION
1✔
10
from cg.models.orders.order import OrderIn
1✔
11
from cg.services.illumina_services.illumina_metrics_service.models import (
1✔
12
    IlluminaFlowCellDTO,
13
    IlluminaSequencingRunDTO,
14
    IlluminaSampleSequencingMetricsDTO,
15
)
16
from cg.store.base import BaseHandler
1✔
17
from cg.store.database import get_session
1✔
18
from cg.store.models import (
1✔
19
    Analysis,
20
    Application,
21
    ApplicationLimitations,
22
    ApplicationVersion,
23
    Bed,
24
    BedVersion,
25
    Case,
26
    CaseSample,
27
    Collaboration,
28
    Customer,
29
    Flowcell,
30
    Invoice,
31
    Order,
32
    Organism,
33
    Panel,
34
    Pool,
35
    Sample,
36
    SampleLaneSequencingMetrics,
37
    User,
38
    order_case,
39
    IlluminaFlowCell,
40
    IlluminaSequencingRun,
41
    IlluminaSampleSequencingMetrics,
42
)
43

44
LOG = logging.getLogger(__name__)
1✔
45

46

47
class CreateHandler(BaseHandler):
1✔
48
    """Methods related to adding new data to the store."""
49

50
    def generate_readable_sample_id(self) -> str:
1✔
51
        """Generates a petname as sample internal id for development purposes. Not used in normal production flow."""
52
        while True:
1✔
53
            random_id: str = petname.Generate(3, separator="")
1✔
54
            if not self.get_sample_by_internal_id(random_id):
1✔
55
                return random_id
1✔
56

57
    def generate_readable_case_id(self) -> str:
1✔
58
        while True:
1✔
59
            random_id: str = petname.Generate(2, separator="", letters=10)
1✔
60
            if not self.get_case_by_internal_id(random_id):
1✔
61
                return random_id
1✔
62

63
    def add_customer(
1✔
64
        self,
65
        internal_id: str,
66
        name: str,
67
        invoice_address: str,
68
        invoice_reference: str,
69
        data_archive_location: str = PDC_ARCHIVE_LOCATION,
70
        scout_access: bool = False,
71
        is_clinical: bool = False,
72
        *args,
73
        **kwargs,
74
    ) -> Customer:
75
        """Build a new customer record."""
76

77
        return Customer(
1✔
78
            internal_id=internal_id,
79
            name=name,
80
            scout_access=scout_access,
81
            invoice_address=invoice_address,
82
            invoice_reference=invoice_reference,
83
            data_archive_location=data_archive_location,
84
            is_clinical=is_clinical,
85
            **kwargs,
86
        )
87

88
    def add_collaboration(self, internal_id: str, name: str, **kwargs) -> Collaboration:
1✔
89
        """Build a new customer group record."""
90

91
        return Collaboration(internal_id=internal_id, name=name, **kwargs)
1✔
92

93
    def add_user(self, customer: Customer, email: str, name: str, is_admin: bool = False) -> User:
1✔
94
        """Build a new user record."""
95

96
        new_user = User(name=name, email=email, is_admin=is_admin)
1✔
97
        new_user.customers.append(customer)
1✔
98
        return new_user
1✔
99

100
    def add_application(
1✔
101
        self,
102
        tag: str,
103
        prep_category: str,
104
        description: str,
105
        percent_kth: int,
106
        percent_reads_guaranteed: int,
107
        is_accredited: bool = False,
108
        min_sequencing_depth: int = 0,
109
        **kwargs,
110
    ) -> Application:
111
        """Build a new application record."""
112

113
        return Application(
1✔
114
            tag=tag,
115
            prep_category=prep_category,
116
            description=description,
117
            is_accredited=is_accredited,
118
            min_sequencing_depth=min_sequencing_depth,
119
            percent_kth=percent_kth,
120
            percent_reads_guaranteed=percent_reads_guaranteed,
121
            **kwargs,
122
        )
123

124
    def add_application_version(
1✔
125
        self,
126
        application: Application,
127
        version: int,
128
        valid_from: datetime,
129
        prices: dict,
130
        **kwargs,
131
    ) -> ApplicationVersion:
132
        """Build a new application version record."""
133

134
        new_record = ApplicationVersion(version=version, valid_from=valid_from, **kwargs)
1✔
135
        for price_key in [
1✔
136
            Priority.standard.name,
137
            Priority.priority.name,
138
            Priority.express.name,
139
            Priority.research.name,
140
        ]:
141
            setattr(new_record, f"price_{price_key}", prices[price_key])
1✔
142
        new_record.application = application
1✔
143
        return new_record
1✔
144

145
    @staticmethod
1✔
146
    def add_application_limitation(
1✔
147
        application: Application,
148
        workflow: str,
149
        limitations: str,
150
        comment: str = "Dummy comment",
151
        created_at: datetime = datetime.now(),
152
        updated_at: datetime = datetime.now(),
153
        **kwargs,
154
    ) -> ApplicationLimitations:
155
        """Build a new application limitations record."""
156
        return ApplicationLimitations(
1✔
157
            application=application,
158
            workflow=workflow,
159
            limitations=limitations,
160
            comment=comment,
161
            created_at=created_at,
162
            updated_at=updated_at,
163
            **kwargs,
164
        )
165

166
    def add_bed(self, name: str) -> Bed:
1✔
167
        """Build a new bed record."""
168
        return Bed(name=name)
1✔
169

170
    def add_bed_version(self, bed: Bed, version: int, filename: str, shortname: str) -> BedVersion:
1✔
171
        """Build a new bed version record."""
172
        bed_version: BedVersion = BedVersion(
1✔
173
            version=version, filename=filename, shortname=shortname
174
        )
175
        bed_version.bed = bed
1✔
176
        return bed_version
1✔
177

178
    def add_sample(
1✔
179
        self,
180
        name: str,
181
        sex: str,
182
        comment: str = None,
183
        control: str = None,
184
        downsampled_to: int = None,
185
        internal_id: str = None,
186
        last_sequenced_at: datetime = None,
187
        order: str = None,
188
        ordered: datetime = None,
189
        prepared_at: datetime = None,
190
        priority: Priority = None,
191
        received: datetime = None,
192
        original_ticket: str = None,
193
        tumour: bool = False,
194
        **kwargs,
195
    ) -> Sample:
196
        """Build a new Sample record."""
197

198
        internal_id = internal_id or self.generate_readable_sample_id()
1✔
199
        priority = priority or (Priority.research if downsampled_to else Priority.standard)
1✔
200
        return Sample(
1✔
201
            comment=comment,
202
            control=control,
203
            downsampled_to=downsampled_to,
204
            internal_id=internal_id,
205
            is_tumour=tumour,
206
            last_sequenced_at=last_sequenced_at,
207
            name=name,
208
            order=order,
209
            ordered_at=ordered or datetime.now(),
210
            original_ticket=original_ticket,
211
            prepared_at=prepared_at,
212
            priority=priority,
213
            received_at=received,
214
            sex=sex,
215
            **kwargs,
216
        )
217

218
    def add_case(
1✔
219
        self,
220
        data_analysis: Workflow,
221
        data_delivery: DataDelivery,
222
        name: str,
223
        ticket: str,
224
        panels: list[str] | None = None,
225
        cohorts: list[str] | None = None,
226
        priority: Priority | None = Priority.standard,
227
        synopsis: str | None = None,
228
        customer_id: int | None = None,
229
    ) -> Case:
230
        """Build a new Case record."""
231

232
        internal_id: str = self.generate_readable_case_id()
1✔
233
        return Case(
1✔
234
            cohorts=cohorts,
235
            data_analysis=str(data_analysis),
236
            data_delivery=str(data_delivery),
237
            internal_id=internal_id,
238
            name=name,
239
            panels=panels,
240
            priority=priority,
241
            synopsis=synopsis,
242
            tickets=ticket,
243
            customer_id=customer_id,
244
        )
245

246
    def relate_sample(
1✔
247
        self,
248
        case: Case,
249
        sample: Sample,
250
        status: str,
251
        mother: Sample = None,
252
        father: Sample = None,
253
    ) -> CaseSample:
254
        """Relate a sample record to a family record."""
255

256
        new_record: CaseSample = CaseSample(status=status)
1✔
257
        new_record.case = case
1✔
258
        new_record.sample = sample
1✔
259
        new_record.mother = mother
1✔
260
        new_record.father = father
1✔
261
        return new_record
1✔
262

263
    def add_flow_cell(
1✔
264
        self,
265
        flow_cell_name: str,
266
        sequencer_name: str,
267
        sequencer_type: str,
268
        date: datetime,
269
        flow_cell_status: str | None = FlowCellStatus.ON_DISK,
270
        has_backup: bool | None = False,
271
    ) -> Flowcell:
272
        """Build a new Flowcell record."""
273
        return Flowcell(
1✔
274
            name=flow_cell_name,
275
            sequencer_name=sequencer_name,
276
            sequencer_type=sequencer_type,
277
            sequenced_at=date,
278
            status=flow_cell_status,
279
            has_backup=has_backup,
280
        )
281

282
    def add_analysis(
1✔
283
        self,
284
        workflow: Workflow,
285
        version: str = None,
286
        completed_at: datetime = None,
287
        primary: bool = False,
288
        uploaded: datetime = None,
289
        started_at: datetime = None,
290
        **kwargs,
291
    ) -> Analysis:
292
        """Build a new Analysis record."""
293
        return Analysis(
1✔
294
            workflow=workflow,
295
            workflow_version=version,
296
            completed_at=completed_at,
297
            is_primary=primary,
298
            uploaded_at=uploaded,
299
            started_at=started_at,
300
            **kwargs,
301
        )
302

303
    def add_panel(
1✔
304
        self,
305
        customer: Customer,
306
        name: str,
307
        abbrev: str,
308
        version: float,
309
        date: datetime = None,
310
        genes: int = None,
311
    ) -> Panel:
312
        """Build a new panel record."""
313

314
        new_record = Panel(
1✔
315
            name=name, abbrev=abbrev, current_version=version, date=date, gene_count=genes
316
        )
317
        new_record.customer = customer
1✔
318
        return new_record
1✔
319

320
    def add_pool(
1✔
321
        self,
322
        customer: Customer,
323
        name: str,
324
        order: str,
325
        ordered: datetime,
326
        application_version: ApplicationVersion,
327
        ticket: str = None,
328
        comment: str = None,
329
        received_at: datetime = None,
330
        invoice_id: int = None,
331
        no_invoice: bool = None,
332
        delivered_at: datetime = None,
333
    ) -> Pool:
334
        """Build a new Pool record."""
335

336
        new_record: Pool = Pool(
1✔
337
            name=name,
338
            ordered_at=ordered or datetime.now(),
339
            order=order,
340
            ticket=ticket,
341
            received_at=received_at,
342
            comment=comment,
343
            delivered_at=delivered_at,
344
            invoice_id=invoice_id,
345
            no_invoice=no_invoice,
346
        )
347
        new_record.customer = customer
1✔
348
        new_record.application_version = application_version
1✔
349
        return new_record
1✔
350

351
    def add_invoice(
1✔
352
        self,
353
        customer: Customer,
354
        samples: list[Sample] = None,
355
        microbial_samples: list[Sample] = None,
356
        pools: list[Pool] = None,
357
        comment: str = None,
358
        discount: int = 0,
359
        record_type: str = None,
360
        invoiced_at: datetime | None = None,
361
    ):
362
        """Build a new Invoice record."""
363

364
        new_id = self.new_invoice_id()
1✔
365
        new_invoice: Invoice = Invoice(
1✔
366
            comment=comment,
367
            discount=discount,
368
            id=new_id,
369
            record_type=record_type,
370
            invoiced_at=invoiced_at,
371
        )
372
        new_invoice.customer = customer
1✔
373
        for sample in samples or []:
1✔
374
            new_invoice.samples.append(sample)
1✔
375
        for microbial_sample in microbial_samples or []:
1✔
376
            new_invoice.samples.append(microbial_sample)
×
377
        for pool in pools or []:
1✔
378
            new_invoice.pools.append(pool)
1✔
379
        return new_invoice
1✔
380

381
    def add_organism(
1✔
382
        self,
383
        internal_id: str,
384
        name: str,
385
        reference_genome: str = None,
386
        verified: bool = False,
387
        **kwargs,
388
    ) -> Organism:
389
        """Build a new Organism record."""
390
        return Organism(
1✔
391
            internal_id=internal_id,
392
            name=name,
393
            reference_genome=reference_genome,
394
            verified=verified,
395
            **kwargs,
396
        )
397

398
    def add_sample_lane_sequencing_metrics(
1✔
399
        self, flow_cell_name: str, sample_internal_id: str, **kwargs
400
    ) -> SampleLaneSequencingMetrics:
401
        """Add a new SampleLaneSequencingMetrics record."""
402
        return SampleLaneSequencingMetrics(
1✔
403
            flow_cell_name=flow_cell_name,
404
            sample_internal_id=sample_internal_id,
405
            **kwargs,
406
        )
407

408
    def add_order(self, order_data: OrderIn):
1✔
409
        customer: Customer = self.get_customer_by_internal_id(order_data.customer)
×
410
        workflow: str = order_data.samples[0].data_analysis
×
411
        order = Order(
×
412
            customer_id=customer.id,
413
            ticket_id=order_data.ticket,
414
            workflow=workflow,
415
        )
NEW
416
        session: Session = get_session()
×
417
        session.add(order)
×
418
        session.commit()
×
419
        return order
×
420

421
    @staticmethod
1✔
422
    def link_case_to_order(order_id: int, case_id: int):
1✔
423
        insert_statement: Insert = order_case.insert().values(order_id=order_id, case_id=case_id)
1✔
424
        session: Session = get_session()
1✔
425
        session.execute(insert_statement)
1✔
426
        session.commit()
1✔
427

428
    def add_illumina_flow_cell(self, flow_cell_dto: IlluminaFlowCellDTO) -> IlluminaFlowCell:
1✔
429
        """Add a new Illumina flow cell to the status database as a pending transaction."""
430
        if self.get_illumina_flow_cell_by_internal_id(flow_cell_dto.internal_id):
1✔
431
            raise ValueError(f"Flow cell with {flow_cell_dto.internal_id} already exists.")
1✔
432
        new_flow_cell = IlluminaFlowCell(
1✔
433
            internal_id=flow_cell_dto.internal_id,
434
            type=flow_cell_dto.type,
435
            model=flow_cell_dto.model,
436
        )
437
        self.session.add(new_flow_cell)
1✔
438
        LOG.debug(f"Flow cell added to status db: {new_flow_cell.id}.")
1✔
439
        return new_flow_cell
1✔
440

441
    def add_illumina_sequencing_run(
1✔
442
        self, sequencing_run_dto: IlluminaSequencingRunDTO, flow_cell: IlluminaFlowCell
443
    ) -> IlluminaSequencingRun:
444
        """Add a new Illumina flow cell to the status database as a pending transaction."""
445
        new_sequencing_run = IlluminaSequencingRun(
1✔
446
            type=sequencing_run_dto.type,
447
            device=flow_cell,
448
            sequencer_type=sequencing_run_dto.sequencer_type,
449
            sequencer_name=sequencing_run_dto.sequencer_name,
450
            data_availability=sequencing_run_dto.data_availability,
451
            archived_at=sequencing_run_dto.archived_at,
452
            has_backup=sequencing_run_dto.has_backup,
453
            total_reads=sequencing_run_dto.total_reads,
454
            total_undetermined_reads=sequencing_run_dto.total_undetermined_reads,
455
            percent_undetermined_reads=sequencing_run_dto.percent_undetermined_reads,
456
            percent_q30=sequencing_run_dto.percent_q30,
457
            mean_quality_score=sequencing_run_dto.mean_quality_score,
458
            total_yield=sequencing_run_dto.total_yield,
459
            yield_q30=sequencing_run_dto.yield_q30,
460
            cycles=sequencing_run_dto.cycles,
461
            demultiplexing_software=sequencing_run_dto.demultiplexing_software,
462
            demultiplexing_software_version=sequencing_run_dto.demultiplexing_software_version,
463
            sequencing_started_at=sequencing_run_dto.sequencing_started_at,
464
            sequencing_completed_at=sequencing_run_dto.sequencing_completed_at,
465
            demultiplexing_started_at=sequencing_run_dto.demultiplexing_started_at,
466
            demultiplexing_completed_at=sequencing_run_dto.demultiplexing_completed_at,
467
        )
468
        self.session.add(new_sequencing_run)
1✔
469
        LOG.debug(f"Sequencing run added to status db: {new_sequencing_run.id}.")
1✔
470
        return new_sequencing_run
1✔
471

472
    def add_illumina_sample_metrics(
1✔
473
        self,
474
        sample_metrics_dto: list[IlluminaSampleSequencingMetricsDTO],
475
        sequencing_run: IlluminaSequencingRun,
476
    ) -> list[IlluminaSampleSequencingMetrics]:
477
        """Add new IlluminaSampleSequencingMetrics as a pending transaction."""
478
        new_sample_metrics: list[IlluminaSampleSequencingMetrics] = []
1✔
479
        for sample_metric in sample_metrics_dto:
1✔
480
            sample: Sample = self.get_sample_by_internal_id(sample_metric.sample_id)
1✔
481
            new_sample_metric = IlluminaSampleSequencingMetrics(
1✔
482
                sample=sample,
483
                instrument_run_id=sequencing_run.id,
484
                instrument_run=sequencing_run,
485
                type=sample_metric.type,
486
                flow_cell_lane=sample_metric.flow_cell_lane,
487
                total_reads_in_lane=sample_metric.total_reads_in_lane,
488
                base_passing_q30_percent=sample_metric.base_passing_q30_percent,
489
                base_mean_quality_score=sample_metric.base_mean_quality_score,
490
                yield_=sample_metric.yield_,
491
                yield_q30=sample_metric.yield_q30,
492
                created_at=sample_metric.created_at,
493
            )
494
            new_sample_metrics.append(new_sample_metric)
1✔
495
        self.session.add_all(new_sample_metrics)
1✔
496
        LOG.debug("Sample metrics added to status db.")
1✔
497
        return new_sample_metrics
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc