• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zestedesavoir / zds-site / 16479862102

23 Jul 2025 07:20PM UTC coverage: 89.237% (+0.001%) from 89.236%
16479862102

Pull #6682

github

web-flow
Merge 8239ac7b1 into 7a9a116b0
Pull Request #6682: Mise à jour de ZMarkdown vers la version 12.1.0

3092 of 4136 branches covered (74.76%)

1 of 1 new or added line in 1 file covered. (100.0%)

29 existing lines in 2 files now uncovered.

16822 of 18851 relevant lines covered (89.24%)

1.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.84
/zds/tutorialv2/publication_utils.py
1
import contextlib
3✔
2
import copy
3✔
3
import logging
3✔
4
import os
3✔
5
import shutil
3✔
6
import subprocess
3✔
7
import zipfile
3✔
8
from datetime import datetime
3✔
9
from os import makedirs, path
3✔
10
from pathlib import Path
3✔
11

12
import requests
3✔
13
from django.conf import settings
3✔
14
from django.core.exceptions import ObjectDoesNotExist
3✔
15
from django.template.defaultfilters import date
3✔
16
from django.template.loader import render_to_string
3✔
17
from django.utils import translation
3✔
18
from django.utils.translation import gettext_lazy as _
3✔
19

20
from zds.forum.utils import lock_topic, send_post
3✔
21
from zds.tutorialv2 import signals
3✔
22
from zds.tutorialv2.epub_utils import build_ebook
3✔
23
from zds.tutorialv2.models.database import ContentReaction, PublicationEvent, PublishedContent
3✔
24
from zds.tutorialv2.publish_container import publish_use_manifest
3✔
25
from zds.tutorialv2.signals import content_unpublished
3✔
26
from zds.tutorialv2.utils import export_content
3✔
27
from zds.utils.templatetags.emarkdown import render_markdown
3✔
28
from zds.utils.templatetags.smileys_def import LICENSES_BASE_PATH, SMILEYS_BASE_PATH
3✔
29

30
logger = logging.getLogger(__name__)
3✔
31
licences = {
3✔
32
    "by-nc-nd": "by-nc-nd.svg",
33
    "by-nc-sa": "by-nc-sa.svg",
34
    "by-nc": "by-nc.svg",
35
    "by-nd": "by-nd.svg",
36
    "by-sa": "by-sa.svg",
37
    "by": "by.svg",
38
    "0": "0.svg",
39
    "copyright": "copyright.svg",
40
}
41

42

43
def notify_update(db_object, is_update, is_major):
3✔
44
    if not is_update or is_major:
2✔
45
        # Follow
46
        signals.content_published.send(sender=db_object.__class__, instance=db_object, by_email=False)
2✔
47

48

49
def publish_content(db_object, versioned, is_major_update=True):
3✔
50
    """
51
    Publish a given content.
52

53
    .. note::
54
        create a manifest.json without the introduction and conclusion if not needed. Also remove the 'text' field
55
        of extracts.
56

57
    :param db_object: Database representation of the content
58
    :type db_object: zds.tutorialv2.models.database.PublishableContent
59
    :param versioned: version of the content to publish
60
    :type versioned: zds.tutorialv2.models.versioned.VersionedContent
61
    :param is_major_update: if set to `True`, will update the publication date
62
    :type is_major_update: bool
63
    :raise FailureDuringPublication: if something goes wrong
64
    :return: the published representation
65
    :rtype: zds.tutorialv2.models.database.PublishedContent
66
    """
67

68
    from zds.tutorialv2.models.database import PublishedContent
3✔
69

70
    if is_major_update:
3✔
71
        versioned.pubdate = datetime.now()
3✔
72

73
    # First write the files to a temporary directory: if anything goes wrong,
74
    # the last published version is not impacted !
75
    tmp_path = path.join(settings.ZDS_APP["content"]["repo_public_path"], versioned.slug + "__building")
3✔
76
    if path.exists(tmp_path):
3✔
77
        shutil.rmtree(tmp_path)  # remove previous attempt, if any
3✔
78

79
    # render HTML:
80
    altered_version = copy.deepcopy(versioned)
3✔
81
    char_count = publish_use_manifest(db_object, tmp_path, altered_version)
3✔
82
    altered_version.dump_json(path.join(tmp_path, "manifest.json"))
3✔
83

84
    # make room for 'extra contents'
85
    build_extra_contents_path = path.join(tmp_path, settings.ZDS_APP["content"]["extra_contents_dirname"])
3✔
86
    makedirs(build_extra_contents_path)
3✔
87
    base_name = path.join(build_extra_contents_path, versioned.slug)
3✔
88

89
    # 1. markdown file (base for the others) :
90
    # If we come from a command line, we need to activate i18n, to have the date in the french language.
91
    cur_language = translation.get_language()
3✔
92
    altered_version.pubdate = datetime.now()
3✔
93

94
    md_file_path = base_name + ".md"
3✔
95
    with contextlib.suppress(OSError):
3✔
96
        Path(Path(md_file_path).parent, "images").mkdir()
3✔
97
    is_update = False
3✔
98

99
    if db_object.public_version:
3✔
100
        public_version = update_existing_publication(db_object, versioned)
3✔
101
        is_update = True
3✔
102
    else:
103
        public_version = PublishedContent()
3✔
104

105
    # make the new public version
106
    public_version.content_public_slug = versioned.slug
3✔
107
    public_version.content_type = versioned.type
3✔
108
    public_version.content_pk = db_object.pk
3✔
109
    public_version.content = db_object
3✔
110
    public_version.char_count = char_count
3✔
111
    public_version.save()
3✔
112
    with contextlib.suppress(FileExistsError):
3✔
113
        makedirs(public_version.get_extra_contents_directory())
3✔
114
    if is_major_update or not is_update:
3✔
115
        public_version.publication_date = datetime.now()
3✔
116
    elif is_update:
2!
117
        public_version.update_date = datetime.now()
2✔
118
    public_version.sha_public = versioned.current_version
3✔
119
    public_version.save(update_fields=["publication_date", "update_date", "sha_public"])
3✔
120

121
    public_version.authors.clear()
3✔
122
    for author in db_object.authors.all():
3✔
123
        public_version.authors.add(author)
3✔
124

125
    # this puts the manifest.json and base json file on the prod path.
126
    shutil.rmtree(public_version.get_prod_path(), ignore_errors=True)
3✔
127
    shutil.copytree(tmp_path, public_version.get_prod_path())
3✔
128
    db_object.sha_public = versioned.current_version
3✔
129
    public_version.save()
3✔
130
    if settings.ZDS_APP["content"]["extra_content_generation_policy"] == "SYNC":
3✔
131
        # ok, now we can really publish the thing!
132
        generate_external_content(base_name, build_extra_contents_path, md_file_path, versioned=versioned)
3✔
133
    elif settings.ZDS_APP["content"]["extra_content_generation_policy"] == "WATCHDOG":
3✔
134
        PublicatorRegistry.get("watchdog").publish(md_file_path, base_name, silently_pass=False)
2✔
135

136
    return public_version
3✔
137

138

139
def update_existing_publication(db_object, versioned):
3✔
140
    public_version = db_object.public_version
3✔
141
    # the content has been published in the past, so we will clean up old files!
142
    old_path = public_version.get_prod_path()
3✔
143

144
    # if the slug has changed, create a new object instead of reusing the old one
145
    # this allows us to handle permanent redirection so that SEO is not impacted.
146
    if versioned.slug != public_version.content_public_slug:
3✔
147
        public_version.must_redirect = True  # set redirection
2✔
148
        public_version.save(update_fields=["must_redirect"])
2✔
149
        publication_date = public_version.publication_date
2✔
150
        db_object.public_version = PublishedContent()
2✔
151
        public_version = db_object.public_version
2✔
152

153
        # keep the same publication date if the content is already published
154
        public_version.publication_date = publication_date
2✔
155

156
    # remove old files only if everything succeed so far: if something bad
157
    # happened, we don't want to have a published content without content!
158
    logging.getLogger(__name__).debug("erase " + old_path)
3✔
159
    shutil.rmtree(old_path)
3✔
160

161
    return public_version
3✔
162

163

164
def write_md_file(md_file_path, parsed_with_local_images, versioned):
3✔
165
    with open(md_file_path, "w", encoding="utf-8") as md_file:
3✔
166
        try:
3✔
167
            md_file.write(parsed_with_local_images)
3✔
168
        except UnicodeError:
×
169
            logger.error("Could not encode %s in UTF-8, publication aborted", versioned.title)
×
170
            raise FailureDuringPublication(
×
171
                _(
172
                    "Une erreur est survenue durant la génération du fichier markdown "
173
                    "à télécharger, vérifiez le code markdown"
174
                )
175
            )
176

177

178
def generate_external_content(base_name, extra_contents_path, md_file_path, excluded=None, **kwargs):
3✔
179
    """
180
    generate all static file that allow offline access to content
181

182
    :param base_name: base nae of file (without extension)
183
    :param extra_contents_path: internal directory where all files will be pushed
184
    :param md_file_path: bundled markdown file path
185
    :param excluded: list of excluded format, None if no exclusion
186
    """
187
    excluded = excluded or ["watchdog"]
3✔
188
    for publicator_name, publicator in PublicatorRegistry.get_all_registered(excluded):
3✔
189
        try:
3✔
190
            publicator.publish(
3✔
191
                md_file_path,
192
                base_name,
193
                change_dir=extra_contents_path,
194
                cur_language=translation.get_language(),
195
                **kwargs,
196
            )
197
        except (FailureDuringPublication, OSError):
3✔
198
            logging.getLogger(__name__).exception(
3✔
199
                "Could not publish %s format from %s base.", publicator_name, md_file_path
200
            )
201

202

203
class PublicatorRegistry:
3✔
204
    """
205
    Register all publicator as a 'human-readable name/publicator' instance key/value list
206
    """
207

208
    registry = {}
3✔
209

210
    @classmethod
3✔
211
    def register(cls, publicator_name, *args):
3✔
212
        def decorated(func):
3✔
213
            cls.registry[publicator_name] = func(*args)
3✔
214
            return func
3✔
215

216
        return decorated
3✔
217

218
    @classmethod
3✔
219
    def get_all_registered(cls, exclude=None):
3✔
220
        """
221
        Args:
222
            exclude: A list of excluded publicator
223

224
        Returns:
225
        """
226
        if exclude is None:
3✔
227
            exclude = []
1✔
228
        order_key = {
3✔
229
            "zip": 1,
230
            "md": 2,
231
            "html": 3,
232
            "epub": 4,
233
            "pdf": 5,
234
        }
235
        for key, value in sorted(cls.registry.items(), key=lambda k: order_key.get(k[0], 42)):
3✔
236
            if key not in exclude:
3✔
237
                yield key, value
3✔
238

239
    @classmethod
3✔
240
    def unregister(cls, name):
3✔
241
        """
242
        Remove registered Publicator named 'name' if present
243

244
        :param name: publicator name.
245
        """
246
        if name in cls.registry:
×
247
            del cls.registry[name]
×
248

249
    @classmethod
3✔
250
    def get(cls, name):
3✔
251
        """
252
        Get publicator named 'name'.
253

254
        :param name:
255
        :return: the wanted publicator
256
        :rtype: Publicator
257
        :raise KeyError: if publicator is not registered
258
        """
259
        return cls.registry[name]
2✔
260

261

262
class Publicator:
3✔
263
    """
264
    Publicator base object, all methods must be overridden
265
    """
266

267
    def publish(self, md_file_path, base_name, **kwargs):
3✔
268
        """
269
        Function called to generate a content export
270

271
        :param md_file_path: base markdown file path
272
        :param base_name: file name without extension
273
        :param kwargs: other publicator dependent options
274
        """
275
        raise NotImplementedError()
×
276

277
    def get_published_content_entity(self, md_file_path) -> PublishedContent:
3✔
278
        """
279
        Retrieve the db entity from mdfile path
280

281
        :param md_file_path: mdfile path as string
282
        :type md_file_path: str
283
        :return: the db entity
284
        :rtype: zds.tutorialv2.models.models_database.PublishedContent
285
        """
286
        content_slug = PublishedContent.get_slug_from_file_path(md_file_path)
3✔
287
        published_content_entity = PublishedContent.objects.filter(content_public_slug=content_slug).first()
3✔
288
        return published_content_entity
3✔
289

290

291
@PublicatorRegistry.register("md")
3✔
292
class MarkdownPublicator(Publicator):
3✔
293
    def publish(self, md_file_path, base_name, *, cur_language=settings.LANGUAGE_CODE, **kwargs):
3✔
294
        published_content_entity = self.get_published_content_entity(md_file_path)
3✔
295
        versioned = kwargs.pop("versioned", None)
3✔
296
        if not versioned:
3✔
297
            # do not use load_public_version as it lacks of information to get the content
298
            # if you use it you will only get titles, without the text
299
            versioned = published_content_entity.content.load_version(sha=published_content_entity.sha_public)
1✔
300
        try:
3✔
301
            translation.activate(settings.LANGUAGE_CODE)
3✔
302
            parsed = render_to_string("tutorialv2/export/content.md", {"content": versioned})
3✔
303
        except requests.exceptions.HTTPError:
×
304
            raise FailureDuringPublication("Could not publish flat markdown")
×
305
        finally:
306
            translation.activate(cur_language)
3✔
307

308
        write_md_file(md_file_path, parsed, versioned)
3✔
309
        if "__building" in md_file_path:
3✔
310
            shutil.copy2(md_file_path, md_file_path.replace("__building", ""))
3✔
311

312

313
def _read_flat_markdown(md_file_path):
3✔
314
    with open(md_file_path, encoding="utf-8") as md_file_handler:
×
315
        md_flat_content = md_file_handler.read()
×
316
    return md_flat_content
×
317

318

319
@PublicatorRegistry.register("zip")
3✔
320
class ZipPublicator(Publicator):
3✔
321
    def publish(self, md_file_path, base_name, **kwargs):
3✔
322
        try:
3✔
323
            published_content_entity = self.get_published_content_entity(md_file_path)
3✔
324
            if published_content_entity is None:
3!
325
                raise ValueError("published_content_entity is None")
×
326
            if published_content_entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_zip"]:
3!
327
                logger.info("ZIP not allowed for opinions.")
×
328
                return
×
329
            make_zip_file(published_content_entity)
3✔
330
            # no need to move zip file because it is already dumped to the public directory
331
        except (OSError, ValueError) as e:
×
332
            raise FailureDuringPublication("Zip could not be created", e)
×
333

334

335
@PublicatorRegistry.register("pdf")
3✔
336
class ZMarkdownRebberLatexPublicator(Publicator):
3✔
337
    """
338
    Use zmarkdown and rebber stringifier to produce latex & pdf output.
339
    """
340

341
    def __init__(self, extension=".pdf", latex_classes=""):
3✔
342
        self.extension = extension
3✔
343
        self.doc_type = extension[1:]
3✔
344
        self.latex_classes = latex_classes
3✔
345

346
    def publish(self, md_file_path, base_name, **kwargs):
3✔
347
        published_content_entity = self.get_published_content_entity(md_file_path)
3✔
348
        if published_content_entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_pdf"]:
3!
349
            logger.info("PDF not allowed for opinions")
×
350
            return
×
351
        gallery_pk = published_content_entity.content.gallery.pk
3✔
352
        depth_to_size_map = {
3✔
353
            1: "small",  # in fact this is an "empty" tutorial (i.e it is empty or has intro and/or conclusion)
354
            2: "small",
355
            3: "middle",
356
            4: "big",
357
        }
358
        public_versionned_source = published_content_entity.content.load_version(
3✔
359
            sha=published_content_entity.sha_public
360
        )
361
        base_directory = Path(base_name).parent
3✔
362
        image_dir = base_directory / "images"
3✔
363
        with contextlib.suppress(FileExistsError):
3✔
364
            image_dir.mkdir(parents=True)
3✔
365
        if (settings.MEDIA_ROOT / "galleries" / str(gallery_pk)).exists():
3✔
366
            for image in (settings.MEDIA_ROOT / "galleries" / str(gallery_pk)).iterdir():
3!
367
                with contextlib.suppress(OSError):
×
368
                    shutil.copy2(str(image.absolute()), str(image_dir))
×
369
        content_type = depth_to_size_map[public_versionned_source.get_tree_level()]
3✔
370
        if self.latex_classes:
3!
371
            content_type += ", " + self.latex_classes
×
372
        title = published_content_entity.title()
3✔
373
        authors = [a.username for a in published_content_entity.authors.all()]
3✔
374

375
        licence = published_content_entity.content.licence.code
3✔
376
        licence_short = licence.replace("CC", "").strip().lower()
3✔
377
        licence_logo = licences.get(licence_short, False)
3✔
378
        if licence_logo:
3✔
379
            licence_url = f"https://creativecommons.org/licenses/{licence_short}/4.0/legalcode"
2✔
380
            # we need a specific case for CC-0 as it is "public-domain"
381
            if licence_logo == licences["0"]:
2!
382
                licence_url = "https://creativecommons.org/publicdomain/zero/1.0/legalcode.fr"
×
383
        else:
384
            licence = str(_("Tous droits réservés"))
3✔
385
            licence_logo = licences["copyright"]
3✔
386
            licence_url = ""
3✔
387

388
        replacement_image_url = str(settings.MEDIA_ROOT.parent)
3✔
389
        if not replacement_image_url.endswith("/"):
3!
390
            replacement_image_url += "/"
3✔
391
        replaced_media_url = settings.MEDIA_URL
3✔
392
        if replaced_media_url.startswith("/"):
3!
393
            replaced_media_url = replaced_media_url[1:]
3✔
394
        exported = export_content(public_versionned_source, with_text=True, ready_to_publish_only=True)
3✔
395
        # no title to avoid zmd to put it on the final latex
396
        del exported["title"]
3✔
397
        content, metadata, messages = render_markdown(
3✔
398
            exported,
399
            output_format="texfile",
400
            # latex template arguments
401
            content_type=content_type,
402
            title=title,
403
            authors=authors,
404
            license=licence,
405
            license_directory=str(LICENSES_BASE_PATH),
406
            license_logo=licence_logo,
407
            license_url=licence_url,
408
            smileys_directory=str(SMILEYS_BASE_PATH / "svg"),
409
            images_download_dir=str(base_directory / "images"),
410
            local_url_to_local_path=["/", replacement_image_url],
411
            heading_shift=-1,
412
            date=date(published_content_entity.last_publication_date, "l d F Y"),
413
        )
414
        if content == "" and messages:
3!
UNCOV
415
            raise FailureDuringPublication(f"Markdown was not parsed due to {messages}")
×
416
        zmd_class_dir_path = Path(settings.ZDS_APP["content"]["latex_template_repo"])
3✔
417
        content.replace(replacement_image_url + replaced_media_url, replacement_image_url)
3✔
418
        if zmd_class_dir_path.exists() and zmd_class_dir_path.is_dir():
3!
419
            with contextlib.suppress(FileExistsError):
×
420
                zmd_class_link = base_directory / "zmdocument.cls"
×
421
                zmd_class_link.symlink_to(zmd_class_dir_path / "zmdocument.cls")
×
422
                luatex_dir_link = base_directory / "utf8.lua"
×
UNCOV
423
                luatex_dir_link.symlink_to(zmd_class_dir_path / "utf8.lua", target_is_directory=True)
×
424
        true_latex_extension = ".".join(self.extension.split(".")[:-1]) + ".tex"
3✔
425
        latex_file_path = base_name + true_latex_extension
3✔
426
        pdf_file_path = base_name + self.extension
3✔
427
        default_logo_original_path = Path(__file__).parent / ".." / ".." / "assets" / "images" / "logo@2x.png"
3✔
428
        with contextlib.suppress(FileExistsError):
3✔
429
            shutil.copy(str(default_logo_original_path), str(base_directory / "default_logo.png"))
3✔
430
        with open(latex_file_path, mode="w", encoding="utf-8") as latex_file:
3✔
431
            latex_file.write(content)
3✔
432
        shutil.copy2(latex_file_path, published_content_entity.get_extra_contents_directory())
3✔
433

434
        self.full_tex_compiler_call(latex_file_path, draftmode="-draftmode")
3✔
435
        self.full_tex_compiler_call(latex_file_path, draftmode="-draftmode")
×
436
        self.make_glossary(base_name.split("/")[-1], latex_file_path)
×
UNCOV
437
        self.full_tex_compiler_call(latex_file_path)
×
438

UNCOV
439
        shutil.copy2(pdf_file_path, published_content_entity.get_extra_contents_directory())
×
440

441
    def full_tex_compiler_call(self, latex_file, draftmode: str = ""):
3✔
442
        success_flag = self.tex_compiler(latex_file, draftmode)
3✔
443
        if not success_flag:
3!
444
            handle_tex_compiler_error(latex_file, self.extension)
3✔
445

446
    def handle_makeglossaries_error(self, latex_file):
3✔
447
        with open(path.splitext(latex_file)[0] + ".log") as latex_log:
×
448
            errors = "\n".join(filter(line for line in latex_log if "fatal" in line.lower() or "error" in line.lower()))
×
UNCOV
449
        raise FailureDuringPublication(errors)
×
450

451
    def tex_compiler(self, texfile, draftmode: str = ""):
3✔
452
        command = f"lualatex -shell-escape -interaction=nonstopmode {draftmode} {texfile}"
3✔
453
        command_process = subprocess.Popen(
3✔
454
            command, shell=True, cwd=path.dirname(texfile), stdout=subprocess.PIPE, stderr=subprocess.PIPE
455
        )
456
        # let's put 10 min of timeout because we do not generate latex everyday
457
        command_process.communicate(timeout=600)
3✔
458

459
        pdf_file_path = path.splitext(texfile)[0] + self.extension
3✔
460
        return path.exists(pdf_file_path)
3✔
461

462
    def make_glossary(self, basename, texfile):
3✔
463
        command = f"makeglossaries {basename}"
×
UNCOV
464
        command_process = subprocess.Popen(
×
465
            command, shell=True, cwd=path.dirname(texfile), stdout=subprocess.PIPE, stderr=subprocess.PIPE
466
        )
UNCOV
467
        std_out, std_err = command_process.communicate()
×
468

469
        # TODO: check makeglossary exit codes to see if we can enhance error detection
470
        if "fatal" not in std_out.decode("utf-8").lower() and "fatal" not in std_err.decode("utf-8").lower():
×
UNCOV
471
            return True
×
472

UNCOV
473
        self.handle_makeglossaries_error(texfile)
×
474

475

476
def handle_tex_compiler_error(latex_file_path, ext):
3✔
477
    # TODO zmd: fix extension parsing
478
    log_file_path = latex_file_path[:-3] + "log"
3✔
479
    errors = [f"Error occured, log file {log_file_path} not found."]
3✔
480
    with contextlib.suppress(FileNotFoundError, UnicodeDecodeError):
3✔
481
        with Path(log_file_path).open(encoding="utf-8") as latex_log:
3✔
482
            print_context = 25
×
483
            lines = []
×
484
            relevant_line = -print_context
×
485
            for idx, line in enumerate(latex_log):
×
486
                if "fatal" in line.lower() or "error" in line.lower():
×
487
                    relevant_line = idx
×
488
                    lines.append(line)
×
489
                elif idx - relevant_line < print_context:
×
UNCOV
490
                    lines.append(line)
×
491

UNCOV
492
            errors = "\n".join(lines)
×
493
    logger.debug("%s ext=%s", errors, ext)
3✔
494

495
    raise FailureDuringPublication(errors)
3✔
496

497

498
@PublicatorRegistry.register("epub")
3✔
499
class ZMarkdownEpubPublicator(Publicator):
3✔
500
    def publish(self, md_file_path, base_name, **kwargs):
3✔
501
        try:
3✔
502
            published_content_entity = self.get_published_content_entity(md_file_path)
3✔
503
            if published_content_entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_epub"]:
3!
504
                logger.info("EPUB not allowed for opinions")
×
UNCOV
505
                return
×
506
            epub_file_path = Path(base_name + ".epub")
3✔
507
            logger.info("Start generating epub")
3✔
508
            build_ebook(published_content_entity, path.dirname(md_file_path), epub_file_path)
3✔
509
        except (OSError, requests.exceptions.HTTPError):
×
UNCOV
510
            raise FailureDuringPublication("Error while generating epub file.")
×
511
        else:
512
            logger.info(epub_file_path)
3✔
513
            epub_path = Path(published_content_entity.get_extra_contents_directory(), Path(epub_file_path.name))
3✔
514
            if epub_path.exists():
3!
UNCOV
515
                os.remove(str(epub_path))
×
516
            if not epub_path.parent.exists():
3!
UNCOV
517
                epub_path.parent.mkdir(parents=True)
×
518
            logger.info(
3✔
519
                "created %s. moving it to %s", epub_file_path, published_content_entity.get_extra_contents_directory()
520
            )
521
            shutil.move(str(epub_file_path), published_content_entity.get_extra_contents_directory())
3✔
522

523

524
@PublicatorRegistry.register("watchdog")
3✔
525
class WatchdogFilePublicator(Publicator):
3✔
526
    def publish(self, md_file_path, base_name, silently_pass=True, **kwargs):
3✔
527
        if silently_pass:
2!
UNCOV
528
            return
×
529
        published_content = self.get_published_content_entity(md_file_path)
2✔
530
        self.publish_from_published_content(published_content)
2✔
531

532
    def publish_from_published_content(self, published_content: PublishedContent):
3✔
533
        for requested_format in PublicatorRegistry.get_all_registered(["watchdog"]):
2✔
534
            # Remove previous PublicationEvent for this content, not handled by
535
            # the publication watchdog yet:
536
            PublicationEvent.objects.filter(
2✔
537
                state_of_processing="REQUESTED",
538
                published_object__content_pk=published_content.content_pk,
539
                format_requested=requested_format[0],
540
            ).delete()
541
            PublicationEvent.objects.create(
2✔
542
                state_of_processing="REQUESTED",
543
                published_object=published_content,
544
                format_requested=requested_format[0],
545
            )
546

547

548
class FailureDuringPublication(Exception):
3✔
549
    """Exception raised if something goes wrong during publication process"""
550

551
    def __init__(self, *args, **kwargs):
3✔
552
        super().__init__(*args, **kwargs)
3✔
553

554

555
def make_zip_file(published_content):
3✔
556
    """Create the zip archive extra content from the published content
557

558
    :param published_content: a PublishedContent object
559
    :return:
560
    """
561

562
    publishable = published_content.content
3✔
563
    # update SHA so that archive gets updated too
564
    publishable.sha_public = publishable.sha_draft
3✔
565
    file_path = path.join(
3✔
566
        published_content.get_extra_contents_directory(), published_content.content_public_slug + ".zip"
567
    )
568
    zip_file = zipfile.ZipFile(file_path, "w")
3✔
569
    versioned = publishable.load_version(None, True)
3✔
570
    from zds.tutorialv2.views.archives import DownloadContent
3✔
571

572
    DownloadContent.insert_into_zip(zip_file, versioned.repository.commit(versioned.current_version).tree)
3✔
573
    zip_file.close()
3✔
574
    return file_path
3✔
575

576

577
def unpublish_content(db_object, moderator=None):
3✔
578
    """
579
    Remove the given content from the public view.
580

581
    .. note::
582
        This will send content_unpublished event.
583

584
    :param db_object: Database representation of the content
585
    :type db_object: PublishableContent
586
    :param moderator: the staff user who triggered the unpublish action.
587
    :type moderator: django.contrib.auth.models.User
588
    :return: ``True`` if unpublished, ``False`` otherwise
589
    :rtype: bool
590
    """
591

592
    from zds.tutorialv2.models.database import PublishedContent
2✔
593

594
    with contextlib.suppress(ObjectDoesNotExist, OSError):
2✔
595
        public_version = PublishedContent.objects.get(pk=db_object.public_version.pk)
2✔
596

597
        results = [
2✔
598
            content_unpublished.send(
599
                sender=reaction.__class__, instance=db_object, target=ContentReaction, moderator=moderator, user=None
600
            )
601
            for reaction in [ContentReaction.objects.filter(related_content=db_object).all()]
602
        ]
603
        logging.debug("Nb_messages=%d, messages=%s", len(results), results)
2✔
604
        # remove public_version:
605
        public_version.delete()
2✔
606
        update_params = {"public_version": None}
2✔
607

608
        if db_object.is_opinion:
2✔
609
            update_params["sha_public"] = None
1✔
610
            update_params["sha_picked"] = None
1✔
611
            update_params["pubdate"] = None
1✔
612

613
        db_object.update(**update_params)
2✔
614
        content_unpublished.send(
2✔
615
            sender=db_object.__class__, instance=db_object, target=db_object.__class__, moderator=moderator
616
        )
617
        # clean files
618
        old_path = public_version.get_prod_path()
2✔
619
        public_version.content.update(public_version=None, sha_public=None)
2✔
620
        if path.exists(old_path):
2!
621
            shutil.rmtree(old_path)
2✔
622
        return True
2✔
623

UNCOV
624
    return False
×
625

626

627
def close_article_beta(db_object, versioned, user, request=None):
3✔
628
    """
629
    Close forum topic of an article if the artcle was in beta.
630
    :param db_object: the article
631
    :type db_object: zds.tutorialv2.models.database.PublishableContent
632
    :param versioned: the public version of article, used to pupulate closing post
633
    :type versioned: zds.tutorialv2.models.versioned.VersionedContent
634
    :param user: the current user
635
    :param request: the current request
636
    """
637
    if db_object.type == "ARTICLE":
1✔
638
        db_object.sha_beta = None
1✔
639
        topic = db_object.beta_topic
1✔
640
        if topic is not None and not topic.is_locked:
1✔
641
            msg_post = render_to_string("tutorialv2/messages/beta_desactivate.md", {"content": versioned})
1✔
642
            send_post(request, topic, user, msg_post)
1✔
643
            lock_topic(topic)
1✔
644

645

646
def save_validation_state(
3✔
647
    db_object,
648
    is_update,
649
    published: PublishedContent,
650
    validation,
651
    versioned,
652
    source="",
653
    is_major=False,
654
    user=None,
655
    request=None,
656
    comment="",
657
):
658
    """
659
    Save validation after publication, changes its status to ACCEPT
660
    :param db_object:  the content
661
    :type db_object: zds.tutorialv2.models.database.PublishableContent
662
    :param is_update: marks if the publication is an update or a new/major publication
663
    :param published: the PublishedContent instance
664
    :param validation: the related validation
665
    :param versioned:  the VersionedContent related to the public sha
666
    :param source: the optional cannonical link
667
    :param is_major: marks a major publication (first one, or new parts for example)
668
    :param user: validating user
669
    :param request: current request to get hats, and send error messages if needed
670
    """
671
    db_object.sha_public = validation.version
1✔
672
    db_object.source = source
1✔
673
    db_object.sha_validation = None
1✔
674
    db_object.public_version = published
1✔
675
    if is_major or not is_update or db_object.pubdate is None:
1✔
676
        db_object.pubdate = datetime.now()
1✔
677
        db_object.is_obsolete = False
1✔
678

679
    # close beta if is an article
680
    close_article_beta(db_object, versioned, user=user, request=request)
1✔
681
    db_object.save()
1✔
682
    # save validation object
683
    validation.comment_validator = comment
1✔
684
    validation.status = "ACCEPT"
1✔
685
    validation.date_validation = datetime.now()
1✔
686
    validation.save()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc