• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bleachbit / bleachbit / 23171475269

16 Mar 2026 11:50PM UTC coverage: 73.166% (+2.0%) from 71.186%
23171475269

push

github

az0
Gracefully handle missing urllib3

- Do not crash on startup in case of missing urllib3.
- Check for missing packages.
- Notify the user.

Affects BleachBit 5.1.0 and 5.1.1 in Flatpak

Closes https://github.com/bleachbit/bleachbit/issues/2056

20 of 25 new or added lines in 2 files covered. (80.0%)

1105 existing lines in 26 files now uncovered.

7043 of 9626 relevant lines covered (73.17%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.72
/bleachbit/Unix.py
1
# vim: ts=4:sw=4:expandtab
2
# -*- coding: UTF-8 -*-
3

4
# BleachBit
5
# Copyright (C) 2008-2025 Andrew Ziem
6
# https://www.bleachbit.org
7
#
8
# This program is free software: you can redistribute it and/or modify
9
# it under the terms of the GNU General Public License as published by
10
# the Free Software Foundation, either version 3 of the License, or
11
# (at your option) any later version.
12
#
13
# This program is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16
# GNU General Public License for more details.
17
#
18
# You should have received a copy of the GNU General Public License
19
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
20

21

22
"""
23
Integration specific to Unix-like operating systems
24
"""
25

26
import configparser
1✔
27
import glob
1✔
28
import logging
1✔
29
import os
1✔
30
import platform
1✔
31
import re
1✔
32
import shlex
1✔
33
import subprocess
1✔
34
import sys
1✔
35

36
import bleachbit
1✔
37
from bleachbit import FileUtilities, General
1✔
38
from bleachbit.General import get_real_uid, get_real_username
1✔
39
from bleachbit.FileUtilities import exe_exists
1✔
40
from bleachbit.Language import get_text as _, native_locale_names
1✔
41

42
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type of a compiled regular expression.  Interpreters whose `re` module
# does not expose `Pattern` fall back to the private alias `_pattern_type`.
Pattern = re.Pattern if hasattr(re, 'Pattern') else re._pattern_type


# Passed to run_cleaner_cmd() by journald_clean(); the first group captures
# the amount of freed space.
JOURNALD_REGEX = r'^Vacuuming done, freed ([\d.]+[BKMGT]?) of archived journals (on disk|from [\w/]+).$'
51

52

53
class LocaleCleanerPath:
    """Node in a tree that describes where localization items are found.

    A node stands for either one named folder (`pattern` is a string) or
    every folder whose name matches a compiled regex (`pattern` is a
    compiled pattern).  Its children are either compiled regexes that match
    localization items (files or folders) inside the node's folder, or
    further LocaleCleanerPath nodes that are traversed recursively."""

    def __init__(self, location):
        if location is None:
            raise RuntimeError("location is none")
        # Either a folder name (str) or a compiled regex of folder names
        self.pattern = location
        # Compiled regexes and/or nested LocaleCleanerPath objects
        self.children = []

    def add_child(self, child):
        """Appends a child (nested path or compiled regex) and returns it"""
        self.children.append(child)
        return child

    def add_path_filter(self, pre, post):
        r"""Adds a compiled filter made of prefix + locale pattern + postfix
        (e.g. 'foobar_' and '\.qm' to match 'foobar_en_US.utf-8.qm)

        Raises RuntimeError when the combined expression cannot be built
        or compiled."""
        try:
            compiled = re.compile(
                '^' + pre + Locales.localepattern + post + '$')
        except Exception as errormsg:
            raise RuntimeError(
                f"Malformed regex '{pre}' or '{post}': {errormsg}") from errormsg
        self.add_child(compiled)

    def get_subpaths(self, basepath):
        """Returns the existing directories directly below basepath that this
        node stands for: the single named subfolder, or every subfolder
        whose name matches the pattern"""
        if not isinstance(self.pattern, Pattern):
            candidate = os.path.join(basepath, self.pattern)
            if os.path.isdir(candidate):
                return [candidate]
            return []
        return (os.path.join(basepath, name) for name in os.listdir(basepath)
                if self.pattern.match(name) and os.path.isdir(os.path.join(basepath, name)))

    def get_localizations(self, basepath):
        """Yields (locale, specifier, pathname) for every localization item
        found by this node and by all of its descendants"""
        for subpath in self.get_subpaths(basepath):
            for child in self.children:
                if isinstance(child, LocaleCleanerPath):
                    # Nested node: descend into it
                    yield from child.get_localizations(subpath)
                    continue
                if not isinstance(child, Pattern):
                    continue
                # Compiled filter: test every entry of this directory
                for entry in os.listdir(subpath):
                    found = child.match(entry)
                    if found is None:
                        continue
                    yield (found.group('locale'),
                           found.group('specifier'),
                           os.path.join(subpath, entry))
102

103

104
class Locales:
    """Find languages and localization files

    The XML configuration is turned into a tree of LocaleCleanerPath
    objects rooted at '/'.  localization_paths() then walks that tree
    and yields the items that belong to locales which are not kept."""

    # The regular expression to match locale strings and extract the langcode.
    # See test_locale_regex() in tests/TestUnix.py for examples
    # This doesn't match all possible valid locale strings to avoid
    # matching filenames you might want to keep, e.g. the regex
    # to match jp.eucJP might also match jp.importantfileextension
    #
    # The separator in front of the encoding is written '[._-]' with the
    # hyphen last.  Written as '[.-_]' it is a character range from '.' to
    # '_', which accepts digits, upper-case letters and other punctuation
    # but not the hyphen itself.
    localepattern =\
        r'(?P<locale>[a-z]{2,3})' \
        r'(?P<specifier>[_-][A-Z]{2,4})?(?:\.[\w]+[\d-]+|@\w+)?' \
        r'(?P<encoding>[._-](?:(?:ISO|iso|UTF|utf|us-ascii)[\d-]+|(?:euc|EUC)[A-Z]+))?'

    def __init__(self):
        # Root of the tree; every configured path hangs below '/'
        self._paths = LocaleCleanerPath(location='/')

    def add_xml(self, xml_node, parent=None):
        """Parses the xml data and adds nodes to the LocaleCleanerPath-tree

        xml_node: DOM node; non-element nodes are ignored
        parent: LocaleCleanerPath to attach to (default: the root)

        Supported elements:
        * <regexfilter prefix="..." postfix="..."> adds a filter to parent
        * <path directoryregex="..."> adds a child matching folder names
        * <path location="..." filter="pre*post"> adds a named child
          and/or a filter in which '*' stands for the locale

        Raises RuntimeError for any other element, for a directoryregex
        containing a slash, and for a filter that does not contain
        exactly one '*'.
        """

        if parent is None:
            parent = self._paths
        if xml_node.ELEMENT_NODE != xml_node.nodeType:
            return

        # if a pattern is supplied, we recurse into all matching subdirectories
        if 'regexfilter' == xml_node.nodeName:
            pre = xml_node.getAttribute('prefix') or ''
            post = xml_node.getAttribute('postfix') or ''
            parent.add_path_filter(pre, post)
        elif 'path' == xml_node.nodeName:
            if xml_node.hasAttribute('directoryregex'):
                pattern = xml_node.getAttribute('directoryregex')
                if '/' in pattern:
                    raise RuntimeError(
                        'directoryregex may not contain slashes.')
                pattern = re.compile(pattern)
                # Child elements are attached below the new node
                parent = parent.add_child(LocaleCleanerPath(pattern))

            # a combination of directoryregex and filter could be too much
            else:
                if xml_node.hasAttribute("location"):
                    # if there's a filter attribute, it should apply to this path
                    parent = parent.add_child(LocaleCleanerPath(
                        xml_node.getAttribute('location')))

                if xml_node.hasAttribute('filter'):
                    userfilter = xml_node.getAttribute('filter')
                    if 1 != userfilter.count('*'):
                        raise RuntimeError(
                            f"Filter string '{userfilter}' must contain the placeholder * exactly once")

                    # we can't use re.escape, because it escapes too much
                    (pre, post) = (re.sub(r'([\[\]()^$.])', r'\\\1', p)
                                   for p in userfilter.split('*'))
                    parent.add_path_filter(pre, post)
        else:
            raise RuntimeError(
                f"Invalid node '{xml_node.nodeName}', expected '<path>' or '<regexfilter>'")

        # handle child nodes
        for child_xml in xml_node.childNodes:
            self.add_xml(child_xml, parent)

    def localization_paths(self, locales_to_keep):
        """Yields the pathname of every localization item that matches the
        previously added xml configuration and is not kept

        An item is yielded when its locale plus specifier (e.g. 'en_US')
        is purgeable, or when its bare locale (e.g. 'en') is purgeable
        and the locale plus specifier is not in locales_to_keep.
        """
        purgeable_locales = get_purgeable_locales(locales_to_keep)

        for (locale, specifier, path) in self._paths.get_localizations('/'):
            specific = locale + (specifier or '')
            if specific in purgeable_locales or \
                    (locale in purgeable_locales and specific not in locales_to_keep):
                yield path
176

177

178
def _is_broken_xdg_desktop_application(config, desktop_pathname):
    """Returns whether application .desktop file is critically broken

    This function tests only .desktop files with Type=Application.

    config: parsed .desktop file providing has_option() and get()
    desktop_pathname: pathname of the .desktop file (used for logging)

    The entry is reported as broken when 'Exec' is missing, when its
    executable does not exist, or, for 'env ...' launchers, when the
    value cannot be split, when no command follows the variable
    assignments, when that command does not exist, or when WINEPREFIX is
    set and the Windows executable does not exist below it.

    Raises RuntimeError when the executable is not an absolute path and
    the PATH environment variable is not set.
    """
    if not config.has_option('Desktop Entry', 'Exec'):
        logger.info(
            "is_broken_xdg_menu: missing required option 'Exec' in '%s'", desktop_pathname)
        return True
    exec_val = config.get('Desktop Entry', 'Exec')
    exe = exec_val.split(" ")[0]
    if not os.path.isabs(exe) and not os.environ.get('PATH'):
        raise RuntimeError(
            f"Cannot find executable '{exe}' because PATH environment variable is not set")
    if not FileUtilities.exe_exists(exe):
        logger.info(
            "is_broken_xdg_menu: executable '%s' does not exist in '%s'", exe, desktop_pathname)
        return True
    if 'env' != exe:
        return False

    # Wine v1.0 creates .desktop files like this
    # Exec=env WINEPREFIX="/home/z/.wine" wine "C:\\Program
    # Files\\foo\\foo.exe"
    try:
        execs = shlex.split(exec_val)
    except ValueError as e:
        logger.info(
            "is_broken_xdg_menu: error splitting 'Exec' key '%s' in '%s'", e, desktop_pathname)
        return True
    wineprefix = None
    del execs[0]  # drop 'env' itself
    # Consume the leading NAME=VALUE assignments.  Split on the first '='
    # only, because the value may itself contain '='.
    while execs and '=' in execs[0]:
        (name, value) = execs[0].split('=', 1)
        if name == 'WINEPREFIX':
            wineprefix = value
        del execs[0]
    if not execs:
        # Only assignments were given, so there is no program to launch.
        logger.info(
            "is_broken_xdg_menu: no command follows 'env' in '%s'", desktop_pathname)
        return True
    if not FileUtilities.exe_exists(execs[0]):
        logger.info(
            "is_broken_xdg_menu: executable '%s' does not exist in '%s'", execs[0], desktop_pathname)
        return True
    # check the Windows executable exists (only if one was given)
    if wineprefix and len(execs) > 1:
        windows_exe = wine_to_linux_path(wineprefix, execs[1])
        if not os.path.exists(windows_exe):
            logger.info("is_broken_xdg_menu: Windows executable '%s' does not exist in '%s'",
                        windows_exe, desktop_pathname)
            return True
    return False
227

228

229
def find_available_locales():
    """Returns the locales printed by 'locale -a' as a list of strings

    If the command exits with a non-zero code, a warning is logged and
    an empty list is returned.
    """
    rc, stdout, stderr = General.run_external(['locale', '-a'])
    if rc != 0:
        logger.warning("Failed to get available locales: %s", stderr)
        return []
    listing = stdout.strip()
    return listing.split('\n')
237

238

239
def find_best_locale(user_locale):
    """Find closest match to available locales

    user_locale: requested locale such as 'es', 'es_MX' or 'es_MX.utf8'

    Returns, in order of preference:
    * 'C' for an empty request
    * the request itself if it is 'C', 'C.utf8' or 'POSIX'
    * the current locale if it starts with the requested language
    * the request itself if it is listed as available
    * the first available locale starting with the request and ending
      in '.utf8'
    * the first available locale starting with the request
    * 'C'
    """
    assert isinstance(user_locale, str)
    if not user_locale:
        return 'C'
    if user_locale in ('C', 'C.utf8', 'POSIX'):
        return user_locale
    available_locales = find_available_locales()

    # If requesting a language like 'es' and current locale is compatible
    # like 'es_MX', then return that.
    # Import here for mock patch.
    import locale  # pylint: disable=import-outside-toplevel
    current = locale.getlocale()
    current_locale = current[0]
    if current_locale and current_locale.startswith(user_locale.split('.')[0]):
        # The encoding element may be None, which str.join() rejects,
        # so join only the parts that are set.
        return '.'.join(part for part in current if part)

    # Check for exact match.
    if user_locale in available_locales:
        return user_locale

    # Next, match like 'en' to 'en_US.utf8' (if available) because
    # of preference for UTF-8.
    for avail_locale in available_locales:
        if avail_locale.startswith(user_locale) and avail_locale.endswith('.utf8'):
            return avail_locale

    # Next, match like 'en' to 'en_US' or 'en_US.iso88591'.
    for avail_locale in available_locales:
        if avail_locale.startswith(user_locale):
            return avail_locale

    return 'C'
272

273

274
def get_distribution_name_version_platform_freedesktop():
    """Returns the name and version of the distribution using
    platform.freedesktop_os_release()

    Example return value: 'ubuntu 24.10' or None

    None is returned when the function is not available, when the
    os-release file cannot be read, or when it lacks ID or VERSION_ID.

    Python 3.10 added platform.freedesktop_os_release().
    """
    reader = getattr(platform, 'freedesktop_os_release', None)
    if reader is None:
        return None
    try:
        release = reader()
    except OSError:
        # Documented to raise OSError or a subclass; FileNotFoundError is
        # only one of them (an unreadable file raises PermissionError).
        return None
    dist_id = release.get('ID')
    dist_version_id = release.get('VERSION_ID')
    if dist_id and dist_version_id:
        return f"{dist_id} {dist_version_id}"
    return None
292

293

294
def get_distribution_name_version_distro():
    """Returns the distribution name and version as reported by the
    third-party distro package, or None if the package cannot be imported

    Example return value: 'ubuntu 24.10' or None

    distro is a third-party package recommended here:
    https://docs.python.org/3.7/library/platform.html
    """
    try:
        # Imported locally because the package may not be installed.
        import distro  # pylint: disable=import-outside-toplevel
        # example 'ubuntu 24.10'
        name_and_version = (distro.id(), distro.version())
        return ' '.join(name_and_version)
    except ImportError:
        return None
310

311

312
def get_distribution_name_version_os_release():
    """Returns the distribution name and version parsed from
    /etc/os-release

    Example return value: 'ubuntu 24.10' or None

    None is returned when the file does not exist, when reading it
    fails, or when it has no ID or no VERSION_ID entry.
    """
    if not os.path.exists('/etc/os-release'):
        return None
    fields = {}
    try:
        with open('/etc/os-release', 'r', encoding='utf-8') as f:
            for line in f:
                if '=' not in line:
                    continue
                # KEY=VALUE, where the value may be quoted
                key, _sep, value = line.rstrip().partition('=')
                fields[key] = value.strip('"\'')
    except Exception as e:
        logger.debug("Error reading /etc/os-release: %s", e)
        return None
    if 'ID' not in fields or 'VERSION_ID' not in fields:
        return None
    return f"{fields['ID']} {fields['VERSION_ID']}"
333

334

335
def get_distribution_name_version():
    """Returns the name and version of the distribution

    Depending on system capabilities, return value may be:
    * 'ubuntu 24.10'
    * 'Linux 6.12.3 (unknown distribution)'
    * 'Linux (unknown version and distribution)'

    The three distribution probes are tried in order; when none gives
    an answer, the kernel release is used instead.

    Python 3.7 had platform.linux_distribution(), but it
    was removed in Python 3.8.
    """
    probes = (get_distribution_name_version_platform_freedesktop,
              get_distribution_name_version_distro,
              get_distribution_name_version_os_release)
    for probe in probes:
        ret = probe()
        if ret:
            return ret
    try:
        # example '6.12.3-061203-generic'
        linux_version = platform.release()
        linux_version = linux_version.split('-')[0]
        return f"Linux {linux_version} (unknown distribution)"
    except Exception as e1:
        logger.debug("Error calling platform.release(): %s", e1)
        try:
            # example '6.12.3-061203-generic'
            linux_version = os.uname().release
            linux_version = linux_version.split('-')[0]
            return f"Linux {linux_version} (unknown distribution)"
        except Exception as e2:
            logger.debug("Error calling os.uname(): %s", e2)
    return "Linux (unknown version and distribution)"
370

371

372
def get_purgeable_locales(locales_to_keep):
    """Returns a frozenset of all known locales that may be purged

    locales_to_keep: non-empty list of locale names such as 'en' or
    'en_US'

    Raises RuntimeError if locales_to_keep is empty.
    """
    if not locales_to_keep:
        raise RuntimeError('Found no locales to keep')

    assert isinstance(locales_to_keep, list)

    # Every known locale is a candidate until a kept locale rules it out
    candidates = set(native_locale_names.keys())

    for keep in locales_to_keep:
        candidates.discard(keep)
        if '_' in keep:
            # Keeping a variant (e.g. 'en_US') also keeps its base ('en')
            candidates.discard(keep.split('_', 1)[0])
        else:
            # Keeping a base (e.g. 'en') also keeps its variants ('en_US')
            variant_prefix = keep + '_'
            candidates = {name for name in candidates
                          if not name.startswith(variant_prefix)}

    return frozenset(candidates)
394

395

396
def is_unregistered_mime(mimetype):
    """Returns True if Gio lists no application for the MIME type.

    Returns False if at least one application is listed, or if Gio
    cannot be imported (in which case a warning is logged)."""
    try:
        from bleachbit.GtkShim import Gio  # pylint: disable=import-outside-toplevel
        handlers = Gio.app_info_get_all_for_type(mimetype)
        if len(handlers) == 0:
            return True
    except ImportError:
        logger.warning(
            'error calling gio.app_info_get_all_for_type(%s)', mimetype)
    return False
407

408

409
def is_broken_xdg_desktop(pathname):
    """Returns whether the given XDG .desktop file is critically broken.

    The file is broken when it cannot be decoded or parsed, when the
    'Desktop Entry' section or its 'Type' is missing, or when the
    type-specific check fails (Link: no URL; MimeType: missing or
    unregistered MIME type; Application: see
    _is_broken_xdg_desktop_application).  Files of any other type are
    logged and reported as not broken.

    Reference: http://standards.freedesktop.org/desktop-entry-spec/latest/"""
    section = 'Desktop Entry'
    config = bleachbit.RawConfigParser()
    try:
        config.read(pathname)
    except UnicodeDecodeError:
        logger.info(
            "is_broken_xdg_menu: cannot decode file: '%s'", pathname)
        return True
    except configparser.Error as e:
        logger.info(
            "is_broken_xdg_menu: %s: '%s'", e, pathname)
        return True

    if not config.has_section(section):
        logger.info(
            "is_broken_xdg_menu: missing required section 'Desktop Entry': '%s'", pathname)
        return True
    if not config.has_option(section, 'Type'):
        logger.info(
            "is_broken_xdg_menu: missing required option 'Type': '%s'", pathname)
        return True

    file_type = config.get(section, 'Type').strip().lower()

    if file_type == 'link':
        has_url = config.has_option(section, 'URL') or \
            config.has_option(section, 'URL[$e]')
        if has_url:
            return False
        logger.info(
            "is_broken_xdg_menu: missing required option 'URL': '%s'", pathname)
        return True

    if file_type == 'mimetype':
        if not config.has_option(section, 'MimeType'):
            logger.info(
                "is_broken_xdg_menu: missing required option 'MimeType': '%s'", pathname)
            return True
        mimetype = config.get(section, 'MimeType').strip().lower()
        if is_unregistered_mime(mimetype):
            logger.info(
                "is_broken_xdg_menu: MimeType '%s' not registered '%s'", mimetype, pathname)
            return True
        return False

    if file_type != 'application':
        logger.warning("unhandled type '%s': file '%s'", file_type, pathname)
        return False

    if _is_broken_xdg_desktop_application(config, pathname):
        return True
    return False
456

457

458
def is_process_running_ps_aux(exename, require_same_user):
    """Check whether exename is running by parsing the output of
    'ps aux -c'

    exename: name of the executable
    require_same_user: if True, ignore processes run by other users

    The first column is read as the user and the eleventh as the
    command.  Raises RuntimeError if the header line lacks USER or
    COMMAND.

    When running under sudo, this uses the non-root username.
    """
    ps_out = subprocess.check_output(["ps", "aux", "-c"],
                                     universal_newlines=True)
    rows = ps_out.split("\n")
    header = rows[0].strip()
    if not ("USER" in header and "COMMAND" in header):
        raise RuntimeError("Unexpected ps header format")

    for row in rows[1:]:
        fields = row.split()
        # Skip short (e.g. empty) lines and other commands
        if len(fields) < 11 or fields[10] != exename:
            continue
        if not require_same_user or fields[0] == get_real_username():
            return True
    return False
483

484

485
def is_process_running_linux(exename, require_same_user):
    """Check whether exename is running

    exename: name of the executable
    require_same_user: if True, ignore processes run by other users

    The exename is checked two different ways: against the basename of
    the target of /proc/<pid>/exe, and against the command name stored
    in /proc/<pid>/stat.

    Processes that disappear or cannot be read while scanning are
    skipped.

    When running under sudo, this uses the non-root user ID.
    """
    for filename in glob.iglob("/proc/*/exe"):
        proc_dir = os.path.dirname(filename)
        does_exe_match = False
        try:
            target = os.path.realpath(filename)
        except (TypeError, ValueError):
            # happens, for example, when link points to
            # '/etc/password\x00 (deleted)'
            pass
        except OSError:
            # 13 = permission denied
            pass
        else:
            # Google Chrome 74 on Ubuntu 19.04 shows up as
            # /opt/google/chrome/chrome (deleted)
            found_exename = os.path.basename(target).replace(' (deleted)', '')
            does_exe_match = exename == found_exename

        if not does_exe_match:
            try:
                with open(os.path.join(proc_dir, 'stat'), 'r',
                          encoding='utf-8', errors='replace') as stat_file:
                    stat_text = stat_file.read()
            except OSError:
                # The process exited after it was listed, or its stat
                # file is not readable.
                continue
            # The command name is the second field, wrapped in
            # parentheses.  It may contain spaces and parentheses, so take
            # the text between the first '(' and the last ')'.
            name_start = stat_text.find('(')
            name_end = stat_text.rfind(')')
            if name_start < 0 or name_end < name_start:
                continue
            if stat_text[name_start + 1:name_end] != exename:
                continue

        if not require_same_user:
            return True

        try:
            uid = os.stat(proc_dir).st_uid
        except OSError:
            # permission denied means not the same user
            continue
        # In case of sudo, use the regular user's ID.
        if uid == get_real_uid():
            return True
    return False
529

530

531
def is_process_running(exename, require_same_user):
    """Check whether exename is running, using the method that fits the
    current platform

    exename: name of the executable
    require_same_user: if True, ignore processes run by other users

    Raises RuntimeError on platforms other than Linux, macOS, OpenBSD
    and FreeBSD.
    """
    current_platform = sys.platform
    if current_platform == 'linux':
        return is_process_running_linux(exename, require_same_user)
    uses_ps = current_platform == 'darwin' or \
        current_platform.startswith(('openbsd', 'freebsd'))
    if uses_ps:
        return is_process_running_ps_aux(exename, require_same_user)
    raise RuntimeError('unsupported platform for is_process_running()')
543

544

545
def rotated_logs():
    """Yield the pathnames of rotated (i.e., old) logs in /var/log/

    A path is skipped if it matches one of the keep patterns or is
    whitelisted; otherwise it is yielded if it matches the pattern for
    rotated names.

    See:
    https://bugs.launchpad.net/bleachbit/+bug/367575
    https://github.com/bleachbit/bleachbit/issues/1744
    """
    # Patterns for paths that are never yielded
    keep_lists = [re.compile(r'/var/log/(removed_)?(packages|scripts)'),
                  re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')]
    # Pattern for names that look rotated
    positive_re = re.compile(r'(\.(\d+|bz2|gz|xz|old)|\-\d{8}?)')

    for path in bleachbit.FileUtilities.children_in_directory('/var/log'):
        is_kept = any(
            keeper.search(path) or bleachbit.FileUtilities.whitelisted(path)
            for keeper in keep_lists)
        if not is_kept and positive_re.search(path):
            yield path
566

567

568
def wine_to_linux_path(wineprefix, windows_pathname):
    """Return a Linux pathname from an absolute Windows pathname and Wine prefix

    wineprefix: directory of the Wine prefix, e.g. '/home/z/.wine'
    windows_pathname: e.g. 'C:\\Program Files\\foo\\foo.exe'

    The leading drive ('C:') becomes 'drive_c', backslashes become
    slashes, and the result is joined onto wineprefix.
    """
    # Rewrite only the drive prefix: a later 'X:' inside the path is left
    # alone, and a pathname without a drive prefix (including the empty
    # string) is not touched.
    if windows_pathname[1:2] == ':':
        windows_pathname = ("drive_" + windows_pathname[0].lower()
                            + windows_pathname[2:])
    windows_pathname = windows_pathname.replace("\\", "/")
    return os.path.join(wineprefix, windows_pathname)
575

576

577
def run_cleaner_cmd(cmd, args, freed_space_regex=r'[\d.]+[kMGTE]?B?', error_line_regexes=None):
    """Runs a specified command and returns how much space was (reportedly) freed.
    The subprocess shouldn't need any user input and the user should have the
    necessary rights.

    cmd: name of the executable
    args: list of arguments for the executable
    freed_space_regex: applied to the start of every output line; if it
    matches, the value captured by its first group (or the whole match,
    if the regex has no group) is converted to bytes and added up
    error_line_regexes: optional list of regexes; an output line matching
    any of them raises RuntimeError

    The command is run with LC_ALL=C, with stderr merged into stdout.

    Raises RuntimeError if the executable is not found or an error line
    is seen.  subprocess.CalledProcessError propagates when the command
    exits with a non-zero code.
    """
    if not FileUtilities.exe_exists(cmd):
        raise RuntimeError(_('Executable not found: %s') % cmd)
    freed_space_regex = re.compile(freed_space_regex)
    error_line_regexes = [re.compile(regex)
                          for regex in error_line_regexes or []]

    env = {'LC_ALL': 'C'}
    # An environment value of None makes subprocess raise TypeError, so
    # pass PATH on only when it is set.
    path = os.getenv('PATH')
    if path is not None:
        env['PATH'] = path
    output = subprocess.check_output([cmd] + args, stderr=subprocess.STDOUT,
                                     universal_newlines=True, env=env)
    # The default regex has no capture group; use the whole match then.
    size_group = 1 if freed_space_regex.groups else 0
    freed_space = 0
    for line in output.split('\n'):
        m = freed_space_regex.match(line)
        if m is not None:
            freed_space += FileUtilities.human_to_bytes(m.group(size_group))
        for error_re in error_line_regexes:
            if error_re.search(line):
                raise RuntimeError('Invalid output from %s: %s' % (cmd, line))

    return freed_space
602

603

604
def journald_clean():
    """Vacuum the system journals with 'journalctl --vacuum-size=1' and
    return the value computed by run_cleaner_cmd()

    Raises RuntimeError if the command exits with an error.
    """
    vacuum_args = ['--vacuum-size=1']
    try:
        freed = run_cleaner_cmd('journalctl', vacuum_args, JOURNALD_REGEX)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
    return freed
611

612

613
def apt_autoremove():
    """Run 'apt-get --yes autoremove' and return the size (un-rounded, in
    bytes) of freed space

    Raises RuntimeError if the command exits with an error.
    """
    # Lines to parse look like this:
    # After this operation, 74.7MB disk space will be freed.
    # After this operation, 44.0 kB disk space will be freed.
    size_pattern = r'.*, ([\d.]+ ?[a-zA-Z]{2}) disk space will be freed.'
    try:
        freed = run_cleaner_cmd(
            'apt-get', ['--yes', 'autoremove'], size_pattern, ['^E: '])
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
    return freed
625

626

627
def apt_autoclean():
    """Run 'apt-get autoclean' and return the size (un-rounded, in bytes) of freed space

    Raises RuntimeError if the command exits with an error.
    """
    # Parses lines that start with 'Del' and end with a bracketed size
    # such as '[12.3kB]' or '[12.3 kB]'.  The closing bracket is matched
    # as '\]'; a stray '}' before it would prevent any match.  The
    # optional space mirrors the pattern used by apt_autoremove().
    freed_space_regex = r'^Del .*\[([\d.]+ ?[a-zA-Z]{2})\]'
    try:
        return run_cleaner_cmd('apt-get', ['autoclean'], freed_space_regex, ['^E: '])
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
634

635

636
def apt_clean():
    """Run 'apt-get clean' and return the size in bytes of freed space

    The size is the difference of get_apt_size() before and after the
    command.  Raises RuntimeError if the command exits with an error.
    """
    size_before = get_apt_size()
    try:
        # The command's output is not parsed for sizes, hence the
        # placeholder pattern.
        run_cleaner_cmd('apt-get', ['clean'], '^unused regex$', ['^E: '])
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(e.cmd)}':\n{e.output}") from e
    size_after = get_apt_size()
    return size_before - size_after
646

647

648
def get_apt_size():
    """Return the size of the apt cache (in bytes)

    The paths are taken from the output of the simulated command
    'apt-get -s clean' and measured with get_globs_size().
    """
    (_exit_code, simulated_output, _errors) = General.run_external(
        ['apt-get', '-s', 'clean'])
    cache_globs = re.findall(r'/[/a-z\.\*]+', simulated_output)
    return get_globs_size(cache_globs)
653

654

655
def get_globs_size(paths):
    """Return the summed size (in bytes) of everything matched by a list
    of glob patterns"""
    return sum(FileUtilities.getsize(matched)
               for pattern in paths
               for matched in glob.iglob(pattern))
662

663

664
def yum_clean():
    """Run 'yum clean all' and return size in bytes recovered

    The size is the difference in size of /var/cache/yum before and
    after the command.  Raises RuntimeError if /var/run/yum.pid exists.
    """
    if os.path.exists('/var/run/yum.pid'):
        raise RuntimeError(_(
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "Yum")

    size_before = FileUtilities.getsizedir('/var/cache/yum')
    clean_args = ['--enablerepo=*', 'clean', 'all']
    # Output lines containing one of these raise an error
    failure_markers = ['You need to be root', 'Cannot remove rpmdb file']
    try:
        run_cleaner_cmd('yum', clean_args, '^unused regex$', failure_markers)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(str(part) for part in e.cmd)}':\n{e.output}") from e
    size_after = FileUtilities.getsizedir('/var/cache/yum')
    return size_before - size_after
681

682

683
def dnf_clean():
    """Run 'dnf clean all' and return size in bytes recovered

    The size is the difference in size of /var/cache/dnf before and
    after the command.  Raises RuntimeError if /var/run/dnf.pid exists.
    """
    if os.path.exists('/var/run/dnf.pid'):
        raise RuntimeError(_(
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "Dnf")

    size_before = FileUtilities.getsizedir('/var/cache/dnf')
    clean_args = ['--enablerepo=*', 'clean', 'all']
    # Output lines containing one of these raise an error
    failure_markers = ['You need to be root', 'Cannot remove rpmdb file']
    try:
        run_cleaner_cmd('dnf', clean_args, '^unused regex$', failure_markers)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"Error calling '{' '.join(str(part) for part in e.cmd)}':\n{e.output}") from e
    size_after = FileUtilities.getsizedir('/var/cache/dnf')

    return size_before - size_after
701

702

703
# Decimal multipliers for the unit letters accepted by parse_size()
units = {
    "B": 1,
    "k": 10**3,
    "M": 10**6,
    "G": 10**9,
}


def parse_size(size):
    """Convert a size such as '1.5 k' (a number, whitespace, a unit
    letter) into a whole number of bytes

    Raises ValueError if size does not consist of exactly two
    whitespace-separated parts, and KeyError for an unknown unit.
    """
    # split() without arguments already drops surrounding whitespace
    number, unit = size.split()
    multiplier = units[unit]
    return int(float(number) * multiplier)
710

711

712
def dnf_autoremove():
    """Run 'dnf autoremove' and return size in bytes recovered.

    The size is read from the "Freed space:" line in the command output;
    0 is returned when no such line is found.

    Raises RuntimeError when /var/run/dnf.pid exists, when the output says
    the command must be run as root, or when the return code is positive.
    """
    if os.path.exists('/var/run/dnf.pid'):
        raise RuntimeError(_(
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "Dnf")

    rc, stdout, stderr = General.run_external(['dnf', '-y', 'autoremove'])
    combined = stdout + stderr
    # The root check comes first so that it takes priority over the
    # generic return-code error below.
    if 'Error: This command has to be run under the root user.' in combined:
        raise RuntimeError('dnf autoremove requires root permissions')
    if rc > 0:
        raise RuntimeError(f'dnf autoremove raised error {rc}: {stderr}')

    found = re.search(r"Freed space: ([\d.]+[\s]+[BkMG])", combined)
    freed_bytes = parse_size(found.group(1)) if found else 0
    logger.debug(
        'dnf_autoremove >> total freed bytes: %s', freed_bytes)
    return freed_bytes
1✔
734

735

736
def pacman_cache():
    """Clean cache in pacman by running 'paccache -rk0'.

    Returns the freed size parsed from the summary line of the output, or
    0 when the summary line is not recognized.

    Raises RuntimeError when /var/lib/pacman/db.lck exists, when paccache
    is not found, or when paccache returns a positive return code.
    """
    if os.path.exists('/var/lib/pacman/db.lck'):
        raise RuntimeError(_(
            "%s cannot be cleaned because it is currently running.  Close it, and try again.") % "pacman")
    if not exe_exists('paccache'):
        raise RuntimeError('paccache not found')

    rc, stdout, stderr = General.run_external(['paccache', '-rk0'])
    if rc > 0:
        raise RuntimeError(f'paccache raised error {rc}: {stderr}')

    # parse line like this: "==> finished: 3 packages removed (42.31 MiB freed)"
    # NOTE(review): the pattern accepts a single unit letter directly
    # followed by " freed", so the "MiB" in the example above would not
    # match -- confirm against real paccache output.
    summary = re.search(
        r"==> finished: ([\d.]+) packages removed \(([\d.]+\s+[BkMG]) freed\)",
        stdout)
    if summary is None:
        return 0
    return parse_size(summary.group(2))
×
755

756

757
def snap_parse_list(stdout):
    """Parse output of `snap list --all`.

    Args:
        stdout: text printed by `snap list --all`.

    Returns:
        A list of (name, revision) tuples, one for each row whose Notes
        column (the last column) carries the `disabled` flag.  The list is
        empty when the output is empty, when snap reports that no snaps
        are installed, or when the header line is not recognized.
    """
    text = stdout.strip()
    if not text:
        # str.split() never returns an empty list, so emptiness must be
        # tested on the text itself.
        return []
    lines = text.split('\n')

    raw_header = lines[0]
    header = raw_header.lower()
    # Example output: "No snaps are installed yet. Try 'snap install hello-world'."
    if 'no snaps' in header and 'install' in header:
        return []
    if "name" not in header or "rev" not in header or "notes" not in header:
        logger.warning(
            "Unexpected 'snap list --all' output; returning 0. First line: %r", raw_header)
        return []

    disabled_snaps = []
    for line in lines[1:]:  # Skip header line
        parts = line.split()
        if len(parts) < 4:
            continue
        # Look for the flag only in the comma-separated Notes column, so
        # that a snap whose name or version merely contains the word
        # "disabled" is not reported.
        if 'disabled' in parts[-1].split(','):
            disabled_snaps.append((parts[0], parts[2]))
    return disabled_snaps
1✔
779

780

781
def snap_disabled_full(really_delete):
    """Measure, and optionally remove, disabled snap revisions.

    Disabled revisions are taken from `snap list --all`.  The size of each
    one is read from its file under /var/lib/snapd/snaps; a missing file
    counts as 0 bytes.

    Args:
        really_delete: when True, run `snap remove` for each disabled
            revision; when False, only add up the sizes.

    Returns:
        The total size in bytes.  When deleting, only revisions that were
        removed successfully are counted, and processing stops at the
        first removal that fails.

    Raises:
        RuntimeError: when snap is not found or `snap list` returns a
            positive return code.
    """
    assert isinstance(really_delete, bool)
    if not exe_exists('snap'):
        raise RuntimeError('snap not found')

    # Get list of all snaps.
    rc, stdout, stderr = General.run_external(
        ['snap', 'list', '--all'], clean_env=True)
    if rc > 0:
        raise RuntimeError(f'snap list raised error {rc}: {stderr}')

    total_freed = 0
    # An empty list simply skips the loop and yields 0.
    for snapname, revision in snap_parse_list(stdout):
        # `snap info` returns info only about active snaps.
        # Instead, get size from the snap file directly.
        snap_file = f'/var/lib/snapd/snaps/{snapname}_{revision}.snap'
        if not os.path.exists(snap_file):
            logger.warning('Could not find snap file: %s', snap_file)
            snap_size = 0
        else:
            snap_size = os.path.getsize(snap_file)
            logger.debug('Found snap file: %s, size: %s',
                         snap_file, f"{snap_size:,}")

        if not really_delete:
            # Preview: count the size without touching the snap.
            total_freed += snap_size
            continue

        # Remove the snap revision
        remove_rc, _remove_stdout, remove_stderr = General.run_external(
            ['snap', 'remove', snapname, f'--revision={revision}'],
            clean_env=True)
        if remove_rc > 0:
            logger.warning(
                'Failed to remove snap %s revision %s: %s', snapname, revision, remove_stderr)
            break
        total_freed += snap_size
        logger.debug(
            'Removed snap %s revision %s, freed %s bytes', snapname, revision, snap_size)

    return total_freed
×
829

830

831
def snap_disabled_clean():
    """Remove disabled snaps and return the total reported by snap_disabled_full."""
    freed = snap_disabled_full(really_delete=True)
    return freed
×
834

835

836
def snap_disabled_preview():
    """Preview snaps that would be removed, without removing anything.

    Returns the total reported by snap_disabled_full in non-deleting mode.
    """
    would_free = snap_disabled_full(really_delete=False)
    return would_free
1✔
839

840

841
def is_unix_display_protocol_wayland():
    """Return True if the display protocol is Wayland.

    The checks run in this order, and the first conclusive one wins:

    1. XDG_SESSION_TYPE: True only when it equals 'wayland'.
    2. WAYLAND_DISPLAY: True when the variable is set.
    3. DESKTOP_SESSION: False for a few known session names.
    4. loginctl: the Type property of the first listed session.

    Returns False when loginctl is missing, fails, or prints output that
    cannot be parsed.
    """
    assert os.name == 'posix'
    if 'XDG_SESSION_TYPE' in os.environ:
        # If not wayland, then x11, mir, etc.
        return os.environ['XDG_SESSION_TYPE'] == 'wayland'
    if 'WAYLAND_DISPLAY' in os.environ:
        return True
    # Ubuntu 24.10 showed "ubuntu-xorg".
    # openSUSE Tumbleweed and Fedora 41 showed "gnome".
    # Fedora 41 also showed "plasma".
    if os.environ.get('DESKTOP_SESSION') in ('ubuntu-xorg', 'gnome', 'plasma'):
        return False
    # Wayland (Ubuntu 23.10) sets DISPLAY=:0 like x11, so do not check DISPLAY.
    try:
        (rc, stdout, _stderr) = General.run_external(['loginctl'])
    except FileNotFoundError:
        return False
    if rc != 0:
        logger.warning('loginctl returned rc %s', rc)
        return False
    try:
        # The first line is skipped; the session is the first
        # space-separated field of the second line.
        session = stdout.split('\n')[1].strip().split(' ')[0]
    except (IndexError, ValueError):
        logger.warning('unexpected output from loginctl: %s', stdout)
        return False
    if not session.isdigit():
        logger.warning('unexpected session loginctl: %s', session)
        return False
    result = General.run_external(
        ['loginctl', 'show-session', session, '-p', 'Type'])
    return 'wayland' in result[1].lower()
1✔
875

876

877
def root_is_not_allowed_to_X_session():
    """Return True if root is not allowed to X session.

    This function is called only with root on Wayland.

    Runs `xhost` and returns True when its return code is 1.  When xhost
    cannot be run at all, the failure is logged at debug level and True
    is returned.
    """
    assert os.name == 'posix'
    try:
        xhost_result = General.run_external(['xhost'], clean_env=False)
    except OSError as exc:
        # OSError also covers FileNotFoundError (xhost not installed).
        logger.debug(
            'xhost check failed (%s); assuming root is not allowed to X session', exc)
        return True
    return xhost_result[0] == 1
×
891

892

893
def is_display_protocol_wayland_and_root_not_allowed():
    """Return True if the display protocol is Wayland and root is not allowed to X session.

    All three conditions must hold: the display protocol is Wayland, the
    USER environment variable is 'root', and the xhost check reports that
    root is not allowed.  If detecting the display protocol raises, the
    exception is logged and False is returned.
    """
    try:
        wayland = bleachbit.Unix.is_unix_display_protocol_wayland()
    except Exception as e:
        logger.exception(e)
        return False

    if not wayland:
        return wayland
    if os.environ.get('USER') != 'root':
        return False
    # The xhost check runs only when the two cheaper conditions hold.
    return bleachbit.Unix.root_is_not_allowed_to_X_session()
905

906

907
# Module-level Locales instance, created once when this module is imported.
locales = Locales()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc