• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zestedesavoir / zds-site / 18150794202

28 Sep 2025 08:01PM UTC coverage: 89.297%. Remained the same
18150794202

push

github

Situphen
Corrige heading_shift pour l'export en LaTeX

3072 of 4110 branches covered (74.74%)

1 of 1 new or added line in 1 file covered. (100.0%)

27 existing lines in 2 files now uncovered.

16820 of 18836 relevant lines covered (89.3%)

1.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.49
/zds/tutorialv2/publication_utils.py
1
import contextlib
3✔
2
import copy
3✔
3
import logging
3✔
4
import os
3✔
5
import shutil
3✔
6
import subprocess
3✔
7
import zipfile
3✔
8
from datetime import datetime
3✔
9
from os import makedirs, path
3✔
10
from pathlib import Path
3✔
11

12
import requests
3✔
13
from django.conf import settings
3✔
14
from django.core.exceptions import ObjectDoesNotExist
3✔
15
from django.template.defaultfilters import date
3✔
16
from django.template.loader import render_to_string
3✔
17
from django.utils import translation
3✔
18
from django.utils.translation import gettext_lazy as _
3✔
19

20
from zds.forum.utils import lock_topic, send_post
3✔
21
from zds.tutorialv2 import signals
3✔
22
from zds.tutorialv2.epub_utils import build_ebook
3✔
23
from zds.tutorialv2.models.database import ContentReaction, PublicationEvent, PublishedContent
3✔
24
from zds.tutorialv2.publish_container import publish_use_manifest
3✔
25
from zds.tutorialv2.signals import content_unpublished
3✔
26
from zds.tutorialv2.utils import export_content
3✔
27
from zds.utils.templatetags.emarkdown import render_markdown
3✔
28
from zds.utils.templatetags.smileys_def import LICENSES_BASE_PATH, SMILEYS_BASE_PATH
3✔
29

30
logger = logging.getLogger(__name__)

# Maps a lower-cased licence short code (e.g. "by-sa") to the file name of its logo.
# Every logo file is named after its code, so the table is derived from the ordered code list.
licences = {
    code: f"{code}.svg"
    for code in ("by-nc-nd", "by-nc-sa", "by-nc", "by-nd", "by-sa", "by", "0", "copyright")
}
41

42

43
def notify_update(db_object, is_update, is_major):
    """
    Send the ``content_published`` signal for a first publication or a major update.

    Nothing is sent for a minor update, i.e. when ``is_update`` is true and ``is_major`` is false.

    :param db_object: the object passed as ``instance``; its class is used as the signal sender
    :param is_update: whether this publication updates an already published content
    :param is_major: whether the update is a major one
    """
    minor_update = is_update and not is_major
    if minor_update:
        return

    # Follow
    signals.content_published.send(sender=db_object.__class__, instance=db_object, by_email=False)
2✔
47

48

49
def publish_content(db_object, versioned, is_major_update=True):
    """
    Publish a given content.

    The content is rendered in a ``<slug>__building`` directory first, then copied to the
    production path of the public version. Depending on the ``extra_content_generation_policy``
    setting, the extra contents are either generated right away ("SYNC") or requested through
    the "watchdog" publicator ("WATCHDOG").

    .. note::
        create a manifest.json without the introduction and conclusion if not needed. Also remove the 'text' field
        of extracts.

    :param db_object: Database representation of the content
    :type db_object: zds.tutorialv2.models.database.PublishableContent
    :param versioned: version of the content to publish
    :type versioned: zds.tutorialv2.models.versioned.VersionedContent
    :param is_major_update: if set to `True`, will update the publication date
    :type is_major_update: bool
    :raise FailureDuringPublication: if something goes wrong
    :return: the published representation
    :rtype: zds.tutorialv2.models.database.PublishedContent
    """

    if is_major_update:
        versioned.pubdate = datetime.now()

    content_settings = settings.ZDS_APP["content"]

    # First write the files to a temporary directory: if anything goes wrong,
    # the last published version is not impacted !
    tmp_path = path.join(content_settings["repo_public_path"], versioned.slug + "__building")
    if path.exists(tmp_path):
        shutil.rmtree(tmp_path)  # remove previous attempt, if any

    # render HTML:
    altered_version = copy.deepcopy(versioned)
    char_count = publish_use_manifest(db_object, tmp_path, altered_version)
    altered_version.dump_json(path.join(tmp_path, "manifest.json"))

    # make room for 'extra contents'
    build_extra_contents_path = path.join(tmp_path, content_settings["extra_contents_dirname"])
    makedirs(build_extra_contents_path)
    base_name = path.join(build_extra_contents_path, versioned.slug)

    # NOTE(review): this date is set after the manifest has been dumped and ``altered_version``
    # is not read again in this function -- confirm whether the assignment is still needed.
    altered_version.pubdate = datetime.now()

    md_file_path = base_name + ".md"
    Path(Path(md_file_path).parent, "images").mkdir(exist_ok=True)

    # An existing public version means this publication is an update.
    is_update = bool(db_object.public_version)
    if is_update:
        public_version = update_existing_publication(db_object, versioned)
    else:
        public_version = PublishedContent()

    # make the new public version
    public_version.content_public_slug = versioned.slug
    public_version.content_type = versioned.type
    public_version.content_pk = db_object.pk
    public_version.content = db_object
    public_version.char_count = char_count
    public_version.save()
    makedirs(public_version.get_extra_contents_directory(), exist_ok=True)
    if is_major_update or not is_update:
        public_version.publication_date = datetime.now()
    else:
        # only reachable for a minor update of an already published content
        public_version.update_date = datetime.now()
    public_version.sha_public = versioned.current_version
    public_version.save(update_fields=["publication_date", "update_date", "sha_public"])

    public_version.authors.clear()
    for author in db_object.authors.all():
        public_version.authors.add(author)

    # this puts the manifest.json and base json file on the prod path.
    shutil.rmtree(public_version.get_prod_path(), ignore_errors=True)
    shutil.copytree(tmp_path, public_version.get_prod_path())
    # db_object itself is not saved here.
    db_object.sha_public = versioned.current_version
    public_version.save()

    generation_policy = content_settings["extra_content_generation_policy"]
    if generation_policy == "SYNC":
        # ok, now we can really publish the thing!
        generate_external_content(base_name, build_extra_contents_path, md_file_path, versioned=versioned)
    elif generation_policy == "WATCHDOG":
        PublicatorRegistry.get("watchdog").publish(md_file_path, base_name, silently_pass=False)

    return public_version
3✔
135

136

137
def update_existing_publication(db_object, versioned):
    """
    Prepare the public version of an already published content for a new publication.

    When the slug changed, the current public version is flagged with ``must_redirect`` and a
    fresh ``PublishedContent`` carrying the same publication date is attached to ``db_object``.
    The files of the previous publication are then removed.

    :param db_object: Database representation of the content; must have a public version
    :param versioned: version of the content being published
    :return: the public version object to fill in for the new publication
    """
    public_version = db_object.public_version
    # the content has been published in the past, so we will clean up old files!
    old_path = public_version.get_prod_path()

    # if the slug has changed, create a new object instead of reusing the old one
    # this allows us to handle permanent redirection so that SEO is not impacted.
    if versioned.slug != public_version.content_public_slug:
        public_version.must_redirect = True  # set redirection
        public_version.save(update_fields=["must_redirect"])
        publication_date = public_version.publication_date
        db_object.public_version = PublishedContent()
        public_version = db_object.public_version

        # keep the same publication date if the content is already published
        public_version.publication_date = publication_date

    # remove old files only if everything succeed so far: if something bad
    # happened, we don't want to have a published content without content!
    logging.getLogger(__name__).debug("erase %s", old_path)
    # The directory may already be gone (e.g. a previous publication stopped after this point);
    # that must not prevent the content from being published again.
    if path.exists(old_path):
        shutil.rmtree(old_path)

    return public_version
3✔
160

161

162
def write_md_file(md_file_path, parsed_with_local_images, versioned):
    """
    Write the flat markdown of a content to ``md_file_path``, encoded in UTF-8.

    :param md_file_path: path of the file to (over)write
    :param parsed_with_local_images: the markdown text to write
    :param versioned: the content being published; only its title is used, for the error log
    :raise FailureDuringPublication: if the text cannot be encoded
    """
    with Path(md_file_path).open("w", encoding="utf-8") as stream:
        try:
            stream.write(parsed_with_local_images)
        except UnicodeError:
            logger.error("Could not encode %s in UTF-8, publication aborted", versioned.title)
            user_message = _(
                "Une erreur est survenue durant la génération du fichier markdown "
                "à télécharger, vérifiez le code markdown"
            )
            raise FailureDuringPublication(user_message)
174

175

176
def generate_external_content(base_name, extra_contents_path, md_file_path, excluded=None, **kwargs):
    """
    Run the registered publicators to generate the static files allowing offline access to a content.

    A publicator failing with ``FailureDuringPublication`` or ``OSError`` is logged and the
    remaining publicators still run.

    :param base_name: base name of the files (without extension)
    :param extra_contents_path: internal directory where all files will be pushed
    :param md_file_path: bundled markdown file path
    :param excluded: list of excluded formats; ``None`` or an empty list excludes only "watchdog"
    :param kwargs: extra options forwarded to every publicator
    """
    if not excluded:
        excluded = ["watchdog"]

    for format_name, format_publicator in PublicatorRegistry.get_all_registered(excluded):
        try:
            format_publicator.publish(
                md_file_path,
                base_name,
                change_dir=extra_contents_path,
                cur_language=translation.get_language(),
                **kwargs,
            )
        except (FailureDuringPublication, OSError):
            module_logger = logging.getLogger(__name__)
            module_logger.exception("Could not publish %s format from %s base.", format_name, md_file_path)
199

200

201
class PublicatorRegistry:
    """
    Registry mapping a human-readable format name to a publicator instance.
    """

    # Shared by the class: name -> publicator instance.
    registry = {}

    @classmethod
    def register(cls, publicator_name, *args):
        """
        Build a class decorator registering a publicator.

        The decorated class is instantiated with ``args`` and the instance is stored under
        ``publicator_name``; the class itself is returned unchanged.

        :param publicator_name: name under which the instance is registered
        :param args: positional arguments given to the decorated class
        """

        def instantiate_and_store(publicator_class):
            cls.registry[publicator_name] = publicator_class(*args)
            return publicator_class

        return instantiate_and_store

    @classmethod
    def get_all_registered(cls, exclude=None):
        """
        Iterate over the registered publicators.

        Known formats come first, in the order zip, md, html, epub, pdf; any other name follows.

        :param exclude: a list of publicator names to skip, ``None`` to skip nothing
        :return: a generator of ``(name, publicator)`` pairs
        """
        skipped = [] if exclude is None else exclude
        rank_by_name = {"zip": 1, "md": 2, "html": 3, "epub": 4, "pdf": 5}

        def rank(entry):
            # names without an explicit rank are pushed after the known ones
            return rank_by_name.get(entry[0], 42)

        for name, publicator in sorted(cls.registry.items(), key=rank):
            if name in skipped:
                continue
            yield name, publicator

    @classmethod
    def unregister(cls, name):
        """
        Remove registered Publicator named 'name' if present

        :param name: publicator name.
        """
        cls.registry.pop(name, None)

    @classmethod
    def get(cls, name):
        """
        Get publicator named 'name'.

        :param name: publicator name.
        :return: the wanted publicator
        :rtype: Publicator
        :raise KeyError: if publicator is not registered
        """
        wanted = cls.registry[name]
        return wanted
2✔
258

259

260
class Publicator:
    """
    Publicator base object; ``publish`` must be overridden by each format.
    """

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Function called to generate a content export

        :param md_file_path: base markdown file path
        :param base_name: file name without extension
        :param kwargs: other publicator dependent options
        """
        raise NotImplementedError()

    def get_published_content_entity(self, md_file_path) -> PublishedContent:
        """
        Retrieve the db entity from mdfile path

        The slug is extracted from the path, then the first ``PublishedContent`` having this
        public slug is returned.

        :param md_file_path: mdfile path as string
        :type md_file_path: str
        :return: the db entity, or ``None`` when no published content has this slug
        :rtype: zds.tutorialv2.models.database.PublishedContent
        """
        slug = PublishedContent.get_slug_from_file_path(md_file_path)
        return PublishedContent.objects.filter(content_public_slug=slug).first()
3✔
287

288

289
@PublicatorRegistry.register("md")
class MarkdownPublicator(Publicator):
    """
    Render a content as one flat markdown file.
    """

    def publish(self, md_file_path, base_name, *, cur_language=settings.LANGUAGE_CODE, **kwargs):
        """
        Render the content with the markdown export template and write it to ``md_file_path``.

        When the path contains ``__building``, the file is also copied to the same path without
        that marker.

        :param md_file_path: path of the markdown file to write
        :param base_name: file name without extension (unused here)
        :param cur_language: language re-activated once the template has been rendered
        :param kwargs: may contain ``versioned``, the version to render; when missing or falsy,
            the public version is loaded from the published content
        :raise FailureDuringPublication: if rendering fails with an HTTP error
        """
        entity = self.get_published_content_entity(md_file_path)
        # do not use load_public_version as it lacks of information to get the content
        # if you use it you will only get titles, without the text
        versioned = kwargs.pop("versioned", None) or entity.content.load_version(sha=entity.sha_public)

        try:
            # the export is always rendered with the site language
            translation.activate(settings.LANGUAGE_CODE)
            flat_markdown = render_to_string("tutorialv2/export/content.md", {"content": versioned})
        except requests.exceptions.HTTPError:
            raise FailureDuringPublication("Could not publish flat markdown")
        finally:
            translation.activate(cur_language)

        write_md_file(md_file_path, flat_markdown, versioned)

        final_md_path = md_file_path.replace("__building", "")
        if final_md_path != md_file_path:
            shutil.copy2(md_file_path, final_md_path)
3✔
309

310

311
def _read_flat_markdown(md_file_path):
3✔
312
    with open(md_file_path, encoding="utf-8") as md_file_handler:
×
313
        md_flat_content = md_file_handler.read()
×
314
    return md_flat_content
×
315

316

317
@PublicatorRegistry.register("zip")
class ZipPublicator(Publicator):
    """
    Build the zip archive of a published content.
    """

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Create the zip archive, unless the content is an opinion and zip is disabled for opinions.

        :param md_file_path: base markdown file path, used to find the published content
        :param base_name: file name without extension (unused here)
        :param kwargs: ignored
        :raise FailureDuringPublication: if the published content is not found or the archive
            cannot be written
        """
        try:
            entity = self.get_published_content_entity(md_file_path)
            if entity is None:
                raise ValueError("published_content_entity is None")

            zip_forbidden = entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_zip"]
            if zip_forbidden:
                logger.info("ZIP not allowed for opinions.")
                return

            # no need to move zip file because it is already dumped to the public directory
            make_zip_file(entity)
        except (OSError, ValueError) as e:
            raise FailureDuringPublication("Zip could not be created", e)
×
331

332

333
@PublicatorRegistry.register("pdf")
class ZMarkdownRebberLatexPublicator(Publicator):
    """
    Use zmarkdown and rebber stringifier to produce latex & pdf output.
    """

    def __init__(self, extension=".pdf", latex_classes=""):
        """
        :param extension: extension of the compiled file, leading dot included
        :param latex_classes: extra options appended to the LaTeX content type, if any
        """
        self.extension = extension
        self.doc_type = extension[1:]
        self.latex_classes = latex_classes

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Render the public version of the content to LaTeX, then compile it.

        The ``.tex`` file and then the compiled file are copied to the extra contents directory
        of the published content. Nothing is done for an opinion when PDF is disabled for opinions.

        :param md_file_path: base markdown file path, used to find the published content
        :param base_name: file name without extension; generated files are written next to it
        :param kwargs: ignored
        :raise FailureDuringPublication: if the markdown is not rendered or a compilation step fails
        """
        published_content_entity = self.get_published_content_entity(md_file_path)
        if published_content_entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_pdf"]:
            logger.info("PDF not allowed for opinions")
            return
        gallery_pk = published_content_entity.content.gallery.pk
        depth_to_size_map = {
            1: "small",  # in fact this is an "empty" tutorial (i.e it is empty or has intro and/or conclusion)
            2: "small",
            3: "middle",
            4: "big",
        }
        public_versionned_source = published_content_entity.content.load_version(
            sha=published_content_entity.sha_public
        )
        base_directory = Path(base_name).parent
        image_dir = base_directory / "images"
        image_dir.mkdir(parents=True, exist_ok=True)

        # Best effort copy of the gallery images next to the LaTeX sources.
        gallery_dir = settings.MEDIA_ROOT / "galleries" / str(gallery_pk)
        if gallery_dir.exists():
            for image in gallery_dir.iterdir():
                with contextlib.suppress(OSError):
                    shutil.copy2(str(image.absolute()), str(image_dir))

        content_type = depth_to_size_map[public_versionned_source.get_tree_level()]
        if self.latex_classes:
            content_type += ", " + self.latex_classes
        title = published_content_entity.title()
        authors = [a.username for a in published_content_entity.authors.all()]

        licence = published_content_entity.content.licence.code
        licence_short = licence.replace("CC", "").strip().lower()
        licence_logo = licences.get(licence_short, False)
        if licence_logo:
            licence_url = f"https://creativecommons.org/licenses/{licence_short}/4.0/legalcode"
            # we need a specific case for CC-0 as it is "public-domain"
            if licence_logo == licences["0"]:
                licence_url = "https://creativecommons.org/publicdomain/zero/1.0/legalcode.fr"
        else:
            # unknown licence code: fall back to "all rights reserved"
            licence = str(_("Tous droits réservés"))
            licence_logo = licences["copyright"]
            licence_url = ""

        replacement_image_url = str(settings.MEDIA_ROOT.parent)
        if not replacement_image_url.endswith("/"):
            replacement_image_url += "/"
        replaced_media_url = settings.MEDIA_URL
        if replaced_media_url.startswith("/"):
            replaced_media_url = replaced_media_url[1:]
        exported = export_content(public_versionned_source, with_text=True, ready_to_publish_only=True)
        # no title to avoid zmd to put it on the final latex
        del exported["title"]
        content, _metadata, messages = render_markdown(
            exported,
            output_format="texfile",
            # latex template arguments
            content_type=content_type,
            title=title,
            authors=authors,
            license=licence,
            license_directory=str(LICENSES_BASE_PATH),
            license_logo=licence_logo,
            license_url=licence_url,
            smileys_directory=str(SMILEYS_BASE_PATH / "svg"),
            images_download_dir=str(image_dir),
            local_url_to_local_path=["/", replacement_image_url],
            date=date(published_content_entity.last_publication_date, "l d F Y"),
        )
        if content == "" and messages:
            raise FailureDuringPublication(f"Markdown was not parsed due to {messages}")
        zmd_class_dir_path = Path(settings.ZDS_APP["content"]["latex_template_repo"])
        # NOTE(review): ``str.replace`` returns a new string and its result is discarded here, so
        # this statement does not change ``content``. It is kept untouched: assigning the result
        # would alter the paths written to the .tex file, which has to be confirmed first.
        content.replace(replacement_image_url + replaced_media_url, replacement_image_url)
        if zmd_class_dir_path.exists() and zmd_class_dir_path.is_dir():
            # One guard per link: an already existing first link must not skip the second one.
            with contextlib.suppress(FileExistsError):
                zmd_class_link = base_directory / "zmdocument.cls"
                zmd_class_link.symlink_to(zmd_class_dir_path / "zmdocument.cls")
            with contextlib.suppress(FileExistsError):
                luatex_dir_link = base_directory / "utf8.lua"
                luatex_dir_link.symlink_to(zmd_class_dir_path / "utf8.lua", target_is_directory=True)
        true_latex_extension = ".".join(self.extension.split(".")[:-1]) + ".tex"
        latex_file_path = base_name + true_latex_extension
        pdf_file_path = base_name + self.extension
        default_logo_original_path = Path(__file__).parent / ".." / ".." / "assets" / "images" / "logo@2x.png"
        with contextlib.suppress(FileExistsError):
            shutil.copy(str(default_logo_original_path), str(base_directory / "default_logo.png"))
        with open(latex_file_path, mode="w", encoding="utf-8") as latex_file:
            latex_file.write(content)
        shutil.copy2(latex_file_path, published_content_entity.get_extra_contents_directory())

        # two draft passes, the glossary, then the final pass
        self.full_tex_compiler_call(latex_file_path, draftmode="-draftmode")
        self.full_tex_compiler_call(latex_file_path, draftmode="-draftmode")
        self.make_glossary(Path(base_name).name, latex_file_path)
        self.full_tex_compiler_call(latex_file_path)

        shutil.copy2(pdf_file_path, published_content_entity.get_extra_contents_directory())

    def full_tex_compiler_call(self, latex_file, draftmode: str = ""):
        """
        Run the compiler once and raise ``FailureDuringPublication`` if it did not succeed.

        :param latex_file: path of the ``.tex`` file
        :param draftmode: extra command line option, empty for a normal run
        """
        success_flag = self.tex_compiler(latex_file, draftmode)
        if not success_flag:
            handle_tex_compiler_error(latex_file, self.extension)

    def handle_makeglossaries_error(self, latex_file):
        """
        Raise ``FailureDuringPublication`` with the log lines mentioning "fatal" or "error".

        :param latex_file: path of the ``.tex`` file; the log is expected next to it
        :raise FailureDuringPublication: always
        """
        log_path = path.splitext(latex_file)[0] + ".log"
        # Undecodable bytes are replaced so that a broken log cannot hide the real failure.
        with open(log_path, encoding="utf-8", errors="replace") as latex_log:
            errors = "\n".join(line for line in latex_log if "fatal" in line.lower() or "error" in line.lower())
        raise FailureDuringPublication(errors)

    def tex_compiler(self, texfile, draftmode: str = ""):
        """
        Run ``lualatex`` on ``texfile`` from the directory containing it.

        :param texfile: path of the ``.tex`` file
        :param draftmode: extra command line option, empty for a normal run
        :return: ``True`` if the expected output file exists after the run
        """
        command = f"lualatex -shell-escape -interaction=nonstopmode {draftmode} {texfile}"
        command_process = subprocess.Popen(
            command, shell=True, cwd=path.dirname(texfile), stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # let's put 10 min of timeout because we do not generate latex everyday
        command_process.communicate(timeout=600)

        pdf_file_path = path.splitext(texfile)[0] + self.extension
        return path.exists(pdf_file_path)

    def make_glossary(self, basename, texfile):
        """
        Run ``makeglossaries`` from the directory containing ``texfile``.

        :param basename: name given to ``makeglossaries``
        :param texfile: path of the ``.tex`` file
        :return: ``True`` when neither output stream mentions "fatal"
        :raise FailureDuringPublication: otherwise
        """
        command = f"makeglossaries {basename}"
        command_process = subprocess.Popen(
            command, shell=True, cwd=path.dirname(texfile), stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        std_out, std_err = command_process.communicate()

        # TODO: check makeglossary exit codes to see if we can enhance error detection
        if "fatal" not in std_out.decode("utf-8").lower() and "fatal" not in std_err.decode("utf-8").lower():
            return True

        self.handle_makeglossaries_error(texfile)
×
470

471

472
def handle_tex_compiler_error(latex_file_path, ext):
    """
    Raise ``FailureDuringPublication`` describing a failed LaTeX compilation.

    The message is made of the log lines mentioning "fatal" or "error", each followed by the
    next lines of context. When the log file does not exist, a fixed message naming the
    expected log path is used instead.

    :param latex_file_path: path of the ``.tex`` file; the log is expected next to it
    :param ext: extension of the expected output, only used in the debug log
    :raise FailureDuringPublication: always
    """
    log_file_path = path.splitext(latex_file_path)[0] + ".log"
    errors = f"Error occured, log file {log_file_path} not found."
    with contextlib.suppress(FileNotFoundError):
        # Undecodable bytes are replaced: a partly unreadable log is still worth reporting.
        with Path(log_file_path).open(encoding="utf-8", errors="replace") as latex_log:
            print_context = 25
            lines = []
            # index of the last matching line; starts far enough back to select nothing
            relevant_line = -print_context
            for idx, line in enumerate(latex_log):
                if "fatal" in line.lower() or "error" in line.lower():
                    relevant_line = idx
                    lines.append(line)
                elif idx - relevant_line < print_context:
                    lines.append(line)

            errors = "\n".join(lines)
    logger.debug("%s ext=%s", errors, ext)

    raise FailureDuringPublication(errors)
3✔
492

493

494
@PublicatorRegistry.register("epub")
class ZMarkdownEpubPublicator(Publicator):
    """
    Build the EPUB export of a published content.
    """

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Build ``<base_name>.epub`` and move it to the extra contents directory.

        Nothing is done for an opinion when EPUB is disabled for opinions.

        :param md_file_path: base markdown file path; its directory is given to the ebook builder
        :param base_name: file name without extension
        :param kwargs: ignored
        :raise FailureDuringPublication: if building the ebook fails with an OS or HTTP error
        """
        try:
            entity = self.get_published_content_entity(md_file_path)
            epub_forbidden = entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_epub"]
            if epub_forbidden:
                logger.info("EPUB not allowed for opinions")
                return
            epub_file_path = Path(base_name + ".epub")
            logger.info("Start generating epub")
            build_ebook(entity, path.dirname(md_file_path), epub_file_path)
        except (OSError, requests.exceptions.HTTPError):
            raise FailureDuringPublication("Error while generating epub file.")

        # Only reached when the ebook has been built.
        logger.info(epub_file_path)
        epub_path = Path(entity.get_extra_contents_directory(), Path(epub_file_path.name))
        if epub_path.exists():
            # drop the previous export before moving the new one in place
            os.remove(str(epub_path))
        epub_path.parent.mkdir(parents=True, exist_ok=True)
        logger.info("created %s. moving it to %s", epub_file_path, entity.get_extra_contents_directory())
        shutil.move(str(epub_file_path), entity.get_extra_contents_directory())
3✔
517

518

519
@PublicatorRegistry.register("watchdog")
class WatchdogFilePublicator(Publicator):
    """
    Record publication requests instead of generating the exports itself.
    """

    def publish(self, md_file_path, base_name, silently_pass=True, **kwargs):
        """
        Request every export of the published content found from ``md_file_path``.

        :param md_file_path: base markdown file path, used to find the published content
        :param base_name: file name without extension (unused here)
        :param silently_pass: when true (the default), do nothing
        :param kwargs: ignored
        """
        if not silently_pass:
            self.publish_from_published_content(self.get_published_content_entity(md_file_path))

    def publish_from_published_content(self, published_content: PublishedContent):
        """
        Create one ``REQUESTED`` publication event per registered format, "watchdog" excepted.

        :param published_content: the published content to export
        """
        for format_name, _publicator in PublicatorRegistry.get_all_registered(["watchdog"]):
            # Remove previous PublicationEvent for this content, not handled by
            # the publication watchdog yet:
            pending_events = PublicationEvent.objects.filter(
                state_of_processing="REQUESTED",
                published_object__content_pk=published_content.content_pk,
                format_requested=format_name,
            )
            pending_events.delete()
            PublicationEvent.objects.create(
                state_of_processing="REQUESTED",
                published_object=published_content,
                format_requested=format_name,
            )
541

542

543
class FailureDuringPublication(Exception):
    """Exception raised if something goes wrong during publication process"""

    # Construction is left to ``Exception``: arguments are stored in ``args`` unchanged.
3✔
548

549

550
def make_zip_file(published_content):
    """Create the zip archive extra content from the published content

    The archive is written to ``<content_public_slug>.zip`` in the extra contents directory.

    :param published_content: a PublishedContent object
    :return: the path of the archive
    """
    # imported here, as in the original code, rather than at module level
    from zds.tutorialv2.views.archives import DownloadContent

    publishable = published_content.content
    # update SHA so that archive gets updated too
    publishable.sha_public = publishable.sha_draft
    file_path = path.join(
        published_content.get_extra_contents_directory(), published_content.content_public_slug + ".zip"
    )
    # Load the version before opening the archive, so that a failure here leaves no file behind.
    versioned = publishable.load_version(None, True)
    # The context manager closes the archive even if inserting the files fails.
    with zipfile.ZipFile(file_path, "w") as zip_file:
        DownloadContent.insert_into_zip(zip_file, versioned.repository.commit(versioned.current_version).tree)
    return file_path
3✔
570

571

572
def unpublish_content(db_object, moderator=None):
    """
    Remove the given content from the public view.

    .. note::
        This will send content_unpublished event.

    :param db_object: Database representation of the content
    :type db_object: PublishableContent
    :param moderator: the staff user who triggered the unpublish action.
    :type moderator: django.contrib.auth.models.User
    :return: ``True`` if unpublished, ``False`` otherwise (no public version, or a database
        lookup / file system error occurred on the way)
    :rtype: bool
    """

    with contextlib.suppress(ObjectDoesNotExist, OSError):
        current_public_version = db_object.public_version
        if current_public_version is None:
            # nothing is published: honour the documented ``False`` instead of failing
            return False
        public_version = PublishedContent.objects.get(pk=current_public_version.pk)

        reactions = ContentReaction.objects.filter(related_content=db_object).all()
        # NOTE(review): the signal is sent exactly once, with the class of the queryset as sender
        # (the original code looped over a one-element list holding the queryset). Kept as is;
        # confirm whether one signal per reaction was intended.
        results = [
            content_unpublished.send(
                sender=reactions.__class__, instance=db_object, target=ContentReaction, moderator=moderator, user=None
            )
        ]
        # module logger rather than the root logger
        logger.debug("Nb_messages=%d, messages=%s", len(results), results)
        # remove public_version:
        public_version.delete()
        update_params = {"public_version": None}

        if db_object.is_opinion:
            update_params["sha_public"] = None
            update_params["sha_picked"] = None
            update_params["pubdate"] = None

        db_object.update(**update_params)
        content_unpublished.send(
            sender=db_object.__class__, instance=db_object, target=db_object.__class__, moderator=moderator
        )
        # clean files
        old_path = public_version.get_prod_path()
        public_version.content.update(public_version=None, sha_public=None)
        if path.exists(old_path):
            shutil.rmtree(old_path)
        return True

    return False
×
620

621

622
def close_article_beta(db_object, versioned, user, request=None):
    """
    Close the beta of an article: reset its beta sha and lock its beta topic.

    Nothing happens for other content types. The closing post is only sent, and the topic
    only locked, when the article has a beta topic that is not locked yet.

    :param db_object: the article
    :type db_object: zds.tutorialv2.models.database.PublishableContent
    :param versioned: the public version of article, used to populate closing post
    :type versioned: zds.tutorialv2.models.versioned.VersionedContent
    :param user: the current user
    :param request: the current request
    """
    if db_object.type != "ARTICLE":
        return

    db_object.sha_beta = None
    beta_topic = db_object.beta_topic
    if beta_topic is None or beta_topic.is_locked:
        return

    closing_message = render_to_string("tutorialv2/messages/beta_desactivate.md", {"content": versioned})
    send_post(request, beta_topic, user, closing_message)
    lock_topic(beta_topic)
1✔
639

640

641
def save_validation_state(
    db_object,
    is_update,
    published: PublishedContent,
    validation,
    versioned,
    source="",
    is_major=False,
    user=None,
    request=None,
    comment="",
):
    """
    Save the content and its validation after publication; the validation becomes ``ACCEPT``.

    The publication date of the content is refreshed (and ``is_obsolete`` reset) for a first
    publication, a major update, or when no publication date is set yet.

    :param db_object:  the content
    :type db_object: zds.tutorialv2.models.database.PublishableContent
    :param is_update: marks if the publication is an update or a new/major publication
    :param published: the PublishedContent instance
    :param validation: the related validation
    :param versioned:  the VersionedContent related to the public sha
    :param source: the optional canonical link
    :param is_major: marks a major publication (first one, or new parts for example)
    :param user: validating user
    :param request: current request to get hats, and send error messages if needed
    :param comment: stored on the validation as the validator's comment
    """
    # content side
    db_object.sha_public = validation.version
    db_object.source = source
    db_object.sha_validation = None
    db_object.public_version = published

    refresh_pubdate = is_major or not is_update or db_object.pubdate is None
    if refresh_pubdate:
        db_object.pubdate = datetime.now()
        db_object.is_obsolete = False

    # close beta if is an article
    close_article_beta(db_object, versioned, user=user, request=request)
    db_object.save()

    # validation side
    validation.comment_validator = comment
    validation.status = "ACCEPT"
    validation.date_validation = datetime.now()
    validation.save()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc