• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zestedesavoir / zds-site / 11059094899

26 Sep 2024 07:38PM UTC coverage: 89.18% (+0.002%) from 89.178%
11059094899

push

github

Situphen
Met à jour les versions Python supportées

- 3.11 est la version sur le serveur de production
- Django 4.2 est compatible avec les version 3.8-3.12

5844 of 7011 branches covered (83.35%)

16707 of 18734 relevant lines covered (89.18%)

1.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.38
/zds/tutorialv2/publication_utils.py
1
import contextlib
3✔
2
import copy
3✔
3
import logging
3✔
4
import os
3✔
5
import shutil
3✔
6
import subprocess
3✔
7
import zipfile
3✔
8
from datetime import datetime
3✔
9
from os import makedirs, path
3✔
10
from pathlib import Path
3✔
11

12
import requests
3✔
13
from django.core.exceptions import ObjectDoesNotExist
3✔
14
from django.template.defaultfilters import date
3✔
15
from django.template.loader import render_to_string
3✔
16
from django.utils import translation
3✔
17
from django.utils.translation import gettext_lazy as _
3✔
18
from django.conf import settings
3✔
19
from zds.tutorialv2 import signals
3✔
20
from zds.tutorialv2.epub_utils import build_ebook
3✔
21
from zds.tutorialv2.models.database import ContentReaction, PublishedContent, PublicationEvent
3✔
22
from zds.tutorialv2.publish_container import publish_use_manifest
3✔
23
from zds.tutorialv2.signals import content_unpublished
3✔
24
from zds.tutorialv2.utils import export_content
3✔
25
from zds.forum.utils import send_post, lock_topic
3✔
26
from zds.utils.templatetags.emarkdown import render_markdown
3✔
27
from zds.utils.templatetags.smileys_def import SMILEYS_BASE_PATH, LICENSES_BASE_PATH
3✔
28

29
logger = logging.getLogger(__name__)
3✔
30
licences = {
3✔
31
    "by-nc-nd": "by-nc-nd.svg",
32
    "by-nc-sa": "by-nc-sa.svg",
33
    "by-nc": "by-nc.svg",
34
    "by-nd": "by-nd.svg",
35
    "by-sa": "by-sa.svg",
36
    "by": "by.svg",
37
    "0": "0.svg",
38
    "copyright": "copyright.svg",
39
}
40

41

42
def notify_update(db_object, is_update, is_major):
3✔
43
    if not is_update or is_major:
2✔
44
        # Follow
45
        signals.content_published.send(sender=db_object.__class__, instance=db_object, by_email=False)
2✔
46

47

48
def publish_content(db_object, versioned, is_major_update=True):
3✔
49
    """
50
    Publish a given content.
51

52
    .. note::
53
        create a manifest.json without the introduction and conclusion if not needed. Also remove the 'text' field
54
        of extracts.
55

56
    :param db_object: Database representation of the content
57
    :type db_object: zds.tutorialv2.models.database.PublishableContent
58
    :param versioned: version of the content to publish
59
    :type versioned: zds.tutorialv2.models.versioned.VersionedContent
60
    :param is_major_update: if set to `True`, will update the publication date
61
    :type is_major_update: bool
62
    :raise FailureDuringPublication: if something goes wrong
63
    :return: the published representation
64
    :rtype: zds.tutorialv2.models.database.PublishedContent
65
    """
66

67
    from zds.tutorialv2.models.database import PublishedContent
3✔
68

69
    if is_major_update:
3✔
70
        versioned.pubdate = datetime.now()
3✔
71

72
    # First write the files to a temporary directory: if anything goes wrong,
73
    # the last published version is not impacted !
74
    tmp_path = path.join(settings.ZDS_APP["content"]["repo_public_path"], versioned.slug + "__building")
3✔
75
    if path.exists(tmp_path):
3✔
76
        shutil.rmtree(tmp_path)  # remove previous attempt, if any
3✔
77

78
    # render HTML:
79
    altered_version = copy.deepcopy(versioned)
3✔
80
    char_count = publish_use_manifest(db_object, tmp_path, altered_version)
3✔
81
    altered_version.dump_json(path.join(tmp_path, "manifest.json"))
3✔
82

83
    # make room for 'extra contents'
84
    build_extra_contents_path = path.join(tmp_path, settings.ZDS_APP["content"]["extra_contents_dirname"])
3✔
85
    makedirs(build_extra_contents_path)
3✔
86
    base_name = path.join(build_extra_contents_path, versioned.slug)
3✔
87

88
    # 1. markdown file (base for the others) :
89
    # If we come from a command line, we need to activate i18n, to have the date in the french language.
90
    cur_language = translation.get_language()
3✔
91
    altered_version.pubdate = datetime.now()
3✔
92

93
    md_file_path = base_name + ".md"
3✔
94
    with contextlib.suppress(OSError):
3✔
95
        Path(Path(md_file_path).parent, "images").mkdir()
3✔
96
    is_update = False
3✔
97

98
    if db_object.public_version:
3✔
99
        public_version = update_existing_publication(db_object, versioned)
3✔
100
        is_update = True
3✔
101
    else:
102
        public_version = PublishedContent()
3✔
103

104
    # make the new public version
105
    public_version.content_public_slug = versioned.slug
3✔
106
    public_version.content_type = versioned.type
3✔
107
    public_version.content_pk = db_object.pk
3✔
108
    public_version.content = db_object
3✔
109
    public_version.char_count = char_count
3✔
110
    public_version.save()
3✔
111
    with contextlib.suppress(FileExistsError):
3✔
112
        makedirs(public_version.get_extra_contents_directory())
3✔
113
    if is_major_update or not is_update:
3✔
114
        public_version.publication_date = datetime.now()
3✔
115
    elif is_update:
2!
116
        public_version.update_date = datetime.now()
2✔
117
    public_version.sha_public = versioned.current_version
3✔
118
    public_version.save(update_fields=["publication_date", "update_date", "sha_public"])
3✔
119

120
    public_version.authors.clear()
3✔
121
    for author in db_object.authors.all():
3✔
122
        public_version.authors.add(author)
3✔
123

124
    # this puts the manifest.json and base json file on the prod path.
125
    shutil.rmtree(public_version.get_prod_path(), ignore_errors=True)
3✔
126
    shutil.copytree(tmp_path, public_version.get_prod_path())
3✔
127
    db_object.sha_public = versioned.current_version
3✔
128
    public_version.save()
3✔
129
    if settings.ZDS_APP["content"]["extra_content_generation_policy"] == "SYNC":
3✔
130
        # ok, now we can really publish the thing!
131
        generate_external_content(base_name, build_extra_contents_path, md_file_path, versioned=versioned)
3✔
132
    elif settings.ZDS_APP["content"]["extra_content_generation_policy"] == "WATCHDOG":
3✔
133
        PublicatorRegistry.get("watchdog").publish(md_file_path, base_name, silently_pass=False)
2✔
134

135
    return public_version
3✔
136

137

138
def update_existing_publication(db_object, versioned):
3✔
139
    public_version = db_object.public_version
3✔
140
    # the content has been published in the past, so we will clean up old files!
141
    old_path = public_version.get_prod_path()
3✔
142

143
    # if the slug has changed, create a new object instead of reusing the old one
144
    # this allows us to handle permanent redirection so that SEO is not impacted.
145
    if versioned.slug != public_version.content_public_slug:
3✔
146
        public_version.must_redirect = True  # set redirection
2✔
147
        public_version.save(update_fields=["must_redirect"])
2✔
148
        publication_date = public_version.publication_date
2✔
149
        db_object.public_version = PublishedContent()
2✔
150
        public_version = db_object.public_version
2✔
151

152
        # keep the same publication date if the content is already published
153
        public_version.publication_date = publication_date
2✔
154

155
    # remove old files only if everything succeed so far: if something bad
156
    # happened, we don't want to have a published content without content!
157
    logging.getLogger(__name__).debug("erase " + old_path)
3✔
158
    shutil.rmtree(old_path)
3✔
159

160
    return public_version
3✔
161

162

163
def write_md_file(md_file_path, parsed_with_local_images, versioned):
3✔
164
    with open(md_file_path, "w", encoding="utf-8") as md_file:
3✔
165
        try:
3✔
166
            md_file.write(parsed_with_local_images)
3✔
167
        except UnicodeError:
×
168
            logger.error("Could not encode %s in UTF-8, publication aborted", versioned.title)
×
169
            raise FailureDuringPublication(
×
170
                _(
171
                    "Une erreur est survenue durant la génération du fichier markdown "
172
                    "à télécharger, vérifiez le code markdown"
173
                )
174
            )
175

176

177
def generate_external_content(
3✔
178
    base_name, extra_contents_path, md_file_path, overload_settings=False, excluded=None, **kwargs
179
):
180
    """
181
    generate all static file that allow offline access to content
182

183
    :param base_name: base nae of file (without extension)
184
    :param extra_contents_path: internal directory where all files will be pushed
185
    :param md_file_path: bundled markdown file path
186
    :param overload_settings: this option force the function to generate all registered formats even when settings \
187
    ask for PDF not to be published
188
    :param excluded: list of excluded format, None if no exclusion
189
    """
190
    excluded = excluded or ["watchdog"]
3✔
191
    if not settings.ZDS_APP["content"]["build_pdf_when_published"] and not overload_settings:
3✔
192
        excluded.append("pdf")
3✔
193
    for publicator_name, publicator in PublicatorRegistry.get_all_registered(excluded):
3✔
194
        try:
3✔
195
            publicator.publish(
3✔
196
                md_file_path,
197
                base_name,
198
                change_dir=extra_contents_path,
199
                cur_language=translation.get_language(),
200
                **kwargs,
201
            )
202
        except (FailureDuringPublication, OSError):
1✔
203
            logging.getLogger(__name__).exception(
1✔
204
                "Could not publish %s format from %s base.", publicator_name, md_file_path
205
            )
206

207

208
class PublicatorRegistry:
3✔
209
    """
210
    Register all publicator as a 'human-readable name/publicator' instance key/value list
211
    """
212

213
    registry = {}
3✔
214

215
    @classmethod
3✔
216
    def register(cls, publicator_name, *args):
3✔
217
        def decorated(func):
3✔
218
            cls.registry[publicator_name] = func(*args)
3✔
219
            return func
3✔
220

221
        return decorated
3✔
222

223
    @classmethod
3✔
224
    def get_all_registered(cls, exclude=None):
3✔
225
        """
226
        Args:
227
            exclude: A list of excluded publicator
228

229
        Returns:
230
        """
231
        if exclude is None:
3✔
232
            exclude = []
1✔
233
        order_key = {
3✔
234
            "zip": 1,
235
            "md": 2,
236
            "html": 3,
237
            "epub": 4,
238
            "pdf": 5,
239
        }
240
        for key, value in sorted(cls.registry.items(), key=lambda k: order_key.get(k[0], 42)):
3✔
241
            if key not in exclude:
3✔
242
                yield key, value
3✔
243

244
    @classmethod
3✔
245
    def unregister(cls, name):
3✔
246
        """
247
        Remove registered Publicator named 'name' if present
248

249
        :param name: publicator name.
250
        """
251
        if name in cls.registry:
×
252
            del cls.registry[name]
×
253

254
    @classmethod
3✔
255
    def get(cls, name):
3✔
256
        """
257
        Get publicator named 'name'.
258

259
        :param name:
260
        :return: the wanted publicator
261
        :rtype: Publicator
262
        :raise KeyError: if publicator is not registered
263
        """
264
        return cls.registry[name]
2✔
265

266

267
class Publicator:
3✔
268
    """
269
    Publicator base object, all methods must be overridden
270
    """
271

272
    def publish(self, md_file_path, base_name, **kwargs):
3✔
273
        """
274
        Function called to generate a content export
275

276
        :param md_file_path: base markdown file path
277
        :param base_name: file name without extension
278
        :param kwargs: other publicator dependent options
279
        """
280
        raise NotImplementedError()
×
281

282
    def get_published_content_entity(self, md_file_path) -> PublishedContent:
3✔
283
        """
284
        Retrieve the db entity from mdfile path
285

286
        :param md_file_path: mdfile path as string
287
        :type md_file_path: str
288
        :return: the db entity
289
        :rtype: zds.tutorialv2.models.models_database.PublishedContent
290
        """
291
        content_slug = PublishedContent.get_slug_from_file_path(md_file_path)
3✔
292
        published_content_entity = PublishedContent.objects.filter(content_public_slug=content_slug).first()
3✔
293
        return published_content_entity
3✔
294

295

296
@PublicatorRegistry.register("md")
3✔
297
class MarkdownPublicator(Publicator):
3✔
298
    def publish(self, md_file_path, base_name, *, cur_language=settings.LANGUAGE_CODE, **kwargs):
3✔
299
        published_content_entity = self.get_published_content_entity(md_file_path)
3✔
300
        versioned = kwargs.pop("versioned", None)
3✔
301
        if not versioned:
3✔
302
            # do not use load_public_version as it lacks of information to get the content
303
            # if you use it you will only get titles, without the text
304
            versioned = published_content_entity.content.load_version(sha=published_content_entity.sha_public)
1✔
305
        try:
3✔
306
            translation.activate(settings.LANGUAGE_CODE)
3✔
307
            parsed = render_to_string("tutorialv2/export/content.md", {"content": versioned})
3✔
308
        except requests.exceptions.HTTPError:
×
309
            raise FailureDuringPublication("Could not publish flat markdown")
×
310
        finally:
311
            translation.activate(cur_language)
3!
312

313
        write_md_file(md_file_path, parsed, versioned)
3✔
314
        if "__building" in md_file_path:
3✔
315
            shutil.copy2(md_file_path, md_file_path.replace("__building", ""))
3✔
316

317

318
def _read_flat_markdown(md_file_path):
3✔
319
    with open(md_file_path, encoding="utf-8") as md_file_handler:
×
320
        md_flat_content = md_file_handler.read()
×
321
    return md_flat_content
×
322

323

324
@PublicatorRegistry.register("zip")
3✔
325
class ZipPublicator(Publicator):
3✔
326
    def publish(self, md_file_path, base_name, **kwargs):
3✔
327
        try:
3✔
328
            published_content_entity = self.get_published_content_entity(md_file_path)
3✔
329
            if published_content_entity is None:
3!
330
                raise ValueError("published_content_entity is None")
×
331
            if published_content_entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_zip"]:
3!
332
                logger.info("ZIP not allowed for opinions.")
×
333
                return
×
334
            make_zip_file(published_content_entity)
3✔
335
            # no need to move zip file because it is already dumped to the public directory
336
        except (OSError, ValueError) as e:
×
337
            raise FailureDuringPublication("Zip could not be created", e)
×
338

339

340
@PublicatorRegistry.register("pdf")
3✔
341
class ZMarkdownRebberLatexPublicator(Publicator):
3✔
342
    """
343
    Use zmarkdown and rebber stringifier to produce latex & pdf output.
344
    """
345

346
    def __init__(self, extension=".pdf", latex_classes=""):
3✔
347
        self.extension = extension
3✔
348
        self.doc_type = extension[1:]
3✔
349
        self.latex_classes = latex_classes
3✔
350

351
    def publish(self, md_file_path, base_name, **kwargs):
3✔
352
        published_content_entity = self.get_published_content_entity(md_file_path)
1✔
353
        if published_content_entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_pdf"]:
1!
354
            logger.info("PDF not allowed for opinions")
×
355
            return
×
356
        gallery_pk = published_content_entity.content.gallery.pk
1✔
357
        depth_to_size_map = {
1✔
358
            1: "small",  # in fact this is an "empty" tutorial (i.e it is empty or has intro and/or conclusion)
359
            2: "small",
360
            3: "middle",
361
            4: "big",
362
        }
363
        public_versionned_source = published_content_entity.content.load_version(
1✔
364
            sha=published_content_entity.sha_public
365
        )
366
        base_directory = Path(base_name).parent
1✔
367
        image_dir = base_directory / "images"
1✔
368
        with contextlib.suppress(FileExistsError):
1✔
369
            image_dir.mkdir(parents=True)
1✔
370
        if (settings.MEDIA_ROOT / "galleries" / str(gallery_pk)).exists():
1!
371
            for image in (settings.MEDIA_ROOT / "galleries" / str(gallery_pk)).iterdir():
1!
372
                with contextlib.suppress(OSError):
×
373
                    shutil.copy2(str(image.absolute()), str(image_dir))
×
374
        content_type = depth_to_size_map[public_versionned_source.get_tree_level()]
1✔
375
        if self.latex_classes:
1!
376
            content_type += ", " + self.latex_classes
×
377
        title = published_content_entity.title()
1✔
378
        authors = [a.username for a in published_content_entity.authors.all()]
1✔
379

380
        licence = published_content_entity.content.licence.code
1✔
381
        licence_short = licence.replace("CC", "").strip().lower()
1✔
382
        licence_logo = licences.get(licence_short, False)
1✔
383
        if licence_logo:
1!
384
            licence_url = f"https://creativecommons.org/licenses/{licence_short}/4.0/legalcode"
×
385
            # we need a specific case for CC-0 as it is "public-domain"
386
            if licence_logo == licences["0"]:
×
387
                licence_url = "https://creativecommons.org/publicdomain/zero/1.0/legalcode.fr"
×
388
        else:
389
            licence = str(_("Tous droits réservés"))
1✔
390
            licence_logo = licences["copyright"]
1✔
391
            licence_url = ""
1✔
392

393
        replacement_image_url = str(settings.MEDIA_ROOT.parent)
1✔
394
        if not replacement_image_url.endswith("/"):
1!
395
            replacement_image_url += "/"
1✔
396
        replaced_media_url = settings.MEDIA_URL
1✔
397
        if replaced_media_url.startswith("/"):
1!
398
            replaced_media_url = replaced_media_url[1:]
1✔
399
        exported = export_content(public_versionned_source, with_text=True, ready_to_publish_only=True)
1✔
400
        # no title to avoid zmd to put it on the final latex
401
        del exported["title"]
1✔
402
        content, metadata, messages = render_markdown(
1✔
403
            exported,
404
            output_format="texfile",
405
            # latex template arguments
406
            content_type=content_type,
407
            title=title,
408
            authors=authors,
409
            license=licence,
410
            license_directory=str(LICENSES_BASE_PATH),
411
            license_logo=licence_logo,
412
            license_url=licence_url,
413
            smileys_directory=str(SMILEYS_BASE_PATH / "svg"),
414
            images_download_dir=str(base_directory / "images"),
415
            local_url_to_local_path=["/", replacement_image_url],
416
            heading_shift=-1,
417
            date=date(published_content_entity.last_publication_date, "l d F Y"),
418
        )
419
        if content == "" and messages:
1!
420
            raise FailureDuringPublication(f"Markdown was not parsed due to {messages}")
×
421
        zmd_class_dir_path = Path(settings.ZDS_APP["content"]["latex_template_repo"])
1✔
422
        content.replace(replacement_image_url + replaced_media_url, replacement_image_url)
1✔
423
        if zmd_class_dir_path.exists() and zmd_class_dir_path.is_dir():
1!
424
            with contextlib.suppress(FileExistsError):
×
425
                zmd_class_link = base_directory / "zmdocument.cls"
×
426
                zmd_class_link.symlink_to(zmd_class_dir_path / "zmdocument.cls")
×
427
                luatex_dir_link = base_directory / "utf8.lua"
×
428
                luatex_dir_link.symlink_to(zmd_class_dir_path / "utf8.lua", target_is_directory=True)
×
429
        true_latex_extension = ".".join(self.extension.split(".")[:-1]) + ".tex"
1✔
430
        latex_file_path = base_name + true_latex_extension
1✔
431
        pdf_file_path = base_name + self.extension
1✔
432
        default_logo_original_path = Path(__file__).parent / ".." / ".." / "assets" / "images" / "logo@2x.png"
1✔
433
        with contextlib.suppress(FileExistsError):
1✔
434
            shutil.copy(str(default_logo_original_path), str(base_directory / "default_logo.png"))
1✔
435
        with open(latex_file_path, mode="w", encoding="utf-8") as latex_file:
1✔
436
            latex_file.write(content)
1✔
437
        shutil.copy2(latex_file_path, published_content_entity.get_extra_contents_directory())
1✔
438

439
        self.full_tex_compiler_call(latex_file_path, draftmode="-draftmode")
1✔
440
        self.full_tex_compiler_call(latex_file_path, draftmode="-draftmode")
×
441
        self.make_glossary(base_name.split("/")[-1], latex_file_path)
×
442
        self.full_tex_compiler_call(latex_file_path)
×
443

444
        shutil.copy2(pdf_file_path, published_content_entity.get_extra_contents_directory())
×
445

446
    def full_tex_compiler_call(self, latex_file, draftmode: str = ""):
3✔
447
        success_flag = self.tex_compiler(latex_file, draftmode)
1✔
448
        if not success_flag:
1!
449
            handle_tex_compiler_error(latex_file, self.extension)
1✔
450

451
    def handle_makeglossaries_error(self, latex_file):
3✔
452
        with open(path.splitext(latex_file)[0] + ".log") as latex_log:
×
453
            errors = "\n".join(filter(line for line in latex_log if "fatal" in line.lower() or "error" in line.lower()))
×
454
        raise FailureDuringPublication(errors)
×
455

456
    def tex_compiler(self, texfile, draftmode: str = ""):
3✔
457
        command = f"lualatex -shell-escape -interaction=nonstopmode {draftmode} {texfile}"
1✔
458
        command_process = subprocess.Popen(
1✔
459
            command, shell=True, cwd=path.dirname(texfile), stdout=subprocess.PIPE, stderr=subprocess.PIPE
460
        )
461
        # let's put 10 min of timeout because we do not generate latex everyday
462
        command_process.communicate(timeout=600)
1✔
463

464
        pdf_file_path = path.splitext(texfile)[0] + self.extension
1✔
465
        return path.exists(pdf_file_path)
1✔
466

467
    def make_glossary(self, basename, texfile):
3✔
468
        command = f"makeglossaries {basename}"
×
469
        command_process = subprocess.Popen(
×
470
            command, shell=True, cwd=path.dirname(texfile), stdout=subprocess.PIPE, stderr=subprocess.PIPE
471
        )
472
        std_out, std_err = command_process.communicate()
×
473

474
        # TODO: check makeglossary exit codes to see if we can enhance error detection
475
        if "fatal" not in std_out.decode("utf-8").lower() and "fatal" not in std_err.decode("utf-8").lower():
×
476
            return True
×
477

478
        self.handle_makeglossaries_error(texfile)
×
479

480

481
def handle_tex_compiler_error(latex_file_path, ext):
3✔
482
    # TODO zmd: fix extension parsing
483
    log_file_path = latex_file_path[:-3] + "log"
1✔
484
    errors = [f"Error occured, log file {log_file_path} not found."]
1✔
485
    with contextlib.suppress(FileNotFoundError, UnicodeDecodeError):
1✔
486
        with Path(log_file_path).open(encoding="utf-8") as latex_log:
1!
487
            print_context = 25
×
488
            lines = []
×
489
            relevant_line = -print_context
×
490
            for idx, line in enumerate(latex_log):
×
491
                if "fatal" in line.lower() or "error" in line.lower():
×
492
                    relevant_line = idx
×
493
                    lines.append(line)
×
494
                elif idx - relevant_line < print_context:
×
495
                    lines.append(line)
×
496

497
            errors = "\n".join(lines)
×
498
    logger.debug("%s ext=%s", errors, ext)
1✔
499

500
    raise FailureDuringPublication(errors)
1✔
501

502

503
@PublicatorRegistry.register("epub")
3✔
504
class ZMarkdownEpubPublicator(Publicator):
3✔
505
    def publish(self, md_file_path, base_name, **kwargs):
3✔
506
        try:
3✔
507
            published_content_entity = self.get_published_content_entity(md_file_path)
3✔
508
            if published_content_entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_epub"]:
3!
509
                logger.info("EPUB not allowed for opinions")
×
510
                return
×
511
            epub_file_path = Path(base_name + ".epub")
3✔
512
            logger.info("Start generating epub")
3✔
513
            build_ebook(published_content_entity, path.dirname(md_file_path), epub_file_path)
3✔
514
        except (OSError, requests.exceptions.HTTPError):
×
515
            raise FailureDuringPublication("Error while generating epub file.")
×
516
        else:
517
            logger.info(epub_file_path)
3✔
518
            epub_path = Path(published_content_entity.get_extra_contents_directory(), Path(epub_file_path.name))
3✔
519
            if epub_path.exists():
3!
520
                os.remove(str(epub_path))
×
521
            if not epub_path.parent.exists():
3!
522
                epub_path.parent.mkdir(parents=True)
×
523
            logger.info(
3✔
524
                "created %s. moving it to %s", epub_file_path, published_content_entity.get_extra_contents_directory()
525
            )
526
            shutil.move(str(epub_file_path), published_content_entity.get_extra_contents_directory())
3✔
527

528

529
@PublicatorRegistry.register("watchdog")
3✔
530
class WatchdogFilePublicator(Publicator):
3✔
531
    def publish(self, md_file_path, base_name, silently_pass=True, **kwargs):
3✔
532
        if silently_pass:
2!
533
            return
×
534
        published_content = self.get_published_content_entity(md_file_path)
2✔
535
        self.publish_from_published_content(published_content)
2✔
536

537
    def publish_from_published_content(self, published_content: PublishedContent):
3✔
538
        for requested_format in PublicatorRegistry.get_all_registered(["watchdog"]):
2✔
539
            # Remove previous PublicationEvent for this content, not handled by
540
            # the publication watchdog yet:
541
            PublicationEvent.objects.filter(
2✔
542
                state_of_processing="REQUESTED",
543
                published_object__content_pk=published_content.content_pk,
544
                format_requested=requested_format[0],
545
            ).delete()
546
            PublicationEvent.objects.create(
2✔
547
                state_of_processing="REQUESTED",
548
                published_object=published_content,
549
                format_requested=requested_format[0],
550
            )
551

552

553
class FailureDuringPublication(Exception):
3✔
554
    """Exception raised if something goes wrong during publication process"""
555

556
    def __init__(self, *args, **kwargs):
3✔
557
        super().__init__(*args, **kwargs)
1✔
558

559

560
def make_zip_file(published_content):
3✔
561
    """Create the zip archive extra content from the published content
562

563
    :param published_content: a PublishedContent object
564
    :return:
565
    """
566

567
    publishable = published_content.content
3✔
568
    # update SHA so that archive gets updated too
569
    publishable.sha_public = publishable.sha_draft
3✔
570
    file_path = path.join(
3✔
571
        published_content.get_extra_contents_directory(), published_content.content_public_slug + ".zip"
572
    )
573
    zip_file = zipfile.ZipFile(file_path, "w")
3✔
574
    versioned = publishable.load_version(None, True)
3✔
575
    from zds.tutorialv2.views.archives import DownloadContent
3✔
576

577
    DownloadContent.insert_into_zip(zip_file, versioned.repository.commit(versioned.current_version).tree)
3✔
578
    zip_file.close()
3✔
579
    return file_path
3✔
580

581

582
def unpublish_content(db_object, moderator=None):
3✔
583
    """
584
    Remove the given content from the public view.
585

586
    .. note::
587
        This will send content_unpublished event.
588

589
    :param db_object: Database representation of the content
590
    :type db_object: PublishableContent
591
    :param moderator: the staff user who triggered the unpublish action.
592
    :type moderator: django.contrib.auth.models.User
593
    :return: ``True`` if unpublished, ``False`` otherwise
594
    :rtype: bool
595
    """
596

597
    from zds.tutorialv2.models.database import PublishedContent
2✔
598

599
    with contextlib.suppress(ObjectDoesNotExist, OSError):
2✔
600
        public_version = PublishedContent.objects.get(pk=db_object.public_version.pk)
2✔
601

602
        results = [
2✔
603
            content_unpublished.send(
604
                sender=reaction.__class__, instance=db_object, target=ContentReaction, moderator=moderator, user=None
605
            )
606
            for reaction in [ContentReaction.objects.filter(related_content=db_object).all()]
607
        ]
608
        logging.debug("Nb_messages=%d, messages=%s", len(results), results)
2✔
609
        # remove public_version:
610
        public_version.delete()
2✔
611
        update_params = {"public_version": None}
2✔
612

613
        if db_object.is_opinion:
2✔
614
            update_params["sha_public"] = None
1✔
615
            update_params["sha_picked"] = None
1✔
616
            update_params["pubdate"] = None
1✔
617

618
        db_object.update(**update_params)
2✔
619
        content_unpublished.send(
2✔
620
            sender=db_object.__class__, instance=db_object, target=db_object.__class__, moderator=moderator
621
        )
622
        # clean files
623
        old_path = public_version.get_prod_path()
2✔
624
        public_version.content.update(public_version=None, sha_public=None)
2✔
625
        if path.exists(old_path):
2!
626
            shutil.rmtree(old_path)
2✔
627
        return True
2✔
628

629
    return False
×
630

631

632
def close_article_beta(db_object, versioned, user, request=None):
3✔
633
    """
634
    Close forum topic of an article if the artcle was in beta.
635
    :param db_object: the article
636
    :type db_object: zds.tutorialv2.models.database.PublishableContent
637
    :param versioned: the public version of article, used to pupulate closing post
638
    :type versioned: zds.tutorialv2.models.versioned.VersionedContent
639
    :param user: the current user
640
    :param request: the current request
641
    """
642
    if db_object.type == "ARTICLE":
1✔
643
        db_object.sha_beta = None
1✔
644
        topic = db_object.beta_topic
1✔
645
        if topic is not None and not topic.is_locked:
1✔
646
            msg_post = render_to_string("tutorialv2/messages/beta_desactivate.md", {"content": versioned})
1✔
647
            send_post(request, topic, user, msg_post)
1✔
648
            lock_topic(topic)
1✔
649

650

651
def save_validation_state(
3✔
652
    db_object,
653
    is_update,
654
    published: PublishedContent,
655
    validation,
656
    versioned,
657
    source="",
658
    is_major=False,
659
    user=None,
660
    request=None,
661
    comment="",
662
):
663
    """
664
    Save validation after publication, changes its status to ACCEPT
665
    :param db_object:  the content
666
    :type db_object: zds.tutorialv2.models.database.PublishableContent
667
    :param is_update: marks if the publication is an update or a new/major publication
668
    :param published: the PublishedContent instance
669
    :param validation: the related validation
670
    :param versioned:  the VersionedContent related to the public sha
671
    :param source: the optional cannonical link
672
    :param is_major: marks a major publication (first one, or new parts for example)
673
    :param user: validating user
674
    :param request: current request to get hats, and send error messages if needed
675
    """
676
    db_object.sha_public = validation.version
1✔
677
    db_object.source = source
1✔
678
    db_object.sha_validation = None
1✔
679
    db_object.public_version = published
1✔
680
    if is_major or not is_update or db_object.pubdate is None:
1✔
681
        db_object.pubdate = datetime.now()
1✔
682
        db_object.is_obsolete = False
1✔
683

684
    # close beta if is an article
685
    close_article_beta(db_object, versioned, user=user, request=request)
1✔
686
    db_object.save()
1✔
687
    # save validation object
688
    validation.comment_validator = comment
1✔
689
    validation.status = "ACCEPT"
1✔
690
    validation.date_validation = datetime.now()
1✔
691
    validation.save()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc