pyiron / pyiron_base / 9757327958

02 Jul 2024 08:11AM UTC coverage: 71.455% (+0.02%) from 71.438%

Pull Request #1505: PythonFunctionContainerJob: Use calculate() function
Merge 8c7bf806e into 51ba6c28a (github / web-flow)

36 of 39 new or added lines in 2 files covered. (92.31%)

2 existing lines in 1 file now uncovered.

7232 of 10121 relevant lines covered (71.46%)

0.71 hits per line
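
For context, the calculate()-based execution path that Pull Request #1505 moves PythonFunctionContainerJob onto is sketched below. It follows the docstring examples of GenericJob.get_calculate_function() and GenericJob.calculate_kwargs shown further down in this file; "job" is assumed to be an already configured pyiron_base job instance.

>>> calculate_function = job.get_calculate_function()
>>> shell_output, parsed_output, job_crashed = calculate_function(**job.calculate_kwargs)
>>> job.save_output(output_dict=parsed_output, shell_output=shell_output)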

Source File

/pyiron_base/jobs/job/generic.py: 72.2%
1
# coding: utf-8
2
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
3
# Distributed under the terms of "New BSD License", see the LICENSE file.
4
"""
5
Generic Job class extends the JobCore class with all the functionality to run the job object.
6
"""
7

8
from concurrent.futures import Future, Executor
1✔
9
from datetime import datetime
1✔
10
from inspect import isclass
1✔
11
import os
1✔
12
import platform
1✔
13
import posixpath
1✔
14
from typing import Optional
1✔
15
import warnings
1✔
16

17
from h5io_browser.base import _read_hdf, _write_hdf
1✔
18
from pyiron_snippets.deprecate import deprecate
1✔
19

20
from pyiron_base.state import state
1✔
21
from pyiron_base.state.signal import catch_signals
1✔
22
from pyiron_base.jobs.job.core import (
1✔
23
    JobCore,
24
    _doc_str_job_core_args,
25
    _doc_str_job_core_attr,
26
)
27
from pyiron_base.jobs.job.extension.executable import Executable
1✔
28
from pyiron_base.jobs.job.extension.files import File
1✔
29
from pyiron_base.jobs.job.extension.jobstatus import JobStatus
1✔
30
from pyiron_base.jobs.job.runfunction import (
1✔
31
    CalculateFunctionCaller,
32
    execute_job_with_calculate_function,
33
    execute_job_with_external_executable,
34
    run_job_with_parameter_repair,
35
    run_job_with_status_initialized,
36
    run_job_with_status_created,
37
    run_job_with_status_submitted,
38
    run_job_with_status_running,
39
    run_job_with_status_refresh,
40
    run_job_with_status_busy,
41
    run_job_with_status_collect,
42
    run_job_with_status_suspended,
43
    run_job_with_status_finished,
44
    run_job_with_runmode_modal,
45
    run_job_with_runmode_queue,
46
    write_input_files_from_input_dict,
47
)
48
from pyiron_base.jobs.job.util import (
1✔
49
    _get_restart_copy_dict,
50
    _kill_child,
51
    _job_store_before_copy,
52
    _job_reload_after_copy,
53
)
54
from pyiron_base.utils.instance import static_isinstance, import_class
1✔
55
from pyiron_base.jobs.job.extension.server.generic import Server
1✔
56
from pyiron_base.database.filetable import FileTable
1✔
57
from pyiron_base.interfaces.has_dict import HasDict
1✔
58

59
__author__ = "Joerg Neugebauer, Jan Janssen"
1✔
60
__copyright__ = (
1✔
61
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
62
    "Computational Materials Design (CM) Department"
63
)
64
__version__ = "1.0"
1✔
65
__maintainer__ = "Jan Janssen"
1✔
66
__email__ = "janssen@mpie.de"
1✔
67
__status__ = "production"
1✔
68
__date__ = "Sep 1, 2017"
1✔
69

70
# Modular Docstrings
71
_doc_str_generic_job_attr = (
1✔
72
    _doc_str_job_core_attr
73
    + "\n"
74
    + """\
75
        .. attribute:: version
76

77
            Version of the hamiltonian, which is also the version of the executable unless a custom executable is used.
78

79
        .. attribute:: executable
80

81
            Executable used to run the job - usually the path to an external executable.
82

83
        .. attribute:: library_activated
84

85
            For job types which offer a Python library, pyiron can use the Python library instead of an external
86
            executable.
87

88
        .. attribute:: server
89

90
            Server object to handle the execution environment for the job.
91

92
        .. attribute:: queue_id
93

94
            the ID returned from the queuing system - it is most likely not the same as the job ID.
95

96
        .. attribute:: logger
97

98
            logger object to monitor the external execution and internal pyiron warnings.
99

100
        .. attribute:: restart_file_list
101

102
            list of files which are used to restart the calculation.
103

104
        .. attribute:: exclude_nodes_hdf
105

106
            list of nodes which are excluded from storing in the hdf5 file.
107

108
        .. attribute:: exclude_groups_hdf
109

110
            list of groups which are excluded from storing in the hdf5 file.
111

112
        .. attribute:: job_type
113

114
            Job type object with all the available job types: ['ExampleJob', 'ParallelMaster',
115
                                                               'ScriptJob', 'ListMaster']
116
"""
117
)
118

119

120
class GenericJob(JobCore, HasDict):
1✔
121
    __doc__ = (
1✔
122
        """
123
    Generic Job class extends the JobCore class with all the functionality to run the job object. From this class
124
    all specific job types are derived. Therefore it should contain the properties/routines common to all jobs.
125
    The functions in this module should be as generic as possible.
126

127
    Sub classes that need to add special behavior after :method:`.copy_to()` can override
128
    :method:`._after_generic_copy_to()`.
129
"""
130
        + "\n"
131
        + _doc_str_job_core_args
132
        + "\n"
133
        + _doc_str_generic_job_attr
134
    )
135

136
    def __init__(self, project, job_name):
1✔
137
        super(GenericJob, self).__init__(project, job_name)
1✔
138
        self.__name__ = type(self).__name__
1✔
139
        self.__version__ = "0.4"
1✔
140
        self.__hdf_version__ = "0.1.0"
1✔
141
        self._server = Server()
1✔
142
        self._logger = state.logger
1✔
143
        self._executable = None
1✔
144
        if not state.database.database_is_disabled:
1✔
145
            self._status = JobStatus(db=project.db, job_id=self.job_id)
1✔
146
            self.refresh_job_status()
1✔
147
        elif os.path.exists(self.project_hdf5.file_name):
1✔
148
            initial_status = _read_hdf(
1✔
149
                # in most cases self.project_hdf5.h5_path == / + self.job_name but not for child jobs of GenericMasters
150
                self.project_hdf5.file_name,
151
                self.project_hdf5.h5_path + "/status",
152
            )
153
            self._status = JobStatus(initial_status=initial_status)
1✔
154
            if "job_id" in self.list_nodes():
1✔
155
                self._job_id = _read_hdf(
1✔
156
                    # in most cases self.project_hdf5.h5_path == / + self.job_name but not for child jobs of GenericMasters
157
                    self.project_hdf5.file_name,
158
                    self.project_hdf5.h5_path + "/job_id",
159
                )
160
        else:
161
            self._status = JobStatus()
1✔
162
        self._restart_file_list = list()
1✔
163
        self._restart_file_dict = dict()
1✔
164
        self._exclude_nodes_hdf = list()
1✔
165
        self._exclude_groups_hdf = list()
1✔
166
        self._collect_output_funct = None
1✔
167
        self._executor_type = None
1✔
168
        self._process = None
1✔
169
        self._compress_by_default = False
1✔
170
        self._job_with_calculate_function = False
1✔
171
        self._write_work_dir_warnings = True
1✔
172
        self.interactive_cache = None
1✔
173
        self.error = GenericError(working_directory=self.project_hdf5.working_directory)
1✔
174

175
    @property
1✔
176
    def version(self):
1✔
177
        """
178
        Get the version of the hamiltonian, which is also the version of the executable unless a custom executable is
179
        used.
180

181
        Returns:
182
            str: version number
183
        """
184
        if self.__version__:
1✔
185
            return self.__version__
1✔
186
        else:
187
            self._executable_activate()
×
188
            if self._executable is not None:
×
189
                return self._executable.version
×
190
            else:
191
                return None
×
192

193
    @version.setter
1✔
194
    def version(self, new_version):
1✔
195
        """
196
        Set the version of the hamiltonian, which is also the version of the executable unless a custom executable is
197
        used.
198

199
        Args:
200
            new_version (str): version
201
        """
202
        self._executable_activate()
×
203
        self._executable.version = new_version
×
204

205
    @property
1✔
206
    def executable(self):
1✔
207
        """
208
        Get the executable used to run the job - usually the path to an external executable.
209

210
        Returns:
211
            (str/pyiron_base.job.executable.Executable): executable path
212
        """
213
        self._executable_activate()
1✔
214
        return self._executable
1✔
215

216
    @executable.setter
1✔
217
    def executable(self, exe):
1✔
218
        """
219
        Set the executable used to run the job - usually the path to an external executable.
220

221
        Args:
222
            exe (str): executable path, if no valid path is provided an executable is chosen based on version.
223
        """
224
        self._executable_activate()
1✔
225
        self._executable.executable_path = exe
1✔
226

227
    @property
1✔
228
    def server(self):
1✔
229
        """
230
        Get the server object to handle the execution environment for the job.
231

232
        Returns:
233
            Server: server object
234
        """
235
        return self._server
1✔
236

237
    @server.setter
1✔
238
    def server(self, server):
1✔
239
        """
240
        Set the server object to handle the execution environment for the job.
241
        Args:
242
            server (Server): server object
243
        """
244
        self._server = server
×
245

246
    @property
1✔
247
    def queue_id(self):
1✔
248
        """
249
        Get the queue ID, the ID returned from the queuing system - it is most likely not the same as the job ID.
250

251
        Returns:
252
            int: queue ID
253
        """
254
        return self.server.queue_id
×
255

256
    @queue_id.setter
1✔
257
    def queue_id(self, qid):
1✔
258
        """
259
        Set the queue ID, the ID returned from the queuing system - it is most likely not the same as the job ID.
260

261
        Args:
262
            qid (int): queue ID
263
        """
264
        self.server.queue_id = qid
×
265

266
    @property
1✔
267
    def logger(self):
1✔
268
        """
269
        Get the logger object to monitor the external execution and internal pyiron warnings.
270

271
        Returns:
272
            logging.getLogger(): logger object
273
        """
274
        return self._logger
1✔
275

276
    @property
1✔
277
    def restart_file_list(self):
1✔
278
        """
279
        Get the list of files which are used to restart the calculation.
280

281
        Returns:
282
            list: list of files
283
        """
284
        self._restart_file_list = [
1✔
285
            str(f) if isinstance(f, File) else f for f in self._restart_file_list
286
        ]
287
        return self._restart_file_list
1✔
288

289
    @restart_file_list.setter
1✔
290
    def restart_file_list(self, filenames):
1✔
291
        """
292
        Append new files to the restart file list - the list of files which are used to restart the calculation from.
293

294
        Args:
295
            filenames (list):
296
        """
297
        for f in filenames:
×
298
            if isinstance(f, File):
×
299
                f = str(f)
×
300
            if not (os.path.isfile(f)):
×
301
                raise IOError("File: {} does not exist".format(f))
×
302
            self.restart_file_list.append(f)
×
303

304
    @property
1✔
305
    def restart_file_dict(self):
1✔
306
        """
307
        A dictionary of the new names of the copied restart files
308
        """
309
        for actual_name in [os.path.basename(f) for f in self.restart_file_list]:
×
310
            if actual_name not in self._restart_file_dict.keys():
×
311
                self._restart_file_dict[actual_name] = actual_name
×
312
        return self._restart_file_dict
×
313

314
    @restart_file_dict.setter
1✔
315
    def restart_file_dict(self, val):
1✔
316
        if not isinstance(val, dict):
×
317
            raise ValueError("restart_file_dict should be a dictionary!")
×
318
        else:
319
            self._restart_file_dict = {}
×
320
            for k, v in val.items():
×
321
                if isinstance(k, File):
×
322
                    k = str(k)
×
323
                if isinstance(v, File):
×
324
                    v = str(v)
×
325
                self._restart_file_dict[k] = v
×
326

327
    @property
1✔
328
    def exclude_nodes_hdf(self):
1✔
329
        """
330
        Get the list of nodes which are excluded from storing in the hdf5 file
331

332
        Returns:
333
            nodes(list)
334
        """
335
        return self._exclude_nodes_hdf
×
336

337
    @exclude_nodes_hdf.setter
1✔
338
    def exclude_nodes_hdf(self, val):
1✔
339
        if isinstance(val, str):
×
340
            val = [val]
×
341
        elif not hasattr(val, "__len__"):
×
342
            raise ValueError("Wrong type of variable.")
×
343
        self._exclude_nodes_hdf = val
×
344

345
    @property
1✔
346
    def exclude_groups_hdf(self):
1✔
347
        """
348
        Get the list of groups which are excluded from storing in the hdf5 file
349

350
        Returns:
351
            groups(list)
352
        """
353
        return self._exclude_groups_hdf
×
354

355
    @exclude_groups_hdf.setter
1✔
356
    def exclude_groups_hdf(self, val):
1✔
357
        if isinstance(val, str):
×
358
            val = [val]
×
359
        elif not hasattr(val, "__len__"):
×
360
            raise ValueError("Wrong type of variable.")
×
361
        self._exclude_groups_hdf = val
×
362

363
    @property
1✔
364
    def job_type(self):
1✔
365
        """
366
        Job type object with all the available job types: ['ExampleJob', 'ParallelMaster', 'ScriptJob',
367
                                                           'ListMaster']
368
        Returns:
369
            JobTypeChoice: Job type object
370
        """
371
        return self.project.job_type
×
372

373
    @property
1✔
374
    def working_directory(self):
1✔
375
        """
376
        Get the working directory the job is executed in - outside the HDF5 file. The working directory equals the
377
        path but it is represented by the filesystem:
378
            /absolute/path/to/the/file.h5/path/inside/the/hdf5/file
379
        becomes:
380
            /absolute/path/to/the/file_hdf5/path/inside/the/hdf5/file
381

382
        Returns:
383
            str: absolute path to the working directory
384
        """
385
        if self._import_directory is not None:
1✔
386
            return self._import_directory
×
387
        elif not self.project_hdf5.working_directory:
1✔
388
            self._create_working_directory()
×
389
        return self.project_hdf5.working_directory
1✔
390

391
    @property
1✔
392
    def executor_type(self):
1✔
393
        return self._executor_type
×
394

395
    @executor_type.setter
1✔
396
    def executor_type(self, exe):
1✔
397
        if exe is None:
1✔
398
            self._executor_type = exe
1✔
399
        elif isinstance(exe, str):
1✔
400
            try:
1✔
401
                exe_class = import_class(exe)  # Make sure it's available
1✔
402
                if not (
1✔
403
                    isclass(exe_class) and issubclass(exe_class, Executor)
404
                ):  # And what we want
405
                    raise TypeError(
×
406
                        f"{exe} imported OK, but {exe_class} is not a subclass of {Executor}"
407
                    )
408
            except Exception as e:
1✔
409
                raise ImportError("Something went wrong trying to import {exe}") from e
1✔
410
            else:
411
                self._executor_type = exe
1✔
412
        elif isclass(exe) and issubclass(exe, Executor):
1✔
413
            self._executor_type = f"{exe.__module__}.{exe.__name__}"
1✔
414
        elif isinstance(exe, Executor):
×
415
            raise NotImplementedError(
×
416
                "We don't want to let you pass an entire executor, because you might think its state comes "
417
                "with it. Try passing `.__class__` on this object instead."
418
            )
419
        else:
420
            raise TypeError(
×
421
                f"Expected an executor class or string representing one, but got {exe}"
422
            )
423
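    # Usage sketch for the executor_type setter above; illustrative only, with
    # ProcessPoolExecutor standing in for any concurrent.futures.Executor subclass:
    #   from concurrent.futures import ProcessPoolExecutor
    #   job.executor_type = ProcessPoolExecutor   # an Executor subclass is stored by its dotted import path
    #   job.executor_type = "concurrent.futures.process.ProcessPoolExecutor"   # the string form is imported and checked
    #   job.executor_type = ProcessPoolExecutor()   # passing an instance raises NotImplementedError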

424
    @property
1✔
425
    def calculate_kwargs(self) -> dict:
1✔
426
        """
427
        Generate keyword arguments for the calculate() function. A new simulation code only has to extend the
428
        get_input_parameter_dict() function which by default specifies a hierarchical dictionary with files_to_write
429
        and files_to_copy.
430

431
        Example:
432

433
        >>> calculate_function = job.get_calculate_function()
434
        >>> shell_output, parsed_output, job_crashed = calculate_function(**job.calculate_kwargs)
435
        >>> job.save_output(output_dict=parsed_output, shell_output=shell_output)
436

437
        Returns:
438
            dict: keyword arguments for the calculate() function
439
        """
440
        executable, shell = self.executable.get_input_for_subprocess_call(
1✔
441
            cores=self.server.cores, threads=self.server.threads, gpus=self.server.gpus
442
        )
443
        return {
1✔
444
            "working_directory": self.working_directory,
445
            "input_parameter_dict": self.get_input_parameter_dict(),
446
            "executable_script": executable,
447
            "shell_parameter": shell,
448
            "cores": self.server.cores,
449
            "threads": self.server.threads,
450
            "gpus": self.server.gpus,
451
            "conda_environment_name": self.server.conda_environment_name,
452
            "conda_environment_path": self.server.conda_environment_path,
453
            "accept_crash": self.server.accept_crash,
454
            "accepted_return_codes": self.executable.accepted_return_codes,
455
            "output_parameter_dict": self.get_output_parameter_dict(),
456
        }
457

458
    def clear_job(self):
1✔
459
        """
460
        Convenience function to clear job info after suspend. Mimics deletion of all the job info after suspend in a
461
        local test environment.
462
        """
463
        del self.__name__
×
464
        del self.__version__
×
465
        del self._executable
×
466
        del self._server
×
467
        del self._logger
×
468
        del self._import_directory
×
469
        del self._status
×
470
        del self._restart_file_list
×
471
        del self._restart_file_dict
×
472

473
    def copy(self):
1✔
474
        """
475
        Copy the GenericJob object which links to the job and its HDF5 file
476

477
        Returns:
478
            GenericJob: New GenericJob object pointing to the same job
479
        """
480
        # Store all job arguments in the HDF5 file
481
        delete_file_after_copy = _job_store_before_copy(job=self)
1✔
482

483
        # Copy Python object - super().copy() causes recursion error for serial master
484
        copied_self = self.__class__(
1✔
485
            job_name=self.job_name, project=self.project_hdf5.open("..")
486
        )
487
        copied_self.reset_job_id()
1✔
488

489
        # Reload object from HDF5 file
490
        _job_reload_after_copy(
1✔
491
            job=copied_self, delete_file_after_copy=delete_file_after_copy
492
        )
493

494
        # Copy executor - it cannot be copied and is just linked instead
495
        if self.server.executor is not None:
1✔
496
            copied_self.server.executor = self.server.executor
1✔
497
        if self.server.future is not None and not self.server.future.done():
1✔
498
            raise RuntimeError(
1✔
499
                "Jobs whose server has executor and future attributes cannot be copied unless the future is `done()`"
500
            )
501
        return copied_self
1✔
502

503
    def collect_logfiles(self):
1✔
504
        """
505
        Collect the log files of the external executable and store the information in the HDF5 file. This method has
506
        to be implemented in the individual hamiltonians.
507
        """
508
        pass
1✔
509

510
    def get_calculate_function(self) -> callable:
1✔
511
        """
512
        Generate calculate() function
513

514
        Example:
515

516
        >>> calculate_function = job.get_calculate_function()
517
        >>> shell_output, parsed_output, job_crashed = calculate_function(**job.calculate_kwargs)
518
        >>> job.save_output(output_dict=parsed_output, shell_output=shell_output)
519

520
        Returns:
521
            callable: calculate() function
522
        """
523
        return CalculateFunctionCaller(
1✔
524
            collect_output_funct=self._collect_output_funct,
525
        )
526

527
    def get_input_parameter_dict(self) -> dict:
1✔
528
        """
529
        Get a hierarchical dictionary of input files. On the first level the dictionary is divided into files_to_create
530
        and files_to_copy. Both are dictionaries use the file names as keys. In file_to_create the values are strings
531
        which represent the content which is going to be written to the corresponding file. In files_to_copy the values
532
        are the paths to the source files to be copied.
533

534
        Returns:
535
            dict: hierarchical dictionary of input files
536
        """
537
        if (
1✔
538
            state.settings.configuration["write_work_dir_warnings"]
539
            and self._write_work_dir_warnings
540
            and not self._job_with_calculate_function
541
        ):
542
            content = [
1✔
543
                "Files in this directory are intended to be written and read by pyiron. \n\n",
544
                "pyiron may transform user input to enhance performance, thus, use these files with care!\n",
545
                "Consult the log and/or the documentation to gain further information.\n\n",
546
                "To disable writing these warning files, specify \n",
547
                "WRITE_WORK_DIR_WARNINGS=False in the .pyiron configuration file (or set the ",
548
                "PYIRONWRITEWORKDIRWARNINGS environment variable accordingly).",
549
            ]
550
            return {
1✔
551
                "files_to_create": {
552
                    "WARNING_pyiron_modified_content": "".join(content)
553
                },
554
                "files_to_copy": _get_restart_copy_dict(job=self),
555
            }
556
        else:
557
            return {
1✔
558
                "files_to_create": {},
559
                "files_to_copy": _get_restart_copy_dict(job=self),
560
            }
561
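    # Sketch of the dictionary returned by get_input_parameter_dict() above, with
    # illustrative file names (not taken from the source) and one restart file
    # registered on the job:
    #   {
    #       "files_to_create": {"WARNING_pyiron_modified_content": "Files in this directory ..."},
    #       "files_to_copy": {"restart_file": "/path/to/previous/restart_file"},
    #   }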

562
    def get_output_parameter_dict(self):
1✔
563
        return {}
1✔
564

565
    def collect_output(self):
1✔
566
        """
567
        Collect the output files of the external executable and store the information in the HDF5 file. This method has
568
        to be implemented in the individual hamiltonians.
569
        """
570
        raise NotImplementedError(
×
571
            "read procedure must be defined for derived Hamilton!"
572
        )
573

574
    def save_output(
1✔
575
        self, output_dict: Optional[dict] = None, shell_output: Optional[str] = None
576
    ):
577
        """
578
        Store output of the calculate function in the HDF5 file.
579

580
        Args:
581
            output_dict (dict): hierarchical output dictionary to be stored in the HDF5 file.
582
            shell_output (str): shell output from calling the external executable to be stored in the HDF5 file.
583
        """
584
        if shell_output is not None:
1✔
585
            self.storage.output.stdout = shell_output
1✔
586
        if output_dict is not None:
1✔
587
            self.output.update(output_dict)
1✔
588
        if shell_output is not None or output_dict is not None:
1✔
589
            self.to_hdf()
1✔
590

591
    def suspend(self):
1✔
592
        """
593
        Suspend the job by storing the object and its state persistently in the HDF5 file and exiting it.
594
        """
595
        self.to_hdf()
×
596
        self.status.suspended = True
×
597
        self._logger.info(
×
598
            "{}, status: {}, job has been suspended".format(
599
                self.job_info_str, self.status
600
            )
601
        )
602
        self.clear_job()
×
603

604
    def refresh_job_status(self):
1✔
605
        """
606
        Refresh job status by updating the job status with the status from the database if a job ID is available.
607
        """
608
        if self.job_id:
1✔
609
            self._status = JobStatus(
1✔
610
                initial_status=self.project.db.get_job_status(self.job_id),
611
                db=self.project.db,
612
                job_id=self.job_id,
613
            )
614
        elif state.database.database_is_disabled:
1✔
615
            self._status = JobStatus(
×
616
                initial_status=_read_hdf(
617
                    self.project_hdf5.file_name, self.job_name + "/status"
618
                )
619
            )
620
        if (
1✔
621
            isinstance(self.server.future, Future)
622
            and not self.status.finished
623
            and self.server.future.done()
624
        ):
625
            if self.server.future.cancelled():
1✔
626
                self.status.aborted = True
1✔
627
            else:
628
                self.status.finished = True
×
629

630
    def write_input(self):
1✔
631
        """
632
        Call routines that generate the code specific input files
633
        Returns:
634
        """
635
        write_input_files_from_input_dict(
1✔
636
            input_dict=self.get_input_parameter_dict(),
637
            working_directory=self.working_directory,
638
        )
639

640
    def _internal_copy_to(
1✔
641
        self,
642
        project=None,
643
        new_job_name=None,
644
        new_database_entry=True,
645
        copy_files=True,
646
        delete_existing_job=False,
647
    ):
648
        # Store all job arguments in the HDF5 file
649
        delete_file_after_copy = _job_store_before_copy(job=self)
1✔
650

651
        # Call the copy_to() function defined in the JobCore
652
        new_job_core, file_project, hdf5_project, reloaded = super(
1✔
653
            GenericJob, self
654
        )._internal_copy_to(
655
            project=project,
656
            new_job_name=new_job_name,
657
            new_database_entry=new_database_entry,
658
            copy_files=copy_files,
659
            delete_existing_job=delete_existing_job,
660
        )
661
        if reloaded:
1✔
662
            return new_job_core, file_project, hdf5_project, reloaded
1✔
663

664
        # Reload object from HDF5 file
665
        if not static_isinstance(
1✔
666
            obj=project.__class__, obj_type="pyiron_base.jobs.job.core.JobCore"
667
        ):
668
            _job_reload_after_copy(
1✔
669
                job=new_job_core, delete_file_after_copy=delete_file_after_copy
670
            )
671
        if delete_file_after_copy:
1✔
672
            self.project_hdf5.remove_file()
1✔
673
        return new_job_core, file_project, hdf5_project, reloaded
1✔
674

675
    def copy_to(
1✔
676
        self,
677
        project=None,
678
        new_job_name=None,
679
        input_only=False,
680
        new_database_entry=True,
681
        delete_existing_job=False,
682
        copy_files=True,
683
    ):
684
        """
685
        Copy the content of the job including the HDF5 file to a new location.
686

687
        Args:
688
            project (JobCore/ProjectHDFio/Project/None): The project to copy the job to.
689
                (Default is None, use the same project.)
690
            new_job_name (str): The new name to assign the duplicate job. Required if the project is `None` or the same
691
                project as the copied job. (Default is None, try to keep the same name.)
692
            input_only (bool): [True/False] Whether to copy only the input. (Default is False.)
693
            new_database_entry (bool): [True/False] Whether to create a new database entry. If input_only is True then
694
                new_database_entry is False. (Default is True.)
695
            delete_existing_job (bool): [True/False] Delete existing job in case it exists already (Default is False.)
696
            copy_files (bool): If True, copy all files in the working directory of the job, too.
697

698
        Returns:
699
            GenericJob: GenericJob object pointing to the new location.
700
        """
701
        # Update flags
702
        if input_only and new_database_entry:
1✔
703
            warnings.warn(
×
704
                "input_only conflicts new_database_entry; setting new_database_entry=False"
705
            )
706
            new_database_entry = False
×
707

708
        # Call the copy_to() function defined in the JobCore
709
        new_job_core, file_project, hdf5_project, reloaded = self._internal_copy_to(
1✔
710
            project=project,
711
            new_job_name=new_job_name,
712
            new_database_entry=new_database_entry,
713
            copy_files=copy_files,
714
            delete_existing_job=delete_existing_job,
715
        )
716

717
        # Remove output if it should not be copied
718
        if input_only:
1✔
719
            for group in new_job_core.project_hdf5.list_groups():
1✔
720
                if "output" in group:
1✔
721
                    del new_job_core.project_hdf5[
×
722
                        posixpath.join(new_job_core.project_hdf5.h5_path, group)
723
                    ]
724
            new_job_core.status.initialized = True
1✔
725
        new_job_core._after_generic_copy_to(
1✔
726
            self, new_database_entry=new_database_entry, reloaded=reloaded
727
        )
728
        return new_job_core
1✔
729

730
    def _after_generic_copy_to(self, original, new_database_entry, reloaded):
1✔
731
        """
732
        Called in :method:`.copy_to()` after :method:`._internal_copy_to()` to allow subclasses to modify copy behavior.
733

734
        Args:
735
            original (:class:`.GenericJob`): job that this job was copied from
736
            new_database_entry (bool): Whether a new database entry was created.
737
            reloaded (bool): True if this job was reloaded instead of copied.
738
        """
739
        pass
1✔
740

741
    def copy_file_to_working_directory(self, file):
1✔
742
        """
743
        Copy a specific file to the working directory before the job is executed.
744

745
        Args:
746
            file (str): path of the file to be copied.
747
        """
748
        if os.path.isabs(file):
×
749
            self.restart_file_list.append(file)
×
750
        else:
751
            self.restart_file_list.append(os.path.abspath(file))
×
752

753
    def copy_template(self, project=None, new_job_name=None):
1✔
754
        """
755
        Copy the content of the job including the HDF5 file but without the output data to a new location
756

757
        Args:
758
            project (JobCore/ProjectHDFio/Project/None): The project to copy the job to.
759
                (Default is None, use the same project.)
760
            new_job_name (str): The new name to assign the duplicate job. Required if the project is `None` or the same
761
                project as the copied job. (Default is None, try to keep the same name.)
762

763
        Returns:
764
            GenericJob: GenericJob object pointing to the new location.
765
        """
766
        return self.copy_to(
1✔
767
            project=project,
768
            new_job_name=new_job_name,
769
            input_only=True,
770
            new_database_entry=False,
771
        )
772

773
    def remove(self, _protect_childs=True):
1✔
774
        """
775
        Remove the job - this removes the HDF5 file, all data stored in the HDF5 file and the corresponding database entry.
776

777
        Args:
778
            _protect_childs (bool): [True/False] by default child jobs cannot be deleted, to maintain consistency
779
                                    - default=True
780
        """
781
        if isinstance(self.server.future, Future) and not self.server.future.done():
1✔
782
            self.server.future.cancel()
×
783
        super().remove(_protect_childs=_protect_childs)
1✔
784

785
    def remove_child(self):
1✔
786
        """
787
        Internal removal function that also removes child jobs.
788
        Never use this command, since it will destroy the integrity of your project.
789
        """
790
        _kill_child(job=self)
1✔
791
        super(GenericJob, self).remove_child()
1✔
792

793
    def remove_and_reset_id(self, _protect_childs=True):
1✔
794
        if self.job_id is not None:
1✔
795
            master_id, parent_id = self.master_id, self.parent_id
1✔
796
            self.remove(_protect_childs=_protect_childs)
1✔
797
            self.reset_job_id()
1✔
798
            self.master_id, self.parent_id = master_id, parent_id
1✔
799
        else:
800
            self.remove(_protect_childs=_protect_childs)
1✔
801

802
    def kill(self):
1✔
803
        if self.status.running or self.status.submitted:
×
804
            self.remove_and_reset_id()
×
805
        else:
806
            raise ValueError(
×
807
                "The kill() function is only available during the execution of the job."
808
            )
809

810
    def validate_ready_to_run(self):
1✔
811
        """
812
        Validate that the calculation is ready to be executed. By default no generic checks are performed, but one could
813
        check that the input information is complete or validate the consistency of the input at this point.
814

815
        Raises:
816
            ValueError: if ready check is unsuccessful
817
        """
818
        pass
1✔
819

820
    def check_setup(self):
1✔
821
        """
822
        Checks whether certain parameters (such as plane wave cutoff radius in DFT) are changed from the pyiron standard
823
        values to allow for physically meaningful results. This function is called manually or only when the job is
824
        submitted to the queueing system.
825
        """
826
        pass
×
827

828
    def reset_job_id(self, job_id=None):
1✔
829
        """
830
        Resetting the job id sets the job_id to None in the GenericJob as well as in all connected modules like JobStatus.
831
        """
832
        super().reset_job_id(job_id=job_id)
1✔
833
        self._status = JobStatus(db=self.project.db, job_id=self._job_id)
1✔
834

835
    @deprecate(
1✔
836
        run_again="Either delete the job via job.remove() or use delete_existing_job=True.",
837
        version="0.4.0",
838
    )
839
    def run(
1✔
840
        self,
841
        delete_existing_job=False,
842
        repair=False,
843
        debug=False,
844
        run_mode=None,
845
        run_again=False,
846
    ):
847
        """
848
        This is the main run function, depending on the job status ['initialized', 'created', 'submitted', 'running',
849
        'collect','finished', 'refresh', 'suspended'] the corresponding run mode is chosen.
850

851
        Args:
852
            delete_existing_job (bool): Delete the existing job and run the simulation again.
853
            repair (bool): Set the job status to created and run the simulation again.
854
            debug (bool): Debug Mode - defines the log level of the subprocess the job is executed in.
855
            run_mode (str): ['modal', 'non_modal', 'queue', 'manual'] overwrites self.server.run_mode
856
            run_again (bool): Same as delete_existing_job (deprecated)
857
        """
858
        if not isinstance(delete_existing_job, bool):
1✔
859
            raise ValueError(
1✔
860
                f"We got delete_existing_job = {delete_existing_job}. If you"
861
                " meant to delete the job, set delete_existing_job"
862
                " = True"
863
            )
864
        with catch_signals(self.signal_intercept):
1✔
865
            if run_again:
1✔
866
                delete_existing_job = True
×
867
            try:
1✔
868
                self._logger.info(
1✔
869
                    "run {}, status: {}".format(self.job_info_str, self.status)
870
                )
871
                status = self.status.string
1✔
872
                if run_mode is not None:
1✔
873
                    self.server.run_mode = run_mode
×
874
                if delete_existing_job:
1✔
875
                    status = "initialized"
1✔
876
                    self.remove_and_reset_id(_protect_childs=False)
1✔
877
                if repair and self.job_id and not self.status.finished:
1✔
878
                    self._run_if_repair()
×
879
                elif status == "initialized":
1✔
880
                    self._run_if_new(debug=debug)
1✔
881
                elif status == "created":
1✔
882
                    self._run_if_created()
1✔
883
                elif status == "submitted":
1✔
884
                    run_job_with_status_submitted(job=self)
×
885
                elif status == "running":
1✔
886
                    self._run_if_running()
×
887
                elif status == "collect":
1✔
888
                    self._run_if_collect()
1✔
889
                elif status == "suspend":
1✔
890
                    self._run_if_suspended()
×
891
                elif status == "refresh":
1✔
892
                    self.run_if_refresh()
×
893
                elif status == "busy":
1✔
894
                    self._run_if_busy()
×
895
                elif status == "finished":
1✔
896
                    run_job_with_status_finished(job=self)
1✔
897
                elif status == "aborted":
1✔
898
                    raise ValueError(
1✔
899
                        "Running an aborted job with `delete_existing_job=False` is meaningless."
900
                    )
901
            except Exception:
1✔
902
                self.drop_status_to_aborted()
1✔
903
                raise
1✔
904
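    # Usage sketch for run() above; "job" is any GenericJob and the keyword
    # arguments mirror the docstring:
    #   job.run()                           # dispatch based on the current job status
    #   job.run(delete_existing_job=True)   # remove the existing job and run it again
    #   job.run(repair=True)                # set the status back to created and run again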

905
    def run_if_modal(self):
1✔
906
        """
907
        The run if modal function is called by run to execute the simulation, while waiting for the output. For this we
908
        use subprocess.check_output()
909
        """
910
        run_job_with_runmode_modal(job=self)
×
911

912
    def run_static(self):
1✔
913
        """
914
        The run static function is called by run to execute the simulation.
915
        """
916
        if self._job_with_calculate_function:
1✔
917
            execute_job_with_calculate_function(job=self)
1✔
918
        else:
919
            return execute_job_with_external_executable(job=self)
1✔
920

921
    def run_if_scheduler(self):
1✔
922
        """
923
        The run if queue function is called by run if the user decides to submit the job to a queuing system. The job
924
        is submitted to the queuing system using subprocess.Popen()
925
        Returns:
926
            int: Returns the queue ID for the job.
927
        """
928
        return run_job_with_runmode_queue(job=self)
×
929

930
    def transfer_from_remote(self):
1✔
931
        state.queue_adapter.get_job_from_remote(
×
932
            working_directory="/".join(self.working_directory.split("/")[:-1]),
933
        )
934
        state.queue_adapter.transfer_file_to_remote(
×
935
            file=self.project_hdf5.file_name,
936
            transfer_back=True,
937
            delete_file_on_remote=True,
938
        )
939
        if state.database.database_is_disabled:
×
940
            self.project.db.update()
×
941
        else:
942
            ft = FileTable(index_from=self.project_hdf5.path + "_hdf5/")
×
943
            df = ft.job_table(
×
944
                sql_query=None,
945
                user=state.settings.login_user,
946
                project_path=None,
947
                all_columns=True,
948
            )
949
            db_dict_lst = []
×
950
            for j, st, sj, p, h, hv, c, ts, tp, tc in zip(
×
951
                df.job.values,
952
                df.status.values,
953
                df.subjob.values,
954
                df.project.values,
955
                df.hamilton.values,
956
                df.hamversion.values,
957
                df.computer.values,
958
                df.timestart.values,
959
                df.timestop.values,
960
                df.totalcputime.values,
961
            ):
962
                gp = self.project._convert_str_to_generic_path(p)
×
963
                db_dict_lst.append(
×
964
                    {
965
                        "username": state.settings.login_user,
966
                        "projectpath": gp.root_path,
967
                        "project": gp.project_path,
968
                        "job": j,
969
                        "subjob": sj,
970
                        "hamversion": hv,
971
                        "hamilton": h,
972
                        "status": st,
973
                        "computer": c,
974
                        "timestart": datetime.utcfromtimestamp(ts.tolist() / 1e9),
975
                        "timestop": datetime.utcfromtimestamp(tp.tolist() / 1e9),
976
                        "totalcputime": tc,
977
                        "masterid": self.master_id,
978
                        "parentid": None,
979
                    }
980
                )
981
            _ = [self.project.db.add_item_dict(d) for d in db_dict_lst]
×
982
        self.status.string = self.project_hdf5["status"]
×
983
        if self.master_id is not None:
×
984
            self._reload_update_master(project=self.project, master_id=self.master_id)
×
985

986
    def run_if_interactive(self):
1✔
987
        """
988
        For jobs whose executables are available as a Python library, those can also be executed with a library call
989
        instead of calling an external executable. This is usually faster than a single core python job.
990
        """
991
        raise NotImplementedError(
×
992
            "This function needs to be implemented in the specific class."
993
        )
994

995
    def run_if_interactive_non_modal(self):
1✔
996
        """
997
        For jobs whose executables are available as a Python library, those can also be executed with a library call
998
        instead of calling an external executable. This is usually faster than a single core python job.
999
        """
1000
        raise NotImplementedError(
×
1001
            "This function needs to be implemented in the specific class."
1002
        )
1003

1004
    def interactive_close(self):
1✔
1005
        """
1006
        For jobs whose executables are available as a Python library, those can also be executed with a library call
1007
        instead of calling an external executable. This is usually faster than a single core python job. After the
1008
        interactive execution, the job can be closed using the interactive_close function.
1009
        """
1010
        raise NotImplementedError(
×
1011
            "This function needs to be implemented in the specific class."
1012
        )
1013

1014
    def interactive_fetch(self):
1✔
1015
        """
1016
        For jobs whose executables are available as a Python library, those can also be executed with a library call
1017
        instead of calling an external executable. This is usually faster than a single core python job. To access the
1018
        output data during the execution the interactive_fetch function is used.
1019
        """
1020
        raise NotImplementedError(
×
1021
            "This function needs to be implemented in the specific class."
1022
        )
1023

1024
    def interactive_flush(self, path="generic", include_last_step=True):
1✔
1025
        """
1026
        For jobs whose executables are available as a Python library, those can also be executed with a library call
1027
        instead of calling an external executable. This is usually faster than a single core python job. To write the
1028
        interactive cache to the HDF5 file the interactive flush function is used.
1029
        """
1030
        raise NotImplementedError(
×
1031
            "This function needs to be implemented in the specific class."
1032
        )
1033

1034
    def send_to_database(self):
1✔
1035
        """
1036
        If the jobs should be stored in the external/public database this could be implemented here, but currently it is
1037
        just a placeholder.
1038
        """
1039
        if self.server.send_to_db:
1✔
1040
            pass
×
1041

1042
    def _init_child_job(self, parent):
1✔
1043
        """
1044
        Finalize job initialization when job instance is created as a child from another one.
1045

1046
        Master jobs use this to set their own reference job, when created from that reference job.
1047

1048
        Args:
1049
            parent (:class:`.GenericJob`): job instance that this job was created from
1050
        """
1051
        pass
×
1052

1053
    def create_job(self, job_type, job_name, delete_existing_job=False):
1✔
1054
        """
1055
        Create one of the following jobs:
1056
        - ‘StructureContainer’:
1057
        - ‘StructurePipeline’:
1058
        - ‘AtomisticExampleJob’: example job just generating random number
1059
        - ‘ExampleJob’: example job just generating random number
1060
        - ‘Lammps’:
1061
        - ‘KMC’:
1062
        - ‘Sphinx’:
1063
        - ‘Vasp’:
1064
        - ‘GenericMaster’:
1065
        - ‘ParallelMaster’: series of jobs run in parallel
1066
        - ‘KmcMaster’:
1067
        - ‘ThermoLambdaMaster’:
1068
        - ‘RandomSeedMaster’:
1069
        - ‘MeamFit’:
1070
        - ‘Murnaghan’:
1071
        - ‘MinimizeMurnaghan’:
1072
        - ‘ElasticMatrix’:
1073
        - ‘ConvergenceEncutParallel’:
1074
        - ‘ConvergenceKpointParallel’:
1075
        - ‘PhonopyMaster’:
1076
        - ‘DefectFormationEnergy’:
1077
        - ‘LammpsASE’:
1078
        - ‘PipelineMaster’:
1079
        - ‘TransformationPath’:
1080
        - ‘ThermoIntEamQh’:
1081
        - ‘ThermoIntDftEam’:
1082
        - ‘ScriptJob’: Python script or jupyter notebook job container
1083
        - ‘ListMaster': list of jobs
1084

1085
        Args:
1086
            job_type (str): job type can be ['StructureContainer’, ‘StructurePipeline’, ‘AtomisticExampleJob’,
1087
                                             ‘ExampleJob’, ‘Lammps’, ‘KMC’, ‘Sphinx’, ‘Vasp’, ‘GenericMaster’,
1088
                                             ‘ParallelMaster’, ‘KmcMaster’,
1089
                                             ‘ThermoLambdaMaster’, ‘RandomSeedMaster’, ‘MeamFit’, ‘Murnaghan’,
1090
                                             ‘MinimizeMurnaghan’, ‘ElasticMatrix’,
1091
                                             ‘ConvergenceEncutParallel’, ‘ConvergenceKpointParallel’, ‘PhonopyMaster’,
1092
                                             ‘DefectFormationEnergy’, ‘LammpsASE’, ‘PipelineMaster’,
1093
                                             ‘TransformationPath’, ‘ThermoIntEamQh’, ‘ThermoIntDftEam’, ‘ScriptJob’,
1094
                                             ‘ListMaster']
1095
            job_name (str): name of the job
1096
            delete_existing_job (bool): delete an existing job - default false
1097

1098
        Returns:
1099
            GenericJob: job object depending on the job_type selected
1100
        """
1101
        job = self.project.create_job(
1✔
1102
            job_type=job_type,
1103
            job_name=job_name,
1104
            delete_existing_job=delete_existing_job,
1105
        )
1106
        job._init_child_job(self)
1✔
1107
        return job
1✔
1108

1109
    def update_master(self, force_update=False):
1✔
1110
        """
1111
        After a job is finished it checks whether it is linked to any metajob - meaning the master ID is pointing to
1112
        this jobs job ID. If this is the case and the master job is in status suspended - the child wakes up the master
1113
        job, sets the status to refresh and execute run on the master job. During the execution the master job is set to
1114
        status refresh. If another child calls update_master, while the master is in refresh the status of the master is
1115
        set to busy and if the master is in status busy at the end of the update_master process another update is
1116
        triggered.
1117

1118
        Args:
1119
            force_update (bool): Whether to check run mode for updating master
1120
        """
1121
        if not state.database.database_is_disabled:
1✔
1122
            master_id = self.master_id
1✔
1123
            project = self.project
1✔
1124
            self._logger.info(
1✔
1125
                "update master: {} {} {}".format(
1126
                    master_id, self.get_job_id(), self.server.run_mode
1127
                )
1128
            )
1129
            if master_id is not None and (
1✔
1130
                force_update
1131
                or not (
1132
                    self.server.run_mode.thread
1133
                    or self.server.run_mode.modal
1134
                    or self.server.run_mode.interactive
1135
                    or self.server.run_mode.worker
1136
                )
1137
            ):
1138
                self._reload_update_master(project=project, master_id=master_id)
×
1139

1140
    def job_file_name(self, file_name, cwd=None):
1✔
1141
        """
1142
        Combine the file name file_name with the path of the current working directory.
1143

1144
        Args:
1145
            file_name (str): name of the file
1146
            cwd (str): current working directory - this overwrites self.project_hdf5.working_directory - optional
1147

1148
        Returns:
1149
            str: absolute path to the file in the current working directory
1150
        """
1151
        if cwd is None:
×
1152
            cwd = self.project_hdf5.working_directory
×
1153
        return posixpath.join(cwd, file_name)
×
1154

1155
    def _set_hdf(self, hdf=None, group_name=None):
1✔
1156
        if hdf is not None:
1✔
1157
            self._hdf5 = hdf
1✔
1158
        if group_name is not None and self._hdf5 is not None:
1✔
1159
            self._hdf5 = self._hdf5.open(group_name)
1✔
1160

1161
    def to_dict(self):
1✔
1162
        data_dict = self._type_to_dict()
1✔
1163
        data_dict["status"] = self.status.string
1✔
1164
        data_dict["input/generic_dict"] = {
1✔
1165
            "restart_file_list": self.restart_file_list,
1166
            "restart_file_dict": self._restart_file_dict,
1167
            "exclude_nodes_hdf": self._exclude_nodes_hdf,
1168
            "exclude_groups_hdf": self._exclude_groups_hdf,
1169
        }
1170
        data_dict["server"] = self._server.to_dict()
1✔
1171
        self._executable_activate_mpi()
1✔
1172
        if self._executable is not None:
1✔
1173
            data_dict["executable"] = self._executable.to_dict()
1✔
1174
        if self._import_directory is not None:
1✔
1175
            data_dict["import_directory"] = self._import_directory
×
1176
        if self._executor_type is not None:
1✔
1177
            data_dict["executor_type"] = self._executor_type
1✔
1178
        if len(self._files_to_compress) > 0:
1✔
1179
            data_dict["files_to_compress"] = self._files_to_compress
×
1180
        if len(self._files_to_remove) > 0:
1✔
1181
            data_dict["files_to_compress"] = self._files_to_remove
×
1182
        return data_dict
1✔
1183

1184
    def from_dict(self, job_dict):
1✔
1185
        self._type_from_dict(type_dict=job_dict)
1✔
1186
        if "import_directory" in job_dict.keys():
1✔
1187
            self._import_directory = job_dict["import_directory"]
×
1188
        self._server.from_dict(server_dict=job_dict["server"])
1✔
1189
        if "executable" in job_dict.keys() and job_dict["executable"] is not None:
1✔
1190
            self.executable.from_dict(job_dict["executable"])
1✔
1191
        input_dict = job_dict["input"]
1✔
1192
        if "generic_dict" in input_dict.keys():
1✔
1193
            generic_dict = input_dict["generic_dict"]
1✔
1194
            self._restart_file_list = generic_dict["restart_file_list"]
1✔
1195
            self._restart_file_dict = generic_dict["restart_file_dict"]
1✔
1196
            self._exclude_nodes_hdf = generic_dict["exclude_nodes_hdf"]
1✔
1197
            self._exclude_groups_hdf = generic_dict["exclude_groups_hdf"]
1✔
1198
        # Backwards compatibility
1199
        if "restart_file_list" in input_dict.keys():
1✔
1200
            self._restart_file_list = input_dict["restart_file_list"]
×
1201
        if "restart_file_dict" in input_dict.keys():
1✔
1202
            self._restart_file_dict = input_dict["restart_file_dict"]
×
1203
        if "exclude_nodes_hdf" in input_dict.keys():
1✔
1204
            self._exclude_nodes_hdf = input_dict["exclude_nodes_hdf"]
×
1205
        if "exclude_groups_hdf" in input_dict.keys():
1✔
1206
            self._exclude_groups_hdf = input_dict["exclude_groups_hdf"]
×
1207
        if "executor_type" in input_dict.keys():
1✔
1208
            self._executor_type = input_dict["executor_type"]
×
1209

1210
    def to_hdf(self, hdf=None, group_name=None):
1✔
1211
        """
1212
        Store the GenericJob in an HDF5 file
1213

1214
        Args:
1215
            hdf (ProjectHDFio): HDF5 group object - optional
1216
            group_name (str): HDF5 subgroup name - optional
1217
        """
1218
        self._set_hdf(hdf=hdf, group_name=group_name)
1✔
1219
        self._hdf5.write_dict(data_dict=self.to_dict())
1✔
1220

1221
    @classmethod
1✔
1222
    def from_hdf_args(cls, hdf):
1✔
1223
        """
1224
        Read arguments for instance creation from HDF5 file
1225

1226
        Args:
1227
            hdf (ProjectHDFio): HDF5 group object
1228
        """
1229
        job_name = posixpath.splitext(posixpath.basename(hdf.file_name))[0]
1✔
1230
        project_hdf5 = type(hdf)(
1✔
1231
            project=hdf.create_project_from_hdf5(), file_name=job_name
1232
        )
1233
        return {"job_name": job_name, "project": project_hdf5}
1✔
1234

1235
    def from_hdf(self, hdf=None, group_name=None):
1✔
1236
        """
1237
        Restore the GenericJob from an HDF5 file
1238

1239
        Args:
1240
            hdf (ProjectHDFio): HDF5 group object - optional
1241
            group_name (str): HDF5 subgroup name - optional
1242
        """
1243
        self._set_hdf(hdf=hdf, group_name=group_name)
1✔
1244
        job_dict = self._hdf5.read_dict_from_hdf()
1✔
1245
        with self._hdf5.open("input") as hdf5_input:
1✔
1246
            job_dict["input"] = hdf5_input.read_dict_from_hdf(recursive=True)
1✔
1247
        # Backwards compatibility to the previous HasHDF based interface
1248
        if "executable" in self._hdf5.list_groups():
1✔
1249
            exe_dict = self._hdf5["executable/executable"].to_object().to_builtin()
×
1250
            exe_dict["READ_ONLY"] = self._hdf5["executable/executable/READ_ONLY"]
×
1251
            job_dict["executable"] = {"executable": exe_dict}
×
1252
        self.from_dict(job_dict=job_dict)
1✔
1253

1254
    def save(self):
1✔
1255
        """
1256
        Save the object by writing the content to the HDF5 file and storing an entry in the database.
1257

1258
        Returns:
1259
            (int): Job ID stored in the database
1260
        """
1261
        self.to_hdf()
1✔
1262
        if not state.database.database_is_disabled:
1✔
1263
            job_id = self.project.db.add_item_dict(self.db_entry())
1✔
1264
            self._job_id = job_id
1✔
1265
            _write_hdf(
1✔
1266
                hdf_filehandle=self.project_hdf5.file_name,
1267
                data=job_id,
1268
                h5_path=self.job_name + "/job_id",
1269
                overwrite="update",
1270
            )
1271
            self.refresh_job_status()
1✔
1272
        else:
1273
            job_id = self.job_name
1✔
1274
        if self._check_if_input_should_be_written():
1✔
1275
            self.project_hdf5.create_working_directory()
1✔
1276
            self.write_input()
1✔
1277
        self.status.created = True
1✔
1278
        print(
1✔
1279
            "The job "
1280
            + self.job_name
1281
            + " was saved and received the ID: "
1282
            + str(job_id)
1283
        )
1284
        return job_id
1✔
1285

1286
    def convergence_check(self):
1✔
1287
        """
1288
        Validate the convergence of the calculation.
1289

1290
        Returns:
1291
             (bool): If the calculation is converged
1292
        """
1293
        return True
1✔
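    # Sketch of how a subclass might override this hook to flag unconverged runs
    # (hypothetical subclass and output path, not part of the original source):
    #
    #     class MyJob(GenericJob):
    #         def convergence_check(self):
    #             return bool(self["output/generic/convergence"])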
1294

1295
    def db_entry(self):
1✔
1296
        """
1297
        Generate the initial database entry for the current GenericJob
1298

1299
        Returns:
1300
            (dict): database dictionary {"username", "projectpath", "project", "job", "subjob", "hamversion",
1301
                                         "hamilton", "status", "computer", "timestart", "masterid", "parentid"}
1302
        """
1303
        db_dict = {
1✔
1304
            "username": state.settings.login_user,
1305
            "projectpath": self.project_hdf5.root_path,
1306
            "project": self.project_hdf5.project_path,
1307
            "job": self.job_name,
1308
            "subjob": self.project_hdf5.h5_path,
1309
            "hamversion": self.version,
1310
            "hamilton": self.__name__,
1311
            "status": self.status.string,
1312
            "computer": self._db_server_entry(),
1313
            "timestart": datetime.now(),
1314
            "masterid": self.master_id,
1315
            "parentid": self.parent_id,
1316
        }
1317
        return db_dict
1✔
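    # Illustrative example of the returned dictionary (all values are placeholders,
    # not real data):
    #
    #     {"username": "pyiron", "projectpath": "/home/user/", "project": "demo/",
    #      "job": "my_job", "subjob": "/my_job", "hamversion": "0.4",
    #      "hamilton": "ScriptJob", "status": "initialized", "computer": "pyiron#1",
    #      "timestart": datetime.now(), "masterid": None, "parentid": None}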
1318

1319
    def restart(self, job_name=None, job_type=None):
1✔
1320
        """
1321
        Create a restart calculation from the current calculation - in the GenericJob this is the same as create_job().
1322
        A restart is only possible after the current job has finished. If you want to run the same job again with
1323
        different input parameters use job.run(delete_existing_job=True) instead.
1324

1325
        Args:
1326
            job_name (str): job name of the new calculation - default=<job_name>_restart
1327
            job_type (str): job type of the new calculation - default is the same type as the existing calculation
1328

1329
        Returns:
1330
            GenericJob: new job instance for the restart calculation
1331
        """
1332
        if self.job_id is None:
1✔
1333
            self.save()
×
1334
        if job_name is None:
1✔
1335
            job_name = "{}_restart".format(self.job_name)
1✔
1336
        if job_type is None:
1✔
1337
            job_type = self.__name__
1✔
1338
        if job_type == self.__name__ and job_name not in self.project.list_nodes():
1✔
1339
            new_ham = self.copy_to(
1✔
1340
                new_job_name=job_name,
1341
                new_database_entry=False,
1342
                input_only=True,
1343
                copy_files=False,
1344
            )
1345
        else:
1346
            new_ham = self.create_job(job_type, job_name)
×
1347
        new_ham.parent_id = self.job_id
1✔
1348
        # ensuring that the new job does not inherit the restart_file_list from the old job
1349
        new_ham._restart_file_list = list()
1✔
1350
        new_ham._restart_file_dict = dict()
1✔
1351
        return new_ham
1✔
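    # Illustrative usage sketch (assumes the current job has already finished;
    # not part of the original source):
    #
    #     job_restart = job.restart()   # new job "<job_name>_restart" with the
    #                                   # input copied from the finished job
    #     job_restart.run()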
1352

1353
    def _list_all(self):
1✔
1354
        """
1355
        List all groups and nodes of the HDF5 file - where groups are equivalent to directories and nodes to files.
1356

1357
        Returns:
1358
            dict: {'groups': [list of groups], 'nodes': [list of nodes]}
1359
        """
1360
        h5_dict = self.project_hdf5.list_all()
×
1361
        if self.server.new_hdf:
×
1362
            h5_dict["groups"] += self._list_ext_childs()
×
1363
        return h5_dict
×
1364

1365
    def signal_intercept(self, sig):
1✔
1366
        """
1367
        Abort the job and log the signal that caused it.
1368

1369
        Expected to be called from
1370
        :func:`pyiron_base.state.signal.catch_signals`.
1371

1372
        Args:
1373
            sig (int): the signal that triggered the abort
1374
        """
1375
        try:
×
1376
            self._logger.info(
×
1377
                "Job {} intercept signal {}, job is shutting down".format(
1378
                    self._job_id, sig
1379
                )
1380
            )
1381
            self.drop_status_to_aborted()
×
1382
        except:
×
1383
            raise
×
1384

1385
    def drop_status_to_aborted(self):
1✔
1386
        """
1387
        Change the job status to aborted when the job was intercepted.
1388
        """
1389
        self.refresh_job_status()
1✔
1390
        if not (self.status.finished or self.status.suspended):
1✔
1391
            self.status.aborted = True
1✔
1392
            self.project_hdf5["status"] = self.status.string
1✔
1393

1394
    def _run_if_new(self, debug=False):
1✔
1395
        """
1396
        Internal helper function; the run-if-new function is called when the job status is 'initialized'. It prepares
1397
        the HDF5 file and the corresponding directory structure.
1398

1399
        Args:
1400
            debug (bool): Debug Mode
1401
        """
1402
        run_job_with_status_initialized(job=self, debug=debug)
1✔
1403

1404
    def _run_if_created(self):
1✔
1405
        """
1406
        Internal helper function; the run-if-created function is called when the job status is 'created'. It executes
1407
        the simulation either in modal mode (waiting for the simulation to finish), manually, or by submitting the
1408
        simulation to the queue.
1409

1410
        Returns:
1411
            int: Queue ID - if the job was sent to the queue
1412
        """
1413
        return run_job_with_status_created(job=self)
1✔
1414

1415
    def _run_if_repair(self):
1✔
1416
        """
1417
        Internal helper function; the run-if-repair function is called when the run() function is called with the
1418
        'repair' parameter.
1419
        """
1420
        run_job_with_parameter_repair(job=self)
×
1421

1422
    def _run_if_running(self):
1✔
1423
        """
1424
        Internal helper function; the run-if-running function is called when the job status is 'running'. It allows the
1425
        user to interact with the simulation while it is running.
1426
        """
1427
        run_job_with_status_running(job=self)
×
1428

1429
    def run_if_refresh(self):
1✔
1430
        """
1431
        Internal helper function; the run-if-refresh function is called when the job status is 'refresh'. If the job was
1432
        suspended previously, the job is started again to be continued.
1433
        """
1434
        run_job_with_status_refresh(job=self)
×
1435

1436
    def set_input_to_read_only(self):
1✔
1437
        """
1438
        This function enforces read-only mode for the input classes, but it has to be implemented in the individual
1439
        classes.
1440
        """
1441
        self.server.lock()
1✔
1442

1443
    def _run_if_busy(self):
1✔
1444
        """
1445
        Internal helper function; the run-if-busy function is called when the job status is 'busy'.
1446
        """
1447
        run_job_with_status_busy(job=self)
×
1448

1449
    def _run_if_collect(self):
1✔
1450
        """
1451
        Internal helper function; the run-if-collect function is called when the job status is 'collect'. It collects
1452
        the simulation output using the standardized functions collect_output() and collect_logfiles(). Afterwards the
1453
        status is set to 'finished'.
1454
        """
1455
        run_job_with_status_collect(job=self)
1✔
1456

1457
    def _run_if_suspended(self):
1✔
1458
        """
1459
        Internal helper function; the run-if-suspended function is called when the job status is 'suspended'. It
1460
        restarts the job by calling the run if refresh function after setting the status to 'refresh'.
1461
        """
1462
        run_job_with_status_suspended(job=self)
×
1463

1464
    def _executable_activate(self, enforce=False, codename=None):
1✔
1465
        """
1466
        Internal helper function to load the executable object if it was not loaded already.
1467

1468
        Args:
1469
            enforce (bool): Force the executable module to reinitialize
1470
            codename (str): Name of the resource directory and run script.
1471
        """
1472
        if self._executable is None or enforce:
1✔
1473
            if codename is not None:
1✔
1474
                self._executable = Executable(
×
1475
                    codename=codename,
1476
                    module=codename,
1477
                    path_binary_codes=None,
1478
                )
1479
            elif len(self.__module__.split(".")) > 1:
1✔
1480
                self._executable = Executable(
1✔
1481
                    codename=self.__name__,
1482
                    module=self.__module__.split(".")[-2],
1483
                    path_binary_codes=None,
1484
                )
1485
            elif self.__module__ == "__main__":
×
1486
                # Special case when the job class is defined in a Jupyter notebook
1487
                parent_class = self.__class__.__bases__[0]
×
1488
                self._executable = Executable(
×
1489
                    codename=parent_class.__name__,
1490
                    module=parent_class.__module__.split(".")[-2],
1491
                    path_binary_codes=None,
1492
                )
1493
            else:
1494
                self._executable = Executable(
×
1495
                    codename=self.__name__,
1496
                    path_binary_codes=None,
1497
                )
1498

1499
    def _type_to_dict(self):
1✔
1500
        """
1501
        Internal helper function to save type and version in HDF5 file root
1502
        """
1503
        data_dict = super()._type_to_dict()
1✔
1504
        if self._executable:  # overwrite version - default self.__version__
1✔
1505
            data_dict["VERSION"] = self.executable.version
1✔
1506
        if hasattr(self, "__hdf_version__"):
1✔
1507
            data_dict["HDF_VERSION"] = self.__hdf_version__
1✔
1508
        return data_dict
1✔
1509

1510
    def _type_from_dict(self, type_dict):
1✔
1511
        self.__obj_type__ = type_dict["TYPE"]
1✔
1512
        if self._executable is None:
1✔
1513
            self.__obj_version__ = type_dict["VERSION"]
1✔
1514

1515
    def _type_from_hdf(self):
1✔
1516
        """
1517
        Internal helper function to load type and version from HDF5 file root
1518
        """
1519
        self._type_from_dict(
×
1520
            type_dict={
1521
                "TYPE": self._hdf5["TYPE"],
1522
                "VERSION": self._hdf5["VERSION"],
1523
            }
1524
        )
1525

1526
    def run_time_to_db(self):
1✔
1527
        """
1528
        Internal helper function to store the run_time in the database
1529
        """
1530
        if not state.database.database_is_disabled and self.job_id is not None:
1✔
1531
            self.project.db.item_update(self._runtime(), self.job_id)
1✔
1532

1533
    def _runtime(self):
1✔
1534
        """
1535
        Internal helper function to calculate the runtime by subtracting the start time from the stop time.
1536

1537
        Returns:
1538
            (dict): Database dictionary db_dict
1539
        """
1540
        start_time = self.project.db.get_item_by_id(self.job_id)["timestart"]
1✔
1541
        stop_time = datetime.now()
1✔
1542
        return {
1✔
1543
            "timestop": stop_time,
1544
            "totalcputime": int((stop_time - start_time).total_seconds()),
1545
        }
1546

1547
    def _db_server_entry(self):
1✔
1548
        """
1549
        Internal helper function to combine all the info regarding the server into a single string that can be used,
1550
        e.g., as an entry in a database.
1551

1552
        Returns:
1553
            (str): server info as single word
1554

1555
        """
1556
        return self._server.db_entry()
1✔
1557

1558
    def _executable_activate_mpi(self):
1✔
1559
        """
1560
        Internal helper function to switch the executable to MPI mode
1561
        """
1562
        try:
1✔
1563
            if self.server.cores > 1:
1✔
1564
                self.executable.mpi = True
1✔
1565
        except ValueError:
×
1566
            self.server.cores = 1
×
1567
            warnings.warn(
×
1568
                "No multi core executable found falling back to the single core executable.",
1569
                RuntimeWarning,
1570
            )
1571

1572
    @deprecate("Use job.save()")
1✔
1573
    def _create_job_structure(self, debug=False):
1✔
1574
        """
1575
        Internal helper function to create the input directories, save the job in the database and write the wrapper.
1576

1577
        Args:
1578
            debug (bool): Debug Mode
1579
        """
1580
        self.save()
×
1581

1582
    def _check_if_input_should_be_written(self):
1✔
1583
        if self._job_with_calculate_function:
1✔
1584
            return False
1✔
1585
        else:
1586
            return not (
1✔
1587
                self.server.run_mode.interactive
1588
                or self.server.run_mode.interactive_non_modal
1589
            )
1590

1591
    def _before_successor_calc(self, ham):
1✔
1592
        """
1593
        Internal helper function which receives the hamiltonian of the successor job and is executed before that successor job runs.
1594
        This function is used to execute a series of jobs based on their parent relationship - marked by the parent ID.
1595
        Mainly used by the ListMaster job type.
1596
        """
1597
        pass
×
1598

1599
    def _reload_update_master(self, project, master_id):
1✔
1600
        queue_flag = self.server.run_mode.queue
×
1601
        master_db_entry = project.db.get_item_by_id(master_id)
×
1602
        if master_db_entry["status"] == "suspended":
×
1603
            project.db.set_job_status(job_id=master_id, status="refresh")
×
1604
            self._logger.info("run_if_refresh() called")
×
1605
            del self
×
1606
            master_inspect = project.inspect(master_id)
×
1607
            if master_inspect["server"]["run_mode"] == "non_modal" or (
×
1608
                master_inspect["server"]["run_mode"] == "modal" and queue_flag
1609
            ):
1610
                master = project.load(master_id)
×
1611
                master.run_if_refresh()
×
1612
        elif master_db_entry["status"] == "refresh":
×
1613
            project.db.set_job_status(job_id=master_id, status="busy")
×
1614
            self._logger.info("busy master: {} {}".format(master_id, self.get_job_id()))
×
1615
            del self
×
1616

1617
    def _get_executor(self, max_workers=None):
1✔
1618
        if self._executor_type is None:
1✔
1619
            raise ValueError(
1✔
1620
                "No executor type defined - Please set self.executor_type."
1621
            )
1622
        elif (
1✔
1623
            self._executor_type == "pympipool.Executor"
1624
            and platform.system() == "Darwin"
1625
        ):
1626
            # The macOS firewall might prevent connections based on the network address - especially on GitHub CI
1627
            return import_class(self._executor_type)(
×
1628
                max_cores=max_workers, hostname_localhost=True
1629
            )
1630
        elif self._executor_type == "pympipool.Executor":
1✔
1631
            # The pympipool Executor defines max_cores rather than max_workers
1632
            return import_class(self._executor_type)(max_cores=max_workers)
1✔
1633
        elif isinstance(self._executor_type, str):
1✔
1634
            return import_class(self._executor_type)(max_workers=max_workers)
1✔
1635
        else:
1636
            raise TypeError("The self.executor_type has to be a string.")
×
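    # Illustrative usage sketch (the executor class is an assumption; any class
    # importable by its fully qualified name and accepting ``max_workers`` would
    # work the same way):
    #
    #     job.executor_type = "concurrent.futures.ProcessPoolExecutor"
    #     executor = job._get_executor(max_workers=2)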
1637

1638

1639
class GenericError(object):
1✔
1640
    def __init__(self, working_directory):
1✔
1641
        self._working_directory = working_directory
1✔
1642

1643
    def __repr__(self):
1✔
1644
        all_messages = ""
1✔
1645
        for message in [self.print_message(), self.print_queue()]:
1✔
1646
            if message:
1✔
NEW
1647
                all_messages += message
×
1648
        if len(all_messages) == 0:
1✔
1649
            all_messages = "There is no error/warning"
1✔
1650
        return all_messages
1✔
1651

1652
    def print_message(self, string=""):
1✔
1653
        return self._print_error(file_name="error.msg", string=string)
1✔
1654

1655
    def print_queue(self, string=""):
1✔
1656
        return self._print_error(file_name="error.out", string=string)
1✔
1657

1658
    def _print_error(self, file_name, string="", print_yes=True):
1✔
1659
        if not os.path.exists(os.path.join(self._working_directory, file_name)):
1✔
1660
            return ""
1✔
NEW
1661
        elif print_yes:
×
NEW
1662
            with open(os.path.join(self._working_directory, file_name)) as f:
×
1663
                return string.join(f.readlines())
×
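# Illustrative usage sketch (placeholder path, not part of the original source):
#
#     err = GenericError(working_directory="/path/to/job/working_directory")
#     print(err)   # contents of error.msg / error.out if present,
#                  # otherwise "There is no error/warning"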