• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

ephios-dev / ephios / 16229090076

11 Jul 2025 08:25PM UTC coverage: 84.107% (-0.04%) from 84.148%
16229090076

Pull #1593

github

web-flow
Merge e03fc7852 into 511597d47
Pull Request #1593: fix assigning designated participants to positions they dont qualify for

3138 of 3706 branches covered (84.67%)

Branch coverage included in aggregate %.

16 of 23 new or added lines in 4 files covered. (69.57%)

11 existing lines in 3 files now uncovered.

12516 of 14906 relevant lines covered (83.97%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

20.76
/ephios/plugins/complexsignup/structure.py
1
import itertools
1✔
2
import logging
1✔
3
from collections import defaultdict
1✔
4
from functools import cached_property, partial
1✔
5
from operator import attrgetter
1✔
6
from typing import Optional
1✔
7

8
from django import forms
1✔
9
from django.utils.translation import gettext_lazy as _
1✔
10
from django_select2.forms import ModelSelect2Widget
1✔
11

12
from ephios.core.models import AbstractParticipation
1✔
13
from ephios.core.services.matching import Matching, Position, match_participants_to_positions
1✔
14
from ephios.core.signup.disposition import BaseDispositionParticipationForm
1✔
15
from ephios.core.signup.flow.participant_validation import (
1✔
16
    ParticipantUnfitError,
17
    SignupDisallowedError,
18
)
19
from ephios.core.signup.forms import BaseSignupForm, SignupConfigurationForm
1✔
20
from ephios.core.signup.participants import AbstractParticipant
1✔
21
from ephios.core.signup.stats import SignupStats
1✔
22
from ephios.core.signup.structure.base import BaseShiftStructure
1✔
23
from ephios.plugins.baseshiftstructures.structure.common import MinimumAgeMixin
1✔
24
from ephios.plugins.complexsignup.models import BuildingBlock
1✔
25

26
logger = logging.getLogger(__name__)
1✔
27

28

29
def atomic_block_participant_qualifies_for(structure, participant: AbstractParticipant):
1✔
30
    available_qualification_ids = set(q.id for q in participant.collect_all_qualifications())
×
31
    return [
×
32
        block
33
        for block in iter_atomic_blocks(structure)
34
        if block["qualification_ids"] <= available_qualification_ids
35
        and any(
36
            {q.id for q in position.required_qualifications} <= available_qualification_ids
37
            and not position.designation_only
38
            for position in block["positions"]
39
        )
40
    ]
41

42

43
def _build_human_path(structure):
1✔
44
    return " ยป ".join(
×
45
        [
46
            *[s["display"] for s in reversed(structure["parents"])],
47
            f"{structure['display']} #{structure['number']}",
48
        ]
49
    )
50

51

52
class ComplexDispositionParticipationForm(BaseDispositionParticipationForm):
1✔
53
    disposition_participation_template = "complexsignup/fragment_participation.html"
1✔
54
    unit_path = forms.ChoiceField(
1✔
55
        label=_("Unit"),
56
        required=False,
57
        widget=forms.Select(
58
            attrs={"data-show-for-state": str(AbstractParticipation.States.CONFIRMED)}
59
        ),
60
    )
61

62
    def __init__(self, **kwargs):
1✔
63
        super().__init__(**kwargs)
×
64
        complex_structure = self.shift.structure
×
65
        complex_structure._assume_cache()
×
66

67
        qualified_blocks = atomic_block_participant_qualifies_for(
×
68
            complex_structure._structure, self.instance.participant
69
        )
70
        unqualified_blocks = [
×
71
            b for b in iter_atomic_blocks(complex_structure._structure) if b not in qualified_blocks
72
        ]
73

74
        self.fields["unit_path"].choices = [("", _("auto"))]
×
75
        if qualified_blocks:
×
76
            self.fields["unit_path"].choices += [
×
77
                (
78
                    _("qualified"),
79
                    [(b["path"], _build_human_path(b)) for b in qualified_blocks],
80
                )
81
            ]
82
        if unqualified_blocks:
×
83
            self.fields["unit_path"].choices += [
×
84
                (
85
                    _("unqualified"),
86
                    [(b["path"], _build_human_path(b)) for b in unqualified_blocks],
87
                )
88
            ]
89
        if preferred_unit_path := self.instance.structure_data.get("preferred_unit_path"):
×
90
            try:
×
91
                preferred_block = next(
×
92
                    filter(
93
                        lambda b: b["path"] == preferred_unit_path,
94
                        iter_atomic_blocks(complex_structure._structure),
95
                    )
96
                )
97
                self.preferred_unit_name = _build_human_path(preferred_block)
×
98
            except StopIteration:
×
99
                pass  # preferred block not found
×
100
        if initial := self.instance.structure_data.get("dispatched_unit_path"):
×
101
            self.fields["unit_path"].initial = initial
×
102

103
    def save(self, commit=True):
1✔
104
        self.instance.structure_data["dispatched_unit_path"] = self.cleaned_data["unit_path"]
×
105
        super().save(commit)
×
106

107

108
class ComplexSignupForm(BaseSignupForm):
1✔
109
    preferred_unit_path = forms.ChoiceField(
1✔
110
        label=_("Preferred Unit"),
111
        widget=forms.RadioSelect,
112
        required=False,
113
    )
114

115
    def __init__(self, *args, **kwargs):
1✔
116
        super().__init__(*args, **kwargs)
×
117
        self.fields["preferred_unit_path"].initial = self.instance.structure_data.get(
×
118
            "preferred_unit_path"
119
        )
120
        self.fields["preferred_unit_path"].required = (
×
121
            self.data.get("signup_choice") == "sign_up"
122
            and self.shift.structure.configuration.choose_preferred_unit
123
        )
124
        complex_structure = self.shift.structure
×
125
        complex_structure._assume_cache()
×
126
        self.fields["preferred_unit_path"].choices = [
×
127
            (b["path"], _build_human_path(b))
128
            for b in self.blocks_participant_qualifies_for(complex_structure._structure)
129
        ]
130
        unqualified_blocks = [
×
131
            b
132
            for b in iter_atomic_blocks(complex_structure._structure)
133
            if b not in self.blocks_participant_qualifies_for(complex_structure._structure)
134
        ]
135
        if unqualified_blocks:
×
136
            self.fields["preferred_unit_path"].help_text = _(
×
137
                "You don't qualify for {blocks}."
138
            ).format(blocks=", ".join(set(str(b["name"]) for b in unqualified_blocks)))
139

140
    def save(self, commit=True):
1✔
141
        self.instance.structure_data["preferred_unit_path"] = self.cleaned_data[
×
142
            "preferred_unit_path"
143
        ]
144
        return super().save(commit)
×
145

146
    def blocks_participant_qualifies_for(self, structure):
1✔
147
        return atomic_block_participant_qualifies_for(structure, self.participant)
×
148

149

150
class ComplexConfigurationForm(SignupConfigurationForm):
1✔
151
    building_block = forms.ModelChoiceField(
1✔
152
        widget=ModelSelect2Widget(
153
            model=BuildingBlock,
154
            search_fields=["name__icontains"],
155
        ),
156
        queryset=BuildingBlock.objects.all(),
157
    )
158
    choose_preferred_unit = forms.BooleanField(
1✔
159
        label=_("Participants must provide a preferred unit"),
160
        help_text=_("Participants will be asked during signup."),
161
        widget=forms.CheckboxInput,
162
        required=False,
163
        initial=False,
164
    )
165

166
    template_name = "complexsignup/configuration_form.html"
1✔
167

168

169
class ComplexShiftStructure(
1✔
170
    MinimumAgeMixin,
171
    BaseShiftStructure,
172
):
173
    slug = "complex"
1✔
174
    verbose_name = _("Preconfigured Structure (experimental)")
1✔
175
    description = _("Use preconfigured elements to build a custom structure.")
1✔
176
    shift_state_template_name = "complexsignup/shift_state.html"
1✔
177
    configuration_form_class = ComplexConfigurationForm
1✔
178
    disposition_participation_form_class = ComplexDispositionParticipationForm
1✔
179
    signup_form_class = ComplexSignupForm
1✔
180

181
    def _match(self, participations):
1✔
182
        participants = [participation.participant for participation in participations]
×
183
        confirmed_participants = [
×
184
            participation.participant
185
            for participation in participations
186
            if participation.state == AbstractParticipation.States.CONFIRMED
187
        ]
188
        all_positions, structure = convert_blocks_to_positions(self._base_blocks, participations)
×
189
        matching = match_participants_to_positions(
×
190
            participants, all_positions, confirmed_participants=confirmed_participants
191
        )
192
        matching.attach_participations(participations)
×
193

194
        # let's work up the blocks again, but now with matching
195
        all_positions, structure = convert_blocks_to_positions(
×
196
            self._base_blocks, participations, matching=matching
197
        )
198

199
        # we just have to add unpaired matches to the full stats
200
        signup_stats = structure["signup_stats"] + SignupStats.ZERO.replace(
×
201
            requested_count=len(
202
                [
203
                    p
204
                    for p in matching.unpaired_participations
205
                    if p.state == AbstractParticipation.States.REQUESTED
206
                ]
207
            ),
208
            confirmed_count=len(
209
                [
210
                    p
211
                    for p in matching.unpaired_participations
212
                    if p.state == AbstractParticipation.States.CONFIRMED
213
                ]
214
            ),
215
        )
216
        return matching, all_positions, structure, signup_stats
×
217

218
    @cached_property
1✔
219
    def _base_blocks(self):
1✔
220
        # for now, we only support one base block
221
        qs = BuildingBlock.objects.all()
×
222
        return list(qs.filter(id=self.configuration.building_block))
×
223

224
    def _assume_cache(self):
1✔
225
        if not hasattr(self, "_cached_work"):
×
226
            participations = [
×
227
                p
228
                for p in sorted(
229
                    self.shift.participations.all(), key=attrgetter("state"), reverse=True
230
                )
231
                if p.state
232
                in {AbstractParticipation.States.REQUESTED, AbstractParticipation.States.CONFIRMED}
233
            ]
234
            self._matching, self._all_positions, self._structure, self._signup_stats = self._match(
×
235
                participations
236
            )
237
            self._cached_work = True
×
238

239
    def get_shift_state_context_data(self, request, **kwargs):
1✔
240
        """
241
        Additionally to the context of the event detail view, provide context for rendering `shift_state_template_name`.
242
        """
243
        kwargs = super().get_shift_state_context_data(request, **kwargs)
×
244
        self._assume_cache()
×
245
        kwargs["matching"] = self._matching
×
246
        kwargs["structure"] = self._structure
×
247
        return kwargs
×
248

249
    def get_signup_stats(self) -> "SignupStats":
1✔
250
        self._assume_cache()
×
251
        return self._signup_stats
×
252

253
    def check_qualifications(self, shift, participant, strict_mode=True):
1✔
254
        confirmed_participations = [
×
255
            p
256
            for p in shift.participations.all()
257
            if p.state == AbstractParticipation.States.CONFIRMED
258
        ]
259
        self._assume_cache()
×
260

261
        if not strict_mode:
×
262
            # check if the participant fulfills any of the requirements
263
            if atomic_block_participant_qualifies_for(self._structure, participant):
×
264
                return
×
265
        else:
266
            # check if the participant can be matched into already confirmed participations
267
            confirmed_participants = [p.participant for p in confirmed_participations]
×
268
            if participant in confirmed_participants:
×
269
                return
×
270
            matching_without = match_participants_to_positions(
×
271
                confirmed_participants, self._all_positions
272
            )
273
            matching_with = match_participants_to_positions(
×
274
                confirmed_participants + [participant], self._all_positions
275
            )
276
            if len(matching_with.pairings) > len(matching_without.pairings):
×
277
                return
×
278
        if (free := shift.get_signup_stats().free) and free > 0:
×
279
            raise ParticipantUnfitError(_("You are not qualified."))
×
280
        raise SignupDisallowedError(_("The maximum number of participants is reached."))
×
281

282
    def get_checkers(self):
1✔
283
        return super().get_checkers() + [
×
284
            partial(
285
                self.check_qualifications,
286
                strict_mode=not self.shift.signup_flow.uses_requested_state,
287
            )
288
        ]
289

290
    def get_list_export_data(self):
1✔
291
        self._assume_cache()
×
292
        export_data = []
×
293
        for block in iter_atomic_blocks(self._structure):
×
294
            for position, participation in zip(
×
295
                block["positions"],
296
                block["participations"],
297
            ):
298
                if not participation and not position.required:
×
299
                    continue
×
300
                export_data.append(
×
301
                    {
302
                        "participation": participation,
303
                        "required_qualifications": position.required_qualifications,
304
                        "description": f"{block['display']} #{block['number']}",
305
                    }
306
                )
307
        return export_data
×
308

309

310
def _search_block(
1✔
311
    block: BuildingBlock,
312
    path: str,
313
    level: int,
314
    required_qualifications: set,
315
    path_optional: bool,
316
    participations: list[AbstractParticipation],
317
    opt_counter,
318
    matching: Matching,
319
    parents: list,
320
    composed_label: Optional[str] = None,
321
):  # pylint: disable=too-many-locals
322
    required_here = set(required_qualifications)
×
NEW
323
    for requirement in block.qualification_requirements.all():
×
NEW
324
        if not requirement.everyone:
×
325
            # at least one is not supported
NEW
326
            raise ValueError("unsupported requirement")
×
UNCOV
327
        required_here |= set(requirement.qualifications.all())
×
328

329
    all_positions = []
×
330
    structure = {
×
331
        "is_composite": block.is_composite(),
332
        "positions": [],
333
        "participations": [],
334
        "sub_blocks": [],
335
        "path": path,
336
        "level": level,
337
        "optional": path_optional,
338
        "name": block.name,
339
        "label": composed_label,
340
        "display": composed_label or block.name,
341
        "number": next(opt_counter[block.name]),
342
        "qualification_label": ", ".join(q.abbreviation for q in required_here),
343
        "qualification_ids": {q.id for q in required_here},
344
        "parents": parents,
345
        "signup_stats": SignupStats.ZERO,
346
    }
347
    if block.is_composite():
×
NEW
348
        for composition in (
×
349
            block.sub_compositions.all()
350
            .select_related(
351
                "sub_block",
352
            )
353
            .prefetch_related(
354
                "sub_block__positions__qualifications",
355
                "sub_block__qualification_requirements__qualifications",
356
                "sub_block__sub_compositions",
357
            )
358
        ):
359
            positions, sub_structure = _search_block(
×
360
                block=composition.sub_block,
361
                path=f"{path}{composition.id}-",
362
                level=level + 1,
363
                required_qualifications=required_here,
364
                path_optional=path_optional or composition.optional,
365
                opt_counter=opt_counter,
366
                participations=participations,
367
                matching=matching,
368
                parents=[structure, *parents],
369
                composed_label=composition.label,
370
            )
371
            structure["signup_stats"] += sub_structure["signup_stats"]
×
372
            all_positions.extend(positions)
×
373
            structure["sub_blocks"].append(sub_structure)
×
374
    else:
375
        _build_atomic_block_structure(
×
376
            all_positions,
377
            block,
378
            matching,
379
            opt_counter,
380
            participations,
381
            path,
382
            path_optional,
383
            required_here,
384
            structure,
385
        )
386
    return all_positions, structure
×
387

388

389
def _build_atomic_block_structure(
1✔
390
    all_positions,
391
    block,
392
    matching,
393
    opt_counter,
394
    participations,
395
    path,
396
    path_optional,
397
    required_here,
398
    structure,
399
):  # pylint: disable=too-many-locals
400
    designated_for = {
×
401
        p.participant
402
        for p in participations
403
        if p.structure_data.get("dispatched_unit_path") == path
404
    }
405
    preferred_by = {
×
406
        p.participant for p in participations if p.structure_data.get("preferred_unit_path") == path
407
    }
408
    for block_position in block.positions.all():
×
409
        match_id = _build_position_id(block, block_position.id, path)
×
410
        label = block_position.label or ", ".join(
×
411
            q.abbreviation for q in block_position.qualifications.all()
412
        )
413
        required = not (block_position.optional or path_optional)
×
414
        p = Position(
×
415
            id=match_id,
416
            required_qualifications=required_here | set(block_position.qualifications.all()),
417
            designated_for=designated_for,
418
            preferred_by=preferred_by,
419
            required=required,
420
            label=label,
421
            aux_score=1,
422
        )
423
        participation = matching.participation_for_position(match_id) if matching else None
×
424
        structure["signup_stats"] += SignupStats.ZERO.replace(
×
425
            min_count=int(required),
426
            max_count=1,
427
            missing=bool(required and not participation),
428
            free=bool(participation is None),
429
            requested_count=bool(
430
                participation and participation.state == AbstractParticipation.States.REQUESTED
431
            ),
432
            confirmed_count=bool(
433
                participation and participation.state == AbstractParticipation.States.CONFIRMED
434
            ),
435
        )
436
        all_positions.append(p)
×
437
        structure["positions"].append(p)
×
438
        structure["participations"].append(participation)
×
439

440
    # build derivative positions for "allow_more" participants
441
    allow_more_count = int(block.allow_more) * (
×
442
        len(participations) + 1
443
    )  # 1 extra in case of signup matching check
444
    for _ in range(allow_more_count):
×
445
        opt_match_id = _build_position_id(block, next(opt_counter[str(block.id)]), path)
×
446
        p = Position(
×
447
            id=opt_match_id,
448
            required_qualifications=required_here,
449
            preferred_by=preferred_by,
450
            designated_for=designated_for,
451
            aux_score=0,
452
            required=False,  # allow_more -> always optional
453
            label=block.name,
454
        )
455
        participation = matching.participation_for_position(opt_match_id) if matching else None
×
456
        structure["signup_stats"] += SignupStats.ZERO.replace(
×
457
            min_count=0,
458
            max_count=None,  # allow_more -> always free
459
            missing=0,
460
            free=None,
461
            requested_count=bool(
462
                participation and participation.state == AbstractParticipation.States.REQUESTED
463
            ),
464
            confirmed_count=bool(
465
                participation and participation.state == AbstractParticipation.States.CONFIRMED
466
            ),
467
        )
468
        all_positions.append(p)
×
469
        structure["positions"].append(p)
×
470
        structure["participations"].append(participation)
×
471

472
    for _ in range(max(0, len(designated_for) - len(block.positions.all()) - allow_more_count)):
×
473
        # if more designated participants than we have positions, we need to add placeholder anyway
474
        opt_match_id = _build_position_id(block, next(opt_counter[str(block.id)]), path)
×
475
        p = Position(
×
476
            id=opt_match_id,
477
            required_qualifications=required_here,
478
            preferred_by=preferred_by,
479
            designated_for=designated_for,
480
            aux_score=0,
481
            required=False,  # designated -> always optional
482
            label=block.name,
483
            designation_only=True,
484
        )
485
        participation = matching.participation_for_position(opt_match_id) if matching else None
×
486
        structure["signup_stats"] += SignupStats.ZERO.replace(
×
487
            min_count=0,
488
            max_count=0,  # designated overflow -> runs over max
489
            missing=0,
490
            free=0,
491
            requested_count=bool(
492
                participation and participation.state == AbstractParticipation.States.REQUESTED
493
            ),
494
            confirmed_count=bool(
495
                participation and participation.state == AbstractParticipation.States.CONFIRMED
496
            ),
497
        )
498
        all_positions.append(p)
×
499
        structure["positions"].append(p)
×
500
        structure["participations"].append(participation)
×
501

502

503
def _build_position_id(block, path, position_id):
1✔
504
    """
505
    For a given block, a counter providing running numbers and a path of blocks,
506
    construct an ID for the matching positions.
507
    """
508
    return f"{path}{block.uuid}-opt-{position_id}"
×
509

510

511
def convert_blocks_to_positions(starting_blocks, participations, matching=None):
1✔
512
    """
513
    If a matching is provided, the signup stats will have correct participation counts
514
    """
515
    root_path = "root-"
×
516
    all_positions = []
×
517
    structure = {
×
518
        "is_composite": True,  # root block is "virtual" and always composite
519
        "positions": [],
520
        "position_match_ids": [],
521
        "sub_blocks": [],
522
        "path": root_path,
523
        "optional": False,
524
        "level": 0,
525
        "name": "ROOT",
526
        "qualification_label": "",
527
    }
528
    opt_counter = defaultdict(partial(itertools.count, 1))
×
529
    for block in starting_blocks:
×
530
        positions, sub_structure = _search_block(
×
531
            block,
532
            path=root_path,
533
            level=1,
534
            path_optional=False,
535
            required_qualifications=set(),
536
            participations=participations,
537
            opt_counter=opt_counter,
538
            matching=matching,
539
            parents=[],
540
        )
541
        all_positions.extend(positions)
×
542
        structure["sub_blocks"].append(sub_structure)
×
543
    structure["signup_stats"] = SignupStats.reduce(
×
544
        [s["signup_stats"] for s in structure["sub_blocks"]]
545
    )
546
    return all_positions, structure
×
547

548

549
def iter_atomic_blocks(structure):
1✔
550
    for sub_block in structure["sub_blocks"]:
×
551
        if not sub_block["is_composite"]:
×
552
            yield sub_block
×
553
        else:
554
            yield from iter_atomic_blocks(sub_block)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc