• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openhealthcare / elcid-rfh / 3073

pending completion
3073

Pull #731

travis-ci

web-flow
pending values are a lower priority to display in the tb tests summary than other tests
Pull Request #731: V0.12 lab test release

61 of 209 branches covered (29.19%)

57 of 57 new or added lines in 4 files covered. (100.0%)

2690 of 3483 relevant lines covered (77.23%)

1.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.58
/elcid/api.py
1
import datetime
1✔
2
import re
1✔
3
from django.conf import settings
1✔
4
from operator import itemgetter
1✔
5
from collections import defaultdict, OrderedDict
1✔
6
from django.conf import settings
1✔
7
from django.utils.text import slugify
1✔
8
from django.http import HttpResponseBadRequest
1✔
9
from intrahospital_api import loader
1✔
10
from rest_framework import viewsets, status
1✔
11
from opal.core.api import OPALRouter
1✔
12
from opal.core.api import (
1✔
13
    patient_from_pk, LoginRequiredViewset, SubrecordViewSet
14
)
15
from opal.core.views import json_response
1✔
16
from opal.core import serialization
1✔
17
from elcid import models as emodels
1✔
18
from elcid.utils import timing
1✔
19
from opal import models as omodels
1✔
20
from plugins.labtests import models as lab_test_models
1✔
21
import os
1✔
22
import json
1✔
23

24

25
_LAB_TEST_TAGS = {
1✔
26
    "BIOCHEMISTRY": [
27
        "BONE PROFILE",
28
        "UREA AND ELECTROLYTES",
29
        "LIVER PROFILE",
30
        "IMMUNOGLOBULINS",
31
        "C REACTIVE PROTEIN",
32
        "RENAL"
33

34
    ],
35
    "HAEMATOLOGY": [
36
        "FULL BLOOD COUNT", "HAEMATINICS", "HBA1C"
37
    ],
38
    "ENDOCRINOLOGY": [
39
        "CORTISOL AT 9AM", "THYROID FUNCTION TESTS"
40
    ],
41
    "URINE": [
42
        "OSMALITY", "PROTEIN ELECTROPHORESIS"
43
    ],
44
}
45

46
_ALWAYS_SHOW_AS_TABULAR = [
1✔
47
    "UREA AND ELECTROLYTES",
48
    "LIVER PROFILE",
49
    "IMMUNOGLOBULINS",
50
    "C REACTIVE PROTEIN",
51
    "RENAL",
52
    "RENAL PROFILE",
53
    "BONE PROFILE",
54
    "FULL BLOOD COUNT",
55
    "HAEMATINICS",
56
    "HBA1C",
57
    "THYROID FUNCTION TESTS",
58
    "ARTERIAL BLOOD GASES",
59
    "B12 AND FOLATE SCREEN",
60
    "CLOTTING SCREEN",
61
    "BICARBONATE",
62
    "CARDIAC PROFILE",
63
    "CHLORIDE",
64
    "CHOLESTEROL/TRIGLYCERIDES",
65
    "AFP",
66
    "25-OH VITAMIN D",
67
    "AMMONIA",
68
    "FLUID CA-125",
69
    "CARDIAC TROPONIN T",
70
    "BODYFLUID CALCIUM",
71
    "BODYFLUID GLUCOSE",
72
    "BODYFLUID POTASSIUM",
73
    "PDF PROTEIN",
74
    "TACROLIMUS",
75
    "FULL BLOOD COUNT"
76
]
77

78
LAB_TEST_TAGS = defaultdict(list)
1✔
79

80
from plugins.labtests.models import LabTest, Observation
1✔
81
from collections import defaultdict
1✔
82

83

84
def get_observations():
1✔
85
    result = defaultdict(lambda: defaultdict(int))
×
86
    result_count = defaultdict(int)
×
87
    observations = Observation.objects.filter(test__test_name="BLOOD CULTURE")
×
88
    for observation in observations:
×
89
        result[observation.observation_name][observation.observation_value] += 1
×
90
        result_count[observation.observation_name] += 1
×
91
    return result, result_count
×
92

93

94
def print_obs(i):
1✔
95
    print("{} {} {} {} {}".format(i.observation_number, i.observation_name, i.last_updated, i.observation_datetime, i.observation_value))
×
96

97
for tag, test_names in _LAB_TEST_TAGS.items():
1✔
98
    for test_name in test_names:
1✔
99
        LAB_TEST_TAGS[test_name].append(tag)
1✔
100

101

102
def generate_time_series(observations):
1✔
103
    """
104
        take in a bunch of observations and return a list
105
        of numeric observation values
106
        If we can't pass the observation to a float, we skip it.
107
    """
108
    timeseries = []
1✔
109
    for observation in observations:
1✔
110
        obs_result = get_observation_value(observation)
1✔
111

112
        if obs_result:
1✔
113
            timeseries.append(obs_result)
1✔
114

115
    return timeseries
1✔
116

117

118
def extract_observation_value(observation_value):
1✔
119
    """
120
        if an observation is numeric, return it as a float
121
        if its >12 return >12
122
        else return None
123
    """
124
    regex = r'^[-0-9][0-9.]*$'
1✔
125
    obs_result = observation_value.strip()
1✔
126
    obs_result = obs_result.split("~")[0].strip("<").strip(">").strip()
1✔
127
    if re.match(regex, obs_result):
1✔
128
        return round(float(obs_result), 3)
1✔
129

130

131
def get_observation_value(observation):
1✔
132
    return extract_observation_value(observation["observation_value"])
1✔
133

134

135
def clean_ref_range(ref_range):
1✔
136
    return ref_range.replace("]", "").replace("[", "").strip()
1✔
137

138

139
def to_date_str(some_date):
1✔
140
    if some_date:
1✔
141
        return some_date[:10]
1✔
142

143

144
def datetime_to_str(dt):
1✔
145
    return dt.strftime(
1✔
146
        settings.DATETIME_INPUT_FORMATS[0]
147
    )
148

149

150
def observations_by_date(observations):
1✔
151
    """ takes in a bunch of observations, assumes that
152
        they are like they are in the model, that
153
    """
154
    by_date_str = {}
1✔
155

156
    date_keys = [o['observation_datetime'] for o in observations]
1✔
157
    date_keys = sorted(date_keys)
1✔
158
    date_keys.reverse()
1✔
159
    date_keys = [to_date_str(d) for d in date_keys]
1✔
160

161
    for observation in observations:
1✔
162
        by_date_str[
1✔
163
            to_date_str(observation['observation_datetime'])
164
        ] = observation
165

166
    return [by_date_str[date_key] for date_key in date_keys]
1✔
167

168

169
def get_reference_range(observation_reference_range):
1✔
170
    ref_range = clean_ref_range(observation_reference_range)
1✔
171
    if not len(ref_range.replace("-", "").strip()):
1✔
172
        return None
1✔
173
    range_min_max = ref_range.split("-")
1✔
174
    if len(range_min_max) > 2:
1✔
175
        return None
1✔
176
    return {"min": range_min_max[0].strip(), "max": range_min_max[1].strip()}
1✔
177

178

179
AEROBIC = "aerobic"
1✔
180
ANAEROBIC = "anaerobic"
1✔
181

182

183
class LabTestResultsView(LoginRequiredViewset):
1✔
184
    """ The Api view of the the results view in the patient detail
185
        We want to show everything grouped by test, then observation, then
186
        date.
187
    """
188
    base_name = 'lab_test_results_view'
1✔
189

190
    def aggregate_observations_by_lab_test(self, lab_tests):
1✔
191
        by_test = defaultdict(list)
×
192
        # lab tests are sorted by lab test type
193
        # this is either the lab test type if its a lab test we've
194
        # defined or its what is in the profile description
195
        # if its an HL7 jobby
196
        for lab_test in lab_tests:
×
197
            as_dict = lab_test.dict_for_view(None)
×
198
            observations = as_dict.get("observations", [])
×
199
            lab_test_type = as_dict["extras"].get("test_name")
×
200
            if lab_test_type == "FULL BLOOD COUNT" and observations:
×
201
                print("id {} name {} result {}".format(
×
202
                    lab_test.id,
203
                    observations[0]["observation_name"],
204
                    observations[0]["observation_value"]
205
                ))
206

207
            for observation in observations:
×
208
                obs_result = extract_observation_value(observation["observation_value"])
×
209
                if obs_result:
×
210
                    observation["observation_value"] = obs_result
×
211

212
                observation["reference_range"] = observation["reference_range"].replace("]", "").replace("[", "")
×
213

214
                if not len(observation["reference_range"].replace("-", "").strip()):
×
215
                    observation["reference_range"] = None
×
216
                else:
217
                    range_min_max = observation["reference_range"].split("-")
×
218
                    if not range_min_max[0].strip():
×
219
                        observation["reference_range"] = None
×
220
                    else:
221
                        if not len(range_min_max) == 2:
×
222
                            observation["reference_range"] = None
×
223
                            # raise ValueError("unable to properly judge the range")
224
                        else:
225
                            observation["reference_range"] = dict(
×
226
                                min=float(range_min_max[0].strip()),
227
                                max=float(range_min_max[1].strip())
228
                            )
229

230
                observation["datetime_ordered"] = lab_test.datetime_ordered
×
231
                by_test[lab_test_type].append(observation)
×
232

233
        return by_test
×
234

235
    def is_empty_value(self, observation_value):
1✔
236
        """ we don't care about empty strings or
237
            ' - ' or ' # '
238
        """
239
        if isinstance(observation_value, str):
×
240
            return not observation_value.strip().strip("-").strip("#").strip()
×
241
        else:
242
            return observation_value is None
×
243

244
    @patient_from_pk
1✔
245
    def retrieve(self, request, patient):
246

247
        # so what I want to return is all observations to lab test desplay
248
        # name with the lab test properties on the observation
249

250
        a_year_ago = datetime.date.today() - datetime.timedelta(365)
×
251
        lab_tests = patient.lab_tests.all()
×
252
        lab_tests = lab_tests.filter(datetime_ordered__gte=a_year_ago)
×
253
        lab_tests = [l for l in lab_tests if l.extras]
×
254
        by_test = self.aggregate_observations_by_lab_test(lab_tests)
×
255
        serialised_tests = []
×
256

257
        # within the lab test observations should be sorted by test name
258
        # and within the if we have a date range we want to be exposing
259
        # them as part of a time series, ie adding in blanks if they
260
        # aren't populated
261
        for lab_test_type, observations in by_test.items():
×
262
            if lab_test_type.strip().upper() in set([
×
263
                'UNPROCESSED SAMPLE COMT', 'COMMENT', 'SAMPLE COMMENT'
264
            ]):
265
                continue
×
266

267
            observations = sorted(observations, key=lambda x: x["datetime_ordered"])
×
268
            observations = sorted(observations, key=lambda x: x["observation_name"])
×
269

270

271
            # observation_time_series = defaultdict(list)
272
            by_observations = defaultdict(list)
×
273
            timeseries = {}
×
274

275
            observation_metadata = {}
×
276
            observation_date_range = {
×
277
                to_date_str(observation["observation_datetime"]) for observation in observations
278
            }
279
            observation_date_range = sorted(
×
280
                list(observation_date_range),
281
                key=lambda x: serialization.deserialize_date(x)
282
            )
283
            observation_date_range.reverse()
×
284
            long_form = False
×
285

286
            for observation in observations:
×
287
                test_name = observation["observation_name"]
×
288

289
                if test_name.strip() == "Haem Lab Comment":
×
290
                    # skip these for the time being, we may not even
291
                    # have to bring them in
292
                    continue
×
293

294
                if test_name.strip() == "Sample Comment":
×
295
                    continue
×
296

297
                if lab_test_type in _ALWAYS_SHOW_AS_TABULAR:
×
298
                    pass
×
299
                else:
300
                    if isinstance(observation["observation_value"], str):
×
301
                        if extract_observation_value(observation["observation_value"].strip(">").strip("<")) is None:
×
302
                            long_form = True
×
303

304
                if test_name not in by_observations:
×
305
                    obs_for_test_name = {
×
306
                        to_date_str(i["observation_datetime"]): i for i in observations if i["observation_name"] == test_name
307
                    }
308
                    # if its all None's for a certain observation name lets skip it
309
                    # ie if WBC is all None, lets not waste the users' screen space
310
                    # sometimes they just have '-' so lets skip these too
311
                    if not all(i for i in obs_for_test_name.values() if self.is_empty_value(i)):
×
312
                        continue
×
313
                    by_observations[test_name] = obs_for_test_name
×
314

315
                if test_name not in observation_metadata:
×
316
                    observation_metadata[test_name] = dict(
×
317
                        units=observation["units"],
318
                        reference_range=observation["reference_range"],
319
                        api_name=slugify(observation["observation_name"])
320
                    )
321

322
            serialised_lab_test = dict(
×
323
                long_form=long_form,
324
                timeseries=timeseries,
325
                api_name=slugify(lab_test_type),
326
                observation_metadata=observation_metadata,
327
                lab_test_type=lab_test_type,
328
                observations=observations,
329
                observation_date_range=observation_date_range,
330
                # observation_time_series=observation_time_series,
331
                by_observations=by_observations,
332
                observation_names=sorted(by_observations.keys()),
333
                tags=LAB_TEST_TAGS.get(lab_test_type, [])
334
            )
335
            serialised_tests.append(serialised_lab_test)
×
336

337
            serialised_tests = sorted(
×
338
                serialised_tests, key=itemgetter("lab_test_type")
339
            )
340

341
            # ordered by most recent observations first please
342
            serialised_tests = sorted(
×
343
                serialised_tests, key=lambda t: -serialization.deserialize_date(
344
                    t["observation_date_range"][0]
345
                ).toordinal()
346
            )
347

348
        all_tags = list(_LAB_TEST_TAGS.keys())
×
349

350
        return json_response(
×
351
            dict(
352
                tags=all_tags,
353
                tests=serialised_tests
354
            )
355
        )
356

357

358
class InfectionServiceTestSummaryApi(LoginRequiredViewset):
1✔
359
    base_name = 'infection_service_summary_api'
1✔
360
    RELEVANT_TESTS = OrderedDict((
1✔
361
        ("FULL BLOOD COUNT", ["WBC", "Lymphocytes", "Neutrophils"],),
362
        ("CLOTTING SCREEN", ["INR"],),
363
        ("C REACTIVE PROTEIN", ["C Reactive Protein"]),
364
        ("LIVER PROFILE", ["ALT", "AST", "Alkaline Phosphatase"]),
365
    ),)
366

367
    NUM_RESULTS = 5
1✔
368

369
    def get_recent_dates_to_observations(self, qs):
1✔
370
        """
371
        We are looking for the last 5 dates
372
        where we have any observation that
373
        can be cast to a number.
374

375
        If a patient has multiple observations
376
        on the same date take the most recent
377
        that can be cast into a number
378
        """
379
        date_to_observation = {}
1✔
380
        # this should never be the case, but
381
        # we don't create the data so cater for it
382
        qs = qs.exclude(observation_datetime=None)
1✔
383
        for i in qs.order_by("-observation_datetime"):
1✔
384
            if i.value_numeric is not None:
1✔
385
                dt = i.observation_datetime
1✔
386
                obs_date = dt.date()
1✔
387
                if obs_date in date_to_observation:
1✔
388
                    if date_to_observation[obs_date].observation_datetime < dt:
1✔
389
                        date_to_observation[obs_date] = i
×
390
                else:
391
                    date_to_observation[obs_date] = i
1✔
392
            if len(date_to_observation.keys()) == self.NUM_RESULTS:
1✔
393
                break
1✔
394

395
        return date_to_observation
1✔
396

397
    def get_obs_queryset(self, patient, lab_test_name, observation_name):
1✔
398
        return lab_test_models.Observation.objects.filter(
1✔
399
            test__patient=patient
400
        ).filter(
401
            test__test_name=lab_test_name
402
        ).filter(
403
            observation_name=observation_name
404
        )
405

406
    def serialize_observations(self, observations):
1✔
407
        latest_results = {
1✔
408
            serialization.serialize_date(i.observation_datetime.date()): i.value_numeric
409
            for i in observations
410
        }
411
        return dict(
1✔
412
            name=observations[0].observation_name,
413
            units=observations[0].units,
414
            reference_range=get_reference_range(
415
                observations[0].reference_range
416
            ),
417
            latest_results=latest_results
418
        )
419

420
    def serialise_lab_tests(self, patient):
1✔
421
        obs = []
1✔
422
        date_set = set()
1✔
423

424
        for test_name, observation_names in self.RELEVANT_TESTS.items():
1✔
425
            for obs_name in observation_names:
1✔
426
                qs = self.get_obs_queryset(patient, test_name, obs_name)
1✔
427
                if qs:
1✔
428
                    date_to_obs = self.get_recent_dates_to_observations(qs)
1✔
429
                    if date_to_obs:
1✔
430
                        date_set.update(date_to_obs.keys())
1✔
431
                        obs.append(list(date_to_obs.values()))
1✔
432

433
        all_dates = list(date_set)
1✔
434
        all_dates.sort()
1✔
435
        recent_dates = all_dates[-self.NUM_RESULTS:]
1✔
436

437
        # flush out the recent dates with nulls if
438
        # the patient does not have a lot of results
439
        while len(recent_dates) < self.NUM_RESULTS:
1✔
440
            recent_dates.append(None)
1✔
441
        obs_values = []
1✔
442

443
        for obs_set in obs:
1✔
444
            obs_values.append(self.serialize_observations(obs_set))
1✔
445
        return dict(
1✔
446
            obs_values=obs_values,
447
            recent_dates=recent_dates
448
        )
449

450
    @patient_from_pk
1✔
451
    def retrieve(self, request, patient):
452
        result = self.serialise_lab_tests(patient)
1✔
453
        return json_response(result)
1✔
454

455

456
class UpstreamBloodCultureApi(viewsets.ViewSet):
1✔
457
    """
458
        for every upstream blood culture, return them grouped by
459

460
        datetimes_ordered as a date,
461
        lab_number
462
        observation name
463
    """
464
    base_name = "upstream_blood_culture_results"
1✔
465

466
    def no_growth_observations(self, observations):
1✔
467
        """
468
            We are looking for observations that looks like the below.
469
            Its not necessarily 5 days, sometimes its e.g. 48 hours.
470
            Otherwise they always look like the below.
471

472
            Aerobic bottle culture: No growth after 5 days of incubation
473
            Anaerobic bottle culture: No growth after 5 days of incubation
474

475
            The are always of the type [Anaerobic/Aerobic] bottle culture
476
        """
477
        obs_names = ["Aerobic bottle culture", "Anaerobic bottle culture"]
1✔
478

479
        bottles = [
1✔
480
            o for o in observations if o["observation_name"] in obs_names
481
        ]
482

483
        return len(bottles) == 2
1✔
484

485
    @patient_from_pk
1✔
486
    def retrieve(self, request, patient):
487
        """
488
            returns any observations with Aerobic or Anaerobic in their name
489
        """
490
        lab_tests = patient.labtest_set.filter(
×
491
            lab_test_type=emodels.UpstreamBloodCulture.get_display_name()
492
        ).order_by("external_identifier").order_by("-datetime_ordered")
493
        lab_tests = [i.dict_for_view(request.user) for i in lab_tests]
×
494
        for lab_test in lab_tests:
×
495
            observations = []
×
496
            lab_test["no_growth"] = self.no_growth_observations(
×
497
                lab_test["observations"]
498
            )
499

500
            for observation in lab_test["observations"]:
×
501
                ob_name = observation["observation_name"].lower()
×
502

503
                if "aerobic" in ob_name:
×
504
                    observations.append(observation)
×
505

506
            lab_test["observations"] = sorted(
×
507
                observations, key=lambda x: x["observation_name"]
508
            )
509
            if lab_test["extras"]["clinical_info"]:
×
510
                lab_test["extras"]["clinical_info"] = "{}{}".format(
×
511
                    lab_test["extras"]["clinical_info"][0].upper(),
512
                    lab_test["extras"]["clinical_info"][1:]
513
                )
514

515
        return json_response(dict(
×
516
            lab_tests=lab_tests
517
        ))
518

519

520
class BloodCultureResultApi(viewsets.ViewSet):
1✔
521
    base_name = 'blood_culture_results'
1✔
522

523
    BLOOD_CULTURES = [
1✔
524
        emodels.GramStain.get_display_name(),
525
        emodels.QuickFISH.get_display_name(),
526
        emodels.GPCStaph.get_display_name(),
527
        emodels.GPCStrep.get_display_name(),
528
        emodels.GNR.get_display_name(),
529
        emodels.BloodCultureOrganism.get_display_name()
530
    ]
531

532
    def sort_by_date_ordered_and_lab_number(self, some_keys):
1✔
533
        """ takes in a tuple of (date, lab number)
534
            both date or lab number will be "" if empty
535

536
            it sorts them by most recent and lowest lab number
537
        """
538
        def comparator(some_key):
1✔
539
            dt = some_key[0]
1✔
540
            if not dt:
1✔
541
                dt = datetime.date.max
1✔
542
            return (dt, some_key[1],)
1✔
543
        return sorted(some_keys, key=comparator, reverse=True)
1✔
544

545
    def sort_by_lab_test_order(self, some_results):
1✔
546
        """
547
            results should be sorted by the order of the blood culture
548
            list
549
        """
550
        return sorted(
1✔
551
            some_results, key=lambda x: self.BLOOD_CULTURES.index(
552
                x["lab_test_type"]
553
            )
554
        )
555

556
    def translate_date_to_string(self, some_date):
1✔
557
        if not some_date:
1✔
558
            return ""
1✔
559

560
        dt = datetime.datetime(
1✔
561
            some_date.year, some_date.month, some_date.day
562
        )
563
        return dt.strftime(
1✔
564
            settings.DATE_INPUT_FORMATS[0]
565
        )
566

567
    @patient_from_pk
1✔
568
    def retrieve(self, request, patient):
569
        lab_tests = patient.labtest_set.filter(
1✔
570
            lab_test_type__in=self.BLOOD_CULTURES
571
        )
572
        lab_tests = lab_tests.order_by("datetime_ordered")
1✔
573
        cultures = defaultdict(lambda: defaultdict(dict))
1✔
574
        culture_order = set()
1✔
575

576
        for lab_test in lab_tests:
1✔
577
            lab_number = lab_test.extras.get("lab_number", "")
1✔
578
            # if lab number is None or "", group it together
579
            if not lab_number:
1✔
580
                lab_number = ""
1✔
581
            if lab_test.datetime_ordered:
1✔
582
                date_ordered = self.translate_date_to_string(
1✔
583
                    lab_test.datetime_ordered.date()
584
                )
585

586
                culture_order.add((
1✔
587
                    lab_test.datetime_ordered.date(), lab_number,
588
                ))
589
            else:
590
                date_ordered = ""
×
591
                culture_order.add(("", lab_number,))
×
592

593
            if lab_number not in cultures[date_ordered]:
1✔
594
                cultures[date_ordered][lab_number][AEROBIC] = defaultdict(list)
1✔
595
                cultures[date_ordered][lab_number][ANAEROBIC] = defaultdict(
1✔
596
                    list
597
                )
598

599
            isolate = lab_test.extras["isolate"]
1✔
600

601
            if lab_test.extras[AEROBIC]:
1✔
602
                cultures[date_ordered][lab_number][AEROBIC][isolate].append(
×
603
                    lab_test.to_dict(self.request.user)
604
                )
605
            else:
606
                cultures[date_ordered][lab_number][ANAEROBIC][isolate].append(
1✔
607
                    lab_test.to_dict(request.user)
608
                )
609

610
        culture_order = self.sort_by_date_ordered_and_lab_number(culture_order)
1✔
611

612
        for dt, lab_number in culture_order:
1✔
613
            dt_string = self.translate_date_to_string(dt)
1✔
614
            by_date_lab_number = cultures[dt_string][lab_number]
1✔
615
            for robic in [AEROBIC, ANAEROBIC]:
1✔
616
                for isolate in by_date_lab_number[robic].keys():
1✔
617
                    by_date_lab_number[robic][isolate] = self.sort_by_lab_test_order(
1✔
618
                        by_date_lab_number[robic][isolate]
619
                    )
620

621
        return json_response(dict(
1✔
622
            cultures=cultures,
623
            culture_order=culture_order
624
        ))
625

626

627
class DemographicsSearch(LoginRequiredViewset):
1✔
628
    base_name = 'demographics_search'
1✔
629
    PATIENT_FOUND_IN_ELCID = "patient_found_in_elcid"
1✔
630
    PATIENT_FOUND_UPSTREAM = "patient_found_upstream"
1✔
631
    PATIENT_NOT_FOUND = "patient_not_found"
1✔
632

633
    def list(self, request, *args, **kwargs):
1✔
634
        hospital_number = request.query_params.get("hospital_number")
1✔
635
        if not hospital_number:
1✔
636
            return HttpResponseBadRequest("Please pass in a hospital number")
1✔
637
        demographics = emodels.Demographics.objects.filter(
1✔
638
            hospital_number=hospital_number
639
        ).last()
640

641
        # the patient is in elcid
642
        if demographics:
1✔
643
            return json_response(dict(
1✔
644
                patient=demographics.patient.to_dict(request.user),
645
                status=self.PATIENT_FOUND_IN_ELCID
646
            ))
647
        else:
648
            if settings.USE_UPSTREAM_DEMOGRAPHICS:
1✔
649
                demographics = loader.load_demographics(hospital_number)
1✔
650

651
                if demographics:
1✔
652
                    return json_response(dict(
1✔
653
                        patient=dict(demographics=[demographics]),
654
                        status=self.PATIENT_FOUND_UPSTREAM
655
                    ))
656
        return json_response(dict(status=self.PATIENT_NOT_FOUND))
1✔
657

658

659
class BloodCultureIsolateApi(SubrecordViewSet):
1✔
660
    model = emodels.BloodCultureIsolate
1✔
661
    base_name = emodels.BloodCultureIsolate.get_api_name()
1✔
662

663
    def create(self, request):
1✔
664
        bc = self.model()
1✔
665
        bc.update_from_dict(request.data, request.user)
1✔
666
        return json_response(
1✔
667
            bc.to_dict(request.user),
668
            status_code=status.HTTP_201_CREATED
669
        )
670

671

672
elcid_router = OPALRouter()
1✔
673
elcid_router.register(BloodCultureResultApi.base_name, BloodCultureResultApi)
1✔
674
elcid_router.register(
1✔
675
    UpstreamBloodCultureApi.base_name, UpstreamBloodCultureApi
676
)
677
elcid_router.register(DemographicsSearch.base_name, DemographicsSearch)
1✔
678
elcid_router.register(BloodCultureIsolateApi.base_name, BloodCultureIsolateApi)
1✔
679

680
lab_test_router = OPALRouter()
1✔
681
lab_test_router.register(
1✔
682
    'infection_service_summary_api', InfectionServiceTestSummaryApi
683
)
684
lab_test_router.register('lab_test_results_view', LabTestResultsView)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc