• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zestedesavoir / zds-site / 9923491706

13 Jul 2024 11:10PM UTC coverage: 88.842% (+0.01%) from 88.831%
9923491706

push

github

Situphen
Supprime les fils d'Ariane pour Sentry dans publication_utils.py

- les fils d'Ariane n'apportent pas d'information supplémentaire,
  Sentry intercepte déjà les appels à Popen et les met dans le fil
  d'Ariane
- la fonction add_breadcrumb() était mal appelée et causait des
  avertissements de Sentry : "Sentry has identified the following
  problems for you to monitor: Discarded invalid value (3), expected an
  object"

4740 of 5932 branches covered (79.91%)

16593 of 18677 relevant lines covered (88.84%)

1.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.7
/zds/tutorialv2/publication_utils.py
1
import contextlib
3✔
2
import copy
3✔
3
import logging
3✔
4
import os
3✔
5
import shutil
3✔
6
import subprocess
3✔
7
import zipfile
3✔
8
from datetime import datetime
3✔
9
from os import makedirs, path
3✔
10
from pathlib import Path
3✔
11

12
import requests
3✔
13
from django.core.exceptions import ObjectDoesNotExist
3✔
14
from django.template.defaultfilters import date
3✔
15
from django.template.loader import render_to_string
3✔
16
from django.utils import translation
3✔
17
from django.utils.translation import gettext_lazy as _
3✔
18
from django.conf import settings
3✔
19
from zds.tutorialv2 import signals
3✔
20
from zds.tutorialv2.epub_utils import build_ebook
3✔
21
from zds.tutorialv2.models.database import ContentReaction, PublishedContent, PublicationEvent
3✔
22
from zds.tutorialv2.publish_container import publish_use_manifest
3✔
23
from zds.tutorialv2.signals import content_unpublished
3✔
24
from zds.tutorialv2.utils import export_content
3✔
25
from zds.forum.utils import send_post, lock_topic
3✔
26
from zds.utils.templatetags.emarkdown import render_markdown
3✔
27
from zds.utils.templatetags.smileys_def import SMILEYS_BASE_PATH, LICENSES_BASE_PATH
3✔
28

29
# Module-level logger used by the publication helpers of this file.
logger = logging.getLogger(__name__)

# Maps a short licence identifier to the file name of its logo; every logo
# is named after its identifier with an ``.svg`` suffix.
licences = {
    code: f"{code}.svg"
    for code in ("by-nc-nd", "by-nc-sa", "by-nc", "by-nd", "by-sa", "by", "0", "copyright")
}
40

41

42
def notify_update(db_object, is_update, is_major):
    """
    Send the ``content_published`` signal for a first publication or a major update.

    Nothing is sent for a minor update (``is_update`` true and ``is_major`` false).

    :param db_object: published content; its class is the signal sender and it is passed as ``instance``
    :param is_update: truthy when the content had already been published
    :param is_major: truthy when the publication is a major one
    """
    minor_update = is_update and not is_major
    if minor_update:
        return
    # Follow
    signals.content_published.send(sender=db_object.__class__, instance=db_object, by_email=False)
2✔
46

47

48
def publish_content(db_object, versioned, is_major_update=True):
    """
    Publish a given content.

    The content is first rendered in a ``<slug>__building`` directory, then the
    database entry is created or updated, and finally the built files replace
    the production directory. Extra formats are generated (or requested from
    the watchdog) according to ``extra_content_generation_policy``.

    .. note::
        create a manifest.json without the introduction and conclusion if not needed. Also remove the 'text' field
        of extracts.

    :param db_object: Database representation of the content
    :type db_object: zds.tutorialv2.models.database.PublishableContent
    :param versioned: version of the content to publish
    :type versioned: zds.tutorialv2.models.versioned.VersionedContent
    :param is_major_update: if set to `True`, will update the publication date
    :type is_major_update: bool
    :raise FailureDuringPublication: not raised directly here; presumably propagated from the rendering step
    :return: the published representation
    :rtype: zds.tutorialv2.models.database.PublishedContent
    """

    from zds.tutorialv2.models.database import PublishedContent

    if is_major_update:
        versioned.pubdate = datetime.now()

    # First write the files to a temporary directory: if anything goes wrong,
    # the last published version is not impacted !
    tmp_path = path.join(settings.ZDS_APP["content"]["repo_public_path"], versioned.slug + "__building")
    if path.exists(tmp_path):
        shutil.rmtree(tmp_path)  # remove previous attempt, if any

    # render HTML:
    altered_version = copy.deepcopy(versioned)
    char_count = publish_use_manifest(db_object, tmp_path, altered_version)
    altered_version.dump_json(path.join(tmp_path, "manifest.json"))

    # make room for 'extra contents'
    build_extra_contents_path = path.join(tmp_path, settings.ZDS_APP["content"]["extra_contents_dirname"])
    makedirs(build_extra_contents_path)
    base_name = path.join(build_extra_contents_path, versioned.slug)

    # 1. markdown file (base for the others) :
    altered_version.pubdate = datetime.now()

    md_file_path = base_name + ".md"
    # Best effort: the images directory may already exist.
    with contextlib.suppress(OSError):
        Path(Path(md_file_path).parent, "images").mkdir()
    is_update = False

    if db_object.public_version:
        public_version = update_existing_publication(db_object, versioned)
        is_update = True
    else:
        public_version = PublishedContent()

    # make the new public version
    public_version.content_public_slug = versioned.slug
    public_version.content_type = versioned.type
    public_version.content_pk = db_object.pk
    public_version.content = db_object
    public_version.must_reindex = True
    public_version.char_count = char_count
    public_version.save()
    with contextlib.suppress(FileExistsError):
        makedirs(public_version.get_extra_contents_directory())
    if is_major_update or not is_update:
        public_version.publication_date = datetime.now()
    else:
        # Only reachable for a minor update of an already published content.
        public_version.update_date = datetime.now()
    public_version.sha_public = versioned.current_version
    public_version.save(update_fields=["publication_date", "update_date", "sha_public"])

    public_version.authors.clear()
    for author in db_object.authors.all():
        public_version.authors.add(author)

    # this puts the manifest.json and base json file on the prod path.
    shutil.rmtree(public_version.get_prod_path(), ignore_errors=True)
    shutil.copytree(tmp_path, public_version.get_prod_path())
    db_object.sha_public = versioned.current_version
    public_version.save()

    generation_policy = settings.ZDS_APP["content"]["extra_content_generation_policy"]
    if generation_policy == "SYNC":
        # ok, now we can really publish the thing!
        generate_external_content(base_name, build_extra_contents_path, md_file_path, versioned=versioned)
    elif generation_policy == "WATCHDOG":
        PublicatorRegistry.get("watchdog").publish(md_file_path, base_name, silently_pass=False)

    return public_version
3✔
137

138

139
def update_existing_publication(db_object, versioned):
    """
    Prepare the public version of an already published content for a new publication.

    When the slug changed, the current public version is flagged with
    ``must_redirect`` and a fresh ``PublishedContent`` (keeping the previous
    publication date) is attached to ``db_object``. In every case the directory
    of the previous publication is removed.

    :param db_object: database representation of the content, already having a public version
    :param versioned: version being published, whose ``slug`` is compared to the public one
    :return: the public version to fill in (the existing one, or the newly created one)
    """
    result = db_object.public_version
    # the content has been published in the past, so we will clean up old files!
    stale_path = result.get_prod_path()

    # if the slug has changed, create a new object instead of reusing the old one
    # this allows us to handle permanent redirection so that SEO is not impacted.
    slug_changed = versioned.slug != result.content_public_slug
    if slug_changed:
        result.must_redirect = True  # set redirection
        result.save(update_fields=["must_redirect"])
        previous_publication_date = result.publication_date
        db_object.public_version = PublishedContent()
        result = db_object.public_version

        # keep the same publication date if the content is already published
        result.publication_date = previous_publication_date

    # remove old files only if everything succeed so far: if something bad
    # happened, we don't want to have a published content without content!
    log = logging.getLogger(__name__)
    log.debug("erase " + stale_path)
    shutil.rmtree(stale_path)

    return result
3✔
162

163

164
def write_md_file(md_file_path, parsed_with_local_images, versioned):
    """
    Write the flat markdown export to ``md_file_path``, encoded in UTF-8.

    :param md_file_path: path of the file to (over)write
    :param parsed_with_local_images: text to write
    :param versioned: content being exported; only its ``title`` is read, for the error log
    :raise FailureDuringPublication: if writing the text raises a ``UnicodeError``
    """
    with open(md_file_path, mode="w", encoding="utf-8") as output:
        try:
            output.write(parsed_with_local_images)
        except UnicodeError:
            logger.error("Could not encode %s in UTF-8, publication aborted", versioned.title)
            message = _(
                "Une erreur est survenue durant la génération du fichier markdown "
                "à télécharger, vérifiez le code markdown"
            )
            raise FailureDuringPublication(message)
176

177

178
def generate_external_content(
    base_name, extra_contents_path, md_file_path, overload_settings=False, excluded=None, **kwargs
):
    """
    Generate all static files that allow offline access to the content.

    Every registered publicator that is not excluded is run in turn; a
    ``FailureDuringPublication`` or ``OSError`` raised by one of them is logged
    and does not prevent the remaining formats from being generated.

    :param base_name: base name of the files (without extension)
    :param extra_contents_path: internal directory where all files will be pushed
    :param md_file_path: bundled markdown file path
    :param overload_settings: force the generation of all registered formats even when the settings \
    ask for PDF not to be published
    :param excluded: list of excluded formats, ``None`` (or empty) to only exclude the watchdog
    :param kwargs: forwarded as-is to each publicator's ``publish``
    """
    # Work on a copy: the "pdf" exclusion below must not leak into the caller's list.
    excluded = list(excluded) if excluded else ["watchdog"]
    if not settings.ZDS_APP["content"]["build_pdf_when_published"] and not overload_settings:
        excluded.append("pdf")
    for publicator_name, publicator in PublicatorRegistry.get_all_registered(excluded):
        try:
            publicator.publish(
                md_file_path,
                base_name,
                change_dir=extra_contents_path,
                cur_language=translation.get_language(),
                **kwargs,
            )
        except (FailureDuringPublication, OSError):
            logger.exception("Could not publish %s format from %s base.", publicator_name, md_file_path)
207

208

209
class PublicatorRegistry:
    """
    Keep every publicator instance under a human-readable name.
    """

    # name -> publicator instance, shared by the whole class
    registry = {}

    @classmethod
    def register(cls, publicator_name, *args):
        """
        Build a class decorator that instantiates the decorated class and stores the instance.

        :param publicator_name: key under which the instance is stored
        :param args: positional arguments given to the decorated class when instantiating it
        :return: a decorator returning the decorated class unchanged
        """

        def instantiate_and_store(factory):
            cls.registry[publicator_name] = factory(*args)
            return factory

        return instantiate_and_store

    @classmethod
    def get_all_registered(cls, exclude=None):
        """
        Iterate over the registered publicators in generation order.

        Known formats come first (zip, md, html, epub, pdf); any other name is
        placed after them.

        :param exclude: names of the publicators to skip, ``None`` to skip nothing
        :return: generator of ``(name, publicator)`` pairs
        """
        skipped = [] if exclude is None else exclude
        rank = {"zip": 1, "md": 2, "html": 3, "epub": 4, "pdf": 5}
        ordered = sorted(cls.registry.items(), key=lambda entry: rank.get(entry[0], 42))
        yield from (entry for entry in ordered if entry[0] not in skipped)

    @classmethod
    def unregister(cls, name):
        """
        Remove registered Publicator named 'name' if present

        :param name: publicator name.
        """
        cls.registry.pop(name, None)

    @classmethod
    def get(cls, name):
        """
        Get publicator named 'name'.

        :param name: publicator name
        :return: the wanted publicator
        :rtype: Publicator
        :raise KeyError: if publicator is not registered
        """
        return cls.registry[name]
2✔
266

267

268
class Publicator:
    """
    Base class of every publicator; ``publish`` must be overridden.
    """

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Generate one export of a content.

        :param md_file_path: base markdown file path
        :param base_name: file name without extension
        :param kwargs: other publicator dependent options
        :raise NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError()

    def get_published_content_entity(self, md_file_path) -> PublishedContent:
        """
        Retrieve the database entity matching a markdown file path.

        :param md_file_path: markdown file path as string
        :type md_file_path: str
        :return: the first entity whose public slug matches, ``None`` when there is none
        :rtype: zds.tutorialv2.models.database.PublishedContent
        """
        slug = PublishedContent.get_slug_from_file_path(md_file_path)
        return PublishedContent.objects.filter(content_public_slug=slug).first()
3✔
295

296

297
@PublicatorRegistry.register("md")
class MarkdownPublicator(Publicator):
    """
    Render the content as a single flat markdown file.
    """

    def publish(self, md_file_path, base_name, *, cur_language=settings.LANGUAGE_CODE, **kwargs):
        """
        Render the export template and write it to ``md_file_path``.

        :param md_file_path: path of the markdown file to write
        :param base_name: file name without extension (unused here)
        :param cur_language: language re-activated once the rendering is done
        :param kwargs: may hold ``versioned``, the version to render; otherwise the public version is loaded
        :raise FailureDuringPublication: if the rendering raises an HTTP error
        """
        entity = self.get_published_content_entity(md_file_path)
        # do not use load_public_version as it lacks of information to get the content
        # if you use it you will only get titles, without the text
        versioned = kwargs.pop("versioned", None) or entity.content.load_version(sha=entity.sha_public)
        try:
            translation.activate(settings.LANGUAGE_CODE)
            parsed = render_to_string("tutorialv2/export/content.md", {"content": versioned})
        except requests.exceptions.HTTPError:
            raise FailureDuringPublication("Could not publish flat markdown")
        finally:
            translation.activate(cur_language)

        write_md_file(md_file_path, parsed, versioned)
        building_marker = "__building"
        if building_marker in md_file_path:
            # also copy the file to the same path without the building marker
            shutil.copy2(md_file_path, md_file_path.replace(building_marker, ""))
3✔
317

318

319
def _read_flat_markdown(md_file_path):
    """
    Return the whole content of a UTF-8 encoded markdown file.

    :param md_file_path: path of the file to read
    :return: the decoded text
    """
    return Path(md_file_path).read_text(encoding="utf-8")
×
323

324

325
@PublicatorRegistry.register("zip")
class ZipPublicator(Publicator):
    """
    Build the zip archive of a published content.
    """

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Create the archive, unless the content is an opinion and zips are disabled for opinions.

        :param md_file_path: markdown file path, used to find the published content
        :param base_name: file name without extension (unused here)
        :raise FailureDuringPublication: if no published content is found or the archive cannot be written
        """
        try:
            entity = self.get_published_content_entity(md_file_path)
            if entity is None:
                raise ValueError("published_content_entity is None")
            zip_forbidden = entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_zip"]
            if zip_forbidden:
                logger.info("ZIP not allowed for opinions.")
                return
            # no need to move zip file because it is already dumped to the public directory
            make_zip_file(entity)
        except (OSError, ValueError) as e:
            raise FailureDuringPublication("Zip could not be created", e)
×
339

340

341
@PublicatorRegistry.register("pdf")
class ZMarkdownRebberLatexPublicator(Publicator):
    """
    Use zmarkdown and rebber stringifier to produce latex & pdf output.
    """

    def __init__(self, extension=".pdf", latex_classes=""):
        """
        :param extension: extension of the produced document, leading dot included
        :param latex_classes: extra options appended to the LaTeX content type, if any
        """
        self.extension = extension
        self.doc_type = extension[1:]
        self.latex_classes = latex_classes

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Render the public version as a LaTeX file, then compile it.

        Nothing is done for an opinion when PDFs are disabled for opinions.
        The LaTeX source and the compiled document are both copied to the
        extra contents directory of the published content.

        :param md_file_path: markdown file path, used to find the published content
        :param base_name: path of the output files, without extension
        :raise FailureDuringPublication: if the markdown cannot be parsed or the compilation fails
        """
        published_content_entity = self.get_published_content_entity(md_file_path)
        if published_content_entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_pdf"]:
            logger.info("PDF not allowed for opinions")
            return
        gallery_pk = published_content_entity.content.gallery.pk
        depth_to_size_map = {
            1: "small",  # in fact this is an "empty" tutorial (i.e it is empty or has intro and/or conclusion)
            2: "small",
            3: "middle",
            4: "big",
        }
        public_versionned_source = published_content_entity.content.load_version(
            sha=published_content_entity.sha_public
        )
        base_directory = Path(base_name).parent
        image_dir = base_directory / "images"
        with contextlib.suppress(FileExistsError):
            image_dir.mkdir(parents=True)
        # Copy the gallery images next to the LaTeX file; a failing copy is ignored.
        if (settings.MEDIA_ROOT / "galleries" / str(gallery_pk)).exists():
            for image in (settings.MEDIA_ROOT / "galleries" / str(gallery_pk)).iterdir():
                with contextlib.suppress(OSError):
                    shutil.copy2(str(image.absolute()), str(image_dir))
        content_type = depth_to_size_map[public_versionned_source.get_tree_level()]
        if self.latex_classes:
            content_type += ", " + self.latex_classes
        title = published_content_entity.title()
        authors = [a.username for a in published_content_entity.authors.all()]

        licence = published_content_entity.content.licence.code
        licence_short = licence.replace("CC", "").strip().lower()
        licence_logo = licences.get(licence_short, False)
        if licence_logo:
            licence_url = f"https://creativecommons.org/licenses/{licence_short}/4.0/legalcode"
            # we need a specific case for CC-0 as it is "public-domain"
            if licence_logo == licences["0"]:
                licence_url = "https://creativecommons.org/publicdomain/zero/1.0/legalcode.fr"
        else:
            # unknown licence code: fall back to "all rights reserved"
            licence = str(_("Tous droits réservés"))
            licence_logo = licences["copyright"]
            licence_url = ""

        replacement_image_url = str(settings.MEDIA_ROOT.parent)
        if not replacement_image_url.endswith("/"):
            replacement_image_url += "/"
        replaced_media_url = settings.MEDIA_URL
        if replaced_media_url.startswith("/"):
            replaced_media_url = replaced_media_url[1:]
        exported = export_content(public_versionned_source, with_text=True, ready_to_publish_only=True)
        # no title to avoid zmd to put it on the final latex
        del exported["title"]
        content, metadata, messages = render_markdown(
            exported,
            output_format="texfile",
            # latex template arguments
            content_type=content_type,
            title=title,
            authors=authors,
            license=licence,
            license_directory=str(LICENSES_BASE_PATH),
            license_logo=licence_logo,
            license_url=licence_url,
            smileys_directory=str(SMILEYS_BASE_PATH / "svg"),
            images_download_dir=str(base_directory / "images"),
            local_url_to_local_path=["/", replacement_image_url],
            heading_shift=-1,
            date=date(published_content_entity.last_publication_date, "l d F Y"),
        )
        if content == "" and messages:
            raise FailureDuringPublication(f"Markdown was not parsed due to {messages}")
        zmd_class_dir_path = Path(settings.ZDS_APP["content"]["latex_template_repo"])
        # NOTE(review): the result of this replace() is discarded, so the statement has no
        # effect. Assigning it back would rewrite image paths in the LaTeX source; confirm
        # the intent before either removing the line or applying the replacement.
        content.replace(replacement_image_url + replaced_media_url, replacement_image_url)
        if zmd_class_dir_path.exists() and zmd_class_dir_path.is_dir():
            # Link the LaTeX class and its Lua helper next to the source file;
            # nothing more is linked as soon as one of the links already exists.
            with contextlib.suppress(FileExistsError):
                zmd_class_link = base_directory / "zmdocument.cls"
                zmd_class_link.symlink_to(zmd_class_dir_path / "zmdocument.cls")
                luatex_dir_link = base_directory / "utf8.lua"
                luatex_dir_link.symlink_to(zmd_class_dir_path / "utf8.lua", target_is_directory=True)
        true_latex_extension = ".".join(self.extension.split(".")[:-1]) + ".tex"
        latex_file_path = base_name + true_latex_extension
        pdf_file_path = base_name + self.extension
        default_logo_original_path = Path(__file__).parent / ".." / ".." / "assets" / "images" / "logo@2x.png"
        with contextlib.suppress(FileExistsError):
            shutil.copy(str(default_logo_original_path), str(base_directory / "default_logo.png"))
        with open(latex_file_path, mode="w", encoding="utf-8") as latex_file:
            latex_file.write(content)
        shutil.copy2(latex_file_path, published_content_entity.get_extra_contents_directory())

        # Two draft passes, the glossary, then the final pass producing the document.
        self.full_tex_compiler_call(latex_file_path, draftmode="-draftmode")
        self.full_tex_compiler_call(latex_file_path, draftmode="-draftmode")
        self.make_glossary(base_name.split("/")[-1], latex_file_path)
        self.full_tex_compiler_call(latex_file_path)

        shutil.copy2(pdf_file_path, published_content_entity.get_extra_contents_directory())

    def full_tex_compiler_call(self, latex_file, draftmode: str = ""):
        """
        Run the compiler once and raise through ``handle_tex_compiler_error`` when it fails.

        :param latex_file: path of the LaTeX source
        :param draftmode: extra compiler flag, empty for a regular run
        :raise FailureDuringPublication: if the expected output file does not exist after the run
        """
        success_flag = self.tex_compiler(latex_file, draftmode)
        if not success_flag:
            handle_tex_compiler_error(latex_file, self.extension)

    def handle_makeglossaries_error(self, latex_file):
        """
        Raise a publication failure carrying the error lines of the ``.log`` file.

        :param latex_file: path of the LaTeX source; the log sits next to it
        :raise FailureDuringPublication: always, with the lines mentioning "fatal" or "error"
        """
        with open(path.splitext(latex_file)[0] + ".log") as latex_log:
            # A generator is given straight to join(): filter() needs a predicate and an
            # iterable, so calling it with a single argument raised TypeError instead.
            errors = "\n".join(line for line in latex_log if "fatal" in line.lower() or "error" in line.lower())
        raise FailureDuringPublication(errors)

    def tex_compiler(self, texfile, draftmode: str = ""):
        """
        Run ``lualatex`` on ``texfile`` from the directory holding it.

        :param texfile: path of the LaTeX source
        :param draftmode: extra compiler flag, empty for a regular run
        :return: ``True`` if the expected output file exists after the run
        :raise subprocess.TimeoutExpired: if the compilation lasts more than 10 minutes
        """
        command = f"lualatex -shell-escape -interaction=nonstopmode {draftmode} {texfile}"
        command_process = subprocess.Popen(
            command, shell=True, cwd=path.dirname(texfile), stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        # let's put 10 min of timeout because we do not generate latex everyday
        # NOTE(review): the child process is not killed when the timeout expires.
        command_process.communicate(timeout=600)

        pdf_file_path = path.splitext(texfile)[0] + self.extension
        return path.exists(pdf_file_path)

    def make_glossary(self, basename, texfile):
        """
        Run ``makeglossaries`` from the directory holding ``texfile``.

        :param basename: name given to ``makeglossaries``
        :param texfile: path of the LaTeX source
        :return: ``True`` when neither output stream mentions "fatal"
        :raise FailureDuringPublication: otherwise
        """
        command = f"makeglossaries {basename}"
        command_process = subprocess.Popen(
            command, shell=True, cwd=path.dirname(texfile), stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        std_out, std_err = command_process.communicate()

        # TODO: check makeglossary exit codes to see if we can enhance error detection
        if "fatal" not in std_out.decode("utf-8").lower() and "fatal" not in std_err.decode("utf-8").lower():
            return True

        self.handle_makeglossaries_error(texfile)
×
480

481

482
def handle_tex_compiler_error(latex_file_path, ext):
    """
    Raise a publication failure describing why the LaTeX compilation failed.

    The ``.log`` file sitting next to the LaTeX source is scanned: every line
    mentioning "fatal" or "error" is kept, together with the lines that closely
    follow it. When the log file does not exist, a fallback message is used.

    :param latex_file_path: path of the LaTeX source that failed to compile
    :param ext: extension of the expected output, only used in the debug log
    :raise FailureDuringPublication: always, with the collected lines as message
    """
    # The log shares the source's base name, whatever the source extension is.
    log_file_path = path.splitext(latex_file_path)[0] + ".log"
    errors = f"Error occured, log file {log_file_path} not found."
    with contextlib.suppress(FileNotFoundError):
        # Undecodable bytes are replaced, so a broken character in the log does
        # not discard the whole diagnostic.
        with Path(log_file_path).open(encoding="utf-8", errors="replace") as latex_log:
            print_context = 25
            lines = []
            relevant_line = -print_context
            for idx, line in enumerate(latex_log):
                if "fatal" in line.lower() or "error" in line.lower():
                    relevant_line = idx
                    lines.append(line)
                elif idx - relevant_line < print_context:
                    # still within the context window of the last error line
                    lines.append(line)

            errors = "\n".join(lines)
    logger.debug("%s ext=%s", errors, ext)

    raise FailureDuringPublication(errors)
1✔
502

503

504
@PublicatorRegistry.register("epub")
class ZMarkdownEpubPublicator(Publicator):
    """
    Build the EPUB export of a published content.
    """

    def publish(self, md_file_path, base_name, **kwargs):
        """
        Generate the EPUB next to the markdown file, then move it to the extra contents directory.

        Nothing is done for an opinion when EPUBs are disabled for opinions.

        :param md_file_path: markdown file path, used to find the published content
        :param base_name: path of the output file, without extension
        :raise FailureDuringPublication: if the generation raises an ``OSError`` or an HTTP error
        """
        try:
            entity = self.get_published_content_entity(md_file_path)
            epub_forbidden = entity.content.type == "OPINION" and not settings.ZDS_APP["opinions"]["allow_epub"]
            if epub_forbidden:
                logger.info("EPUB not allowed for opinions")
                return
            epub_file_path = Path(base_name + ".epub")
            logger.info("Start generating epub")
            build_ebook(entity, path.dirname(md_file_path), epub_file_path)
        except (OSError, requests.exceptions.HTTPError):
            raise FailureDuringPublication("Error while generating epub file.")
        else:
            logger.info(epub_file_path)
            destination = Path(entity.get_extra_contents_directory(), Path(epub_file_path.name))
            # make room for the new file: drop a previous export, create the directory if needed
            if destination.exists():
                os.remove(str(destination))
            destination_dir = destination.parent
            if not destination_dir.exists():
                destination_dir.mkdir(parents=True)
            logger.info("created %s. moving it to %s", epub_file_path, entity.get_extra_contents_directory())
            shutil.move(str(epub_file_path), entity.get_extra_contents_directory())
3✔
528

529

530
@PublicatorRegistry.register("watchdog")
class WatchdogFilePublicator(Publicator):
    """
    Record publication requests in the database instead of generating the files directly.
    """

    def publish(self, md_file_path, base_name, silently_pass=True, **kwargs):
        """
        Request every format for the content matching ``md_file_path``.

        :param md_file_path: markdown file path, used to find the published content
        :param base_name: file name without extension (unused here)
        :param silently_pass: when true, do nothing at all
        """
        if not silently_pass:
            self.publish_from_published_content(self.get_published_content_entity(md_file_path))

    def publish_from_published_content(self, published_content: PublishedContent):
        """
        Create one ``REQUESTED`` publication event per registered format, the watchdog excepted.

        :param published_content: the published content to generate the formats for
        """
        for format_name, _publicator in PublicatorRegistry.get_all_registered(["watchdog"]):
            # Remove previous PublicationEvent for this content, not handled by
            # the publication watchdog yet:
            pending = PublicationEvent.objects.filter(
                state_of_processing="REQUESTED",
                published_object__content_pk=published_content.content_pk,
                format_requested=format_name,
            )
            pending.delete()
            PublicationEvent.objects.create(
                state_of_processing="REQUESTED",
                published_object=published_content,
                format_requested=format_name,
            )
552

553

554
class FailureDuringPublication(Exception):
    """Exception raised if something goes wrong during publication process"""

    # The constructor inherited from ``Exception`` is used as-is.
1✔
559

560

561
def make_zip_file(published_content):
    """Create the zip archive extra content from the published content

    The archive is written in the extra contents directory of the published
    content and is named after its public slug.

    :param published_content: a PublishedContent object
    :return: the path of the archive
    """

    from zds.tutorialv2.views.archives import DownloadContent

    publishable = published_content.content
    # update SHA so that archive gets updated too
    publishable.sha_public = publishable.sha_draft
    file_path = path.join(
        published_content.get_extra_contents_directory(), published_content.content_public_slug + ".zip"
    )
    # Load the version before opening the archive: a failure here must not
    # leave a truncated zip file behind.
    versioned = publishable.load_version(None, True)
    # The context manager closes the archive even if filling it fails.
    with zipfile.ZipFile(file_path, "w") as zip_file:
        DownloadContent.insert_into_zip(zip_file, versioned.repository.commit(versioned.current_version).tree)
    return file_path
3✔
581

582

583
def unpublish_content(db_object, moderator=None):
    """
    Remove the given content from the public view.

    .. note::
        This will send content_unpublished event.

    :param db_object: Database representation of the content
    :type db_object: PublishableContent
    :param moderator: the staff user who triggered the unpublish action.
    :type moderator: django.contrib.auth.models.User
    :return: ``True`` if unpublished, ``False`` if an ``ObjectDoesNotExist`` or ``OSError`` interrupted the process
    :rtype: bool
    """

    from zds.tutorialv2.models.database import PublishedContent

    with contextlib.suppress(ObjectDoesNotExist, OSError):
        public_version = PublishedContent.objects.get(pk=db_object.public_version.pk)

        # NOTE(review): the signal is sent once, with the class of the queryset as
        # sender (not once per reaction) - confirm this is what receivers expect.
        reactions = ContentReaction.objects.filter(related_content=db_object).all()
        results = [
            content_unpublished.send(
                sender=reactions.__class__, instance=db_object, target=ContentReaction, moderator=moderator, user=None
            )
        ]
        logging.debug("Nb_messages=%d, messages=%s", len(results), results)
        # remove public_version:
        public_version.delete()

        update_params = {"public_version": None}
        if db_object.is_opinion:
            update_params.update(sha_public=None, sha_picked=None, pubdate=None)
        db_object.update(**update_params)

        content_unpublished.send(
            sender=db_object.__class__, instance=db_object, target=db_object.__class__, moderator=moderator
        )
        # clean files
        old_path = public_version.get_prod_path()
        public_version.content.update(public_version=None, sha_public=None)
        if path.exists(old_path):
            shutil.rmtree(old_path)
        return True

    return False
×
631

632

633
def close_article_beta(db_object, versioned, user, request=None):
    """
    Close the beta of an article: reset its beta sha and lock its forum topic.

    Contents that are not articles are left untouched. The closing post is
    only sent when the beta topic exists and is not locked yet.

    :param db_object: the article
    :type db_object: zds.tutorialv2.models.database.PublishableContent
    :param versioned: the public version of the article, used to populate the closing post
    :type versioned: zds.tutorialv2.models.versioned.VersionedContent
    :param user: the current user
    :param request: the current request
    """
    if db_object.type != "ARTICLE":
        return
    db_object.sha_beta = None
    topic = db_object.beta_topic
    if topic is None or topic.is_locked:
        return
    closing_message = render_to_string("tutorialv2/messages/beta_desactivate.md", {"content": versioned})
    send_post(request, topic, user, closing_message)
    lock_topic(topic)
1✔
650

651

652
def save_validation_state(
    db_object,
    is_update,
    published: PublishedContent,
    validation,
    versioned,
    source="",
    is_major=False,
    user=None,
    request=None,
    comment="",
):
    """
    Save the content and its validation after publication; the validation status becomes ACCEPT.

    :param db_object: the content
    :type db_object: zds.tutorialv2.models.database.PublishableContent
    :param is_update: marks if the publication is an update or a new/major publication
    :param published: the PublishedContent instance
    :param validation: the related validation
    :param versioned: the VersionedContent related to the public sha
    :param source: the optional canonical link
    :param is_major: marks a major publication (first one, or new parts for example)
    :param user: validating user
    :param request: current request to get hats, and send error messages if needed
    :param comment: comment of the validator, stored on the validation
    """
    # content side: it now points to the freshly published version
    db_object.sha_public = validation.version
    db_object.source = source
    db_object.sha_validation = None
    db_object.public_version = published
    first_or_major = is_major or not is_update
    if first_or_major or db_object.pubdate is None:
        db_object.pubdate = datetime.now()
        db_object.is_obsolete = False

    # close beta if is an article
    close_article_beta(db_object, versioned, user=user, request=request)
    db_object.save()

    # validation side
    validation.comment_validator = comment
    validation.status = "ACCEPT"
    validation.date_validation = datetime.now()
    validation.save()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc