• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rdmorganiser / rdmo / 14404330126

11 Apr 2025 01:31PM UTC coverage: 90.789% (+0.3%) from 90.478%
14404330126

push

github

web-flow
Merge pull request #1195 from rdmorganiser/2.3.0

RDMO 2.3.0 ⭐

989 of 1076 branches covered (91.91%)

9176 of 10107 relevant lines covered (90.79%)

3.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.33
rdmo/projects/utils.py
1
import logging
4✔
2
import mimetypes
4✔
3
from collections import defaultdict
4✔
4
from pathlib import Path
4✔
5

6
from django.conf import settings
4✔
7
from django.contrib.sites.models import Site
4✔
8
from django.core.exceptions import ObjectDoesNotExist
4✔
9
from django.template.loader import render_to_string
4✔
10
from django.urls import reverse
4✔
11
from django.utils.timezone import now
4✔
12

13
from rdmo.core.mail import send_mail
4✔
14
from rdmo.core.plugins import get_plugins
4✔
15
from rdmo.core.utils import remove_double_newlines
4✔
16

17
logger = logging.getLogger(__name__)
4✔
18

19

20
def get_value_path(project, snapshot=None):
4✔
21
    if snapshot is None:
4✔
22
        return Path('projects') / str(project.id) / 'values'
4✔
23
    else:
24
        return Path('projects') / str(project.id) / 'snapshots' / str(snapshot.id) / 'values'
4✔
25

26

27
def is_last_owner(project, user):
4✔
28
    # check if user is owner
29
    if user in project.owners:
4✔
30
        # check if the user is the last owner
31
        return project.owners.count() <= 1
4✔
32
    else:
33
        return False
4✔
34

35

36
def check_conditions(conditions, values, set_prefix=None, set_index=None):
4✔
37
    if conditions:
4✔
38
        for condition in conditions:
4✔
39
            if condition.resolve(values, set_prefix, set_index):
4✔
40
                return True
4✔
41
        return False
4✔
42
    else:
43
        return True
×
44

45

46
def check_options(project, value):
4✔
47
    # loop over all values of a question and check if value.option matches the optionsets of the question
48
    for question in filter(lambda q: q.attribute == value.attribute, project.catalog.questions):
4✔
49
        question_options = [
4✔
50
            option.id
51
            for optionset in question.optionsets.all()
52
            for option in optionset.options.all()
53
        ]
54

55
        # fail if question requires an option but value has none
56
        if question_options and value.option is None:
4✔
57
            return False
4✔
58

59
        # fail if the value's option is not allowed for this question
60
        if value.option is not None and value.option.id not in question_options:
4✔
61
            return False
4✔
62

63
    return True
4✔
64

65

66
def copy_project(instance, site, owners):
4✔
67
    from .models import Membership, Project, Value  # to prevent circular inclusion
4✔
68

69
    timestamp = now()
4✔
70

71
    tasks = instance.tasks.all()
4✔
72
    views = instance.views.all()
4✔
73

74
    values = instance.values.filter(snapshot=None)
4✔
75
    snapshots = {
4✔
76
        snapshot: instance.values.filter(snapshot=snapshot)
77
        for snapshot in instance.snapshots.all()
78
    }
79

80
    # a completely new project instance needs to be created in order for mptt to work
81
    project = Project.objects.create(
4✔
82
        parent=instance.parent,
83
        site=site,
84
        title=instance.title,
85
        description=instance.description,
86
        catalog=instance.catalog,
87
        created=timestamp
88
    )
89

90
    # save project tasks
91
    for task in tasks:
4✔
92
        project.tasks.add(task)
4✔
93

94
    # save project views
95
    for view in views:
4✔
96
        project.views.add(view)
4✔
97

98
    # save current project values
99
    project_values = []
4✔
100
    for value in values:
4✔
101
        value.id = None
4✔
102
        value.project = project
4✔
103
        value.created = timestamp
4✔
104

105
        if value.file:
4✔
106
            # file values cannot be bulk created since we need their id and only postgres provides that (reliably)
107
            # https://docs.djangoproject.com/en/4.2/ref/models/querysets/#bulk-create
108
            value.save()
4✔
109
            value.copy_file(value.file_name, value.file)
4✔
110
        else:
111
            project_values.append(value)
4✔
112

113
    # insert the new values using bulk_create
114
    Value.objects.bulk_create(project_values)
4✔
115

116
    # save project snapshots
117
    for snapshot, snapshot_values in snapshots.items():
4✔
118
        snapshot.id = None
4✔
119
        snapshot.project = project
4✔
120
        snapshot.created = timestamp
4✔
121
        snapshot.save(copy_values=False)
4✔
122

123
        project_snapshot_values = []
4✔
124
        for value in snapshot_values:
4✔
125
            value.id = None
4✔
126
            value.project = project
4✔
127
            value.snapshot = snapshot
4✔
128
            value.created = timestamp
4✔
129

130
            if value.file:
4✔
131
                value.save()
4✔
132
                value.copy_file(value.file_name, value.file)
4✔
133
            else:
134
                project_snapshot_values.append(value)
4✔
135

136
        # insert the new snapshot values using bulk_create
137
        Value.objects.bulk_create(project_snapshot_values)
4✔
138

139
    for owner in owners:
4✔
140
        membership = Membership(project=project, user=owner, role='owner')
4✔
141
        membership.save()
4✔
142

143
    return project
4✔
144

145

146
def save_import_values(project, values, checked):
4✔
147
    for value in values:
4✔
148
        if value.attribute:
4✔
149
            value_key = f'{value.attribute.uri}[{value.set_prefix}][{value.set_index}][{value.collection_index}]'
4✔
150

151
            if value_key in checked:
4✔
152
                current_value = value.current
4✔
153
                if current_value is None:
4✔
154
                    # assert that this is a new value
155
                    assert value.pk is None
4✔
156

157
                    value.project = project
4✔
158
                    value.save()
4✔
159

160
                    if value.file:
4✔
161
                        value.copy_file(value.file_name, value.file)
4✔
162
                    else:
163
                        try:
4✔
164
                            name = value.file_import.get('name')
4✔
165
                            file = value.file_import.get('file')
4✔
166
                            value.file.save(name, file)
4✔
167
                        except AttributeError:
4✔
168
                            pass
4✔
169

170
                else:
171
                    # make sure we have the correct value
172
                    assert current_value.snapshot is None
4✔
173
                    assert current_value.attribute == value.attribute
4✔
174
                    assert current_value.set_prefix == value.set_prefix
4✔
175
                    assert current_value.set_index == value.set_index
4✔
176
                    assert current_value.collection_index == value.collection_index
4✔
177

178
                    # assert that this is an new value
179
                    assert current_value.pk is not None
4✔
180

181
                    current_value.text = value.text
4✔
182
                    current_value.option = value.option
4✔
183
                    current_value.value_type = value.value_type
4✔
184
                    current_value.unit = value.unit
4✔
185
                    current_value.save()
4✔
186

187
                    if value.file:
4✔
188
                        current_value.file.delete()
4✔
189
                        current_value.copy_file(value.file_name, value.file)
4✔
190
                    else:
191
                        try:
4✔
192
                            name = value.file_import.get('name')
4✔
193
                            file = value.file_import.get('file')
4✔
194
                            current_value.file.delete()
4✔
195
                            current_value.file.save(name, file)
4✔
196
                        except AttributeError:
4✔
197
                            pass
4✔
198

199

200
def save_import_snapshot_values(project, snapshots, checked):
4✔
201
    for snapshot in snapshots:
4✔
202
        # assert that this is a new snapshot
203
        assert snapshot.pk is None
4✔
204

205
        snapshot.project = project
4✔
206
        snapshot.save(copy_values=False)
4✔
207

208
        for value in snapshot.snapshot_values:
4✔
209
            if value.attribute:
4✔
210
                value_key = f"{value.attribute.uri}[{snapshot.snapshot_index}][{value.set_prefix}][{value.set_index}][{value.collection_index}]" # noqa: E501
4✔
211

212
                if value_key in checked:
4✔
213
                    # assert that this is a new value
214
                    assert value.pk is None
4✔
215

216
                    value.project = project
4✔
217
                    value.snapshot = snapshot
4✔
218
                    value.save()
4✔
219

220
                    if value.file:
4✔
221
                        value.copy_file(value.file_name, value.file)
×
222
                    else:
223
                        try:
4✔
224
                            name = value.file_import.get('name')
4✔
225
                            file = value.file_import.get('file')
4✔
226
                            value.file.save(name, file)
4✔
227
                        except AttributeError:
4✔
228
                            pass
4✔
229

230

231
def save_import_tasks(project, tasks):
4✔
232
    for task in tasks:
4✔
233
        project.tasks.add(task)
4✔
234

235

236
def save_import_views(project, views):
4✔
237
    for view in views:
4✔
238
        project.views.add(view)
4✔
239

240

241
def get_invite_email_project_path(invite) -> str:
4✔
242
    project_invite_path = reverse('project_join', args=[invite.token])
4✔
243
    # check if the invited user exists and the multisite environment is enabled
244
    if invite.user is not None and settings.MULTISITE:
4✔
245
        # do nothing if user is a member of the current site
246
        current_site = Site.objects.get_current()
4✔
247
        if not invite.user.role.member.filter(id=current_site.id).exists():
4✔
248
            # else take first site
249
            invited_user_member_domain = invite.user.role.member.first().domain
4✔
250
            project_invite_path = 'http://' + invited_user_member_domain + project_invite_path
4✔
251
    return project_invite_path
4✔
252

253

254
def send_invite_email(request, invite):
4✔
255
    project_invite_path = get_invite_email_project_path(invite)
4✔
256
    context = {
4✔
257
        'invite_url': request.build_absolute_uri(project_invite_path),
258
        'invite_user': invite.user,
259
        'invite_email': invite.email,
260
        'project': invite.project,
261
        'user': request.user,
262
        'site': Site.objects.get_current()
263
    }
264

265
    subject = render_to_string('projects/email/project_invite_subject.txt', context)
4✔
266
    message = render_to_string('projects/email/project_invite_message.txt', context)
4✔
267

268
    # send the email
269
    send_mail(subject, message, to=[invite.email])
4✔
270

271

272
def set_context_querystring_with_filter_and_page(context: dict) -> dict:
4✔
273
    '''prepares the filter part of the querystring for the next and previous hyperlinks in the pagination'''
274
    if context["filter"].data:
4✔
275
        querystring = context["filter"].data.copy()
4✔
276
        if context["filter"].data.get('page'):
4✔
277
            del querystring['page']
4✔
278
        context['querystring'] = querystring.urlencode()
4✔
279
    return context
4✔
280

281

282
def get_upload_accept():
4✔
283
    accept = defaultdict(set)
4✔
284
    for import_plugin in get_plugins('PROJECT_IMPORTS').values():
4✔
285
        if isinstance(import_plugin.accept, dict):
4✔
286
            for mime_type, suffixes in import_plugin.accept.items():
4✔
287
                accept[mime_type].update(suffixes)
4✔
288

289
        elif isinstance(import_plugin.accept, str):
4✔
290
            # legacy fallback for pre 2.3.0 RDMO, e.g. `accept = '.xml'`
291
            suffix = import_plugin.accept
×
292
            mime_type, encoding = mimetypes.guess_type(f'example{suffix}')
×
293
            if mime_type:
×
294
                accept[mime_type].update([suffix])
×
295

296
        elif import_plugin.upload is True:
4✔
297
            # if one of the plugins does not have the accept field, but is marked as upload plugin
298
            # all file types are allowed
299
            return {}
×
300

301
    return accept
4✔
302

303

304
def compute_set_prefix_from_set_value(set_value, value):
4✔
305
    set_prefix_length = len(set_value.set_prefix.split('|')) if set_value.set_prefix else 0
4✔
306
    return '|'.join([
4✔
307
        str(set_value.set_index) if (index == set_prefix_length) else value
308
        for index, value in enumerate(value.set_prefix.split('|'))
309
    ])
310

311

312
def get_contact_message(request, project):
4✔
313
    project_url = project.get_absolute_url()
4✔
314
    project_interview_url = reverse('project_interview', args=[project.id])
4✔
315

316
    context = {
4✔
317
        'user': request.user,
318
        'site': Site.objects.get_current(),
319
        'project': project,
320
        'project_url': request.build_absolute_uri(project_url)
321
    }
322

323
    page_id = request.GET.get('page')
4✔
324
    if page_id:
4✔
325
        page = project.catalog.get_page(page_id)
4✔
326
        if page:
4✔
327
            context.update({
4✔
328
                'page': page,
329
                'page_url': request.build_absolute_uri(f'{project_interview_url}{page_id}/')
330
            })
331

332
    questionset_id = request.GET.get('questionset')
4✔
333
    if questionset_id:
4✔
334
        context['questionset'] = project.catalog.get_questionset(questionset_id)
×
335

336
    question_id = request.GET.get('question')
4✔
337
    if question_id:
4✔
338
        context['question'] = project.catalog.get_question(question_id)
4✔
339

340
    value_ids = request.GET.getlist('values')
4✔
341
    if value_ids:
4✔
342
        values = project.values.filter(snapshot=None).filter(id__in=value_ids)
4✔
343
        context['values'] = values
4✔
344

345
        if page_id and page and page.is_collection and page.attribute is not None:
4✔
346
            value = values.filter(set_prefix='').first()
4✔
347
            if value:
4✔
348
                try:
4✔
349
                    context['set_value'] = project.values.filter(snapshot=None).get(
4✔
350
                        attribute=page.attribute,
351
                        set_prefix='',
352
                        set_index=value.set_index,
353
                        collection_index=0
354
                    )
355
                except ObjectDoesNotExist:
×
356
                    pass
×
357

358
    subject = render_to_string('projects/email/project_contact_subject.txt', context)
4✔
359
    message = render_to_string('projects/email/project_contact_message.txt', context)
4✔
360

361
    return {
4✔
362
        'subject': subject,
363
        'message': remove_double_newlines(message)
364
    }
365

366

367
def send_contact_message(request, subject, message):
4✔
368
    send_mail(subject, message,
4✔
369
              to=settings.PROJECT_CONTACT_RECIPIENTS,
370
              cc=[request.user.email], reply_to=[request.user.email])
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc