• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mantidproject / mantidimaging / 8551127845

04 Apr 2024 07:50AM UTC coverage: 74.291% (-0.01%) from 74.305%
8551127845

push

github

web-flow
Add setting for number of processes to start (#2147)

1890 of 3061 branches covered (61.74%)

Branch coverage included in aggregate %.

4 of 5 new or added lines in 1 file covered. (80.0%)

1 existing line in 1 file now uncovered.

9250 of 11934 relevant lines covered (77.51%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.35
/mantidimaging/core/parallel/manager.py
1
# Copyright (C) 2024 ISIS Rutherford Appleton Laboratory UKRI
2
# SPDX - License - Identifier: GPL-3.0-or-later
3
from __future__ import annotations
1✔
4

5
import time
1✔
6
from multiprocessing import get_context
1✔
7
import os
1✔
8
import uuid
1✔
9
from logging import getLogger
1✔
10
from typing import TYPE_CHECKING
1✔
11

12
import psutil
1✔
13
from psutil import NoSuchProcess, AccessDenied
1✔
14

15
from mantidimaging.core.operations.loader import load_filter_packages
1✔
16

17
if TYPE_CHECKING:
18
    from multiprocessing.pool import Pool
19

20
# Prefix used when building and recognising this module's shared memory names.
MEM_PREFIX = 'MI'
# Directory that is listed for shared memory names and from which they are removed.
MEM_DIR_LINUX = '/dev/shm'
# PID of the process that imported this module, captured once at import time.
CURRENT_PID = psutil.Process().pid

LOG = getLogger(__name__)
# Separate logger under the "perf." namespace, used for timing messages.
perf_logger = getLogger("perf." + __name__)

# Number of worker processes; overwritten by create_and_start_pool().
cores: int = 1
# The worker pool; None until create_and_start_pool() assigns one.
pool: Pool | None = None
1✔
29

30

31
def create_and_start_pool(process_count: int) -> None:
    """Create the module-level worker pool using the 'spawn' start method.

    :param process_count: number of worker processes to start. A value of
        ``0`` selects the CPU count reported by the multiprocessing context.

    The chosen size is stored in the module-level ``cores`` and the new pool
    in the module-level ``pool``. Every worker runs ``worker_setup`` as its
    initializer.
    """
    global cores, pool

    start_time = time.monotonic()
    spawn_context = get_context('spawn')

    cores = spawn_context.cpu_count() if process_count == 0 else process_count
    LOG.info(f'Creating process pool with {cores} processes')
    pool = spawn_context.Pool(cores, initializer=worker_setup)

    # Only report the start-up time when the perf logger is enabled at level 1.
    if not perf_logger.isEnabledFor(1):
        return
    perf_logger.info(f"Process pool started in {time.monotonic() - start_time}")
×
45

46

47
def worker_setup():
    """Pool initializer: load the filter packages inside a worker process.

    Without this import step the worker cannot run operations.
    """
    load_filter_packages()
×
50

51

52
def end_pool() -> None:
    """Shut down the module-level worker pool, if one has been created.

    The pool is closed and then terminated. Afterwards the module-level
    ``pool`` is reset to ``None`` so that module state no longer refers to a
    terminated pool. Calling this when no pool exists is a no-op.
    """
    global pool
    if pool is None:
        return

    pool.close()
    pool.terminate()
    # Drop the reference so later checks of ``pool`` see that nothing is running.
    pool = None
1✔
56

57

58
def generate_mi_shared_mem_name() -> str:
    """Return a new shared memory name of the form ``<MEM_PREFIX>_<pid>_<uuid4>``.

    The PID part is ``CURRENT_PID`` and the last part is a freshly generated
    random UUID.
    """
    unique_part = uuid.uuid4()
    return '_'.join((MEM_PREFIX, str(CURRENT_PID), str(unique_part)))
1✔
60

61

62
def clear_memory_from_current_process_linux() -> None:
    """Free every listed shared memory name that carries this process's PID."""
    all_names = _get_shared_mem_names_linux()
    # filter() is lazy, so each name is checked and then freed before the next one.
    for own_name in filter(_is_mi_memory_from_current_process, all_names):
        free_shared_memory_linux([own_name])
×
66

67

68
def find_memory_from_previous_process_linux() -> list[str]:
    """Return the listed shared memory names that ``_is_safe_to_remove`` accepts.

    Nothing is deleted here; the names are only collected, in listing order.
    """
    return [candidate for candidate in _get_shared_mem_names_linux() if _is_safe_to_remove(candidate)]
1✔
74

75

76
def free_shared_memory_linux(mem_names: list[str]) -> None:
    """Remove the file for each given name from ``MEM_DIR_LINUX``.

    :param mem_names: names of entries inside ``MEM_DIR_LINUX`` to delete.
    :raises OSError: propagated from ``os.remove`` if a file cannot be removed.
    """
    for name in mem_names:
        target_path = f'{MEM_DIR_LINUX}/{name}'
        os.remove(target_path)
×
79

80

81
def _is_safe_to_remove(mem_name: str) -> bool:
    """Decide whether a shared memory file is left over from a finished process.

    Returns ``True`` only when all of the following hold:

    * the name passes ``_is_mi_shared_mem``;
    * the file's modification time is earlier than this process's creation time;
    * looking up the PID embedded in the name raises ``NoSuchProcess``.

    In every other case, including when the lookup succeeds or raises
    ``AccessDenied``, the result is ``False``.
    """
    own_start_time = psutil.Process().create_time()

    if not _is_mi_shared_mem(mem_name):
        return False
    modified_at = os.path.getmtime(f'{MEM_DIR_LINUX}/{mem_name}')
    if not modified_at < own_start_time:
        return False

    owner_pid = int(mem_name.split('_')[1])
    try:
        _lookup_process(owner_pid)
    except NoSuchProcess:
        # The process that owns the memory has ended
        return True
    except AccessDenied:
        # The process that owns the memory still exists
        return False
    return False
1✔
94

95

96
def _get_shared_mem_names_linux() -> list[str]:
    """Return the names of all entries currently in ``MEM_DIR_LINUX``."""
    entries = os.listdir(MEM_DIR_LINUX)
    return entries
×
98

99

100
def _lookup_process(pid) -> None:
    """Construct a ``psutil.Process`` for ``pid`` and discard it.

    The call is made only for the exceptions it may raise; the caller in this
    module handles ``NoSuchProcess`` and ``AccessDenied``.
    """
    _ = psutil.Process(pid)
×
102

103

104
def _is_mi_shared_mem(mem_name: str) -> bool:
    """Return whether ``mem_name`` has the form ``<MEM_PREFIX>_<integer>_<anything>``.

    The name must split on ``'_'`` into exactly three parts, the first equal to
    ``MEM_PREFIX`` and the second parseable by ``int``.
    """
    parts = mem_name.split('_')
    if len(parts) != 3 or parts[0] != MEM_PREFIX:
        return False

    try:
        int(parts[1])
    except ValueError:
        return False
    return True
1✔
113

114

115
def _is_mi_memory_from_current_process(mem_name: str) -> bool:
    """Return whether ``mem_name`` is an MI shared memory name holding ``CURRENT_PID``."""
    if not _is_mi_shared_mem(mem_name):
        return False
    owner_pid = int(mem_name.split('_')[1])
    return owner_pid == CURRENT_PID
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc