• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

neurospin-deepinsight / brainprep / 15466399122

05 Jun 2025 11:57AM UTC coverage: 35.744% (-0.2%) from 35.977%
15466399122

Pull #19

github

web-flow
Merge 2f8a6cef3 into 2f230a9ce
Pull Request #19: Automatic brain mask computation in quasi raw + bug fixed for software version

13 of 54 new or added lines in 3 files covered. (24.07%)

3 existing lines in 1 file now uncovered.

692 of 1936 relevant lines covered (35.74%)

0.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.62
/brainprep/utils.py
1
# -*- coding: utf-8 -*-
2
##########################################################################
3
# NSAp - Copyright (C) CEA, 2021 - 2022
4
# Distributed under the terms of the CeCILL-B license, as published by
5
# the CEA-CNRS-INRIA. Refer to the LICENSE file or to
6
# http://www.cecill.info/licences/Licence_CeCILL-B_V1-en.html
7
# for details.
8
##########################################################################
9

10
"""
11
Module that contains some utility functions.
12
"""
13

14
# Imports
15
import os
1✔
16
import re
1✔
17
import sys
1✔
18
import gzip
1✔
19
import subprocess
1✔
20
import numpy as np
1✔
21
import pandas as pd
1✔
22
import nibabel
1✔
23
from .color_utils import print_command, print_error
1✔
24

25

26
def execute_command(command):
1✔
27
    """ Execute a command.
28

29
    Parameters
30
    ----------
31
    command: list of str
32
        the command to be executed.
33
    """
34
    print_command(" ".join(command))
1✔
35
    proc = subprocess.Popen(
1✔
36
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
37
    output, error = proc.communicate()
1✔
38
    if proc.returncode != 0:
1✔
39
        raise ValueError(
×
40
            "\nCommand {0} failed:\n\n- output:\n{1}\n\n- error: "
41
            "{2}\n\n".format(" ".join(command),
42
                             output.decode("utf8"),
43
                             error.decode("utf8")))
44

45

46
def check_command(command):
1✔
47
    """ Check if a command is installed.
48

49
    .. note:: This function is based on which linux command.
50

51
    Parameters
52
    ----------
53
    command: str
54
        the name of the command to locate.
55
    """
56
    if sys.platform != "linux":
1✔
57
        raise ValueError("This code works only on a linux machine.")
×
58
    process = subprocess.Popen(
1✔
59
        ["which", command], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
60
    stdout, stderr = process.communicate()
1✔
61
    stdout = stdout.decode("utf8")
1✔
62
    stderr = stderr.decode("utf8")
1✔
63
    exitcode = process.returncode
1✔
64
    if exitcode != 0:
1✔
65
        print_error("Command {0}: {1}".format(command, stderr))
×
66
        raise ValueError("Impossible to locate command '{0}'.".format(command))
×
67

68

69
def check_version(package_name, check_pkg_version):
1✔
70
    """ Check installed version of a package.
71

72
    .. note:: This function is based on dpkg linux command.
73

74
    Parameters
75
    ----------
76
    package_name: str
77
        the name of the package we want to check the version.
78
    """
79
    process = subprocess.Popen(
1✔
80
        ["dpkg", "-s", package_name],
81
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
82
    stdout, stderr = process.communicate()
1✔
83
    stdout = stdout.decode("utf8")
1✔
84
    stderr = stderr.decode("utf8")
1✔
85
    exitcode = process.returncode
1✔
86
    if check_pkg_version:
1✔
87
        # local computer installation
88
        if exitcode != 0:
1✔
89
            version = None
×
90
            print_error("Version {0}: {1}".format(package_name, stderr))
×
91
            raise ValueError(
×
92
                "Impossible to check package '{0}' version."
93
                .format(package_name))
94
        else:
95
            versions = re.findall("Version: .*$", stdout, re.MULTILINE)
1✔
96
            version = "|".join(versions)
1✔
97
    else:
98
        # specific installation
99
        version = "custom install (no check)."
1✔
100
    print("{0} - {1}".format(package_name, version))
1✔
101

102

103
def write_matlabbatch(template, nii_files, tpm_file, darteltpm_file,
1✔
104
                      session, batch_file, outdir, model_long=1):
105
    """ Complete matlab batch from template and unzip T1w file in the outdir.
106
        Create the outdir.
107

108
    Parameters
109
    ----------
110
    template: str
111
        path to template batch to be completed.
112
    nii_files: list
113
        the Nifti images to be processed.
114
    tpm_file: str
115
        path to the SPM TPM file.
116
    darteltpm_file: str
117
        path to the CAT12 tempalte file.
118
    batch_file: str
119
        Filepath to the matlabbatch
120
    outdir: str
121
        the destination folder for cat12vbm outputs.
122
    session: str
123
        the session names, usefull for longitudinal preprocessings.
124
        Warning session and nii files must be in the same order.
125
    model_long: int
126
        longitudinal model choice, default 1.
127
        1 short time (weeks), 2 long time (years) between images sessions.
128
    """
129
    nii_files_str = ""
1✔
130
    if session:
1✔
131
        outdir = [os.path.join(outdir, ses) for ses in session]
1✔
132
    if not isinstance(outdir, list):
1✔
133
        outdir = [outdir]
×
134
    for idx, path in enumerate(nii_files):
1✔
135
        nii_files_str += "'{0}' \n".format(
1✔
136
            ungzip_file(path, outdir=outdir[idx]))
137
    with open(template, "r") as of:
1✔
138
        stream = of.read()
1✔
139
        stream = stream.format(model_long=model_long, anat_file=nii_files_str,
1✔
140
                               tpm_file=tpm_file,
141
                               darteltpm_file=darteltpm_file)
142
    with open(batch_file, "w") as of:
1✔
143
        of.write(stream)
1✔
144

145

146
def ungzip_file(zfile, prefix="u", outdir=None):
1✔
147
    """ Copy and ungzip the input file.
148

149
    Parameters
150
    ----------
151
    zfile: str
152
        input file to ungzip.
153
    prefix: str, default 'u'
154
        the prefix of the result file.
155
    outdir: str, default None)
156
        the output directory where ungzip file is saved. If not set use the
157
        input image directory.
158

159
    Returns
160
    -------
161
    unzfile: str
162
        the ungzip file.
163
    """
164
    # Checks
165
    if not os.path.isfile(zfile):
1✔
166
        raise ValueError("'{0}' is not a valid filename.".format(zfile))
×
167
    if outdir is not None:
1✔
168
        if not os.path.isdir(outdir):
1✔
169
            raise ValueError("'{0}' is not a valid directory.".format(outdir))
×
170
    else:
171
        outdir = os.path.dirname(zfile)
×
172

173
    # Get the file descriptors
174
    base, extension = os.path.splitext(zfile)
1✔
175
    basename = os.path.basename(base)
1✔
176

177
    # Ungzip only known extension
178
    if extension in [".gz"]:
1✔
179
        basename = prefix + basename
×
180
        unzfile = os.path.join(outdir, basename)
×
181
        with gzip.open(zfile, "rb") as gzfobj:
×
182
            data = gzfobj.read()
×
183
        with open(unzfile, "wb") as openfile:
×
184
            openfile.write(data)
×
185

186
    # Default, unknown compression extension: the input file is returned
187
    else:
188
        unzfile = zfile
1✔
189

190
    return unzfile
1✔
191

192

193
def get_bids_keys(filename):
1✔
194
    """ Extract BIDS 'participant_id', 'session' and 'run' keys from a
195
    filename.
196

197
    Parameters
198
    ----------
199
    filename: str
200
        a bids path.
201

202
    Returns
203
    -------
204
    keys: dict
205
        the retrieved BIDS keys/values, no key if no match. Add the file name
206
        in 'ni_path'.
207
    """
208
    keys = {}
×
209
    participant_re = re.compile("sub-([^_/]+)")
×
210
    session_re = re.compile("ses-([^_/]+)")
×
211
    run_re = re.compile("run-([a-zA-Z0-9]+)")
×
212
    for name, regex in (("participant_id", participant_re),
×
213
                        ("session", session_re),
214
                        ("run", run_re)):
215
        match = regex.findall(filename)
×
216
        if len(set(match)) != 1:
×
217
            if name == "participant_id":
×
218
                raise ValueError(
×
219
                    "Found several or no '{}' in path '{}'.".format(
220
                        name, filename))
221
        else:
222
            keys[name] = match[0]
×
223
    if "run" not in keys:
×
224
        keys["run"] = "1"
×
225
    if "session" not in keys:
×
226
        keys["session"] = "V1"
×
227
    keys["ni_path"] = filename
×
228
    return keys
×
229

230

231
def load_images(img_files, check_same_referential=True):
1✔
232
    """ Load a list of images in a BIDS organisation: check that all images
233
    are in the same referential.
234

235
    Parameters
236
    ----------
237
    img_files: list of str (n_subjects, )
238
        path to images.
239

240
    Returns
241
    -------
242
    imgs_arr: array (n_subjects, 1, image_axis0, image_axis1, ...)
243
        the generated array.
244
    df: pandas DataFrame
245
        description of the array with columns 'participant_id',
246
        'session', 'run', 'ni_path'.
247
    """
248
    ref_affine = None
×
249
    ref_shape = None
×
250
    data = []
×
251
    info = {}
×
252
    for path in img_files:
×
253
        keys = get_bids_keys(path)
×
254
        participant_id = keys["participant_id"]
×
255
        session = keys.get("session", "V1")
×
256
        run = keys.get("run", "1")
×
257
        img = nibabel.load(path)
×
258
        if ref_affine is None:
×
259
            ref_affine = img.affine
×
260
            ref_shape = img.shape
×
261
        else:
262
            assert np.allclose(ref_affine, img.affine), "Different affine."
×
263
            assert ref_shape == img.shape, "Different shape."
×
264
        data.append(np.expand_dims(img.get_fdata(), axis=0))
×
265
        info.setdefault("participant_id", []).append(participant_id)
×
266
        info.setdefault("session", []).append(session)
×
267
        info.setdefault("run", []).append(run)
×
268
        info.setdefault("ni_path", []).append(path)
×
269
    df = pd.DataFrame.from_dict(info)
×
270
    imgs_arr = np.asarray(data)
×
271
    return imgs_arr, df
×
272

273

274
def create_clickable(path_or_url):
1✔
275
    """ Foramt a path or a URL as a HTML href.
276

277
    Parameters
278
    ----------
279
    path_or_url: str
280
        a path or a URL.
281

282
    Returns
283
    -------
284
    url: str
285
        a href formated URL.
286
    """
287
    url = "<a href='{}' target='_blank'>&plus;</a>".format(path_or_url)
×
288
    return url
×
289

290

291
def listify(obj):
1✔
292
    """ Function to transform a coma separated string to a list of string.
293

294
    Parameters
295
    ----------
296
    obj: list or string
297
        the input data.
298

299
    Returns
300
    -------
301
    list: list
302
        the list of input data or input data.
303
    """
304
    if not isinstance(obj, list):
×
305
        return obj.split(",")
×
306
    else:
307
        return obj
×
308

309

310
def cp_file(src, dst):
1✔
311
    """ Copy a file from src to dst.
312

313
    Parameters
314
    ----------
315
    src: str
316
        the source file.
317
    dst: str
318
        the destination file.
319
    """
NEW
320
    if not os.path.isfile(src):
×
NEW
321
        raise ValueError("Source file '{}' does not exist.".format(src))
×
NEW
322
    if not os.path.isdir(os.path.dirname(dst)):
×
NEW
323
        raise ValueError("Destination directory '{}' does not exist."
×
324
                         .format(os.path.dirname(dst)))
NEW
325
    execute_command(["cp", src, dst])
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc