• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

rdmorganiser / rdmo / 20428555173

22 Dec 2025 10:02AM UTC coverage: 94.693% (-0.1%) from 94.814%
20428555173

Pull #1436

github

web-flow
Merge 0ffb48a5d into 57a75b09e
Pull Request #1436: Draft: add `rdmo.config` app for `Plugin` model (plugin managament)

2191 of 2304 branches covered (95.1%)

23411 of 24723 relevant lines covered (94.69%)

3.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.11
rdmo/projects/utils.py
1
import logging
4✔
2
import mimetypes
4✔
3
from collections import defaultdict
4✔
4
from pathlib import Path
4✔
5

6
from django.conf import settings
4✔
7
from django.contrib.sites.models import Site
4✔
8
from django.core.exceptions import ObjectDoesNotExist
4✔
9
from django.db.models import Q
4✔
10
from django.template.loader import render_to_string
4✔
11
from django.urls import reverse
4✔
12
from django.utils.timezone import now
4✔
13

14
from rdmo.config.models import Plugin
4✔
15
from rdmo.core.mail import send_mail
4✔
16
from rdmo.core.utils import remove_double_newlines
4✔
17
from rdmo.tasks.managers import TaskQuerySet
4✔
18
from rdmo.views.managers import ViewQuerySet
4✔
19

20
logger = logging.getLogger(__name__)
4✔
21

22

23
def get_value_path(project, snapshot=None):
4✔
24
    if snapshot is None:
4✔
25
        return Path('projects') / str(project.id) / 'values'
4✔
26
    else:
27
        return Path('projects') / str(project.id) / 'snapshots' / str(snapshot.id) / 'values'
4✔
28

29

30
def is_last_owner(project, user):
4✔
31
    # check if user is owner
32
    if user in project.owners:
4✔
33
        # check if the user is the last owner
34
        return project.owners.count() <= 1
4✔
35
    else:
36
        return False
4✔
37

38

39
def check_conditions(conditions, values, set_prefix=None, set_index=None):
4✔
40
    if conditions:
4✔
41
        for condition in conditions:
4✔
42
            if condition.resolve(values, set_prefix, set_index):
4✔
43
                return True
4✔
44
        return False
4✔
45
    else:
46
        return True
×
47

48

49
def check_options(project, value):
4✔
50
    # loop over all values of a question and check if value.option matches the optionsets of the question
51
    for question in filter(lambda q: q.attribute == value.attribute, project.catalog.questions):
4✔
52
        question_options = [
4✔
53
            option.id
54
            for optionset in question.optionsets.all()
55
            for option in optionset.options.all()
56
        ]
57

58
        # fail if question requires an option but value has none
59
        if question_options and value.option is None:
4✔
60
            return False
4✔
61

62
        # fail if the value's option is not allowed for this question
63
        if value.option is not None and value.option.id not in question_options:
4✔
64
            return False
4✔
65

66
    return True
4✔
67

68

69
def copy_project(instance, site, owners):
4✔
70
    from .models import Membership, Project, Value  # to prevent circular inclusion
4✔
71

72
    timestamp = now()
4✔
73

74
    tasks = instance.tasks.all()
4✔
75
    views = instance.views.all()
4✔
76

77
    values = instance.values.filter(snapshot=None)
4✔
78
    snapshots = {
4✔
79
        snapshot: instance.values.filter(snapshot=snapshot)
80
        for snapshot in instance.snapshots.all()
81
    }
82

83
    # a completely new project instance needs to be created in order for mptt to work
84
    project = Project.objects.create(
4✔
85
        parent=instance.parent,
86
        site=site,
87
        title=instance.title,
88
        description=instance.description,
89
        catalog=instance.catalog,
90
        created=timestamp
91
    )
92

93
    # save project tasks
94
    for task in tasks:
4✔
95
        project.tasks.add(task)
4✔
96

97
    # save project views
98
    for view in views:
4✔
99
        project.views.add(view)
4✔
100

101
    # save current project values
102
    project_values = []
4✔
103
    for value in values:
4✔
104
        value.id = None
4✔
105
        value.project = project
4✔
106
        value.created = timestamp
4✔
107

108
        if value.file:
4✔
109
            # file values cannot be bulk created since we need their id and only postgres provides that (reliably)
110
            # https://docs.djangoproject.com/en/4.2/ref/models/querysets/#bulk-create
111
            value.save()
4✔
112
            value.copy_file(value.file_name, value.file)
4✔
113
        else:
114
            project_values.append(value)
4✔
115

116
    # insert the new values using bulk_create
117
    Value.objects.bulk_create(project_values)
4✔
118

119
    # save project snapshots
120
    for snapshot, snapshot_values in snapshots.items():
4✔
121
        snapshot.id = None
4✔
122
        snapshot.project = project
4✔
123
        snapshot.created = timestamp
4✔
124
        snapshot.save(copy_values=False)
4✔
125

126
        project_snapshot_values = []
4✔
127
        for value in snapshot_values:
4✔
128
            value.id = None
4✔
129
            value.project = project
4✔
130
            value.snapshot = snapshot
4✔
131
            value.created = timestamp
4✔
132

133
            if value.file:
4✔
134
                value.save()
4✔
135
                value.copy_file(value.file_name, value.file)
4✔
136
            else:
137
                project_snapshot_values.append(value)
4✔
138

139
        # insert the new snapshot values using bulk_create
140
        Value.objects.bulk_create(project_snapshot_values)
4✔
141

142
    for owner in owners:
4✔
143
        membership = Membership(project=project, user=owner, role='owner')
4✔
144
        membership.save()
4✔
145

146
    return project
4✔
147

148

149
def save_import_values(project, values, checked):
4✔
150
    for value in values:
4✔
151
        if value.attribute:
4✔
152
            value_key = f'{value.attribute.uri}[{value.set_prefix}][{value.set_index}][{value.collection_index}]'
4✔
153

154
            if value_key in checked:
4✔
155
                current_value = value.current
4✔
156
                if current_value is None:
4✔
157
                    # assert that this is a new value
158
                    assert value.pk is None
4✔
159

160
                    value.project = project
4✔
161
                    value.save()
4✔
162

163
                    if value.file:
4✔
164
                        value.copy_file(value.file_name, value.file)
4✔
165
                    else:
166
                        try:
4✔
167
                            name = value.file_import.get('name')
4✔
168
                            file = value.file_import.get('file')
4✔
169
                            value.file.save(name, file)
4✔
170
                        except AttributeError:
4✔
171
                            pass
4✔
172

173
                else:
174
                    # make sure we have the correct value
175
                    assert current_value.snapshot is None
4✔
176
                    assert current_value.attribute == value.attribute
4✔
177
                    assert current_value.set_prefix == value.set_prefix
4✔
178
                    assert current_value.set_index == value.set_index
4✔
179
                    assert current_value.collection_index == value.collection_index
4✔
180

181
                    # assert that this is an new value
182
                    assert current_value.pk is not None
4✔
183

184
                    current_value.text = value.text
4✔
185
                    current_value.option = value.option
4✔
186
                    current_value.value_type = value.value_type
4✔
187
                    current_value.unit = value.unit
4✔
188
                    current_value.save()
4✔
189

190
                    if value.file:
4✔
191
                        current_value.file.delete()
4✔
192
                        current_value.copy_file(value.file_name, value.file)
4✔
193
                    else:
194
                        try:
4✔
195
                            name = value.file_import.get('name')
4✔
196
                            file = value.file_import.get('file')
4✔
197
                            current_value.file.delete()
4✔
198
                            current_value.file.save(name, file)
4✔
199
                        except AttributeError:
4✔
200
                            pass
4✔
201

202

203
def save_import_snapshot_values(project, snapshots, checked):
4✔
204
    for snapshot in snapshots:
4✔
205
        # assert that this is a new snapshot
206
        assert snapshot.pk is None
4✔
207

208
        snapshot.project = project
4✔
209
        snapshot.save(copy_values=False)
4✔
210

211
        for value in snapshot.snapshot_values:
4✔
212
            if value.attribute:
4✔
213
                value_key = f"{value.attribute.uri}[{snapshot.snapshot_index}][{value.set_prefix}][{value.set_index}][{value.collection_index}]" # noqa: E501
4✔
214

215
                if value_key in checked:
4✔
216
                    # assert that this is a new value
217
                    assert value.pk is None
4✔
218

219
                    value.project = project
4✔
220
                    value.snapshot = snapshot
4✔
221
                    value.save()
4✔
222

223
                    if value.file:
4✔
224
                        value.copy_file(value.file_name, value.file)
×
225
                    else:
226
                        try:
4✔
227
                            name = value.file_import.get('name')
4✔
228
                            file = value.file_import.get('file')
4✔
229
                            value.file.save(name, file)
4✔
230
                        except AttributeError:
4✔
231
                            pass
4✔
232

233

234
def save_import_tasks(project, tasks):
4✔
235
    for task in tasks:
4✔
236
        project.tasks.add(task)
4✔
237

238

239
def save_import_views(project, views):
4✔
240
    for view in views:
4✔
241
        project.views.add(view)
4✔
242

243

244
def get_invite_email_project_path(invite) -> str:
4✔
245
    project_invite_path = reverse('project_join', args=[invite.token])
4✔
246
    # check if the invited user exists and the multisite environment is enabled
247
    if invite.user is not None and settings.MULTISITE:
4✔
248
        # do nothing if user is a member of the current site
249
        current_site = Site.objects.get_current()
4✔
250
        if not invite.user.role.member.filter(id=current_site.id).exists():
4✔
251
            # else take first site
252
            invited_user_member_domain = invite.user.role.member.first().domain
4✔
253
            project_invite_path = 'http://' + invited_user_member_domain + project_invite_path
4✔
254
    return project_invite_path
4✔
255

256

257
def send_invite_email(request, invite):
4✔
258
    project_invite_path = get_invite_email_project_path(invite)
4✔
259
    context = {
4✔
260
        'invite_url': request.build_absolute_uri(project_invite_path),
261
        'invite_user': invite.user,
262
        'invite_email': invite.email,
263
        'project': invite.project,
264
        'user': request.user,
265
        'site': Site.objects.get_current()
266
    }
267

268
    subject = render_to_string('projects/email/project_invite_subject.txt', context)
4✔
269
    message = render_to_string('projects/email/project_invite_message.txt', context)
4✔
270

271
    # send the email
272
    send_mail(subject, message, to=[invite.email])
4✔
273

274

275
def set_context_querystring_with_filter_and_page(context: dict) -> dict:
4✔
276
    '''prepares the filter part of the querystring for the next and previous hyperlinks in the pagination'''
277
    if context["filter"].data:
4✔
278
        querystring = context["filter"].data.copy()
4✔
279
        if context["filter"].data.get('page'):
4✔
280
            del querystring['page']
4✔
281
        context['querystring'] = querystring.urlencode()
4✔
282
    return context
4✔
283

284

285
def get_upload_accept(project=None):
4✔
286

287
    accept = defaultdict(set)
4✔
288
    for plugin in Plugin.objects.for_context(plugin_type="project_import", project=project):
4✔
289
        import_plugin = plugin.initialize_class()
4✔
290
        if import_plugin is None:
4✔
291
            continue
×
292

293
        if isinstance(import_plugin.accept, dict):
4✔
294
            for mime_type, suffixes in import_plugin.accept.items():
4✔
295
                accept[mime_type].update(suffixes)
4✔
296

297
        elif isinstance(import_plugin.accept, str):
4✔
298
            # legacy fallback for pre 2.3.0 RDMO, e.g. `accept = '.xml'`
299
            suffix = import_plugin.accept
×
300
            mime_type, _encoding = mimetypes.guess_type(f'example{suffix}')
×
301
            if mime_type:
×
302
                accept[mime_type].update([suffix])
×
303

304
        elif import_plugin.upload is True:
4✔
305
            # if one of the plugins does not have the accept field, but is marked as upload plugin
306
            # all file types are allowed
307
            return {}
×
308

309
    return accept
4✔
310

311

312
def compute_set_prefix_from_set_value(set_value, value):
4✔
313
    set_prefix_length = len(set_value.set_prefix.split('|')) if set_value.set_prefix else 0
4✔
314
    return '|'.join([
4✔
315
        str(set_value.set_index) if (index == set_prefix_length) else value
316
        for index, value in enumerate(value.set_prefix.split('|'))
317
    ])
318

319

320
def get_contact_message(request, project):
4✔
321
    project_url = project.get_absolute_url()
4✔
322
    project_interview_url = reverse('project_interview', args=[project.id])
4✔
323

324
    context = {
4✔
325
        'user': request.user,
326
        'site': Site.objects.get_current(),
327
        'project': project,
328
        'project_url': request.build_absolute_uri(project_url)
329
    }
330

331
    page_id = request.GET.get('page')
4✔
332
    if page_id:
4✔
333
        page = project.catalog.get_page(page_id)
4✔
334
        if page:
4✔
335
            context.update({
4✔
336
                'page': page,
337
                'page_url': request.build_absolute_uri(f'{project_interview_url}{page_id}/')
338
            })
339

340
    questionset_id = request.GET.get('questionset')
4✔
341
    if questionset_id:
4✔
342
        context['questionset'] = project.catalog.get_questionset(questionset_id)
×
343

344
    question_id = request.GET.get('question')
4✔
345
    if question_id:
4✔
346
        context['question'] = project.catalog.get_question(question_id)
4✔
347

348
    value_ids = request.GET.getlist('values')
4✔
349
    if value_ids:
4✔
350
        values = project.values.filter(snapshot=None).filter(id__in=value_ids)
4✔
351
        context['values'] = values
4✔
352

353
        if page_id and page and page.is_collection and page.attribute is not None:
4✔
354
            value = values.filter(set_prefix='').first()
4✔
355
            if value:
4✔
356
                try:
4✔
357
                    context['set_value'] = project.values.filter(snapshot=None).get(
4✔
358
                        attribute=page.attribute,
359
                        set_prefix='',
360
                        set_index=value.set_index,
361
                        collection_index=0
362
                    )
363
                except ObjectDoesNotExist:
×
364
                    pass
×
365

366
    subject = render_to_string('projects/email/project_contact_subject.txt', context)
4✔
367
    message = render_to_string('projects/email/project_contact_message.txt', context)
4✔
368

369
    return {
4✔
370
        'subject': subject,
371
        'message': remove_double_newlines(message)
372
    }
373

374

375
def send_contact_message(request, subject, message):
4✔
376
    send_mail(subject, message,
4✔
377
              to=settings.PROJECT_CONTACT_RECIPIENTS,
378
              cc=[request.user.email], reply_to=[request.user.email])
379

380

381
def filter_tasks_or_views_for_project(task_or_view, project) -> TaskQuerySet | ViewQuerySet:
4✔
382
    queryset = ( task_or_view.objects
4✔
383
        .filter(Q(catalogs=None) | Q(catalogs=project.catalog))
384
        .filter(Q(groups=None) | Q(groups__in=project.groups))
385
    )
386

387
    if settings.MULTISITE:
4✔
388
        return  queryset.filter(sites=project.site)
4✔
389
    else:
390
        return  queryset.filter(Q(sites=None) | Q(sites=project.site))
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc