• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bramp / build-along / 20086551557

10 Dec 2025 03:43AM UTC coverage: 90.303% (+0.3%) from 90.041%
20086551557

push

github

bramp
Refactor arrow shaft detection: unified method, stroked line support, multi-head grouping

- Merge _find_simple_shaft, _find_stroked_line_shaft, and _find_cornered_shaft
  into a single unified _find_shaft method that handles all shaft types by
  extracting points and finding closest/furthest from the arrowhead tip
- Add support for stroked line shafts (stroke_color instead of fill_color)
- Add tail_grouping_tolerance config for grouping arrowheads with nearby tails
- Group arrowheads that share the same shaft_block (L-shaped arrows with
  multiple heads at different ends)
- Use union-find algorithm to group arrowheads by shared shaft or tail proximity
- Extract colors_match to shared utils module
- Add comprehensive tests for stroked line shafts, tail correctness, and
  multi-head arrow grouping
- Update golden files for pages 011, 013, 015, 017 with corrected arrow detection

204 of 206 new or added lines in 5 files covered. (99.03%)

252 existing lines in 14 files now uncovered.

11855 of 13128 relevant lines covered (90.3%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.4
/src/build_a_long/pdf_extract/classifier/steps/step_classifier.py
1
"""
2
Step classifier.
3

4
Purpose
5
-------
6
Identify complete Step structures by combining step_number, parts_list, and diagram
7
elements. A Step represents a single building instruction comprising:
8
- A StepNumber label
9
- An optional PartsList (the parts needed for this step)
10
- A Diagram (the main instruction graphic showing what to build)
11

12
We look for step_numbers and attempt to pair them with nearby parts_lists and
13
identify the appropriate diagram region for each step.
14

15
Debugging
16
---------
17
Set environment variables to aid investigation without code changes:
18

19
- LOG_LEVEL=DEBUG
20
    Enables DEBUG-level logging (if not already configured by caller).
21
"""
22

23
import logging
1✔
24

25
import numpy as np
1✔
26
from scipy.optimize import linear_sum_assignment
1✔
27

28
from build_a_long.pdf_extract.classifier.candidate import Candidate
1✔
29
from build_a_long.pdf_extract.classifier.classification_result import (
1✔
30
    CandidateFailedError,
31
    ClassificationResult,
32
)
33
from build_a_long.pdf_extract.classifier.label_classifier import (
1✔
34
    LabelClassifier,
35
)
36
from build_a_long.pdf_extract.classifier.score import (
1✔
37
    Score,
38
    Weight,
39
    find_best_scoring,
40
)
41
from build_a_long.pdf_extract.classifier.text import (
1✔
42
    extract_step_number_value,
43
)
44
from build_a_long.pdf_extract.extractor.bbox import BBox, filter_overlapping
1✔
45
from build_a_long.pdf_extract.extractor.lego_page_elements import (
1✔
46
    Arrow,
47
    Diagram,
48
    Divider,
49
    LegoPageElements,
50
    PartsList,
51
    RotationSymbol,
52
    Step,
53
    StepNumber,
54
    SubAssembly,
55
)
56
from build_a_long.pdf_extract.extractor.page_blocks import Text
1✔
57

58
log = logging.getLogger(__name__)
1✔
59

60

61
class _StepScore(Score):
1✔
62
    """Internal score representation for step classification."""
63

64
    step_value: int
65
    """The parsed step number value (e.g., 1, 2, 3)."""
1✔
66

67
    step_number_candidate: Candidate
68
    """The step number candidate this step is associated with."""
1✔
69

70
    parts_list_candidate: Candidate | None
71
    """The parts list candidate paired with this step (if any)."""
1✔
72

73
    has_parts_list: bool
74
    """Whether this step has an associated parts list."""
1✔
75

76
    step_proximity_score: float
77
    """Score based on proximity to the PartsList above (0.0-1.0).
1✔
78
    1.0 for closest proximity, 0.0 if very far. 0.0 if no parts list."""
79

80
    step_alignment_score: float
81
    """Score based on left-edge alignment with PartsList above (0.0-1.0).
1✔
82
    1.0 is perfect alignment, 0.0 is very misaligned. 0.0 if no parts list."""
83

84
    def score(self) -> Weight:
1✔
85
        """Return the overall pairing score."""
86
        return self.overall_score()
×
87

88
    def overall_score(self) -> float:
1✔
89
        """Calculate overall quality score based on parts list pairing.
90

91
        Steps with a parts_list are given a bonus to prefer them over
92
        steps without parts_list. Diagrams are found at build time, not
93
        during scoring, to allow rotation symbols to claim small images first.
94
        """
95
        if self.has_parts_list:
1✔
96
            # Base score for having parts_list + proximity/alignment bonus
97
            parts_list_bonus = 0.5
1✔
98
            pairing_score = (
1✔
99
                self.step_proximity_score + self.step_alignment_score
100
            ) / 2.0
101
            return parts_list_bonus + 0.5 * pairing_score
1✔
102
        return 0.3  # Lower base score for steps without parts list
1✔
103

104
    def sort_key(self) -> tuple[float, int]:
1✔
105
        """Return a tuple for sorting candidates.
106

107
        We prefer:
108
        1. Higher overall scores (better StepNumber-PartsList pairing)
109
        2. Lower step number values (to break ties and maintain order)
110
        """
111
        return (-self.overall_score(), self.step_value)
1✔
112

113

114
class StepClassifier(LabelClassifier):
1✔
115
    """Classifier for complete Step structures."""
116

117
    output = "step"
1✔
118
    requires = frozenset(
1✔
119
        {
120
            "step_number",
121
            "parts_list",
122
            "diagram",
123
            "rotation_symbol",
124
            "subassembly",
125
            "preview",
126
        }
127
    )
128

129
    def _score(self, result: ClassificationResult) -> None:
1✔
130
        """Score step pairings and create candidates."""
131
        page_data = result.page_data
1✔
132

133
        # Get step number and parts list candidates (not constructed elements)
134
        step_number_candidates = result.get_scored_candidates(
1✔
135
            "step_number", valid_only=False, exclude_failed=True
136
        )
137

138
        if not step_number_candidates:
1✔
139
            return
1✔
140

141
        # Get parts_list candidates
142
        parts_list_candidates = result.get_scored_candidates(
1✔
143
            "parts_list",
144
            valid_only=False,
145
            exclude_failed=True,
146
        )
147

148
        log.debug(
1✔
149
            "[step] page=%s step_candidates=%d parts_list_candidates=%d",
150
            page_data.page_number,
151
            len(step_number_candidates),
152
            len(parts_list_candidates),
153
        )
154

155
        # Create all possible Step candidates for pairings (without diagrams initially)
156
        # Deduplication happens at build time, not scoring time, so that
157
        # diagram assignment can be re-evaluated as blocks get claimed.
158
        all_candidates: list[Candidate] = []
1✔
159
        for step_candidate in step_number_candidates:
1✔
160
            # Create candidates for this StepNumber paired with each PartsList
161
            for parts_list_candidate in parts_list_candidates:
1✔
162
                candidate = self._create_step_candidate(
1✔
163
                    step_candidate, parts_list_candidate, result
164
                )
165
                if candidate:
1✔
166
                    all_candidates.append(candidate)
1✔
167

168
            # Also create a candidate with no PartsList (fallback)
169
            candidate = self._create_step_candidate(step_candidate, None, result)
1✔
170
            if candidate:
1✔
171
                all_candidates.append(candidate)
1✔
172

173
        # Add all candidates to result (deduplication happens at build time)
174
        for candidate in all_candidates:
1✔
175
            result.add_candidate(candidate)
1✔
176

177
        log.debug(
1✔
178
            "[step] Created %d step candidates",
179
            len(all_candidates),
180
        )
181

182
    def build_all(self, result: ClassificationResult) -> list[LegoPageElements]:
1✔
183
        """Build all Step elements with coordinated rotation symbol assignment.
184

185
        This method:
186
        1. Builds all rotation symbols first (so they claim their Drawing blocks)
187
        2. Builds all parts lists (so they claim their blocks)
188
        3. Builds Step candidates, deduplicating by step value at build time
189
        4. Uses Hungarian matching to assign diagrams, subassemblies, and rotation symbols to steps
190

191
        Deduplication happens at build time (not scoring time) so that diagram
192
        assignment can be re-evaluated as blocks get claimed by other elements.
193

194
        This coordination ensures:
195
        - Rotation symbols are built before diagrams, preventing incorrect clustering
196
        - Each rotation symbol is assigned to at most one step
197
        - Assignment is globally optimal based on distance to step diagrams
198

199
        Returns:
200
            List of successfully constructed Step elements, sorted by step number
201
        """
202
        # Phase 1: Build all rotation symbols BEFORE steps.
203
        # This allows rotation symbols to claim their Drawing blocks first,
204
        # preventing them from being incorrectly clustered into diagrams.
205
        for rs_candidate in result.get_scored_candidates(
1✔
206
            "rotation_symbol", valid_only=False, exclude_failed=True
207
        ):
208
            try:
1✔
209
                result.build(rs_candidate)
1✔
210
                log.debug(
1✔
211
                    "[step] Built rotation symbol at %s (score=%.2f)",
212
                    rs_candidate.bbox,
213
                    rs_candidate.score,
214
                )
UNCOV
215
            except Exception as e:
×
UNCOV
216
                log.debug(
×
217
                    "[step] Failed to construct rotation_symbol candidate at %s: %s",
218
                    rs_candidate.bbox,
219
                    e,
220
                )
221

222
        # Phase 2: Build all parts lists BEFORE steps.
223
        # This allows parts lists to claim their Drawing blocks first,
224
        # preventing them from being claimed by subassemblies.
225
        for pl_candidate in result.get_scored_candidates(
1✔
226
            "parts_list", valid_only=False, exclude_failed=True
227
        ):
228
            try:
1✔
229
                result.build(pl_candidate)
1✔
230
                log.debug(
1✔
231
                    "[step] Built parts_list at %s (score=%.2f)",
232
                    pl_candidate.bbox,
233
                    pl_candidate.score,
234
                )
UNCOV
235
            except Exception as e:
×
UNCOV
236
                log.debug(
×
237
                    "[step] Failed to construct parts_list candidate at %s: %s",
238
                    pl_candidate.bbox,
239
                    e,
240
                )
241

242
        # Phase 3: Build subassemblies and previews BEFORE steps.
243
        # Both subassemblies and previews are white boxes with diagrams inside.
244
        # We combine them and build in score order so the higher-scoring
245
        # candidate claims the white box first. When a candidate is built,
246
        # its source_blocks are marked as consumed, causing any competing
247
        # candidate using the same blocks to fail.
248
        #
249
        # This allows subassemblies (which have step_count labels like "2x")
250
        # to be distinguished from previews (which appear before steps).
251
        subassembly_candidates = result.get_scored_candidates(
1✔
252
            "subassembly", valid_only=False, exclude_failed=True
253
        )
254
        preview_candidates = result.get_scored_candidates(
1✔
255
            "preview", valid_only=False, exclude_failed=True
256
        )
257

258
        # Combine and sort by score (highest first)
259
        combined_candidates = list(subassembly_candidates) + list(preview_candidates)
1✔
260
        combined_candidates.sort(key=lambda c: c.score, reverse=True)
1✔
261

262
        for candidate in combined_candidates:
1✔
263
            try:
1✔
264
                result.build(candidate)
1✔
265
                log.debug(
1✔
266
                    "[step] Built %s at %s (score=%.2f)",
267
                    candidate.label,
268
                    candidate.bbox,
269
                    candidate.score,
270
                )
271
            except Exception as e:
1✔
272
                log.debug(
1✔
273
                    "[step] Failed to construct %s candidate at %s: %s",
274
                    candidate.label,
275
                    candidate.bbox,
276
                    e,
277
                )
278

279
        # Phase 4: Build Step candidates with deduplication by step value
280
        # Filter out subassembly steps, then build in score order, skipping
281
        # step values that have already been built.
282
        # Steps are built as "partial" - just step_number + parts_list.
283
        # Diagrams and subassemblies are assigned later via Hungarian matching.
284
        all_step_candidates = result.get_scored_candidates(
1✔
285
            "step", valid_only=False, exclude_failed=True
286
        )
287
        page_level_step_candidates = self._filter_page_level_step_candidates(
1✔
288
            all_step_candidates
289
        )
290

291
        # Sort by score (highest first) so best candidates get built first
292
        sorted_candidates = sorted(
1✔
293
            page_level_step_candidates,
294
            key=lambda c: c.score,
295
            reverse=True,
296
        )
297

298
        steps: list[Step] = []
1✔
299
        built_step_values: set[int] = set()
1✔
300

301
        for step_candidate in sorted_candidates:
1✔
302
            # Get step value from score
303
            score = step_candidate.score_details
1✔
304
            if not isinstance(score, _StepScore):
1✔
UNCOV
305
                continue
×
306

307
            # Skip if we've already built a step with this value
308
            if score.step_value in built_step_values:
1✔
309
                log.debug(
1✔
310
                    "[step] Skipping duplicate step %d candidate (score=%.2f)",
311
                    score.step_value,
312
                    step_candidate.score,
313
                )
314
                continue
1✔
315

316
            try:
1✔
317
                elem = result.build(step_candidate)
1✔
318
                assert isinstance(elem, Step)
1✔
319
                steps.append(elem)
1✔
320
                built_step_values.add(score.step_value)
1✔
321
                log.debug(
1✔
322
                    "[step] Built partial step %d (parts_list=%s, score=%.2f)",
323
                    score.step_value,
324
                    "yes" if score.parts_list_candidate is not None else "no",
325
                    step_candidate.score,
326
                )
327
            except Exception as e:
1✔
328
                log.debug(
1✔
329
                    "[step] Failed to construct step %d candidate: %s",
330
                    score.step_value,
331
                    e,
332
                )
333

334
        # Sort steps by their step_number value
335
        steps.sort(key=lambda step: step.step_number.value)
1✔
336

337
        # Phase 5: Assign diagrams to steps using Hungarian matching
338
        # Collect available diagram candidates (not yet claimed by subassemblies)
339
        available_diagrams: list[Diagram] = []
1✔
340
        for diag_candidate in result.get_scored_candidates(
1✔
341
            "diagram", valid_only=False, exclude_failed=True
342
        ):
343
            if diag_candidate.constructed is None:
1✔
344
                # Build the diagram
345
                try:
1✔
346
                    diag_elem = result.build(diag_candidate)
1✔
347
                    assert isinstance(diag_elem, Diagram)
1✔
348
                    available_diagrams.append(diag_elem)
1✔
349
                except Exception as e:
1✔
350
                    log.debug(
1✔
351
                        "[step] Failed to build diagram at %s: %s",
352
                        diag_candidate.bbox,
353
                        e,
354
                    )
355
            elif isinstance(diag_candidate.constructed, Diagram):
1✔
356
                # Already built (claimed by subassembly) - skip
357
                pass
1✔
358

359
        log.debug(
1✔
360
            "[step] Available diagrams for step assignment: %d",
361
            len(available_diagrams),
362
        )
363

364
        # Assign diagrams to steps using Hungarian matching
365
        assign_diagrams_to_steps(steps, available_diagrams)
1✔
366

367
        # Phase 6: Assign subassemblies to steps using Hungarian matching
368
        # Collect built subassemblies
369
        subassemblies: list[SubAssembly] = []
1✔
370
        for sa_candidate in result.get_scored_candidates(
1✔
371
            "subassembly", valid_only=True
372
        ):
373
            assert sa_candidate.constructed is not None
1✔
374
            assert isinstance(sa_candidate.constructed, SubAssembly)
1✔
375
            subassemblies.append(sa_candidate.constructed)
1✔
376

377
        # Collect built dividers for obstruction checking
378
        dividers: list[Divider] = []
1✔
379
        for div_candidate in result.get_scored_candidates("divider", valid_only=True):
1✔
380
            assert div_candidate.constructed is not None
1✔
381
            assert isinstance(div_candidate.constructed, Divider)
1✔
382
            dividers.append(div_candidate.constructed)
1✔
383

384
        log.debug(
1✔
385
            "[step] Assigning %d subassemblies to %d steps (%d dividers for "
386
            "obstruction checking)",
387
            len(subassemblies),
388
            len(steps),
389
            len(dividers),
390
        )
391

392
        # Assign subassemblies to steps using Hungarian matching
393
        assign_subassemblies_to_steps(steps, subassemblies, dividers)
1✔
394

395
        # Phase 7: Finalize steps - compute arrows and final bboxes
396
        page_bbox = result.page_data.bbox
1✔
397
        for step in steps:
1✔
398
            # Get arrows for this step
399
            arrows = self._get_arrows_for_step(step.step_number, step.diagram, result)
1✔
400

401
            # Compute final bbox including all components
402
            final_bbox = self._compute_step_bbox(
1✔
403
                step.step_number,
404
                step.parts_list,
405
                step.diagram,
406
                step.subassemblies,
407
                page_bbox,
408
            )
409

410
            # Update the step in place. Step is a Pydantic model, so rather than
411
            # rebuilding it we set the new attributes directly on the instance.
412
            object.__setattr__(step, "arrows", arrows)
1✔
413
            object.__setattr__(step, "bbox", final_bbox)
1✔
414

415
        # Phase 8: Assign rotation symbols to steps using Hungarian matching
416
        # Collect built rotation symbols
417
        rotation_symbols: list[RotationSymbol] = []
1✔
418
        for rs_candidate in result.get_scored_candidates(
1✔
419
            "rotation_symbol", valid_only=True
420
        ):
421
            assert rs_candidate.constructed is not None
1✔
422
            assert isinstance(rs_candidate.constructed, RotationSymbol)
1✔
423
            rotation_symbols.append(rs_candidate.constructed)
1✔
424

425
        assign_rotation_symbols_to_steps(steps, rotation_symbols)
1✔
426

427
        log.debug(
1✔
428
            "[step] build_all complete: %d steps, %d rotation symbols assigned",
429
            len(steps),
430
            sum(1 for s in steps if s.rotation_symbol is not None),
431
        )
432

433
        # Cast for type compatibility with base class return type
434
        return list[LegoPageElements](steps)
1✔
435

436
    def _filter_page_level_step_candidates(
1✔
437
        self, candidates: list[Candidate]
438
    ) -> list[Candidate]:
439
        """Filter step candidates to exclude likely subassembly steps.
440

441
        Extracts step number values from candidates and uses the generic
442
        filter_subassembly_values function to filter out subassembly steps.
443

444
        Args:
445
            candidates: All step candidates to filter
446

447
        Returns:
448
            Filtered list of candidates likely to be page-level steps
449
        """
450
        # Extract step number values from candidates
451
        items_with_values: list[tuple[int, Candidate]] = []
1✔
452
        for candidate in candidates:
1✔
453
            score = candidate.score_details
1✔
454
            if not isinstance(score, _StepScore):
1✔
UNCOV
455
                continue
×
456
            items_with_values.append((score.step_value, candidate))
1✔
457

458
        # Use generic filtering function
459
        filtered_items = filter_subassembly_values(items_with_values)
1✔
460

461
        # If filtering occurred, return only the filtered candidates
462
        if len(filtered_items) != len(items_with_values):
1✔
463
            filtered_values = [v for v, _ in filtered_items]
1✔
464
            log.debug(
1✔
465
                "[step] Filtered out likely subassembly steps, keeping values: %s",
466
                sorted(filtered_values),
467
            )
468
            return [item for _, item in filtered_items]
1✔
469

470
        # No filtering occurred, return original candidates
471
        # (includes any candidates that couldn't have their value extracted)
472
        return candidates
1✔
473

474
    def build(self, candidate: Candidate, result: ClassificationResult) -> Step:
1✔
475
        """Construct a partial Step element from a single candidate.
476

477
        This creates a Step with just step_number and parts_list. The diagram,
478
        subassemblies, and arrows are assigned later in build_all() using
479
        Hungarian matching for optimal global assignment.
480
        """
481
        score = candidate.score_details
1✔
482
        assert isinstance(score, _StepScore)
1✔
483

484
        # Validate and extract step number from parent candidate
485
        step_num_candidate = score.step_number_candidate
1✔
486

487
        step_num_elem = result.build(step_num_candidate)
1✔
488
        assert isinstance(step_num_elem, StepNumber)
1✔
489
        step_num = step_num_elem
1✔
490

491
        # Validate and extract parts list from parent candidate (if present)
492
        parts_list = None
1✔
493
        if score.parts_list_candidate:
1✔
494
            parts_list_candidate = score.parts_list_candidate
1✔
495
            parts_list_elem = result.build(parts_list_candidate)
1✔
496
            assert isinstance(parts_list_elem, PartsList)
1✔
497
            parts_list = parts_list_elem
1✔
498

499
        # Create partial step - diagram, subassemblies, arrows assigned later
500
        initial_bbox = step_num.bbox
1✔
501
        if parts_list:
1✔
502
            initial_bbox = initial_bbox.union(parts_list.bbox)
1✔
503

504
        return Step(
1✔
505
            bbox=initial_bbox,
506
            step_number=step_num,
507
            parts_list=parts_list,
508
            diagram=None,
509
            rotation_symbol=None,
510
            arrows=[],
511
            subassemblies=[],
512
        )
513

514
    def _find_and_build_diagram_for_step(
1✔
515
        self,
516
        step_num: StepNumber,
517
        parts_list: PartsList | None,
518
        result: ClassificationResult,
519
    ) -> Diagram | None:
520
        """Find and build the best diagram for this step.
521

522
        This is called at build time, after rotation symbols and subassemblies
523
        have been built (in build_all Phases 1 and 3), so they've already
524
        claimed any images/diagrams they need. We filter to only consider
525
        diagram candidates that haven't been constructed yet.
526

527
        Args:
528
            step_num: The built step number element
529
            parts_list: The built parts list element (if any)
530
            result: Classification result containing diagram candidates
531

532
        Returns:
533
            The built Diagram element, or None if no suitable diagram found
534
        """
535
        # Get all non-failed diagram candidates (constructed ones are filtered out below)
UNCOV
536
        diagram_candidates = result.get_scored_candidates(
×
537
            "diagram", valid_only=False, exclude_failed=True
538
        )
539

540
        # Filter to only candidates that haven't been built yet
541
        # (subassemblies and rotation symbols built earlier may have claimed some)
542
        available_candidates = [c for c in diagram_candidates if c.constructed is None]
×
543

544
        if not available_candidates:
×
UNCOV
545
            log.debug(
×
546
                "[step] No diagram candidates available for step %d",
547
                step_num.value,
548
            )
UNCOV
549
            return None
×
550

551
        # Score each candidate based on position relative to step
552
        step_bbox = step_num.bbox
×
UNCOV
553
        best_candidate = None
×
UNCOV
554
        best_score = -float("inf")
×
555

556
        for candidate in available_candidates:
×
557
            score = self._score_step_diagram_pair(step_bbox, candidate.bbox)
×
558

UNCOV
559
            log.debug(
×
560
                "[step] Diagram candidate at %s for step %d: score=%.2f",
561
                candidate.bbox,
562
                step_num.value,
563
                score,
564
            )
565

566
            if score > best_score:
×
UNCOV
567
                best_score = score
×
UNCOV
568
                best_candidate = candidate
×
569

UNCOV
570
        if best_candidate is None or best_score < 0.2:
×
571
            log.debug(
×
572
                "[step] No suitable diagram found for step %d (best_score=%.2f)",
573
                step_num.value,
574
                best_score,
575
            )
UNCOV
576
            return None
×
577

578
        # Build the diagram
UNCOV
579
        try:
×
UNCOV
580
            diagram_elem = result.build(best_candidate)
×
UNCOV
581
            assert isinstance(diagram_elem, Diagram)
×
UNCOV
582
            log.debug(
×
583
                "[step] Built diagram at %s for step %d (score=%.2f)",
584
                diagram_elem.bbox,
585
                step_num.value,
586
                best_score,
587
            )
UNCOV
588
            return diagram_elem
×
UNCOV
589
        except CandidateFailedError as e:
×
UNCOV
590
            log.debug(
×
591
                "[step] Failed to build diagram for step %d: %s",
592
                step_num.value,
593
                e,
594
            )
UNCOV
595
            return None
×
596

597
    def _create_step_candidate(
1✔
598
        self,
599
        step_candidate: Candidate,
600
        parts_list_candidate: Candidate | None,
601
        result: ClassificationResult,
602
    ) -> Candidate | None:
603
        """Create a Step candidate (without diagram assignment).
604

605
        Diagrams are found at build time, not during scoring, to allow
606
        rotation symbols to claim small images first.
607

608
        Args:
609
            step_candidate: The StepNumber candidate for this step
610
            parts_list_candidate: The PartsList candidate to pair with (or None)
611
            result: Classification result
612

613
        Returns:
614
            The created Candidate with score but no construction, or None if
615
            the step number value cannot be extracted.
616
        """
617
        ABOVE_EPS = 2.0  # Small epsilon for "above" check
1✔
618
        ALIGNMENT_THRESHOLD_MULTIPLIER = 1.0  # Max horizontal offset
1✔
619
        DISTANCE_THRESHOLD_MULTIPLIER = 1.0  # Max vertical distance
1✔
620

621
        # Extract step number value from the candidate
622
        if not step_candidate.source_blocks:
1✔
UNCOV
623
            return None
×
624
        source_block = step_candidate.source_blocks[0]
1✔
625
        if not isinstance(source_block, Text):
1✔
UNCOV
626
            return None
×
627
        step_value = extract_step_number_value(source_block.text)
1✔
628
        if step_value is None:
1✔
UNCOV
629
            return None
×
630

631
        step_bbox = step_candidate.bbox
1✔
632
        parts_list_bbox = parts_list_candidate.bbox if parts_list_candidate else None
1✔
633

634
        # Calculate pairing scores if there's a parts_list above the step
635
        proximity_score = 0.0
1✔
636
        alignment_score = 0.0
1✔
637

638
        if (
1✔
639
            parts_list_bbox is not None
640
            and parts_list_bbox.y1 <= step_bbox.y0 + ABOVE_EPS
641
        ):
642
            # Calculate distance (how far apart vertically)
643
            distance = step_bbox.y0 - parts_list_bbox.y1
1✔
644

645
            # Calculate proximity score
646
            max_distance = step_bbox.height * DISTANCE_THRESHOLD_MULTIPLIER
1✔
647
            if max_distance > 0:
1✔
648
                proximity_score = max(0.0, 1.0 - (distance / max_distance))
1✔
649

650
            # Calculate alignment score (how well left edges align)
651
            max_alignment_diff = step_bbox.width * ALIGNMENT_THRESHOLD_MULTIPLIER
1✔
652
            left_diff = abs(parts_list_bbox.x0 - step_bbox.x0)
1✔
653
            if max_alignment_diff > 0:
1✔
654
                alignment_score = max(0.0, 1.0 - (left_diff / max_alignment_diff))
1✔
655

656
        # Create score object with candidate references
657
        # Diagrams are found at build time, not during scoring
658
        score = _StepScore(
1✔
659
            step_value=step_value,
660
            step_number_candidate=step_candidate,
661
            parts_list_candidate=parts_list_candidate,
662
            has_parts_list=parts_list_candidate is not None,
663
            step_proximity_score=proximity_score,
664
            step_alignment_score=alignment_score,
665
        )
666

667
        # Calculate combined bbox for the candidate (without diagram)
668
        combined_bbox = step_bbox
1✔
669
        if parts_list_bbox:
1✔
670
            combined_bbox = BBox.union(combined_bbox, parts_list_bbox)
1✔
671

672
        # Create candidate
673
        return Candidate(
1✔
674
            bbox=combined_bbox,
675
            label="step",
676
            score=score.overall_score(),
677
            score_details=score,
678
            source_blocks=[],
679
        )
680

681
    def _score_step_diagram_pair(
1✔
682
        self,
683
        step_bbox: BBox,
684
        diagram_bbox: BBox,
685
    ) -> float:
686
        """Score how well a diagram matches a step.
687

688
        Diagrams are typically positioned to the right of and/or below the step
689
        number. This method scores based on:
690
        - Horizontal position: prefer diagrams to the right, penalize left
691
        - Vertical position: prefer diagrams below the step header
692
        - Distance: closer is better
693

694
        Args:
695
            step_bbox: The step number bounding box
696
            diagram_bbox: The diagram bounding box to score
697

698
        Returns:
699
            Score between 0.0 and 1.0 (higher is better match)
700
        """
701
        # Reference point: bottom-right of step number
702
        ref_x = step_bbox.x1
×
UNCOV
703
        ref_y = step_bbox.y1
×
704

705
        # TODO Move all these constants into config, or make them adaptive?
706

707
        # Horizontal score
708
        # Diagrams to the right are preferred, but allow some overlap
UNCOV
709
        x_offset = diagram_bbox.x0 - ref_x
×
710

711
        if x_offset >= -50:
×
712
            # Diagram starts to the right or slightly overlapping - good
713
            # Score decreases slightly with distance to the right
714
            x_score = max(0.5, 1.0 - abs(x_offset) / 400.0)
×
UNCOV
715
        elif x_offset >= -200:
×
716
            # Diagram is moderately to the left - acceptable
UNCOV
717
            x_score = 0.3 + 0.2 * (1.0 + x_offset / 200.0)
×
718
        else:
719
            # Diagram is far to the left - poor match
UNCOV
720
            x_score = max(0.1, 0.3 + x_offset / 400.0)
×
721

722
        # Vertical score
723
        # Diagrams below the step header are preferred
UNCOV
724
        y_offset = diagram_bbox.y0 - ref_y
×
725

UNCOV
726
        if y_offset >= -30:
×
727
            # Diagram starts below or slightly overlapping - good
728
            # Score decreases with vertical distance
UNCOV
729
            y_score = max(0.3, 1.0 - abs(y_offset) / 300.0)
×
UNCOV
730
        elif y_offset >= -100:
×
731
            # Diagram is moderately above - less good but acceptable
UNCOV
732
            y_score = 0.2 + 0.3 * (1.0 + y_offset / 100.0)
×
733
        else:
734
            # Diagram is far above - poor match
UNCOV
735
            y_score = max(0.05, 0.2 + y_offset / 300.0)
×
736

737
        # Combined score - weight both dimensions equally
UNCOV
738
        score = 0.5 * x_score + 0.5 * y_score
×
739

740
        return score
×
741

742
    def _get_rotation_symbol_for_step(
        self,
        step_bbox: BBox,
        result: ClassificationResult,
    ) -> RotationSymbol | None:
        """Pick the rotation symbol that belongs to a step's area, if any.

        Only candidates returned by
        ``result.get_scored_candidates("rotation_symbol", valid_only=True)``
        are considered. Those are narrowed with ``filter_overlapping`` against
        ``step_bbox`` and the winner is chosen with ``find_best_scoring``.

        Args:
            step_bbox: The step's bounding box (including step_num,
                parts_list, and diagram).
            result: Classification result holding the rotation symbol
                candidates.

        Returns:
            The constructed RotationSymbol of the best candidate, or None when
            there are no candidates or the best one has nothing constructed.
        """
        # valid_only=True: restrict the lookup to rotation symbols that were
        # already built successfully before steps are processed.
        built_candidates = result.get_scored_candidates(
            "rotation_symbol", valid_only=True
        )

        log.debug(
            "[step] Looking for rotation symbols in step bbox %s, "
            "found %d built candidates",
            step_bbox,
            len(built_candidates),
        )

        if not built_candidates:
            return None

        # Highest-scoring candidate among those overlapping the step's bbox.
        winner = find_best_scoring(filter_overlapping(built_candidates, step_bbox))

        if not winner or winner.constructed is None:
            log.debug("[step] No rotation symbol found in step bbox %s", step_bbox)
            return None

        symbol = winner.constructed
        assert isinstance(symbol, RotationSymbol)
        log.debug(
            "[step] Found rotation symbol at %s (score=%.2f)",
            symbol.bbox,
            winner.score,
        )
        return symbol
×
796

797
    def _get_arrows_for_step(
        self,
        step_num: StepNumber,
        diagram: Diagram | None,
        result: ClassificationResult,
    ) -> list[Arrow]:
        """Build and collect the arrows located around a step.

        The search region is the diagram's bbox (or the step number's bbox
        when there is no diagram) after ``expand(100.0)``. Every arrow
        candidate that ``filter_overlapping`` keeps for that region is built
        through ``result.build``; candidates that raise
        ``CandidateFailedError`` are skipped.

        Args:
            step_num: The step number element.
            diagram: The diagram element (if any).
            result: Classification result holding the arrow candidates.

        Returns:
            The Arrow elements that were built for this step (possibly empty).
        """
        candidates = result.get_scored_candidates(
            "arrow", valid_only=False, exclude_failed=True
        )

        log.debug(
            "[step] Looking for arrows for step %d, found %d candidates",
            step_num.value,
            len(candidates),
        )

        if not candidates:
            return []

        # Prefer the diagram area; fall back to the step number area. The
        # margin is generous because arrows can extend well past the diagram.
        anchor_bbox = diagram.bbox if diagram else step_num.bbox
        search_region = anchor_bbox.expand(100.0)

        log.debug(
            "[step] Arrow search region for step %d: %s",
            step_num.value,
            search_region,
        )

        found: list[Arrow] = []
        for candidate in filter_overlapping(candidates, search_region):
            log.debug(
                "[step]   Arrow candidate at %s, overlaps=True, score=%.2f",
                candidate.bbox,
                candidate.score,
            )
            try:
                built = result.build(candidate)
            except CandidateFailedError:
                # Expected when several arrow candidates share source blocks:
                # the one that lost the conflict is simply skipped.
                log.debug(
                    "[step]   Arrow candidate at %s failed (conflict), skipping",
                    candidate.bbox,
                )
            else:
                assert isinstance(built, Arrow)
                found.append(built)

        log.debug(
            "[step] Found %d arrows for step %d",
            len(found),
            step_num.value,
        )
        return found
1✔
872

873
    def _get_subassemblies_for_step(
        self,
        step_num: StepNumber,
        diagram: Diagram | None,
        result: ClassificationResult,
    ) -> list[SubAssembly]:
        """Collect already-built subassemblies lying in this step's band.

        The reference area is the union of the diagram and step number bboxes
        (just the step number bbox when there is no diagram). The search
        region spans the full page width and extends the reference area
        vertically by its own height in both directions, clamped to the page.
        Every built subassembly whose bbox overlaps that region is returned.

        Args:
            step_num: The step number element.
            diagram: The diagram element (if any).
            result: Classification result holding the subassembly candidates
                and the page data.

        Returns:
            List of SubAssembly elements overlapping the search region.
        """
        # Gather what has been constructed so far (valid candidates only).
        built: list[SubAssembly] = []
        for candidate in result.get_scored_candidates("subassembly", valid_only=True):
            constructed = candidate.constructed
            assert constructed is not None
            assert isinstance(constructed, SubAssembly)
            built.append(constructed)

        log.debug(
            "[step] Looking for subassemblies for step %d, found %d built",
            step_num.value,
            len(built),
        )

        if not built:
            return []

        # Reference area: diagram plus step number when a diagram exists.
        if diagram:
            reference_bbox = diagram.bbox.union(step_num.bbox)
        else:
            reference_bbox = step_num.bbox

        page_bbox = result.page_data.bbox

        # Full page width horizontally; one reference-height of slack above
        # and below, never leaving the page.
        slack = reference_bbox.height
        top = max(page_bbox.y0, reference_bbox.y0 - slack)
        bottom = min(page_bbox.y1, reference_bbox.y1 + slack)
        search_region = BBox(x0=page_bbox.x0, y0=top, x1=page_bbox.x1, y1=bottom)

        log.debug(
            "[step] SubAssembly search region for step %d: %s",
            step_num.value,
            search_region,
        )

        matches: list[SubAssembly] = []
        for candidate_sa in built:
            if not candidate_sa.bbox.overlaps(search_region):
                continue
            log.debug(
                "[step]   SubAssembly at %s overlaps search region",
                candidate_sa.bbox,
            )
            matches.append(candidate_sa)

        log.debug(
            "[step] Found %d subassemblies for step %d",
            len(matches),
            step_num.value,
        )
        return matches
×
949

950
    def _compute_step_bbox(
        self,
        step_num: StepNumber,
        parts_list: PartsList | None,
        diagram: Diagram | None,
        subassemblies: list[SubAssembly],
        page_bbox: BBox,
    ) -> BBox:
        """Return the bounding box enclosing every element of the Step.

        The box covers the step number, the parts list and diagram when
        present, and each subassembly. It is then clipped to the page so that
        elements reaching slightly off-page do not inflate it.

        Args:
            step_num: The step number element.
            parts_list: The parts list (if any).
            diagram: The diagram element (if any).
            subassemblies: Subassemblies attached to this step.
            page_bbox: Page bounds used for clipping.

        Returns:
            Union of all element bboxes, clipped to ``page_bbox``.
        """
        member_boxes = [step_num.bbox]
        # Optional elements contribute only when they are present (truthy).
        member_boxes.extend(
            element.bbox for element in (parts_list, diagram) if element
        )
        member_boxes.extend(sub.bbox for sub in subassemblies)

        return BBox.union_all(member_boxes).clip_to(page_bbox)
1✔
984

985

986
def assign_diagrams_to_steps(
    steps: list[Step],
    diagrams: list[Diagram],
    max_distance: float = 500.0,
) -> None:
    """Pair diagrams with steps via optimal one-to-one matching.

    A cost is computed for every (diagram, step) pair:

    - the Euclidean distance between the step number's center and the
      diagram's center, and
    - multiplied by 1.5 when the diagram's center lies more than 50 units
      above the step number's center (diagrams are expected below/right).

    ``linear_sum_assignment`` (Hungarian algorithm) then selects the pairing
    with minimal total cost. Each selected pair whose cost does not exceed
    ``max_distance`` has the diagram stored on the step; the Step objects are
    mutated in place.

    Args:
        steps: Step objects that may receive a diagram.
        diagrams: Diagram objects to hand out.
        max_distance: Upper bound on the (penalised) cost of a valid pair.
            Pairs above it are never assigned.
    """
    if not (steps and diagrams):
        log.debug(
            "[step] No diagram assignment needed: steps=%d, diagrams=%d",
            len(steps),
            len(diagrams),
        )
        return

    step_count = len(steps)
    diagram_count = len(diagrams)

    log.debug(
        "[step] Running Hungarian matching for diagrams: %d steps, %d diagrams",
        step_count,
        diagram_count,
    )

    # costs[d, s]: cost of giving diagram d to step s (lower is better).
    costs = np.zeros((diagram_count, step_count))

    for d_idx, diagram in enumerate(diagrams):
        diagram_xy = diagram.bbox.center
        for s_idx, step in enumerate(steps):
            number_xy = step.step_number.bbox.center

            # Straight-line distance between the two centers.
            pair_cost = (
                (number_xy[0] - diagram_xy[0]) ** 2
                + (number_xy[1] - diagram_xy[1]) ** 2
            ) ** 0.5

            # Typical layout puts the diagram below or right of the step
            # number, so a diagram clearly above it is penalised.
            if diagram_xy[1] < number_xy[1] - 50:
                pair_cost *= 1.5

            costs[d_idx, s_idx] = pair_cost
            log.debug(
                "[step]   Cost diagram[%d] at %s to step[%d]: %.1f",
                d_idx,
                diagram.bbox,
                step.step_number.value,
                pair_cost,
            )

    # Out-of-range pairs get a uniformly large cost so the solver avoids
    # them whenever an in-range alternative exists.
    out_of_range_cost = max_distance * 10
    solver_costs = np.where(costs > max_distance, out_of_range_cost, costs)

    matched_diagrams, matched_steps = linear_sum_assignment(solver_costs)

    for d_idx, s_idx in zip(matched_diagrams, matched_steps, strict=True):
        matched_cost = costs[d_idx, s_idx]
        within_range = matched_cost <= max_distance

        if not within_range:
            log.debug(
                "[step] Skipped diagram assignment: diagram[%d] to step[%d] "
                "cost=%.1f > max_distance=%.1f",
                d_idx,
                s_idx,
                matched_cost,
                max_distance,
            )
            continue

        chosen_diagram = diagrams[d_idx]
        chosen_step = steps[s_idx]
        # Use object.__setattr__ because Step is a frozen Pydantic model
        object.__setattr__(chosen_step, "diagram", chosen_diagram)
        log.debug(
            "[step] Assigned diagram at %s to step %d (cost=%.1f)",
            chosen_diagram.bbox,
            chosen_step.step_number.value,
            matched_cost,
        )
1085

1086

1087
def _has_divider_between(
1✔
1088
    bbox1: BBox,
1089
    bbox2: BBox,
1090
    dividers: list[Divider],
1091
) -> bool:
1092
    """Check if there is a divider line between two bboxes.
1093

1094
    A divider is considered "between" if it crosses the line connecting
1095
    the centers of the two bboxes.
1096

1097
    Args:
1098
        bbox1: First bounding box
1099
        bbox2: Second bounding box
1100
        dividers: List of dividers to check
1101

1102
    Returns:
1103
        True if a divider is between the two bboxes
1104
    """
1105
    center1 = bbox1.center
1✔
1106
    center2 = bbox2.center
1✔
1107

1108
    for divider in dividers:
1✔
1109
        div_bbox = divider.bbox
1✔
1110

1111
        # Check if divider is vertical (separates left/right)
1112
        if div_bbox.width < div_bbox.height * 0.2:  # Vertical line
1✔
1113
            # Check if divider x is between the two centers
1114
            min_x = min(center1[0], center2[0])
1✔
1115
            max_x = max(center1[0], center2[0])
1✔
1116
            div_x = div_bbox.center[0]
1✔
1117
            if min_x < div_x < max_x:
1✔
1118
                # Check if divider spans the y range
1119
                min_y = min(center1[1], center2[1])
1✔
1120
                max_y = max(center1[1], center2[1])
1✔
1121
                if div_bbox.y0 <= max_y and div_bbox.y1 >= min_y:
1✔
1122
                    return True
1✔
1123

1124
        # Check if divider is horizontal (separates top/bottom)
UNCOV
1125
        elif div_bbox.height < div_bbox.width * 0.2:  # Horizontal line
×
1126
            # Check if divider y is between the two centers
UNCOV
1127
            min_y = min(center1[1], center2[1])
×
UNCOV
1128
            max_y = max(center1[1], center2[1])
×
UNCOV
1129
            div_y = div_bbox.center[1]
×
UNCOV
1130
            if min_y < div_y < max_y:
×
1131
                # Check if divider spans the x range
UNCOV
1132
                min_x = min(center1[0], center2[0])
×
UNCOV
1133
                max_x = max(center1[0], center2[0])
×
UNCOV
1134
                if div_bbox.x0 <= max_x and div_bbox.x1 >= min_x:
×
UNCOV
1135
                    return True
×
1136

1137
    return False
1✔
1138

1139

1140
def assign_subassemblies_to_steps(
    steps: list[Step],
    subassemblies: list[SubAssembly],
    dividers: list[Divider],
    max_distance: float = 400.0,
) -> None:
    """Attach subassemblies to steps, allowing several per step.

    The cost of a (subassembly, step) pair is the Euclidean distance between
    the subassembly's center and the step's diagram center (or the step
    number's center when the step has no diagram). If a divider lies between
    the subassembly and the step's diagram bbox (or step bbox), the cost is
    replaced by ``max_distance * 10``.

    Assignment happens in two passes:

    1. ``linear_sum_assignment`` (Hungarian algorithm) yields an optimal
       one-to-one pairing; pairs costing at most ``max_distance`` are kept.
    2. Each subassembly still unassigned goes to its cheapest step, again
       only when that cost is at most ``max_distance``.

    The Step objects are mutated in place: their ``subassemblies`` attribute
    is replaced by a new list with the additions appended.

    Args:
        steps: Step objects that may receive subassemblies.
        subassemblies: SubAssembly objects to hand out.
        dividers: Divider objects used for the obstruction check.
        max_distance: Upper bound on the cost of a valid assignment.
    """
    if not (steps and subassemblies):
        log.debug(
            "[step] No subassembly assignment needed: steps=%d, subassemblies=%d",
            len(steps),
            len(subassemblies),
        )
        return

    step_count = len(steps)
    sub_count = len(subassemblies)

    log.debug(
        "[step] Running Hungarian matching for subassemblies: "
        "%d steps, %d subassemblies",
        step_count,
        sub_count,
    )

    # Cost used for pairs that must be avoided (divider or out of range).
    blocked_cost = max_distance * 10
    # costs[a, s]: cost of giving subassembly a to step s.
    costs = np.zeros((sub_count, step_count))

    for sub_idx, sub in enumerate(subassemblies):
        sub_xy = sub.bbox.center
        for step_idx, step in enumerate(steps):
            # Measure against the diagram when there is one; otherwise use
            # the step bbox for the divider test and the step number's
            # center for the distance.
            if step.diagram:
                anchor_bbox = step.diagram.bbox
                anchor_xy = anchor_bbox.center
            else:
                anchor_bbox = step.bbox
                anchor_xy = step.step_number.bbox.center

            if _has_divider_between(sub.bbox, anchor_bbox, dividers):
                # A divider in between overrides the distance entirely.
                pair_cost = blocked_cost
                log.debug(
                    "[step]   Divider between subassembly[%d] at %s and step[%d]",
                    sub_idx,
                    sub.bbox,
                    step.step_number.value,
                )
            else:
                pair_cost = (
                    (anchor_xy[0] - sub_xy[0]) ** 2
                    + (anchor_xy[1] - sub_xy[1]) ** 2
                ) ** 0.5

            costs[sub_idx, step_idx] = pair_cost
            log.debug(
                "[step]   Cost subassembly[%d] at %s to step[%d]: %.1f",
                sub_idx,
                sub.bbox,
                step.step_number.value,
                pair_cost,
            )

    # Flatten every out-of-range cost to the same large value for the solver.
    solver_costs = np.where(costs > max_distance, blocked_cost, costs)
    matched_subs, matched_steps = linear_sum_assignment(solver_costs)

    # Pass 1: keep the in-range pairs of the optimal one-to-one matching.
    placed: set[int] = set()
    for sub_idx, step_idx in zip(matched_subs, matched_steps, strict=True):
        matched_cost = costs[sub_idx, step_idx]
        if matched_cost <= max_distance:
            sub = subassemblies[sub_idx]
            step = steps[step_idx]
            object.__setattr__(step, "subassemblies", [*step.subassemblies, sub])
            placed.add(sub_idx)
            log.debug(
                "[step] Assigned subassembly at %s to step %d (cost=%.1f)",
                sub.bbox,
                step.step_number.value,
                matched_cost,
            )

    # Pass 2: a step may own several subassemblies, so every leftover one is
    # offered to its cheapest step (divider-blocked pairs carry blocked_cost
    # and therefore never win).
    for sub_idx, sub in enumerate(subassemblies):
        if sub_idx in placed:
            continue

        cheapest_step = None
        cheapest_cost = blocked_cost
        for step_idx, candidate_cost in enumerate(costs[sub_idx]):
            if candidate_cost < cheapest_cost:
                cheapest_cost = candidate_cost
                cheapest_step = step_idx

        if cheapest_step is None or not cheapest_cost <= max_distance:
            continue

        step = steps[cheapest_step]
        object.__setattr__(step, "subassemblies", [*step.subassemblies, sub])
        log.debug(
            "[step] Assigned extra subassembly at %s to step %d (cost=%.1f)",
            sub.bbox,
            step.step_number.value,
            cheapest_cost,
        )
1274

1275

1276
def assign_rotation_symbols_to_steps(
    steps: list[Step],
    rotation_symbols: list[RotationSymbol],
    max_distance: float = 300.0,
) -> None:
    """Assign rotation symbols to steps using Hungarian algorithm.

    Uses optimal bipartite matching to pair rotation symbols with steps based
    on the Euclidean distance from the rotation symbol's center to the center
    of the step's diagram (or of the step bbox if the step has no diagram).

    This function mutates the Step objects in place: for each accepted pair
    it sets the step's ``rotation_symbol`` attribute and grows the step's
    ``bbox`` to include the rotation symbol.

    Args:
        steps: List of Step objects to assign rotation symbols to
        rotation_symbols: List of RotationSymbol objects to assign
        max_distance: Maximum distance for a valid assignment. Pairs with
            distance greater than this will not be matched.
    """
    if not steps or not rotation_symbols:
        log.debug(
            "[step] No rotation symbol assignment needed: "
            "steps=%d, rotation_symbols=%d",
            len(steps),
            len(rotation_symbols),
        )
        return

    n_steps = len(steps)
    n_symbols = len(rotation_symbols)

    log.debug(
        "[step] Running Hungarian matching: %d steps, %d rotation symbols",
        n_steps,
        n_symbols,
    )

    # Build cost matrix: rows = rotation symbols, cols = steps
    # Cost = distance from rotation symbol center to step's diagram center
    # (or step bbox center if no diagram)
    cost_matrix = np.zeros((n_symbols, n_steps))

    for i, rs in enumerate(rotation_symbols):
        rs_center = rs.bbox.center
        for j, step in enumerate(steps):
            # Use diagram center if available, otherwise step bbox center
            if step.diagram:
                target_center = step.diagram.bbox.center
            else:
                target_center = step.bbox.center

            # Euclidean distance
            distance = (
                (rs_center[0] - target_center[0]) ** 2
                + (rs_center[1] - target_center[1]) ** 2
            ) ** 0.5

            cost_matrix[i, j] = distance
            log.debug(
                "[step]   Distance from rotation_symbol[%d] at %s to step[%d]: %.1f",
                i,
                rs.bbox,
                step.step_number.value,
                distance,
            )

    # Apply max_distance threshold by setting costs above threshold to a high
    # value. This steers the solver away from pairs that are too far apart.
    high_cost = max_distance * 10  # Arbitrary large value
    cost_matrix_thresholded = np.where(
        cost_matrix > max_distance, high_cost, cost_matrix
    )

    # Run Hungarian algorithm
    row_indices, col_indices = linear_sum_assignment(cost_matrix_thresholded)

    # Assign rotation symbols to steps based on the matching
    for row_idx, col_idx in zip(row_indices, col_indices, strict=True):
        # Judge the pair on its real (unthresholded) distance
        if cost_matrix[row_idx, col_idx] <= max_distance:
            rs = rotation_symbols[row_idx]
            step = steps[col_idx]
            step.rotation_symbol = rs
            # Expand step bbox to include the rotation symbol
            step.bbox = step.bbox.union(rs.bbox)
            log.debug(
                "[step] Assigned rotation symbol at %s to step %d "
                "(distance=%.1f, new step bbox=%s)",
                rs.bbox,
                step.step_number.value,
                cost_matrix[row_idx, col_idx],
                step.bbox,
            )
        else:
            # Report the step index of this matched pair. row_indices and
            # col_indices are parallel arrays, so col_idx is the partner of
            # row_idx; indexing col_indices by row_idx would report the wrong
            # step (or fall out of range) when there are more symbols than
            # steps.
            log.debug(
                "[step] Skipped assignment: rotation_symbol[%d] to step[%d] "
                "distance=%.1f > max_distance=%.1f",
                row_idx,
                col_idx,
                cost_matrix[row_idx, col_idx],
                max_distance,
            )
1379

1380

1381
def filter_subassembly_values[T](
1✔
1382
    items: list[tuple[int, T]],
1383
    *,
1384
    min_gap: int = 3,
1385
    max_subassembly_start: int = 3,
1386
) -> list[tuple[int, T]]:
1387
    """Filter items to exclude likely subassembly values.
1388

1389
    Subassembly steps (e.g., step 1, 2 inside a subassembly box) often appear
1390
    alongside higher-numbered page-level steps (e.g., 15, 16). This creates
1391
    sequences like [1, 2, 15, 16] which cannot all be page-level steps.
1392

1393
    This function identifies such cases by detecting a significant gap in values
1394
    and filtering out the lower-numbered items that are likely subassembly steps.
1395

1396
    Args:
1397
        items: List of (value, item) tuples where value is the step number
1398
            and item is any associated data (e.g., a Candidate).
1399
        min_gap: Minimum gap size to trigger filtering (exclusive).
1400
            Default is 3, so gaps > 3 trigger filtering.
1401
        max_subassembly_start: Maximum starting value for subassembly detection.
1402
            If min value is <= this, filtering is applied. Default is 3.
1403

1404
    Returns:
1405
        Filtered list of (value, item) tuples, keeping only the higher values
1406
        if a subassembly pattern is detected. Returns original list if no
1407
        filtering is needed.
1408

1409
    Examples:
1410
        >>> filter_subassembly_values([(1, "a"), (2, "b"), (15, "c"), (16, "d")])
1411
        [(15, "c"), (16, "d")]
1412

1413
        >>> filter_subassembly_values([(5, "a"), (6, "b"), (15, "c")])
1414
        [(5, "a"), (6, "b"), (15, "c")]  # min=5 > 3, no filtering
1415
    """
1416
    if len(items) <= 1:
1✔
1417
        return items
1✔
1418

1419
    # Sort by value
1420
    sorted_items = sorted(items, key=lambda x: x[0])
1✔
1421
    values = [v for v, _ in sorted_items]
1✔
1422

1423
    # Find the largest gap between consecutive values
1424
    max_gap_size = 0
1✔
1425
    max_gap_index = -1
1✔
1426
    for i in range(len(values) - 1):
1✔
1427
        gap = values[i + 1] - values[i]
1✔
1428
        if gap > max_gap_size:
1✔
1429
            max_gap_size = gap
1✔
1430
            max_gap_index = i
1✔
1431

1432
    # Check if filtering should occur:
1433
    # 1. Gap must be larger than min_gap
1434
    # 2. The minimum value must be <= max_subassembly_start
1435
    if max_gap_size > min_gap and max_gap_index >= 0:
1✔
1436
        min_value = values[0]
1✔
1437
        if min_value <= max_subassembly_start:
1✔
1438
            threshold = values[max_gap_index + 1]
1✔
1439
            return [(v, item) for v, item in sorted_items if v >= threshold]
1✔
1440

1441
    return items
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc