• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

karellen / geventmp / 17340928350

30 Aug 2025 07:15AM UTC coverage: 77.889% (+2.4%) from 75.467%
17340928350

push

github

web-flow
Merge pull request #32 from karellen/geventmp_forkserver

Instrument Multiprocessing forkserver

38 of 71 branches covered (53.52%)

Branch coverage included in aggregate %.

44 of 69 new or added lines in 5 files covered. (63.77%)

21 existing lines in 2 files now uncovered.

272 of 327 relevant lines covered (83.18%)

12.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.32
/src/main/python/geventmp/_mp/3/_mp_synchronize.py
1
#   -*- coding: utf-8 -*-
2
#   Copyright 2019 Karellen, Inc. and contributors
3
#
4
#   Licensed under the Apache License, Version 2.0 (the "License");
5
#   you may not use this file except in compliance with the License.
6
#   You may obtain a copy of the License at
7
#
8
#       http://www.apache.org/licenses/LICENSE-2.0
9
#
10
#   Unless required by applicable law or agreed to in writing, software
11
#   distributed under the License is distributed on an "AS IS" BASIS,
12
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
#   See the License for the specific language governing permissions and
14
#   limitations under the License.
15

16
import os
18✔
17
import sys
18✔
18
from multiprocessing import context
18✔
19
from multiprocessing import reduction
18✔
20
from multiprocessing import util
18✔
21
from multiprocessing.synchronize import SemLock as _SemLock, \
18✔
22
    Semaphore as _Semaphore, \
23
    BoundedSemaphore as _BoundedSemaphore, \
24
    Lock as _Lock, \
25
    RLock as _RLock, \
26
    SEMAPHORE, SEM_VALUE_MAX
27
from threading import get_ident
18✔
28

29
from gevent.hub import _get_hub_noargs as get_hub
18✔
30
from gevent.os import nb_read, nb_write, _read, ignored_errors
18✔
31
from gevent.timeout import with_timeout, Timeout
18✔
32

33
# Names this module supplies as replacements, and the stdlib module they
# replace (consumed by the patching machinery that loads this file).
__implements__ = [
    "SemLock",
    "Semaphore",
    "BoundedSemaphore",
    "Lock",
    "RLock",
]
__target__ = "multiprocessing.synchronize"

# The integer 1 encoded as an unsigned 8-byte value in native byte order;
# this is the payload written to the eventfd on every release.
_BUF_ONE = int.to_bytes(1, 8, sys.byteorder, signed=False)
37

38

39
class SemLockEventFd:
    """Semaphore/lock primitive backed by a Linux ``eventfd``.

    The descriptor is created with ``EFD_SEMAPHORE | EFD_NONBLOCK |
    EFD_CLOEXEC``.  Acquiring reads 8 bytes from the descriptor and
    releasing writes an 8-byte ``1`` to it, using gevent's non-blocking
    read/write helpers.

    Attributes:
        kind: lock kind; compared against ``SEMAPHORE`` to decide whether
            recursive-ownership bookkeeping applies.
        maxvalue: upper bound checked on release for bounded semaphores.
        fd: the eventfd file descriptor owned by this object.
        count: number of acquisitions recorded by this object.
        tid: ``get_ident()`` of the last successful acquirer.
    """

    def __init__(self, kind, value, maxvalue, *, ctx=None):
        """Create the eventfd with initial counter ``value``.

        ``ctx`` is accepted for signature compatibility and is not used.
        """
        self.kind = kind
        self.maxvalue = maxvalue
        self.fd = os.eventfd(value, os.EFD_CLOEXEC | os.EFD_NONBLOCK | os.EFD_SEMAPHORE)
        util.debug(f"created semlock with fd {self.fd}")
        self._reset()

    def _reset(self):
        """(Re)initialise per-process bookkeeping for the current ``fd``."""
        # The fdinfo path embeds the pid, so it is recomputed whenever the
        # object is (re)created in a process.
        self._fd_path = f"/proc/{os.getpid()}/fdinfo/{self.fd}"
        # Presumably lets code that reaches through ``self._semlock`` find
        # this implementation on the same object -- confirm against callers.
        self._semlock = self
        self.count = 0
        self.tid = None

    def __del__(self):
        """Close the eventfd, ignoring errors from the close itself."""
        # ``fd`` is missing when ``os.eventfd`` failed in ``__init__`` or
        # the instance was created without ``__init__``; nothing to close.
        fd = getattr(self, "fd", None)
        if fd is None:
            return
        try:
            os.close(fd)
        except OSError:
            pass

    def acquire(self, block=True, timeout=None):
        """Acquire the lock.

        Args:
            block: when true, wait until the eventfd becomes readable.
            timeout: optional limit for a blocking acquire; ignored when
                ``block`` is false.

        Returns:
            ``True`` on success; ``False`` if the timeout expired or a
            non-blocking attempt failed with one of ``ignored_errors``.

        Raises:
            OSError: a non-blocking read failed with any other errno.
        """
        # Re-entrant fast path: the owner of a non-SEMAPHORE lock only
        # bumps the recursion count and never touches the descriptor.
        if self.kind != SEMAPHORE and self._is_mine():
            self.count += 1
            return True

        if block:
            if timeout is not None:
                try:
                    with_timeout(timeout, nb_read, self.fd, 8)
                except Timeout:
                    return False
            else:
                nb_read(self.fd, 8)
        else:
            try:
                _read(self.fd, 8)
            except OSError as e:
                if e.errno not in ignored_errors:
                    raise
                return False

        self.count += 1
        self.tid = get_ident()
        return True

    def release(self):
        """Release the lock.

        Raises:
            AssertionError: a non-SEMAPHORE lock is released by a caller
                that does not currently own it.
            ValueError: a bounded semaphore's eventfd counter has already
                reached ``maxvalue``.
        """
        if self.kind != SEMAPHORE:
            if not self._is_mine():
                raise AssertionError("attempt to release recursive lock not owned by thread")

            # Nested acquisition: unwind one level without writing to the fd.
            if self.count > 1:
                self.count -= 1
                return
        elif self.maxvalue != SEM_VALUE_MAX:
            count = self._get_value()
            if count >= self.maxvalue:
                raise ValueError("semaphore or lock released too many times")

        nb_write(self.fd, _BUF_ONE)
        self.count -= 1

    def _get_value(self):
        """Return the eventfd counter read from ``/proc/<pid>/fdinfo/<fd>``.

        Returns ``None`` if the file has no ``eventfd-count:`` line.
        """
        with open(self._fd_path, "rb") as f:
            for line in f:
                if line.startswith(b"eventfd-count:"):
                    # The kernel prints this field in hexadecimal, so it
                    # must be parsed base 16; a decimal parse misreads or
                    # rejects every value of ten or more.
                    return int(line[14:].strip(), 16)

    def _count(self):
        """Return the number of acquisitions recorded by this object."""
        return self.count

    def _is_mine(self):
        """Return whether the caller holds the lock per local bookkeeping."""
        return self.count > 0 and self.tid == get_ident()

    def _is_zero(self):
        """Return whether the eventfd counter is currently zero."""
        return self._get_value() == 0

    def __enter__(self):
        """Acquire (blocking, no timeout) and return the acquire result."""
        return self.acquire()

    def __exit__(self, *args):
        """Release the lock on leaving the ``with`` block."""
        self.release()

    def __getstate__(self):
        """Return ``(DupFd, kind, maxvalue)`` for transfer to a child process.

        ``context.assert_spawning`` is called first, so this is only
        permitted while a child process is being spawned.
        """
        context.assert_spawning(self)
        df = reduction.DupFd(self.fd)
        return df, self.kind, self.maxvalue

    def __setstate__(self, state):
        """Rebuild from pickled state, taking over the duplicated descriptor."""
        df, kind, maxvalue = state
        fd = df.detach()
        util.debug(f'recreated blocker with fd {fd}')
        self.kind = kind
        self.maxvalue = maxvalue
        self.fd = fd
        self._reset()
134

135

136
class SemLockSem(_SemLock):
    """Stdlib ``SemLock`` variant that cooperates with the gevent hub.

    Locks of kind ``SEMAPHORE`` are acquired through the hub's thread pool;
    every other kind is acquired directly by the caller.  Presumably this is
    because ownership of the other kinds is tied to the acquiring thread --
    confirm before changing.
    """

    def _make_methods(self):
        """Cache the underlying semlock's bound acquire/release callables."""
        backend = self._semlock
        self._acquire = backend.acquire
        self._release = backend.release

    def acquire(self, *args, **kwargs):
        """Acquire the lock, forwarding all arguments to the backend."""
        if self._semlock.kind == SEMAPHORE:
            # Hand the acquire call to the hub's thread pool.
            return get_hub().threadpool.apply(self._acquire, args, kwargs)
        return self._acquire(*args, **kwargs)

    def release(self, *args, **kwargs):
        """Release the lock, forwarding all arguments to the backend."""
        self._release(*args, **kwargs)

    def __enter__(self):
        """Enter the context manager using the same dispatch as ``acquire``."""
        if self._semlock.kind == SEMAPHORE:
            return get_hub().threadpool.apply(super().__enter__)
        return super().__enter__()
153

154

155
# Use the eventfd-backed implementation when this interpreter's ``os``
# module exposes ``eventfd``; otherwise fall back to the stdlib-derived one.
SemLock = SemLockEventFd if hasattr(os, "eventfd") else SemLockSem

# Rebuild the public synchronisation types on top of the chosen ``SemLock``
# by reusing the namespace of each stdlib class.
Semaphore = type(
    "Semaphore",
    (SemLock,),
    {**_Semaphore.__dict__},
)
BoundedSemaphore = type(
    "BoundedSemaphore",
    (Semaphore,),
    {**_BoundedSemaphore.__dict__},
)
Lock = type(
    "Lock",
    (SemLock,),
    {**_Lock.__dict__},
)
RLock = type(
    "RLock",
    (SemLock,),
    {**_RLock.__dict__},
)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc