• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ephios-dev / ephios / 4396849668

pending completion
4396849668

push

github

Felix Rindt
Refactor: nullable date-of-birth

1550 of 1757 branches covered (88.22%)

Branch coverage included in aggregate %.

10 of 10 new or added lines in 2 files covered. (100.0%)

7374 of 8393 relevant lines covered (87.86%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.41
/ephios/core/models/users.py
1
import datetime
1✔
2
import functools
1✔
3
import secrets
1✔
4
import uuid
1✔
5
from datetime import date
1✔
6
from itertools import chain
1✔
7
from typing import Optional
1✔
8

9
import guardian.mixins
1✔
10
from django.contrib.auth import get_user_model
1✔
11
from django.contrib.auth.base_user import AbstractBaseUser, BaseUserManager
1✔
12
from django.contrib.auth.models import Group, PermissionsMixin
1✔
13
from django.db import models, transaction
1✔
14
from django.db.models import (
1✔
15
    BooleanField,
16
    CharField,
17
    DateField,
18
    EmailField,
19
    ExpressionWrapper,
20
    F,
21
    ForeignKey,
22
    Max,
23
    Model,
24
    Q,
25
    Sum,
26
    Value,
27
)
28
from django.db.models.functions import TruncDate
1✔
29
from django.utils import timezone
1✔
30
from django.utils.translation import gettext_lazy as _
1✔
31

32
from ephios.extra.fields import EndOfDayDateTimeField
1✔
33
from ephios.extra.json import CustomJSONDecoder, CustomJSONEncoder
1✔
34
from ephios.extra.widgets import CustomDateInput
1✔
35
from ephios.modellogging.log import (
1✔
36
    ModelFieldsLogConfig,
37
    add_log_recorder,
38
    register_model_for_logging,
39
)
40
from ephios.modellogging.recorders import FixedMessageLogRecorder, M2MLogRecorder
1✔
41

42

43
class UserProfileManager(BaseUserManager):
    """Manager that builds and saves regular users and superusers."""

    def create_user(
        self,
        email,
        first_name,
        last_name,
        date_of_birth=None,
        password=None,
    ):
        """
        Create, save and return a user with the given personal data.

        ``date_of_birth`` is optional because ``UserProfile.date_of_birth`` is declared
        with ``null=True``. ``password`` is handed to ``set_password`` unchanged.
        """
        # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
        user = self.model(
            email=email,
            first_name=first_name,
            last_name=last_name,
            date_of_birth=date_of_birth,
        )
        user.set_password(password)
        user.save()
        return user

    def create_superuser(
        self,
        email,
        first_name,
        last_name,
        date_of_birth=None,
        password=None,
    ):
        """
        Create a user via ``create_user`` and mark it as superuser and staff.

        Takes the same arguments as ``create_user``. The user is saved a second time
        after the two flags have been set.
        """
        user = self.create_user(
            email=email,
            password=password,
            first_name=first_name,
            last_name=last_name,
            date_of_birth=date_of_birth,
        )
        user.is_superuser = True
        user.is_staff = True
        user.save()
        return user
82

83

84
class VisibleUserProfileManager(BaseUserManager):
    """Manager whose queryset only contains users with ``is_visible=True``."""

    def get_queryset(self):
        """Return the parent queryset narrowed down to visible users."""
        everyone = super().get_queryset()
        return everyone.filter(is_visible=True)
87

88

89
class UserProfile(guardian.mixins.GuardianUserMixin, PermissionsMixin, AbstractBaseUser):
    """
    User model that is identified by its email address.

    ``objects`` only yields visible users, while ``all_objects`` applies no such filter
    and is configured as both base and default manager in ``Meta``.
    """

    email = EmailField(_("email address"), unique=True)
    is_active = BooleanField(default=True, verbose_name=_("Active"))
    is_visible = BooleanField(default=True, verbose_name=_("Visible"))
    is_staff = BooleanField(default=False, verbose_name=_("Staff user"))
    first_name = CharField(_("first name"), max_length=254)
    last_name = CharField(_("last name"), max_length=254)
    # Nullable in the database, but blank=False keeps it mandatory in forms.
    date_of_birth = DateField(_("date of birth"), null=True, blank=False)
    phone = CharField(_("phone number"), max_length=254, blank=True, null=True)
    calendar_token = CharField(_("calendar token"), max_length=254, default=secrets.token_urlsafe)

    USERNAME_FIELD = "email"
    REQUIRED_FIELDS = [
        "first_name",
        "last_name",
        "date_of_birth",
    ]

    objects = VisibleUserProfileManager()
    all_objects = UserProfileManager()

    class Meta:
        verbose_name = _("user profile")
        verbose_name_plural = _("user profiles")
        db_table = "userprofile"
        base_manager_name = "all_objects"
        default_manager_name = "all_objects"

    def get_full_name(self):
        """Return first and last name separated by a single space."""
        return self.first_name + " " + self.last_name

    def __str__(self):
        """Represent the user by their full name."""
        return self.get_full_name()

    def get_short_name(self):
        """Return the first name."""
        return self.first_name

    @property
    def age(self) -> Optional[int]:
        """Age in completed years as of today, or ``None`` without a date of birth."""
        born = self.date_of_birth
        if born is None:
            return None
        today = date.today()
        # One year less as long as this year's birthday has not been reached yet.
        birthday_pending = (today.month, today.day) < (born.month, born.day)
        return today.year - born.year - birthday_pending

    @property
    def is_minor(self):
        """Whether a date of birth is known and the resulting age is below 18."""
        if self.date_of_birth is None:
            return False
        return self.age < 18

    def as_participant(self):
        """Build a ``LocalUserParticipant`` from this user's data."""
        from ephios.core.signup.participants import LocalUserParticipant

        # Inactive users are represented without an email address.
        participant_email = self.email if self.is_active else None
        return LocalUserParticipant(
            first_name=self.first_name,
            last_name=self.last_name,
            qualifications=self.qualifications,
            date_of_birth=self.date_of_birth,
            email=participant_email,
            user=self,
        )

    @property
    def qualifications(self):
        """
        Queryset of qualifications for which this user holds an unexpired grant.

        Each qualification is annotated with ``expires``, the maximum expiration date
        among this user's grants of that qualification.
        """
        unexpired_ids = self.qualification_grants.unexpired().values_list(
            "qualification_id", flat=True
        )
        granted = Qualification.objects.filter(pk__in=unexpired_ids)
        return granted.annotate(
            expires=Max(F("grants__expires"), filter=Q(grants__user=self)),
        )

    def get_workhour_items(self):
        """
        Collect this user's working hours from participations and working hour entries.

        Returns a tuple ``(hour_sum, items)``: ``hour_sum`` is a ``timedelta`` over both
        sources and ``items`` is a list of dicts with the keys ``duration``, ``date``,
        ``reason``, ``type`` and ``origin_id``, sorted by date with the newest first.
        """
        from ephios.core.models import AbstractParticipation

        item_keys = ("duration", "date", "reason", "type", "origin_id")

        # Confirmed and finished participations, shaped like working hour entries.
        finished = self.participations.filter(
            state=AbstractParticipation.States.CONFIRMED, finished=True
        )
        participations = finished.annotate(
            duration=ExpressionWrapper(
                (F("end_time") - F("start_time")),
                output_field=models.DurationField(),
            ),
            date=ExpressionWrapper(TruncDate(F("start_time")), output_field=DateField()),
            reason=F("shift__event__title"),
            type=Value("event"),
            origin_id=F("shift__event__pk"),
        ).values(*item_keys)

        workinghours = self.workinghours_set.annotate(
            duration=F("hours"), type=Value("request"), origin_id=F("pk")
        ).values(*item_keys)

        # Participation durations sum up to a timedelta, working hour entries to a
        # number of hours; either aggregate is None when there are no rows.
        participation_total = (
            participations.aggregate(Sum("duration"))["duration__sum"] or datetime.timedelta()
        )
        requested_hours = workinghours.aggregate(Sum("duration"))["duration__sum"] or 0
        hour_sum = participation_total + datetime.timedelta(hours=float(requested_hours))

        items = sorted(
            chain(participations, workinghours), key=lambda item: item["date"], reverse=True
        )
        return hour_sum, items
185

186

187
# Register UserProfile for model logging; the listed fields are passed as unlogged.
register_model_for_logging(
    UserProfile,
    ModelFieldsLogConfig(
        unlogged_fields={"id", "password", "calendar_token", "last_login"},
    ),
)


def _initial_group_log_recorders(group):  # pylint: disable=unused-argument
    """Return the initial log recorders for a group: one M2M recorder labelled "Users"."""
    return [
        M2MLogRecorder(UserProfile.groups.field, reverse=True, verbose_name=_("Users")),
    ]


# Register Group for model logging with a recorder for the reverse user relation.
register_model_for_logging(
    Group,
    ModelFieldsLogConfig(
        unlogged_fields={"id", "permissions"},
        initial_recorders_func=_initial_group_log_recorders,
    ),
)
203

204

205
class QualificationCategoryManager(models.Manager):
    """Manager that resolves qualification categories from their natural key."""

    def get_by_natural_key(self, category_uuid, *args):
        """Look the category up by UUID; further natural key parts are ignored."""
        lookup = {"uuid": category_uuid}
        return self.get(**lookup)
208

209

210
class QualificationCategory(Model):
    """A titled category of qualifications, identified across systems by its UUID."""

    uuid = models.UUIDField("UUID", unique=True, default=uuid.uuid4)
    title = CharField(_("title"), max_length=254)

    objects = QualificationCategoryManager()

    class Meta:
        verbose_name = _("qualification track")
        verbose_name_plural = _("qualification tracks")
        db_table = "qualificationcategory"

    def __str__(self):
        """Represent the category by its title."""
        title = self.title
        return str(title)

    def natural_key(self):
        """Return ``(uuid, title)``; only the UUID is used when looking the key up."""
        key = (self.uuid, self.title)
        return key
226

227

228
class QualificationManager(models.Manager):
    """Manager that resolves qualifications from their natural key."""

    def get_by_natural_key(self, qualification_uuid, *args):
        """Look the qualification up by UUID; further natural key parts are ignored."""
        lookup = {"uuid": qualification_uuid}
        return self.get(**lookup)
231

232

233
class Qualification(Model):
    """
    A qualification within a category that may include other qualifications.

    Equality and hashing are based on the UUID rather than the primary key.
    """

    uuid = models.UUIDField(unique=True, default=uuid.uuid4, verbose_name="UUID")
    title = CharField(_("title"), max_length=254)
    abbreviation = CharField(max_length=254, verbose_name=_("Abbreviation"))
    category = ForeignKey(
        QualificationCategory,
        on_delete=models.CASCADE,
        related_name="qualifications",
        verbose_name=_("category"),
    )
    includes = models.ManyToManyField(
        "self",
        related_name="included_by",
        verbose_name=_("Included"),
        help_text=_("other qualifications that this qualification includes"),
        symmetrical=False,
        blank=True,
    )
    is_imported = models.BooleanField(verbose_name=_("imported"), default=True)

    objects = QualificationManager()

    def __eq__(self, other):
        """Two qualifications are equal when their UUIDs match."""
        # Returning NotImplemented lets Python fall back to its default handling, so
        # comparing against None or unrelated objects yields False instead of raising
        # AttributeError on a missing ``uuid`` attribute.
        if not isinstance(other, Qualification):
            return NotImplemented
        return self.uuid == other.uuid

    def __hash__(self):
        """Hash by UUID, consistent with ``__eq__``."""
        return hash(self.uuid)

    class Meta:
        verbose_name = _("qualification")
        verbose_name_plural = _("qualifications")
        db_table = "qualification"

    def __str__(self):
        """Represent the qualification by its title."""
        return str(self.title)

    def natural_key(self):
        """Return ``(uuid, title)``; only the UUID is used when looking the key up."""
        return (self.uuid, self.title)

    natural_key.dependencies = ["core.QualificationCategory"]

    @classmethod
    def collect_all_included_qualifications(cls, given_qualifications) -> set:
        """
        Return the given qualifications plus everything they include transitively.

        We collect using breadth first search with one query for every layer of inclusion.
        ``given_qualifications`` is read only once, so one-shot iterables work as well.
        """
        all_qualifications = set(given_qualifications)
        current = set(all_qualifications)
        while current:
            # Materialise each layer right away: the query runs exactly once and the
            # next layer filters on concrete objects instead of a nested subquery.
            new = set(
                Qualification.objects.filter(included_by__in=current)
                .exclude(id__in=[q.id for q in all_qualifications])
                .distinct()
            )
            all_qualifications |= new
            current = new
        return all_qualifications
288

289

290
class CustomQualificationGrantQuerySet(models.QuerySet):
    """Queryset for qualification grants with an expiration-aware filter."""

    # Available on both Manager and QuerySet.
    def unexpired(self):
        """Drop grants that have an expiration date lying before the current time."""
        now = timezone.now()
        return self.exclude(expires__isnull=False, expires__lt=now)
294

295

296
class ExpirationDateField(models.DateTimeField):
    """
    A model datetime field whose formfield is an EndOfDayDateTimeField
    """

    def formfield(self, **kwargs):
        """Build the form field, letting explicitly passed kwargs override the defaults."""
        options = {
            "widget": CustomDateInput,
            "form_class": EndOfDayDateTimeField,
        }
        options.update(kwargs)
        return super().formfield(**options)
309

310

311
class QualificationGrant(Model):
    """Links a user to a qualification, optionally with an expiration date."""

    qualification = ForeignKey(
        Qualification,
        on_delete=models.CASCADE,
        verbose_name=_("qualification"),
        related_name="grants",
    )
    user = ForeignKey(
        get_user_model(),
        related_name="qualification_grants",
        on_delete=models.CASCADE,
        verbose_name=_("user profile"),
    )
    expires = ExpirationDateField(_("expiration date"), blank=True, null=True)

    objects = CustomQualificationGrantQuerySet.as_manager()

    def __str__(self):
        """Render as "<qualification> for <user>" with a translated "for"."""
        qualification, user = self.qualification, self.user
        return f"{qualification!s} {_('for')} {user!s}"

    class Meta:
        unique_together = [["qualification", "user"]]  # issue #218
        db_table = "qualificationgrant"
        verbose_name = _("Qualification grant")
335

336

337
def _grant_log_target(grant):
    """Return the ``(model, id)`` pair a grant's log is attached to: the grant's user."""
    return UserProfile, grant.user_id


# Register QualificationGrant for model logging, attached to the granted user.
register_model_for_logging(
    QualificationGrant,
    ModelFieldsLogConfig(attach_to_func=_grant_log_target),
)
341

342

343
class Consequence(Model):
    """
    A change affecting a user that is executed by the handler registered for ``slug``.

    ``state`` tracks whether it still needs confirmation, was executed, failed or was
    denied.
    """

    slug = models.CharField(max_length=255)
    data = models.JSONField(default=dict, encoder=CustomJSONEncoder, decoder=CustomJSONDecoder)

    user = models.ForeignKey(
        get_user_model(),
        on_delete=models.CASCADE,
        verbose_name=_("affected user"),
        null=True,
        related_name="affecting_consequences",
    )

    class States(models.TextChoices):
        NEEDS_CONFIRMATION = "needs_confirmation", _("needs confirmation")
        EXECUTED = "executed", _("executed")
        FAILED = "failed", _("failed")
        DENIED = "denied", _("denied")

    state = models.TextField(
        max_length=31,
        choices=States.choices,
        default=States.NEEDS_CONFIRMATION,
        verbose_name=_("State"),
    )

    class Meta:
        db_table = "consequence"
        verbose_name = _("Consequence")

    @property
    def handler(self):
        """The consequence handler that belongs to this consequence's slug."""
        from ephios.core import consequences

        return consequences.consequence_handler_from_slug(self.slug)

    def confirm(self, user):
        """
        Execute the consequence on behalf of ``user``.

        Allowed while the state is needs-confirmation, denied or failed. On success the
        state becomes EXECUTED; if anything raises during execution the state becomes
        FAILED, the reason is recorded for the log and a ``ConsequenceError`` is raised.
        The consequence is saved in both cases.
        """
        from ephios.core.consequences import ConsequenceError

        confirmable_states = {
            self.States.NEEDS_CONFIRMATION,
            self.States.DENIED,
            self.States.FAILED,
        }
        if self.state not in confirmable_states:
            raise ConsequenceError(_("Consequence was executed already."))

        try:
            # Execution and notification share one transaction.
            with transaction.atomic():
                self.handler.execute(self)
                from ephios.core.services.notifications.types import ConsequenceApprovedNotification

                # No notification when the affected user confirms it themselves.
                if user != self.user:
                    ConsequenceApprovedNotification.send(self)
        except Exception as e:  # pylint: disable=broad-except
            reason = str(e)
            self.state = self.States.FAILED
            add_log_recorder(
                self,
                FixedMessageLogRecorder(
                    label=_("Reason"),
                    message=reason,
                ),
            )
            raise ConsequenceError(reason) from e
        else:
            self.state = self.States.EXECUTED
        finally:
            # Persist the resulting state whether execution succeeded or failed.
            self.save()

    def deny(self, user):
        """
        Deny the consequence on behalf of ``user``.

        Allowed while the state is needs-confirmation or failed, otherwise a
        ``ConsequenceError`` is raised. The affected user is notified unless they are
        the one denying it.
        """
        from ephios.core.consequences import ConsequenceError

        deniable_states = {self.States.NEEDS_CONFIRMATION, self.States.FAILED}
        if self.state not in deniable_states:
            raise ConsequenceError(_("Consequence was executed or denied already."))

        self.state = self.States.DENIED
        self.save()
        from ephios.core.services.notifications.types import ConsequenceDeniedNotification

        if user != self.user:
            ConsequenceDeniedNotification.send(self)

    def render(self):
        """Return the handler's rendering of this consequence."""
        handler = self.handler
        return handler.render(self)

    def __str__(self):
        """Represent the consequence by its rendering."""
        return self.render()

    def attach_log_to_object(self):
        """Return the ``(model, id)`` pair to log to: the affected user if set, else self."""
        if not self.user_id:
            return Consequence, self.id
        return UserProfile, self.user_id
432

433

434
def _consequence_log_target(consequence):
    """Return the ``(model, id)`` pair chosen by the consequence's ``attach_log_to_object``."""
    return consequence.attach_log_to_object()


# Register Consequence for model logging; the listed fields are passed as unlogged.
register_model_for_logging(
    Consequence,
    ModelFieldsLogConfig(
        unlogged_fields=["id", "slug", "user", "data"],
        attach_to_func=_consequence_log_target,
    ),
)
441

442

443
class WorkingHours(Model):
    """An amount of working hours credited to a user for a reason on a given date."""

    user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    hours = models.DecimalField(decimal_places=2, max_digits=7, verbose_name=_("Hours of work"))
    reason = models.CharField(max_length=1024, default="", verbose_name=_("Occasion"))
    date = models.DateField()

    class Meta:
        db_table = "workinghours"

    def __str__(self):
        """Describe the entry with hours, user, reason and date."""
        hours, user, reason, day = self.hours, self.user, self.reason, self.date
        return f"{hours} hours for {user} because of {reason} on {day}"
454

455

456
class Notification(Model):
    """
    A stored notification for a user.

    Subject, content and URL are all produced by the notification type that is
    resolved from ``slug``.
    """

    slug = models.SlugField(max_length=255)
    user = models.ForeignKey(
        get_user_model(),
        on_delete=models.CASCADE,
        verbose_name=_("affected user"),
        null=True,
    )
    failed = models.BooleanField(default=False)
    data = models.JSONField(
        blank=True, default=dict, encoder=CustomJSONEncoder, decoder=CustomJSONDecoder
    )
    created_at = models.DateTimeField(auto_now_add=True)

    @functools.cached_property
    def notification_type(self):
        """The notification type for this notification's slug, cached per instance."""
        from ephios.core.services.notifications.types import notification_type_from_slug

        return notification_type_from_slug(self.slug)

    @property
    def subject(self):
        """Subject as produced by the notification type."""
        kind = self.notification_type
        return kind.get_subject(self)

    def as_plaintext(self):
        """Plain text content as produced by the notification type."""
        kind = self.notification_type
        return kind.as_plaintext(self)

    def as_html(self):
        """HTML content as produced by the notification type."""
        kind = self.notification_type
        return kind.as_html(self)

    def get_url(self):
        """URL as produced by the notification type."""
        kind = self.notification_type
        return kind.get_url(self)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc