• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rdmorganiser / rdmo / 22848572831

09 Mar 2026 10:13AM UTC coverage: 94.901% (+0.09%) from 94.814%
22848572831

Pull #1549

github

web-flow
Merge ecd027e09 into cda25edd6
Pull Request #1549: RDMO 2.4.1 🐞

2134 of 2234 branches covered (95.52%)

22818 of 24044 relevant lines covered (94.9%)

3.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.31
rdmo/projects/utils.py
1
import logging
4✔
2
import mimetypes
4✔
3
from collections import defaultdict
4✔
4
from pathlib import Path
4✔
5

6
from django.conf import settings
4✔
7
from django.contrib.sites.models import Site
4✔
8
from django.core.exceptions import ObjectDoesNotExist
4✔
9
from django.template.loader import render_to_string
4✔
10
from django.urls import reverse
4✔
11
from django.utils.timezone import now
4✔
12

13
from rdmo.core.mail import send_mail
4✔
14
from rdmo.core.plugins import get_plugins
4✔
15
from rdmo.core.utils import remove_double_newlines
4✔
16

17
logger = logging.getLogger(__name__)
4✔
18

19

20
def get_value_path(project, snapshot=None):
4✔
21
    if snapshot is None:
4✔
22
        return Path('projects') / str(project.id) / 'values'
4✔
23
    else:
24
        return Path('projects') / str(project.id) / 'snapshots' / str(snapshot.id) / 'values'
4✔
25

26

27
def is_last_owner(project, user):
4✔
28
    # check if user is owner
29
    if user in project.owners:
4✔
30
        # check if the user is the last owner
31
        return project.owners.count() <= 1
4✔
32
    else:
33
        return False
4✔
34

35

36
def check_conditions(conditions, values, set_prefix=None, set_index=None):
4✔
37
    if conditions:
4✔
38
        for condition in conditions:
4✔
39
            if condition.resolve(values, set_prefix, set_index):
4✔
40
                return True
4✔
41
        return False
4✔
42
    else:
43
        return True
×
44

45

46
def check_options(project, value):
4✔
47
    # loop over all values of a question and check if value.option matches the optionsets of the question
48
    for question in filter(lambda q: q.attribute == value.attribute, project.catalog.questions):
4✔
49
        question_options = [
4✔
50
            option.id
51
            for optionset in question.optionsets.all()
52
            for option in optionset.options.all()
53
        ]
54

55
        # fail if question requires an option but value has none
56
        if question_options and value.option is None:
4✔
57
            return False
4✔
58

59
        # fail if the value's option is not allowed for this question
60
        if value.option is not None and value.option.id not in question_options:
4✔
61
            return False
4✔
62

63
    return True
4✔
64

65

66
def copy_project(instance, site, owners):
4✔
67
    from .models import Membership, Project, Value  # to prevent circular inclusion
4✔
68

69
    timestamp = now()
4✔
70

71
    tasks = instance.tasks.all()
4✔
72
    views = instance.views.all()
4✔
73

74
    values = instance.values.filter(snapshot=None)
4✔
75
    snapshots = {
4✔
76
        snapshot: instance.values.filter(snapshot=snapshot)
77
        for snapshot in instance.snapshots.all()
78
    }
79

80
    # a completely new project instance needs to be created in order for mptt to work
81
    project = Project.objects.create(
4✔
82
        parent=instance.parent,
83
        site=site,
84
        title=instance.title,
85
        description=instance.description,
86
        catalog=instance.catalog,
87
        created=timestamp
88
    )
89

90
    # save project tasks
91
    for task in tasks:
4✔
92
        project.tasks.add(task)
4✔
93

94
    # save project views
95
    for view in views:
4✔
96
        project.views.add(view)
4✔
97

98
    # save current project values
99
    project_values = []
4✔
100
    for value in values:
4✔
101
        value.id = None
4✔
102
        value.project = project
4✔
103
        value.updated = timestamp
4✔
104

105
        if value.file:
4✔
106
            # file values cannot be bulk created since we need their id and only postgres provides that (reliably)
107
            # https://docs.djangoproject.com/en/4.2/ref/models/querysets/#bulk-create
108
            value.save()
4✔
109
            value.copy_file(value.file_name, value.file)
4✔
110
        else:
111
            project_values.append(value)
4✔
112

113
    # insert the new values using bulk_create
114
    Value.objects.bulk_create(project_values)
4✔
115

116
    # save project snapshots
117
    for snapshot, snapshot_values in snapshots.items():
4✔
118
        snapshot.id = None
4✔
119
        snapshot.project = project
4✔
120
        snapshot.save(copy_values=False)
4✔
121

122
        project_snapshot_values = []
4✔
123
        for value in snapshot_values:
4✔
124
            value.id = None
4✔
125
            value.project = project
4✔
126
            value.snapshot = snapshot
4✔
127
            value.updated = timestamp
4✔
128

129
            if value.file:
4✔
130
                value.save()
4✔
131
                value.copy_file(value.file_name, value.file)
4✔
132
            else:
133
                project_snapshot_values.append(value)
4✔
134

135
        # insert the new snapshot values using bulk_create
136
        Value.objects.bulk_create(project_snapshot_values)
4✔
137

138
    for owner in owners:
4✔
139
        membership = Membership(project=project, user=owner, role='owner')
4✔
140
        membership.save()
4✔
141

142
    return project
4✔
143

144

145
def save_import_values(project, values, checked):
4✔
146
    for value in values:
4✔
147
        if value.attribute:
4✔
148
            value_key = f'{value.attribute.uri}[{value.set_prefix}][{value.set_index}][{value.collection_index}]'
4✔
149

150
            if value_key in checked:
4✔
151
                current_value = value.current
4✔
152
                if current_value is None:
4✔
153
                    # assert that this is a new value
154
                    assert value.pk is None
4✔
155

156
                    value.project = project
4✔
157
                    value.save()
4✔
158

159
                    if value.file:
4✔
160
                        value.copy_file(value.file_name, value.file)
4✔
161
                    else:
162
                        try:
4✔
163
                            name = value.file_import.get('name')
4✔
164
                            file = value.file_import.get('file')
4✔
165
                            value.file.save(name, file)
4✔
166
                        except AttributeError:
4✔
167
                            pass
4✔
168

169
                else:
170
                    # make sure we have the correct value
171
                    assert current_value.snapshot is None
4✔
172
                    assert current_value.attribute == value.attribute
4✔
173
                    assert current_value.set_prefix == value.set_prefix
4✔
174
                    assert current_value.set_index == value.set_index
4✔
175
                    assert current_value.collection_index == value.collection_index
4✔
176

177
                    # assert that this is an new value
178
                    assert current_value.pk is not None
4✔
179

180
                    current_value.text = value.text
4✔
181
                    current_value.option = value.option
4✔
182
                    current_value.value_type = value.value_type
4✔
183
                    current_value.unit = value.unit
4✔
184
                    current_value.save()
4✔
185

186
                    if value.file:
4✔
187
                        current_value.file.delete()
4✔
188
                        current_value.copy_file(value.file_name, value.file)
4✔
189
                    else:
190
                        try:
4✔
191
                            name = value.file_import.get('name')
4✔
192
                            file = value.file_import.get('file')
4✔
193
                            current_value.file.delete()
4✔
194
                            current_value.file.save(name, file)
4✔
195
                        except AttributeError:
4✔
196
                            pass
4✔
197

198

199
def save_import_snapshot_values(project, snapshots, checked):
4✔
200
    for snapshot in snapshots:
4✔
201
        # assert that this is a new snapshot
202
        assert snapshot.pk is None
4✔
203

204
        snapshot.project = project
4✔
205
        snapshot.save(copy_values=False)
4✔
206

207
        for value in snapshot.snapshot_values:
4✔
208
            if value.attribute:
4✔
209
                value_key = f"{value.attribute.uri}[{snapshot.snapshot_index}][{value.set_prefix}][{value.set_index}][{value.collection_index}]" # noqa: E501
4✔
210

211
                if value_key in checked:
4✔
212
                    # assert that this is a new value
213
                    assert value.pk is None
4✔
214

215
                    value.project = project
4✔
216
                    value.snapshot = snapshot
4✔
217
                    value.save()
4✔
218

219
                    if value.file:
4✔
220
                        value.copy_file(value.file_name, value.file)
×
221
                    else:
222
                        try:
4✔
223
                            name = value.file_import.get('name')
4✔
224
                            file = value.file_import.get('file')
4✔
225
                            value.file.save(name, file)
4✔
226
                        except AttributeError:
4✔
227
                            pass
4✔
228

229

230
def save_import_tasks(project, tasks):
4✔
231
    for task in tasks:
4✔
232
        project.tasks.add(task)
4✔
233

234

235
def save_import_views(project, views):
4✔
236
    for view in views:
4✔
237
        project.views.add(view)
4✔
238

239

240
def get_invite_email_project_path(invite) -> str:
4✔
241
    project_invite_path = reverse('project_join', args=[invite.token])
4✔
242
    # check if the invited user exists and the multisite environment is enabled
243
    if invite.user is not None and settings.MULTISITE:
4✔
244
        # do nothing if user is a member of the current site
245
        current_site = Site.objects.get_current()
4✔
246
        if not invite.user.role.member.filter(id=current_site.id).exists():
4✔
247
            # else take first site
248
            invited_user_member_domain = invite.user.role.member.first().domain
4✔
249
            project_invite_path = 'http://' + invited_user_member_domain + project_invite_path
4✔
250
    return project_invite_path
4✔
251

252

253
def send_invite_email(request, invite):
4✔
254
    project_invite_path = get_invite_email_project_path(invite)
4✔
255
    context = {
4✔
256
        'invite_url': request.build_absolute_uri(project_invite_path),
257
        'invite_user': invite.user,
258
        'invite_email': invite.email,
259
        'project': invite.project,
260
        'user': request.user,
261
        'site': Site.objects.get_current()
262
    }
263

264
    subject = render_to_string('projects/email/project_invite_subject.txt', context)
4✔
265
    message = render_to_string('projects/email/project_invite_message.txt', context)
4✔
266

267
    # send the email
268
    send_mail(subject, message, to=[invite.email])
4✔
269

270

271
def set_context_querystring_with_filter_and_page(context: dict) -> dict:
4✔
272
    '''prepares the filter part of the querystring for the next and previous hyperlinks in the pagination'''
273
    if context["filter"].data:
4✔
274
        querystring = context["filter"].data.copy()
4✔
275
        if context["filter"].data.get('page'):
4✔
276
            del querystring['page']
4✔
277
        context['querystring'] = querystring.urlencode()
4✔
278
    return context
4✔
279

280

281
def get_upload_accept():
4✔
282
    accept = defaultdict(set)
4✔
283
    for import_plugin in get_plugins('PROJECT_IMPORTS').values():
4✔
284
        if isinstance(import_plugin.accept, dict):
4✔
285
            for mime_type, suffixes in import_plugin.accept.items():
4✔
286
                accept[mime_type].update(suffixes)
4✔
287

288
        elif isinstance(import_plugin.accept, str):
4✔
289
            # legacy fallback for pre 2.3.0 RDMO, e.g. `accept = '.xml'`
290
            suffix = import_plugin.accept
×
291
            mime_type, _encoding = mimetypes.guess_type(f'example{suffix}')
×
292
            if mime_type:
×
293
                accept[mime_type].update([suffix])
×
294

295
        elif import_plugin.upload is True:
4✔
296
            # if one of the plugins does not have the accept field, but is marked as upload plugin
297
            # all file types are allowed
298
            return {}
×
299

300
    return accept
4✔
301

302

303
def compute_set_prefix_from_set_value(set_value, value):
4✔
304
    set_prefix_length = len(set_value.set_prefix.split('|')) if set_value.set_prefix else 0
4✔
305
    return '|'.join([
4✔
306
        str(set_value.set_index) if (index == set_prefix_length) else value
307
        for index, value in enumerate(value.set_prefix.split('|'))
308
    ])
309

310

311
def get_contact_message(request, project):
4✔
312
    project_url = project.get_absolute_url()
4✔
313
    project_interview_url = reverse('project_interview', args=[project.id])
4✔
314

315
    context = {
4✔
316
        'user': request.user,
317
        'site': Site.objects.get_current(),
318
        'project': project,
319
        'project_url': request.build_absolute_uri(project_url)
320
    }
321

322
    page_id = request.GET.get('page')
4✔
323
    if page_id:
4✔
324
        page = project.catalog.get_page(page_id)
4✔
325
        if page:
4✔
326
            context.update({
4✔
327
                'page': page,
328
                'page_url': request.build_absolute_uri(f'{project_interview_url}{page_id}/')
329
            })
330

331
    questionset_id = request.GET.get('questionset')
4✔
332
    if questionset_id:
4✔
333
        context['questionset'] = project.catalog.get_questionset(questionset_id)
×
334

335
    question_id = request.GET.get('question')
4✔
336
    if question_id:
4✔
337
        context['question'] = project.catalog.get_question(question_id)
4✔
338

339
    value_ids = request.GET.getlist('values')
4✔
340
    if value_ids:
4✔
341
        values = project.values.filter(snapshot=None).filter(id__in=value_ids)
4✔
342
        context['values'] = values
4✔
343

344
        if page_id and page and page.is_collection and page.attribute is not None:
4✔
345
            value = values.filter(set_prefix='').first()
4✔
346
            if value:
4✔
347
                try:
4✔
348
                    context['set_value'] = project.values.filter(snapshot=None).get(
4✔
349
                        attribute=page.attribute,
350
                        set_prefix='',
351
                        set_index=value.set_index,
352
                        collection_index=0
353
                    )
354
                except ObjectDoesNotExist:
×
355
                    pass
×
356

357
    subject = render_to_string('projects/email/project_contact_subject.txt', context)
4✔
358
    message = render_to_string('projects/email/project_contact_message.txt', context)
4✔
359

360
    return {
4✔
361
        'subject': subject,
362
        'message': remove_double_newlines(message)
363
    }
364

365

366
def send_contact_message(request, subject, message):
4✔
367
    send_mail(subject, message,
4✔
368
              to=settings.PROJECT_CONTACT_RECIPIENTS,
369
              cc=[request.user.email], reply_to=[request.user.email])
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc