• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mantidproject / mslice / 13084206280

28 Jan 2025 09:32AM UTC coverage: 87.668%. Remained the same
13084206280

push

github

web-flow
Merge pull request #1048 from mantidproject/ruff-format

Ruff format mslice

440 of 499 new or added lines in 39 files covered. (88.18%)

6 existing lines in 5 files now uncovered.

3064 of 3495 relevant lines covered (87.67%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.06
/src/mslice/models/workspacemanager/workspace_algorithms.py
1
"""
2
Uses mantid algorithms to perform workspace operations
3
"""
4

5
# -----------------------------------------------------------------------------
6
# Imports
7
# -----------------------------------------------------------------------------
8
import os.path
1✔
9
from os.path import splitext
1✔
10

11
import numpy as np
1✔
12
from scipy import constants
1✔
13

14
import mslice.util.mantid.init_mantid  # noqa: F401
1✔
15

16
from mantid.api import WorkspaceUnitValidator
1✔
17
from mantid.api import MatrixWorkspace
1✔
18

19
from mslice.models.axis import Axis
1✔
20
from mslice.util.mantid.algorithm_wrapper import add_to_ads, remove_from_ads
1✔
21
from mslice.models.workspacemanager.workspace_provider import (
1✔
22
    get_workspace_handle,
23
    delete_workspace,
24
)
25
from mslice.util.mantid.mantid_algorithms import (
1✔
26
    Load,
27
    MergeMD,
28
    MergeRuns,
29
    Scale,
30
    Minus,
31
    ConvertUnits,
32
    Rebose,
33
)
34
from mslice.workspace.pixel_workspace import PixelWorkspace
1✔
35
from mslice.workspace.histogram_workspace import HistogramWorkspace
1✔
36
from mslice.workspace.workspace import Workspace
1✔
37

38
from .file_io import save_ascii, save_matlab, save_nexus, save_nxspe
1✔
39

40
# -----------------------------------------------------------------------------
41
# Classes and functions
42
# -----------------------------------------------------------------------------
43

44
# Defines some conversion factors
45
E2q = (
1✔
46
    2.0 * constants.m_n / (constants.hbar**2)
47
)  # Energy to (neutron momentum)^2 (==2m_n/hbar^2)
48
meV2J = constants.e / 1000.0  # meV to Joules
1✔
49
m2A = 1.0e10  # metres to Angstrom
1✔
50

51

52
def get_limits(workspace, axis):
1✔
53
    workspace = get_workspace_handle(workspace)
1✔
54
    if workspace.limits is None or len(workspace.limits) == 0:
1✔
55
        _processLoadedWSLimits(workspace)
1✔
56
    if axis in workspace.limits:
1✔
57
        return workspace.limits[axis]
1✔
58
    else:
59
        # If we cannot get the step size from the data, use the old 1/100 steps.
60
        ws_h = workspace.raw_ws
1✔
61
        dim = ws_h.getDimension(ws_h.getDimensionIndexByName(axis))
1✔
62
        minimum = dim.getMinimum()
1✔
63
        maximum = dim.getMaximum()
1✔
64
        step = (maximum - minimum) / 100
1✔
65
        return minimum, maximum, step
1✔
66

67

68
def processEfixed(workspace):
1✔
69
    """Checks whether the fixed energy is defined for this workspace"""
70
    try:
1✔
71
        workspace.e_fixed = _get_ws_EFixed(workspace.raw_ws)
1✔
72
        workspace.ef_defined = True
1✔
73
    except RuntimeError:
×
74
        workspace.ef_defined = False
×
75

76

77
def _processLoadedWSLimits(workspace):
1✔
78
    """Processes an (angle-deltaE) workspace to get the limits and step size in angle, energy and |Q|"""
79
    # For cases, e.g. indirect, where EFixed has not been set yet, return calculate later.
80
    if workspace.e_fixed is None:
1✔
81
        workspace.e_fixed = get_EFixed(workspace.raw_ws)
1✔
82
        if workspace.e_fixed is None:
1✔
83
            return
×
84
    if isinstance(workspace, PixelWorkspace):
1✔
85
        process_limits_event(workspace)
1✔
86
    elif isinstance(workspace, Workspace):
1✔
87
        process_limits(workspace)
1✔
88

89

90
def process_limits(ws):
1✔
91
    en = ws.raw_ws.getAxis(0).extractValues()
1✔
92
    theta = _get_theta_for_limits(ws)
1✔
93
    qmin, qmax, qstep = get_q_limits(theta, en, ws.e_fixed)
1✔
94
    set_limits(
1✔
95
        ws, qmin, qmax, qstep, theta, np.min(en), np.max(en), np.mean(np.diff(en))
96
    )
97

98

99
def process_limits_event(ws):
1✔
100
    e_dim = ws.raw_ws.getDimension(ws.raw_ws.getDimensionIndexByName("DeltaE"))
1✔
101
    emin = e_dim.getMinimum()
1✔
102
    emax = e_dim.getMaximum()
1✔
103
    theta = _get_theta_for_limits_event(ws)
1✔
104
    estep = _original_step_size(ws)
1✔
105
    emax_1 = -emin if (str(ws.e_mode == "Direct")) else emax
1✔
106
    qmin, qmax, qstep = get_q_limits(theta, emax_1, ws.e_fixed)
1✔
107
    set_limits(ws, qmin, qmax, qstep, theta, emin, emax, estep)
1✔
108

109

110
def _original_step_size(workspace):
1✔
111
    rebin_history = _get_algorithm_history("Rebin", workspace.raw_ws.getHistory())
1✔
112
    params_history = _get_property_from_history("Params", rebin_history)
1✔
NEW
113
    return float(params_history.value().split(",")[1])
×
114

115

116
def _get_algorithm_history(name, workspace_history):
1✔
117
    histories = workspace_history.getAlgorithmHistories()
1✔
118
    for history in reversed(histories):
1✔
119
        if history.name() == name:
1✔
120
            return history
×
121
    return None
1✔
122

123

124
def _get_property_from_history(name, history):
1✔
125
    for property in history.getProperties():
1✔
126
        if property.name() == name:
×
127
            return property
×
128
    return None
×
129

130

131
def get_q_limits(theta, en, efix):
1✔
132
    # calculates the Q(E) line for the given two theta and then finds the min and max values
133
    qlines = [
1✔
134
        np.sqrt(
135
            E2q
136
            * (2 * efix - en - 2 * np.sqrt(efix * (efix - en)) * np.cos(tth))
137
            * meV2J
138
        )
139
        / 1e10
140
        for tth in theta[:2]
141
    ]
142
    qmin = np.nanmin(qlines[0])
1✔
143
    qmax = np.nanmax(qlines[1])
1✔
144
    const_tth = np.radians(0.5)
1✔
145
    qstep = np.sqrt(E2q * 2 * efix * (1 - np.cos(const_tth)) * meV2J) / m2A
1✔
146
    qmin -= qstep
1✔
147
    qmax += qstep
1✔
148
    return qmin, qmax, qstep
1✔
149

150

151
def set_limits(ws, qmin, qmax, qstep, theta, emin, emax, estep):
1✔
152
    ws.limits["MomentumTransfer"] = [qmin, qmax, qstep]
1✔
153
    ws.limits["|Q|"] = ws.limits["MomentumTransfer"]  # ConvertToMD renames it(!)
1✔
154
    ws.limits["2Theta"] = theta * 180 / np.pi
1✔
155
    ws.limits["DeltaE"] = [emin, emax, estep]
1✔
156

157

158
def _get_theta_for_limits(ws):
1✔
159
    num_hist = ws.raw_ws.getNumberHistograms()
1✔
160
    theta = [
1✔
161
        ws.raw_ws.detectorTwoTheta(ws.raw_ws.getDetector(i)) for i in range(num_hist)
162
    ]
163
    round_fac = 100
1✔
164
    ws.is_PSD = not all(x < y for x, y in zip(theta, theta[1:]))
1✔
165
    # Rounds the differences to avoid pixels with same 2theta. Implies min limit of ~0.5 degrees
166
    thdiff = np.diff(np.round(np.sort(theta) * round_fac) / round_fac)
1✔
167
    return np.array(
1✔
168
        [np.min(theta), np.max(theta), np.min(thdiff[np.where(thdiff > 0)])]
169
    )
170

171

172
def _get_theta_for_limits_event(ws):
1✔
173
    spectrum_info = ws.raw_ws.getExperimentInfo(0).spectrumInfo()
1✔
174
    theta = []
1✔
175
    i = 0
1✔
176
    while True:
1✔
177
        try:
1✔
178
            if not spectrum_info.isMonitor(i):
1✔
179
                theta.append(spectrum_info.twoTheta(i))
1✔
180
            i += 1
1✔
181
        except IndexError:
1✔
182
            break
1✔
183
    theta = np.unique(theta)
1✔
184
    round_fac = 100
1✔
185
    thdiff = np.diff(np.round(np.sort(theta) * round_fac) / round_fac)
1✔
186
    return np.array(
1✔
187
        [np.min(theta), np.max(theta), np.min(thdiff[np.where(thdiff > 0)])]
188
    )
189

190

191
def load(filename, output_workspace):
1✔
192
    workspace = Load(OutputWorkspace=output_workspace, Filename=filename)
1✔
193
    try:
1✔
194
        workspace.e_mode = get_EMode(workspace.raw_ws)
1✔
195
        if workspace.e_mode == "Indirect":
1✔
196
            processEfixed(workspace)
×
197
        if (
1✔
198
            isinstance(workspace.raw_ws, MatrixWorkspace)
199
            and WorkspaceUnitValidator("DeltaE_inWavenumber").isValid(workspace.raw_ws)
200
        ) == "":
NEW
201
            workspace = ConvertUnits(
×
202
                InputWorkspace=workspace,
203
                Target="DeltaE",
204
                EMode=workspace.e_mode,
205
                OutputWorkspace=workspace.name,
206
            )
207
        _processLoadedWSLimits(workspace)
1✔
208
    except:  # noqa: E722
×
209
        delete_workspace(workspace)
×
210
        raise
×
211
    return workspace
1✔
212

213

214
def combine_workspace(selected_workspaces, new_name):
1✔
215
    workspaces = [get_workspace_handle(ws) for ws in selected_workspaces]
1✔
216
    workspace_names = [workspace.name for workspace in workspaces]
1✔
217
    ws = MergeMD(OutputWorkspace=new_name, InputWorkspaces=workspace_names)
1✔
218
    propagate_properties(workspaces[0], ws)
1✔
219
    # Set limits for result workspace. Use precalculated step size, otherwise get limits directly from Mantid workspace
220
    ax1 = ws.raw_ws.getDimension(0)
1✔
221
    ax2 = ws.raw_ws.getDimension(1)
1✔
222
    step1 = []
1✔
223
    step2 = []
1✔
224
    for in_ws in selected_workspaces:
1✔
225
        step1.append(get_limits(in_ws, ax1.name)[2])
1✔
226
        step2.append(get_limits(in_ws, ax2.name)[2])
1✔
227
    ws.limits[ax1.name] = [ax1.getMinimum(), ax1.getMaximum(), np.max(step1)]
1✔
228
    ws.limits[ax2.name] = [ax2.getMinimum(), ax2.getMaximum(), np.max(step2)]
1✔
229
    return ws
1✔
230

231

232
def add_workspace_runs(selected_ws):
1✔
233
    out_ws_name = selected_ws[0] + "_sum"
1✔
234
    sum_ws = MergeRuns(OutputWorkspace=out_ws_name, InputWorkspaces=selected_ws)
1✔
235
    propagate_properties(get_workspace_handle(selected_ws[0]), sum_ws)
1✔
236

237

238
def subtract(workspaces, background_ws, ssf):
1✔
239
    scaled_bg_ws = Scale(
1✔
240
        OutputWorkspace=str(background_ws) + "_scaled",
241
        InputWorkspace=str(background_ws),
242
        Factor=ssf,
243
        store=False,
244
    )
245
    try:
1✔
246
        for ws_name in workspaces:
1✔
247
            result = Minus(
1✔
248
                OutputWorkspace=ws_name + "_subtracted",
249
                LHSWorkspace=ws_name,
250
                RHSWorkspace=scaled_bg_ws,
251
            )
252
            propagate_properties(get_workspace_handle(ws_name), result)
1✔
253
    except ValueError as e:
×
254
        raise ValueError(e)
×
255

256

257
def rebose_single(ws, from_temp, to_temp):
1✔
258
    ws = get_workspace_handle(ws)
1✔
259
    results = Rebose(
1✔
260
        InputWorkspace=ws,
261
        CurrentTemperature=from_temp,
262
        TargetTemperature=to_temp,
263
        OutputWorkspace=ws.name + "_bosed",
264
    )
265
    propagate_properties(ws, results)
1✔
266
    return results
1✔
267

268

269
def scale_workspaces(workspaces, scale_factor=None, from_temp=None, to_temp=None):
1✔
270
    if scale_factor is None:
1✔
271
        if from_temp is None or to_temp is None:
1✔
272
            raise ValueError(
1✔
273
                "You must specify all inputs (scale factor or both temperatures)"
274
            )
275
        for ws_name in workspaces:
1✔
276
            rebose_single(ws_name, from_temp, to_temp)
1✔
277
    else:
278
        for ws_name in workspaces:
×
279
            ws = get_workspace_handle(ws_name)
×
NEW
280
            result = Scale(
×
281
                InputWorkspace=ws.raw_ws,
282
                Factor=scale_factor,
283
                OutputWorkspace=ws_name + "_scaled",
284
            )
UNCOV
285
            propagate_properties(ws, result)
×
286

287

288
def save_workspaces(workspaces, path, save_name, extension):
1✔
289
    """
290
    :param workspaces: list of workspaces to save
291
    :param path: directory to save to
292
    :param save_name: name to save the file as (plus file extension). Pass none to use workspace name
293
    :param extension: file extension (such as .txt)
294
    """
295
    if extension == ".nxs":
1✔
296
        save_method = save_nexus
1✔
297
    elif extension == ".nxspe":
1✔
298
        save_method = save_nxspe
×
299
    elif extension == ".txt":
1✔
300
        save_method = save_ascii
×
301
    elif extension == ".mat":
1✔
302
        save_method = save_matlab
1✔
303
    else:
304
        raise RuntimeError("unrecognised file extension")
×
305
    if isinstance(save_name, str):
1✔
306
        if len(workspaces) == 1:
1✔
307
            save_names = [save_name]
1✔
308
        else:
309
            name, _ = splitext(save_name)
×
NEW
310
            save_names = [
×
311
                "{}_{:03d}{}".format(name, idx + 1, extension)
312
                for idx in range(len(workspaces))
313
            ]
314
    else:
NEW
315
        save_names = [save_name] if not hasattr(save_name, "__iter__") else save_name
×
316
        if len(save_names) != len(workspaces):
×
317
            save_names = [None] * len(workspaces)
×
318
    for workspace, save_name_single in zip(workspaces, save_names):
1✔
319
        _save_single_ws(workspace, save_name_single, save_method, path, extension)
1✔
320

321

322
def export_workspace_to_ads(workspace):
1✔
323
    """
324
    Exports an MSlice workspace to ADS. If the workspace is MDHisto, convert it to Matrix
325
    :param workspace: name of MSlice workspace to export to ADS
326
    """
327
    workspace = get_workspace_handle(workspace)
1✔
328
    if isinstance(workspace, HistogramWorkspace):
1✔
329
        workspace = workspace.convert_to_matrix()
1✔
330
    add_to_ads(workspace)
1✔
331

332

333
def remove_workspace_from_ads(workspacename):
1✔
334
    remove_from_ads(workspacename)
1✔
335

336

337
def _save_single_ws(workspace, save_name, save_method, path, extension):
1✔
338
    save_as = save_name if save_name is not None else str(workspace) + extension
1✔
339
    full_path = os.path.join(str(path), save_as)
1✔
340
    if isinstance(workspace, str):
1✔
341
        workspace = get_workspace_handle(workspace)
1✔
342
    save_method(workspace, full_path)
1✔
343

344

345
def get_axis_from_dimension(workspace, id):
1✔
346
    dim = workspace.raw_ws.getDimension(id).name
1✔
347
    min, max, step = workspace.limits[dim]
1✔
348
    return Axis(dim, min, max, step)
1✔
349

350

351
def is_pixel_workspace(workspace_name):
1✔
352
    workspace = get_workspace_handle(workspace_name)
×
353
    return isinstance(workspace, PixelWorkspace)
×
354

355

356
def get_EMode(workspace):
1✔
357
    """Returns the energy analysis mode (direct or indirect of a workspace)"""
358
    emode = str(_get_ws_EMode(workspace))
1✔
359
    if emode == "Elastic":
1✔
360
        # Work-around for older versions of Mantid which does not set instrument name
361
        # in NXSPE files, so LoadNXSPE does not know if it is direct or indirect data
NEW
362
        ei_log = workspace.run().getProperty("Ei").value
×
NEW
363
        emode = "Indirect" if np.isnan(ei_log) else "Direct"
×
364
    return emode
1✔
365

366

367
def _get_ws_EMode(ws_handle):
1✔
368
    try:
1✔
369
        emode = ws_handle.getEMode()
1✔
370
    except AttributeError:  # workspace is not matrix workspace
×
371
        try:
×
NEW
372
            emode = _get_exp_info_using(
×
373
                ws_handle, lambda e: ws_handle.getExperimentInfo(e).getEMode()
374
            )
375
        except ValueError:
×
376
            raise ValueError("Workspace contains different EModes")
×
377
    return emode
1✔
378

379

380
def get_EFixed(raw_ws):
1✔
381
    efix = np.nan
1✔
382
    try:
1✔
383
        efix = _get_ws_EFixed(raw_ws)
1✔
384
    except RuntimeError:  # Efixed not defined
×
385
        # This could occur for malformed NXSPE without the instrument name set.
386
        # LoadNXSPE then sets EMode to 'Elastic' and getEFixed fails.
387
        try:
×
NEW
388
            if raw_ws.run().hasProperty("Ei"):
×
NEW
389
                efix = raw_ws.run().getProperty("Ei").value
×
390
        except AttributeError:
×
NEW
391
            if raw_ws.getExperimentInfo(0).run().hasProperty("Ei"):
×
NEW
392
                efix = raw_ws.getExperimentInfo(0).run().getProperty("Ei").value
×
393
    if efix is not None and not np.isnan(efix):  # error if none is passed to isnan
1✔
394
        return efix
1✔
395
    else:
396
        return None
×
397

398

399
def _get_ws_EFixed(raw_ws):
1✔
400
    try:
1✔
401
        efixed = raw_ws.getEFixed(raw_ws.getDetector(0).getID())
1✔
402
    except AttributeError:  # workspace is not matrix workspace
1✔
403
        try:
1✔
404
            efixed = _get_exp_info_using(
1✔
405
                raw_ws, lambda e: raw_ws.getExperimentInfo(e).getEFixed(1)
406
            )
407
        except ValueError:
×
408
            raise ValueError("Workspace contains different EFixed values")
×
409
    return efixed
1✔
410

411

412
def _get_exp_info_using(raw_ws, get_exp_info):
1✔
413
    """get data from MultipleExperimentInfo. Returns None if ExperimentInfo is not found"""
414
    prev = None
1✔
415
    for exp in range(raw_ws.getNumExperimentInfo()):
1✔
416
        exp_value = get_exp_info(exp)
1✔
417
        if prev is not None:
1✔
418
            if exp_value != prev:
×
419
                raise ValueError
×
420
        prev = exp_value
1✔
421
    return prev
1✔
422

423

424
def propagate_properties(old_workspace, new_workspace):
1✔
425
    """Propagates MSlice only properties of workspaces, e.g. limits"""
426
    properties = [
1✔
427
        "is_PSD",
428
        "parent",
429
        "intensity_corrected",
430
        "axes",
431
        "ef_defined",
432
        "e_mode",
433
        "limits",
434
        "e_fixed",
435
        "is_slice",
436
    ]
437
    for prop in properties:
1✔
438
        old_prop = getattr(old_workspace, prop)
1✔
439
        if old_prop:
1✔
440
            setattr(new_workspace, prop, old_prop)
1✔
441

442

443
def get_comment(workspace):
1✔
444
    if hasattr(workspace, "getComment"):
1✔
445
        return workspace.getComment()
×
446
    if not hasattr(workspace, "raw_ws"):
1✔
447
        workspace = get_workspace_handle(workspace)
×
448
    return workspace.raw_ws.getComment()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc