pyiron / pyiron_base / build 9471909594

11 Jun 2024 07:52PM UTC coverage: 71.339% (+0.06%) from 71.284%

Pull Request #1472: Add get_input_file_dict() in GenericJob
Merge faae3f5cc into df9b3d159 (github / web-flow)

14 of 21 new or added lines in 2 files covered. (66.67%)
92 existing lines in 3 files now uncovered.
7166 of 10045 relevant lines covered (71.34%)
0.71 hits per line

Source File: /pyiron_base/jobs/job/util.py (76.57% covered)
# coding: utf-8
# Copyright (c) Max-Planck-Institut für Eisenforschung GmbH - Computational Materials Design (CM) Department
# Distributed under the terms of "New BSD License", see the LICENSE file.
"""
Helper functions for the JobCore and GenericJob objects
"""

from itertools import islice
import os
import posixpath
import psutil
import tarfile
import stat
import shutil
import monty.io
from typing import Optional, Union
from pyiron_base.utils.instance import static_isinstance
from pyiron_base.utils.safetar import safe_extract
from pyiron_base.database.sqlcolumnlength import JOB_STR_LENGTH

__author__ = "Jan Janssen"
__copyright__ = (
    "Copyright 2020, Max-Planck-Institut für Eisenforschung GmbH - "
    "Computational Materials Design (CM) Department"
)
__version__ = "1.0"
__maintainer__ = "Jan Janssen"
__email__ = "janssen@mpie.de"
__status__ = "production"
__date__ = "Nov 28, 2020"

def _copy_database_entry(new_job_core, job_copied_id):
    """
    Copy the database entry from the copied job to the new job

    Args:
        new_job_core (GenericJob): Copy of the job object
        job_copied_id (int): Job id of the copied job
    """
    db_entry = new_job_core.project.db.get_item_by_id(job_copied_id)
    if db_entry is not None:
        db_entry["job"] = new_job_core.job_name
        db_entry["subjob"] = new_job_core.project_hdf5.h5_path
        db_entry["project"] = new_job_core.project_hdf5.project_path
        db_entry["projectpath"] = new_job_core.project_hdf5.root_path
        del db_entry["id"]
        job_id = new_job_core.project.db.add_item_dict(db_entry)
        new_job_core.reset_job_id(job_id=job_id)

def _copy_to_delete_existing(project_class, job_name, delete_job):
    """
    Check if a job with the given name already exists in the project; if it
    does, either delete it or reload it, depending on the setting of delete_job.

    Args:
        project_class (Project): The project to check for an existing job.
        job_name (str): The name of the job to look up.
        delete_job (bool): Delete the job if it already exists

    Returns:
        GenericJob/None: the existing job if delete_job is False, otherwise None
    """
    job_table = project_class.job_table(recursive=False)
    if len(job_table) > 0 and job_name in job_table.job.values:
        if not delete_job:
            return project_class.load(job_name)
        else:
            project_class.remove_job(job_name)
            return None

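# Editor's note: a hedged usage sketch, not part of the original util.py.
# Assuming an existing pyiron Project `pr` that already contains a job named
# "ref", the helper either reloads or removes it:
#
#     existing = _copy_to_delete_existing(pr, "ref", delete_job=False)  # reloads "ref"
#     _copy_to_delete_existing(pr, "ref", delete_job=True)  # removes "ref", returns None
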
def _get_project_for_copy(job, project, new_job_name):
    """
    Internal helper function to generate a project and hdf5 project for copying

    Args:
        job (JobCore): Job object used for comparison
        project (JobCore/ProjectHDFio/Project/None): The project to copy the job to.
            (Default is None, use the same project.)
        new_job_name (str): The new name to assign the duplicate job. Required if the
            project is `None` or the same project as the copied job. (Default is None,
            try to keep the same name.)

    Returns:
        Project, ProjectHDFio
    """
    if static_isinstance(
        obj=project.__class__, obj_type="pyiron_base.jobs.job.core.JobCore"
    ):
        file_project = project.project
        hdf5_project = project.project_hdf5.open(new_job_name)
    elif isinstance(project, job.project.__class__):
        file_project = project
        hdf5_project = job.project_hdf5.__class__(
            project=project, file_name=new_job_name, h5_path="/" + new_job_name
        )
    elif isinstance(project, job.project_hdf5.__class__):
        file_project = project.project
        hdf5_project = project.open(new_job_name)
    elif project is None:
        file_project = job.project
        hdf5_project = job.project_hdf5.__class__(
            project=file_project, file_name=new_job_name, h5_path="/" + new_job_name
        )
    else:
        raise ValueError("Project should be JobCore/ProjectHDFio/Project/None")
    return file_project, hdf5_project

_special_symbol_replacements = {
    ".": "d",
    "-": "m",
    "+": "p",
    ",": "c",
    " ": "_",
}

def _get_safe_job_name(
    name: Union[str, tuple],
    ndigits: Optional[int] = 8,
    special_symbols: Optional[dict] = None,
):
    """
    Sanitize a job name, optionally appending numeric values.

    Args:
        name (str|tuple): The name to sanitize, or a tuple of the name and any number
            of numeric values to append with '_' in between.
        ndigits (int|None): How many digits to round any floating point values in a
            `name` tuple to. (Default is 8; to not round at all use None.)
        special_symbols (dict|None): Conversions of special symbols to apply. This will
            be applied to the default conversion dict, which contains:
            DEFAULT_CONV_DICT

    Returns:
        (str): The sanitized (and possibly rounded) name.
    """
    d_special_symbols = _special_symbol_replacements.copy()
    if special_symbols is not None:
        d_special_symbols.update(special_symbols)

    def round_(value, ndigits=ndigits):
        if isinstance(value, float) and ndigits is not None:
            return round(value, ndigits=ndigits)
        return value

    if not isinstance(name, str):
        name_rounded = [round_(nn) for nn in name]
        job_name = "_".join([str(nn) for nn in name_rounded])
    else:
        job_name = name
    for k, v in d_special_symbols.items():
        job_name = job_name.replace(k, v)
    _is_valid_job_name(job_name=job_name)
    return job_name

_get_safe_job_name.__doc__ = _get_safe_job_name.__doc__.replace(
    "DEFAULT_CONV_DICT", f"{_special_symbol_replacements}"
)

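# Editor's note: a minimal, illustrative demo of the sanitization above; it is
# not part of the original util.py and is never called by the module. The
# expected values follow directly from _special_symbol_replacements.
def _demo_get_safe_job_name():
    # Spaces become "_" and "." becomes "d":
    assert _get_safe_job_name("my job 1.5") == "my_job_1d5"
    # Tuple entries are joined with "_"; "-" becomes "m":
    assert _get_safe_job_name(("murn", 0.9, -1.5)) == "murn_0d9_m1d5"
    # ndigits rounds floats before they are converted to strings:
    assert _get_safe_job_name(("lat", 1.23456789012), ndigits=4) == "lat_1d2346"
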
def _rename_job(job, new_job_name):
    """
    Rename a job: sanitize the new name, move any child jobs, update the
    database entry, relocate the HDF5 data and the working directory, and
    rename an archived tarball if one exists.

    Args:
        job (JobCore): job object to rename
        new_job_name (str): new job name
    """
    new_job_name = _get_safe_job_name(new_job_name)
    child_ids = job.child_ids
    if child_ids:
        for child_id in child_ids:
            ham = job.project.load(child_id)
            ham.move_to(job.project.open(new_job_name + "_hdf5"))
    old_working_directory = job.working_directory
    if len(job.project_hdf5.h5_path.split("/")) > 2:
        new_location = job.project_hdf5.open("../" + new_job_name)
    else:
        new_location = job.project_hdf5.__class__(
            job.project, new_job_name, h5_path="/" + new_job_name
        )
    if job.job_id:
        job.project.db.item_update(
            {"job": new_job_name, "subjob": new_location.h5_path}, job.job_id
        )
    old_job_name = job.name
    job._name = new_job_name
    job.project_hdf5.copy_to(destination=new_location, maintain_name=False)
    job.project_hdf5.remove_file()
    job.project_hdf5 = new_location
    if os.path.exists(old_working_directory):
        shutil.move(old_working_directory, job.working_directory)
        os.rmdir("/".join(old_working_directory.split("/")[:-1]))
    if os.path.exists(os.path.join(job.working_directory, old_job_name + ".tar.bz2")):
        os.rename(
            os.path.join(job.working_directory, old_job_name + ".tar.bz2"),
            os.path.join(job.working_directory, job.job_name + ".tar.bz2"),
        )

def _is_valid_job_name(job_name):
    """
    Internal function to validate the job_name (requires Python 3.4+)

    Args:
        job_name (str): job name
    """
    if not job_name.isidentifier():
        raise ValueError(
            f'Invalid name for a pyiron object: must contain only letters, digits (not as the first character) and "_", not {job_name}'
        )
    if len(job_name) > JOB_STR_LENGTH:
        raise ValueError(
            f"Invalid name for a pyiron object: must be less than or equal to {JOB_STR_LENGTH} characters"
        )

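# Editor's note: illustrative only, not part of the original util.py. The
# validator is silent for valid Python identifiers and raises otherwise:
#
#     _is_valid_job_name("job_1")   # returns None
#     _is_valid_job_name("1_job")   # raises ValueError (starts with a digit)
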
def _get_restart_copy_dict(job):
    """
    Map the file names to place in the working directory of a restarted job to
    their source paths, applying any renames defined in job.restart_file_dict.
    """
    copy_dict = {}
    for i, actual_name in enumerate(
        [os.path.basename(f) for f in job.restart_file_list]
    ):
        if actual_name in job.restart_file_dict.keys():
            new_name = job.restart_file_dict[actual_name]
        else:
            new_name = os.path.basename(job.restart_file_list[i])
        copy_dict[new_name] = job.restart_file_list[i]
    return copy_dict

def _copy_restart_files(job):
    """
    Internal helper function to copy the files required for the restart job.
    """
    if not os.path.isdir(job.working_directory):
        raise ValueError(
            "The working directory is not yet available to copy restart files"
        )
    for file_name, source in _get_restart_copy_dict(job=job).items():
        shutil.copy(
            source,
            posixpath.join(job.working_directory, file_name),
        )

def _kill_child(job):
    """
    Internal helper function to kill the child process of a job.

    Args:
        job (JobCore): job object whose child process is killed
    """
    if (
        static_isinstance(
            obj=job.__class__, obj_type="pyiron_base.jobs.master.GenericMaster"
        )
        and not job.server.run_mode.queue
        and (job.status.running or job.status.submitted)
    ):
        for proc in psutil.process_iter():
            try:
                pinfo = proc.as_dict(attrs=["pid", "cwd"])
            except psutil.NoSuchProcess:
                pass
            else:
                if pinfo["cwd"] is not None and pinfo["cwd"].startswith(
                    job.working_directory
                ):
                    job_process = psutil.Process(pinfo["pid"])
                    job_process.kill()

def _job_compressed_name(job):
    """Return the canonical file name of a compressed job."""
    return _get_compressed_job_name(working_directory=job.working_directory)

def _get_compressed_job_name(working_directory):
    """Return the canonical file name of a compressed job from the working directory."""
    return os.path.join(
        working_directory, os.path.basename(working_directory) + ".tar.bz2"
    )

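# Editor's note: a small, illustrative demo, not part of the original util.py;
# the path used here is made up. The archive is named after the working
# directory and placed inside it.
def _demo_get_compressed_job_name():
    path = _get_compressed_job_name(working_directory="/tmp/proj/job_a_hdf5/job_a")
    assert path == os.path.join("/tmp/proj/job_a_hdf5/job_a", "job_a.tar.bz2")
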
def _job_compress(job, files_to_compress=[], files_to_remove=[]):
    """
    Compress the output files of a job object.

    Args:
        job (JobCore): job object to compress
        files_to_compress (list): list of files to compress
        files_to_remove (list): list of files to remove
    """

    def delete_file_or_folder(fullname):
        if os.path.isfile(fullname):
            os.remove(fullname)
        elif os.path.isdir(fullname):
            shutil.rmtree(fullname)

    if not _job_is_compressed(job):
        for name in files_to_remove:
            delete_file_or_folder(fullname=os.path.join(job.working_directory, name))
        cwd = os.getcwd()
        try:
            os.chdir(job.working_directory)
            with tarfile.open(_job_compressed_name(job), "w:bz2") as tar:
                for name in files_to_compress:
                    if "tar" not in name and not stat.S_ISFIFO(os.stat(name).st_mode):
                        tar.add(name)
            for name in files_to_compress:
                if name != _job_compressed_name(job):
                    delete_file_or_folder(
                        fullname=os.path.join(job.working_directory, name)
                    )
        finally:
            os.chdir(cwd)
    else:
        job.logger.info("The files are already compressed!")

def _job_decompress(job):
    """
    Decompress the output files of a compressed job object.

    Args:
        job (JobCore): job object to decompress
    """
    tar_file_name = _job_compressed_name(job)
    try:
        with tarfile.open(tar_file_name, "r:bz2") as tar:
            safe_extract(tar, job.working_directory)
        os.remove(tar_file_name)
    except IOError:
        pass

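# Editor's note: a hedged sketch of the compress/decompress round trip, not
# part of the original util.py; it assumes an existing finished job `job`:
#
#     _job_compress(job=job, files_to_compress=_job_list_files(job))  # pack into <job>.tar.bz2
#     _job_is_compressed(job=job)  # -> True
#     _job_decompress(job=job)     # unpack and remove the tarball
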
def _working_directory_is_compressed(working_directory):
    """
    Check if the working directory of a given job is already compressed or not.

    Args:
        working_directory (str): working directory of the job object

    Returns:
        bool: [True/False]
    """
    compressed_name = os.path.basename(
        _get_compressed_job_name(working_directory=working_directory)
    )
    return compressed_name in os.listdir(working_directory)

def _job_is_compressed(job):
    """
    Check if the job is already compressed or not.

    Args:
        job (JobCore): job object to check

    Returns:
        bool: [True/False]
    """
    return _working_directory_is_compressed(working_directory=job.working_directory)

def _working_directory_list_files(working_directory, include_archive=True):
    """
    Return the list of files in the job's working directory.

    If the working directory is compressed, return a list of files in the archive.

    Args:
        working_directory (str): working directory of the job object to inspect files in
        include_archive (bool): include files in the .tar.bz2 archive

    Returns:
        list of str: file names
    """
    if os.path.isdir(working_directory):
        uncompressed_files_lst = os.listdir(working_directory)
        if include_archive and _working_directory_is_compressed(
            working_directory=working_directory
        ):
            compressed_job_name = _get_compressed_job_name(
                working_directory=working_directory
            )
            with tarfile.open(compressed_job_name, "r") as tar:
                compressed_files_lst = [
                    member.name for member in tar.getmembers() if member.isfile()
                ]
                uncompressed_files_lst.remove(os.path.basename(compressed_job_name))
                return uncompressed_files_lst + compressed_files_lst
        else:
            return uncompressed_files_lst
    return []

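# Editor's note: a self-contained, illustrative demo of the two helpers above,
# not part of the original util.py; it only needs a throwaway directory.
def _demo_working_directory_helpers():
    import tempfile

    with tempfile.TemporaryDirectory() as working_directory:
        # An empty directory is not compressed and contains no files.
        assert not _working_directory_is_compressed(working_directory)
        assert _working_directory_list_files(working_directory) == []

        # Write one file and pack it into the canonical <dirname>.tar.bz2.
        with open(os.path.join(working_directory, "log.txt"), "w") as f:
            f.write("line 1\nline 2\n")
        with tarfile.open(_get_compressed_job_name(working_directory), "w:bz2") as tar:
            tar.add(os.path.join(working_directory, "log.txt"), arcname="log.txt")
        os.remove(os.path.join(working_directory, "log.txt"))

        # The directory now counts as compressed; the listing is read from
        # the archive transparently.
        assert _working_directory_is_compressed(working_directory)
        assert _working_directory_list_files(working_directory) == ["log.txt"]
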
def _job_list_files(job):
    """
    Return the list of files in the job's working directory.

    If the job is compressed, return a list of files in the archive.

    Args:
        job (JobCore): job object to inspect files in

    Returns:
        list of str: file names
    """
    return _working_directory_list_files(working_directory=job.working_directory)

def _working_directory_read_file(working_directory, file_name, tail=None):
    """
    Return list of lines of the given file.

    Transparently decompresses the file if the working directory is compressed.

    If `tail` is given and the job is decompressed, only read the last lines
    instead of traversing the full file.

    Args:
        working_directory (str): working directory of the job object
        file_name (str): the file to read
        tail (int, optional): only return the last lines

    Raises:
        FileNotFoundError: if the given file name does not exist in the job folder
    """
    if file_name not in _working_directory_list_files(
        working_directory=working_directory
    ):
        raise FileNotFoundError(file_name)

    if _working_directory_is_compressed(
        working_directory=working_directory
    ) and file_name not in os.listdir(working_directory):
        with tarfile.open(
            _get_compressed_job_name(working_directory=working_directory),
            encoding="utf8",
        ) as f:
            lines = [
                line.decode("utf8") for line in f.extractfile(file_name).readlines()
            ]
            if tail is None:
                return lines
            else:
                return lines[-tail:]
    else:
        file_name = posixpath.join(working_directory, file_name)
        if tail is None:
            with open(file_name) as f:
                return f.readlines()
        else:
            lines = list(
                reversed(
                    [
                        line + os.linesep
                        for line in islice(monty.io.reverse_readfile(file_name), tail)
                    ]
                )
            )
            # compatibility with the other methods:
            # monty strips all newlines, whereas reading the other ways does
            # not.  So if a file does not end with a newline (as most text
            # files) adding it to every line like above adds an additional one.
            lines[-1] = lines[-1].rstrip(os.linesep)
            return lines

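# Editor's note: an illustrative demo of the tail logic above, not part of the
# original util.py. The exact trailing newlines depend on monty's
# reverse_readfile, so only the line contents are compared here.
def _demo_read_file_tail():
    import tempfile

    with tempfile.TemporaryDirectory() as working_directory:
        with open(os.path.join(working_directory, "out.log"), "w") as f:
            f.write("a\nb\nc\n")
        # Reading the full file matches plain readlines():
        assert _working_directory_read_file(working_directory, "out.log") == [
            "a\n",
            "b\n",
            "c\n",
        ]
        # tail=2 returns only the last two lines:
        tail_lines = _working_directory_read_file(working_directory, "out.log", tail=2)
        assert [line.rstrip(os.linesep) for line in tail_lines] == ["b", "c"]
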
def _job_read_file(job, file_name, tail=None):
    """
    Return list of lines of the given file.

    Transparently decompresses the file if the job is compressed.

    If `tail` is given and the job is decompressed, only read the last lines
    instead of traversing the full file.

    Args:
        job (JobCore): job object to read the file from
        file_name (str): the file to read
        tail (int, optional): only return the last lines

    Raises:
        FileNotFoundError: if the given file name does not exist in the job folder
    """
    return _working_directory_read_file(
        working_directory=job.working_directory, file_name=file_name, tail=tail
    )

def _job_archive(job):
    """
    Compress HDF5 file of the job object to tar-archive

    Args:
        job (JobCore): job object to archive
    """
    fpath = job.project_hdf5.file_path
    jname = job.job_name
    h5_dir_name = jname + "_hdf5"
    h5_file_name = jname + ".h5"
    cwd = os.getcwd()
    try:
        os.chdir(fpath)
        with tarfile.open(
            os.path.join(fpath, job.job_name + ".tar.bz2"), "w:bz2"
        ) as tar:
            for name in [h5_dir_name, h5_file_name]:
                tar.add(name)
        for name in [h5_dir_name, h5_file_name]:
            fullname = os.path.join(fpath, name)
            if os.path.isfile(fullname):
                os.remove(fullname)
            elif os.path.isdir(fullname):
                shutil.rmtree(fullname)
    finally:
        os.chdir(cwd)

def _job_unarchive(job):
    """
    Decompress HDF5 file of the job object from tar-archive

    Args:
        job (JobCore): job object to unarchive
    """
    fpath = job.project_hdf5.file_path
    tar_name = os.path.join(fpath, job.job_name + ".tar.bz2")
    with tarfile.open(tar_name, "r:bz2") as tar:
        safe_extract(tar, fpath)
    os.remove(tar_name)

def _job_is_archived(job):
    """
    Check if the HDF5 file of the Job is compressed as tar-archive

    Args:
        job (JobCore): job object to check

    Returns:
        bool: [True/False]
    """
    return os.path.isfile(
        os.path.join(job.project_hdf5.file_path, job.job_name + ".tar.bz2")
    )

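# Editor's note: a hedged sketch of the archive lifecycle, not part of the
# original util.py; it assumes an existing job `job`:
#
#     _job_archive(job=job)      # pack <job>.h5 and <job>_hdf5/ into <job>.tar.bz2
#     _job_is_archived(job=job)  # -> True
#     _job_unarchive(job=job)    # restore the HDF5 data and delete the tarball
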
def _job_delete_hdf(job):
    """
    Delete HDF5 file of job object

    Args:
        job (JobCore): job object to delete
    """
    if os.path.isfile(job.project_hdf5.file_name):
        os.remove(job.project_hdf5.file_name)

def _job_delete_files(job):
    """
    Delete files in the working directory of job object

    Args:
        job (JobCore): job object to delete
    """
    working_directory = str(job.working_directory)
    if job._import_directory is None and os.path.exists(working_directory):
        shutil.rmtree(working_directory)
    else:
        job._import_directory = None

def _job_remove_folder(job):
    """
    Delete the parent folder of the job's working directory, if it is empty

    Args:
        job (JobCore): job object to delete
    """
    working_directory = os.path.abspath(os.path.join(str(job.working_directory), ".."))
    if os.path.exists(working_directory) and len(os.listdir(working_directory)) == 0:
        shutil.rmtree(working_directory)

def _job_store_before_copy(job):
    """
    Store job in HDF5 file for copying

    Args:
        job (GenericJob): job object to copy

    Returns:
        bool: True if the HDF5 file did not exist before storing and should
            therefore be removed again after the copy is reloaded
    """
    if not job.project_hdf5.file_exists:
        delete_file_after_copy = True
    else:
        delete_file_after_copy = False
    job.to_hdf()
    return delete_file_after_copy

def _job_reload_after_copy(job, delete_file_after_copy):
    """
    Reload job from HDF5 file after copying

    Args:
        job (GenericJob): copied job object
        delete_file_after_copy (bool): delete HDF5 file after reload
    """
    job.from_hdf()
    if delete_file_after_copy:
        job.project_hdf5.remove_file()
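

# Editor's note: a hedged sketch of how the two helpers above are paired when
# copying a job, not part of the original util.py; it assumes an existing
# GenericJob instance `job`:
#
#     delete_file_after_copy = _job_store_before_copy(job=job)  # make sure the job is on disk
#     new_job = job.copy()  # copy while the HDF5 file exists
#     _job_reload_after_copy(job=job, delete_file_after_copy=delete_file_after_copy)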