• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

IMIO / imio.smartweb.core / 13650404957

04 Mar 2025 09:24AM UTC coverage: 89.114% (-1.6%) from 90.746%
13650404957

push

github

boulch
test: disable releaser

4404 of 4942 relevant lines covered (89.11%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.77
/src/imio/smartweb/core/utils.py
1
# -*- coding: utf-8 -*-
2

3
from Acquisition import aq_parent
1✔
4
from collective.taxonomy.interfaces import ITaxonomy
1✔
5
from imio.smartweb.common.utils import is_log_active
1✔
6
from imio.smartweb.core.config import WCA_URL
1✔
7
from imio.smartweb.core.contents import IFolder
1✔
8
from more_itertools import chunked
1✔
9
from plone import api
1✔
10
from plone.app.multilingual.interfaces import ILanguageRootFolder
1✔
11
from plone.dexterity.interfaces import IDexterityContent
1✔
12
from Products.CMFPlone.defaultpage import get_default_page
1✔
13
from Products.CMFPlone.interfaces.siteroot import IPloneSiteRoot
1✔
14
from Products.CMFPlone.utils import base_hasattr
1✔
15
from urllib.parse import urlparse, urlunparse
1✔
16
from zope.component import getSiteManager
1✔
17
from zope.component import queryMultiAdapter
1✔
18

19
import base64
1✔
20
import hashlib
1✔
21
import json
1✔
22
import logging
1✔
23
import os
1✔
24
import re
1✔
25
import requests
1✔
26

27
logger = logging.getLogger("imio.smartweb.core")
1✔
28

29

30
def get_category(context):
    """Return the translated taxonomy term stored on ``context``.

    ``context.category_name`` selects both the attribute holding the term
    (``taxonomy_<category_name>``) and the taxonomy utility used to translate
    it (``collective.taxonomy.<category_name>``). The translation targets the
    first two characters of the current portal language.

    Returns ``None`` when the context has no ``category_name``, when no term
    is set, or when the taxonomy utility is not registered.
    """
    if not base_hasattr(context, "category_name"):
        return None
    field_name = "taxonomy_{}".format(context.category_name)
    taxonomy_name = "collective.taxonomy.{}".format(context.category_name)
    # Default to None so a missing taxonomy attribute is handled like an
    # empty one instead of raising AttributeError.
    term = getattr(context, field_name, None)
    if not term:
        return None
    sm = getSiteManager()
    utility = sm.queryUtility(ITaxonomy, name=taxonomy_name)
    if utility is None:
        # queryUtility yields None (it does not raise) for an unknown name
        return None
    current_lang = api.portal.get_current_language()[:2]
    return utility.translate(
        term,
        context=context,
        target_language=current_lang,
    )
1✔
47

48

49
def get_categories():
    """Return the ``collective.taxonomy.page_category`` taxonomy utility.

    The lookup uses ``queryUtility`` on the current site manager; its result
    is returned unchanged.
    """
    site_manager = getSiteManager()
    page_categories = site_manager.queryUtility(
        ITaxonomy, name="collective.taxonomy.page_category"
    )
    return page_categories
1✔
52

53

54
def concat_voca_term(term1, term2):
    """Join two vocabulary term values with a bare hyphen (``a-b``)."""
    return f"{term1}-{term2}"
1✔
56

57

58
def concat_voca_title(title1, title2):
    """Join two vocabulary titles with a spaced hyphen (``A - B``)."""
    return f"{title1} - {title2}"
1✔
60

61

62
def get_json(url, auth=None, timeout=5):
    """GET ``url`` and return its decoded JSON body, or ``None`` on failure.

    The request asks for ``application/json`` and forwards the current portal
    language through an ``I18N_LANGUAGE`` cookie.

    :param url: address to fetch.
    :param auth: optional value for the ``Authorization`` header.
    :param timeout: timeout in seconds handed to ``requests.get``.
    :return: the decoded JSON, or ``None`` when the request times out or
        fails, the status code is not 200, the body is empty, or the body is
        not valid JSON.
    """
    language = api.portal.get_current_language()
    headers = {"Accept": "application/json", "Cookie": f"I18N_LANGUAGE={language}"}
    if auth is not None:
        headers["Authorization"] = auth
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
    except requests.exceptions.Timeout:
        logger.warning(f"Timeout raised for requests : {url}")
        return None
    except Exception:
        # Deliberately best effort: any other failure still yields None, but
        # it is now logged instead of disappearing silently.
        logger.warning("Request failed : %s", url, exc_info=True)
        return None
    if response.status_code != 200:
        return None
    if not response.text:
        return None
    try:
        return json.loads(response.text)
    except ValueError:
        # json.JSONDecodeError is a ValueError subclass; a 200 response with
        # a non-JSON body is treated like any other "no data" outcome.
        logger.warning("Invalid JSON received from : %s", url)
        return None
1✔
78

79

80
def get_wca_token(client_id, client_secret, timeout=10):
    """Request an ``id_token`` from ``WCA_URL`` and return it as a Bearer value.

    The user credentials come from the ``RESTAPI_USER_USERNAME`` and
    ``RESTAPI_USER_PASSWORD`` environment variables.

    :param client_id: client id sent in the token request.
    :param client_secret: client secret sent in the token request.
    :param timeout: timeout in seconds for the token request, so an
        unresponsive endpoint cannot block the caller indefinitely.
    :return: ``"Bearer <id_token>"``; when ``client_id`` or ``client_secret``
        is empty, the ``(username, password)`` tuple is returned instead and
        no request is made.
    """
    username = os.environ.get("RESTAPI_USER_USERNAME")
    password = os.environ.get("RESTAPI_USER_PASSWORD")
    if not client_id or not client_secret:
        # NOTE(review): this path returns a tuple, not a "Bearer ..." string;
        # kept as-is because callers may rely on it.
        return (username, password)

    payload = {
        "grant_type": "password",
        "client_id": client_id,
        "client_secret": client_secret,
        "username": username,
        "password": password,
        "scope": ["openid"],
    }
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
    }
    response = requests.post(WCA_URL, headers=headers, data=payload, timeout=timeout)
    id_token = response.json().get("id_token")
    if is_log_active():
        # NOTE(review): this writes the full token to the log whenever
        # logging is active - confirm this is acceptable for production logs.
        logger.info(f"WCA token retrieved : {id_token}")
    return "Bearer {0}".format(id_token)
×
102

103

104
def hash_md5(text):
    """Return the hexadecimal MD5 digest of ``text`` (encoded with ``str.encode()``)."""
    digest = hashlib.md5()
    digest.update(text.encode())
    return digest.hexdigest()
1✔
106

107

108
def safe_html(html):
    """Convert ``html`` to the ``text/x-html-safe`` mimetype.

    The conversion is done by the ``portal_transforms`` tool. Returns
    ``None`` for empty or missing input.
    """
    if html:
        portal_transforms = api.portal.get_tool("portal_transforms")
        converted = portal_transforms.convertTo(
            target_mimetype="text/x-html-safe", orig=html
        )
        return converted.getData()
    return None
1✔
115

116

117
def batch_results(iterable, batch_size):
    """Split ``iterable`` into a list of chunks of at most ``batch_size`` items.

    Chunking is non-strict, so the last chunk may be shorter than the others.
    """
    batches = chunked(iterable, batch_size, strict=False)
    return list(batches)
1✔
119

120

121
def reindexParent(obj, event=None):
    """Reindex the acquisition parent of ``obj`` when there is one.

    ``event`` is accepted but not used.
    """
    container = aq_parent(obj)
    if container is None:
        # in some cases (ex: relation breaking), we do not get the object in
        # its acquisition chain
        return
    container.reindexObject()
1✔
127

128

129
def get_default_content_id(obj):
    """Return the id of the default content of ``obj``.

    Returns ``""`` when ``obj`` is a site root, language root folder or
    folder without a default item, and ``None`` when ``obj`` is none of
    these types.
    """
    if IPloneSiteRoot.providedBy(obj) or ILanguageRootFolder.providedBy(obj):
        # Plone / LRF default page
        return get_default_page(obj) or ""
    if IFolder.providedBy(obj):
        # Our folder default page
        default_item = obj.get_default_item()
        if not default_item:
            return ""
        # NOTE(review): ``getId`` is read as an attribute, not called -
        # presumably the item exposes its id as a plain value (e.g. a
        # catalog brain); confirm against ``get_default_item``.
        return default_item.getId or ""
    return None
1✔
138

139

140
def get_scale_url(context, request, fieldname, scale_name, orientation=""):
    """Build the URL of an image scale for a content object or a brain.

    When ``orientation`` is given it becomes the prefix of the scale name,
    replacing an existing ``portrait_`` / ``paysage_`` / ``carre_`` prefix.

    :param context: a Dexterity content object; anything else is handled as
        a catalog brain.
    :param request: request used to look up the ``images`` view.
    :param fieldname: name of the image field.
    :param scale_name: scale name; when empty on a content object, the URL
        of the full image is returned.
    :param orientation: optional orientation prefix for the scale name.
    :return: the URL, or ``""`` when no scale or no image is available.
    """
    if orientation:
        prefixed = re.match(r"(portrait|paysage|carre)_(\w+)", scale_name)
        if prefixed is not None:
            # remove existing orientation (if any) from scale name
            scale_name = prefixed.group(2)
    scale_name = "_".join(part for part in (orientation, scale_name) if part)

    if not IDexterityContent.providedBy(context):
        # get scale url on a brain
        # In this case, we need a modification hash to handle croppings, because
        # catalog does not handle them correctly.
        # See https://github.com/collective/plone.app.imagecropping/issues/129
        brain = context
        if fieldname == "image" and not brain.has_leadimage:
            return ""
        modified = brain.ModificationDate
        if callable(modified):
            # brain in content listing for example
            modified = modified()
        cache_key = hash_md5(modified)
        return f"{brain.getURL()}/@@images/{fieldname}/{scale_name}?cache_key={cache_key}"

    # get scale url on an object
    if not scale_name:
        # return the full image
        cache_key = hash_md5(context.ModificationDate())
        return f"{context.absolute_url()}/@@images/{fieldname}/?cache_key={cache_key}"
    images_view = queryMultiAdapter((context, request), name="images")
    if images_view is None:
        return ""
    scale = images_view.scale(fieldname, scale_name)
    return "" if scale is None else scale.url
1✔
175

176

177
def get_plausible_vars():
    """Collect the Plausible settings (url, site and token).

    Each value is read from its ``SMARTWEB_PLAUSIBLE_*`` environment
    variable; when that is unset or empty, the matching
    ``smartweb.plausible_*`` registry record is used instead.

    :return: a dict with ``plausible_url``, ``plausible_site`` and
        ``plausible_token``, or ``None`` when any of the three is empty.
    """
    plausible_url = os.getenv(
        "SMARTWEB_PLAUSIBLE_URL", ""
    ) or api.portal.get_registry_record("smartweb.plausible_url")
    plausible_site = os.getenv(
        "SMARTWEB_PLAUSIBLE_SITE", ""
    ) or api.portal.get_registry_record("smartweb.plausible_site")
    plausible_token = os.getenv(
        "SMARTWEB_PLAUSIBLE_TOKEN", ""
    ) or api.portal.get_registry_record("smartweb.plausible_token")

    if not (plausible_site and plausible_url and plausible_token):
        return None
    return {
        "plausible_url": plausible_url,
        "plausible_site": plausible_site,
        "plausible_token": plausible_token,
    }
1✔
206

207

208
def get_value_from_registry(key):
    """Return the portal registry record stored under ``key``."""
    return api.portal.get_registry_record(key)
1✔
211

212

213
def get_iadeliberation_url_from_registry():
    """Return the ``smartweb.iadeliberations_url`` registry record."""
    return api.portal.get_registry_record("smartweb.iadeliberations_url")
1✔
216

217

218
def get_iadeliberation_institution_from_registry():
    """Return the ``smartweb.iadeliberations_institution`` registry record."""
    record_name = "smartweb.iadeliberations_institution"
    return api.portal.get_registry_record(record_name)
1✔
223

224

225
def get_iadeliberation_json(url):
    """Fetch ``url`` as JSON using the iA.Deliberations credentials.

    The username and password are read from the portal registry and sent as
    HTTP Basic authentication through ``get_basic_auth_json``.

    :return: whatever ``get_basic_auth_json`` returns for ``url``.
    """
    iadeliberation_user = api.portal.get_registry_record(
        "smartweb.iadeliberations_api_username"
    )
    # NOTE(review): this key is spelled "iadeliberation_" (no "s"), unlike
    # the username key; kept unchanged - confirm against the registry.
    iadeliberation_pwd = api.portal.get_registry_record(
        "smartweb.iadeliberation_api_password"
    )
    return get_basic_auth_json(url, iadeliberation_user, iadeliberation_pwd)
1✔
237

238

239
def get_iaideabox_json(url):
    """Fetch ``url`` as JSON using the iA.Ideabox credentials.

    The username and password are read from the portal registry and sent as
    HTTP Basic authentication through ``get_basic_auth_json``.

    :return: whatever ``get_basic_auth_json`` returns for ``url``.
    """
    user = api.portal.get_registry_record("smartweb.iaideabox_api_username")
    pwd = api.portal.get_registry_record("smartweb.iaideabox_api_password")
    return get_basic_auth_json(url, user, pwd)
×
246

247

248
def get_basic_auth_json(url, user, pwd):
    """Fetch ``url`` as JSON with an HTTP Basic ``Authorization`` header.

    ``user:pwd`` is UTF-8 encoded and base64 encoded to build the header;
    the request goes through ``get_json`` with a 20 second timeout.
    """
    credentials = base64.b64encode(f"{user}:{pwd}".encode("utf-8")).decode("utf-8")
    return get_json(url, auth=f"Basic {credentials}", timeout=20)
1✔
253

254

255
def get_ts_api_url(service="wcs"):
    """Build the ``/api`` URL derived from the ``smartweb.url_ts`` record.

    :param service: service name; when it contains ``"wcs"``, the
        ``.guichet-citoyen.be`` part of the host is replaced by
        ``-formulaires.guichet-citoyen.be``. For any other service the host
        is kept unchanged.
    :return: the registry URL with its path replaced by ``/api``.
    """
    url_ts = get_value_from_registry("smartweb.url_ts")
    parsed_url = urlparse(url_ts)
    # Start from the original host so a non-"wcs" service no longer hits an
    # unbound ``new_netloc`` (UnboundLocalError).
    new_netloc = parsed_url.netloc
    if "wcs" in service:
        new_netloc = new_netloc.replace(
            ".guichet-citoyen.be", "-formulaires.guichet-citoyen.be"
        )
    api_url = urlunparse(parsed_url._replace(netloc=new_netloc, path="/api"))
    return api_url
1✔
264

265

266
def remove_cache_key(json_data: dict) -> dict:
    """Strip ``&cache_key=<32 hex chars>`` from the link fields of a JSON result.

    The top-level ``@id`` and the ``@id`` / ``first`` / ``last`` / ``prev`` /
    ``next`` entries of a ``batching`` dict are cleaned when they are strings.
    ``json_data`` is modified in place and also returned.

    :param json_data: decoded JSON response, or ``None``.
    :return: the same object, or ``None`` when ``None`` was given.
    """
    if json_data is None:
        return json_data
    pattern = re.compile(r"&cache_key=[a-f0-9]{32}")
    if "@id" in json_data and isinstance(json_data["@id"], str):
        json_data["@id"] = pattern.sub("", json_data["@id"])

    # Nested keys inside 'batching'
    if "batching" in json_data and isinstance(json_data["batching"], dict):
        batching = json_data["batching"]
        # "prev" is included so the previous-page link is cleaned like the
        # other batching links.
        for key in ("@id", "first", "last", "prev", "next"):
            value = batching.get(key)
            if isinstance(value, str):
                batching[key] = pattern.sub("", value)

    return json_data
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc