• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pantsbuild / pants / 20040415697

08 Dec 2025 07:34PM UTC coverage: 80.293% (+0.009%) from 80.284%
20040415697

push

github

web-flow
support setting the working directory on interactive process (#22802)

We've run into a few cases where setting the working directory on an
InteractiveProcess would be useful, in particular for rules that are tests
(i.e. TestRequest). To support debugging, we convert the setup
process to an InteractiveProcess, but since that drops the working directory,
you're currently unable to use it.

As a workaround, we currently write a bash script into the sandbox like:

```
cd ${DIR} && exec {COMMAND}
```

This change will help clean that up.

This is also a prerequisite for supporting setting the working directory in
the Run goal (small thread here:
https://pantsbuild.slack.com/archives/C01CQHVDMMW/p1707448457657149).
I'll handle that in a separate PR, though.

19 of 19 new or added lines in 1 file covered. (100.0%)

2 existing lines in 1 file now uncovered.

78406 of 97650 relevant lines covered (80.29%)

3.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.69
/src/python/pants/pantsd/lock_test.py
1
# Copyright 2016 Pants project contributors (see CONTRIBUTORS.md).
2
# Licensed under the Apache License, Version 2.0 (see LICENSE).
3

4
import os
1✔
5
import shutil
1✔
6
import tempfile
1✔
7
import unittest
1✔
8
from multiprocessing import Manager, Process
1✔
9
from threading import Thread
1✔
10

11
from pants.pantsd.lock import OwnerPrintingInterProcessFileLock
1✔
12

13

14
def hold_lock_until_terminate(path, lock_held, terminate):
1✔
15
    lock = OwnerPrintingInterProcessFileLock(path)
×
16
    lock.acquire()
×
17
    lock_held.set()
×
18
    # NOTE: We shouldn't ever wait this long, this is just to ensure
19
    # we don't somehow leak child processes.
20
    terminate.wait(60)
×
21
    lock.release()
×
22
    lock_held.clear()
×
23

24

25
class TestOwnerPrintingInterProcessFileLock(unittest.TestCase):
1✔
26
    def setUp(self):
1✔
27
        self.lock_dir = tempfile.mkdtemp()
1✔
28
        self.lock_path = os.path.join(self.lock_dir, "lock")
1✔
29
        self.lock = OwnerPrintingInterProcessFileLock(self.lock_path)
1✔
30
        self.manager = Manager()
1✔
31
        self.lock_held = self.manager.Event()
1✔
32
        self.terminate = self.manager.Event()
1✔
33
        self.lock_process = Process(
1✔
34
            target=hold_lock_until_terminate, args=(self.lock_path, self.lock_held, self.terminate)
35
        )
36

37
    def tearDown(self):
1✔
38
        self.terminate.set()
1✔
39
        try:
1✔
40
            shutil.rmtree(self.lock_dir)
1✔
UNCOV
41
        except OSError:
×
UNCOV
42
            pass
×
43

44
    def test_non_blocking_attempt(self):
1✔
45
        self.lock_process.start()
1✔
46
        self.lock_held.wait()
1✔
47
        self.assertFalse(self.lock.acquire(blocking=False))
1✔
48

49
    def test_message(self):
1✔
50
        self.lock_process.start()
1✔
51
        self.lock_held.wait()
1✔
52
        self.assertTrue(os.path.exists(self.lock.message_path))
1✔
53
        with open(self.lock.message_path) as f:
1✔
54
            message_content = f.read()
1✔
55
        self.assertIn(str(self.lock_process.pid), message_content)
1✔
56

57
        os.unlink(self.lock.message_path)
1✔
58

59
        def message_fn(message):
1✔
60
            self.assertIn(self.lock.missing_message_output, message)
1✔
61

62
        self.lock.acquire(blocking=False, message_fn=message_fn)
1✔
63

64
    def test_blocking(self):
1✔
65
        self.lock_process.start()
1✔
66
        self.lock_held.wait()
1✔
67
        self.assertFalse(self.lock.acquire(timeout=0.1))
1✔
68

69
        acquire_is_blocking = self.manager.Event()
1✔
70

71
        def terminate_subproc(terminate, acquire_is_blocking):
1✔
72
            acquire_is_blocking.wait()
1✔
73
            terminate.set()
1✔
74

75
        Thread(target=terminate_subproc, args=(self.terminate, acquire_is_blocking)).start()
1✔
76

77
        def message_fn(message):
1✔
78
            self.assertIn(str(self.lock_process.pid), message)
1✔
79
            acquire_is_blocking.set()
1✔
80

81
        # NOTE: We shouldn't ever wait this long (locally this runs in ~milliseconds)
82
        # but sometimes CI containers are extremely slow, so we choose a very large
83
        # value just in case.
84
        self.assertTrue(self.lock.acquire(timeout=30, message_fn=message_fn))
1✔
85

86
    def test_reentrant(self):
1✔
87
        self.assertTrue(self.lock.acquire())
1✔
88
        self.assertTrue(self.lock.acquire())
1✔
89

90
    def test_release(self):
1✔
91
        self.assertTrue(self.lock.acquire())
1✔
92
        self.assertTrue(self.lock.acquired)
1✔
93
        self.lock.release()
1✔
94
        self.assertFalse(self.lock.acquired)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc