• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

sequana / sequana_pipetools / 4075608515

pending completion
4075608515

Pull #51

github

GitHub
Merge 173da2f1c into fd3b61e95
Pull Request #51: Fix CI (remove conda)

4 of 4 new or added lines in 2 files covered. (100.0%)

1380 of 1520 relevant lines covered (90.79%)

2.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.51
/sequana_pipetools/sequana_manager.py
1
#
2
#  This file is part of Sequana software
3
#
4
#  Copyright (c) 2016-2021 - Sequana Dev Team (https://sequana.readthedocs.io)
5
#
6
#  Distributed under the terms of the 3-clause BSD license.
7
#  The full license is in the LICENSE file, distributed with this software.
8
#
9
#  Website:       https://github.com/sequana/sequana
10
#  Documentation: http://sequana.readthedocs.io
11
#  Contributors:  https://github.com/sequana/sequana/graphs/contributors
12
##############################################################################
13
import glob
3✔
14
import os
3✔
15
import shutil
3✔
16
import subprocess
3✔
17
import pkg_resources
3✔
18
import hashlib
3✔
19
import sys
3✔
20
import aiohttp
3✔
21
import asyncio
3✔
22
from shutil import which
3✔
23
from urllib.request import urlretrieve
3✔
24
from pathlib import Path
3✔
25

26
from tqdm.asyncio import tqdm
3✔
27

28
import colorlog
3✔
29
from deprecated import deprecated
3✔
30
from easydev import CustomConfig
3✔
31

32
from sequana_pipetools.snaketools.profile import create_profile
3✔
33

34
from .misc import Colors, PipetoolsException, print_version
3✔
35
from .snaketools import Module, SequanaConfig
3✔
36

37
logger = colorlog.getLogger(__name__)
3✔
38

39

40
class SequanaManager:
3✔
41
    def __init__(self, options, name="undefined"):
        """Prepare a manager for the Sequana pipeline called *name*.

        :param options: an object exposing the user options as attributes
        :param name: name of the pipeline. Must be a Sequana pipeline already installed.

        This constructor reads ``version``, ``workdir`` and
        ``apptainer_prefix`` from *options*, plus the optional ``level``
        (set to "INFO" on *options* when missing) and ``from_project``.
        Other methods of this class read more attributes (``jobs``,
        ``force``, ``use_apptainer``, ``run_mode``, ...). A minimal example::

            class Options:
                level = "INFO"
                version = False
                workdir = "fastqc"
                jobs = 1
                force = True
                use_apptainer = False
                apptainer_prefix = ""

            from sequana_pipetools import SequanaManager
            pm = SequanaManager(Options(), "fastqc")

        If ``options.version`` is true the version is printed and the program
        exits with status 0. If the pipeline cannot be loaded, an error is
        logged and the program exits with status 1.

        :raises FileNotFoundError: if ``options.from_project`` is set but no
            config file is found in that project.
        """
        # The logger level is set here because, when called from a pipeline,
        # it may not have been configured yet.
        try:
            logger.setLevel(options.level)
        except AttributeError:
            logger.warning("Your pipeline does not have a level option.")
            options.level = "INFO"

        self.options = options

        if self.options.version:
            print_version(name)
            sys.exit(0)

        self.name = name

        # handy printer
        self.colors = Colors()

        # Loading the module checks that the pipeline is installed and valid
        try:
            self.module = Module(f"pipeline:{self.name}")
        except ValueError:
            logger.error(f"{self.name} does not seem to be installed or is not a valid pipeline")
            sys.exit(1)
        self.module.check()

        # Load the config file: either the one of an existing project
        # (--from-project) or the default one shipped with the pipeline.
        config_name = os.path.basename(self.module.config)
        self.config = None
        project = getattr(options, "from_project", None)
        if project:
            candidates = (
                f"{project}/{config_name}",  # config file in project path
                f"{project}/.sequana/{config_name}",  # config file in .sequana dir
            )
            for candidate in candidates:
                try:
                    self.config = SequanaConfig(candidate)
                except FileNotFoundError:  # pragma: no cover
                    continue
                logger.info(f"Reading existing config file {candidate}")
                break

            if not self.config:  # pragma: no cover
                raise FileNotFoundError(f"Could not find config.yaml in the project specified {project}")
        else:
            self.config = SequanaConfig(self.module.config)

        # the working directory
        self.workdir = Path(options.workdir)

        # the data path of the pipeline
        self.datapath = self._get_package_location()

        # Wrappers location is an attribute so that users/developers may
        # change it after construction.
        default_wrappers = "https://raw.githubusercontent.com/sequana/sequana-wrappers/"
        self.sequana_wrappers = os.environ.get("SEQUANA_WRAPPERS", default_wrappers)

        # Where container images are stored: a user-provided prefix, or a
        # location derived from the environment / the working directory.
        user_prefix = self.options.apptainer_prefix
        if user_prefix:  # pragma: no cover
            self.apptainer_prefix = user_prefix
            self.local_apptainers = False
        else:  # pragma: no cover
            fallback = f"{self.workdir}/.sequana/apptainers"
            self.apptainer_prefix = os.environ.get("SEQUANA_SINGULARITY_PREFIX", fallback)
            self.local_apptainers = True
142

143
    def exists(self, filename, exit_on_error=True, warning_only=False):  # pragma: no cover
        """Tell whether *filename* (a file or a directory) exists.

        Used in the main.py of all pipelines when setting the working directory.

        :param filename: path to check
        :param exit_on_error: if the path is missing and *warning_only* is
            false, exit the program with status 1
        :param warning_only: if the path is missing, only log a warning
            (never exits)
        :return: True if the path exists, False otherwise
        """
        if os.path.exists(filename):
            return True

        message = f"{filename} file does not exists"
        if warning_only:
            logger.warning(message)
            return False

        logger.error(message)
        if exit_on_error:
            sys.exit(1)
        return False
157

158
    def copy_requirements(self):
3✔
159
        # Copy is done by the sequana manager once at the creation of the
160
        # working directory. Should not be done after otherwise, if snakemake reads
161
        # the snakefile several times, copy_requirements may be called several times
162
        if "requirements" not in self.config.config:
3✔
163
            return
3✔
164

165
        for requirement in self.config.config.requirements:
3✔
166
            logger.info(f"Copying {requirement} file into {self.workdir}")
3✔
167
            if os.path.exists(requirement):
3✔
168
                try:
3✔
169
                    shutil.copy(requirement, self.workdir)
3✔
170
                except shutil.SameFileError:  # pragma: no cover
171
                    pass  # the target and input may be the same
172
            elif requirement.startswith("http"):
3✔
173
                logger.info(f"This file {requirement} will be needed. Downloading")
3✔
174
                output = requirement.split("/")[-1]
3✔
175
                urlretrieve(requirement, filename=self.workdir / output)
3✔
176

177
    def _get_package_location(self):
        """Return the ``data`` directory of the installed pipeline package.

        The global site-packages directories are searched first, then the
        user site-packages directory. In each of them we look for:

        1. a regular installation, ``<site>/sequana_pipelines/<name>``;
        2. a "develop" installation, materialised by a
           ``sequana-<name>.egg-link`` file whose first line is the path of
           the source tree that contains ``sequana_pipelines/<name>``.

        :return: a :class:`pathlib.Path` to the pipeline ``data`` directory
        :raises PipetoolsException: if the pipeline package cannot be found
        """
        import site

        search_dirs = list(site.getsitepackages())
        user_site = site.getusersitepackages()
        if user_site not in search_dirs:
            search_dirs.append(user_site)

        for site_package in search_dirs:
            pipeline_path = Path(site_package) / "sequana_pipelines" / self.name
            if pipeline_path.exists():
                return pipeline_path / "data"

            # If it does not exist, this may be a "develop" mode. The
            # egg-link is a plain text file, not a directory: its first line
            # points to the location of the source tree.
            egg_link = Path(site_package) / f"sequana-{self.name}.egg-link"
            if egg_link.exists():
                link_lines = egg_link.read_text().splitlines()
                source_root = link_lines[0].strip() if link_lines else ""
                if source_root:
                    return Path(source_root) / "sequana_pipelines" / self.name / "data"

        logger.error(f"package provided ({self.name}) not installed.")
        raise PipetoolsException
192

193

194
    def _get_package_version(self):
        """Return the version of the installed pipeline distribution.

        The distribution ``sequana_<name>`` is looked up first; if it is not
        found, a distribution named exactly ``<name>`` is tried (its own
        lookup error, if any, propagates to the caller).
        """
        try:
            distributions = pkg_resources.require(f"sequana_{self.name}")
        except pkg_resources.DistributionNotFound:
            # check if the package exists under its bare name
            distributions = pkg_resources.require(self.name)
        return distributions[0].version
201

202
    def _get_sequana_version(self):
        """Return the version of the ``sequana`` distribution.

        :return: the version string, or ``"not installed"`` when the
            ``sequana`` distribution cannot be found.
        """
        try:
            # The requirement is the fixed name "sequana": it does not depend
            # on the pipeline name.
            return pkg_resources.require("sequana")[0].version
        except pkg_resources.DistributionNotFound:  # pragma: no cover
            return "not installed"
208

209
    def _guess_scheduler(self):
3✔
210

211
        if which("sbatch") and which("srun"):  # pragma: no cover
212
            return "slurm"
213
        else:
214
            return "local"
3✔
215

216
    def setup(self):
        """Initialise the pipeline.

        - Build the snakemake command line and store it in ``self.command``
          (it is written to a script by :meth:`teardown`)
        - Resolve ``options.run_mode`` when it is None (guessed from the
          executables found on the PATH)
        - Create the working directory and its ``.sequana`` sub-directory

        If there is a "requirements" section in your config file, it looks
        like::

            requirements:
                - path to file1
                - path to file2

        It means that those files will be required by the pipeline to run
        correctly. If the file exists, use it, otherwise look into
        the pipeline itself.

        """
        logger.info("Welcome to Sequana pipelines suite (https://sequana.readthedocs.io)")
        logger.info(" - Found an issue, have a question: https://tinyurl.com/2bh6frp2 ")
        logger.info(f" - more about this pipeline on https://github.com/sequana/{self.name} ")

        # First we create the beginning of the command with the optional
        # parameters for a run on a SLURM scheduler
        snakefilename = os.path.basename(self.module.snakefile)
        self.command = f"#!/bin/bash\nsnakemake -s {snakefilename} "

        if self.sequana_wrappers:
            self.command += f" --wrapper-prefix {self.sequana_wrappers} "
            logger.info(f"Using sequana-wrappers from {self.sequana_wrappers}")

        if self.options.use_apptainer:  # pragma: no cover
            # we always bind the home directory, and /pasteur when it exists
            # (specific to the Institut Pasteur cluster)
            home = str(Path.home())
            bindings = f"-B {home}:{home}"
            if os.path.exists("/pasteur"):
                bindings += " -B /pasteur:/pasteur"

            # The bindings and the user arguments must be separated by a
            # space, otherwise the first user argument is glued to the home
            # binding (e.g. "-B /home/u:/home/u--nv").
            apptainer_args = f"--singularity-args=' {bindings} {self.options.apptainer_args}'"
            self.command += f" --use-singularity {apptainer_args}"

            # finally, the prefix where images are stored
            if self.local_apptainers:
                self.command += " --singularity-prefix .sequana/apptainers"
            else:
                self.command += f" --singularity-prefix {self.apptainer_prefix} "

        # FIXME a job is not a core. Ideally, we should add a core option
        # NOTE(review): --cores/--jobs is chosen from the guessed scheduler,
        # not from options.run_mode; kept as is.
        scheduler = self._guess_scheduler()
        if scheduler == "local":
            self.command += f" -p --cores {self.options.jobs} "
        else:
            self.command += f" -p --jobs {self.options.jobs}"

        if self.options.run_mode is None:
            self.options.run_mode = scheduler
            logger.debug(f"Guessed scheduler is {self.options.run_mode}")

        if self.options.run_mode == "slurm":
            queue = self.options.slurm_queue
            slurm_queue = "" if queue == "common" else f" --qos {queue} -p {queue}"

            if self.module.cluster_config:
                # {cluster.ram} and {threads} are placeholders resolved by
                # snakemake, hence a plain (non f-) string.
                self.command += ' --cluster "sbatch --mem={cluster.ram} --cpus-per-task={threads}"'
                self.command += " --cluster-config cluster_config.json "
            else:
                self.command += (
                    f' --cluster "sbatch --mem {self.options.slurm_memory}'
                    f' -c {self.options.slurm_cores_per_job} {slurm_queue}"'
                )

        # This should be in the setup, not in the teardown since we may want to
        # copy files when creating the pipeline. This is the case e.g. in the
        # rnaseq pipeline. It is a bit annoying since if there is failure
        # between setup and teardown, the directories are created but no way to
        # fix that.
        self._create_directories()
297

298
    def _create_directories(self):
3✔
299
        # Now we create the directory to store the config/pipeline
300
        if self.workdir.exists():
3✔
301
            if self.options.force:
3✔
302
                logger.warning(f"Path {self.workdir} exists already but you set --force to overwrite it")
3✔
303
            else:  # pragma: no cover
304
                logger.error(f"Output path {self.workdir} exists already. Use --force to overwrite")
305
                sys.exit()
306
        else:
307
            self.workdir.mkdir()
×
308

309
        # Now we create the directory to store some info in
310
        # working_directory/.sequana for book-keeping and reproducibility
311
        hidden_dir = self.workdir / ".sequana"
3✔
312
        if not hidden_dir.exists():
3✔
313
            hidden_dir.mkdir()
3✔
314

315
    def check_input_files(self, stop_on_error=True):
        """Check that the input pattern of the config matches at least one file.

        The pattern searched is ``input_directory`` + separator +
        ``input_pattern`` as found in the config.

        :param stop_on_error: when no file matches, exit the program with
            status 1 (after logging the problem)
        """
        cfg = self.config.config
        directory = cfg.input_directory
        pattern = cfg.input_pattern

        matches = glob.glob(os.sep.join((directory, pattern)))
        logger.info(f"Found {len(matches)} files matching your input  pattern ({pattern})")
        if matches:
            return

        logger.critical(f"Found no files with your matching pattern ({pattern}) in {directory}")
        uses_wildcard = "*" in pattern or "?" in pattern
        if not uses_wildcard:
            logger.critical("No wildcard used in your input pattern, please use a * or ? character")
        if stop_on_error:
            sys.exit(1)
328

329
    def teardown(self, check_schema=True, check_input_files=True):
        """Save all files required to run the pipeline and perform sanity checks.

        :param check_schema: validate the saved config file against the
            pipeline schema, if the pipeline provides one
        :param check_input_files: call :meth:`check_input_files` first

        We copy the following files into the working directory:

        * the config file (config.yaml)
        * a NAME.sh that contains the snakemake command
        * the Snakefile (NAME.rules)

        For book-keeping and some parts of the pipelines, the config file and
        the snakefile are stored in the .sequana directory and symlinked from
        the working directory. The logo file, if present, is also copied into
        this .sequana directory.

        and if present:

        * the cluster_config configuration files for snakemake
        * multiqc_config file for multiqc reports
        * the rules directory
        * the schema.yaml file used to check the content of the
          config.yaml file

        If the config.yaml contains a requirements section, the files requested
        are copied in the working directory.

        Finally, ``.sequana/info.txt``, ``unlock.sh`` and a snapshot of the
        environment (conda, or pip if conda cannot be run) are written.

        """
        if check_input_files:
            self.check_input_files()

        # the config file
        self.config._update_yaml()
        config_name = os.path.basename(self.module.config)
        self.config.save(self.workdir / f".sequana/{config_name}")
        try:
            os.symlink(f".sequana/{config_name}", f"{self.workdir}/{config_name}")
        except FileExistsError:  # pragma: no cover
            pass

        # the final command
        command_file = self.workdir / f"{self.name}.sh"
        snakefilename = os.path.basename(self.module.snakefile)
        if self.options.run_mode == self.options.profile:
            # use profile command
            options = {
                "wrappers": self.sequana_wrappers,
                "jobs": self.options.jobs,
                "forceall": False,
                "use_apptainer": self.options.use_apptainer,
            }

            if self.options.use_apptainer:  # pragma: no cover
                if self.local_apptainers:
                    options["apptainer_prefix"] = ".sequana/apptainers"
                else:
                    options["apptainer_prefix"] = self.apptainer_prefix
                home = str(Path.home())
                options["apptainer_args"] = self.options.apptainer_args

                # Always bind the home directory; /pasteur is specific to the
                # Institut Pasteur cluster. Both branches append plain
                # bindings only: the value is already the apptainer
                # arguments, so no option name or quote must be added here.
                if os.path.exists("/pasteur"):
                    options["apptainer_args"] += f" -B {home}:{home} -B /pasteur:/pasteur"
                else:
                    options["apptainer_args"] += f" -B {home}:{home} "
            else:
                options["apptainer_prefix"] = ""
                options["apptainer_args"] = ""

            if self.options.profile == "slurm":
                # add slurm options
                options.update(
                    {
                        "partition": "common",
                        "qos": "normal",
                        "memory": self.options.slurm_memory,
                    }
                )
                if self.options.slurm_queue != "common":
                    options.update({"partition": self.options.slurm_queue, "qos": self.options.slurm_queue})

            profile_dir = create_profile(self.workdir, self.options.profile, **options)
            command = f"#!/bin/bash\nsnakemake -s {snakefilename} --profile {profile_dir}"
            command_file.write_text(command)
        else:
            command_file.write_text(self.command)

        # the snakefile
        shutil.copy(self.module.snakefile, self.workdir / ".sequana")
        try:
            os.symlink(f".sequana/{snakefilename}", self.workdir / f"{snakefilename}")
        except FileExistsError:  # pragma: no cover
            pass

        # the logo if any
        if self.module.logo:
            shutil.copy(self.module.logo, self.workdir / ".sequana")

        # the cluster config if any
        if self.module.cluster_config:
            shutil.copy(self.module.cluster_config, self.workdir)

        # the multiqc if any
        if self.module.multiqc_config:
            shutil.copy(self.module.multiqc_config, self.workdir)

        # the rules if any; an existing copy is replaced only with --force
        if self.module.rules:
            rules_target = self.workdir / "rules"
            try:
                shutil.copytree(self.module.rules, rules_target)
            except FileExistsError:
                if self.options.force:
                    shutil.rmtree(rules_target)
                    shutil.copytree(self.module.rules, rules_target)

        # the schema if any
        if self.module.schema_config:
            schema_name = os.path.basename(self.module.schema_config)
            shutil.copy(self.module.schema_config, self.workdir)

            # This is the place where we can check the entire validity of the
            # inputs based on the schema
            if check_schema:
                cfg = SequanaConfig(f"{self.workdir}/{config_name}")
                cfg.check_config_with_schema(f"{self.workdir}/{schema_name}")

        # if --use-apptainer is set, we need to download images for the users
        # Sequana pipelines will store images in Zenodo website (via damona).
        # introspecting sections written as:
        # container:
        #     "https://...image.img"
        if self.options.use_apptainer:  # pragma: no cover
            self._download_zenodo_images()

        # finally, we copy the files found in the requirements section of the
        # config file.
        self.copy_requirements()

        # some information
        msg = (
            f"Check the script in {self.workdir}/{self.name}.sh as well as "
            f"the configuration file in {self.workdir}/{config_name}.\n"
        )
        print(self.colors.purple(msg))

        msg = f"Once ready, execute the script {self.name}.sh using \n\n\t"
        if self.options.run_mode == "slurm":
            msg += f"cd {self.workdir}; sbatch {self.name}.sh\n\n"
        else:
            msg += f"cd {self.workdir}; sh {self.name}.sh\n\n"
        print(self.colors.purple(msg))

        # Save an info.txt with the command used
        with open(self.workdir / ".sequana" / "info.txt", "w") as fout:
            from . import version as pipetools_version

            fout.write(f"# sequana_pipetools version: {pipetools_version}\n")
            fout.write(f"# sequana_{self.name} version: {self._get_package_version()}\n")
            fout.write(f"# sequana version: {self._get_sequana_version()}\n")
            cmd1 = os.path.basename(sys.argv[0])
            fout.write(" ".join([cmd1] + sys.argv[1:]))

        # Save unlock.sh
        with open(self.workdir / "unlock.sh", "w") as fout:
            fout.write(f"#!/bin/sh\nsnakemake -s {snakefilename} --unlock -j 1")

        # Save the environment. Running conda raises an OSError
        # (FileNotFoundError) when the executable is not on the PATH, in
        # which case we fall back on pip.
        try:
            with open(f"{self.workdir}/.sequana/env.yml", "w") as fout:
                subprocess.call(["conda", "list"], stdout=fout)
            logger.debug("Saved your conda environment into env.yml")
        except OSError:
            with open(f"{self.workdir}/.sequana/pip.yml", "w") as fout:
                subprocess.call(["pip", "freeze"], stdout=fout)
            logger.debug("Saved your pip environment into pip.yml (conda not found)")

        # General information: tell users about the completion script, and
        # warn them if the one they have is older than the pipeline.
        configuration = CustomConfig("sequana", verbose=False)
        sequana_config_path = configuration.user_config_dir
        completion = sequana_config_path + f"/pipelines/{self.name}.sh"
        if os.path.exists(completion):
            with open(completion, "r") as fin:
                line = fin.readline()

            if line.startswith("#version:"):
                completion_version = line.split("#version:")[1].strip()
                completion_version = completion_version.replace(">=", "").replace(">", "")
                from packaging.version import Version

                if Version(completion_version) < Version(self._get_package_version()):  # pragma: no cover
                    msg = (
                        "The version {} of your completion file for the {} pipeline seems older than the installed"
                        " pipeline itself ({}). Please, consider updating the completion file {}"
                        " using the following command: \n\t sequana_completion --name {}\n"
                        "available in the sequana_pipetools package (pip install sequana_completion)"
                    )
                    msg = msg.format(
                        completion_version, self.name, self._get_package_version(), completion, self.name
                    )
                    logger.info(msg)

        else:
            logger.info("A completion if possible with sequana_completion --name {}".format(self.name))
528

529
    @deprecated(version="1.0", reason="will be removed soon. Not used.")
    def update_config(self, config, options, section_name):  # pragma: no cover
        """Overwrite the options of a config section with user options.

        For each option ``X`` of ``config[section_name]``, the value of the
        attribute ``<section_name>_X`` of *options* is stored in the config.
        Options without a matching attribute are left untouched (a debug
        message is logged).
        """
        section = config[section_name]
        for option_name in section:
            attribute = "_".join((section_name, option_name))
            try:
                section[option_name] = getattr(options, attribute)
            except AttributeError:
                logger.debug("update_config. Could not find {}".format(option_name))
536

537
    def _get_section_content(self, filename, section_name):
3✔
538
        """searching for a given section (e.g. container)
539

540
        and extrct its content. This is for simple cases where
541
        content is made of one line. Two cases are supported
542

543
        case 1 (two lines)::
544

545
            container:
546
                "https:...."
547

548
        case 2 (one line)
549

550
            container: "https...."
551

552
        comments starting with # are allowed.
553
        """
554
        assert section_name.endswith(":")
3✔
555

556
        contents = []
3✔
557
        with open(filename, "r") as fin:
3✔
558
            previous = ""
3✔
559
            for line in fin.readlines():
3✔
560
                if line.strip().startswith("#") or not line.strip():
3✔
561
                    pass
3✔
562
                # case 1
563
                elif section_name in line:
3✔
564
                    content = line.replace(section_name, "").strip()
3✔
565
                    content = content.strip('"').strip("'")
3✔
566
                    if content:  # case 2
3✔
567
                        contents.append(content)
×
568
                elif previous == section_name:
3✔
569
                    # case 1
570
                    content = line.replace(section_name, "").strip()
3✔
571
                    content = content.strip('"').strip("'")
3✔
572
                    contents.append(content)
3✔
573

574
                # this is for case 1
575
                previous = line.strip()
3✔
576
        return contents
3✔
577

578
    def _download_zenodo_images(self):  # pragma: no cover
        """Download the container images required by the pipeline.

        URLs are collected from:

        * the ``container:`` sections of the main Snakefile;
        * the ``container:`` sections of the files listed in its
          ``include:`` sections (only ``.smk`` and ``.rules`` files, one
          level deep);
        * the values of the ``apptainers`` section of the config file.

        Each image is saved in ``self.apptainer_prefix`` as
        ``<md5 of the URL>.simg``; images already present are not downloaded
        again. If the download is interrupted or times out, the files of
        this batch are removed and a critical message is logged, but the
        execution carries on.
        """
        logger.info(f"You set --use-apptainer. Downloading containers into {self.apptainer_prefix}")

        # first get the urls in the main snakefile
        urls = self._get_section_content(self.module.snakefile, "container:")
        urls = [x for x in urls if x.startswith("http")]

        # second get the urls from sub-rules if any
        # do we have sub modules / includes ?
        included_files = self._get_section_content(self.module.snakefile, "include:")

        # included_files may include former modules from sequana. Need to keep only
        # actual files ending in .rules and .smk
        included_files = [x for x in included_files if x.endswith((".smk", ".rules"))]

        # for back compatibility, we scan the pipeline looking for container that start with http
        snakefile_dir = Path(self.module.snakefile).parent
        for included_file in included_files:
            suburls = self._get_section_content(snakefile_dir / included_file, "container:")
            urls.extend(x for x in suburls if x.startswith("http"))

        # but more generally, we wish to retrieve the containers URLs from the config file
        apps = self.config.config.get("apptainers", {})
        urls.extend(x for x in apps.values() if x.strip())

        # make sure URLs are unique (sorted for a reproducible order)
        urls = sorted(set(urls))

        # guarantees that the output filenames have the same unique ID as
        # those expected by snakemake
        def _hash(url):
            md5hash = hashlib.md5()
            md5hash.update(url.encode())
            return md5hash.hexdigest()

        os.makedirs(self.apptainer_prefix, exist_ok=True)

        # define the URLs and the output filename. Also, skip urls that
        # have already been downloaded. The third item is the position of
        # the progress bar.
        files_to_download = []
        for url in urls:
            outfile = f"{self.apptainer_prefix}/{_hash(url)}.simg"
            if os.path.exists(outfile):
                logger.info(f"Found corresponding image of {url} in {outfile}")
            else:
                files_to_download.append((url, outfile, len(files_to_download)))
                logger.info(f"Preparing {url} for download")

        try:  # try asynchronous downloads
            multiple_downloads(files_to_download)
        except (KeyboardInterrupt, asyncio.TimeoutError):
            logger.info("The download was interruped or network was too slow. Removing partially downloaded files")
            for _, filename, _ in files_to_download:
                try:
                    Path(filename).unlink()
                except FileNotFoundError:
                    # this download had not started yet: nothing to remove
                    pass
            logger.critical(
                "Keep going but your pipeline will probably not be fully executable since images could not be downloaded"
            )
645

646

647
def multiple_downloads(files_to_download, timeout=3600):
    """Download several files concurrently, with one progress bar per file.

    :param files_to_download: a list of tuples; each tuple contains the URL
        to download, its output filename, and a unique position for the
        progress bar.
    :param timeout: timeout passed to each HTTP request
    """

    async def download(session, url, name, position):
        """Stream *url* into the file *name*, updating a progress bar."""
        async with session.get(url, timeout=timeout) as resp:
            # The file is opened in its own context manager: tqdm.wrapattr
            # only wraps the stream, it does not close it.
            with open(name, "wb") as handle, tqdm.wrapattr(
                handle,
                "write",
                miniters=1,
                desc=url.split("/")[-1],
                total=int(resp.headers.get("content-length", 0)),
                position=position,
            ) as fout:
                async for chunk in resp.content.iter_chunked(4096):
                    fout.write(chunk)

    async def download_all(files_to_download):
        """Run all downloads concurrently within a single HTTP session."""
        async with aiohttp.ClientSession() as session:
            await asyncio.gather(*(download(session, *data) for data in files_to_download))

    asyncio.run(download_all(files_to_download))
670

671

672
def get_pipeline_location(pipeline_name):
    """Return the ``data`` directory of the installed pipeline *pipeline_name*.

    A :class:`SequanaManager` is built with the minimal set of options its
    constructor needs, and its package location is returned.
    """

    class _MinimalOptions:
        workdir = "."
        version = False
        apptainer_prefix = ""

    manager = SequanaManager(_MinimalOptions(), pipeline_name)
    return manager._get_package_location()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc