• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ephios-dev / ephios / 12313892613

13 Dec 2024 10:24AM UTC coverage: 83.611% (-1.3%) from 84.861%
12313892613

Pull #1348

github

web-flow
Merge 7ba1fbea3 into 481a19acc
Pull Request #1348: add vue recurrence picker

2972 of 3523 branches covered (84.36%)

Branch coverage included in aggregate %.

83 of 358 new or added lines in 9 files covered. (23.18%)

6 existing lines in 1 file now uncovered.

12078 of 14477 relevant lines covered (83.43%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.65
/ephios/core/forms/events.py
1
import operator
1✔
2
import re
1✔
3
from datetime import datetime, timedelta
1✔
4

5
from crispy_forms.bootstrap import FormActions
1✔
6
from crispy_forms.helper import FormHelper
1✔
7
from crispy_forms.layout import Field, Layout, Submit
1✔
8
from django import forms
1✔
9
from django.contrib.auth import get_user_model
1✔
10
from django.contrib.auth.models import Group
1✔
11
from django.core.exceptions import ValidationError
1✔
12
from django.db.models import Q
1✔
13
from django.forms.utils import from_current_timezone
1✔
14
from django.utils.timezone import make_aware
1✔
15
from django.utils.translation import gettext as _
1✔
16
from django_select2.forms import Select2MultipleWidget
1✔
17
from dynamic_preferences.forms import PreferenceForm
1✔
18
from guardian.shortcuts import assign_perm, get_objects_for_user, get_users_with_perms, remove_perm
1✔
19

20
from ephios.core.dynamic_preferences_registry import event_type_preference_registry
1✔
21
from ephios.core.models import Event, EventType, LocalParticipation, Shift, UserProfile
1✔
22
from ephios.core.signup.flow import enabled_signup_flows, signup_flow_from_slug
1✔
23
from ephios.core.signup.structure import enabled_shift_structures, shift_structure_from_slug
1✔
24
from ephios.core.widgets import MultiUserProfileWidget
1✔
25
from ephios.extra.colors import clear_eventtype_color_css_fragment_cache
1✔
26
from ephios.extra.crispy import AbortLink
1✔
27
from ephios.extra.permissions import get_groups_with_perms
1✔
28
from ephios.extra.widgets import ColorInput, CustomDateInput, CustomTimeInput, RecurrenceField
1✔
29
from ephios.modellogging.log import add_log_recorder, update_log
1✔
30
from ephios.modellogging.recorders import (
1✔
31
    DerivedFieldsLogRecorder,
32
    InstanceActionType,
33
    PermissionLogRecorder,
34
)
35

36

37
class EventForm(forms.ModelForm):
    """
    Create or edit an event together with its object-level permissions.

    Besides the model fields, the form collects which groups may see the event
    (``view_event``) and which users and groups are responsible for it
    (``change_event``). ``save`` rewrites those permissions on the event.
    """

    visible_for = forms.ModelMultipleChoiceField(
        queryset=Group.objects.none(),
        label=_("Visible for"),
        help_text=_(
            "Select groups which the event shall be visible for. Regardless, the event will be visible for users that already signed up."
        ),
        widget=Select2MultipleWidget,
        required=False,
    )
    responsible_users = forms.ModelMultipleChoiceField(
        queryset=UserProfile.objects.all(),
        required=False,
        label=_("Responsible persons"),
        widget=MultiUserProfileWidget,
    )
    responsible_groups = forms.ModelMultipleChoiceField(
        queryset=Group.objects.all(),
        required=False,
        label=_("Responsible groups"),
        widget=Select2MultipleWidget,
    )

    class Meta:
        model = Event
        fields = ["title", "type", "description", "location"]

    def __init__(self, **kwargs):
        """
        Set up initial permission selections and restrict ``visible_for``.

        Keyword arguments consumed here (everything else goes to ``ModelForm``):

        * ``user`` (required): the editing user; limits the selectable
          ``visible_for`` groups to those the user has
          ``publish_event_for_group`` on.
        * ``eventtype`` (required when no ``instance`` is given): type of the
          event that is being created.
        """
        # Imported locally so this class does not rely on a module-level import
        # that the file does not have.
        from django.utils.html import format_html

        user = kwargs.pop("user")
        can_publish_for_groups = get_objects_for_user(user, "publish_event_for_group", klass=Group)

        if (event := kwargs.get("instance", None)) is not None:
            self.eventtype = event.type
            responsible_users = get_users_with_perms(
                event, only_with_perms_in=["change_event"], with_group_users=False
            )
            responsible_groups = get_groups_with_perms(event, only_with_perms_in=["change_event"])
            visible_for = get_groups_with_perms(event, only_with_perms_in=["view_event"]).exclude(
                id__in=responsible_groups
            )

            # Groups that can see the event but that this user may not publish for.
            # They are not offered in the field and are re-added unchanged in save().
            self.locked_visible_for_groups = set(visible_for.exclude(id__in=can_publish_for_groups))
            kwargs["initial"] = {
                "visible_for": visible_for.filter(id__in=can_publish_for_groups),
                "responsible_users": responsible_users,
                "responsible_groups": responsible_groups,
                # explicitly passed initial values win over the derived ones
                **kwargs.get("initial", {}),
            }
        else:
            # new event
            self.eventtype = kwargs.pop("eventtype")
            kwargs["initial"] = {
                "responsible_users": self.eventtype.preferences.get("responsible_users")
                | get_user_model().objects.filter(pk=user.pk),
                "responsible_groups": self.eventtype.preferences.get("responsible_groups"),
                "visible_for": self.eventtype.preferences.get("visible_for")
                or get_objects_for_user(user, "publish_event_for_group", klass=Group),
            }
            self.locked_visible_for_groups = set()

        super().__init__(**kwargs)

        if event is None:
            # the type of a new event is fixed by the ``eventtype`` argument
            self.fields.pop("type")
        self.fields["visible_for"].queryset = can_publish_for_groups
        self.fields["visible_for"].disabled = not can_publish_for_groups
        if self.locked_visible_for_groups:
            # Group names are user-entered data and the help text contains markup,
            # so the names must be escaped: format_html escapes its arguments and
            # leaves the (translated) format string itself untouched.
            self.fields["visible_for"].help_text = format_html(
                _(
                    "Select groups which the event shall be visible for. "
                    "This event is also visible for <b>{groups}</b>, "
                    "but you don't have the permission to change visibility "
                    "for those groups."
                ),
                # sorted: the groups are held in a set, whose order is arbitrary
                groups=", ".join(sorted(group.name for group in self.locked_visible_for_groups)),
            )

    def save(self, commit=True):
        """
        Save the event and replace its ``view_event``/``change_event`` permissions.

        All existing group and user permissions of these two kinds are removed
        and then re-assigned from the cleaned form data. Returns the event.
        """
        if not self.instance.pk:
            self.instance.type = self.eventtype
        event: Event = super().save(commit=commit)

        add_log_recorder(event, PermissionLogRecorder("view_event", _("Visible for")))
        add_log_recorder(event, PermissionLogRecorder("change_event", _("Responsibles")))

        # delete existing permissions
        # (better implement https://github.com/django-guardian/django-guardian/issues/654)
        for group in get_groups_with_perms(
            event, only_with_perms_in=["view_event", "change_event"], must_have_all_perms=False
        ):
            remove_perm("view_event", group, event)
            remove_perm("change_event", group, event)
        for user in get_users_with_perms(event, only_with_perms_in=["view_event", "change_event"]):
            remove_perm("view_event", user, event)
            remove_perm("change_event", user, event)

        # assign designated permissions
        assign_perm(
            "view_event",
            Group.objects.filter(
                Q(id__in=self.cleaned_data["visible_for"])
                | Q(id__in=self.cleaned_data["responsible_groups"])
                # a list rather than a one-shot generator, so the condition
                # stays valid however often the Q object is inspected
                | Q(id__in=[group.id for group in self.locked_visible_for_groups])
            ),
            event,
        )
        assign_perm("change_event", self.cleaned_data["responsible_groups"], event)
        assign_perm("change_event", self.cleaned_data["responsible_users"], event)

        # Assign view_event to responsible users  and to non-responsible users
        # that already have some sort of participation for the event
        # (-> they saw and interacted with it)
        # We can't just do users that aren't included by group permissions,
        # as they might get removed from that group.
        assign_perm(
            "view_event",
            UserProfile.objects.filter(
                Q(pk__in=self.cleaned_data["responsible_users"])
                | Q(
                    pk__in=LocalParticipation.objects.filter(
                        shift_id__in=event.shifts.all()
                    ).values_list("user", flat=True)
                )
            ),
            event,
        )

        update_log(event, InstanceActionType.CHANGE)
        return event
1✔
163

164

165
class ShiftForm(forms.ModelForm):
    """
    Edit a shift using one date plus separate meeting, start and end times.

    The three time inputs are combined with ``date`` into aware datetimes in
    the ``clean_<field>`` methods. ``field_order`` puts ``date`` first, so it is
    cleaned before the time fields that depend on it.
    """

    date = forms.DateField(widget=CustomDateInput, label=_("Date"))
    meeting_time = forms.TimeField(widget=CustomTimeInput, label=_("Meeting time"))
    start_time = forms.TimeField(widget=CustomTimeInput, label=_("Start time"))
    end_time = forms.TimeField(widget=CustomTimeInput, label=_("End time"))

    field_order = [
        "date",
        "meeting_time",
        "start_time",
        "end_time",
        "label",
        "signup_flow_slug",
        "structure_slug",
    ]

    class Meta:
        model = Shift
        fields = [
            "meeting_time",
            "start_time",
            "end_time",
            "label",
            "signup_flow_slug",
            "structure_slug",
        ]

    def __init__(self, *args, **kwargs):
        """Build the select widgets for signup flow and structure and attach a log recorder."""
        super().__init__(*args, **kwargs)
        signup_flows = list(enabled_signup_flows())
        # make sure that if a shift uses a disabled but installed flow, it is also available in the list
        if self.instance and (flow_slug := self.instance.signup_flow_slug):
            if flow_slug not in map(operator.attrgetter("slug"), signup_flows):
                try:
                    signup_flows.append(signup_flow_from_slug(flow_slug, self.instance))
                except ValueError:  # not installed
                    pass
        self.fields["signup_flow_slug"].widget = forms.Select(
            choices=((flow.slug, flow.verbose_name) for flow in signup_flows)
        )
        # same for structure
        shift_structures = list(enabled_shift_structures())
        if self.instance and (structure_slug := self.instance.structure_slug):
            if structure_slug not in map(operator.attrgetter("slug"), shift_structures):
                try:
                    shift_structures.append(
                        shift_structure_from_slug(structure_slug, self.instance)
                    )
                except ValueError:  # not installed
                    pass
        self.fields["structure_slug"].widget = forms.Select(
            choices=((structure.slug, structure.verbose_name) for structure in shift_structures)
        )
        # this recorder may cause db queries, so it's not added on Shift init, but here in the form
        # pylint: disable=undefined-variable
        add_log_recorder(
            self.instance,
            DerivedFieldsLogRecorder(lambda shift: shift.get_signup_info()),
        )

    def _combine_with_date(self, field_name):
        """
        Return the naive datetime made of the cleaned ``date`` and time ``field_name``.

        Raises ``ValidationError`` when ``date`` did not validate. In that case
        it is missing from ``cleaned_data`` and indexing it would otherwise
        escape form validation as a ``KeyError``.
        """
        date = self.cleaned_data.get("date")
        if date is None:
            # NOTE(review): wording matches Django's built-in date error, so
            # existing translations presumably apply - confirm.
            raise ValidationError(_("Enter a valid date."))
        return datetime.combine(date, self.cleaned_data[field_name])

    def clean_meeting_time(self):
        """Return the meeting time as an aware datetime on the selected date."""
        return from_current_timezone(self._combine_with_date("meeting_time"))

    def clean_start_time(self):
        """Return the start time as an aware datetime on the selected date."""
        return from_current_timezone(self._combine_with_date("start_time"))

    def clean_end_time(self):
        """
        Return the end time as an aware datetime.

        An end time that is not after the start time is moved to the next day.
        """
        end_time = self._combine_with_date("end_time")
        # start_time is missing if it failed validation; then there is nothing
        # to compare against and the end stays on the selected date.
        start_time = self.cleaned_data.get("start_time")
        if start_time is not None and make_aware(end_time) <= start_time:
            end_time += timedelta(days=1)
        return from_current_timezone(end_time)

    def clean(self):
        """Reject a meeting time that lies after the start time."""
        cleaned_data = super().clean()
        # only compare when both fields survived their own validation
        if {"meeting_time", "start_time"} <= set(cleaned_data.keys()):
            if not cleaned_data["meeting_time"] <= cleaned_data["start_time"]:
                raise ValidationError(_("Meeting time must not be after start time!"))
        return cleaned_data
1✔
247

248

249
class EventCopyForm(forms.Form):
    """Form with a single recurrence field, used when copying an event."""

    recurrence = RecurrenceField()

    def __init__(self, *args, **kwargs):
        """
        Accept an optional ``original_start`` keyword and hand it to the widget.

        ``original_start`` is removed from ``kwargs`` before the base form is
        initialised and defaults to ``None`` when not given.
        """
        start = kwargs.pop("original_start", None)
        super().__init__(*args, **kwargs)
        # NOTE(review): presumably the widget uses this as the reference start
        # of the recurrence - confirm against RecurrenceField's widget.
        recurrence_widget = self.fields["recurrence"].widget
        recurrence_widget.original_start = start
1✔
256

257

258
class ShiftCopyForm(forms.Form):
    """Form with a single recurrence field (created with ``pick_hour=True``), used when copying a shift."""

    recurrence = RecurrenceField(pick_hour=True)

    def __init__(self, *args, **kwargs):
        """
        Accept an optional ``original_start`` keyword and hand it to the widget.

        ``original_start`` is removed from ``kwargs`` before the base form is
        initialised and defaults to ``None`` when not given.
        """
        start = kwargs.pop("original_start", None)
        super().__init__(*args, **kwargs)
        # NOTE(review): presumably the widget uses this as the reference start
        # of the recurrence - confirm against RecurrenceField's widget.
        recurrence_widget = self.fields["recurrence"].widget
        recurrence_widget.original_start = start
×
265

266

267
class EventTypeForm(forms.ModelForm):
    """Edit title and color of an event type."""

    class Meta:
        model = EventType
        fields = ["title", "color"]
        widgets = {"color": ColorInput()}

    def clean_color(self):
        """
        Return the color if it is exactly ``#`` followed by six hex digits.

        Raises ``ValidationError`` otherwise.
        """
        color = self.cleaned_data["color"]
        # fullmatch: the whole value must be the color, not just its beginning
        # ("#123456; anything" must be rejected). [0-9] instead of \d, because
        # \d also matches non-ASCII digits in str patterns.
        if not re.fullmatch(r"#[0-9a-fA-F]{6}", color):
            raise ValidationError(_("You need to enter a valid color"))
        return color

    def save(self, commit=True):
        """Save the event type, then clear the event type color CSS fragment cache."""
        eventtype = super().save(commit=commit)
        # Invalidate after writing, so a request running in between cannot
        # re-fill the cache with the old color.
        # NOTE(review): with commit=False the caller saves later and the cache
        # is still cleared before that write - confirm whether that path is used.
        clear_eventtype_color_css_fragment_cache()
        return eventtype
×
282

283

284
class EventTypePreferenceForm(PreferenceForm):
    """Preference form bound to the event type preference registry."""

    registry = event_type_preference_registry
1✔
286

287

288
class BasePluginFormMixin:
    """Base mixin for plugin forms rendered with the plugin form fragment template."""

    template_name = "core/fragments/plugin_form.html"

    @property
    def heading(self):
        """Heading of the form. Must be overridden; the base raises ``NotImplementedError``."""
        raise NotImplementedError()

    def is_function_active(self):
        """
        Tell whether the feature this form configures is enabled for the form instance.

        Relevant when building forms for additional features: with the default
        template, a true value expands the collapse on page load. The base
        implementation reports the feature as inactive.
        """
        return False
×
301

302

303
class EventNotificationForm(forms.Form):
    """Choose which notification to send for an event and optionally add mail content."""

    # values of the ``action`` choice field
    NEW_EVENT = "new"
    REMINDER = "remind"
    PARTICIPANTS = "participants"
    action = forms.ChoiceField(
        choices=[
            (NEW_EVENT, _("Send notification about new event to everyone")),
            (REMINDER, _("Send reminder to everyone that is not participating")),
            (PARTICIPANTS, _("Send a message to all participants")),
        ],
        widget=forms.RadioSelect,
        label=False,
    )
    mail_content = forms.CharField(required=False, widget=forms.Textarea, label=_("Mail content"))

    def __init__(self, *args, **kwargs):
        """
        Store the required ``event`` keyword argument and build the crispy layout.

        The abort link points to the event's absolute URL.
        """
        self.event = kwargs.pop("event")
        super().__init__(*args, **kwargs)
        self.helper = FormHelper(self)
        self.helper.layout = Layout(
            Field("action"),
            Field("mail_content"),
            FormActions(
                Submit("submit", _("Send"), css_class="float-end"),
                AbortLink(href=self.event.get_absolute_url()),
            ),
        )

    def clean(self):
        """Reject a message to participants that has no mail content."""
        to_participants = self.cleaned_data.get("action") == self.PARTICIPANTS
        # mail_content is missing from cleaned_data when the field itself failed
        # validation. That error is already reported, so do not index the
        # missing key (KeyError) and do not add a misleading "empty" error.
        content_is_empty = (
            "mail_content" in self.cleaned_data and not self.cleaned_data["mail_content"]
        )
        if to_participants and content_is_empty:
            raise ValidationError(_("You cannot send an empty mail."))
        return super().clean()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc