• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

weldr / lorax / 13119269928

03 Feb 2025 05:29PM UTC coverage: 43.437%. Remained the same
13119269928

push

github

bcl
Automatic commit of package [lorax] release [42.5-1].

Created by command:

/usr/bin/tito tag

592 of 1565 branches covered (37.83%)

Branch coverage included in aggregate %.

1635 of 3562 relevant lines covered (45.9%)

0.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.34
/src/pylorax/treebuilder.py
1
# treebuilder.py - handle arch-specific tree building stuff using templates
2
#
3
# Copyright (C) 2011-2015 Red Hat, Inc.
4
#
5
# This program is free software; you can redistribute it and/or modify
6
# it under the terms of the GNU General Public License as published by
7
# the Free Software Foundation; either version 2 of the License, or
8
# (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
#
18
# Author(s):  Will Woods <wwoods@redhat.com>
19

20
import logging
1✔
21
logger = logging.getLogger("pylorax.treebuilder")
1✔
22

23
import os, re
1✔
24
from os.path import basename
1✔
25
from shutil import copytree, copy2
1✔
26
from subprocess import CalledProcessError
1✔
27
from pathlib import Path
1✔
28
import libdnf5 as dnf5
1✔
29
from libdnf5.common import QueryCmp_EQ as EQ
1✔
30

31
from pylorax.sysutils import joinpaths, remove
1✔
32
from pylorax.base import DataHolder
1✔
33
from pylorax.ltmpl import LoraxTemplateRunner
1✔
34
import pylorax.imgutils as imgutils
1✔
35
from pylorax.imgutils import DracutChroot
1✔
36
from pylorax.executils import runcmd, runcmd_output, execWithCapture
1✔
37

38
# Maps a basearch name to the boot-image template that TreeBuilder.build()
# runs for that architecture.
templatemap = {
    "x86_64": "x86.tmpl",
    "ppc64le": "ppc64le.tmpl",
    "s390x": "s390.tmpl",
    "aarch64": "aarch64.tmpl",
}
44

45
def generate_module_info(moddir, outfile=None):
    """Write a "module-info" file describing the block and network modules.

    The module names are read from the "modules.block" and
    "modules.networking" files in moddir. moddir is then walked and every
    listed module that is found gets an entry with its type ('scsi' or
    'eth') and the description reported by modinfo.

    :param moddir: Directory holding the module lists and the module files
    :param outfile: File to write; defaults to "module-info" inside moddir
    """
    def module_desc(mod):
        # One-line description of a module file, as reported by modinfo
        output = runcmd_output(["modinfo", "-F", "description", mod])
        return output.strip()

    def read_module_set(name):
        # Keep only the lines of moddir/<name> that mention ".ko".
        # The with block makes sure the list file is closed again.
        with open(joinpaths(moddir, name)) as listing:
            return set(l.strip() for l in listing if ".ko" in l)

    modsets = {'scsi': read_module_set("modules.block"),
               'eth': read_module_set("modules.networking")}

    modinfo = []
    for root, _dirs, files in os.walk(moddir):
        for modtype, modset in modsets.items():
            for mod in modset.intersection(files):  # modules in this dir
                (name, _ext) = os.path.splitext(mod)  # foo.ko -> (foo, .ko)
                # Fall back to a generic description when modinfo has none
                desc = module_desc(joinpaths(root, mod)) or "%s driver" % name
                modinfo.append(dict(name=name, type=modtype, desc=desc))

    # Use a context manager so the output is flushed and closed even if
    # formatting one of the entries fails.
    with open(outfile or joinpaths(moddir, "module-info"), "w") as out:
        out.write("Version 0\n")
        for mod in sorted(modinfo, key=lambda m: m.get('name')):
            # The description is truncated to 65 characters
            out.write('{name}\n\t{type}\n\t"{desc:.65}"\n'.format(**mod))
66

67
class RuntimeBuilder(object):
    '''Builds the anaconda runtime image.

    NOTE: dbo is optional, but if it is not included root must be set.
    '''
    def __init__(self, product, arch, dbo=None, templatedir=None,
                 installpkgs=None, excludepkgs=None,
                 add_templates=None,
                 add_template_vars=None,
                 skip_branding=False,
                 root=None):
        """Set up the template runner and the variables shared by the templates.

        :param product: Product data; a copy is made and its name lowercased
        :param arch: Arch data providing basearch and libdir
        :param dbo: Optional dnf base object; when set, its installroot
                    replaces any root that was passed
        :param templatedir: Template directory handed to the template runner
        :param installpkgs: Extra packages to install in install()
        :param excludepkgs: Packages to remove in install()
        :param add_templates: Extra templates to run at the end of install()
        :param add_template_vars: Variables passed to the extra templates
        :param skip_branding: Skip selecting the release and logos packages
        :param root: Root directory, required when dbo is not passed
        :raises RuntimeError: if no root directory can be determined
        """
        self.dbo = dbo
        if dbo:
            root = dbo.get_config().installroot

        if not root:
            raise RuntimeError("No root directory passed to RuntimeBuilder")

        self._runner = LoraxTemplateRunner(inroot=root, outroot=root,
                                           dbo=dbo, templatedir=templatedir,
                                           basearch=arch.basearch)
        self.add_templates = add_templates or []
        self.add_template_vars = add_template_vars or {}
        self._installpkgs = installpkgs or []
        self._excludepkgs = excludepkgs or []

        # use a copy of product so we can modify it locally
        product = product.copy()
        product.name = product.name.lower()
        self._branding = self.get_branding(skip_branding, product)
        self.vars = DataHolder(arch=arch, product=product, dbo=dbo, root=root,
                               basearch=arch.basearch, libdir=arch.libdir,
                               branding=self._branding)
        self._runner.defaults = self.vars

    def get_branding(self, skip, product):
        """Select the branding from the available 'system-release' packages
        The *best* way to control this is to have a single package in the repo provide 'system-release'
        When there are more than 1 package it will:
        - Make a list of the available packages
        - If variant is set look for a package ending with lower(variant) and use that
        - If there are one or more non-generic packages, use the first one after sorting

        Returns the package names of the system-release and release logos package
        (both are None when skip is set or no suitable package is found)
        """
        if skip:
            return DataHolder(release=None, logos=None)

        release = None
        query = dnf5.rpm.PackageQuery(self.dbo)
        query.filter_provides(["system-release"], EQ)
        # Packages whose name starts with "generic" are never used for branding
        pkgs = sorted(p for p in query
                      if not p.get_name().startswith("generic"))
        if not pkgs:
            logger.error("No system-release packages found, could not get the release")
            return DataHolder(release=None, logos=None)

        logger.debug("system-release packages: %s", ",".join(p.get_name() for p in pkgs))
        if product.variant:
            variant = [p.get_name() for p in pkgs if p.get_name().endswith("-"+product.variant.lower())]
            if variant:
                release = variant[0]
        if not release:
            release = pkgs[0].get_name()

        # release
        logger.info('got release: %s', release)

        # logos uses the basename from release (fedora, redhat, centos, ...)
        # partition() also copes with a package name that has no '-' in it,
        # where unpacking the result of split() would raise ValueError.
        logos = release.partition('-')[0]
        return DataHolder(release=release, logos=logos+"-logos")

    def install(self):
        '''Install packages and do initial setup with runtime-install.tmpl'''
        if self._branding.release:
            self._runner.installpkg(self._branding.release)
        if self._branding.logos:
            self._runner.installpkg(self._branding.logos)

        if self._installpkgs:
            self._runner.installpkg(*self._installpkgs)
        if self._excludepkgs:
            self._runner.removepkg(*self._excludepkgs)

        self._runner.run("runtime-install.tmpl")

        # The additional templates all receive the same extra variables
        for tmpl in self.add_templates:
            self._runner.run(tmpl, **self.add_template_vars)

    def writepkglists(self, pkglistdir):
        '''debugging data: write out lists of package contents'''
        self._runner._writepkglists(pkglistdir)

    def writepkgsizes(self, pkgsizefile):
        '''debugging data: write a big list of pkg sizes'''
        self._runner._writepkgsizes(pkgsizefile)

    def postinstall(self):
        '''Do some post-install setup work with runtime-postinstall.tmpl'''
        # copy configdir into runtime root beforehand
        configdir = joinpaths(self._runner.templatedir,"config_files")
        configdir_path = "tmp/config_files"
        fullpath = joinpaths(self.vars.root, configdir_path)
        # copytree needs the destination to not exist yet
        if os.path.exists(fullpath):
            remove(fullpath)
        copytree(configdir, fullpath)
        self._runner.run("runtime-postinstall.tmpl", configdir=configdir_path)

    def cleanup(self):
        '''Remove unneeded packages and files with runtime-cleanup.tmpl'''
        self._runner.run("runtime-cleanup.tmpl")

    def verify(self):
        '''Ensure that contents of the installroot can run

        Every regular file in /usr/bin of the root is checked: scripts must
        name an interpreter that exists in the root, and ELF files must not
        have libraries that ldd reports as "not found".

        Returns True when no problem was found, False otherwise. Each
        problem is logged as an error.
        '''
        status = True
        root = self.vars.root

        ELF_MAGIC = b'\x7fELF'

        # Iterate over all files in /usr/bin
        # NOTE: Fedora 42 has merged these into the same directory
        # For ELF files, gather them into a list and we'll check them all at
        # the end. For files with a #!, check them as we go
        elf_files = []
        for path in (str(x) for x in Path(root + '/usr/bin').iterdir()
                     if x.is_file()):
            with open(path, "rb") as f:
                magic = f.read(4)

            if magic == ELF_MAGIC:
                # Save the path, minus the chroot prefix
                elf_files.append(path[len(root):])
            elif magic[:2] == b'#!':
                # Reopen the file as text and read the first line.
                # Open as latin-1 so that stray 8-bit characters don't make
                # things blow up. We only really care about ASCII parts.
                with open(path, "rt", encoding="latin-1") as f_text:
                    # Remove the #! and split on whitespace
                    words = f_text.readline()[2:].split()

                if not words:
                    # A bare "#!" names no interpreter at all; report it
                    # instead of failing with an IndexError.
                    logger.error('%s has an empty #! line', path)
                    status = False
                    continue
                shabang = words[0]

                # Does the path exist?
                if not os.path.exists(root + shabang):
                    logger.error('%s, needed by %s, does not exist', shabang, path)
                    status = False

        # Nothing to hand to ldd, so don't run it without file arguments
        if not elf_files:
            return status

        # Now, run ldd on all the ELF files
        # Just run ldd once on everything so it isn't logged a million times.
        # At least one thing in the list isn't going to be a dynamic executable,
        # so use execWithCapture to ignore the exit code.
        filename = ''
        for line in execWithCapture('ldd', elf_files, root=root,
                log_output=False, filter_stderr=True).split('\n'):
            if line and not line[0].isspace():
                # New filename header, strip the : at the end and save
                filename = line[:-1]
            elif 'not found' in line:
                logger.error('%s, needed by %s, not found', line.split()[0], filename)
                status = False

        return status

    def generate_module_data(self):
        '''Run depmod and write module-info for every kernel found in the root'''
        root = self.vars.root
        moddir = joinpaths(root, "lib/modules/")
        for kernel in findkernels(root=root):
            ksyms = joinpaths(root, "boot/System.map-%s" % kernel.version)
            logger.info("doing depmod and module-info for %s", kernel.version)
            runcmd(["depmod", "-a", "-F", ksyms, "-b", root, kernel.version])
            generate_module_info(moddir+kernel.version, outfile=moddir+"module-info")

    @staticmethod
    def _ensure_parent_dir(outfile):
        """Create the directory that will hold outfile, if it is missing."""
        outdir = os.path.dirname(outfile)
        # dirname is empty for a bare filename, and makedirs("") would raise
        if outdir:
            os.makedirs(outdir, exist_ok=True)

    def _create_compressed_ext4_runtime(self, compress, outfile, compression, compressargs, size):
        """Build LiveOS/rootfs.img in a work directory and compress that directory.

        :param compress: Function called as compress(workdir, outfile,
                         compression, compressargs) to make the final image
        :param size: Passed to imgutils.mkrootfsimg as the rootfs size
        :returns: Whatever compress returns
        :raises CalledProcessError: if creating the rootfs image fails

        The work directory is removed again whether or not the build
        succeeds, so a failed run does not leave a partial rootfs behind.
        """
        # make live rootfs image - must be named "LiveOS/rootfs.img" for dracut
        workdir = joinpaths(os.path.dirname(outfile), "runtime-workdir")
        os.makedirs(joinpaths(workdir, "LiveOS"))

        try:
            # Catch problems with the rootfs being too small and clearly log them
            try:
                imgutils.mkrootfsimg(self.vars.root, joinpaths(workdir, "LiveOS/rootfs.img"),
                                     "Anaconda", size=size)
            except CalledProcessError as e:
                if e.stdout and "No space left on device" in e.stdout:
                    logger.error("The rootfs ran out of space with size=%d", size)
                raise

            # compress the live rootfs
            return compress(workdir, outfile, compression, compressargs)
        finally:
            remove(workdir)

    def create_squashfs_runtime(self, outfile="/var/tmp/squashfs.img", compression="xz", compressargs=None, size=2):
        """Create a plain squashfs runtime

        size is not used here; it is accepted so that all of the create_*
        methods can be called with the same arguments.
        """
        compressargs = compressargs or []
        # The output directory may already exist (the default /var/tmp does)
        self._ensure_parent_dir(outfile)

        # squash the rootfs
        return imgutils.mksquashfs(self.vars.root, outfile, compression, compressargs)

    def create_ext4_runtime(self, outfile="/var/tmp/squashfs.img", compression="xz", compressargs=None, size=2):
        """Create a squashfs compressed ext4 runtime"""
        compressargs = compressargs or []
        return self._create_compressed_ext4_runtime(imgutils.mksquashfs, outfile,
                                                    compression, compressargs, size)

    def create_erofs_runtime(self, outfile="/var/tmp/erofs.img", compression="zstd", compressargs=None, size=2):
        """Create a plain erofs runtime

        size is not used here; it is accepted so that all of the create_*
        methods can be called with the same arguments.
        """
        compressargs = compressargs or []
        # The output directory may already exist (the default /var/tmp does)
        self._ensure_parent_dir(outfile)

        # erofs the rootfs
        return imgutils.mkerofs(self.vars.root, outfile, compression, compressargs)

    def create_erofs_ext4_runtime(self, outfile="/var/tmp/erofs.img", compression="zstd", compressargs=None, size=2):
        """Create a erofs compressed ext4 runtime"""
        compressargs = compressargs or []
        return self._create_compressed_ext4_runtime(imgutils.mkerofs, outfile,
                                                    compression, compressargs, size)

    def finished(self):
        '''Nothing needs to be done when the runtime build has finished'''
        pass
295

296
class TreeBuilder(object):
    '''Builds the arch-specific boot images.
    inroot should be the installtree root (the newly-built runtime dir)'''
    def __init__(self, product, arch, inroot, outroot, runtime, isolabel, domacboot=True, doupgrade=True,
                 templatedir=None, add_templates=None, add_template_vars=None, workdir=None, extra_boot_args=""):
        """Set up the template runner and the variables shared by the templates.

        :param product: Product data made available to the templates
        :param arch: Arch data providing basearch and libdir
        :param inroot: Root of the install tree the templates read from
        :param outroot: Root the templates write their results to
        :param runtime: Path of the runtime image
        :param isolabel: Label made available to the templates as isolabel
        :param templatedir: Template directory, also used to find dracut hooks
        :param add_templates: Extra templates to run at the start of build()
        :param add_template_vars: Variables passed to the extra templates

        domacboot, doupgrade, workdir and extra_boot_args are passed
        through to the templates unchanged.
        """

        # NOTE: if you pass an arg named "runtime" to a mako template it'll
        # clobber some mako internal variables - hence "runtime_img".
        self.vars = DataHolder(arch=arch, product=product, runtime_img=runtime,
                               runtime_base=basename(runtime),
                               inroot=inroot, outroot=outroot,
                               basearch=arch.basearch, libdir=arch.libdir,
                               isolabel=isolabel, udev=udev_escape, domacboot=domacboot, doupgrade=doupgrade,
                               workdir=workdir, lower=string_lower,
                               extra_boot_args=extra_boot_args)
        self._runner = LoraxTemplateRunner(inroot, outroot, templatedir=templatedir,
                                           basearch=arch.basearch)
        self._runner.defaults = self.vars
        self.add_templates = add_templates or []
        self.add_template_vars = add_template_vars or {}
        self.templatedir = templatedir
        # Filled in by build() from the results of the template run
        self.treeinfo_data = None

    @property
    def kernels(self):
        """The kernels found in inroot. The directory is scanned on every access."""
        return findkernels(root=self.vars.inroot)

    def rebuild_initrds(self, add_args=None, backup="", prefix=""):
        '''Rebuild all the initrds in the tree. If backup is specified, each
        initrd will be renamed with backup as a suffix before rebuilding.
        If backup is empty, the existing initrd files will be overwritten.
        If prefix is specified, the existing initrd is untouched and a new
        image is built with the filename "${prefix}-${kernel.version}.img"

        If the initrd doesn't exist its name will be created based on the
        name of the kernel.

        Raises RuntimeError if no kernels are found.
        '''
        add_args = add_args or []
        args = ["--nomdadmconf", "--nolvmconf"] + add_args
        if not backup:
            args.append("--force")

        # Scan for kernels once so the check and the loop see the same list
        kernels = self.kernels
        if not kernels:
            raise RuntimeError("No kernels found, cannot rebuild_initrds")

        with DracutChroot(self.vars.inroot) as dracut:
            for kernel in kernels:
                if prefix:
                    idir = os.path.dirname(kernel.path)
                    outfile = joinpaths(idir, prefix+'-'+kernel.version+'.img')
                elif hasattr(kernel, "initrd"):
                    # If there is an existing initrd, use that
                    outfile = kernel.initrd.path
                else:
                    # Construct an initrd from the kernel name
                    outfile = kernel.path.replace("vmlinuz-", "initrd-") + ".img"
                logger.info("rebuilding %s", outfile)

                if backup:
                    # outfile is relative to inroot, the rename happens outside the chroot
                    initrd = joinpaths(self.vars.inroot, outfile)
                    if os.path.exists(initrd):
                        os.rename(initrd, initrd + backup)
                dracut.Run(args + [outfile, kernel.version])

    def build(self):
        '''Run the additional templates and the template for this basearch,
        store the resulting treeinfo data and implant the md5 into boot.iso

        Raises KeyError if there is no template for the basearch.
        '''
        templatefile = templatemap[self.vars.arch.basearch]
        for tmpl in self.add_templates:
            self._runner.run(tmpl, **self.add_template_vars)
        self._runner.run(templatefile, kernels=self.kernels)
        self.treeinfo_data = self._runner.results.treeinfo
        self.implantisomd5()

    def implantisomd5(self):
        '''Run implantisomd5 on every boot.iso listed in the treeinfo data

        Does nothing when there is no treeinfo data yet (before build()).
        '''
        # treeinfo_data is None until build() has run
        for _section, data in (self.treeinfo_data or {}).items():
            if 'boot.iso' in data:
                iso = joinpaths(self.vars.outroot, data['boot.iso'])
                runcmd(["implantisomd5", iso])

    @property
    def dracut_hooks_path(self):
        """ Return the path to the lorax dracut hooks scripts

            Use the configured share dir if it is setup,
            otherwise default to /usr/share/lorax/dracut_hooks
        """
        if self.templatedir:
            return joinpaths(self.templatedir, "dracut_hooks")
        else:
            return "/usr/share/lorax/dracut_hooks"

    def copy_dracut_hooks(self, hooks):
        """ Copy the hook scripts in hooks into the installroot's /tmp/
        and return a list of commands to pass to dracut when creating the
        initramfs

        hooks is a list of tuples with the name of the hook script and the
        target dracut hook directory
        (eg. [("99anaconda-copy-ks.sh", "/lib/dracut/hooks/pre-pivot")])

        A hook script that cannot be found is logged as an error and skipped.
        """
        dracut_commands = []
        for hook_script, dracut_path in hooks:
            src = joinpaths(self.dracut_hooks_path, hook_script)
            if not os.path.exists(src):
                logger.error("Missing lorax dracut hook script %s", src)
                continue
            dst = joinpaths(self.vars.inroot, "/tmp/", hook_script)
            copy2(src, dst)
            # The include path is the script's location as seen inside inroot
            dracut_commands += ["--include", joinpaths("/tmp/", hook_script),
                                dracut_path]
        return dracut_commands
406

407
#### TreeBuilder helper functions
408

409
def findkernels(root="/", kdir="boot"):
    """Return a list describing the kernels found in kdir under root.

    Each entry holds the path of the kernel (relative to root) and the
    version, arch and flavor parsed from its filename. Images in the same
    directory named "<type>-<version>.img" are attached to the matching
    kernel under <type>, with "initramfs" stored as "initrd".
    """
    # To find possible flavors, awk '/BuildKernel/ { print $4 }' kernel.spec
    flavors = ('debug', 'PAE', 'PAEdebug', 'smp', 'xen', 'lpae')
    kernel_re = re.compile(r"vmlinuz-(?P<version>.+?\.(?P<arch>[a-z0-9_]+)"
                           r"(.(?P<flavor>{0}))?)$".format("|".join(flavors)))
    bootfiles = os.listdir(joinpaths(root, kdir))

    kernels = []
    for fname in bootfiles:
        found = kernel_re.match(fname)
        if not found:
            continue
        entry = DataHolder(path=joinpaths(kdir, fname))
        entry.update(found.groupdict()) # sets version, arch, flavor
        kernels.append(entry)

    # look for associated initrd/initramfs/etc.
    for entry in kernels:
        for fname in bootfiles:
            if not fname.endswith('-'+entry.version+'.img'):
                continue
            # Everything before the first '-' names the kind of image
            imgtype = fname.split('-', 1)[0]
            # special backwards-compat case
            if imgtype == 'initramfs':
                imgtype = 'initrd'
            entry[imgtype] = DataHolder(path=joinpaths(kdir, fname))

    logger.debug("kernels=%s", kernels)
    return kernels
435

436
# udev whitelist: 'a-zA-Z0-9#+.:=@_-' (see is_whitelisted in libudev-util.c)
# The blacklist is built in two parts: the printable ASCII characters that
# are not in the whitelist, followed by the 32 non-printable ones.
udev_blacklist = ' !"$%&\'()*,/;<>?[\\]^`{|}~'
udev_blacklist += ''.join(map(chr, range(32)))

def udev_escape(label):
    """Return label with each blacklisted character replaced by a
    backslash, an 'x' and the character's two-digit hex code.

    Characters that are not in udev_blacklist are copied unchanged.
    """
    return ''.join('\\x%02x' % ord(char) if char in udev_blacklist else char
                   for char in label)
444

445
def string_lower(string):
    """ Return a lowercase string.

    :param string: String to lowercase
    :returns: The result of calling lower() on string

    This is used as a filter in the templates.
    """
    return string.lower()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc