• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

zestedesavoir / zds-site / 25093716915

06 Apr 2026 01:15PM UTC coverage: 89.451%. Remained the same
25093716915

push

github

web-flow
Met à jour la version de Node utilisée vers Node 24 (dernière LTS) (#6769)

3099 of 4138 branches covered (74.89%)

17094 of 19110 relevant lines covered (89.45%)

1.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.24
/zds/tutorialv2/views/statistics.py
1
import itertools
3✔
2
import logging
3✔
3
import urllib.parse
3✔
4
from datetime import date, datetime, timedelta
3✔
5
from typing import Any, List
3✔
6

7
import requests
3✔
8
from django.conf import settings
3✔
9
from django.contrib import messages
3✔
10
from django.core.exceptions import PermissionDenied
3✔
11
from django.urls import reverse
3✔
12
from django.utils.translation import gettext_lazy as _
3✔
13
from django.views.generic import FormView
3✔
14

15
from zds.tutorialv2.forms import ContentCompareStatsURLForm
3✔
16
from zds.tutorialv2.mixins import SingleOnlineContentDetailViewMixin
3✔
17
from zds.tutorialv2.models.versioned import VersionedContent
3✔
18
from zds.tutorialv2.utils import NamedUrl
3✔
19

20

21
class StatisticsException(Exception):
    """Exception raised by our own statistics code, as opposed to exceptions
    coming from elsewhere: it always carries exactly two arguments, the
    logger method to call and the error message."""

    def __init__(self, logger, msg):
        super().__init__(logger, msg)
28

29

30
class ContentStatisticsView(SingleOnlineContentDetailViewMixin, FormView):
    """Display Matomo statistics for a published content (page views,
    referrers, keywords, export downloads) to its authors and staff."""

    template_name = "tutorialv2/stats/index.html"
    form_class = ContentCompareStatsURLForm
    # Default URL selection; shadowed by an instance attribute in form_valid().
    # NOTE(review): mutable class attribute — safe only because it is never
    # mutated in place, only replaced; confirm before mutating it anywhere.
    urls = []
    # Matomo connection settings, read once from Django settings at import time.
    matomo_token_auth = settings.ZDS_APP["site"]["matomo_token_auth"]
    matomo_api_url = "{}/index.php?token_auth={}".format(settings.ZDS_APP["site"]["matomo_url"], matomo_token_auth)
    matomo_site_id = settings.ZDS_APP["site"]["matomo_site_id"]
    logger = logging.getLogger(__name__)
38

39
    def post(self, request, *args, **kwargs):
        """Resolve the content objects the form machinery needs before
        delegating to FormView.post()."""
        self.public_content_object = self.get_public_object()
        self.object = self.get_object()
        self.versioned_object = self.get_versioned_object()
        return super().post(request, *args, **kwargs)
44

45
    def get_form_kwargs(self):
        """Pass the selectable (url, name) pairs down to the comparison form."""
        form_kwargs = super().get_form_kwargs()
        named_urls = self.get_urls_to_render()
        form_kwargs["urls"] = [(item.url, item.name) for item in named_urls]
        return form_kwargs
49

50
    def form_valid(self, form):
        """Store the URL selection submitted through the form, then render the
        statistics page as if it were a GET request."""
        self.urls = form.cleaned_data["urls"]
        return super().get(self.request)
53

54
    def get_urls_to_render(self):
        """Return the content URLs whose statistics must be displayed.

        The selection comes either from the ``urls`` query parameter or from a
        previously validated form; with no selection at all, every URL of the
        content is rendered.
        """
        candidates = self.get_content_urls()
        selection = self.request.GET.getlist("urls", None) or self.urls
        if not selection:
            return candidates
        return [named_url for named_url in candidates if named_url.url in selection]
61

62
    def get_content_urls(self):
        """List the online URLs of the content and of its children as NamedUrl
        entries, whose last field is the nesting level (0, 1 or 2)."""
        content = self.versioned_object
        urls = [NamedUrl(content.title, content.get_absolute_url_online(), 0)]
        if content.has_extracts():
            # Content made of extracts only: no children to walk.
            return urls
        for part in content.children:
            urls.append(NamedUrl(part.title, part.get_absolute_url_online(), 1))
            if part.has_extracts():
                continue
            for chapter in part.children:
                urls.append(NamedUrl(chapter.title, chapter.get_absolute_url_online(), 2))
        return urls
73

74
    def get_export_urls(self):
        """Return NamedUrl entries for the epub and pdf download pages of the
        content."""
        content: VersionedContent = self.versioned_object
        content_type = content.type.lower()
        route_kwargs = {"pk": content.pk, "slug": content.slug}
        return [
            NamedUrl("epub", reverse(f"{content_type}:download-epub", kwargs=route_kwargs), 0),
            NamedUrl("pdf", reverse(f"{content_type}:download-pdf", kwargs=route_kwargs), 0),
        ]
85

86
    def get_all_statistics(
        self, urls: list[NamedUrl], start: datetime | date, end: datetime | date, methods
    ) -> dict[str, list]:
        """Fetch statistics from Matomo in batches of at most 3 URLs per request.

        Matomo bulk API is queried with a cartesian product of `methods` and a
        chunk of `urls` (size <= 3). Results are aggregated to preserve the
        original `urls` order for each method.

        :param urls: pages to query statistics for
        :param start: first day of the period (inclusive)
        :param end: last day of the period (inclusive)
        :param methods: Matomo API method names (e.g. "Actions.getPageUrl")
        :return: mapping of each method to its per-URL results, in `urls` order
        :raise StatisticsException: when Matomo reports an error, either for the
            whole bulk request or for one of its sub-requests
        """
        date_ranges = f"{start:%Y-%m-%d},{end:%Y-%m-%d}"

        # Initialize the aggregated structure with one list per method
        data_structured = {method: [] for method in methods}

        # Process URLs in chunks of 3 to limit the bulk size per request
        # this is done to reduce matomo memory load on big tuto with many parts & chapters
        for i in range(0, len(urls), 3):
            # Slicing already clamps to the list length; no need for min().
            url_chunk = urls[i : i + 3]

            # Build a fresh bulk request for this chunk
            data_request = {
                "module": "API",
                "method": "API.getBulkRequest",
                "format": "json",
                "filter_limit": -1,
            }

            self.build_matomo_payload(data_request, date_ranges, methods, url_chunk)

            # Execute the bulk request for this chunk. The timeout keeps a slow
            # or unreachable Matomo from hanging the view; the resulting
            # exception is caught by the caller, which degrades gracefully.
            response_matomo = requests.post(url=self.matomo_api_url, data=data_request, timeout=30)
            data = response_matomo.json()

            # Top-level error returned by Matomo
            if isinstance(data, dict) and data.get("result", "") == "error":
                raise StatisticsException(self.logger.error, data.get("message", "Unknown matomo error"))

            # Validate and aggregate each response item in the same order
            for page_index, (method, named_url) in enumerate(itertools.product(methods, url_chunk)):
                item = data[page_index]
                if isinstance(item, dict) and item.get("result", "") == "error":
                    raise StatisticsException(
                        self.logger.error,
                        item.get("message", "Unknown matomo error for " + named_url.url),
                    )

                data_structured[method].append(item)

        return data_structured
135

136
    def build_matomo_payload(self, data_request: dict[str, str | int], date_ranges, methods: list[str], url_chunk):
        """Fill `data_request` with one urlencoded sub-request per
        (method, url) pair, as expected by Matomo's API.getBulkRequest.

        :param data_request: bulk request dict, mutated in place
        :param date_ranges: "start,end" date range string (YYYY-MM-DD)
        :param methods: Matomo API method names
        :param url_chunk: NamedUrl entries to query in this bulk request
        """
        for index, (method, url) in enumerate(itertools.product(methods, url_chunk)):
            absolute_url = f"{self.request.scheme}://{self.request.get_host()}{url.url}"
            param_url = f"pageUrl=={urllib.parse.quote_plus(absolute_url)}"

            request_params = {
                "method": method,
                "idSite": self.matomo_site_id,
                "date": date_ranges,
                "period": "day",
            }
            # Referrers requests use segment to define url
            if method.startswith("Referrers"):
                # A single segment condition: joining a one-element list was a no-op.
                request_params["segment"] = param_url
            elif method == "Actions.getPageUrl":
                request_params["pageUrl"] = absolute_url

            data_request.update({f"urls[{index}]": urllib.parse.urlencode(request_params)})
156

157
    @staticmethod
3✔
158
    def get_stat_metrics(data, metric_name):
3✔
159
        x = []
1✔
160
        y = []
1✔
161
        for key, val in data.items():
1✔
162
            x.append(key)
1✔
163
            if len(val) == 0:
1!
164
                y.append(0)
×
165
            else:
166
                y.append(val[0].get(metric_name, 0))
1✔
167

168
        return (x, y)
1✔
169

170
    @staticmethod
3✔
171
    def get_ref_metrics(data):
3✔
172
        refs = {}
1✔
173
        for key, val in data.items():
1✔
174
            for item in val:
1✔
175
                if item["label"] in refs:
1✔
176
                    refs[item["label"]] += item["nb_visits"]
1✔
177
                else:
178
                    refs[item["label"]] = item["nb_visits"]
1✔
179

180
        return refs
1✔
181

182
    def get_start_and_end_dates(self):
        """Read the start_date/end_date query parameters (YYYY-MM-DD).

        A missing date falls back silently; a malformed one falls back too but
        flashes an error message. The dates are swapped if given in reverse
        order. Default period: the last 7 days before end_date.
        """

        def parse(raw, fallback, error_message):
            # Absent parameter -> TypeError; malformed one -> ValueError.
            try:
                return datetime.strptime(raw, "%Y-%m-%d").date()
            except TypeError:
                return fallback()
            except ValueError:
                messages.error(self.request, error_message)
                return fallback()

        end_date = parse(
            self.request.GET.get("end_date", None),
            date.today,
            _("La date de fin fournie est invalide."),
        )
        start_date = parse(
            self.request.GET.get("start_date", None),
            lambda: end_date - timedelta(days=7),
            _("La date de début fournie est invalide."),
        )

        if start_date > end_date:
            end_date, start_date = start_date, end_date

        return start_date, end_date
205

206
    def get_display_mode(self, urls):
        """Return the report granularity for `urls`: "global" when every URL of
        the content is selected, "details" for a single URL, "comparison"
        otherwise. Note the branch order: a content with a single URL is
        "global", not "details"."""
        # TODO make display_mode an enum ?
        # Good idea, but not straightforward for the template integration
        if len(urls) == len(self.get_content_urls()):
            return "global"
        if len(urls) == 1:
            return "details"
        return "comparison"
214

215
    @staticmethod
3✔
216
    def get_cumulative(stats: dict[str, list]) -> dict[str, int]:
3✔
217
        cumul = {"total": 0}
1✔
218
        for info_date, infos_stat in stats.items():
1✔
219
            cumul["total"] += len(infos_stat)
1✔
220
            for info_stat in infos_stat:
1✔
221
                for key, val in info_stat.items():
1✔
222
                    if type(val) == str or isinstance(val, dict):
1✔
223
                        continue
1✔
224
                    if key in cumul:
1✔
225
                        cumul[key] += int(val)
1✔
226
                    else:
227
                        cumul[key] = int(val)
1✔
228
        return cumul
1✔
229

230
    @staticmethod
3✔
231
    def merge_ref_to_data(metrics, refs):
3✔
232
        for key, item in refs.items():
1✔
233
            if key in metrics:
1✔
234
                metrics[key] += item
1✔
235
            else:
236
                metrics[key] = item
1✔
237
        return metrics
1✔
238

239
    @staticmethod
3✔
240
    def merge_report_to_global(reports, fields):
3✔
241
        metrics = {}
1✔
242
        for key, item in reports.items():
1✔
243
            for field, is_avg in fields:
1✔
244
                if field in metrics:
1✔
245
                    metrics[field] = (
1✔
246
                        metrics[field][0],
247
                        [i + j for (i, j) in zip(metrics[field][1], item.get(field)[1])],
248
                    )
249
                else:
250
                    metrics[field] = item.get(field)
1✔
251
        return metrics
1✔
252

253
    def get_context_data(self, **kwargs):
        """Build the template context: per-URL reports, cumulative stats and
        referrer/keyword breakdowns fetched from Matomo.

        :raise PermissionDenied: when the requester is neither an author of the
            content nor a staff member
        """
        context = super().get_context_data(**kwargs)
        if not (self.is_author or self.is_staff):
            raise PermissionDenied

        urls = self.get_urls_to_render()
        start_date, end_date = self.get_start_and_end_dates()
        display_mode = self.get_display_mode(urls)
        reports = {}
        cumulative_stats = {}
        referrers = {}
        type_referrers = {}
        keywords = {}
        # (field name, whether its cumulative value is an average rather than a sum)
        report_fields = [("nb_uniq_visitors", False), ("nb_hits", False), ("avg_time_on_page", True)]
        export_urls = self.get_export_urls()

        try:
            all_statistics = self.get_all_statistics(
                urls,
                start_date,
                end_date,
                ["Referrers.getReferrerType", "Referrers.getWebsites", "Referrers.getKeywords", "Actions.getPageUrl"],
            )
            export_statistics = self.get_all_statistics(export_urls, start_date, end_date, ["Actions.getPageUrl"])
        except StatisticsException as e:
            # Our own errors carry the logger method to use and the message.
            all_statistics = {}
            export_statistics = {}
            logger_method, msg = e.args
            logger_method(f"Something failed with Matomo reporting system: {msg}")
            messages.error(self.request, _("Impossible de récupérer les statistiques du site ({}).").format(msg))
        except Exception as e:
            # Broad catch-all: a Matomo outage must not break the stats page.
            all_statistics = {}
            export_statistics = {}
            self.logger.error(f"Something failed with Matomo reporting system: {e}")
            messages.error(self.request, _("Impossible de récupérer les statistiques du site ({}).").format(e))

        result_report = {}
        if all_statistics:
            # One list per Matomo method, each indexed like `urls`.
            all_stats = all_statistics["Actions.getPageUrl"]
            all_ref_websites = all_statistics["Referrers.getWebsites"]
            all_ref_types = all_statistics["Referrers.getReferrerType"]
            all_ref_keyword = all_statistics["Referrers.getKeywords"]

            for index, url in enumerate(urls):
                grand_totals = ContentStatisticsView.get_cumulative(all_stats[index])
                reports[url] = {}
                cumulative_stats[url] = {}

                self.__format_result(all_stats, grand_totals, cumulative_stats, index, report_fields, reports, url)

                # Referrer breakdowns are merged across all selected URLs.
                referrers = self.merge_ref_to_data(referrers, self.get_ref_metrics(all_ref_websites[index]))
                type_referrers = self.merge_ref_to_data(type_referrers, self.get_ref_metrics(all_ref_types[index]))
                keywords = self.merge_ref_to_data(keywords, self.get_ref_metrics(all_ref_keyword[index]))

            if display_mode.lower() == "global":
                # In global mode the per-URL reports collapse into one merged
                # report, plus one report per export format (shape example below).
                """
                {
                  "global": {
                        "nb_uniq_visitors": (
                            ["2025-11-06", "2025-11-07", "2025-11-08"],
                            [2500, 2800, 2650] ),
                        "nb_hits": (
                            ["2025-11-06", "2025-11-07", "2025-11-08"],
                            [8500, 9200, 8900]  # Somme de tous les hits pour toutes les URLs
                        ),
                        "avg_time_on_page": (
                            ["2025-11-06", "2025-11-07", "2025-11-08"],
                            [480, 520, 505]  # Somme des temps moyens (pas vraiment une moyenne au final)
                        )
                  },
                  "epub": {
                    ...
                  },
                  "pdf": {
                    ...
                  }
                }
                """
                result_report = {NamedUrl(display_mode, "", 0): self.merge_report_to_global(reports, report_fields)}
                export_reports = {}
                export_cumulative_stats = {}
                self.__format_export_report(
                    export_cumulative_stats, export_reports, export_statistics, export_urls, report_fields
                )
                result_report.update(export_reports)
            else:
                result_report = reports

        context.update(
            {
                "display": display_mode,
                "urls": urls,
                "reports": result_report,
                "cumulative_stats": cumulative_stats,
                "referrers": referrers,
                "type_referrers": type_referrers,
                "keywords": keywords,
            }
        )
        return context
353

354
    def __format_export_report(
        self,
        export_cumulative_stats: dict[Any, Any],
        export_reports: dict[Any, Any],
        export_statistics: dict[str, list] | dict[Any, Any],
        export_urls: list[NamedUrl],
        report_fields: list[tuple[str, bool]],
    ):
        """Fill `export_reports` and `export_cumulative_stats` (both mutated in
        place) with the download statistics of every export URL (epub, pdf)."""
        page_stats = export_statistics["Actions.getPageUrl"]
        for export_index, export_url in enumerate(export_urls):
            export_reports[export_url] = {}
            export_cumulative_stats[export_url] = {}
            self.__format_result(
                page_stats,
                ContentStatisticsView.get_cumulative(page_stats[export_index]),
                export_cumulative_stats,
                export_index,
                report_fields,
                export_reports,
                export_url,
            )
377

378
    def __format_result(
        self,
        all_stats: list,
        grand_totals: dict[str, int],
        cumulative_stats: dict,
        index: int,
        report_field: list[tuple[str, bool]],
        reports: dict,
        url: NamedUrl,
    ):
        """Store, for one URL, the (dates, values) series of every report field
        in `reports[url]`, and its period total (or per-entry mean, for average
        fields) in `cumulative_stats[url]`. Both dicts are mutated in place."""
        total = grand_totals.get("total")
        for field_name, is_avg in report_field:
            reports[url][field_name] = ContentStatisticsView.get_stat_metrics(all_stats[index], field_name)
            value = grand_totals.get(field_name, 0)
            if is_avg:
                # Renormalize the summed averages by the number of entries.
                cumulative_stats[url][field_name] = value / total if total > 0 else 0
            else:
                cumulative_stats[url][field_name] = value
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc