• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mantidproject / mslice / 21272670920

22 Jan 2026 06:03PM UTC coverage: 86.657% (-0.2%) from 86.88%
21272670920

push

github

web-flow
add a delayed delete to ads workspace removal (#1149)

* add a delated delete

* added changes from Duc's PR

* added null check and renamed methods

* bug fix

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

15 of 26 new or added lines in 5 files covered. (57.69%)

2 existing lines in 1 file now uncovered.

3111 of 3590 relevant lines covered (86.66%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.79
/src/mslice/presenters/cut_plotter_presenter.py
1
import numpy as np
1✔
2

3
from mslice.views.cut_plotter import (
1✔
4
    plot_cut_impl,
5
    draw_interactive_cut,
6
    cut_figure_exists,
7
    get_current_plot,
8
)
9
from mslice.models.alg_workspace_ops import get_range_end
1✔
10
from mslice.models.cut.cut import SampleTempValueError
1✔
11
from mslice.models.cut.cut_functions import compute_cut
1✔
12
from mslice.models.labels import generate_legend
1✔
13
from mslice.models.workspacemanager.workspace_algorithms import export_workspace_to_ads
1✔
14
from mslice.models.workspacemanager.workspace_provider import (
1✔
15
    add_workspace,
16
    get_workspace_handle,
17
)
18
import mslice.plotting.pyplot as plt
1✔
19
from mslice.presenters.presenter_utility import PresenterUtility
1✔
20
from mslice.plotting.plot_window.overplot_interface import (
1✔
21
    remove_line,
22
    plot_overplot_line,
23
)
24
from mslice.models.powder.powder_functions import compute_powder_line
1✔
25
from mslice.models.intensity_correction_algs import sample_temperature
1✔
26
from mslice.models.axis import Axis
1✔
27
from mslice.util.intensity_correction import IntensityType, IntensityCache
1✔
28
import warnings
1✔
29
from sys import float_info
1✔
30

31
BRAGG_SIZE_ON_AXES = 0.15
1✔
32

33

34
class CutPlotterPresenter(PresenterUtility):
1✔
35
    _current_icut = None  # static variable, as only one icut can be open at any time.
1✔
36
    _prepared_cut_for_cache = (
1✔
37
        None  # static variable, used and reset to None upon plotting of a cut
38
    )
39
    # needs to be static as icut design means that icut does not have access to correct CPP on first plot
40

41
    def __init__(self):
1✔
42
        self._main_presenter = None
1✔
43
        self._interactive_cut_cache = None
1✔
44
        self._cut_cache_dict = {}  # Dict of list of currently displayed cuts index by axes
1✔
45
        self._temp_cut_cache = []
1✔
46
        self._overplot_cache = {}
1✔
47
        self._cache_intensity_correction_methods()
1✔
48

49
    def run_cut(self, workspace, cut, plot_over=False, save_only=False):
1✔
50
        workspace = get_workspace_handle(workspace)
1✔
51
        if save_only:
1✔
52
            self.save_cut_to_workspace(workspace, cut)
1✔
53
            return
1✔
54
        if cut.width is not None:
1✔
55
            return self._plot_with_width(workspace, cut, plot_over)
1✔
56
        else:
57
            self._plot_cut(workspace, cut, plot_over)
1✔
58

59
    def _plot_cut(
1✔
60
        self,
61
        workspace,
62
        cut,
63
        plot_over,
64
        store=True,
65
        update_main=True,
66
        intensity_correction=IntensityType.SCATTERING_FUNCTION,
67
        final_plot=True,
68
    ):
69
        cut_axis = cut.cut_axis
1✔
70
        integration_axis = cut.integration_axis
1✔
71
        if not cut.cut_ws:
1✔
72
            cut.cut_ws = compute_cut(
1✔
73
                workspace,
74
                cut_axis,
75
                integration_axis,
76
                cut.norm_to_one,
77
                cut.algorithm,
78
                store,
79
            )
80
            self.prepare_cut_for_cache(cut)
1✔
81
        if intensity_correction == IntensityType.SCATTERING_FUNCTION:
1✔
82
            cut_ws = cut.cut_ws
1✔
83
            intensity_range = (cut.intensity_start, cut.intensity_end)
1✔
84
        else:
85
            cut_ws = cut.get_intensity_corrected_ws(intensity_correction)
1✔
86
            intensity_range = cut.get_corrected_intensity_range(intensity_correction)
1✔
87
        ws_label_name = workspace.parent if workspace.parent else workspace.name
1✔
88
        legend = generate_legend(
1✔
89
            ws_label_name,
90
            integration_axis.units,
91
            integration_axis.start,
92
            integration_axis.end,
93
        )
94
        en_conversion = (
1✔
95
            self._main_presenter.is_energy_conversion_allowed()
96
            if self._main_presenter
97
            else True
98
        )
99
        plot_cut_impl(cut_ws, intensity_range, plot_over, legend, en_conversion)
1✔
100
        current_plot_intensity = self.get_current_plot_intensity()
1✔
101
        if (
1✔
102
            final_plot
103
            and plot_over
104
            and current_plot_intensity
105
            and not intensity_correction == current_plot_intensity
106
        ):
107
            self.apply_intensity_correction_after_plot_over(current_plot_intensity)
×
108
        if update_main:
1✔
109
            self.set_is_icut(False)
1✔
110
            self.update_main_window()
1✔
111

112
    def prepare_cut_for_cache(self, cut):
1✔
113
        CutPlotterPresenter._prepared_cut_for_cache = cut.copy_for_cache()
1✔
114

115
    def cache_prepared_cut(self, ax, plot_over):
1✔
116
        self.save_cache(ax, CutPlotterPresenter._prepared_cut_for_cache, plot_over)
×
117
        CutPlotterPresenter._prepared_cut_for_cache = None
×
118

119
    def get_prepared_cut_for_cache(self):
1✔
120
        return CutPlotterPresenter._prepared_cut_for_cache
1✔
121

122
    def _plot_with_width(self, workspace, cut, plot_over):
1✔
123
        """This function handles the width parameter."""
124
        integration_start = cut.integration_axis.start
1✔
125
        integration_end = cut.integration_axis.end
1✔
126
        cut_start = integration_start
1✔
127
        cut_end = get_range_end(integration_start, integration_end, cut.width)
1✔
128
        while cut_start != cut_end:
1✔
129
            cut.integration_axis.start = cut_start
1✔
130
            cut.integration_axis.end = cut_end
1✔
131
            final_plot = True if cut_start + cut.width == integration_end else False
1✔
132
            self._plot_cut(workspace, cut, plot_over, final_plot=final_plot)
1✔
133
            cut_start = cut_end
1✔
134
            cut_end = get_range_end(cut_end, integration_end, cut.width)
1✔
135
            cut.cut_ws = None
1✔
136
            # The first plot will respect which button the user pressed. The rest will over plot
137
            plot_over = True
1✔
138
        cut.reset_integration_axis(cut.start, cut.end)
1✔
139

140
        return cut.cut_axis.validate_step_against_workspace(workspace)
1✔
141

142
    def save_cache(self, ax, cut, plot_over=False):
1✔
143
        # If plot over is True you want to save all plotted cuts for use by the cli
144
        if cut.intensity_corrected:
1✔
145
            return
1✔
146
        if ax not in self._cut_cache_dict.keys():
1✔
147
            self._cut_cache_dict[ax] = []
1✔
148
        if len(self._cut_cache_dict[ax]) == 0:
1✔
149
            self._cut_cache_dict[ax].append(cut)
1✔
150
        else:
151
            cut_already_cached, cached_cut = self._cut_already_cached(cut)
1✔
152
            if not cut_already_cached:
1✔
153
                if not plot_over:
1✔
154
                    self._cut_cache_dict[ax] = []
1✔
155
                self._cut_cache_dict[ax].append(cut)
1✔
156
            else:
157
                if not plot_over:
1✔
158
                    self._cut_cache_dict[ax] = [cached_cut]
1✔
159
                elif cut not in self._cut_cache_dict[ax]:
×
160
                    self._cut_cache_dict[ax].append(cached_cut)
×
161

162
    def _cut_already_cached(self, cut):
1✔
163
        for cached_cut in self._temp_cut_cache:
1✔
164
            if self._cut_is_equal(cached_cut, cut):
1✔
165
                return True, cached_cut
1✔
166
        return False, None
1✔
167

168
    @staticmethod
1✔
169
    def _cut_is_equal(cached_cut, cut):
1✔
170
        cached_cut_params = [
1✔
171
            cached_cut.cut_axis,
172
            cached_cut.integration_axis,
173
            cached_cut.intensity_start,
174
            cached_cut.intensity_end,
175
            cached_cut.norm_to_one,
176
            cached_cut.width,
177
            cached_cut.algorithm,
178
            cached_cut.cut_ws,
179
        ]
180
        cut_params = [
1✔
181
            cut.cut_axis,
182
            cut.integration_axis,
183
            cut.intensity_start,
184
            cut.intensity_end,
185
            cut.norm_to_one,
186
            cut.width,
187
            cut.algorithm,
188
            cut.cut_ws,
189
        ]
190
        if cached_cut_params == cut_params:
1✔
191
            return True
1✔
192
        else:
193
            return False
×
194

195
    def remove_cut_from_cache_by_index(self, ax, index):
1✔
196
        del self._cut_cache_dict[ax][index]
×
197
        return len(self._cut_cache_dict[ax])
×
198

199
    def get_cache(self, ax):
1✔
200
        return self._cut_cache_dict[ax] if ax in self._cut_cache_dict.keys() else None
1✔
201

202
    def save_cut_to_workspace(self, workspace, cut):
1✔
203
        cut_ws = compute_cut(
1✔
204
            workspace,
205
            cut.cut_axis,
206
            cut.integration_axis,
207
            cut.norm_to_one,
208
            cut.algorithm,
209
        )
210
        self._main_presenter.update_displayed_workspaces()
1✔
211
        export_workspace_to_ads(cut_ws)
1✔
212

213
    def plot_cut_from_selected_workspace(self, plot_over=False):
1✔
214
        selected_workspaces = self._main_presenter.get_selected_workspaces()
1✔
215
        for workspace_name in selected_workspaces:
1✔
216
            self.plot_cut_from_workspace(workspace_name, plot_over)
1✔
217
            plot_over = True  # plot over if multiple workspaces selected
1✔
218

219
    def plot_cut_from_workspace(
1✔
220
        self, workspace, plot_over=False, intensity_range=(None, None)
221
    ):
222
        workspace = get_workspace_handle(workspace)
1✔
223
        lines = plot_cut_impl(
1✔
224
            workspace, intensity_range=intensity_range, plot_over=plot_over
225
        )
226
        self.set_is_icut(False)
1✔
227
        return lines
1✔
228

229
    def plot_interactive_cut(self, workspace, cut, store, intensity_correction):
1✔
230
        self._plot_cut(
1✔
231
            workspace,
232
            cut,
233
            False,
234
            store,
235
            update_main=False,
236
            intensity_correction=intensity_correction,
237
        )
238
        draw_interactive_cut(workspace)
1✔
239
        self.set_icut_cut(cut)
1✔
240

241
    def hide_overplot_line(self, workspace, key):
1✔
242
        cache = self._overplot_cache
×
243
        if key in cache:
×
244
            line = cache.pop(key)
×
245
            remove_line(line)
×
246

247
    @staticmethod
1✔
248
    def _get_log_bragg_y_coords(size, portion_of_axes, datum):
1✔
249
        datum = 0.001 if datum == 0 else datum
×
250
        y1, y2 = plt.gca().get_ylim()
×
251
        if (y2 > 0 and y1 > 0) or (y2 < 0 and y1 < 0):
×
252
            total_steps = np.log10(y2 / y1)
×
253
        elif y1 < 0:
×
254
            total_steps_up = np.log10(y2) + 1 if abs(y2) >= 1 else abs(y2)
×
255
            total_steps_down = np.log10(-y1) + 1 if abs(y1) >= 1 else abs(y1)
×
256
            total_steps = total_steps_up + total_steps_down
×
257
        else:
258
            y1 = 1 if y1 == 0 else y1
×
259
            y2 = 1 if y2 == 0 else y2
×
260
            total_steps = np.log10(y2 / y1) + 1
×
261

262
        adj_factor = total_steps * portion_of_axes / 2
×
263
        return (
×
264
            np.resize(np.array([10**adj_factor, 10 ** (-adj_factor), np.nan]), size)
265
            * datum
266
        )
267

268
    def add_overplot_line(
1✔
269
        self,
270
        workspace_name,
271
        key,
272
        recoil,
273
        cif=None,
274
        e_is_logarithmic=None,
275
        datum=0,
276
        intensity_correction=IntensityType.SCATTERING_FUNCTION,
277
        **kwargs,
278
    ):
279
        cache = self._cut_cache_dict[plt.gca()][0]
1✔
280
        if cache.rotated:
1✔
281
            warnings.warn("No Bragg peak found as cut has no |Q| dimension.")
×
282
            return
×
283
        try:
1✔
284
            ws_handle = get_workspace_handle(workspace_name)
1✔
285
            if not ws_handle.parent:
×
286
                workspace_name = workspace_name.split("(")[0][:-4]
×
287
            else:
288
                workspace_name = ws_handle.parent
×
289
        except KeyError:
1✔
290
            # Workspace is interactively generated and is not in the workspace list
291
            workspace_name = workspace_name.split("(")[0][:-4]
1✔
292

293
        # Get 10% of the maximum signal
294
        scale_fac = self._get_overall_max_signal(intensity_correction) / 10
1✔
295

296
        q_axis = self._get_overall_q_axis()
1✔
297
        x, y = compute_powder_line(workspace_name, q_axis, key, cif_file=cif)
1✔
298
        try:
1✔
299
            if not e_is_logarithmic:
1✔
300
                y = np.array(y) * scale_fac / np.nanmax(y) + datum
1✔
301
            else:
302
                y = self._get_log_bragg_y_coords(len(y), BRAGG_SIZE_ON_AXES, datum)
×
303

304
            self._overplot_cache[key] = plot_overplot_line(
1✔
305
                x, y, key, recoil, cache, **kwargs
306
            )
307
        except (ValueError, IndexError):
×
308
            warnings.warn("No Bragg peak found.")
×
309

310
    def _get_overall_q_axis(self):
1✔
311
        min_q = float_info.max
1✔
312
        max_q = -min_q
1✔
313
        for cut in self._cut_cache_dict[plt.gca()]:
1✔
314
            if cut.q_axis.end > max_q:
1✔
315
                max_q = cut.q_axis.end
1✔
316
            if cut.q_axis.start < min_q:
1✔
317
                min_q = cut.q_axis.start
1✔
318
        return Axis(cut.q_axis.units, min_q, max_q, cut.q_axis.step, cut.q_axis.e_unit)
1✔
319

320
    def _get_overall_max_signal(self, intensity_correction):
1✔
321
        overall_max_signal = 0
1✔
322
        for cut in self._cut_cache_dict[plt.gca()]:
1✔
323
            try:
1✔
324
                cut.sample_temp
1✔
325
            except SampleTempValueError:
1✔
326
                try:
1✔
327
                    self.propagate_sample_temperatures_throughout_cache(plt.gca())
1✔
328
                except RuntimeError:
×
329
                    continue
×
330
            ws = cut.get_intensity_corrected_ws(intensity_correction)
1✔
331
            max_cut_signal = np.nanmax(ws.get_signal())
1✔
332
            if max_cut_signal > overall_max_signal:
1✔
333
                overall_max_signal = max_cut_signal
1✔
334
        return overall_max_signal
1✔
335

336
    def set_is_icut(self, is_icut):
1✔
337
        if cut_figure_exists():
1✔
338
            plt.gcf().canvas.manager.set_is_icut(is_icut)
1✔
339

340
    def update_main_window(self):
1✔
341
        if self._main_presenter is not None:
1✔
342
            self._main_presenter.highlight_ws_tab(2)
1✔
343
            self._main_presenter.update_displayed_workspaces()
1✔
344

345
    def workspace_selection_changed(self):
1✔
346
        pass
1✔
347

348
    def is_overplot(self, line):
1✔
349
        return line in self._overplot_cache.values()
×
350

351
    def _show_intensity(self, cut_cache, intensity_correction):
1✔
352
        plot_over = False
1✔
353
        self._temp_cut_cache = list(cut_cache)
1✔
354
        for cached_cut in self._temp_cut_cache:
1✔
355
            workspace = get_workspace_handle(cached_cut.workspace_name)
1✔
356
            self._plot_cut(
1✔
357
                workspace,
358
                cached_cut,
359
                plot_over=plot_over,
360
                intensity_correction=intensity_correction,
361
            )
362
            plot_over = True
1✔
363
        self._temp_cut_cache = []
1✔
364

365
    def show_scattering_function(self, axes):
1✔
366
        for key, value in self._cut_cache_dict.items():
×
367
            if key == axes:
×
368
                self._show_intensity(value, IntensityType.SCATTERING_FUNCTION)
×
369
                break
×
370

371
    def show_dynamical_susceptibility(self, axes):
1✔
372
        for key, value in self._cut_cache_dict.items():
×
373
            if key == axes:
×
374
                self._show_intensity(value, IntensityType.CHI)
×
375
                break
×
376

377
    def show_dynamical_susceptibility_magnetic(self, axes):
1✔
378
        for key, value in self._cut_cache_dict.items():
×
379
            if key == axes:
×
380
                self._show_intensity(value, IntensityType.CHI_MAGNETIC)
×
381
                break
×
382

383
    def show_d2sigma(self, axes):
1✔
384
        for key, value in self._cut_cache_dict.items():
×
385
            if key == axes:
×
386
                self._show_intensity(value, IntensityType.D2SIGMA)
×
387
                break
×
388

389
    def show_symmetrised(self, axes):
1✔
390
        for key, value in self._cut_cache_dict.items():
×
391
            if key == axes:
×
392
                self._show_intensity(value, IntensityType.SYMMETRISED)
×
393
                break
×
394

395
    def show_gdos(self, axes):
1✔
396
        for key, value in self._cut_cache_dict.items():
×
397
            if key == axes:
×
398
                self._show_intensity(value, IntensityType.GDOS)
×
399
                break
×
400

401
    def set_sample_temperature(self, axes, ws_name, temp):
1✔
402
        cut_dict = {}
1✔
403
        parent_ws_name = None
1✔
404
        for cut in self._cut_cache_dict[axes]:
1✔
405
            if ws_name == cut.workspace_name:
1✔
406
                parent_ws_name = cut.parent_ws_name
1✔
407
            if cut.parent_ws_name not in cut_dict.keys():
1✔
408
                cut_dict[cut.parent_ws_name] = [cut]
1✔
409
            else:
410
                cut_dict[cut.parent_ws_name].append(cut)
1✔
411

412
        if parent_ws_name:
1✔
413
            for cut in cut_dict[parent_ws_name]:
1✔
414
                cut.sample_temp = temp
1✔
415
        else:
416
            warnings.warn("Sample temperature not set, cut not found in cache")
×
417

418
    def propagate_sample_temperatures_throughout_cache(self, axes):
1✔
419
        if len(self._cut_cache_dict[axes]) <= 1:
1✔
420
            return False
×
421

422
        temperature_dict = {}
1✔
423
        cuts_with_no_temp = []
1✔
424
        for cut in self._cut_cache_dict[axes]:
1✔
425
            if (
1✔
426
                cut.raw_sample_temp
427
                and cut.parent_ws_name not in temperature_dict.keys()
428
            ):
429
                temperature_dict[cut.parent_ws_name] = cut.sample_temp
1✔
430
            elif not cut.raw_sample_temp:
1✔
431
                cuts_with_no_temp.append(cut)
1✔
432

433
        if len(temperature_dict) > 0:
1✔
434
            for cut in list(cuts_with_no_temp):
1✔
435
                if cut.parent_ws_name in temperature_dict.keys():
1✔
436
                    cut.sample_temp = temperature_dict[cut.parent_ws_name]
1✔
437
                    cuts_with_no_temp.remove(cut)
1✔
438

439
        if len(cuts_with_no_temp) == 0:
1✔
440
            return True
×
441
        else:
442
            return False
1✔
443

444
    def set_sample_temperature_by_field(self, axes, field, workspace_name):
1✔
445
        temp = sample_temperature(workspace_name, [field])
×
446
        self.set_sample_temperature(axes, workspace_name, temp)
×
447

448
    def get_icut(self):
1✔
449
        return self._interactive_cut_cache
1✔
450

451
    def store_icut(self, icut):
1✔
452
        self._interactive_cut_cache = icut
1✔
453

454
    @staticmethod
1✔
455
    def get_icut_cut():
1✔
456
        return CutPlotterPresenter._current_icut
×
457

458
    def store_icut_cut(self):
1✔
UNCOV
459
        cut = CutPlotterPresenter.get_icut_cut()
×
460
        if cut:
×
NEW
461
            if (
×
462
                cut.cut_ws.name
463
                in self._main_presenter._workspace_presenter._workspaces_to_removed_from_ads
464
            ):
NEW
465
                self._main_presenter._workspace_presenter._workspaces_to_removed_from_ads.remove(
×
466
                    cut.cut_ws.name
467
                )
UNCOV
468
            add_workspace(cut.cut_ws, cut.workspace_name)
×
469

470
    @staticmethod
1✔
471
    def set_icut_cut(icut_cut):
1✔
472
        CutPlotterPresenter._current_icut = icut_cut
1✔
473

474
    @staticmethod
1✔
475
    def set_icut_intensity_category(intensity_type):
1✔
476
        icut_plot = get_current_plot()  # icut is locked to current
×
477
        if hasattr(icut_plot, "set_intensity_from_type"):
×
478
            icut_plot.set_intensity_from_type(intensity_type)
×
479

480
    @staticmethod
1✔
481
    def get_current_plot_intensity():
1✔
482
        return get_current_plot().intensity_type
×
483

484
    @staticmethod
1✔
485
    def apply_intensity_correction_after_plot_over(intensity_type):
1✔
486
        return get_current_plot().trigger_action_from_type(intensity_type)
×
487

488
    def _cache_intensity_correction_methods(self):
1✔
489
        cat = plt.CATEGORY_CUT
1✔
490
        IntensityCache.cache_method(
1✔
491
            cat,
492
            IntensityType.SCATTERING_FUNCTION,
493
            self.show_scattering_function.__name__,
494
        )
495
        IntensityCache.cache_method(
1✔
496
            cat, IntensityType.CHI, self.show_dynamical_susceptibility.__name__
497
        )
498
        IntensityCache.cache_method(
1✔
499
            cat,
500
            IntensityType.CHI_MAGNETIC,
501
            self.show_dynamical_susceptibility_magnetic.__name__,
502
        )
503
        IntensityCache.cache_method(
1✔
504
            cat, IntensityType.D2SIGMA, self.show_d2sigma.__name__
505
        )
506
        IntensityCache.cache_method(
1✔
507
            cat, IntensityType.SYMMETRISED, self.show_symmetrised.__name__
508
        )
509
        IntensityCache.cache_method(cat, IntensityType.GDOS, self.show_gdos.__name__)
1✔
510

511
    def window_close_complete(self):
1✔
512
        if self._main_presenter:
1✔
NEW
513
            self._main_presenter.remove_pending_remove_workspaces_from_ads()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc