• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyiron / pyiron_base / 9471909594

11 Jun 2024 07:52PM UTC coverage: 71.339% (+0.06%) from 71.284%
9471909594

Pull #1472

github

web-flow
Merge faae3f5cc into df9b3d159
Pull Request #1472: Add get_input_file_dict() in GenericJob

14 of 21 new or added lines in 2 files covered. (66.67%)

92 existing lines in 3 files now uncovered.

7166 of 10045 relevant lines covered (71.34%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.85
/pyiron_base/jobs/job/runfunction.py
1
# coding: utf-8
2
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
3
# Distributed under the terms of "New BSD License", see the LICENSE file.
4
from concurrent.futures import ProcessPoolExecutor
1✔
5
from datetime import datetime
1✔
6
import multiprocessing
1✔
7
import os
1✔
8
import posixpath
1✔
9
import subprocess
1✔
10

11
from jinja2 import Template
1✔
12
from pyiron_snippets.deprecate import deprecate
1✔
13

14
from pyiron_base.jobs.job.wrapper import JobWrapper
1✔
15
from pyiron_base.state import state
1✔
16
from pyiron_base.state.signal import catch_signals
1✔
17
from pyiron_base.utils.instance import static_isinstance
1✔
18

19

20
# Optional dependency: flux is a hierarchical resource manager (Linux only).
# flux_available gates the flux-specific executor code paths further below.
try:
    import flux.job

    flux_available = True
except ImportError:
    flux_available = False
26

27
"""
1✔
28
The function job.run() inside pyiron is executed differently depending on the status of the job object. This module 
29
introduces the most general run functions and how they are selected. 
30

31
If an additional parameter is provided, then a specific run function is executed: 
32
    repair: run_job_with_parameter_repair
33

34
If no explicit parameter is provided the first implicit parameter is the job.status: 
35
    initialized: run_job_with_status_initialized
36
    created: run_job_with_status_created
37
    submitted: run_job_with_status_submitted
38
    running: run_job_with_status_running
39
    refresh: run_job_with_status_refresh
40
    busy: run_job_with_status_busy
41
    collect: run_job_with_status_collect
42
    suspended: run_job_with_status_suspended
43
    finished: run_job_with_status_finished
44
    
45
Afterwards inside the run_job_with_status_created() function the job is executed differently depending on the run mode 
46
of the server object attached to the job object: job.server.run_mode
47
    manual: run_job_with_runmode_manually
48
    modal: run_job_with_runmode_modal
49
    non_modal: run_job_with_runmode_non_modal
50
    interactive: run_job_with_runmode_interactive
51
    interactive_non_modal: run_job_with_runmode_interactive_non_modal
52
    queue: run_job_with_runmode_queue
53
    srun: run_job_with_runmode_srun
54
    executor: run_job_with_runmode_executor
55
    thread: only affects children of a GenericMaster 
56
    worker: only affects children of a GenericMaster 
57
    
58
Finally for jobs which call an external executable the execution is implemented in an function as well: 
59
    execute_job_with_external_executable
60
"""
61

62

63
# Parameter
64
def run_job_with_parameter_repair(job):
    """
    Restart a broken job from its creation step.

    Invoked when ``run()`` is called with the 'repair' parameter; simply
    delegates to the job's internal creation hook.

    Args:
        job (GenericJob): pyiron job object
    """
    job._run_if_created()

74

75
# Job Status
76
def run_job_with_status_initialized(job, debug=False):
    """
    Prepare and launch a freshly initialized job.

    Validates the job, performs queue-specific setup checks when the job is
    bound to a queuing system, and — unless an identical job already exists —
    saves it to HDF5 and triggers ``run()`` again.

    Args:
        job (GenericJob): pyiron job object
        debug (bool): Debug Mode
    """
    job.validate_ready_to_run()
    if job.server.run_mode.queue:
        job.check_setup()
    if job.check_if_job_exists():
        print("job exists already and therefore was not created!")
        return
    job.save()
    job.run()
93

94

95
def run_job_with_status_created(job):
    """
    Dispatch a freshly created job to the execution backend selected by
    ``job.server.run_mode``.

    Sets the job status to 'submitted' and then either prints manual run
    instructions, runs the job synchronously, spawns a background process,
    submits via srun or a queuing system, hands the job to an executor, or
    enters one of the interactive modes.

    Args:
        job (GenericJob): pyiron job object

    Returns:
        int: Queue ID - if the job was send to the queue
    """
    job.status.submitted = True

    # Different run modes
    if job.server.run_mode.manual:
        run_job_with_runmode_manually(job=job, _manually_print=True)
    elif job.server.run_mode.worker:
        run_job_with_runmode_manually(job=job, _manually_print=True)
    elif job.server.run_mode.modal:
        job.run_static()
    elif job.server.run_mode.srun:
        run_job_with_runmode_srun(job=job)
    elif job.server.run_mode.executor:
        if job.server.gpus is not None:
            gpus_per_slot = int(job.server.gpus / job.server.cores)
            # Fixed: previously this compared "< 0", which let gpus < cores
            # silently pass 0 GPUs per slot despite the error message below.
            if gpus_per_slot < 1:
                raise ValueError(
                    "Both job.server.gpus and job.server.cores have to be greater than zero."
                )
        else:
            gpus_per_slot = None
        run_job_with_runmode_executor(
            job=job,
            executor=job.server.executor,
            gpus_per_slot=gpus_per_slot,
        )
    elif job.server.run_mode.non_modal or job.server.run_mode.thread:
        # NOTE: run_mode.worker is already handled above, so the previously
        # duplicated worker check here was unreachable and has been removed.
        run_job_with_runmode_non_modal(job=job)
    elif job.server.run_mode.queue:
        job.run_if_scheduler()
    elif job.server.run_mode.interactive:
        job.run_if_interactive()
    elif job.server.run_mode.interactive_non_modal:
        job.run_if_interactive_non_modal()
144

145

146
def run_job_with_status_submitted(
    job,
):  # Submitted jobs are handled by the job wrapper!
    """
    Handle a job whose status is 'submitted', i.e. one waiting in the queue.

    If the job uses the queue run mode but is no longer present in the
    queuing system, it is either re-run locally (deleting the stale copy) or,
    for remote queue adapters, transferred back from the remote host.
    Otherwise a short status message is printed.

    Args:
        job (GenericJob): pyiron job object
    """
    gone_from_queue = (
        job.server.run_mode.queue
        and not job.project.queue_check_job_is_waiting_or_running(job)
    )
    if not gone_from_queue:
        print("Job " + str(job.job_id) + " is waiting in the que!")
    elif state.queue_adapter.remote_flag:
        job.transfer_from_remote()
    else:
        job.run(delete_existing_job=True)
166

167

168
def run_job_with_status_running(job):
    """
    Handle a job whose status is 'running'.

    Re-runs jobs that have dropped out of the queuing system, forwards the
    interactive run modes to their dedicated handlers, and otherwise just
    reports that the job is still running.

    Args:
        job (GenericJob): pyiron job object
    """
    dropped_from_queue = (
        job.server.run_mode.queue
        and not job.project.queue_check_job_is_waiting_or_running(job)
    )
    if dropped_from_queue:
        job.run(delete_existing_job=True)
    elif job.server.run_mode.interactive:
        job.run_if_interactive()
    elif job.server.run_mode.interactive_non_modal:
        job.run_if_interactive_non_modal()
    else:
        print("Job " + str(job.job_id) + " is running!")
187

188

189
def run_job_with_status_refresh(job):
    """
    Handle a job whose status is 'refresh'.

    Restarting a previously suspended job is not implemented for generic
    jobs, so this handler always raises.

    Args:
        job (GenericJob): pyiron job object

    Raises:
        NotImplementedError: always, for this job type
    """
    message = "Refresh is not supported for this job type for job  " + str(job.job_id)
    raise NotImplementedError(message)
200

201

202
def run_job_with_status_busy(job):
    """
    Handle a job whose status is 'busy'.

    Resuming a busy job is not implemented for generic jobs, so this handler
    always raises.

    Args:
        job (GenericJob): pyiron job object

    Raises:
        NotImplementedError: always, for this job type
    """
    # Fixed copy/paste defect: the message previously claimed "Refresh" even
    # though this handler covers the 'busy' status.
    raise NotImplementedError(
        "Busy is not supported for this job type for job  " + str(job.job_id)
    )
212

213

214
def run_job_with_status_collect(job):
    """
    Internal helper function the run if collect function is called when the job status is 'collect'. It collects
    the simulation output using the standardized functions collect_output() and collect_logfiles(). Afterwards the
    status is set to 'finished'

    Args:
        job (GenericJob): pyiron job object
    """
    job.collect_output()
    job.collect_logfiles()
    job.run_time_to_db()
    # NOTE(review): the re-check implies the status may have been changed by
    # the collection calls above (e.g. to aborted) — only proceed towards
    # 'finished' if we are still in 'collect'. TODO confirm against callers.
    if job.status.collect:
        if not job.convergence_check():
            job.status.not_converged = True
        else:
            if job._compress_by_default:
                job.compress()
            job.status.finished = True
    # Persist the final status in HDF5 and propagate the result upwards.
    job._hdf5["status"] = job.status.string
    job.send_to_database()
    job.update_master()
236

237

238
def run_job_with_status_suspended(job):
    """
    Handle a job whose status is 'suspended'.

    Flips the status to 'refresh' and calls ``run()`` again so that the
    refresh handler takes over.

    Args:
        job (GenericJob): pyiron job object
    """
    job.status.refresh = True
    job.run()
248

249

250
@deprecate(
    run_again="Either delete the job via job.remove() or use delete_existing_job=True.",
    version="0.4.0",
)
def run_job_with_status_finished(job):
    """
    Handle a call to run() on a job that is already 'finished': instead of
    re-running, warn the user and load the existing results from HDF5.

    Args:
        job (GenericJob): pyiron job object
    """
    warning_message = (
        "The job {} is being loaded instead of running. To re-run use the argument "
        "'delete_existing_job=True in create_job'"
    ).format(job.job_name)
    job.logger.warning(warning_message)
    job.from_hdf()
267

268

269
# Run Modes
270
def run_job_with_runmode_manually(job, _manually_print=True):
    """
    Handle the 'manual' run mode: do not execute anything, just tell the
    user how to start the job from the command line.

    Args:
        job (GenericJob): pyiron job object
        _manually_print (bool): [True/False] print command for execution - default=True
    """
    if not _manually_print:
        return
    abs_working = posixpath.abspath(job.project_hdf5.working_directory)
    if not state.database.database_is_disabled:
        # with a database the job is addressed by its id ...
        suffix = " -j {} ' ".format(job.job_id)
    else:
        # ... without one it is addressed by its HDF5 file path
        suffix = " -f {} ' ".format(
            job.project_hdf5.file_name + job.project_hdf5.h5_path
        )
    print(
        "You have selected to start the job manually. "
        + "To run it, go into the working directory {} and ".format(abs_working)
        + "call 'python -m pyiron_base.cli wrapper -p {}".format(abs_working)
        + suffix
    )

297

298
def run_job_with_runmode_modal(job):
    """
    Handle the 'modal' run mode: execute the job synchronously and wait for
    it to finish.

    Args:
        job (GenericJob): pyiron job object
    """
    job.run_static()
307

308

309
def run_job_with_runmode_non_modal(job):
    """
    The run if non modal function is called by run to execute the simulation in the background. For this we use
    multiprocessing.Process()

    Args:
        job (GenericJob): pyiron job object
    """
    # Build the argument tuple for multiprocess_wrapper():
    # (working_directory, job_id, file_path, debug, connection_string)
    if not state.database.database_is_disabled:
        if not state.database.using_local_database:
            args = (job.project_hdf5.working_directory, job.job_id, None, False, None)
        else:
            # local database: pass the SQLAlchemy connection string so the
            # child process can reconnect on its own
            args = (
                job.project_hdf5.working_directory,
                job.job_id,
                None,
                False,
                str(job.project.db.conn.engine.url),
            )
    else:
        # database disabled: identify the job by its HDF5 file path instead
        # of a database id
        args = (
            job.project_hdf5.working_directory,
            None,
            job.project_hdf5.file_name + job.project_hdf5.h5_path,
            False,
            None,
        )

    p = multiprocessing.Process(
        target=multiprocess_wrapper,
        args=args,
    )
    if job.master_id and job.server.run_mode.non_modal:
        # child of a master job: drop our reference before starting so this
        # process does not keep the job object alive
        del job
        p.start()
    else:
        if job.server.run_mode.non_modal:
            p.start()
        else:
            # keep a handle on the process so the caller can join() it later
            job._process = p
            job._process.start()
350

351

352
def run_job_with_runmode_queue(job):
    """
    The run if queue function is called by run if the user decides to submit the job to and queing system. The job
    is submitted to the queuing system using subprocess.Popen()

    Args:
        job (GenericJob): pyiron job object

    Returns:
        int: Returns the queue ID for the job.

    Raises:
        TypeError: if no queue adapter is configured
        ValueError: if the submission to the queuing system failed
    """
    if state.queue_adapter is None:
        raise TypeError("No queue adapter defined.")
    if state.queue_adapter.remote_flag:
        # remote queue: translate local paths to their remote counterparts
        # and copy the HDF5 file over before submitting
        filename = state.queue_adapter.convert_path_to_remote(
            path=job.project_hdf5.file_name
        )
        working_directory = state.queue_adapter.convert_path_to_remote(
            path=job.working_directory
        )
        command = (
            "python -m pyiron_base.cli wrapper -p "
            + working_directory
            + " -f "
            + filename
            + job.project_hdf5.h5_path
            + " --submit"
        )
        state.queue_adapter.transfer_file_to_remote(
            file=job.project_hdf5.file_name, transfer_back=False
        )
    elif state.database.database_is_disabled:
        # no database: address the job by its HDF5 file path
        command = (
            "python -m pyiron_base.cli wrapper -p "
            + job.working_directory
            + " -f "
            + job.project_hdf5.file_name
            + job.project_hdf5.h5_path
        )
    else:
        # default: address the job by its database id
        command = (
            "python -m pyiron_base.cli wrapper -p "
            + job.working_directory
            + " -j "
            + str(job.job_id)
        )
    que_id = state.queue_adapter.submit_job(
        queue=job.server.queue,
        job_name="pi_" + str(job.job_id),
        working_directory=job.project_hdf5.working_directory,
        cores=job.server.cores,
        run_time_max=job.server.run_time,
        memory_max=job.server.memory_limit,
        command=command,
        **job.server.additional_arguments,
    )
    if que_id is not None:
        # remember the queue id and persist the updated server settings
        job.server.queue_id = que_id
        job.project_hdf5.write_dict(data_dict={"server": job.server.to_dict()})
        print("Queue system id: ", que_id)
    else:
        job._logger.warning("Job aborted")
        job.status.aborted = True
        raise ValueError("run_queue.sh crashed")
    state.logger.debug("submitted %s", job.job_name)
    job._logger.debug("job status: %s", job.status)
    job._logger.info(
        "{}, status: {}, submitted: queue id {}".format(
            job.job_info_str, job.status, que_id
        )
    )
423

424

425
def run_job_with_runmode_srun(job):
    """
    Handle the 'srun' run mode: launch the job through SLURM's ``srun``
    inside the current allocation via subprocess.Popen().

    Args:
        job (GenericJob): pyiron job object

    Raises:
        ValueError: if a local (project-level) database is in use, which is
            not supported for srun execution
    """
    working_directory = job.project_hdf5.working_directory
    if not state.database.database_is_disabled:
        if not state.database.using_local_database:
            # Fixed two defects: the flag was written as "- j " (broken CLI
            # option) and job.job_id (an int) was concatenated to a str,
            # which raised a TypeError before the command could be built.
            command = (
                "srun python -m pyiron_base.cli wrapper -p "
                + working_directory
                + " -j "
                + str(job.job_id)
            )
        else:
            raise ValueError(
                "run_job_with_runmode_srun() does not support local databases."
            )
    else:
        command = (
            "srun python -m pyiron_base.cli wrapper -p "
            + working_directory
            + " -f "
            + job.project_hdf5.file_name
            + job.project_hdf5.h5_path
        )
    os.makedirs(working_directory, exist_ok=True)
    # drop the reference before detaching the subprocess
    del job
    subprocess.Popen(
        command,
        cwd=working_directory,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
        env=os.environ.copy(),
    )
458

459

460
def run_job_with_runmode_executor(job, executor, gpus_per_slot=None):
    """
    Handle the 'executor' run mode: hand the job over to a
    concurrent.futures-style executor attached to the server object.

    >>> job.server.executor = concurrent.futures.Executor()
    >>> job.run()
    >>> job.server.future.done()
    False
    >>> job.server.future.result()
    >>> job.server.future.done()
    True

    Calling run() returns immediately; the job executes in the background
    and its progress can be tracked through the returned future object.

    Args:
        job (GenericJob): pyiron job object
        executor (concurrent.futures.Executor): executor class which implements the executor interface defined in the
                                                python concurrent.futures.Executor class.
        gpus_per_slot (int): number of GPUs per MPI rank, typically 1

    Raises:
        NotImplementedError: for GenericMaster jobs or unsupported executor
            types
    """
    # The string-based isinstance check avoids a circular import:
    # runfunction -> GenericJob -> GenericMaster -> runfunction
    # This smells a bit, so if a better architecture is found in the future,
    # use it to avoid string-based specifications.
    is_master = static_isinstance(
        obj=job,
        obj_type="pyiron_base.jobs.master.generic.GenericMaster",
    )
    if is_master:
        raise NotImplementedError(
            "Currently job.server.run_mode.executor does not support GenericMaster jobs."
        )
    if flux_available and isinstance(executor, flux.job.FluxExecutor):
        run_job_with_runmode_executor_flux(
            job=job, executor=executor, gpus_per_slot=gpus_per_slot
        )
    elif isinstance(executor, ProcessPoolExecutor):
        run_job_with_runmode_executor_futures(job=job, executor=executor)
    else:
        raise NotImplementedError(
            "Currently only flux.job.FluxExecutor and concurrent.futures.ProcessPoolExecutor are supported."
        )

507

508
def run_job_with_runmode_executor_futures(job, executor):
    """
    Submit the job to a concurrent.futures.ProcessPoolExecutor.

    The ProcessPoolExecutor does not provide any resource management, so the user is responsible to keep track of
    the number of compute cores in use, as over-subscription can lead to low performance.

    The [ProcessPoolExecutor docs](https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor) state: "The __main__ module must be importable by worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter." (i.e. Jupyter notebooks). For standard usage this is a non-issue, but for the edge case of job classes defined in-notebook (e.g. children of `PythonTemplateJob`), the using the ProcessPoolExecutor will result in errors. To resolve this, relocate such classes to an importable .py file.

    >>> from concurrent.futures import ProcessPoolExecutor
    >>> job.server.executor = ProcessPoolExecutor()
    >>> job.server.future.done()
    False
    >>> job.server.future.result()
    >>> job.server.future.done()
    True

    Args:
        job (GenericJob): pyiron job object
        executor (concurrent.futures.Executor): executor class which implements the executor interface defined in the
                                                python concurrent.futures.Executor class.
    """
    # Decide how the child process will locate the job: by HDF5 file path
    # (no database) or by job id, with an optional local-database connection
    # string so the worker can reconnect.
    file_path = None
    connection_string = None
    if state.database.database_is_disabled:
        file_path = job.project_hdf5.file_name + job.project_hdf5.h5_path
    elif state.database.using_local_database:
        connection_string = str(job.project.db.conn.engine.url)

    job.server.future = executor.submit(
        multiprocess_wrapper,
        working_directory=job.project_hdf5.working_directory,
        job_id=job.job_id,
        file_path=file_path,
        debug=False,
        connection_string=connection_string,
    )
547

548

549
def run_job_with_runmode_executor_flux(job, executor, gpus_per_slot=None):
    """
    Interface for the flux.job.FluxExecutor executor. Flux is a hierarchical resource manager. It can either be used to
    replace queuing systems like SLURM or be used as a user specific queuing system within an existing allocation.
    pyiron provides two interfaces to flux, this executor interface as well as a traditional queuing system interface
    via pysqa. This executor interface is designed for the development of asynchronous simulation protocols, while the
    traditional queuing system interface simplifies the transition from other queuing systems like SLURM. The usage
    is analog to the concurrent.futures.Executor interface:

    >>> from flux.job import FluxExecutor
    >>> job.server.executor = FluxExecutor()
    >>> job.run()
    >>> job.server.future.done()
    False
    >>> job.server.future.result()
    >>> job.server.future.done()
    True

    A word of caution - flux is currently only available on Linux, for all other operating systems the ProcessPoolExecutor
    from the python standard library concurrent.futures is recommended. The advantage of flux over the ProcessPoolExecutor
    is that flux takes over the resource management, like monitoring how many cores are available while with the
    ProcessPoolExecutor this is left to the user.

    Args:
        job (GenericJob): pyiron job object
        executor (flux.job.FluxExecutor): flux executor class which implements the executor interface defined in the
                                      python concurrent.futures.Executor class.
        gpus_per_slot (int): number of GPUs per MPI rank, typically 1

    Returns:
        concurrent.futures.Future: future object to develop asynchronous simulation protocols

    Raises:
        ModuleNotFoundError: if the flux python bindings are not installed
    """
    if not flux_available:
        raise ModuleNotFoundError(
            "No module named 'flux'. Running in flux mode is only available on Linux;"
            "For CPU jobs, please use `conda install -c conda-forge flux-core`; for "
            "GPU support you will additionally need "
            "`conda install -c conda-forge flux-sched libhwloc=*=cuda*`"
        )
    # Build the batch script that re-launches the job through the pyiron CLI
    executable_str, job_name = _generate_flux_execute_string(
        job=job, database_is_disabled=state.database.database_is_disabled
    )
    # one slot per core; the wrapper script starts the actual executable
    jobspec = flux.job.JobspecV1.from_batch_command(
        jobname=job_name,
        script=executable_str,
        num_nodes=1,
        cores_per_slot=1,
        gpus_per_slot=gpus_per_slot,
        num_slots=job.server.cores,
    )
    jobspec.cwd = job.project_hdf5.working_directory
    jobspec.environment = dict(os.environ)
    job.server.future = executor.submit(jobspec)
602

603

604
def run_time_decorator(func):
    """
    Decorator which records the start time and total runtime of a job in the
    database — when a database is available and the job already has an id.

    Args:
        func (callable): function taking a single ``job`` argument

    Returns:
        callable: wrapped function with database runtime bookkeeping
    """
    from functools import wraps

    # functools.wraps preserves func's __name__/__doc__ for introspection;
    # previously the decorated function always appeared as "wrapper".
    @wraps(func)
    def wrapper(job):
        if not state.database.database_is_disabled and job.job_id is not None:
            job.project.db.item_update({"timestart": datetime.now()}, job.job_id)
            output = func(job)
            job.project.db.item_update(job._runtime(), job.job_id)
        else:
            output = func(job)
        return output

    return wrapper
615

616

617
@run_time_decorator
def execute_job_with_external_executable(job):
    """
    The run static function is called by run to execute the simulation.

    Args:
        job (GenericJob): pyiron job object

    Returns:
        str: combined stdout/stderr of the external executable

    Raises:
        ValueError: if no executable is defined for the job
    """
    job._logger.info(
        "{}, status: {}, run job (modal)".format(job.job_info_str, job.status)
    )
    if job.executable.executable_path == "":
        job.status.aborted = True
        job._hdf5["status"] = job.status.string
        raise ValueError("No executable set!")
    job.status.running = True
    executable, shell = job.executable.get_input_for_subprocess_call(
        cores=job.server.cores, threads=job.server.threads, gpus=job.server.gpus
    )
    job_crashed, out = False, None
    if (
        job.server.conda_environment_name is None
        and job.server.conda_environment_path is None
    ):
        # plain subprocess call in the current environment
        try:
            out = subprocess.run(
                executable,
                cwd=job.project_hdf5.working_directory,
                shell=shell,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                check=True,
                env=os.environ.copy(),
            ).stdout
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            job_crashed, out = handle_failed_job(job=job, error=e)
    else:
        # run inside a dedicated conda environment (by name or by path)
        import conda_subprocess

        if job.server.conda_environment_name is not None:
            prefix_name = job.server.conda_environment_name
            prefix_path = None
        else:
            prefix_name = None
            prefix_path = job.server.conda_environment_path
        try:
            out = conda_subprocess.run(
                executable,
                cwd=job.project_hdf5.working_directory,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                check=True,
                prefix_name=prefix_name,
                prefix_path=prefix_path,
            ).stdout
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            job_crashed, out = handle_failed_job(job=job, error=e)

    job._logger.info(
        "{}, status: {}, output: {}".format(job.job_info_str, job.status, out)
    )
    # always keep the raw output next to the job files
    with open(
        posixpath.join(job.project_hdf5.working_directory, "error.out"), mode="w"
    ) as f_err:
        f_err.write(out)
    handle_finished_job(job=job, job_crashed=job_crashed, collect_output=True)
    return out
686

687

688
def handle_finished_job(job, job_crashed=False, collect_output=True):
    """
    Finalize a job after its executable has terminated.

    Locks the job input, optionally triggers output collection via run(),
    and marks crashed jobs as aborted in both the status object and HDF5.

    Args:
        job (GenericJob): pyiron job object
        job_crashed (boolean): flag to indicate failed jobs
        collect_output (boolean): flag to indicate if the collect_output() function should be called
    """
    job.set_input_to_read_only()
    if collect_output:
        job.status.collect = True
        job.run()
    if job_crashed:
        job.status.aborted = True
        job._hdf5["status"] = job.status.string
704

705

706
def handle_failed_job(job, error):
    """
    Handle failed jobs write error message to text file and update database

    Args:
        job (GenericJob): pyiron job object
        error (subprocess.SubprocessError): error of the subprocess executing the job

    Returns:
        boolean, str: job crashed and error message

    Raises:
        RuntimeError: if the job is not allowed to crash
            (job.server.accept_crash is False), or if the error carries no
            captured output (e.g. FileNotFoundError)
    """
    # CalledProcessError carries the captured output; FileNotFoundError does not
    if hasattr(error, "output"):
        out = error.output
        if error.returncode in job.executable.accepted_return_codes:
            # executable-specific "known good" exit codes: not a crash
            return False, out
        elif not job.server.accept_crash:
            job._logger.warning("Job aborted")
            job._logger.warning(error.output)
            job.status.aborted = True
            job._hdf5["status"] = job.status.string
            job.run_time_to_db()
            # keep the error message next to the job files for inspection
            error_file = posixpath.join(job.project_hdf5.working_directory, "error.msg")
            with open(error_file, "w") as f:
                f.write(error.output)
            if job.server.run_mode.non_modal:
                # close the shared DB connection before this process dies
                state.database.close_connection()
            raise RuntimeError("Job aborted")
        else:
            # crash accepted: report it but let the caller continue
            return True, out
    else:
        job._logger.warning("Job aborted")
        job.status.aborted = True
        job._hdf5["status"] = job.status.string
        job.run_time_to_db()
        if job.server.run_mode.non_modal:
            state.database.close_connection()
        raise RuntimeError("Job aborted")
743

744

745
def multiprocess_wrapper(
    working_directory, job_id=None, file_path=None, debug=False, connection_string=None
):
    """
    Entry point executed in a separate process: reload the job and run it.

    Args:
        working_directory (str): working directory of the job
        job_id (int/None): database id of the job; preferred lookup key
        file_path (str/None): combined HDF5 file name and h5 path
            (e.g. "/path/job.h5/job_name"), used when no database id exists
        debug (bool): debug mode flag passed on to the JobWrapper
        connection_string (str/None): SQLAlchemy connection string for a
            local database

    Raises:
        ValueError: if neither job_id nor file_path is given
    """
    if job_id is not None:
        job_wrap = JobWrapper(
            working_directory=str(working_directory),
            job_id=int(job_id),
            debug=debug,
            connection_string=connection_string,
        )
    elif file_path is not None:
        # split "<name>.<ext>/<h5_path>" into file name and in-file path:
        # the segment after the last "." up to the first "/" is the extension
        hdf5_file = (
            ".".join(file_path.split(".")[:-1])
            + "."
            + file_path.split(".")[-1].split("/")[0]
        )
        h5_path = "/".join(file_path.split(".")[-1].split("/")[1:])
        job_wrap = JobWrapper(
            working_directory,
            job_id=None,
            hdf5_file=hdf5_file,
            h5_path="/" + h5_path,
            debug=debug,
            connection_string=connection_string,
        )
    else:
        raise ValueError("Either job_id or file_path have to be not None.")
    # convert termination signals into a clean job shutdown while running
    with catch_signals(job_wrap.job.signal_intercept):
        job_wrap.job.run_static()
774

775

776
def _generate_flux_execute_string(job, database_is_disabled):
1✔
777
    if not database_is_disabled:
1✔
778
        executable_template = Template(
1✔
779
            "#!/bin/bash\n"
780
            + "python -m pyiron_base.cli wrapper -p {{working_directory}} -j {{job_id}}"
781
        )
782
        executable_str = executable_template.render(
1✔
783
            working_directory=job.working_directory,
784
            job_id=str(job.job_id),
785
        )
786
        job_name = "pi_" + str(job.job_id)
1✔
787
    else:
788
        executable_template = Template(
1✔
789
            "#!/bin/bash\n"
790
            + "python -m pyiron_base.cli wrapper -p {{working_directory}} -f {{file_name}}{{h5_path}}"
791
        )
792
        executable_str = executable_template.render(
1✔
793
            working_directory=job.working_directory,
794
            file_name=job.project_hdf5.file_name,
795
            h5_path=job.project_hdf5.h5_path,
796
        )
797
        job_name = "pi_" + job.job_name
1✔
798
    return executable_str, job_name
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc