• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / executorlib / 12483233294

24 Dec 2024 02:59PM UTC coverage: 96.004% (-0.4%) from 96.367%
12483233294

Pull #535

github

web-flow
Merge 1799b4891 into 5cf3ecc7e
Pull Request #535: Add type checking with mypy

120 of 127 new or added lines in 15 files covered. (94.49%)

3 existing lines in 2 files now uncovered.

1033 of 1076 relevant lines covered (96.0%)

0.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.59
/executorlib/cache/executor.py
1
import os
1✔
2
from typing import Callable, Optional
1✔
3

4
from executorlib.base.executor import ExecutorBase
1✔
5
from executorlib.cache.shared import execute_tasks_h5
1✔
6
from executorlib.cache.subprocess_spawner import (
1✔
7
    execute_in_subprocess,
8
    terminate_subprocess,
9
)
10
from executorlib.standalone.inputcheck import (
1✔
11
    check_executor,
12
    check_flux_executor_pmi_mode,
13
    check_flux_log_files,
14
    check_hostname_localhost,
15
    check_max_workers_and_cores,
16
    check_nested_flux_executor,
17
)
18
from executorlib.standalone.thread import RaisingThread
1✔
19

20
try:
1✔
21
    from executorlib.cache.queue_spawner import execute_with_pysqa
1✔
22
except ImportError:
×
23
    # If pysqa is not available fall back to executing tasks in a subprocess
NEW
24
    execute_with_pysqa = execute_in_subprocess  # type: ignore
×
25

26

27
class FileExecutor(ExecutorBase):
1✔
28
    def __init__(
1✔
29
        self,
30
        cache_directory: str = "cache",
31
        resource_dict: Optional[dict] = None,
32
        execute_function: Callable = execute_with_pysqa,
33
        terminate_function: Optional[Callable] = None,
34
        pysqa_config_directory: Optional[str] = None,
35
        backend: Optional[str] = None,
36
        disable_dependencies: bool = False,
37
    ):
38
        """
39
        Initialize the FileExecutor.
40

41
        Args:
42
            cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
43
            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
44
                              - cores (int): number of MPI cores to be used for each function call
45
                              - cwd (str/None): current working directory where the parallel python task is executed
46
            execute_function (Callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
47
            terminate_function (Callable, optional): The function to terminate the tasks.
48
            pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
49
            backend (str, optional): name of the backend used to spawn tasks.
50
            disable_dependencies (boolean): Disable resolving future objects during the submission.
51
        """
52
        super().__init__(max_cores=None)
1✔
53
        default_resource_dict = {
1✔
54
            "cores": 1,
55
            "cwd": None,
56
        }
57
        if resource_dict is None:
1✔
58
            resource_dict = {}
1✔
59
        resource_dict.update(
1✔
60
            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
61
        )
62
        if execute_function == execute_in_subprocess and terminate_function is None:
1✔
63
            terminate_function = terminate_subprocess
1✔
64
        cache_directory_path = os.path.abspath(cache_directory)
1✔
65
        os.makedirs(cache_directory_path, exist_ok=True)
1✔
66
        self._set_process(
1✔
67
            RaisingThread(
68
                target=execute_tasks_h5,
69
                kwargs={
70
                    "future_queue": self._future_queue,
71
                    "execute_function": execute_function,
72
                    "cache_directory": cache_directory_path,
73
                    "resource_dict": resource_dict,
74
                    "terminate_function": terminate_function,
75
                    "pysqa_config_directory": pysqa_config_directory,
76
                    "backend": backend,
77
                    "disable_dependencies": disable_dependencies,
78
                },
79
            )
80
        )
81

82

83
def create_file_executor(
1✔
84
    max_workers: Optional[int] = None,
85
    backend: str = "flux_submission",
86
    max_cores: Optional[int] = None,
87
    cache_directory: Optional[str] = None,
88
    resource_dict: Optional[dict] = None,
89
    flux_executor=None,
90
    flux_executor_pmi_mode: Optional[str] = None,
91
    flux_executor_nesting: bool = False,
92
    flux_log_files: bool = False,
93
    pysqa_config_directory: Optional[str] = None,
94
    hostname_localhost: Optional[bool] = None,
95
    block_allocation: bool = False,
96
    init_function: Optional[Callable] = None,
97
    disable_dependencies: bool = False,
98
):
99
    if cache_directory is None:
1✔
100
        cache_directory = "executorlib_cache"
1✔
101
    if block_allocation:
1✔
102
        raise ValueError(
1✔
103
            "The option block_allocation is not available with the pysqa based backend."
104
        )
105
    if init_function is not None:
1✔
106
        raise ValueError(
1✔
107
            "The option to specify an init_function is not available with the pysqa based backend."
108
        )
109
    check_flux_executor_pmi_mode(flux_executor_pmi_mode=flux_executor_pmi_mode)
1✔
110
    check_max_workers_and_cores(max_cores=max_cores, max_workers=max_workers)
1✔
111
    check_hostname_localhost(hostname_localhost=hostname_localhost)
1✔
112
    check_executor(executor=flux_executor)
1✔
113
    check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
1✔
114
    check_flux_log_files(flux_log_files=flux_log_files)
1✔
115
    return FileExecutor(
1✔
116
        cache_directory=cache_directory,
117
        resource_dict=resource_dict,
118
        pysqa_config_directory=pysqa_config_directory,
119
        backend=backend.split("_submission")[0],
120
        disable_dependencies=disable_dependencies,
121
    )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc