• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bleachbit / bleachbit / 25300670692

04 May 2026 03:50AM UTC coverage: 74.001% (+0.2%) from 73.851%
25300670692

push

github

az0
Clean node_modules directories

Node.js/npm package dependencies

7392 of 9989 relevant lines covered (74.0%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.61
/bleachbit/Unix.py
1
# SPDX-License-Identifier: GPL-3.0-or-later
2
# Copyright (c) 2008-2026 Andrew Ziem.
3
#
4
# This work is licensed under the terms of the GNU GPL, version 3 or
5
# later.  See the COPYING file in the top-level directory.
6

7
"""
8
Integration specific to Unix-like operating systems
9
"""
10

11
import configparser
1✔
12
import glob
1✔
13
import logging
1✔
14
import os
1✔
15
import platform
1✔
16
import re
1✔
17
import shlex
1✔
18
import subprocess
1✔
19

20
import bleachbit
1✔
21
from bleachbit import FileUtilities, General
1✔
22
from bleachbit.FileUtilities import exe_exists
1✔
23
from bleachbit.Language import get_text as _, native_locale_names
1✔
24

25
# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)

# Type of a compiled regular expression; falls back to the private
# re._pattern_type when the re module has no public Pattern attribute.
Pattern = re.Pattern if hasattr(re, 'Pattern') else re._pattern_type

# Matches the "Vacuuming done" line; group 1 is the freed size.
JOURNALD_REGEX = r'^Vacuuming done, freed ([\d.]+[BKMGT]?) of archived journals (on disk|from [\w/]+).$'
1✔
34

35

36
class LocaleCleanerPath:
    """A node in the tree of places searched for localization items.

    A node names one folder, either literally (a string) or through a
    compiled regular expression matched against folder names.  Its children
    are either further LocaleCleanerPath nodes, which are searched
    recursively, or compiled regexes that pick out localization items
    (files or folders) directly inside the node's folder."""

    def __init__(self, location):
        if location is None:
            raise RuntimeError("location is none")
        # Folder name (str) or compiled regex for folder names
        self.pattern = location
        # Mix of LocaleCleanerPath nodes and compiled regexes
        self.children = []

    def add_child(self, child):
        """Appends a child (node or compiled regex) and returns it"""
        self.children.append(child)
        return child

    def add_path_filter(self, pre, post):
        r"""Registers a filter built from a prefix and a postfix regex
        wrapped around the locale pattern
        (e.g. 'foobar_' and '\.qm' to match 'foobar_en_US.utf-8.qm)

        Raises RuntimeError when the combined expression cannot be built
        or compiled."""
        try:
            compiled = re.compile(
                ''.join(('^', pre, Locales.localepattern, post, '$')))
        except Exception as errormsg:
            raise RuntimeError(
                f"Malformed regex '{pre}' or '{post}': {errormsg}") from errormsg
        self.add_child(compiled)

    def get_subpaths(self, basepath):
        """Returns the existing directories directly below basepath that
        this node stands for: the single named subfolder, or every
        subfolder whose name matches the node's regex"""
        if not isinstance(self.pattern, Pattern):
            candidate = os.path.join(basepath, self.pattern)
            if os.path.isdir(candidate):
                return [candidate]
            return []
        return (os.path.join(basepath, name) for name in os.listdir(basepath)
                if self.pattern.match(name) and os.path.isdir(os.path.join(basepath, name)))

    def get_localizations(self, basepath):
        """Yields (locale, specifier, path) for every localization item
        found by this node and all of its descendants"""
        for folder in self.get_subpaths(basepath):
            for child in self.children:
                if isinstance(child, LocaleCleanerPath):
                    yield from child.get_localizations(folder)
                    continue
                if not isinstance(child, Pattern):
                    continue
                for entry in os.listdir(folder):
                    found = child.match(entry)
                    if found is None:
                        continue
                    yield (found.group('locale'),
                           found.group('specifier'),
                           os.path.join(folder, entry))
85

86

87
class Locales:
    """Find languages and localization files

    Holds a tree of LocaleCleanerPath nodes rooted at '/', filled in by
    add_xml(), and reports the localization items found on disk whose
    locale is not among those to keep."""

    # The regular expression to match locale strings and extract the langcode.
    # See test_locale_regex() in tests/TestUnix.py for examples
    # This doesn't match all possible valid locale strings to avoid
    # matching filenames you might want to keep, e.g. the regex
    # to match jp.eucJP might also match jp.importantfileextension
    #
    # The separator in front of the encoding is the class [._-] with the
    # hyphen placed last so it is a literal.  Written as [.-_] it is a
    # range from '.' to '_', which accepts digits and capital letters as
    # separator and does not accept '-'.
    localepattern =\
        r'(?P<locale>[a-z]{2,3})' \
        r'(?P<specifier>[_-][A-Z]{2,4})?(?:\.[\w]+[\d-]+|@\w+)?' \
        r'(?P<encoding>[._-](?:(?:ISO|iso|UTF|utf|us-ascii)[\d-]+|(?:euc|EUC)[A-Z]+))?'

    def __init__(self):
        # Root of the search tree; every configured path hangs below '/'.
        self._paths = LocaleCleanerPath(location='/')

    def add_xml(self, xml_node, parent=None):
        """Parses the xml data and adds nodes to the LocaleCleanerPath-tree

        xml_node: a DOM node; nodes that are not elements are ignored
        parent: the LocaleCleanerPath to attach to (the root when None)

        Handled elements:
        * <regexfilter prefix=".." postfix=".."> adds a filter to parent
        * <path directoryregex=".."> adds a child matching folder names
        * <path location=".." filter="pre*post"> adds a named child
          and/or a filter in which * stands for the locale

        Child elements are processed recursively below the node added here.

        Raises RuntimeError for any other element name, for a
        directoryregex containing a slash and for a filter that does not
        contain * exactly once."""

        if parent is None:
            parent = self._paths
        if xml_node.ELEMENT_NODE != xml_node.nodeType:
            return

        # if a pattern is supplied, we recurse into all matching subdirectories
        if 'regexfilter' == xml_node.nodeName:
            pre = xml_node.getAttribute('prefix') or ''
            post = xml_node.getAttribute('postfix') or ''
            parent.add_path_filter(pre, post)
        elif 'path' == xml_node.nodeName:
            if xml_node.hasAttribute('directoryregex'):
                pattern = xml_node.getAttribute('directoryregex')
                if '/' in pattern:
                    raise RuntimeError(
                        'directoryregex may not contain slashes.')
                pattern = re.compile(pattern)
                parent = parent.add_child(LocaleCleanerPath(pattern))

            # a combination of directoryregex and filter could be too much
            else:
                if xml_node.hasAttribute("location"):
                    # if there's a filter attribute, it should apply to this path
                    parent = parent.add_child(LocaleCleanerPath(
                        xml_node.getAttribute('location')))

                if xml_node.hasAttribute('filter'):
                    userfilter = xml_node.getAttribute('filter')
                    if 1 != userfilter.count('*'):
                        raise RuntimeError(
                            f"Filter string '{userfilter}' must contain the placeholder * exactly once")

                    # we can't use re.escape, because it escapes too much
                    (pre, post) = (re.sub(r'([\[\]()^$.])', r'\\\1', p)
                                   for p in userfilter.split('*'))
                    parent.add_path_filter(pre, post)
        else:
            raise RuntimeError(
                f"Invalid node '{xml_node.nodeName}', expected '<path>' or '<regexfilter>'")

        # handle child nodes
        for child_xml in xml_node.childNodes:
            self.add_xml(child_xml, parent)

    def localization_paths(self, locales_to_keep):
        """Returns all localization items matching the previously added xml configuration

        Yields the path of each item whose locale plus specifier is
        purgeable, or whose bare locale is purgeable while the locale plus
        specifier is not listed in locales_to_keep."""
        purgeable_locales = get_purgeable_locales(locales_to_keep)

        for (locale, specifier, path) in self._paths.get_localizations('/'):
            specific = locale + (specifier or '')
            if specific in purgeable_locales or \
                    (locale in purgeable_locales and specific not in locales_to_keep):
                yield path
1✔
159

160

161
def _is_broken_xdg_desktop_application(config, desktop_pathname):
    """Returns whether application .desktop file is critically broken

    This function tests only .desktop files with Type=Application.

    config: parsed .desktop file; its 'Desktop Entry' section is read
    desktop_pathname: pathname of the file, used in log messages only

    The file counts as broken when 'Exec' is missing, cannot be split, or
    names an executable that does not exist.  For 'env VAR=value ... cmd'
    launchers (as written by Wine) the command behind the assignments is
    checked and, when WINEPREFIX is set, so is the Windows executable given
    as its first argument.

    Raises RuntimeError when the executable is not an absolute path and the
    PATH environment variable is not set.
    """
    if not config.has_option('Desktop Entry', 'Exec'):
        logger.info(
            "is_broken_xdg_menu: missing required option 'Exec' in '%s'", desktop_pathname)
        return True
    exec_val = config.get('Desktop Entry', 'Exec')
    exe = exec_val.split(" ")[0]
    if not os.path.isabs(exe) and not os.environ.get('PATH'):
        raise RuntimeError(
            f"Cannot find executable '{exe}' because PATH environment variable is not set")
    if not FileUtilities.exe_exists(exe):
        logger.info(
            "is_broken_xdg_menu: executable '%s' does not exist in '%s'", exe, desktop_pathname)
        return True
    if 'env' != exe:
        return False

    # Wine v1.0 creates .desktop files like this
    # Exec=env WINEPREFIX="/home/z/.wine" wine "C:\\Program
    # Files\\foo\\foo.exe"
    try:
        execs = shlex.split(exec_val)
    except ValueError as e:
        logger.info(
            "is_broken_xdg_menu: error splitting 'Exec' key '%s' in '%s'", e, desktop_pathname)
        return True
    wineprefix = None
    del execs[0]
    # Consume the leading NAME=value assignments.  Split on the first '='
    # only, because the value itself may contain '='.
    while execs and '=' in execs[0]:
        (name, value) = execs[0].split("=", 1)
        if name == 'WINEPREFIX':
            wineprefix = value
        del execs[0]
    if not execs:
        # 'env' with assignments but no command: there is no executable
        # that could be shown to be missing, so do not report it broken.
        return False
    if not FileUtilities.exe_exists(execs[0]):
        logger.info(
            "is_broken_xdg_menu: executable '%s' does not exist in '%s'", execs[0], desktop_pathname)
        return True
    # check the Windows executable exists (only when one was given)
    if wineprefix and len(execs) > 1:
        windows_exe = wine_to_linux_path(wineprefix, execs[1])
        if not os.path.exists(windows_exe):
            logger.info("is_broken_xdg_menu: Windows executable '%s' does not exist in '%s'",
                        windows_exe, desktop_pathname)
            return True
    return False
1✔
210

211

212
def find_available_locales():
    """Runs `locale -a` and returns the lines it prints as a list.

    When the command exits with a non-zero status, a warning is logged and
    an empty list is returned."""
    exit_code, out, err = General.run_external(['locale', '-a'])
    if exit_code != 0:
        logger.warning("Failed to get available locales: %s", err)
        return []
    return out.strip().split('\n')
×
220

221

222
def find_best_locale(user_locale):
    """Find closest match to available locales

    user_locale: requested locale as a string, e.g. 'es' or 'en_US.utf8'

    Returns, in order of preference:
    * 'C' for an empty request
    * the request itself when it is 'C', 'C.utf8' or 'POSIX'
    * the current locale when its language code starts with the request
    * the request itself when `locale -a` lists it
    * the first available locale starting with the request and ending
      in '.utf8', else the first one starting with the request
    * 'C' when nothing matches
    """
    assert isinstance(user_locale, str)
    if not user_locale:
        return 'C'
    if user_locale in ('C', 'C.utf8', 'POSIX'):
        return user_locale
    available_locales = find_available_locales()

    # If requesting a language like 'es' and current locale is compatible
    # like 'es_MX', then return that.
    # Import here for mock patch.
    import locale  # pylint: disable=import-outside-toplevel
    current = locale.getlocale()
    current_locale = current[0]
    if current_locale and current_locale.startswith(user_locale.split('.')[0]):
        # The encoding may be None; joining it unfiltered raises TypeError.
        return '.'.join(part for part in current if part)

    # Check for exact match.
    if user_locale in available_locales:
        return user_locale

    # Next, match like 'en' to 'en_US.utf8' (if available) because
    # of preference for UTF-8.
    for avail_locale in available_locales:
        if avail_locale.startswith(user_locale) and avail_locale.endswith('.utf8'):
            return avail_locale

    # Next, match like 'en' to 'en_US' or 'en_US.iso88591'.
    for avail_locale in available_locales:
        if avail_locale.startswith(user_locale):
            return avail_locale

    return 'C'
1✔
255

256

257
def get_distribution_name_version_platform_freedesktop():
    """Returns the name and version of the distribution using
    platform.freedesktop_os_release()

    Example return value: 'ubuntu 24.10' or None

    None is returned when the function is unavailable, when the os-release
    file cannot be read, or when ID or VERSION_ID is missing or empty.

    Python 3.10 added platform.freedesktop_os_release().
    """
    if not hasattr(platform, 'freedesktop_os_release'):
        return None
    try:
        release = platform.freedesktop_os_release()
    except OSError:
        # Documented to raise OSError (or a subclass such as
        # FileNotFoundError) when no os-release file can be read.
        return None
    dist_id = release.get('ID')
    dist_version_id = release.get('VERSION_ID')
    if dist_id and dist_version_id:
        return f"{dist_id} {dist_version_id}"
    return None
1✔
275

276

277
def get_distribution_name_version_distro():
    """Returns the distribution id and version, separated by a space,
    as reported by the distro package

    Example return value: 'ubuntu 24.10' or None

    None is returned when an ImportError occurs, i.e. typically when the
    third-party distro package is not installed.  The package is
    recommended here:
    https://docs.python.org/3.7/library/platform.html
    """
    try:
        # Imported lazily so that a missing package is handled below.
        import distro  # pylint: disable=import-outside-toplevel
        # example 'ubuntu 24.10'
        return ' '.join((distro.id(), distro.version()))
    except ImportError:
        return None
1✔
293

294

295
def get_distribution_name_version_os_release():
    """Returns the distribution id and version read from /etc/os-release

    Example return value: 'ubuntu 24.10' or None

    None is returned when the file does not exist, when reading it fails
    (logged at debug level), or when it has no ID or no VERSION_ID key.
    """
    if not os.path.exists('/etc/os-release'):
        return None
    fields = {}
    try:
        with open('/etc/os-release', 'r', encoding='utf-8') as handle:
            for raw in handle:
                if '=' not in raw:
                    continue
                # KEY=value, where the value may be wrapped in quotes
                name, _sep, content = raw.rstrip().partition('=')
                fields[name] = content.strip('"\'')
    except Exception as e:
        logger.debug("Error reading /etc/os-release: %s", e)
        return None
    if 'ID' not in fields or 'VERSION_ID' not in fields:
        return None
    return f"{fields['ID']} {fields['VERSION_ID']}"
×
316

317

318
def get_distribution_name_version():
    """Returns the name and version of the distribution

    The three detection functions (platform.freedesktop_os_release(), the
    distro package, /etc/os-release) are tried in that order and the first
    non-empty answer wins.  Otherwise the kernel release is reported, taken
    from platform.release() or, if that raises, from os.uname().

    Depending on system capabilities, return value may be:
    * 'ubuntu 24.10'
    * 'Linux 6.12.3 (unknown distribution)'
    * 'Linux (unknown version and distribution)'

    Python 3.7 had platform.linux_distribution(), but it
    was removed in Python 3.8.
    """
    detectors = (get_distribution_name_version_platform_freedesktop,
                 get_distribution_name_version_distro,
                 get_distribution_name_version_os_release)
    for detect in detectors:
        found = detect()
        if found:
            return found
    try:
        # example '6.12.3-061203-generic'
        kernel = platform.release().split('-')[0]
        return f"Linux {kernel} (unknown distribution)"
    except Exception as e1:
        logger.debug("Error calling platform.release(): %s", e1)
        try:
            # example '6.12.3-061203-generic'
            kernel = os.uname().release.split('-')[0]
            return f"Linux {kernel} (unknown distribution)"
        except Exception as e2:
            logger.debug("Error calling os.uname(): %s", e2)
    return "Linux (unknown version and distribution)"
×
353

354

355
def get_purgeable_locales(locales_to_keep):
    """Returns a frozenset of all locales to be purged

    Starts from every key of native_locale_names and removes each locale
    to keep.  Keeping a variant (e.g. 'en_US') also keeps its base locale
    ('en'); keeping a base locale (e.g. 'en') also keeps all of its
    variants ('en_US', ...).

    Raises RuntimeError when locales_to_keep is empty."""
    if not locales_to_keep:
        raise RuntimeError('Found no locales to keep')

    assert isinstance(locales_to_keep, list)

    candidates = set(native_locale_names.keys())
    for kept in locales_to_keep:
        candidates.discard(kept)
        base, underscore, _variant = kept.partition('_')
        if underscore:
            # a variant: its base locale stays as well
            candidates.discard(base)
        else:
            # a base locale: all of its variants stay as well
            variant_prefix = kept + '_'
            candidates = {name for name in candidates
                          if not name.startswith(variant_prefix)}

    return frozenset(candidates)
1✔
377

378

379
def is_unregistered_mime(mimetype):
    """Returns True only when Gio lists no application for the MIME type.

    When applications are listed, or when Gio cannot be imported (a
    warning is logged), conservatively returns False."""
    try:
        from bleachbit.GtkShim import Gio  # pylint: disable=import-outside-toplevel
        handlers = Gio.app_info_get_all_for_type(mimetype)
        if len(handlers) == 0:
            return True
    except ImportError:
        logger.warning(
            'error calling gio.app_info_get_all_for_type(%s)', mimetype)
    return False
×
390

391

392
def is_broken_xdg_desktop(pathname):
    """Returns whether the given XDG .desktop file is critically broken.

    The file is broken when it cannot be decoded or parsed, when the
    'Desktop Entry' section or its 'Type' or 'Name' option is missing, or
    when the check for its type fails: a Link needs 'URL' or 'URL[$e]', a
    MimeType entry needs a registered 'MimeType', and an Application is
    passed to _is_broken_xdg_desktop_application().  Any other type is
    logged as unhandled and not reported as broken.

    Reference: http://standards.freedesktop.org/desktop-entry-spec/latest/"""
    parser = bleachbit.RawConfigParser()
    try:
        parser.read(pathname)
    except UnicodeDecodeError:
        logger.info(
            "is_broken_xdg_menu: cannot decode file: '%s'", pathname)
        return True
    except (configparser.Error) as e:
        logger.info(
            "is_broken_xdg_menu: %s: '%s'", e, pathname)
        return True

    section = 'Desktop Entry'
    if not parser.has_section(section):
        logger.info(
            "is_broken_xdg_menu: missing required section 'Desktop Entry': '%s'", pathname)
        return True
    if not parser.has_option(section, 'Type'):
        logger.info(
            "is_broken_xdg_menu: missing required option 'Type': '%s'", pathname)
        return True
    if not parser.has_option(section, 'Name'):
        logger.info(
            "is_broken_xdg_menu: missing required option 'Name': '%s'", pathname)
        return True

    entry_type = parser.get(section, 'Type').strip().lower()

    if entry_type == 'application':
        return _is_broken_xdg_desktop_application(parser, pathname)

    if entry_type == 'link':
        if parser.has_option(section, 'URL') or parser.has_option(section, 'URL[$e]'):
            return False
        logger.info(
            "is_broken_xdg_menu: missing required option 'URL': '%s'", pathname)
        return True

    if entry_type == 'mimetype':
        if not parser.has_option(section, 'MimeType'):
            logger.info(
                "is_broken_xdg_menu: missing required option 'MimeType': '%s'", pathname)
            return True
        mimetype = parser.get(section, 'MimeType').strip().lower()
        if not is_unregistered_mime(mimetype):
            return False
        logger.info(
            "is_broken_xdg_menu: MimeType '%s' not registered '%s'", mimetype, pathname)
        return True

    logger.warning("unhandled type '%s': file '%s'", entry_type, pathname)
    return False
×
441

442

443
def rotated_logs():
    """Yield the paths of rotated (i.e., old) logs in /var/log/

    A path is skipped when it matches one of the keep patterns or is
    whitelisted; otherwise it is yielded when it contains a rotation
    marker such as '.1', '.gz', '.old' or '-' followed by digits.

    See:
    https://bugs.launchpad.net/bleachbit/+bug/367575
    https://github.com/bleachbit/bleachbit/issues/1744
    """
    protected = (re.compile(r'/var/log/(removed_)?(packages|scripts)'),
                 re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}'))
    rotation_marker = re.compile(r'(\.(\d+|bz2|gz|xz|old)|\-\d{8}?)')

    for candidate in bleachbit.FileUtilities.children_in_directory('/var/log'):
        must_keep = any(
            keep_re.search(candidate) or bleachbit.FileUtilities.whitelisted(candidate)
            for keep_re in protected)
        if not must_keep and rotation_marker.search(candidate):
            yield candidate
1✔
464

465

466
def wine_to_linux_path(wineprefix, windows_pathname):
    """Return a Linux pathname from an absolute Windows pathname and Wine prefix

    'X:' becomes 'drive_x', backslashes become slashes, and the result is
    joined onto wineprefix."""
    letter = windows_pathname[0]
    relative = windows_pathname.replace(f"{letter}:", f"drive_{letter.lower()}")
    relative = relative.replace("\\", "/")
    return os.path.join(wineprefix, relative)
1✔
473

474

475
def run_cleaner_cmd(cmd, args, freed_space_regex=r'[\d.]+[kMGTE]?B?', error_line_regexes=None):
    """Runs a specified command and returns how much space was (reportedly) freed.

    The subprocess shouldn't need any user input and the user should have the
    necessary rights.  It runs with LC_ALL=C and the caller's PATH (when
    set), with stderr merged into stdout.

    cmd: name or path of the executable
    args: list of arguments passed to it
    freed_space_regex: matched (anchored at the start) against every
        output line; on a match the first capture group -- or the whole
        match when the pattern has no group, as with the default -- is
        converted with FileUtilities.human_to_bytes() and added up
    error_line_regexes: optional patterns searched in every output line

    Returns the accumulated number of bytes.

    Raises RuntimeError when the executable is not found or when an output
    line matches one of error_line_regexes.  subprocess.CalledProcessError
    from a non-zero exit status is not caught here."""
    if not FileUtilities.exe_exists(cmd):
        raise RuntimeError(_('Executable not found: %s') % cmd)
    freed_space_regex = re.compile(freed_space_regex)
    error_line_regexes = [re.compile(regex)
                          for regex in error_line_regexes or []]
    # A pattern without groups has no group 1; use the whole match then.
    size_group = 1 if freed_space_regex.groups else 0

    env = {'LC_ALL': 'C'}
    path = os.getenv('PATH')
    if path is not None:
        # subprocess rejects None as an environment value
        env['PATH'] = path
    output = subprocess.check_output([cmd] + args, stderr=subprocess.STDOUT,
                                     universal_newlines=True, env=env)
    freed_space = 0
    for line in output.split('\n'):
        m = freed_space_regex.match(line)
        if m is not None:
            freed_space += FileUtilities.human_to_bytes(m.group(size_group))
        for error_re in error_line_regexes:
            if error_re.search(line):
                raise RuntimeError('Invalid output from %s: %s' % (cmd, line))

    return freed_space
1✔
500

501

502
def journald_clean():
    """Vacuum the system journals with `journalctl --vacuum-size=1` and
    return the freed size parsed by run_cleaner_cmd()

    Raises RuntimeError when the command exits with an error."""
    try:
        freed = run_cleaner_cmd(
            'journalctl', ['--vacuum-size=1'], JOURNALD_REGEX)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
    return freed
509

510

511
def apt_autoremove():
    """Run 'apt-get --yes autoremove' and return the size (un-rounded, in
    bytes) of freed space

    Raises RuntimeError when the command exits with an error or prints a
    line starting with 'E: '."""
    # Example lines carrying the size:
    # After this operation, 74.7MB disk space will be freed.
    # After this operation, 44.0 kB disk space will be freed.
    size_line = r'.*, ([\d.]+ ?[a-zA-Z]{2}) disk space will be freed.'
    try:
        freed = run_cleaner_cmd(
            'apt-get', ['--yes', 'autoremove'], size_line, ['^E: '])
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
    return freed
523

524

525
def apt_autoclean():
    """Run 'apt-get autoclean' and return the size (un-rounded, in bytes) of freed space

    The size is summed from the bracketed size at the end of each 'Del'
    line of the output.

    Raises RuntimeError when the command exits with an error or prints a
    line starting with 'E: '."""
    # The closing bracket is escaped and follows the size directly; a
    # stray '}' in front of it would require a literal '}' in the output.
    # The optional space mirrors the pattern used by apt_autoremove().
    freed_space_regex = r'^Del .*\[([\d.]+ ?[a-zA-Z]{2})\]'
    try:
        return run_cleaner_cmd('apt-get', ['autoclean'], freed_space_regex, ['^E: '])
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
532

533

534
def apt_clean():
    """Run 'apt-get clean' and return the size in bytes of freed space

    The size is the difference of get_apt_size() before and after the
    command; the command's output is only checked for 'E: ' error lines.

    Raises RuntimeError when the command exits with an error."""
    size_before = get_apt_size()
    try:
        run_cleaner_cmd('apt-get', ['clean'], '^unused regex$', ['^E: '])
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
    return size_before - get_apt_size()
×
544

545

546
def get_apt_size():
    """Return the size of the apt cache (in bytes)

    The globs to measure are taken from the output of a simulated
    `apt-get -s clean`."""
    simulated = General.run_external(['apt-get', '-s', 'clean'])
    # simulated is (return code, stdout, stderr); only stdout is used
    cache_globs = re.findall(r'/[/a-z\.\*]+', simulated[1])
    return get_globs_size(cache_globs)
1✔
551

552

553
def get_globs_size(paths):
    """Get the cumulative size (in bytes) of everything matched by a list
    of glob patterns"""
    return sum(FileUtilities.getsize(matched)
               for pattern in paths
               for matched in glob.iglob(pattern))
1✔
560

561

562
def yum_clean():
    """Run 'yum clean all' and return size in bytes recovered

    The size is the difference in size of /var/cache/yum before and after.

    Raises RuntimeError when /var/run/yum.pid exists, when the command
    exits with an error, or when its output contains a known error line."""
    if os.path.exists('/var/run/yum.pid'):
        raise RuntimeError(_(
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "Yum")

    cache_dir = '/var/cache/yum'
    size_before = FileUtilities.getsizedir(cache_dir)
    error_lines = ['You need to be root', 'Cannot remove rpmdb file']
    try:
        run_cleaner_cmd('yum', ['--enablerepo=*', 'clean', 'all'],
                        '^unused regex$', error_lines)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(str(part) for part in e.cmd)}':\n{e.output}") from e
    return size_before - FileUtilities.getsizedir(cache_dir)
×
579

580

581
def dnf_clean():
    """Run 'dnf clean all' and return size in bytes recovered

    The size is the difference in size of /var/cache/dnf before and after.

    Raises RuntimeError when /var/run/dnf.pid exists, when the command
    exits with an error, or when its output contains a known error line."""
    if os.path.exists('/var/run/dnf.pid'):
        raise RuntimeError(_(
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "Dnf")

    cache_dir = '/var/cache/dnf'
    size_before = FileUtilities.getsizedir(cache_dir)
    error_lines = ['You need to be root', 'Cannot remove rpmdb file']
    try:
        run_cleaner_cmd('dnf', ['--enablerepo=*', 'clean', 'all'],
                        '^unused regex$', error_lines)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(str(part) for part in e.cmd)}':\n{e.output}") from e
    return size_before - FileUtilities.getsizedir(cache_dir)
×
599

600

601
# Multiplier for each unit letter understood by parse_size(): powers of 1000.
units = {symbol: 1000 ** power for power, symbol in enumerate("BkMG")}


def parse_size(size):
    """Convert a string of a number and a unit letter separated by
    whitespace (e.g. '1.5 k') into an integer number of bytes"""
    number, unit = size.split()
    return int(float(number) * units[unit])
1✔
608

609

610
def dnf_autoremove():
    """Run 'dnf -y autoremove' and return size in bytes recovered.

    The size is parsed from the 'Freed space:' text in the command's
    output; 0 is returned when that text is absent.

    Raises RuntimeError when /var/run/dnf.pid exists, when dnf reports that
    it needs root, or when it exits with a positive return code."""
    if os.path.exists('/var/run/dnf.pid'):
        raise RuntimeError(_(
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "Dnf")
    (rc, stdout, stderr) = General.run_external(['dnf', '-y', 'autoremove'])
    combined = stdout + stderr
    if 'Error: This command has to be run under the root user.' in combined:
        raise RuntimeError('dnf autoremove requires root permissions')
    if rc > 0:
        raise RuntimeError(f'dnf autoremove raised error {rc}: {stderr}')

    found = re.search(r"Freed space: ([\d.]+[\s]+[BkMG])", combined)
    freed_bytes = parse_size(found.group(1)) if found else 0
    logger.debug(
        'dnf_autoremove >> total freed bytes: %s', freed_bytes)
    return freed_bytes
1✔
632

633

634
def pacman_cache():
    """Clean cache in pacman

    Runs `paccache -rk0` and returns the number of bytes it reports freed,
    or 0 when the summary line is not found in its output.

    Raises RuntimeError when /var/lib/pacman/db.lck exists, when paccache
    is not installed, or when it exits with a positive return code."""
    if os.path.exists('/var/lib/pacman/db.lck'):
        msg = _(
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "pacman"
        raise RuntimeError(msg)
    if not exe_exists('paccache'):
        raise RuntimeError('paccache not found')
    cmd = ['paccache', '-rk0']
    (rc, stdout, stderr) = General.run_external(cmd)
    if rc > 0:
        raise RuntimeError(f'paccache raised error {rc}: {stderr}')
    # parse line like this: "==> finished: 3 packages removed (42.31 MiB freed)"
    # The unit is either binary ('KiB', 'MiB', 'GiB') or a single letter
    # handled by parse_size(), optionally followed by 'B'.
    cregex = re.compile(
        r"==> finished: ([\d.]+) packages removed "
        r"\(([\d.]+)\s+(?:(?P<binary>[KMG])iB|(?P<decimal>[BkMG])B?) freed\)")
    match = cregex.search(stdout)
    if not match:
        return 0
    number = match.group(2)
    binary_prefix = match.group('binary')
    if binary_prefix:
        # KiB = 1024**1, MiB = 1024**2, GiB = 1024**3
        exponent = 'KMG'.index(binary_prefix) + 1
        return int(float(number) * 1024 ** exponent)
    return parse_size(f"{number} {match.group('decimal')}")
×
653

654

655
def snap_parse_list(stdout):
    """Parse output of `snap list --all`

    stdout: the text printed by the command

    Returns a list of (name, revision) tuples, one for each row whose
    last column (Notes) lists 'disabled'.  The list is empty when the
    output is empty, reports that no snaps are installed, or does not
    start with the expected header (a warning is logged in the last case).
    """
    disabled_snaps = []
    # splitlines() yields an empty list for empty output, unlike split('\n')
    lines = stdout.strip().splitlines()
    if not lines:
        return disabled_snaps
    # Example output: "No snaps are installed yet. Try 'snap install hello-world'."
    raw_header = lines[0]
    header = raw_header.lower()
    if 'no snaps' in header and 'install' in header:
        return disabled_snaps
    if "name" not in header or "rev" not in header or "notes" not in header:
        logger.warning(
            "Unexpected 'snap list --all' output; returning 0. First line: %r", raw_header)
        return disabled_snaps
    for line in lines[1:]:  # Skip header line
        parts = line.split()
        if len(parts) < 4:
            continue
        # Notes is a comma-separated list; checking only this column keeps
        # a snap whose name or version merely contains 'disabled' out.
        if 'disabled' in parts[-1].split(','):
            snapname = parts[0]
            revision = parts[2]
            disabled_snaps.append((snapname, revision))
    return disabled_snaps
1✔
677

678

679
def snap_disabled_full(really_delete):
    """Remove disabled snaps, or only total up their size

    really_delete: when True, each disabled revision is removed with
        `snap remove`; when False nothing is removed

    Returns the summed size in bytes of the .snap files of the disabled
    revisions that were removed (or, without really_delete, that were
    found).  Removal stops at the first revision that fails to be removed.

    Raises RuntimeError when snap is not installed or `snap list` fails."""
    assert isinstance(really_delete, bool)
    if not exe_exists('snap'):
        raise RuntimeError('snap not found')

    # Get list of all snaps.
    (rc, stdout, stderr) = General.run_external(
        ['snap', 'list', '--all'], clean_env=True)
    if rc > 0:
        raise RuntimeError(f'snap list raised error {rc}: {stderr}')

    # Parse output to find disabled snaps.
    disabled_snaps = snap_parse_list(stdout)
    if not disabled_snaps:
        return 0

    total_freed = 0
    for snapname, revision in disabled_snaps:
        # `snap info` returns info only about active snaps.
        # Instead, get size from the snap file directly.
        snap_file = f'/var/lib/snapd/snaps/{snapname}_{revision}.snap'
        if not os.path.exists(snap_file):
            logger.warning('Could not find snap file: %s', snap_file)
            snap_size = 0
        else:
            snap_size = os.path.getsize(snap_file)
            logger.debug('Found snap file: %s, size: %s',
                         snap_file, f"{snap_size:,}")

        if not really_delete:
            total_freed += snap_size
            continue

        # Remove the snap revision
        (rc, _stdout, remove_stderr) = General.run_external(
            ['snap', 'remove', snapname, f'--revision={revision}'],
            clean_env=True)
        if rc > 0:
            logger.warning(
                'Failed to remove snap %s revision %s: %s', snapname, revision, remove_stderr)
            break
        total_freed += snap_size
        logger.debug(
            'Removed snap %s revision %s, freed %s bytes', snapname, revision, snap_size)

    return total_freed
×
727

728

729
def snap_disabled_clean():
    """Remove disabled snaps and return the result of
    snap_disabled_full() called with deletion enabled"""
    return snap_disabled_full(really_delete=True)
×
732

733

734
def snap_disabled_preview():
    """Preview snaps that would be removed: return the result of
    snap_disabled_full() called with deletion disabled"""
    return snap_disabled_full(really_delete=False)
1✔
737

738

739
def is_unix_display_protocol_wayland():
    """Return True if the display protocol is Wayland.

    Checked in this order: XDG_SESSION_TYPE, WAYLAND_DISPLAY,
    DESKTOP_SESSION, and finally the session type reported by loginctl.
    Whenever loginctl is missing, fails or prints something unexpected,
    False is returned."""
    assert os.name == 'posix'
    env = os.environ
    if 'XDG_SESSION_TYPE' in env:
        # If not wayland, then x11, mir, etc.
        return env['XDG_SESSION_TYPE'] == 'wayland'
    if 'WAYLAND_DISPLAY' in env:
        return True
    # Ubuntu 24.10 showed "ubuntu-xorg".
    # openSUSE Tumbleweed and Fedora 41 showed "gnome".
    # Fedora 41 also showed "plasma".
    if env.get('DESKTOP_SESSION') in ('ubuntu-xorg', 'gnome', 'plasma'):
        return False
    # Wayland (Ubuntu 23.10) sets DISPLAY=:0 like x11, so do not check DISPLAY.
    try:
        (rc, stdout, _stderr) = General.run_external(['loginctl'])
    except FileNotFoundError:
        return False
    if rc != 0:
        logger.warning('logintctl returned rc %s', rc)
        return False
    try:
        # first field of the second output line
        second_line = stdout.split('\n')[1]
        session = second_line.strip().split(' ')[0]
    except (IndexError, ValueError):
        logger.warning('unexpected output from loginctl: %s', stdout)
        return False
    if not session.isdigit():
        logger.warning('unexpected session loginctl: %s', session)
        return False
    session_info = General.run_external(
        ['loginctl', 'show-session', session, '-p', 'Type'])
    return 'wayland' in session_info[1].lower()
1✔
773

774

775
def root_is_not_allowed_to_X_session():
    """Return True if root is not allowed to X session.

    Runs `xhost` and returns True when its return code is 1.  When xhost
    cannot be run, that is logged at debug level and True is returned.

    This function is called only with root on Wayland.
    """
    assert os.name == 'posix'
    try:
        return_code = General.run_external(['xhost'], clean_env=False)[0]
        return return_code == 1
    except (FileNotFoundError, OSError) as exc:
        logger.debug(
            'xhost check failed (%s); assuming root is not allowed to X session', exc)
        return True
×
789

790

791
def is_display_protocol_wayland_and_root_not_allowed():
    """Return True if the display protocol is Wayland, the USER environment
    variable is 'root', and root is not allowed to X session

    Any exception raised while detecting Wayland is logged and treated as
    "not Wayland"."""
    try:
        is_wayland = bleachbit.Unix.is_unix_display_protocol_wayland()
    except Exception as e:
        logger.exception(e)
        return False
    if not is_wayland:
        return is_wayland
    if os.environ.get('USER') != 'root':
        return False
    return bleachbit.Unix.root_is_not_allowed_to_X_session()
803

804

805
# Module-level Locales instance, created when this module is imported.
locales = Locales()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc