• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bleachbit / bleachbit / 24935925314

25 Apr 2026 04:56PM UTC coverage: 73.757% (+1.3%) from 72.5%
24935925314

push

github

az0
Fix: parameter name in progress_cb()

1 of 1 new or added line in 1 file covered. (100.0%)

1089 existing lines in 29 files now uncovered.

7178 of 9732 relevant lines covered (73.76%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.94
/bleachbit/Memory.py
1
# vim: ts=4:sw=4:expandtab
2
# -*- coding: UTF-8 -*-
3

4
# BleachBit
5
# Copyright (C) 2008-2025 Andrew Ziem
6
# https://www.bleachbit.org
7
#
8
# This program is free software: you can redistribute it and/or modify
9
# it under the terms of the GNU General Public License as published by
10
# the Free Software Foundation, either version 3 of the License, or
11
# (at your option) any later version.
12
#
13
# This program is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
# GNU General Public License for more details.
17
#
18
# You should have received a copy of the GNU General Public License
19
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20

21

22
"""
23
Wipe memory
24
"""
25

26
import logging
1✔
27
import os
1✔
28
import re
1✔
29
import subprocess
1✔
30
import sys
1✔
31

32
from bleachbit import FileUtilities
1✔
33
from bleachbit import General
1✔
34
from bleachbit.Language import get_text as _
1✔
35
from bleachbit.Wipe import wipe_contents
1✔
36

37
logger = logging.getLogger(__name__)
1✔
38

39

40
def count_swap_linux():
1✔
41
    """Count the number of swap devices in use"""
42
    count = 0
1✔
43
    with open("/proc/swaps") as f:
1✔
44
        for line in f:
1✔
45
            if line[0] == '/':
1✔
46
                count += 1
1✔
47
    return count
1✔
48

49

50
def get_proc_swaps():
1✔
51
    """Return the output of 'swapon -s'"""
52
    # Usually 'swapon -s' is identical to '/proc/swaps'
53
    # Here is one exception:
54
    # https://bugs.launchpad.net/ubuntu/+source/bleachbit/+bug/1092792
55
    (rc, stdout, _stderr) = General.run_external(['swapon', '-s'])
1✔
56
    if 0 == rc:
1✔
57
        return stdout
1✔
58
    logger.debug(
×
59
        _("The command 'swapoff -s' failed, so falling back to /proc/swaps for swap information."))
60
    return open("/proc/swaps").read()
×
61

62

63
def parse_swapoff(swapoff):
1✔
64
    """Parse the output of swapoff and return the device name"""
65
    # English is 'swapoff on /dev/sda5' but German is 'swapoff für ...'
66
    # Example output in English with LVM and hyphen: 'swapoff on /dev/mapper/lubuntu-swap_1'
67
    # This matches swap devices and swap files
68
    ret = re.search(r'^swapoff (\w* )?(/[\w/.-]+)$', swapoff)
1✔
69
    if not ret:
1✔
70
        # no matches
71
        return None
×
72
    return ret.group(2)
1✔
73

74

75
def disable_swap_linux():
1✔
76
    """Disable Linux swap and return list of devices"""
77
    if 0 == count_swap_linux():
×
78
        return
×
79
    logger.debug(_("Disabling swap."))
×
80
    args = ["swapoff", "-a", "-v"]
×
81
    (rc, stdout, stderr) = General.run_external(args)
×
82
    if 0 != rc:
×
83
        raise RuntimeError(stderr.replace("\n", ""))
×
84
    devices = []
×
85
    for line in stdout.split('\n'):
×
86
        line = line.replace('\n', '')
×
87
        if '' == line:
×
88
            continue
×
89
        ret = parse_swapoff(line)
×
90
        if ret is None:
×
91
            raise RuntimeError(f"Unexpected output:\nargs='{args}'\n"
×
92
                               f"stdout='{stdout}'\nstderr='{stderr}'")
93
        devices.append(ret)
×
94
    return devices
×
95

96

97
def enable_swap_linux():
1✔
98
    """Enable Linux swap"""
99
    logger.debug(_("Re-enabling swap."))
×
100
    args = ["swapon", "-a"]
×
101
    p = subprocess.Popen(args, stderr=subprocess.PIPE)
×
102
    p.wait()
×
103
    outputs = p.communicate()
×
104
    if 0 != p.returncode:
×
105
        raise RuntimeError(outputs[1].replace("\n", ""))
×
106

107

108
def make_self_oom_target_linux():
1✔
109
    """Make the current process the primary target for Linux out-of-memory killer"""
110
    path = f'/proc/{os.getpid()}/oom_score_adj'
1✔
111
    if os.path.exists(path):
1✔
112
        with open(path, 'w', encoding='utf-8') as f:
1✔
113
            f.write('1000')
1✔
114
    # OOM likes nice processes
115
    logger.debug(_("Setting nice value %d for this process."), os.nice(19))
1✔
116
    # OOM prefers non-privileged processes
117
    try:
1✔
118
        uid = General.get_real_uid()
1✔
119
        if uid > 0:
1✔
120
            # TRANSLATORS: Debug message when a process gives up root/admin privileges.
121
            # %(pid)d is the integer process ID; %(uid)d is the integer user ID to switch to.
122
            drop_msg = _("Dropping privileges of process ID %(pid)d to user ID %(uid)d.")
1✔
123
            logger.debug(drop_msg, {'pid': os.getpid(), 'uid': uid})
1✔
124
            os.seteuid(uid)
1✔
UNCOV
125
    except:
×
UNCOV
126
        logger.exception('Error when dropping privileges')
×
127

128

129
def fill_memory_linux():
1✔
130
    """Fill unallocated memory"""
131
    report_free()
×
132
    allocbytes = int(physical_free() * 0.4)
×
133
    if allocbytes < 1024:
×
UNCOV
134
        return
×
135
    bytes_str = FileUtilities.bytes_to_human(allocbytes)
×
136
    # TRANSLATORS: The variable is a quantity like 5kB
137
    logger.info(_("Allocating and wiping %s of memory."),
×
138
                bytes_str)
139
    try:
×
140
        buf = '\x00' * allocbytes
×
UNCOV
141
    except MemoryError:
×
142
        pass
×
143
    else:
144
        fill_memory_linux()
×
145
        # TRANSLATORS: The variable is a quantity like 5kB
146
        logger.debug(_("Freeing %s of memory."), bytes_str)
×
UNCOV
147
        del buf
×
UNCOV
148
    report_free()
×
149

150

151
def get_swap_size_linux(device, proc_swaps=None):
1✔
152
    """Return the size of the partition in bytes"""
153
    if proc_swaps is None:
1✔
154
        proc_swaps = get_proc_swaps()
1✔
155
    line = proc_swaps.split('\n')[0]
1✔
156
    if not re.search(r'Filename\s+Type\s+Size', line):
1✔
UNCOV
157
        raise RuntimeError("Unexpected first line in swap summary '%s'" % line)
×
158
    for line in proc_swaps.split('\n')[1:]:
1✔
159
        ret = re.search(r"%s\s+\w+\s+([0-9]+)\s" % device, line)
1✔
160
        if ret:
1✔
161
            return int(ret.group(1)) * 1024
1✔
UNCOV
162
    raise RuntimeError("error: cannot find size of swap device '%s'\n%s" %
×
163
                       (device, proc_swaps))
164

165

166
def get_swap_uuid(device):
1✔
167
    """Find the UUID for the swap device"""
168
    uuid = None
1✔
169
    args = ['blkid', device, '-s', 'UUID']
1✔
170
    (_rc, stdout, _stderr) = General.run_external(args)
1✔
171
    for line in stdout.split('\n'):
1✔
172
        # example: /dev/sda5: UUID="ee0e85f6-6e5c-42b9-902f-776531938bbf"
173
        ret = re.search(r"^%s: UUID=\"([a-z0-9-]+)\"" % device, line)
1✔
174
        if ret is not None:
1✔
UNCOV
175
            uuid = ret.group(1)
×
176
    # TRANSLATORS: Debug message. 'Found' is a past tense verb (short for
177
    # "Found [that] the UUID for swap device ..."). %(device)s is the device
178
    # path (e.g., /dev/sda5); %(uuid)s is a UUID string
179
    # (e.g., ee0e85f6-6e5c-42b9-902f-776531938bbf). Do not translate variables.
180
    logger.debug(_("Found UUID for swap device %(device)s is %(uuid)s."),
1✔
181
                 {'device': device, 'uuid': uuid})
182
    return uuid
1✔
183

184

185
def physical_free_darwin(run_vmstat=None):
1✔
186
    def parse_line(k, v):
1✔
187
        return k, int(v.strip(" ."))
1✔
188

189
    def get_page_size(line):
1✔
190
        m = re.match(
1✔
191
            r"Mach Virtual Memory Statistics: \(page size of (\d+) bytes\)",
192
            line)
193
        if m is None:
1✔
194
            raise RuntimeError("Can't parse vm_stat output")
1✔
195
        return int(m.groups()[0])
1✔
196
    if run_vmstat is None:
1✔
UNCOV
197
        def run_vmstat():
×
UNCOV
198
            return subprocess.check_output(["vm_stat"])
×
199
    output = iter(run_vmstat().split("\n"))
1✔
200
    page_size = get_page_size(next(output))
1✔
201
    vm_stat = dict(parse_line(*l.split(":")) for l in output if l != "")
1✔
202
    return vm_stat["Pages free"] * page_size
1✔
203

204

205
def physical_free_linux():
1✔
206
    """Return the physical free memory on Linux"""
207
    free_bytes = 0
1✔
208
    with open("/proc/meminfo") as f:
1✔
209
        for line in f:
1✔
210
            line = line.replace("\n", "")
1✔
211
            ret = re.search(r'(MemFree|Cached):[ ]*([0-9]*) kB', line)
1✔
212
            if ret is not None:
1✔
213
                kb = int(ret.group(2))
1✔
214
                free_bytes += kb * 1024
1✔
215
    if free_bytes > 0:
1✔
216
        return free_bytes
1✔
217
    else:
218
        raise Exception("unknown")
×
219

220

221
def physical_free_windows():
1✔
222
    """Return physical free memory on Windows"""
223

UNCOV
224
    from ctypes import c_ulong, c_ulonglong, Structure, sizeof, windll, byref
×
225

UNCOV
226
    class MEMORYSTATUSEX(Structure):
×
UNCOV
227
        _fields_ = [
×
228
            ('dwLength', c_ulong),
229
            ('dwMemoryLoad', c_ulong),
230
            ('ullTotalPhys', c_ulonglong),
231
            ('ullAvailPhys', c_ulonglong),
232
            ('ullTotalPageFile', c_ulonglong),
233
            ('ullAvailPageFile', c_ulonglong),
234
            ('ullTotalVirtual', c_ulonglong),
235
            ('ullAvailVirtual', c_ulonglong),
236
            ('ullExtendedVirtual', c_ulonglong),
237
        ]
238

239
    def GlobalMemoryStatusEx():
×
240
        x = MEMORYSTATUSEX()
×
UNCOV
241
        x.dwLength = sizeof(x)
×
UNCOV
242
        windll.kernel32.GlobalMemoryStatusEx(byref(x))
×
UNCOV
243
        return x
×
244

UNCOV
245
    z = GlobalMemoryStatusEx()
×
246
    return z.ullAvailPhys
×
247

248

249
def physical_free():
1✔
250
    if sys.platform == 'linux':
1✔
251
        return physical_free_linux()
1✔
UNCOV
252
    elif 'win32' == sys.platform:
×
UNCOV
253
        return physical_free_windows()
×
UNCOV
254
    elif 'darwin' == sys.platform:
×
UNCOV
255
        return physical_free_darwin()
×
256
    else:
UNCOV
257
        raise RuntimeError('unsupported platform for physical_free()')
×
258

259

260
def report_free():
1✔
261
    """Report free memory"""
262
    bytes_free = physical_free()
1✔
263
    bytes_str = FileUtilities.bytes_to_human(bytes_free)
1✔
264
    # TRANSLATORS: The variable is a quantity like 5kB
265
    logger.debug(_("Physical free memory is %s."),
1✔
266
                 bytes_str)
267

268

269
def wipe_swap_linux(devices, proc_swaps):
1✔
270
    """Shred the Linux swap file and then reinitialize it"""
UNCOV
271
    if devices is None:
×
UNCOV
272
        return
×
UNCOV
273
    if 0 < count_swap_linux():
×
274
        raise RuntimeError('Cannot wipe swap while it is in use')
×
275
    for device in devices:
×
276
        # if '/cryptswap' in device:
277
        #    logger.info('Skipping encrypted swap device %s.', device)
278
        #    continue
279
        # TRANSLATORS: The variable is a device like /dev/sda2
UNCOV
280
        logger.info(_("Wiping the swap device %s."), device)
×
281
        safety_limit_bytes = 29 * 1024 ** 3  # 29 gibibytes
×
UNCOV
282
        actual_size_bytes = get_swap_size_linux(device, proc_swaps)
×
283
        if actual_size_bytes > safety_limit_bytes:
×
UNCOV
284
            raise RuntimeError(
×
285
                f'swap device {device} is larger ({actual_size_bytes})'
286
                f' than expected ({safety_limit_bytes})')
287
        uuid = get_swap_uuid(device)
×
288
        # wipe
289
        wipe_contents(device, truncate=False)
×
290
        # reinitialize
291
        # TRANSLATORS: The variable is a device like /dev/sda2
292
        logger.debug(_("Reinitializing the swap device %s."), device)
×
293
        args = ['mkswap', device]
×
UNCOV
294
        if uuid:
×
UNCOV
295
            args.append("-U")
×
UNCOV
296
            args.append(uuid)
×
UNCOV
297
        (rc, _stdout, stderr) = General.run_external(args)
×
298
        if 0 != rc:
×
299
            raise RuntimeError(stderr.replace("\n", ""))
×
300

301

302
def wipe_memory():
1✔
303
    """Wipe unallocated memory"""
304
    for cmd in ('swapon', 'swapoff', 'blkid'):
×
UNCOV
305
        if not FileUtilities.exe_exists(cmd):
×
306
            raise RuntimeError(f"wipe_memory: Command {cmd} not found")
×
307
    # cache the file because 'swapoff' changes it
308
    proc_swaps = get_proc_swaps()
×
309
    devices = disable_swap_linux()
×
310
    yield True  # process GTK+ idle loop
×
311
    # TRANSLATORS: The variable is a device like /dev/sda2
312
    logger.debug(_("Detected these swap devices: %s"), str(devices))
×
313
    wipe_swap_linux(devices, proc_swaps)
×
UNCOV
314
    yield True
×
UNCOV
315
    child_pid = os.fork()
×
316
    if 0 == child_pid:
×
UNCOV
317
        make_self_oom_target_linux()
×
318
        fill_memory_linux()
×
319
        os._exit(0)
×
320
    else:
321
        # TRANSLATORS: This is a debugging message that the parent process
322
        # is waiting for the child process. %(parent_pid)d is the parent
323
        # process ID; %(child_pid)d is the child process ID.
UNCOV
324
        logger.debug(_("The function wipe_memory() with process ID %(parent_pid)d is "
×
325
                       "waiting for child process ID %(child_pid)d."),
326
                     {'parent_pid': os.getpid(), 'child_pid': child_pid})
UNCOV
327
        rc = os.waitpid(child_pid, 0)[1]
×
UNCOV
328
        if rc not in [0, 9]:
×
UNCOV
329
            logger.warning(
×
330
                _("The child memory-wiping process returned code %d."), rc)
UNCOV
331
    enable_swap_linux()
×
UNCOV
332
    yield 0  # how much disk space was recovered
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc