• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pypest / pyemu / 19592938997

14 Nov 2025 05:37PM UTC coverage: 77.871% (+0.07%) from 77.802%
19592938997

push

github

web-flow
Merge pull request #637 from briochh/feat_pestppnightly

Use pestpp-nightly-builds in CI

18 of 22 new or added lines in 1 file covered. (81.82%)

3 existing lines in 1 file now uncovered.

13896 of 17845 relevant lines covered (77.87%)

8.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/pyemu/utils/os_utils.py
1
"""Operating system utilities in the PEST(++) realm
2
"""
3
import os
11✔
4
import sys
11✔
5
import platform
11✔
6
import time
11✔
7
import struct
11✔
8
import shutil
11✔
9
import threading
11✔
10
import subprocess as sp
11✔
11
import multiprocessing as mp
11✔
12
import warnings
11✔
13
import socket
11✔
14
import time
11✔
15
from datetime import datetime
11✔
16
import random
11✔
17
import logging
11✔
18
import tempfile
11✔
19
from contextlib import contextmanager
11✔
20
import json
11✔
21
import uuid
11✔
22
import concurrent.futures
11✔
23

24
import numpy as np
11✔
25
import pandas as pd
11✔
26

27
from ..pyemu_warnings import PyemuWarning
11✔
28
from ..pst import pst_handler
11✔
29
from ..logger import Logger
11✔
30

31
# Executable suffix and bundled-binary directory for the current platform.
# Anything that is neither linux nor darwin is treated as windows.
ext = ""
if "linux" in platform.system().lower():
    bin_path = "linux"
elif "darwin" in platform.system().lower():
    bin_path = "mac"
else:
    bin_path = "win"
    ext = ".exe"

# resolve "../bin/<platform>" against the cwd at import time and make the
# binaries in it discoverable through PATH
bin_path = os.path.abspath(os.path.join("..", "bin", bin_path))
os.environ["PATH"] = os.environ["PATH"] + os.pathsep + bin_path
11✔
43

44

45
def _istextfile(filename, blocksize=512):
11✔
46
    """
47
    Function found from:
48
    https://eli.thegreenplace.net/2011/10/19/perls-guess-if-file-is-text-or-binary-implemented-in-python
49
    Returns True if file is most likely a text file
50
    Returns False if file is most likely a binary file
51
    Uses heuristics to guess whether the given file is text or binary,
52
    by reading a single block of bytes from the file.
53
    If more than 30% of the chars in the block are non-text, or there
54
    are NUL ('\x00') bytes in the block, assume this is a binary file.
55
    """
56

57
    import sys
×
58

59
    PY3 = sys.version_info[0] == 3
×
60

61
    # A function that takes an integer in the 8-bit range and returns
62
    # a single-character byte object in py3 / a single-character string
63
    # in py2.
64
    #
65
    int2byte = (lambda x: bytes((x,))) if PY3 else chr
×
66

67
    _text_characters = b"".join(int2byte(i) for i in range(32, 127)) + b"\n\r\t\f\b"
×
68
    block = open(filename, "rb").read(blocksize)
×
69
    if b"\x00" in block:
×
70
        # Files with null bytes are binary
71
        return False
×
72
    elif not block:
×
73
        # An empty file is considered a valid text file
74
        return True
×
75

76
    # Use translate's 'deletechars' argument to efficiently remove all
77
    # occurrences of _text_characters from the block
78
    nontext = block.translate(None, _text_characters)
×
79
    return float(len(nontext)) / len(block) <= 0.30
×
80

81
def _remove_readonly(func, path, excinfo):
11✔
82
    """remove readonly dirs, apparently only a windows issue
83
    add to all rmtree calls: shutil.rmtree(**,onerror=remove_readonly), wk"""
84
    os.chmod(path, 128)  # stat.S_IWRITE==128==normal
×
85
    func(path)
×
86

87
def run(cmd_str, cwd=".", verbose=False, use_sp = False, **kwargs):
    """front door for running a command line; hands off to `run_ossystem` or `run_sp`

    Args:
        cmd_str (`str`): the command line to execute

        cwd (`str`, optional): the directory to execute the command in.
            Default is ".".
        verbose (`bool`, optional): flag to echo to stdout the  `cmd_str`.
            Default is `False`.
        use_sp (`bool`, optional): flag to execute through `run_sp`
            (`subprocess.Popen`) instead of `run_ossystem` (`os.system()`).
            Default is `False`.
        **kwargs: forwarded to `run_sp`; not used when `use_sp` is False.

    Notes:
        nothing is returned from either code path

    Example::
        pyemu.os_utils.run("pestpp-ies my.pst",cwd="template")
    """

    if not use_sp:
        # the original pyemu behaviour: go through os.system()
        run_ossystem(cmd_str, cwd, verbose)
        return
    run_sp(cmd_str, cwd, verbose, **kwargs)
11✔
109

110
def run_ossystem(cmd_str, cwd=".", verbose=False):
    """an OS agnostic function to execute a command line

    Args:
        cmd_str (`str`): the str to execute with `os.system()`

        cwd (`str`, optional): the directory to execute the command in.
            Default is ".".
        verbose (`bool`, optional): flag to echo to stdout the  `cmd_str`.
            Default is `False`.

    Raises:
        `Exception`: if the command line cannot be preprocessed, if
        `os.system()` raises, or if the command returns non-zero

    Notes:
        uses `platform` to detect OS and adds .exe suffix or ./ prefix as appropriate
        if `os.system` returns non-zero, an exception is raised
        the working directory is always restored before returning or raising

    Example::

        pyemu.os_utils.run("pestpp-ies my.pst",cwd="template")

    """
    on_windows = "window" in platform.platform().lower()
    bwd = os.getcwd()
    os.chdir(cwd)
    try:
        try:
            raw = cmd_str.split()
            exe_name = raw[0]
            if on_windows:
                if not exe_name.lower().endswith("exe"):
                    raw[0] = exe_name + ".exe"
                    cmd_str = " ".join(raw)
            else:
                if exe_name.lower().endswith(".exe"):
                    # drop only the trailing suffix and keep *all* of the
                    # remaining arguments
                    exe_name = exe_name[: -len(".exe")]
                    raw[0] = exe_name
                    cmd_str = " ".join(raw)
                # an executable sitting in cwd needs an explicit "./" since
                # cwd is normally not on PATH
                if (os.path.exists(exe_name)
                        and not exe_name.startswith("./")
                        and not exe_name.startswith("/")):
                    cmd_str = "./" + cmd_str
        except Exception as e:
            raise Exception("run() error preprocessing command line :{0}".format(str(e)))

        if verbose:
            print("run():{0}".format(cmd_str))

        try:
            # the final command line is always echoed, independent of `verbose`
            print(cmd_str)
            ret_val = os.system(cmd_str)
        except Exception as e:
            raise Exception("run() raised :{0}".format(str(e)))
    finally:
        # single restore point for the working directory
        os.chdir(bwd)

    if on_windows:
        if ret_val != 0:
            raise Exception("run() returned non-zero: {0}".format(ret_val))
    else:
        # on POSIX os.system() returns a wait status; decode the exit code
        estat = os.WEXITSTATUS(ret_val)
        if estat != 0 or ret_val != 0:
            raise Exception("run() returned non-zero: {0},{1}".format(estat,ret_val))
6✔
171
        
172
def run_sp(cmd_str, cwd=".", verbose=True, logfile=False, **kwargs):
    """an OS agnostic function to execute a command line with subprocess

    Args:
        cmd_str (`str`): the str to execute with `sp.Popen()`

        cwd (`str`, optional): the directory to execute the command in.
            Default is ".".
        verbose (`bool`, optional): flag to echo the output of the command to
            stdout as it is produced. Default is `True`.
        logfile (`bool`, optional): flag to also write the output of the command
            to a file named "pyemu.log" in `cwd`. Default is `False`.
        shell (`bool`, optional): flag to use shell=True in the `subprocess.Popen` call. Not recommended

    Returns:
        `int`: the return code of the process (always 0, since a non-zero
        return code raises)

    Raises:
        `Exception`: if starting or reading from the process fails, or if the
        process returns non-zero

    Notes:
        uses sp Popen to execute the command line. By default does not run in shell mode (ie. does not look for the exe in env variables)
        the working directory is always restored before returning or raising

    """
    # update shell from  kwargs
    shell = kwargs.get("shell", False)

    # print warning if shell is True
    if shell:
        warnings.warn("shell=True is not recommended and may cause issues, but hey! YOLO", PyemuWarning)

    bwd = os.getcwd()
    os.chdir(cwd)
    log_stream = None
    try:
        exe_name = cmd_str.split()[0]
        # an executable that is not found on PATH is assumed to live in cwd
        if (platform.system() != "Windows"
                and not shutil.which(exe_name)
                and not exe_name.startswith("./")
                and not exe_name.startswith("/")):
            cmd_str = "./" + cmd_str

        try:
            cmd_ins = cmd_str.split()
            if logfile:
                log_stream = open("pyemu.log", "w+", newline="")
            # stderr is merged into stdout; bufsize=1 gives line buffering
            with sp.Popen(cmd_ins, stdout=sp.PIPE,
                          stderr=sp.STDOUT, text=True,
                          shell=shell, bufsize=1) as process:
                for line in process.stdout:
                    if verbose:
                        print(line, flush=True, end='')
                    if logfile:
                        log_stream.write(line.strip('\n'))
                        log_stream.flush()
                process.wait() # wait for the process to finish
                retval = process.returncode
        except Exception as e:
            raise Exception("run() raised :{0}".format(str(e)))
    finally:
        # release the log file and restore the working directory on every
        # exit path, including failures before the process was started
        if log_stream is not None:
            log_stream.close()
        os.chdir(bwd)

    if "window" in platform.platform().lower():
        if retval != 0:
            raise Exception("run() returned non-zero: {0}".format(retval))
    else:
        # kept so the error message format is unchanged; the `retval != 0`
        # test is what actually detects failure for a Popen return code
        estat = os.WEXITSTATUS(retval)
        if estat != 0 or retval != 0:
            raise Exception("run() returned non-zero: {0},{1}".format(estat, retval))
    return retval
11✔
238

239

240
def _try_remove_existing(d, forgive=False):
    """remove the directory tree `d`, clearing read-only entries along the way

    Args:
        d (`str`): the directory to remove
        forgive (`bool`, optional): flag to issue a `PyemuWarning` instead of
            raising if the removal fails. Default is `False`.

    Returns:
        `bool`: True if the tree was removed, False if the removal failed and
        `forgive` is True

    Raises:
        `Exception`: if the removal fails and `forgive` is False

    """
    try:
        shutil.rmtree(d, onerror=_remove_readonly)
    except Exception as e:
        if forgive:
            warnings.warn(
                f"unable to remove worker dir: {d}\n{e}",
                PyemuWarning,
            )
            return False
        raise Exception(
            f"unable to remove existing dir: {d}\n{e}"
        )
    return True
×
255

256
def _try_copy_dir(o_d, n_d):
11✔
257
    try:
11✔
258
        shutil.copytree(o_d, n_d, symlinks=True)
11✔
259
    except PermissionError:
×
260
        time.sleep(3) # pause for windows locking issues
×
261
        try:
×
262
            shutil.copytree(o_d, n_d, symlinks=True)
×
263
        except Exception as e:
×
264
            raise Exception(
×
265
                f"unable to copy files from base dir: "
266
                f"{o_d}, to new dir: {n_d}\n{e}"
267
            )
268

269

270
def start_workers(
    worker_dir,
    exe_rel_path,
    pst_rel_path,
    num_workers=None,
    worker_root="..",
    port=None,
    rel_path=None,
    local=True,
    cleanup=True,
    master_dir=None,
    verbose=False,
    silent_master=False,
    reuse_master=False,
    restart=False,
    ppw_function=None,
    ppw_kwargs=None,
):
    """start a group of pest(++) workers on the local machine

    Args:
        worker_dir (`str`): the path to a complete set of input files need by PEST(++).
            This directory will be copied to make worker (and optionally the master)
            directories
        exe_rel_path (`str`): the relative path to and name of the pest(++) executable from within
            the `worker_dir`.  For example, if the executable is up one directory from
            `worker_dir`, the `exe_rel_path` would be `os.path.join("..","pestpp-ies")`
        pst_rel_path (`str`): the relative path to and name of the pest control file from within
            `worker_dir`.
        num_workers (`int`, optional): number of workers to start. defaults to number of cores
        worker_root (`str`, optional):  the root directory to make the new worker directories in.
            Default is ".."  (up one directory from where python is running).
        port (`int`, optional): the port number used by the master and the workers.  If None
            and `master_dir` is None, 4004 is used; if None and `master_dir` is not None,
            a port is requested from `PortManager`.  Default is None.
        rel_path (`str`, optional): the relative path to where pest(++) should be run
            from within the worker_dir, defaults to the uppermost level of the worker dir.
            This option is usually not needed unless you are one of those crazy people who
            spreads files across countless subdirectories.
        local (`bool`, optional): flag for using "localhost" instead of actual hostname/IP address on
            worker command line. Default is True.  `local` can also be passed as an `str`, in which
            case `local` is used as the hostname (for example `local="192.168.10.1"`)
        cleanup (`bool`, optional):  flag to remove worker directories once processes exit. Default is
            True.  Set to False for debugging issues
        master_dir (`str`): name of directory for master instance.  If `master_dir`
            exists, then it will be REMOVED!!!  If `master_dir`, is None,
            no master instance will be started.  If not None, a copy of `worker_dir` will be
            made into `master_dir` and the PEST(++) executable will be started in master mode
            in this directory. Default is None
        verbose (`bool`, optional): flag to echo useful information to stdout.  Default is False
        silent_master (`bool`, optional): flag to pipe master output to devnull and instead print
            a simple message to stdout every few seconds.  This is only for
            pestpp Travis testing so that log file sizes dont explode. Default is False
        reuse_master (`bool`): flag to use an existing `master_dir` as is - this is an advanced user
            option for cases where you want to construct your own `master_dir` then have an async
            process started in it by this function.
        restart (`bool`): flag to add a restart flag to the master start. If `True`, this will include
            `/r` in the master call string.
        ppw_function (function): a function pointer that uses PyPestWorker to execute model runs.
            This is to help speed up pure-python and really fast running forward models and the
            way its implemented in `start_workers` assumes each `PyPestWorker` instance does not need
            a separate set of model+files, that is, these workers are assumed to execute entirely in
            memory.  The first three arguments to this function must be `pst_rel_path`, `host`, and `port`.
            Default is None.
        ppw_kwargs (dict): keyword arguments to pass to `ppw_function`.
            Default is None, which is treated as an empty dict.

    Raises:
        `Exception`: if `worker_dir`, `worker_root` or the control file cannot be found,
        if the master or a worker cannot be started, or if the master returns non-zero

    Notes:
        If all workers (and optionally master) exit gracefully, then the worker
        dirs will be removed unless `cleanup` is False

        The working directory of the calling process is restored even if starting
        the master or a worker fails.

    Example::

        # start 10 workers using the directory "template" as the base case and
        # also start a master instance in a directory "master".
        pyemu.os_utils.start_workers("template","pestpp-ies","pest.pst",10,master_dir="master",
                                     worker_root=".")

    """

    if not os.path.isdir(worker_dir):
        raise Exception("worker dir '{0}' not found".format(worker_dir))
    if not os.path.isdir(worker_root):
        raise Exception("worker root dir not found")
    if num_workers is None:
        num_workers = mp.cpu_count()
    else:
        num_workers = int(num_workers)
    if ppw_kwargs is None:
        ppw_kwargs = {}

    # the executable is deliberately not verified here: it may be found
    # through the PATH instead of relative to the worker dir
    if rel_path is not None:
        if not os.path.exists(os.path.join(worker_dir, rel_path, pst_rel_path)):
            raise Exception("pst_rel_path not found from worker_dir using rel_path")
    else:
        if not os.path.exists(os.path.join(worker_dir, pst_rel_path)):
            raise Exception("pst_rel_path not found from worker_dir")

    if port is None:
        if master_dir is None:
            port = 4004 # the 'ole standard
        else:
            port = PortManager().get_available_port()

    if isinstance(local, str):
        hostname = local
    elif local:
        hostname = "localhost"
    else:
        hostname = socket.gethostname()

    base_dir = os.getcwd()
    port = int(port)

    # an executable that lives in the worker dir needs the platform-specific
    # suffix/prefix to be launched from inside that directory
    if os.path.exists(os.path.join(worker_dir, exe_rel_path)):
        if "window" in platform.platform().lower():
            if not exe_rel_path.lower().endswith("exe"):
                exe_rel_path = exe_rel_path + ".exe"
        else:
            if (not exe_rel_path.startswith("./")
                    and not exe_rel_path.startswith("/")):
                exe_rel_path = "./" + exe_rel_path

    master_p = None
    if master_dir is not None:
        if master_dir != "." and not reuse_master:
            if os.path.exists(master_dir):
                _try_remove_existing(master_dir)
            _try_copy_dir(worker_dir, master_dir)

        args = [exe_rel_path, pst_rel_path, "/h"]
        if restart is True:
            # add restart if requested
            args.append("/r")
        args.append(":{0}".format(port))

        if rel_path is not None:
            cwd = os.path.join(master_dir, rel_path)
        else:
            cwd = master_dir
        if verbose:
            print("master:{0} in {1}".format(" ".join(args), cwd))
        # sp.DEVNULL avoids holding an open file handle for the silent case
        stdout = sp.DEVNULL if silent_master else None
        try:
            os.chdir(cwd)
            master_p = sp.Popen(args, stdout=stdout)
        except Exception as e:
            raise Exception("error starting master instance: {0}".format(str(e)))
        finally:
            # never leave the caller stranded in the master dir
            os.chdir(base_dir)
        time.sleep(0.5)  # a few cycles to let the master get ready

    procs = []
    worker_dirs = []
    if ppw_function is not None:
        args = (os.path.join(worker_dir, pst_rel_path), hostname, port)

        def create_and_start_worker(worker_id):
            p = mp.Process(target=ppw_function, args=args, kwargs=ppw_kwargs)
            p.daemon = True
            p.start()
            if verbose:
                print("Started worker {0} (PID: {1})".format(worker_id, p.pid))
            return p

        # Use ThreadPoolExecutor to create processes in parallel
        max_concurrent_starts = num_workers
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_concurrent_starts) as executor:
            # Submit all worker creation tasks
            future_to_id = {
                executor.submit(create_and_start_worker, i): i
                for i in range(num_workers)
            }

            # Collect started processes
            for future in concurrent.futures.as_completed(future_to_id):
                worker_id = future_to_id[future]
                try:
                    p = future.result()
                    procs.append(p)
                except Exception as e:
                    print("Error starting worker {0}: {1}".format(worker_id, e))
                    raise

    else:
        tcp_arg = "{0}:{1}".format(hostname, port)

        for i in range(num_workers):
            new_worker_dir = os.path.join(worker_root, "worker_{0}".format(i))
            if os.path.exists(new_worker_dir):
                _try_remove_existing(new_worker_dir)
            _try_copy_dir(worker_dir, new_worker_dir)

            args = [exe_rel_path, pst_rel_path, "/h", tcp_arg]
            if rel_path is not None:
                cwd = os.path.join(new_worker_dir, rel_path)
            else:
                cwd = new_worker_dir
            try:
                os.chdir(cwd)
                if verbose:
                    print("worker:{0} in {1}".format(" ".join(args), cwd))
                p = sp.Popen(args, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
                procs.append(p)
            except Exception as e:
                raise Exception("error starting worker: {0}".format(str(e)))
            finally:
                # worker dirs may be relative to base_dir, so always go back
                os.chdir(base_dir)
            worker_dirs.append(new_worker_dir)

    if master_dir is not None:
        if silent_master:
            # this keeps travis from thinking something is wrong...
            while master_p.poll() is None:
                print(datetime.now(), "still running")
                time.sleep(5)
        else:
            master_p.wait()
            time.sleep(0.5)  # a few cycles to let the workers end gracefully

        if ppw_function is not None:
            termination_timeout = 10  # seconds

            if verbose:
                print("Initiating graceful shutdown of {0} workers...".format(len(procs)))

            # First, try graceful termination (best effort)
            for p in procs:
                if p.is_alive():
                    try:
                        p.terminate()  # Send SIGTERM
                    except Exception:
                        pass

            # Wait for processes to terminate gracefully
            start_time = time.time()
            remaining_procs = procs[:]  # Create a copy

            while remaining_procs and (time.time() - start_time) < termination_timeout:
                still_alive = []
                for p in remaining_procs:
                    if p.is_alive():
                        still_alive.append(p)
                    else:
                        try:
                            p.join(timeout=0.1)  # Clean up zombie processes
                        except Exception:
                            pass

                remaining_procs = still_alive
                if remaining_procs:
                    time.sleep(0.1)

            # Force kill any remaining processes (best effort)
            if remaining_procs:
                if verbose:
                    print("Force killing {0} remaining workers...".format(len(remaining_procs)))
                for p in remaining_procs:
                    try:
                        p.kill()  # Send SIGKILL
                        p.join(timeout=1)
                    except Exception:
                        pass
        else:
            # kill any remaining workers (non-ppw_function case)
            for p in procs:
                p.kill()

    # Wait for all processes to finish (this waits for sweep to finish, but pre/post/model subprocs may take longer)
    for p in procs:
        if ppw_function is not None:
            # For ppw_function processes, termination was handled above when a
            # master was started; just give stragglers a moment to exit
            if p.is_alive():
                try:
                    p.join(timeout=1)
                except Exception:
                    pass
        else:
            # For regular worker processes
            p.wait()

    if cleanup:
        cleanit = 0
        removed = set()
        # retry the removal until every worker dir is gone, giving up after
        # 100 passes over the list
        while len(removed) < len(worker_dirs):
            cleanit = cleanit + 1
            for d in worker_dirs:
                if os.path.exists(d):
                    success = _try_remove_existing(d, forgive=True)
                    if success:
                        removed.add(d)
                else:
                    removed.add(d)
            if cleanit > 100:
                break

    if master_dir is not None:
        ret_val = master_p.returncode
        if ret_val != 0:
            raise Exception("start_workers() master returned non-zero: {0}".format(ret_val))
×
595

596

597

598
class NetPack(object):
    """pack and unpack the messages exchanged with a pest++ master over a socket

    Every message is a fixed security message, followed by a fixed-size header
    (buffer size, message type, group, run id and a null-padded description),
    followed by an optional data payload.  All integers are little endian.

    Args:
        timeout (`float`, optional): stored on the instance as `timeout`.
            Default is 0.1
        verbose (`bool`, optional): flag to echo warnings to stdout. Default is True

    """
    # message type code -> name
    netpack_type = {
        0: "UNKN", 1: "OK", 2: "CONFIRM_OK", 3: "READY",
        4: "REQ_RUNDIR", 5: "RUNDIR", 6: "REQ_LINPACK",
        7: "LINPACK", 8: "PAR_NAMES", 9: "OBS_NAMES",
        10: "START_RUN", 11: "RUN_FINISHED", 12: "RUN_FAILED",
        13: "RUN_KILLED", 14: "TERMINATE", 15: "PING",
        16: "REQ_KILL", 17: "IO_ERROR", 18: "CORRUPT_MESG",
        19: "DEBUG_LOOP", 20: "DEBUG_FAIL_FREEZE",
        21: "START_FILE_WRKR2MSTR", 22: "CONT_FILE_WRKR2MSTR",
        23: "FINISH_FILE_WRKR2MSTR"}
    # byte values that prefix every message
    sec_message = [1, 3, 5, 7, 9]

    def __init__(self,timeout=0.1,verbose=True):
        # header layout: buf size (8) + type (4) + group (8) + run id (8) + desc (1001)
        self.header_size = 8 + 4 + 8 + 8 + 1001
        self.buf_idx = (0, 8)
        self.type_idx = (8, 12)
        self.group_idx = (12, 20)
        self.runid_idx = (20, 28)
        self.desc_idx = (28, 1028)
        self.data_idx = (1028,None)

        # fields of the most recently received message
        self.buf_size = None
        self.mtype = None
        self.group = None
        self.runid = None
        self.desc = None
        self.data_pak = None

        self.timeout = float(timeout)
        self.verbose = bool(verbose)

        # the security message as raw bytes, one byte per entry
        self.sec_message_buf = bytearray(NetPack.sec_message)

    def recv_all(self,s,msg_len):
        """read exactly `msg_len` bytes from socket `s`

        Returns:
            `bytearray`: the bytes read, or None if the socket returned no
            data before `msg_len` bytes were received

        """
        data = bytearray()
        while len(data) < msg_len:
            packet = s.recv(msg_len - len(data))
            if not packet:
                return None
            data.extend(packet)
        return data

    def nonblocking_recv(self,s,msg_len):
        """read `msg_len` bytes from socket `s`, treating a timeout as "no message"

        Returns:
            `bytearray`: the bytes read, or None if the socket timed out or
            nothing was received

        Raises:
            `Exception`: for any other socket error

        """
        try:
            msg = self.recv_all(s,msg_len)
        except socket.timeout as e:
            emess = e.args[0]
            if emess == 'timed out':
                return None
            else:
                raise Exception(e)
        except socket.error as e:
            # Something else happened, handle error, exit, etc.
            raise Exception(e)
        else:
            if msg is None:
                return None
            if len(msg) == 0:
                return None
            else:
                return msg

    def recv(self,s,dtype=None):
        """receive one message from socket `s` and store its fields on the instance

        Args:
            s: the connected socket to read from
            dtype (optional): the type used to deserialize the data payload.
                If None and the message type is 10 (START_RUN), `float` is used.

        Returns:
            `int`: 0 if no security message was received, otherwise the header
            length plus the payload length

        Raises:
            `Exception`: if the security message is invalid, or if the header
            or the announced payload does not arrive

        """
        data = self.nonblocking_recv(s,len(self.sec_message_buf))
        if data is None:
            return 0

        recv_sec_message = [int(d) for d in data]
        self._check_sec_message(recv_sec_message)

        data = self.nonblocking_recv(s,self.header_size)
        if data is None:
            raise Exception("didn't recv header after security message")
        self.buf_size = int.from_bytes(data[self.buf_idx[0]:self.buf_idx[1]], "little")
        self.mtype = int.from_bytes(data[self.type_idx[0]:self.type_idx[1]], "little")
        self.group = int.from_bytes(data[self.group_idx[0]:self.group_idx[1]], "little")
        self.runid = int.from_bytes(data[self.runid_idx[0]:self.runid_idx[1]], "little")
        self.desc = data[self.desc_idx[0]:self.desc_idx[1]].decode().replace("\x00","")
        # whatever the announced buffer size holds beyond the header is payload
        data_len = self.buf_size - self.data_idx[0] - 1
        self.data_pak = None
        if data_len > 0:
            raw_data = self.nonblocking_recv(s,data_len)
            if raw_data is None:
                raise Exception("didn't recv data pack after header of {0} bytes".format(data_len))
            if dtype is None and self.mtype == 10:
                dtype = float
            self.data_pak = self.deserialize_data(raw_data,dtype)
        return len(data) + data_len

    def deserialize_data(self,data,dtype=None):
        """convert a raw payload into a numpy array

        `int` and `float` payloads are read as binary values; anything else is
        decoded as text, lower-cased, split on whitespace and nulls and, if
        `dtype` is given, cast to `dtype`.

        """
        if dtype is not None and dtype in [int,float]:
            return np.frombuffer(data,dtype=dtype)
        ddata = np.array(data.decode().lower().replace('\x00',' ').split())
        if dtype is not None:
            ddata = ddata.astype(dtype)
        return ddata

    def serialize_data(self,data):
        """convert `data` (str, list, numpy array, int or float) to bytes

        Raises:
            `Exception`: if the type of `data` is not supported

        """
        if isinstance(data,str):
            return data.encode()
        elif isinstance(data,list):
            return np.array(data).tobytes()
        elif isinstance(data,np.ndarray):
            return data.tobytes()
        elif isinstance(data,int):
            if self.verbose:
                print("warning: casting int to float to serialize")
            return struct.pack('d',float(data))
        elif isinstance(data,float):
            return struct.pack('d', data)

        else:
            raise Exception("can't serialize unknown 'data' type {0}".format(data))

    def send(self,s,mtype,group,runid,desc,data):
        """assemble one message and send it over socket `s`

        Args:
            s: the connected socket to write to
            mtype (`int`): the message type code (see `netpack_type`)
            group (`int`): the group id
            runid (`int`): the run id
            desc (`str`): the description.  Descriptions that do not fit the
                header field are truncated so that the payload stays aligned.
            data: the payload, anything accepted by `serialize_data`

        """
        sdata = self.serialize_data(data) + b"\x00"
        buf_size = self.header_size + len(sdata)

        # The description occupies a fixed slot at the end of the header.
        # Size it in *bytes* (not characters) and keep the last byte as a
        # null terminator, otherwise a long or non-ASCII description would
        # shift the payload and corrupt the framing.
        desc_field_len = self.header_size - self.desc_idx[0]
        max_desc_len = desc_field_len - 1
        desc_bytes = desc.encode()
        if len(desc_bytes) > max_desc_len:
            # drop a partially cut multi-byte character, if any
            desc_bytes = desc_bytes[:max_desc_len].decode(errors="ignore").encode()
        desc_bytes = desc_bytes.ljust(desc_field_len, b"\x00")

        buf = bytearray()
        buf += self.sec_message_buf
        buf += buf_size.to_bytes(length=8,byteorder="little")
        buf += mtype.to_bytes(length=4,byteorder="little")
        buf += group.to_bytes(length=8, byteorder="little")
        buf += runid.to_bytes(length=8, byteorder="little")
        buf += desc_bytes
        buf += sdata
        s.sendall(buf)

    def _check_sec_message(self,recv_sec_message):
        """raise if the received security message differs from `sec_message`"""
        if recv_sec_message != self.sec_message:
            raise Exception("recv'd security message {0} invalid, should be {1}".\
                            format(recv_sec_message,self.sec_message))
756

757
class PyPestWorker(object):
    """A pure python worker for pest++; the pest++ master cannot tell the difference.

    Constructing an instance loads the control file, blocks until a
    connection to the master has been made and then starts a background
    thread running `listen()`.  Calling code is expected to alternate
    between `get_parameters()` and `send_observations()`.

    Args:
        pst (str or pyemu.Pst): control file name or a loaded control file instance
        host (str): master hostname or IPv4 address
        port (int): port number that the master is listening on
        timeout (float): number of seconds to sleep at different points in the process.
            if you have lots of pars and/or obs, a longer sleep can be helpful, but if you
            make this smaller, the worker responds faster...'it depends'
        verbose (bool): flag to echo what's going on to stdout
        socket_timeout (float): number of seconds that the socket should wait before giving up.
            generally, this can be a big number...  Default is None, in which case
            100 * `timeout` is used.
    """

    # message type codes, named after how this class handles or emits them
    _MT_READY = 3
    _MT_REQ_CWD = 4
    _MT_CWD = 5
    _MT_REQ_LINPACK = 6
    _MT_LINPACK = 7
    _MT_PAR_NAMES = 8
    _MT_OBS_NAMES = 9
    _MT_PAR_VALUES = 10
    _MT_RUN_FINISHED = 11
    _MT_RUN_FAILED = 12
    _MT_RUN_KILLED = 13
    _MT_TERMINATE = 14
    _MT_PING = 15
    _MT_REQ_KILL = 16

    def __init__(self, pst, host, port, timeout=0.25, verbose=True, socket_timeout=None):
        self.host = host
        self.port = port
        self._pst_arg = pst
        self._pst = None
        self.s = None
        self.timeout = float(timeout)
        self.net_pack = NetPack(timeout=timeout, verbose=verbose)
        self.verbose = bool(verbose)
        self.par_names = None
        self.obs_names = None
        if socket_timeout is None:
            # derive from the float-converted value so a non-float `timeout`
            # argument cannot produce a nonsensical socket timeout
            socket_timeout = self.timeout * 100
        self.socket_timeout = socket_timeout
        self.par_values = None
        self.max_reconnect_attempts = 10
        self.logger_filename = "pypestworker_{0}.txt".format(
            datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f"))
        self.logger = Logger(self.logger_filename, echo=verbose)
        self.message("PyPestWorker starting with timeout:{0} and socket_timeout:{1}".format(
            self.timeout, self.socket_timeout))
        # _lock guards par_values and the terminate flag; _send_lock
        # serializes writes to the socket between the two threads
        self._lock = threading.Lock()
        self._send_lock = threading.Lock()
        self._process_pst()
        self.connect()
        self._listen_thread = threading.Thread(target=self.listen,
                                               args=(self._lock, self._send_lock))
        self._listen_thread.start()

    def _process_pst(self):
        """Resolve the `pst` constructor argument into a loaded control file.

        Raises:
            Exception: if the argument is neither a str nor a `Pst` instance
        """
        self.message("processing control file")
        if isinstance(self._pst_arg, str):
            self._pst = pst_handler.Pst(self._pst_arg)
        elif isinstance(self._pst_arg, pst_handler.Pst):
            self._pst = self._pst_arg
        else:
            raise Exception("unrecognized 'pst' arg:{0}".format(type(self._pst_arg)))

    def connect(self, is_reconnect=False):
        """Open a TCP connection to the master, retrying until it succeeds.

        Each attempt is preceded by a sleep of `self.timeout` seconds.  On
        success `self.s` holds the connected socket and `self.net_pack` is
        replaced with a fresh instance.

        Args:
            is_reconnect (bool): if True, give up after
                `self.max_reconnect_attempts` attempts; if False, keep
                trying forever.

        Returns:
            bool: True once connected, False if the reconnect attempts ran out
        """
        self.message("trying to connect to {0}:{1}...".format(self.host, self.port))
        self.s = None
        attempts = 0
        last_error = None
        while True:
            time.sleep(self.timeout)
            attempts += 1
            if is_reconnect and attempts > self.max_reconnect_attempts:
                self.message("max reconnect attempts reached...", True)
                return False
            sock = None
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((self.host, self.port))
            except ConnectionRefusedError:
                # the master is not listening (yet): expected, retry quietly
                if sock is not None:
                    sock.close()
                continue
            except Exception as e:
                # close the half-made socket so retries do not leak descriptors
                if sock is not None:
                    sock.close()
                # report each distinct failure once rather than on every retry
                if str(e) != last_error:
                    last_error = str(e)
                    self.message("WARNING: connect attempt failed:{0}...retrying".format(last_error))
                continue
            self.s = sock
            self.message("connected to {0}:{1}".format(self.host, self.port))
            break

        self.net_pack = NetPack(timeout=self.timeout, verbose=self.verbose)
        return True

    def message(self, msg, echo=False):
        """Write `msg` to the log file and, if verbose or `echo`, to stdout."""
        self.logger.statement(msg)
        if self.verbose or echo:
            print(str(datetime.now()) + " : " + msg)

    def recv(self, dtype=None):
        """Receive one message from the master into `self.net_pack`.

        Args:
            dtype: passed through to `NetPack.recv()`

        Returns:
            the value returned by `NetPack.recv()`; a value greater than
            zero is treated as "a message was received"
        """
        n = self.net_pack.recv(self.s, dtype=dtype)
        if n > 0:
            self.message("recv'd message type:{0}, group:{1}, run_id:{2}, desc:{3}".format(
                NetPack.netpack_type[self.net_pack.mtype],
                self.net_pack.group, self.net_pack.runid, self.net_pack.desc))
        return n

    def send(self, mtype, group, runid, desc="", data=0):
        """Send one message to the master.

        Args:
            mtype (int): message type code
            group (int): group id
            runid (int): run id
            desc (str): message description
            data: payload handed to `NetPack.send()`

        Returns:
            bool: True if the message was written, False if sending raised
        """
        try:
            self.net_pack.send(self.s, mtype, group, runid, desc, data)
        except Exception as e:
            self.message("WARNING: error sending message:{0}".format(str(e)), True)
            return False
        self.message("sent message type:{0}, group: {1}, run_id:{2}, desc:{3}".format(
            NetPack.netpack_type[mtype], group, runid, desc))
        return True

    def _reconnect_or_terminate(self, lock):
        """Try to reconnect to the master after a communication failure.

        On failure the terminate flag is raised (under `lock`) so that a
        caller blocked in `get_parameters()` is released.

        Returns:
            bool: True if the connection was re-established
        """
        if self.connect(is_reconnect=True):
            # the replacement socket needs the same timeout as the first one
            self.s.settimeout(self.socket_timeout)
            self.message("...reconnected successfully...", True)
            return True
        self.message("...exiting", True)
        time.sleep(self.timeout)
        with lock:
            self.net_pack.mtype = self._MT_TERMINATE
        return False

    def _check_names(self, kind, received, expected):
        """Warn if the master's names differ from those in the control file."""
        diff = set(received).symmetric_difference(set(expected))
        if len(diff) > 0:
            self.message("WARNING: the following {0} names are not common\n".format(kind) +
                         " between the control file and the master:{0}".format(','.join(sorted(diff))),
                         True)

    def listen(self, lock=None, send_lock=None):
        """Service messages from the master until terminated.  Runs in the listener thread.

        Args:
            lock (threading.Lock): guards `par_values` and the terminate
                flag.  Default is None, which uses the instance lock.
            send_lock (threading.Lock): serializes socket writes.  Default
                is None, which uses the instance send lock.
        """
        par_lock = lock if lock is not None else self._lock
        send_guard = send_lock if send_lock is not None else self._send_lock
        self.s.settimeout(self.socket_timeout)
        while True:
            time.sleep(self.timeout)
            try:
                n = self.recv()
            except Exception as e:
                self.message("WARNING: recv exception:" + str(e) + "...trying to reconnect...", True)
                if self._reconnect_or_terminate(par_lock):
                    continue
                return

            if n > 0:
                mtype = self.net_pack.mtype
                # requests that need an answer: (reply type, desc, data, label for failures)
                reply = None

                if mtype == self._MT_PAR_VALUES:
                    with par_lock:
                        self.par_values = self.net_pack.data_pak.copy()
                elif mtype == self._MT_REQ_CWD:
                    reply = (self._MT_CWD, "sending cwd", os.getcwd(), "cwd send")
                elif mtype == self._MT_PAR_NAMES:
                    self.par_names = self.net_pack.data_pak
                    self._check_names("par", self.par_names, self._pst.par_names)
                elif mtype == self._MT_OBS_NAMES:
                    self.obs_names = self.net_pack.data_pak
                    self._check_names("obs", self.obs_names, self._pst.obs_names)
                elif mtype == self._MT_REQ_LINPACK:
                    reply = (self._MT_LINPACK, "fake linpack result", 1, "linpack send")
                elif mtype == self._MT_PING:
                    reply = (self._MT_PING, "ping back", 0, "ping back")
                elif mtype == self._MT_TERMINATE:
                    self.message("recv'd terminate signal", True)
                    return
                elif mtype == self._MT_REQ_KILL:
                    self.message("master is requesting run kill...", True)
                else:
                    self.message("WARNING: unsupported request received: {0}".format(
                        NetPack.netpack_type[mtype]), True)

                if reply is not None:
                    reply_type, desc, data, label = reply
                    with send_guard:
                        sent = self.send(reply_type, self.net_pack.group,
                                         self.net_pack.runid, desc, data)
                    if not sent:
                        self.message("failed {0}...trying to reconnect...".format(label), True)
                        if not self._reconnect_or_terminate(par_lock):
                            return

    def get_parameters(self):
        """Block until the master supplies parameter values for the next run.

        Returns:
            pandas.Series: parameter values indexed by parameter name, or
            None if the terminate flag has been set

        Raises:
            Exception: if the number of values does not match the number of names
        """
        while True:
            with self._lock:
                if self.net_pack.mtype == self._MT_TERMINATE:
                    return None
                if self.par_values is not None:
                    pars = self.par_values
                    break
            time.sleep(self.timeout)
        if len(pars) != len(self.par_names):
            raise Exception("len(par vals) {0} != len(par names) {1}".format(
                len(pars), len(self.par_names)))
        return pd.Series(data=pars, index=self.par_names)

    @staticmethod
    def _as_array(vals, names, label):
        """Coerce an ndarray, Series (aligned to `names`) or list to an ndarray."""
        if isinstance(vals, np.ndarray):
            return vals
        if isinstance(vals, pd.Series):
            return vals.loc[names].values
        if isinstance(vals, list):
            return np.array(vals)
        raise Exception("unknown {0} type {1}".format(label, type(vals)))

    def send_observations(self, obsvals, parvals=None, request_more_pars=True):
        """Send the results of a completed run back to the master.

        Args:
            obsvals (numpy.ndarray, pandas.Series or list): simulated
                observation values; a Series is aligned to the observation
                names, other types are assumed to already be in order.
            parvals (numpy.ndarray, pandas.Series or list): parameter values
                to report.  Default is None, which reports the values last
                received from the master.
            request_more_pars (bool): if True, also tell the master that
                this worker is ready for another run.

        Raises:
            Exception: on a length mismatch, an unsupported argument type or
                nans in either set of values
        """
        if len(obsvals) != len(self.obs_names):
            raise Exception("len(obs vals) {0} != len(obs names) {1}".format(
                len(obsvals), len(self.obs_names)))
        _obsvals = self._as_array(obsvals, self.obs_names, "obsvals")
        if np.any(np.isnan(_obsvals)):
            raise Exception("nans in obsvals")

        if parvals is None:
            _parvals = self.par_values
        else:
            _parvals = self._as_array(parvals, self.par_names, "parvals")
        if np.any(np.isnan(_parvals)):
            raise Exception("nans in parvals")

        # first reset par_values to None so get_parameters() waits for the next set
        with self._lock:
            self.par_values = None

        data = np.concatenate((_parvals, _obsvals))

        # hold the send lock across both messages so the listener thread
        # cannot interleave a reply; `with` releases it even on error
        with self._send_lock:
            # send the stacked par vals and obs vals
            self.send(self._MT_RUN_FINISHED, self.net_pack.group, self.net_pack.runid,
                      self.net_pack.desc, data=data)
            # now send the ready message
            if request_more_pars:
                self.send(self._MT_READY, 0, 0, "ready for next run", data=0)

    def request_more_pars(self):
        """Tell the master that this worker is ready for another run."""
        with self._send_lock:
            self.send(self._MT_READY, 0, 0, "ready for next run", data=0.0)

    def send_failed_run(self, group=None, runid=None, desc="failed"):
        """Report a failed run to the master.

        Args:
            group (int): group id.  Default is None, which uses the group of
                the most recently received message.
            runid (int): run id.  Default is None, which uses the run id of
                the most recently received message.
            desc (str): message description
        """
        if group is None:
            group = self.net_pack.group
        if runid is None:
            runid = self.net_pack.runid
        with self._send_lock:
            self.send(self._MT_RUN_FAILED, int(group), int(runid), desc, data=0.0)

    def send_killed_run(self, group=None, runid=None, desc="killed"):
        """Report a killed run to the master.

        Args:
            group (int): group id.  Default is None, which uses the group of
                the most recently received message.
            runid (int): run id.  Default is None, which uses the run id of
                the most recently received message.
            desc (str): message description
        """
        if group is None:
            group = self.net_pack.group
        if runid is None:
            runid = self.net_pack.runid
        with self._send_lock:
            self.send(self._MT_RUN_KILLED, int(group), int(runid), desc, data=0.0)
×
1057

1058

1059

1060

1061
class PortManager(object):
    """Cross-platform port manager for parallel processes.

    Ports are claimed through lock files in a shared directory so that
    several processes started at the same time do not pick the same port.
    A lock older than `lock_timeout` seconds is treated as stale and may be
    removed or taken over by any process.
    """
    def __init__(self,
                 port_range=(4004, 4999),
                 lock_dir=None,
                 max_retries=50,
                 lock_timeout=5,
                 log_level=logging.INFO):
        """
        Initialize the port manager.
        Args:
            port_range: Tuple of (min_port, max_port) to search within
            lock_dir: Directory to store lock files (default: system temp dir)
            max_retries: Maximum attempts to find an available port
            lock_timeout: Time in seconds after which a lock is considered stale
            log_level: Level applied to this instance's logger
        """
        # Set up instance-specific logger
        self.logger = logging.getLogger(f"{__name__}.PortManager.{id(self)}")
        self.logger.setLevel(log_level)
        # Add a handler if none exists
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(processName)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        self.min_port, self.max_port = port_range
        self.lock_dir = lock_dir or os.path.join(tempfile.gettempdir(), "port_locks")
        self.max_retries = max_retries
        self.lock_timeout = lock_timeout
        # Ensure lock directory exists
        os.makedirs(self.lock_dir, exist_ok=True)
        # Generate a unique ID for this process instance
        self.instance_id = str(uuid.uuid4())

    def _is_port_available(self, port):
        """Check if a port is available by attempting to bind to it."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                # SO_REUSEADDR lets the probe ignore sockets in TIME_WAIT.
                # It is skipped on Windows, where the option allows binding
                # to a port that is actively in use and would make every
                # port look free.
                if os.name != "nt":
                    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind(('localhost', port))
                return True
            except OSError:
                return False

    def _get_lock_file(self, port):
        """Get the path to the lock file for a specific port."""
        return os.path.join(self.lock_dir, f"port_{port}.lock")

    def _clean_stale_locks(self):
        """Remove stale lock files based on timeout."""
        now = time.time()
        try:
            filenames = os.listdir(self.lock_dir)
        except OSError as e:
            self.logger.warning(f"Error cleaning stale locks: {e}")
            return
        for filename in filenames:
            if not (filename.startswith("port_") and filename.endswith(".lock")):
                continue
            lock_path = os.path.join(self.lock_dir, filename)
            try:
                if now - os.path.getmtime(lock_path) > self.lock_timeout:
                    os.remove(lock_path)
                    self.logger.debug(f"Removed stale lock file: {lock_path}")
            except OSError:
                # Another process removed it first; keep sweeping the rest
                continue

    def _create_lock_file(self, lock_file):
        """Atomically create `lock_file`; raises FileExistsError if it is already there."""
        lock_data = {
            "pid": os.getpid(),
            "instance_id": self.instance_id,
            "timestamp": time.time()
        }
        # 'x' mode is exclusive creation, which is what makes this a lock
        with open(lock_file, 'x') as f:
            json.dump(lock_data, f)

    def _acquire_lock(self, port):
        """Try to take the lock for `port`; returns True if this instance now holds it."""
        lock_file = self._get_lock_file(port)
        try:
            self._create_lock_file(lock_file)
            return True
        except FileExistsError:
            pass
        # Someone holds a lock already: only take it over if it is stale
        try:
            if time.time() - os.path.getmtime(lock_file) <= self.lock_timeout:
                return False
            os.remove(lock_file)
            self._create_lock_file(lock_file)
            return True
        except OSError:
            # Lost the race to replace it, or the file vanished underneath us
            return False

    def _owns_lock(self, lock_file):
        """Return True if `lock_file` exists and was written by this instance."""
        try:
            with open(lock_file, 'r') as f:
                return json.load(f).get("instance_id") == self.instance_id
        except (OSError, ValueError, AttributeError):
            return False

    def _release_lock(self, port):
        """Remove this instance's lock file for `port`; returns True if a file was removed."""
        lock_file = self._get_lock_file(port)
        # Our lock may have gone stale and been taken over; never delete
        # a lock that now belongs to another instance
        if not self._owns_lock(lock_file):
            return False
        try:
            os.remove(lock_file)
            return True
        except FileNotFoundError:
            return False
        except OSError as e:
            self.logger.warning(f"Error removing lock file for port {port}: {e}")
            return False

    @contextmanager
    def _try_lock_port(self, port):
        """
        Try to create a lock file for a port using a cross-platform approach.
        Uses atomic file creation to implement locking.  Yields True if the
        lock was acquired (it is removed again on exit) and False otherwise.
        """
        lock_acquired = self._acquire_lock(port)
        try:
            # The yield sits outside any except clause, so exceptions
            # raised in the with-body pass straight through to the caller
            yield lock_acquired
        finally:
            # Clean up the lock file if we created it
            if lock_acquired:
                self._release_lock(port)

    def _select_port(self, keep_lock):
        """Pick a free port; if `keep_lock`, leave its lock file in place for the caller."""
        # Clean up stale locks first
        self._clean_stale_locks()
        # Shuffle port range to distribute port selection
        port_list = list(range(self.min_port, self.max_port + 1))
        random.shuffle(port_list)
        for _ in range(self.max_retries):
            if not port_list:
                raise RuntimeError("Exhausted all ports in range")
            port = port_list.pop()
            # First check if port is available
            if not self._is_port_available(port):
                continue
            # Try to acquire a lock; failure means another process got this port
            if not self._acquire_lock(port):
                continue
            retain = False
            try:
                # Double-check port is still available after locking
                if self._is_port_available(port):
                    self.logger.info(f"Reserved port {port} for process {os.getpid()}")
                    retain = keep_lock
                    return port
            finally:
                if not retain:
                    self._release_lock(port)
        raise RuntimeError(f"Could not find available port after {self.max_retries} attempts")

    def get_available_port(self):
        """
        Find an available port.

        The lock file only protects the selection itself and is removed
        before returning; use `reserved_port()` to hold the lock while the
        port is being put to use.
        Returns:
            An available port number.
        Raises:
            RuntimeError: If no available port can be found after max_retries.
        """
        return self._select_port(keep_lock=False)

    @contextmanager
    def reserved_port(self):
        """Context manager that reserves a port and releases it after use."""
        port = self._select_port(keep_lock=True)
        try:
            yield port
        finally:
            # Release the port by removing the lock file
            if self._release_lock(port):
                self.logger.info(f"Released port {port}")
×
1230

1231

1232

1233

1234
if __name__ == "__main__":
    # ad-hoc entry point: pick a free local port, then start a worker on it
    port = PortManager().get_available_port()
    host = "localhost"
    ppw = PyPestWorker(None, host, port)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc