• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

angelolab / mibi-bin-tools / 6714940389

01 Nov 2023 02:30AM UTC coverage: 98.496% (-1.2%) from 99.725%
6714940389

Pull #35

github

web-flow
Merge 365b9424f into b8782bf63
Pull Request #35: Adds total spectra extraction

221 of 226 branches covered (0.0%)

Branch coverage included in aggregate %.

19 of 19 new or added lines in 1 file covered. (100.0%)

172 of 173 relevant lines covered (99.42%)

7.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.29
/src/mibi_bin_tools/bin_files.py
1
from typing import Any, Dict, List, Tuple, Union
8✔
2
import os
8✔
3
import json
8✔
4

5
import numpy as np
8✔
6
import pandas as pd
8✔
7
import skimage.io as io
8✔
8
import xarray as xr
8✔
9

10
from mibi_bin_tools import type_utils, _extract_bin
8✔
11
from alpineer import io_utils, image_utils
8✔
12

13

14
def _mass2tof(masses_arr: np.ndarray, mass_offset: float, mass_gain: float,
8✔
15
              time_res: float) -> np.ndarray:
16
    """Convert array of m/z values to equivalent time of flight values
17

18
    Args:
19
        masses_arr (array_like):
20
            Array of m/z values
21
        mass_offset (float):
22
            Mass offset for parabolic transformation
23
        mass_gain (float):
24
            Mass gain for parabolic transformation
25
        time_res (float):
26
            Time resolution for scaling parabolic transformation
27

28
    Returns:
29
        array_like:
30
            Array of time of flight values; indicies paried to `masses_arr`
31
    """
32
    return (mass_gain * np.sqrt(masses_arr) + mass_offset) / time_res
8✔
33

34

35
def _set_tof_ranges(fov: Dict[str, Any], higher: np.ndarray, lower: np.ndarray,
8✔
36
                    time_res: float) -> None:
37
    """Converts and stores provided mass ranges as time of flight ranges within fov metadata
38

39
    Args:
40
        fov (Dict[str, Any]):
41
            Metadata for the fov.
42
        higher (array_like):
43
            Array of m/z values; upper bounds for integration
44
        lower (array_like):
45
            Array of m/z values; lower bounds for integration
46
        time_res (float):
47
            Time resolution for scaling parabolic transformation
48

49
    Returns:
50
        None:
51
            Fovs argument is modified in place
52
    """
53
    key_names = ('upper_tof_range', 'lower_tof_range')
8✔
54
    mass_ranges = (higher, lower)
8✔
55

56
    for key, masses in zip(key_names, mass_ranges):
8✔
57
        # truncate the conversion to ensure consistency
58
        fov[key] = _mass2tof(
8✔
59
            masses, fov['mass_offset'], fov['mass_gain'], time_res
60
        ).astype(np.uint16)
61

62

63
def _write_out(img_data: np.ndarray, out_dir: str, fov_name: str, targets: List[str],
8✔
64
               intensities: Union[bool, List[str]] = False) -> None:
65
    """Parses extracted data and writes out tifs
66

67
    Args:
68
        img_data (np.ndarray):
69
            Array containing the pulse counts, intensity, and intensity * width images
70
        out_dir (str | PathLike):
71
            Directory to save tifs
72
        fov_name (str):
73
            Name of the field of view
74
        targets (array_like):
75
            List of target names (i.e channels)
76
        intensities (bool | List):
77
            Whether or not to write out intensity images.  If a List, specific
78
            peaks can be written out, ignoring the rest, which will only have pulse count images.
79
    """
80
    out_dirs = [
8✔
81
        os.path.join(out_dir, fov_name),
82
        os.path.join(out_dir, fov_name, 'intensities'),
83
    ]
84
    suffixes = [
8✔
85
        '',
86
        '_intensity',
87
    ]
88
    save_dtypes = [
8✔
89
        np.uint32,
90
        np.uint32,
91
    ]
92

93
    for i, (out_dir_i, suffix, save_dtype) in enumerate(zip(out_dirs, suffixes, save_dtypes)):
8✔
94
        # break loop when index is larger than type dimension of img_data
95
        if i+1 > img_data.shape[0]:
8✔
96
            break
8✔
97
        if not os.path.exists(out_dir_i):
8✔
98
            os.makedirs(out_dir_i)
8✔
99
        for j, target in enumerate(targets):
8!
100
            # save all first images regardless of replacing
101
            # if not replace (i=1), only save intensity images for specified targets
102
            if i == 0 or (target in list(intensities)):
8✔
103
                fname = os.path.join(out_dir_i, f"{target}{suffix}.tiff")
8✔
104
                image_utils.save_image(fname=fname, data=img_data[i, :, :, j].astype(save_dtype))
8✔
105

106

107
def _find_bin_files(data_dir: str,
8✔
108
                    include_fovs: Union[List[str], None] = None) -> Dict[str, Dict[str, str]]:
109
    """Locates paired bin/json files within the provided directory.
110

111
    Args:
112
        data_dir (str | PathLike):
113
            Directory containing bin/json files
114
        include_fovs (List | None):
115
            List of fovs to include. Includes all if None.
116

117
    Returns:
118
        Dict[str, Dict[str, str]]:
119
            Dictionary containing the names of the valid bin files
120
    """
121
    bin_files = io_utils.list_files(data_dir, substrs=['.bin'])
8✔
122
    json_files = io_utils.list_files(data_dir, substrs=['.json'])
8✔
123

124
    fov_names = io_utils.extract_delimited_names(bin_files, delimiter='.')
8✔
125

126
    fov_files = {
8✔
127
        fov_name: {
128
            'bin': fov_name + '.bin',
129
            'json': fov_name + '.json',
130
        }
131
        for fov_name in fov_names
132
        if fov_name + '.json' in json_files
133
    }
134

135
    if include_fovs is not None:
8✔
136
        fov_files = {
8✔
137
            fov_file: fov_files[fov_file]
138
            for fov_file in include_fovs
139
            if fov_file in fov_files
140
        }
141

142
    if not len(fov_files):
8✔
143
        raise FileNotFoundError(f'No viable bin files were found in {data_dir}...')
8✔
144

145
    return fov_files
8✔
146

147

148
def _fill_fov_metadata(data_dir: str, fov: Dict[str, Any],
8✔
149
                       panel: Union[Tuple[float, float], pd.DataFrame],
150
                       intensities: Union[bool, List[str]], time_res: float,
151
                       channels: List[str] = None) -> None:
152
    """ Parses user input and mibiscope json to build extraction parameters
153

154
    Fills fov metadata with mass calibration parameters, builds panel, and sets intensity
155
    extraction flags.
156

157
    Args:
158
        data_dir (str):
159
            Directory containing bin files as well as accompanying json metadata files
160
        fov (Dict[str, Any]):
161
            Metadata for the fov.
162
        panel (tuple | pd.DataFrame):
163
            If a tuple, global integration range over all antibodies within json metadata.
164
            If a pd.DataFrame, specific peaks with custom integration ranges.  Column names must be
165
            'Mass' and 'Target' with integration ranges specified via 'Start' and 'Stop' columns.
166
        intensities (bool | List[str]):
167
            Whether or not to extract intensity and intensity * width images.  If a List, specific
168
            peaks can be extracted, ignoring the rest, which will only have pulse count images
169
            extracted.
170
        time_res (float):
171
            Time resolution for scaling parabolic transformation
172
        channels (List[str] | None):
173
            Filters panel for given channels.  All channels in panel extracted if None
174
    Returns:
175
        None:
176
            `fov` argument is modified in place
177
    """
178

179
    with open(os.path.join(data_dir, fov['json']), 'rb') as f:
8✔
180
        data = json.load(f)
8✔
181

182
    fov['mass_gain'] = data['fov']['fullTiming']['massCalibration']['massGain']
8✔
183
    fov['mass_offset'] = data['fov']['fullTiming']['massCalibration']['massOffset']
8✔
184

185
    if type(panel) is tuple:
8✔
186
        _parse_global_panel(data, fov, panel, time_res, channels)
8✔
187
    else:
188
        _parse_df_panel(fov, panel, time_res, channels)
8✔
189

190
    _parse_intensities(fov, intensities)
8✔
191

192

193
def _parse_global_panel(json_metadata: dict, fov: Dict[str, Any], panel: Tuple[float, float],
8✔
194
                        time_res: float, channels: List[str]) -> None:
195
    """Extracts panel contained in mibiscope json metadata
196

197
    Args:
198
        json_metadata (dict):
199
            metadata read via mibiscope json
200
        fov (Dict[str, Any]):
201
            Metadata for the fov.
202
        panel (tuple):
203
            Global integration range over all antibodies within json metadata.
204
            Column names must 'Mass' and 'Target' with integration ranges specified via 'Start' and
205
            'Stop' columns.
206
        time_res (float):
207
            Time resolution for scaling parabolic transformation
208
        channels (List[str] | None):
209
            Filters panel for given channels.  All channels in panel extracted if None
210
    Returns:
211
        None:
212
            `fov` argument is modified in place
213
    """
214
    if json_metadata['fov'].get('panel', None) is None:
8✔
215
        raise KeyError(
8✔
216
            f"'panel' field not found in {fov['json']}. "
217
            + "If this is a moly point, you must manually supply a panel..."
218
        )
219
    rows = json_metadata['fov']['panel']['conjugates']
8✔
220
    fov['masses'], fov['targets'] = zip(*[
8✔
221
        (el['mass'], el['target'])
222
        for el in rows
223
        if channels is None or el['target'] in channels
224
    ])
225

226
    masses_arr = np.array(fov['masses'])
8✔
227
    _set_tof_ranges(fov, masses_arr + panel[1], masses_arr + panel[0], time_res)
8✔
228

229

230
def _parse_df_panel(fov: Dict[str, Any], panel: pd.DataFrame, time_res: float,
8✔
231
                    channels: List[str]) -> None:
232
    """Converts masses from panel into times for fov extraction-metadata structure
233

234
    Args:
235
        fov (Dict[str, Any]):
236
            Metadata for the fov.
237
        panel (pd.DataFrame):
238
            Specific peaks with custom integration ranges.  Column names must be 'Mass' and
239
            'Target' with integration ranges specified via 'Start' and 'Stop' columns.
240
        time_res (float):
241
            Time resolution for scaling parabolic transformation
242
        channels (List[str] | None):
243
            Filters panel for given channels.  All channels in panel extracted if None
244
    Returns:
245
        None:
246
            `fov` argument is modified in place
247
    """
248
    rows = panel.loc[panel['Target'].isin(panel['Target'] if channels is None else channels)]
8✔
249
    fov['masses'] = rows['Mass']
8✔
250
    fov['targets'] = rows['Target']
8✔
251

252
    _set_tof_ranges(fov, rows['Stop'].values, rows['Start'].values, time_res)
8✔
253

254

255
def _parse_intensities(fov: Dict[str, Any], intensities: Union[bool, List[str]]) -> None:
8✔
256
    """Sets intensity extraction flags within the extraction-metadata
257

258
    Args:
259
        fov (Dict[str, Any]):
260
            Metadata for the fov
261
        intensities (bool | List):
262
            Whether or not to extract intensity and intensity * width images.  If a List, specific
263
            peaks can be extracted, ignoring the rest, which will only have pulse count images
264
            extracted.
265
    Returns:
266
        None:
267
            `fov` argument is modified in place
268
    """
269

270
    filtered_intensities = None
8✔
271
    if type(intensities) is list:
8✔
272
        filtered_intensities = [target for target in fov['targets'] if target in intensities]
8✔
273
    elif intensities is True:
8✔
274
        filtered_intensities = fov['targets']
8✔
275

276
    # order the 'calc_intensity' bools
277
    if filtered_intensities is not None:
8✔
278
        fov['calc_intensity'] = [target in list(filtered_intensities) for target in fov['targets']]
8✔
279
    else:
280
        fov['calc_intensity'] = [False, ] * len(fov['targets'])
8✔
281

282

283
def condense_img_data(img_data, targets, intensities, replace):
8✔
284
    """Changes image data from separate pulse and intensity data into one column if replace=True.
285
    Args:
286
        img_data (np.array):
287
            Contains the image data with all pulse and intensity information.
288
        targets (list):
289
            List of targets.
290
        intensities (bool | List):
291
            Whether or not to extract intensity images.  If a List, specific
292
            peaks can be extracted, ignoring the rest, which will only have pulse count images
293
            extracted.
294
        replace (bool):
295
            Whether to replace pulse images with intensity images.
296

297
    Return:
298
        altered img_data according to args
299

300
    """
301
    # extracting intensity and replacing
302
    if type_utils.any_true(intensities) and replace:
8✔
303
        for j, target in enumerate(targets):
8✔
304
            # replace only specified targets
305
            if target in intensities:
8✔
306
                img_data[0, :, :, j] = img_data[1, :, :, j]
8✔
307
        img_data = img_data[[0], :, :, :]
8✔
308

309
    # not extracting intensity
310
    elif not type_utils.any_true(intensities):
8✔
311
        img_data = img_data[[0], :, :, :]
8✔
312

313
    # extracting intensity but not replacing
314
    else:
315
        img_data = img_data[[0, 1], :, :, :]
8✔
316

317
    return img_data
8✔
318

319

320
def extract_bin_files(data_dir: str, out_dir: Union[str, None],
8✔
321
                      include_fovs: Union[List[str], None] = None,
322
                      panel: Union[Tuple[float, float], pd.DataFrame] = (-0.3, 0.0),
323
                      intensities: Union[bool, List[str]] = False, replace=True,
324
                      time_res: float = 500e-6):
325
    """Converts MibiScope bin files to pulse count, intensity, and intensity * width tiff images
326

327
    Args:
328
        data_dir (str | PathLike):
329
            Directory containing bin files as well as accompanying json metadata files
330
        out_dir (str | PathLike | None):
331
            Directory to save the tiffs in.  If None, image data is returned as an ndarray.
332
        include_fovs (List | None):
333
            List of fovs to include.  Includes all if None.
334
        panel (tuple | pd.DataFrame):
335
            If a tuple, global integration range over all antibodies within json metadata.
336
            If a pd.DataFrame, specific peaks with custom integration ranges.  Column names must be
337
            'Mass' and 'Target' with integration ranges specified via 'Start' and 'Stop' columns.
338
        intensities (bool | List):
339
            Whether or not to extract intensity images.  If a List, specific
340
            peaks can be extracted, ignoring the rest, which will only have pulse count images
341
            extracted.
342
        replace (bool):
343
            Whether to replace pulse images with intensity images.
344
        time_res (float):
345
            Time resolution for scaling parabolic transformation
346
    Returns:
347
        None | np.ndarray:
348
            image data if no out_dir is provided, otherwise no return
349
    """
350

351
    fov_files = _find_bin_files(data_dir, include_fovs)
8✔
352

353
    for fov in fov_files.values():
8✔
354
        _fill_fov_metadata(data_dir, fov, panel, intensities, time_res)
8✔
355

356
    bin_files = \
8✔
357
        [(fov, os.path.join(data_dir, fov['bin'])) for fov in fov_files.values()]
358

359
    image_data = []
8✔
360

361
    for i, (fov, bf) in enumerate(bin_files):
8✔
362
        img_data = _extract_bin.c_extract_bin(
8✔
363
            bytes(bf, 'utf-8'), fov['lower_tof_range'],
364
            fov['upper_tof_range'], np.array(fov['calc_intensity'], dtype=np.uint8)
365
        )
366

367
        # convert intensities=True to list of all targets
368
        if type_utils.any_true(intensities):
8✔
369
            if type(intensities) is not list:
8✔
370
                intensities = list(fov['targets'])
8✔
371

372
        img_data = condense_img_data(img_data, list(fov['targets']), intensities, replace)
8✔
373

374
        if out_dir is not None:
8✔
375
            _write_out(
8✔
376
                img_data,
377
                out_dir,
378
                fov['bin'][:-4],
379
                fov['targets'],
380
                intensities
381
            )
382
        else:
383
            if replace or not type_utils.any_true(intensities):
8✔
384
                type_list = ['pulse']
8✔
385
            else:
386
                type_list = ['pulse', 'intensities']
8✔
387
            image_data.append(
8✔
388
                xr.DataArray(
389
                    data=img_data[np.newaxis, :],
390
                    coords=[
391
                        [fov['bin'].split('.')[0]],
392
                        type_list,
393
                        np.arange(img_data.shape[1]),
394
                        np.arange(img_data.shape[2]),
395
                        list(fov['targets']),
396
                    ],
397
                    dims=['fov', 'type', 'x', 'y', 'channel'],
398
                )
399
            )
400

401
    if out_dir is None:
8✔
402
        image_data = xr.concat(image_data, dim='fov')
8✔
403

404
        return image_data
8✔
405

406

407
def get_histograms_per_tof(data_dir: str, fov: str, channels: List[str] = None,
8✔
408
                           panel: Union[Tuple[float, float], pd.DataFrame] = (-0.3, 0.0),
409
                           time_res: float = 500e-6):
410
    """Generates histograms of pulse widths, pulse counts, and pulse intensities found within the
411
    given mass range
412

413
    Args:
414
        data_dir (str | PathLike):
415
            Directory containing bin files as well as accompanying json metadata files
416
        fov (str):
417
            Fov to generate histogram for
418
        channels (str):
419
            Channels to check widths for, default checks all channels
420
        panel (tuple | pd.DataFrame):
421
            If a tuple, global integration range over all antibodies within json metadata.
422
            If a pd.DataFrame, specific peaks with custom integration ranges.  Column names must be
423
            'Mass' and 'Target' with integration ranges specified via 'Start' and 'Stop' columns.
424
        time_res (float):
425
            Time resolution for scaling parabolic transformation
426

427
    Returns:
428
        tuple (dict):
429
            Tuple of dicts containing widths, intensities, and pulse info per channel
430
    """
431
    fov = _find_bin_files(data_dir, [fov])[fov]
8✔
432

433
    _fill_fov_metadata(data_dir, fov, panel, False, time_res, channels)
8✔
434

435
    local_bin_file = os.path.join(data_dir, fov['bin'])
8✔
436

437
    widths, intensities, pulses = _extract_bin.c_extract_histograms(
8✔
438
        bytes(local_bin_file, 'utf-8'),
439
        fov['lower_tof_range'],
440
        fov['upper_tof_range']
441
    )
442

443
    chan_list = fov["targets"].values
8✔
444
    widths = {
8✔
445
        chan: widths_col for chan, widths_col in zip(chan_list, widths.T)
446
    }
447
    intensities = {
8✔
448
        chan: intensities_col for chan, intensities_col in zip(chan_list, intensities.T)
449
    }
450
    pulses = {
8✔
451
        chan: pulses_col for chan, pulses_col in zip(chan_list, pulses.T)
452
    }
453

454
    return widths, intensities, pulses
8✔
455

456

457
def get_median_pulse_height(data_dir: str, fov: str, channels: List[str] = None,
8✔
458
                            panel: Union[Tuple[float, float], pd.DataFrame] = (-0.3, 0.0),
459
                            time_res: float = 500e-6):
460
    """Retrieves median pulse intensity and mean pulse count for a given channel
461

462
    Args:
463
        data_dir (str | PathLike):
464
            Directory containing bin files as well as accompanying json metadata files
465
        fov (str):
466
            Fov to generate histogram for
467
        channel (str):
468
            Channel to check widths for
469
        mass_range (tuple | pd.DataFrame):
470
            Integration range
471
        time_res (float):
472
            Time resolution for scaling parabolic transformation
473

474
    Returns:
475
        dict:
476
            dictionary of median height values per channel
477
    """
478

479
    fov = _find_bin_files(data_dir, [fov])[fov]
8✔
480
    _fill_fov_metadata(data_dir, fov, panel, False, time_res, channels)
8✔
481

482
    local_bin_file = os.path.join(data_dir, fov['bin'])
8✔
483

484
    _, intensities, _ = \
8✔
485
        _extract_bin.c_extract_histograms(
486
            bytes(local_bin_file, 'utf-8'),
487
            fov['lower_tof_range'],
488
            fov['upper_tof_range']
489
        )
490

491
    int_bin = np.cumsum(intensities, axis=0) / intensities.sum(axis=0)
8✔
492
    median_height = (np.abs(int_bin - 0.5)).argmin(axis=0)
8✔
493

494
    chan_list = fov["targets"].values
8✔
495
    median_height = {
8✔
496
        chan: mh for chan, mh in zip(chan_list, median_height)
497
    }
498

499
    return median_height
8✔
500

501

502
def get_total_counts(data_dir: str, include_fovs: Union[List[str], None] = None):
8✔
503
    """Retrieves total counts for each field of view
504

505
    Args:
506
        data_dir (str | PathLike):
507
            Directory containing bin files as well as accompanying json metadata files
508
        include_fovs (List | None):
509
            List of fovs to include.  Includes all if None.
510

511
    Returns:
512
        dict:
513
            dictionary of total counts, with fov names as keys
514
    """
515

516
    fov_files = _find_bin_files(data_dir, include_fovs)
8✔
517

518
    bin_files = \
8✔
519
        [(name, os.path.join(data_dir, fov['bin'])) for name, fov in fov_files.items()]
520

521
    outs = {name: _extract_bin.c_total_counts(bytes(bf, 'utf-8')) for name, bf in bin_files}
8✔
522

523
    return outs
8✔
524

525

526
def get_total_spectra(data_dir: str, include_fovs: Union[List[str], None] = None,
8✔
527
                      panel_df: pd.DataFrame = None, range_pad: float = 0.5):
528
    """Retrieves total spectra for each field of view
529

530
    Args:
531
        data_dir (str | PathLike):
532
            Directory containing bin files as well as accompanying json metadata files
533
        include_fovs (List | None):
534
            List of fovs to include. Includes all if None.
535
        panel_df (pd.DataFrame | None):
536
            If not None, get default callibration information
537
        range_offset (float):
538
            Mass padding below the lowest and highest masses to consider when binning.
539
            The time-of-flight array go from TOF of (lowest mass - 0.5) to (highest_mass + 0.5).
540

541
    Returns:
542
        tuple (dict, dict, list):
543
            dict of total spectra and the corresponding low and high ranges, with fov names as keys
544
    """
545
    if range_pad < 0:
8✔
546
        raise ValueError("range_pad must be >= 0")
×
547

548
    fov_files = _find_bin_files(data_dir, include_fovs)
8✔
549
    if panel_df is not None:
8✔
550
        for fov in fov_files.values():
8✔
551
            _fill_fov_metadata(data_dir, fov, panel_df, False, 500e-6)
8✔
552

553
    bin_files = list(fov_files.items())
8!
554

555
    # TODO: this assumes the panel_df is sorted
556
    lowest_mass = panel_df.loc[0, "Stop"] - range_pad
8✔
557
    highest_mass = panel_df.loc[panel_df.shape[0] - 1, "Stop"] + range_pad
8✔
558

559
    # store the spectra, as well as the time intervals for each FOV
560
    spectra = {}
8✔
561
    tof_interval = {}
8✔
562
    for name, fov in bin_files:
8✔
563
        # compute the low and high boundaries, this will differ per FOV
564
        mass_offset = fov["mass_offset"]
8✔
565
        mass_gain = fov["mass_gain"]
8✔
566
        tof_boundaries = _mass2tof(
8✔
567
            np.array([lowest_mass, highest_mass]), mass_offset, mass_gain, 500e-6
568
        ).astype(np.uint16)
569

570
        # set the boundaries
571
        tof_interval[name] = tof_boundaries
8✔
572

573
        # extract the spectra on an individual basis per channel
574
        spectra[name] = _extract_bin.c_total_spectra(
8✔
575
            bytes(os.path.join(data_dir, fov["bin"]), "utf-8"), tof_boundaries[0], tof_boundaries[1]
576
        )
577

578
    return spectra, tof_interval, fov_files
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc