• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bleachbit / bleachbit / 27391762247

12 Jun 2026 03:03AM UTC coverage: 74.407% (+0.2%) from 74.247%
27391762247

push

github

az0
Merge branch 'dev2'

34 of 60 new or added lines in 6 files covered. (56.67%)

775 existing lines in 17 files now uncovered.

7719 of 10374 relevant lines covered (74.41%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.91
/bleachbit/Unix.py
1
# SPDX-License-Identifier: GPL-3.0-or-later
2
# Copyright (c) 2008-2026 Andrew Ziem.
3
#
4
# This work is licensed under the terms of the GNU GPL, version 3 or
5
# later.  See the COPYING file in the top-level directory.
6

7
"""
8
Integration specific to Unix-like operating systems
9
"""
10

11
import configparser
1✔
12
import glob
1✔
13
import logging
1✔
14
import os
1✔
15
import platform
1✔
16
import re
1✔
17
import shlex
1✔
18
import subprocess
1✔
19

20
import bleachbit
1✔
21
from bleachbit import FileUtilities, General
1✔
22
from bleachbit.FileUtilities import children_in_directory, exe_exists
1✔
23
from bleachbit.Language import get_text as _, native_locale_names
1✔
24

25
logger = logging.getLogger(__name__)
1✔
26

27
# Cache for snapd_is_active() to avoid repeated systemctl calls.
28
_snapd_is_active_cache = None
1✔
29

30
try:
1✔
31
    Pattern = re.Pattern
1✔
UNCOV
32
except AttributeError:
×
UNCOV
33
    Pattern = re._pattern_type
×
34

35

36
JOURNALD_REGEX = r'^Vacuuming done, freed ([\d.]+[BKMGT]?) of archived journals (on disk|from [\w/]+).$'
1✔
37

38

39
class LocaleCleanerPath:
1✔
40
    """This represents a path with either a specific folder name or a folder name pattern.
41
    It also may contain several compiled regex patterns for localization items (folders or files)
42
    and additional LocaleCleanerPaths that get traversed when asked to supply a list of localization
43
    items"""
44

45
    def __init__(self, location):
1✔
46
        if location is None:
1✔
UNCOV
47
            raise RuntimeError("location is none")
×
48
        self.pattern = location
1✔
49
        self.children = []
1✔
50

51
    def add_child(self, child):
1✔
52
        """Adds a child LocaleCleanerPath"""
53
        self.children.append(child)
1✔
54
        return child
1✔
55

56
    def add_path_filter(self, pre, post):
1✔
57
        r"""Adds a filter consisting of a prefix and a postfix
58
        (e.g. 'foobar_' and '\.qm' to match 'foobar_en_US.utf-8.qm)"""
59
        try:
1✔
60
            regex = re.compile('^' + pre + Locales.localepattern + post + '$')
1✔
UNCOV
61
        except Exception as errormsg:
×
UNCOV
62
            raise RuntimeError(
×
63
                f"Malformed regex '{pre}' or '{post}': {errormsg}") from errormsg
64
        self.add_child(regex)
1✔
65

66
    def get_subpaths(self, basepath):
1✔
67
        """Returns direct subpaths for this object, i.e. either the named subfolder or all
68
        subfolders matching the pattern"""
69
        if isinstance(self.pattern, Pattern):
1✔
70
            return (os.path.join(basepath, p) for p in os.listdir(basepath)
1✔
71
                    if self.pattern.match(p) and os.path.isdir(os.path.join(basepath, p)))
72
        path = os.path.join(basepath, self.pattern)
1✔
73
        return [path] if os.path.isdir(path) else []
1✔
74

75
    def get_localizations(self, basepath):
1✔
76
        """Returns all localization items for this object and all descendant objects"""
77
        for path in self.get_subpaths(basepath):
1✔
78
            for child in self.children:
1✔
79
                if isinstance(child, LocaleCleanerPath):
1✔
80
                    yield from child.get_localizations(path)
1✔
81
                elif isinstance(child, Pattern):
1✔
82
                    for element in os.listdir(path):
1✔
83
                        match = child.match(element)
1✔
84
                        if match is not None:
1✔
85
                            yield (match.group('locale'),
1✔
86
                                   match.group('specifier'),
87
                                   os.path.join(path, element))
88

89

90
class Locales:
1✔
91
    """Find languages and localization files"""
92

93
    # The regular expression to match locale strings and extract the langcode.
94
    # See test_locale_regex() in tests/TestUnix.py for examples
95
    # This doesn't match all possible valid locale strings to avoid
96
    # matching filenames you might want to keep, e.g. the regex
97
    # to match jp.eucJP might also match jp.importantfileextension
98
    localepattern =\
1✔
99
        r'(?P<locale>[a-z]{2,3})' \
100
        r'(?P<specifier>[_-][A-Z]{2,4})?(?:\.[\w]+[\d-]+|@\w+)?' \
101
        r'(?P<encoding>[.-_](?:(?:ISO|iso|UTF|utf|us-ascii)[\d-]+|(?:euc|EUC)[A-Z]+))?'
102

103
    def __init__(self):
1✔
104
        self._paths = LocaleCleanerPath(location='/')
1✔
105

106
    def add_xml(self, xml_node, parent=None):
1✔
107
        """Parses the xml data and adds nodes to the LocaleCleanerPath-tree"""
108

109
        if parent is None:
1✔
110
            parent = self._paths
1✔
111
        if xml_node.ELEMENT_NODE != xml_node.nodeType:
1✔
112
            return
1✔
113

114
        # if a pattern is supplied, we recurse into all matching subdirectories
115
        if 'regexfilter' == xml_node.nodeName:
1✔
116
            pre = xml_node.getAttribute('prefix') or ''
1✔
117
            post = xml_node.getAttribute('postfix') or ''
1✔
118
            parent.add_path_filter(pre, post)
1✔
119
        elif 'path' == xml_node.nodeName:
1✔
120
            if xml_node.hasAttribute('directoryregex'):
1✔
121
                pattern = xml_node.getAttribute('directoryregex')
1✔
122
                if '/' in pattern:
1✔
UNCOV
123
                    raise RuntimeError(
×
124
                        'directoryregex may not contain slashes.')
125
                pattern = re.compile(pattern)
1✔
126
                parent = parent.add_child(LocaleCleanerPath(pattern))
1✔
127

128
            # a combination of directoryregex and filter could be too much
129
            else:
130
                if xml_node.hasAttribute("location"):
1✔
131
                    # if there's a filter attribute, it should apply to this path
132
                    parent = parent.add_child(LocaleCleanerPath(
1✔
133
                        xml_node.getAttribute('location')))
134

135
                if xml_node.hasAttribute('filter'):
1✔
136
                    userfilter = xml_node.getAttribute('filter')
1✔
137
                    if 1 != userfilter.count('*'):
1✔
UNCOV
138
                        raise RuntimeError(
×
139
                            f"Filter string '{userfilter}' must contain the placeholder * exactly once")
140

141
                    # we can't use re.escape, because it escapes too much
142
                    (pre, post) = (re.sub(r'([\[\]()^$.])', r'\\\1', p)
1✔
143
                                   for p in userfilter.split('*'))
144
                    parent.add_path_filter(pre, post)
1✔
145
        else:
UNCOV
146
            raise RuntimeError(
×
147
                f"Invalid node '{xml_node.nodeName}', expected '<path>' or '<regexfilter>'")
148

149
        # handle child nodes
150
        for child_xml in xml_node.childNodes:
1✔
151
            self.add_xml(child_xml, parent)
1✔
152

153
    def localization_paths(self, locales_to_keep):
1✔
154
        """Returns all localization items matching the previously added xml configuration"""
155
        purgeable_locales = get_purgeable_locales(locales_to_keep)
1✔
156

157
        for (locale, specifier, path) in self._paths.get_localizations('/'):
1✔
158
            specific = locale + (specifier or '')
1✔
159
            if specific in purgeable_locales or \
1✔
160
                    (locale in purgeable_locales and specific not in locales_to_keep):
161
                yield path
1✔
162

163

164
def _is_broken_xdg_desktop_application(config, desktop_pathname):
1✔
165
    """Returns whether application .desktop file is critically broken
166

167
    This function tests only .desktop files with Type=Application.
168
    """
169
    if not config.has_option('Desktop Entry', 'Exec'):
1✔
170
        logger.info(
1✔
171
            "is_broken_xdg_menu: missing required option 'Exec' in '%s'", desktop_pathname)
172
        return True
1✔
173
    exec_val = config.get('Desktop Entry', 'Exec')
1✔
174
    try:
1✔
175
        exec_parts = shlex.split(exec_val)
1✔
176
    except ValueError as e:
1✔
177
        # Malformed quoting in the Exec value. Per the XDG Desktop Entry
178
        # spec this is undefined behavior; be conservative and keep the
179
        # file rather than risk deleting a working launcher.
180
        logger.warning(
1✔
181
            "is_broken_xdg_menu: cannot parse 'Exec' key (%s) in '%s'", e, desktop_pathname)
182
        return False
1✔
183
    if not exec_parts:
1✔
184
        logger.info(
1✔
185
            "is_broken_xdg_menu: empty 'Exec' value in '%s'", desktop_pathname)
186
        return True
1✔
187
    exe = exec_parts[0]
1✔
188
    if not os.path.isabs(exe) and not os.environ.get('PATH'):
1✔
189
        raise RuntimeError(
×
190
            f"Cannot find executable '{exe}' because PATH environment variable is not set")
191
    if not FileUtilities.exe_exists(exe):
1✔
UNCOV
192
        logger.info(
×
193
            "is_broken_xdg_menu: executable '%s' does not exist in '%s'", exe, desktop_pathname)
UNCOV
194
        return True
×
195
    if 'env' == exe:
1✔
196
        # Wine v1.0 creates .desktop files like this
197
        # Exec=env WINEPREFIX="/home/z/.wine" wine "C:\\Program
198
        # Files\\foo\\foo.exe"
199
        execs = list(exec_parts)
1✔
200
        wineprefix = None
1✔
201
        del execs[0]
1✔
202
        while True:
203
            if execs[0].find("=") < 0:
1✔
204
                break
1✔
205
            (name, value) = execs[0].split("=")
1✔
206
            if name == 'WINEPREFIX':
1✔
207
                wineprefix = value
1✔
208
            del execs[0]
1✔
209
        if not FileUtilities.exe_exists(execs[0]):
1✔
210
            logger.info(
1✔
211
                "is_broken_xdg_menu: executable '%s' does not exist in '%s'", execs[0], desktop_pathname)
212
            return True
1✔
213
        # check the Windows executable exists
214
        if wineprefix:
1✔
215
            windows_exe = wine_to_linux_path(wineprefix, execs[1])
1✔
216
            if not os.path.exists(windows_exe):
1✔
217
                logger.info("is_broken_xdg_menu: Windows executable '%s' does not exist in '%s'",
1✔
218
                            windows_exe, desktop_pathname)
219
                return True
1✔
220
    return False
1✔
221

222

223
def find_available_locales():
1✔
224
    """Returns a list of available locales using locale -a"""
225
    rc, stdout, stderr = General.run_external(['locale', '-a'])
1✔
226
    if rc == 0:
1✔
227
        return stdout.strip().split('\n')
1✔
228

UNCOV
229
    logger.warning("Failed to get available locales: %s", stderr)
×
UNCOV
230
    return []
×
231

232

233
def find_best_locale(user_locale):
1✔
234
    """Find closest match to available locales"""
235
    assert isinstance(user_locale, str)
1✔
236
    if not user_locale:
1✔
237
        return 'C'
1✔
238
    if user_locale in ('C', 'C.utf8', 'POSIX'):
1✔
239
        return user_locale
1✔
240
    available_locales = find_available_locales()
1✔
241

242
    # If requesting a language like 'es' and current locale is compatible
243
    # like 'es_MX', then return that.
244
    # Import here for mock patch.
245
    import locale  # pylint: disable=import-outside-toplevel
1✔
246
    current_locale = locale.getlocale()[0]
1✔
247
    if current_locale and current_locale.startswith(user_locale.split('.')[0]):
1✔
248
        return '.'.join(locale.getlocale())
1✔
249

250
    # Check for exact match.
251
    if user_locale in available_locales:
1✔
252
        return user_locale
1✔
253

254
    # Next, match like 'en' to 'en_US.utf8' (if available) because
255
    # of preference for UTF-8.
256
    for avail_locale in available_locales:
1✔
257
        if avail_locale.startswith(user_locale) and avail_locale.endswith('.utf8'):
1✔
258
            return avail_locale
1✔
259

260
    # Next, match like 'en' to 'en_US' or 'en_US.iso88591'.
261
    for avail_locale in available_locales:
1✔
262
        if avail_locale.startswith(user_locale):
1✔
263
            return avail_locale
1✔
264

265
    return 'C'
1✔
266

267

268
def get_distribution_name_version_platform_freedesktop():
1✔
269
    """Returns the name and version of the distribution using
270
    platform.freedesktop_os_release()
271

272
    Example return value: 'ubuntu 24.10' or None
273

274
    Python 3.10 added platform.freedesktop_os_release().
275
    """
276
    if hasattr(platform, 'freedesktop_os_release'):
1✔
277
        try:
×
278
            release = platform.freedesktop_os_release()
×
279
        except FileNotFoundError:
×
280
            return None
×
281
        dist_id = release.get('ID')
×
UNCOV
282
        dist_version_id = release.get('VERSION_ID')
×
UNCOV
283
        if dist_id and dist_version_id:
×
UNCOV
284
            return f"{dist_id} {dist_version_id}"
×
285
    return None
1✔
286

287

288
def get_distribution_name_version_distro():
1✔
289
    """Returns the name and version of the distribution using the distro
290
    package
291

292
    Example return value: 'ubuntu 24.10' or None
293

294
    distro is a third-party package recommended here:
295
    https://docs.python.org/3.7/library/platform.html
296
    """
297
    try:
1✔
298
        # Import here in case of ImportError.
299
        import distro  # pylint: disable=import-outside-toplevel
1✔
300
        # example 'ubuntu 24.10'
UNCOV
301
        return distro.id() + ' ' + distro.version()
×
302
    except ImportError:
1✔
303
        return None
1✔
304

305

306
def get_distribution_name_version_os_release():
1✔
307
    """Returns the name and version of the distribution using /etc/os-release
308

309
    Example return value: 'ubuntu 24.10' or None
310
    """
311
    if not os.path.exists('/etc/os-release'):
1✔
UNCOV
312
        return None
×
313
    try:
1✔
314
        with open('/etc/os-release', 'r', encoding='utf-8') as f:
1✔
315
            os_release = {}
1✔
316
            for line in f:
1✔
317
                if '=' in line:
1✔
318
                    key, value = line.rstrip().split('=', 1)
1✔
319
                    os_release[key] = value.strip('"\'')
1✔
UNCOV
320
    except Exception as e:
×
UNCOV
321
        logger.debug("Error reading /etc/os-release: %s", e)
×
UNCOV
322
        return None
×
323
    if 'ID' in os_release and 'VERSION_ID' in os_release:
1✔
324
        dist_name = os_release['ID']
1✔
325
        return f"{dist_name} {os_release['VERSION_ID']}"
1✔
UNCOV
326
    return None
×
327

328

329
def get_distribution_name_version():
1✔
330
    """Returns the name and version of the distribution
331

332
    Depending on system capabilities, return value may be:
333
    * 'ubuntu 24.10'
334
    * 'Linux 6.12.3 (unknown distribution)'
335
    * 'Linux (unknown version and distribution)'
336

337
    Python 3.7 had platform.linux_distribution(), but it
338
    was removed in Python 3.8.
339
    """
340
    ret = get_distribution_name_version_platform_freedesktop()
1✔
341
    if ret:
1✔
342
        return ret
×
343
    ret = get_distribution_name_version_distro()
1✔
344
    if ret:
1✔
UNCOV
345
        return ret
×
346
    ret = get_distribution_name_version_os_release()
1✔
347
    if ret:
1✔
348
        return ret
1✔
349
    try:
×
350
        linux_version = platform.release()
×
351
        # example '6.12.3-061203-generic'
352
        linux_version = linux_version.split('-')[0]
×
353
        return f"Linux {linux_version} (unknown distribution)"
×
354
    except Exception as e1:
×
UNCOV
355
        logger.debug("Error calling platform.release(): %s", e1)
×
356
        try:
×
357
            linux_version = os.uname().release
×
358
            # example '6.12.3-061203-generic'
359
            linux_version = linux_version.split('-')[0]
×
360
            return f"Linux {linux_version} (unknown distribution)"
×
UNCOV
361
        except Exception as e2:
×
UNCOV
362
            logger.debug("Error calling os.uname(): %s", e2)
×
UNCOV
363
    return "Linux (unknown version and distribution)"
×
364

365

366
def get_mount_points():
1✔
367
    """Return read-write mount points that may have trash"""
368
    try:
1✔
369
        import psutil # pylint: disable=import-outside-toplevel
1✔
UNCOV
370
    except ImportError:
×
UNCOV
371
        logger.warning('install psutil for better trash detection')
×
UNCOV
372
        return []
×
373
    mount_points = []
1✔
374
    try:
1✔
375
        for partition in psutil.disk_partitions():
1✔
376
            mountpoint = partition.mountpoint
1✔
377
            if re.match(r'^/(proc|sys|dev|run|boot)', mountpoint):
1✔
378
                continue
1✔
379
            if 'ro' in partition.opts.split(','):
1✔
380
                continue
1✔
381
            mount_points.append(mountpoint)
1✔
UNCOV
382
    except (OSError, psutil.Error) as e:
×
UNCOV
383
        logger.warning("Error getting mount points: %s", e)
×
384
    return mount_points
1✔
385

386
def get_purgeable_locales(locales_to_keep):
1✔
387
    """Returns all locales to be purged"""
388
    if not locales_to_keep:
1✔
UNCOV
389
        raise RuntimeError('Found no locales to keep')
×
390

391
    assert isinstance(locales_to_keep, list)
1✔
392

393
    # Start with all locales as potentially purgeable
394
    purgeable_locales = set(native_locale_names.keys())
1✔
395

396
    # Remove the locales we want to keep
397
    for keep in locales_to_keep:
1✔
398
        purgeable_locales.discard(keep)
1✔
399
        # If keeping a variant (e.g. 'en_US'), also keep the base locale (e.g. 'en')
400
        if '_' in keep:
1✔
401
            purgeable_locales.discard(keep[:keep.find('_')])
1✔
402
        # If keeping a base locale (e.g. 'en'), also keep all its variants (e.g. 'en_US')
403
        if '_' not in keep:
1✔
404
            purgeable_locales = {locale for locale in purgeable_locales
1✔
405
                                 if not locale.startswith(keep + '_')}
406

407
    return frozenset(purgeable_locales)
1✔
408

409

410
def get_trash_paths():
1✔
411
    """Iterate over all trash on POSIX systems"""
412
    # Import here to avoid a circular import.
413
    # pylint: disable=import-outside-toplevel
414
    from bleachbit import Command
1✔
415
    # macOS-style flat trash (non-recursive)
416
    dirname = os.path.expanduser("~/.Trash")
1✔
417
    for filename in children_in_directory(dirname, False):
1✔
UNCOV
418
        yield Command.Delete(filename)
×
419
    # Freedesktop trash spec directories
420
    # https://specifications.freedesktop.org/trash-spec/trashspec-1.0.html
421
    home_trash = os.path.join(
1✔
422
        os.environ.get('XDG_DATA_HOME', os.path.expanduser('~/.local/share')),
423
        'Trash')
424
    fallback_trash = os.path.expanduser('~/.local/share/Trash')
1✔
425
    trash_dirs = [home_trash]
1✔
426
    if home_trash != fallback_trash:
1✔
427
        trash_dirs.append(fallback_trash)
1✔
428
    # Snap apps store trash under ~/snap/<app>/<revision>/.local/share/Trash
429
    for d in glob.glob(os.path.expanduser("~/snap/*/*/.local/share/Trash")):
1✔
430
        # Do not follow revision symlinks. For example, current -> 238 would match twice.
431
        rev_dir = os.path.dirname(os.path.dirname(os.path.dirname(d)))
1✔
432
        if not os.path.islink(rev_dir):
1✔
433
            trash_dirs.append(d)
1✔
434
    # Per-mountpoint trash (method 1: .Trash/$uid, method 2: .Trash-$uid)
435
    uid = os.getuid()
1✔
436
    for mountpoint in get_mount_points():
1✔
437
        trash_dirs.append(os.path.join(mountpoint, '.Trash', str(uid)))
1✔
438
        trash_dirs.append(os.path.join(mountpoint, f'.Trash-{uid}'))
1✔
439
    # Deduplicate while preserving order
440
    seen = set()
1✔
441
    for trash_dir in trash_dirs:
1✔
442
        real_path = os.path.realpath(trash_dir)
1✔
443
        if real_path in seen:
1✔
UNCOV
444
            continue
×
445
        seen.add(real_path)
1✔
446
        for subdir in ('files', 'info', 'expunged'):
1✔
447
            dirname = os.path.join(trash_dir, subdir)
1✔
448
            for filename in children_in_directory(dirname, True):
1✔
449
                yield Command.Delete(filename)
1✔
450

451
def is_unregistered_mime(mimetype):
1✔
452
    """Returns True if the MIME type is known to be unregistered. If
453
    registered or unknown, conservatively returns False."""
454
    try:
×
455
        from bleachbit.GtkShim import Gio  # pylint: disable=import-outside-toplevel
×
456
        if 0 == len(Gio.app_info_get_all_for_type(mimetype)):
×
UNCOV
457
            return True
×
458
    except ImportError:
×
UNCOV
459
        logger.warning(
×
460
            'error calling gio.app_info_get_all_for_type(%s)', mimetype)
UNCOV
461
    return False
×
462

463

464
def is_broken_xdg_desktop(pathname):
1✔
465
    """Returns whether the given XDG .desktop file is critically broken.
466
    Reference: http://standards.freedesktop.org/desktop-entry-spec/latest/"""
467
    config = bleachbit.RawConfigParser()
1✔
468
    try:
1✔
469
        config.read(pathname)
1✔
470
    except UnicodeDecodeError:
1✔
471
        logger.info(
1✔
472
            "is_broken_xdg_menu: cannot decode file: '%s'", pathname)
473
        return True
1✔
474
    except (configparser.Error) as e:
1✔
475
        logger.info(
1✔
476
            "is_broken_xdg_menu: %s: '%s'", e, pathname)
477
        return True
1✔
478
    if not config.has_section('Desktop Entry'):
1✔
479
        logger.info(
1✔
480
            "is_broken_xdg_menu: missing required section 'Desktop Entry': '%s'", pathname)
481
        return True
1✔
482
    if not config.has_option('Desktop Entry', 'Type'):
1✔
483
        logger.info(
1✔
484
            "is_broken_xdg_menu: missing required option 'Type': '%s'", pathname)
485
        return True
1✔
486
    if not config.has_option('Desktop Entry', 'Name'):
1✔
487
        logger.info(
1✔
488
            "is_broken_xdg_menu: missing required option 'Name': '%s'", pathname)
489
        return True
1✔
490
    file_type = config.get('Desktop Entry', 'Type').strip().lower()
1✔
491
    if 'link' == file_type:
1✔
492
        if not config.has_option('Desktop Entry', 'URL') and \
1✔
493
                not config.has_option('Desktop Entry', 'URL[$e]'):
494
            logger.info(
1✔
495
                "is_broken_xdg_menu: missing required option 'URL': '%s'", pathname)
496
            return True
1✔
497
        return False
×
498
    if 'mimetype' == file_type:
1✔
499
        if not config.has_option('Desktop Entry', 'MimeType'):
×
500
            logger.info(
×
501
                "is_broken_xdg_menu: missing required option 'MimeType': '%s'", pathname)
502
            return True
×
UNCOV
503
        mimetype = config.get('Desktop Entry', 'MimeType').strip().lower()
×
504
        if is_unregistered_mime(mimetype):
×
505
            logger.info(
×
506
                "is_broken_xdg_menu: MimeType '%s' not registered '%s'", mimetype, pathname)
UNCOV
507
            return True
×
508
        return False
×
509
    if 'application' == file_type:
1✔
510
        return _is_broken_xdg_desktop_application(config, pathname)
1✔
UNCOV
511
    logger.warning("unhandled type '%s': file '%s'", file_type, pathname)
×
UNCOV
512
    return False
×
513

514

515
def rotated_logs():
1✔
516
    """Yield a list of rotated (i.e., old) logs in /var/log/
517

518
    See:
519
    https://bugs.launchpad.net/bleachbit/+bug/367575
520
    https://github.com/bleachbit/bleachbit/issues/1744
521
    """
522
    keep_lists = [re.compile(r'/var/log/(removed_)?(packages|scripts)'),
1✔
523
                  re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')]
524
    positive_re = re.compile(r'(\.(\d+|bz2|gz|xz|old)|\-\d{8}?)')
1✔
525

526
    for path in bleachbit.FileUtilities.children_in_directory('/var/log'):
1✔
527
        keep_list_match = False
1✔
528
        for keep_list in keep_lists:
1✔
529
            if keep_list.search(path) or bleachbit.FileUtilities.whitelisted(path):
1✔
530
                keep_list_match = True
1✔
531
                break
1✔
532
        if keep_list_match:
1✔
533
            continue
1✔
534
        if positive_re.search(path):
1✔
535
            yield path
1✔
536

537

538
def wine_to_linux_path(wineprefix, windows_pathname):
1✔
539
    """Return a Linux pathname from an absolute Windows pathname and Wine prefix"""
540
    drive_letter = windows_pathname[0]
1✔
541
    windows_pathname = windows_pathname.replace(drive_letter + ":",
1✔
542
                                                "drive_" + drive_letter.lower())
543
    windows_pathname = windows_pathname.replace("\\", "/")
1✔
544
    return os.path.join(wineprefix, windows_pathname)
1✔
545

546

547
def run_cleaner_cmd(cmd, args, freed_space_regex=r'[\d.]+[kMGTE]?B?', error_line_regexes=None):
1✔
548
    """Runs a specified command and returns how much space was (reportedly) freed.
549
    The subprocess shouldn't need any user input and the user should have the
550
    necessary rights.
551
    freed_space_regex gets applied to every output line, if the re matches,
552
    add values captured by the single group in the regex"""
553
    if not FileUtilities.exe_exists(cmd):
1✔
554
        raise RuntimeError(_('Executable not found: %s') % cmd)
1✔
555
    freed_space_regex = re.compile(freed_space_regex)
1✔
556
    error_line_regexes = [re.compile(regex)
1✔
557
                          for regex in error_line_regexes or []]
558

559
    env = {'LC_ALL': 'C', 'PATH': os.getenv('PATH')}
1✔
560
    output = subprocess.check_output([cmd] + args, stderr=subprocess.STDOUT,
1✔
561
                                     universal_newlines=True, env=env)
562
    freed_space = 0
1✔
563
    for line in output.split('\n'):
1✔
564
        m = freed_space_regex.match(line)
1✔
565
        if m is not None:
1✔
566
            freed_space += FileUtilities.human_to_bytes(m.group(1))
1✔
567
        for error_re in error_line_regexes:
1✔
568
            if error_re.search(line):
1✔
569
                raise RuntimeError('Invalid output from %s: %s' % (cmd, line))
1✔
570

571
    return freed_space
1✔
572

573

574
def journald_clean():
1✔
575
    """Clean the system journals"""
576
    try:
1✔
577
        return run_cleaner_cmd('journalctl', ['--vacuum-size=1'], JOURNALD_REGEX)
1✔
UNCOV
578
    except subprocess.CalledProcessError as e:
×
UNCOV
579
        raise RuntimeError(
×
580
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
581

582

583
def apt_autoremove():
1✔
584
    """Run 'apt-get autoremove' and return the size (un-rounded, in bytes) of freed space"""
585

586
    args = ['--yes', 'autoremove']
1✔
587
    # After this operation, 74.7MB disk space will be freed.
588
    # After this operation, 44.0 kB disk space will be freed.
589
    freed_space_regex = r'.*, ([\d.]+ ?[a-zA-Z]{2}) disk space will be freed.'
1✔
590
    try:
1✔
591
        return run_cleaner_cmd('apt-get', args, freed_space_regex, ['^E: '])
1✔
592
    except subprocess.CalledProcessError as e:
1✔
593
        raise RuntimeError(
1✔
594
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
595

596

597
def apt_autoclean():
1✔
598
    """Run 'apt-get autoclean' and return the size (un-rounded, in bytes) of freed space"""
599
    try:
1✔
600
        return run_cleaner_cmd('apt-get', ['autoclean'], r'^Del .*\[([\d.]+[a-zA-Z]{2})}]', ['^E: '])
1✔
601
    except subprocess.CalledProcessError as e:
1✔
602
        raise RuntimeError(
1✔
603
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
604

605

606
def apt_clean():
1✔
607
    """Run 'apt-get clean' and return the size in bytes of freed space"""
608
    old_size = get_apt_size()
×
609
    try:
×
UNCOV
610
        run_cleaner_cmd('apt-get', ['clean'], '^unused regex$', ['^E: '])
×
611
    except subprocess.CalledProcessError as e:
×
612
        raise RuntimeError(
×
613
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
UNCOV
614
    new_size = get_apt_size()
×
UNCOV
615
    return old_size - new_size
×
616

617

618
def get_apt_size():
1✔
619
    """Return the size of the apt cache (in bytes)"""
620
    (_rc, stdout, _stderr) = General.run_external(['apt-get', '-s', 'clean'])
1✔
621
    paths = re.findall(r'/[/a-z\.\*]+', stdout)
1✔
622
    return get_globs_size(paths)
1✔
623

624

625
def get_globs_size(paths):
1✔
626
    """Get the cumulative size (in bytes) of a list of globs"""
627
    total_size = 0
1✔
628
    for path in paths:
1✔
629
        for p in glob.iglob(path):
1✔
630
            total_size += FileUtilities.getsize(p)
1✔
631
    return total_size
1✔
632

633

634
def yum_clean():
1✔
635
    """Run 'yum clean all' and return size in bytes recovered"""
636
    if os.path.exists('/var/run/yum.pid'):
1✔
UNCOV
637
        msg = _(
×
638
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "Yum"
UNCOV
639
        raise RuntimeError(msg)
×
640

641
    old_size = FileUtilities.getsizedir('/var/cache/yum')
1✔
642
    args = ['--enablerepo=*', 'clean', 'all']
1✔
643
    invalid = ['You need to be root', 'Cannot remove rpmdb file']
1✔
644
    try:
1✔
645
        run_cleaner_cmd('yum', args, '^unused regex$', invalid)
1✔
646
    except subprocess.CalledProcessError as e:
1✔
647
        raise RuntimeError(
×
648
            f"Error calling '{' '.join(str(part) for part in e.cmd)}':\n{e.output}") from e
UNCOV
649
    new_size = FileUtilities.getsizedir('/var/cache/yum')
×
UNCOV
650
    return old_size - new_size
×
651

652

653
def dnf_clean():
1✔
654
    """Run 'dnf clean all' and return size in bytes recovered"""
655
    if os.path.exists('/var/run/dnf.pid'):
1✔
UNCOV
656
        msg = _(
×
657
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "Dnf"
UNCOV
658
        raise RuntimeError(msg)
×
659

660
    old_size = FileUtilities.getsizedir('/var/cache/dnf')
1✔
661
    args = ['--enablerepo=*', 'clean', 'all']
1✔
662
    invalid = ['You need to be root', 'Cannot remove rpmdb file']
1✔
663
    try:
1✔
664
        run_cleaner_cmd('dnf', args, '^unused regex$', invalid)
1✔
665
    except subprocess.CalledProcessError as e:
1✔
UNCOV
666
        raise RuntimeError(
×
667
            f"Error calling '{' '.join(str(part) for part in e.cmd)}':\n{e.output}") from e
UNCOV
668
    new_size = FileUtilities.getsizedir('/var/cache/dnf')
×
669

UNCOV
670
    return old_size - new_size
×
671

672

673
units = {"B": 1, "k": 10**3, "M": 10**6, "G": 10**9}
1✔
674

675

676
def parse_size(size):
1✔
677
    """Parse the size returned by dnf"""
678
    number, unit = [string.strip() for string in size.split()]
1✔
679
    return int(float(number) * units[unit])
1✔
680

681

682
def dnf_autoremove():
1✔
683
    """Run 'dnf autoremove' and return size in bytes recovered."""
684
    if os.path.exists('/var/run/dnf.pid'):
1✔
685
        msg = _(
1✔
686
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "Dnf"
687
        raise RuntimeError(msg)
1✔
688
    cmd = ['dnf', '-y', 'autoremove']
1✔
689
    (rc, stdout, stderr) = General.run_external(cmd)
1✔
690
    freed_bytes = 0
1✔
691
    allout = stdout + stderr
1✔
692
    if 'Error: This command has to be run under the root user.' in allout:
1✔
UNCOV
693
        raise RuntimeError('dnf autoremove requires root permissions')
×
694
    if rc > 0:
1✔
695
        raise RuntimeError(f'dnf autoremove raised error {rc}: {stderr}')
1✔
696

697
    cregex = re.compile(r"Freed space: ([\d.]+[\s]+[BkMG])")
1✔
698
    match = cregex.search(allout)
1✔
699
    if match:
1✔
700
        freed_bytes = parse_size(match.group(1))
1✔
701
    logger.debug(
1✔
702
        'dnf_autoremove >> total freed bytes: %s', freed_bytes)
703
    return freed_bytes
1✔
704

705

706
def pacman_cache():
1✔
707
    """Clean cache in pacman"""
708
    if os.path.exists('/var/lib/pacman/db.lck'):
1✔
UNCOV
709
        msg = _(
×
710
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "pacman"
UNCOV
711
        raise RuntimeError(msg)
×
712
    if not exe_exists('paccache'):
1✔
713
        raise RuntimeError('paccache not found')
1✔
714
    cmd = ['paccache', '-rk0']
1✔
715
    (rc, stdout, stderr) = General.run_external(cmd)
1✔
716
    if rc > 0:
1✔
UNCOV
717
        raise RuntimeError(f'paccache raised error {rc}: {stderr}')
×
718
    # parse line like this: "==> finished: 3 packages removed (42.31 MiB freed)"
719
    cregex = re.compile(
1✔
720
        r"==> finished: ([\d.]+) packages removed \(([\d.]+\s+[BkMG]) freed\)")
721
    match = cregex.search(stdout)
1✔
722
    if match:
1✔
723
        return parse_size(match.group(2))
1✔
UNCOV
724
    return 0
×
725

726

727
def snap_parse_list(stdout):
1✔
728
    """Parse output of `snap list --all`"""
729
    disabled_snaps = []
1✔
730
    lines = stdout.strip().split('\n')
1✔
731
    if not lines:
1✔
UNCOV
732
        return disabled_snaps
×
733
    # Example output: "No snaps are installed yet. Try 'snap install hello-world'."
734
    raw_header = lines[0]
1✔
735
    header = raw_header.lower()
1✔
736
    if 'no snaps' in header and 'install' in header:
1✔
737
        return disabled_snaps
1✔
738
    if "name" not in header or "rev" not in header or "notes" not in header:
1✔
UNCOV
739
        logger.warning(
×
740
            "Unexpected 'snap list --all' output; returning 0. First line: %r", raw_header)
UNCOV
741
        return disabled_snaps
×
742
    for line in lines[1:]:  # Skip header line
1✔
743
        parts = line.split()
1✔
744
        if len(parts) >= 4 and 'disabled' in line:
1✔
745
            snapname = parts[0]
1✔
746
            revision = parts[2]
1✔
747
            disabled_snaps.append((snapname, revision))
1✔
748
    return disabled_snaps
1✔
749

750

751
def snapd_is_active():
1✔
752
    """Return True if snap is installed and snapd is active.
753

754
    The result is cached in a module-level variable to avoid repeated
755
    systemctl calls during a single BleachBit run.
756
    """
757
    global _snapd_is_active_cache  # pylint: disable=global-statement
758
    if _snapd_is_active_cache is not None:
1✔
759
        return _snapd_is_active_cache
1✔
760
    if not exe_exists('snap'):
1✔
761
        _snapd_is_active_cache = False
1✔
762
        return False
1✔
763
    if not exe_exists('systemctl'):
1✔
764
        _snapd_is_active_cache = False
1✔
765
        return False
1✔
766
    # When snap is installed but snapd is inactive, then `snap list --all`
767
    # or `snap version` may have a long delay, so we check the service status first.
768
    try:
1✔
769
        (rc, _stdout, _stderr) = General.run_external(
1✔
770
            ['systemctl', 'is-active', '--quiet', 'snapd.socket'],
771
            timeout=5)
772
    except subprocess.TimeoutExpired:
1✔
773
        logger.warning(
1✔
774
            'systemctl is-active snapd.socket timed out: it seems snap is installed but snapd is inactive')
775
        _snapd_is_active_cache = False
1✔
776
        return False
1✔
777
    except (FileNotFoundError, OSError) as exc:
×
UNCOV
778
        logger.warning('systemctl is-active snapd.socket failed: %s', exc)
×
UNCOV
779
        _snapd_is_active_cache = False
×
780
        return False
×
781
    _snapd_is_active_cache = rc == 0
1✔
782
    return _snapd_is_active_cache
1✔
783

784

785
def clear_snapd_cache():
1✔
786
    """Clear the snapd_is_active() cache."""
787
    global _snapd_is_active_cache  # pylint: disable=global-statement
788
    _snapd_is_active_cache = None
1✔
789

790

791
def snap_disabled_full(really_delete):
1✔
792
    """Remove disabled snaps"""
793
    assert isinstance(really_delete, bool)
1✔
794
    if not snapd_is_active():
1✔
795
        raise RuntimeError('snap not found or snapd is not active')
×
796

797
    # Get list of all snaps.
798
    cmd = ['snap', 'list', '--all']
1✔
799
    try:
1✔
800
        (rc, stdout, stderr) = General.run_external(
1✔
801
            cmd, clean_env=True, timeout=15)
802
    except subprocess.TimeoutExpired as exc:
1✔
803
        raise RuntimeError(
1✔
804
            'snap list --all timed out after 15 seconds; is snapd running?') from exc
805
    if rc > 0:
1✔
UNCOV
806
        raise RuntimeError(f'snap list raised error {rc}: {stderr}')
×
807

808
    # Parse output to find disabled snaps.
809
    disabled_snaps = snap_parse_list(stdout)
1✔
810
    if not disabled_snaps:
1✔
811
        return 0
1✔
812

813
    # Remove disabled snaps.
UNCOV
814
    total_freed = 0
×
UNCOV
815
    for snapname, revision in disabled_snaps:
×
816
        # `snap info` returns info only about active snaps.
817
        # Instead, get size from the snap file directly.
UNCOV
818
        snap_file = f'/var/lib/snapd/snaps/{snapname}_{revision}.snap'
×
UNCOV
819
        if os.path.exists(snap_file):
×
UNCOV
820
            snap_size = os.path.getsize(snap_file)
×
UNCOV
821
            logger.debug('Found snap file: %s, size: %s',
×
822
                         snap_file, f"{snap_size:,}")
823
        else:
UNCOV
824
            logger.warning('Could not find snap file: %s', snap_file)
×
UNCOV
825
            snap_size = 0
×
826

827
        # Remove the snap revision
UNCOV
828
        if really_delete:
×
829
            # Consider there may be a slow system with a large snap.
UNCOV
830
            remove_cmd = ['snap', 'remove', snapname, f'--revision={revision}']
×
UNCOV
831
            try:
×
UNCOV
832
                (rc, _, remove_stderr) = General.run_external(
×
833
                    remove_cmd, clean_env=True, timeout=60)
UNCOV
834
            except subprocess.TimeoutExpired:
×
UNCOV
835
                logger.error(
×
836
                    'Timeout removing snap %s revision %s', snapname, revision)
UNCOV
837
                break
×
UNCOV
838
            if rc > 0:
×
UNCOV
839
                logger.error(
×
840
                    'Failed to remove snap %s revision %s: %s', snapname, revision, remove_stderr)
UNCOV
841
                break
×
UNCOV
842
            total_freed += snap_size
×
UNCOV
843
            logger.debug(
×
844
                'Removed snap %s revision %s, freed %s bytes', snapname, revision, snap_size)
845
        else:
UNCOV
846
            total_freed += snap_size
×
847

UNCOV
848
    return total_freed
×
849

850

851
def snap_disabled_clean():
1✔
852
    """Remove disabled snaps"""
UNCOV
853
    return snap_disabled_full(True)
×
854

855

856
def snap_disabled_preview():
1✔
857
    """Preview snaps that would be removed"""
858
    return snap_disabled_full(False)
1✔
859

860

861
def is_unix_display_protocol_wayland():
1✔
862
    """Return True if the display protocol is Wayland."""
863
    assert os.name == 'posix'
1✔
864
    if 'XDG_SESSION_TYPE' in os.environ:
1✔
865
        if os.environ['XDG_SESSION_TYPE'] == 'wayland':
1✔
866
            return True
1✔
867
        # If not wayland, then x11, mir, etc.
868
        return False
1✔
869
    if 'WAYLAND_DISPLAY' in os.environ:
1✔
870
        return True
1✔
871
    # Ubuntu 24.10 showed "ubuntu-xorg".
872
    # openSUSE Tumbleweed and Fedora 41 showed "gnome".
873
    # Fedora 41 also showed "plasma".
874
    if os.environ.get('DESKTOP_SESSION') in ('ubuntu-xorg', 'gnome', 'plasma'):
1✔
UNCOV
875
        return False
×
876
    # Wayland (Ubuntu 23.10) sets DISPLAY=:0 like x11, so do not check DISPLAY.
877
    try:
1✔
878
        (rc, stdout, _stderr) = General.run_external(['loginctl'])
1✔
879
    except FileNotFoundError:
1✔
880
        return False
1✔
881
    if rc != 0:
1✔
882
        logger.warning('logintctl returned rc %s', rc)
1✔
883
        return False
1✔
884
    try:
1✔
885
        session = stdout.split('\n')[1].strip().split(' ')[0]
1✔
886
    except (IndexError, ValueError):
1✔
887
        logger.warning('unexpected output from loginctl: %s', stdout)
1✔
888
        return False
1✔
889
    if not session.isdigit():
1✔
890
        logger.warning('unexpected session loginctl: %s', session)
1✔
891
        return False
1✔
892
    result = General.run_external(
1✔
893
        ['loginctl', 'show-session', session, '-p', 'Type'])
894
    return 'wayland' in result[1].lower()
1✔
895

896

897
def root_is_not_allowed_to_X_session():
1✔
898
    """Return True if root is not allowed to X session.
899

900
    This function is called only with root on Wayland.
901
    """
902
    assert os.name == 'posix'
1✔
903
    try:
1✔
904
        result = General.run_external(['xhost'], clean_env=False)
1✔
905
        xhost_returned_error = result[0] == 1
1✔
906
        return xhost_returned_error
1✔
UNCOV
907
    except (FileNotFoundError, OSError) as exc:
×
UNCOV
908
        logger.debug(
×
909
            'xhost check failed (%s); assuming root is not allowed to X session', exc)
UNCOV
910
        return True
×
911

912

913
def is_display_protocol_wayland_and_root_not_allowed():
1✔
914
    """Return True if the display protocol is Wayland and root is not allowed to X session"""
UNCOV
915
    try:
×
UNCOV
916
        is_wayland = bleachbit.Unix.is_unix_display_protocol_wayland()
×
UNCOV
917
    except Exception as e:
×
UNCOV
918
        logger.exception(e)
×
UNCOV
919
        return False
×
UNCOV
920
    return (
×
921
        is_wayland and
922
        os.environ.get('USER') == 'root' and
923
        bleachbit.Unix.root_is_not_allowed_to_X_session()
924
    )
925

926

927
def flush_dns():
1✔
928
    """Flush the DNS resolver cache
929

930
    Returns 0 on success.
931
    Raises RuntimeError on failure.
932
    """
UNCOV
933
    if exe_exists('resolvectl'):
×
UNCOV
934
        args = ['resolvectl', 'flush-caches']
×
UNCOV
935
    elif exe_exists('systemd-resolve'):
×
UNCOV
936
        args = ['systemd-resolve', '--flush-caches']
×
937
    else:
UNCOV
938
        raise RuntimeError('Neither resolvectl nor systemd-resolve found')
×
UNCOV
939
    (rc, stdout, stderr) = General.run_external(args)
×
UNCOV
940
    if 0 != rc:
×
941
        # If service is not found, then there may be nothing to flush.
UNCOV
942
        if 'Unit dbus-org.freedesktop.resolve1.service not found' in stderr:
×
UNCOV
943
            logger.warning(stderr)
×
UNCOV
944
            return 0
×
UNCOV
945
        raise RuntimeError(
×
946
            f'Command: {args}\nReturn code: {rc}\nStdout: {stdout}\nStderr: {stderr}')
UNCOV
947
    return 0
×
948

949

950
locales = Locales()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc